From 69ddc44c006d29436d8b5a912ac2386f6be460e0 Mon Sep 17 00:00:00 2001
From: Euler Taveira <euler.taveira@enterprisedb.com>
Date: Mon, 22 Nov 2021 15:02:19 -0300
Subject: [PATCH] Row filter for logical replication

This feature adds row filter for publication tables. When a publication is
defined or modified, rows that don't satisfy an optional WHERE clause will be
filtered out. This allows a database or set of tables to be partially
replicated. The row filter is per table. A new row filter can be added simply
by specifying a WHERE clause after the table name. The WHERE clause must be
enclosed by parentheses.

The WHERE clause should probably contain only columns that are part of the
primary key or that are covered by REPLICA IDENTITY. Otherwise, any UPDATEs and
DELETEs won't be replicated. For UPDATE and DELETE commands, it uses the old
row version (that is limited to primary key or REPLICA IDENTITY) to evaluate
the row filter.  INSERT uses the new row version to evaluate the row filter,
hence, you can use any column. If the row filter evaluates to NULL, it returns
false. The WHERE clause allows simple expressions. Simple expressions cannot
contain any aggregate or window functions, non-immutable functions,
user-defined types, operators or functions.  This restriction could possibly be
addressed in the future.

If you choose to do the initial table synchronization, only data that satisfies
the row filters is sent. If the subscription has several publications in which
a table has been published with different WHERE clauses, rows must satisfy any
expressions to be copied. In this case of different WHERE clauses, if one of
the expressions is not defined, rows are always replicated regardless of the
definition of the other expressions. If subscriber is a pre-15 version, data
synchronization won't use row filters if they are defined in the publisher.
Previous versions cannot handle row filters.

If your publication contains a partitioned table, the publication parameter
publish_via_partition_root determines if it uses the partition row filter (if
the parameter is false, the default) or the root partitioned table row filter.

Discussion: https://postgr.es/m/CAHE3wggb715X%2BmK_DitLXF25B%3DjE6xyNCH4YOwM860JR7HarGQ%40mail.gmail.com
---
 doc/src/sgml/catalogs.sgml                  |   8 +
 doc/src/sgml/ref/alter_publication.sgml     |  13 +-
 doc/src/sgml/ref/create_publication.sgml    |  35 +-
 doc/src/sgml/ref/create_subscription.sgml   |  23 +-
 src/backend/catalog/pg_publication.c        | 193 +++++++++-
 src/backend/commands/publicationcmds.c      |  76 +++-
 src/backend/nodes/copyfuncs.c               |   1 +
 src/backend/nodes/equalfuncs.c              |   1 +
 src/backend/parser/gram.y                   |  38 +-
 src/backend/parser/parse_agg.c              |   6 +
 src/backend/parser/parse_expr.c             |   6 +
 src/backend/parser/parse_func.c             |   4 +
 src/backend/replication/logical/tablesync.c | 118 +++++-
 src/backend/replication/pgoutput/pgoutput.c | 391 +++++++++++++++++++-
 src/bin/pg_dump/pg_dump.c                   |  24 +-
 src/bin/pg_dump/pg_dump.h                   |   1 +
 src/bin/psql/describe.c                     |  26 +-
 src/bin/psql/tab-complete.c                 |  27 +-
 src/include/catalog/pg_publication.h        |   3 +-
 src/include/catalog/pg_publication_rel.h    |   6 +
 src/include/nodes/parsenodes.h              |   1 +
 src/include/parser/parse_node.h             |   1 +
 src/test/regress/expected/publication.out   | 167 +++++++++
 src/test/regress/sql/publication.sql        |  94 +++++
 src/test/subscription/t/027_row_filter.pl   | 355 ++++++++++++++++++
 25 files changed, 1566 insertions(+), 52 deletions(-)
 mode change 100644 => 100755 src/backend/parser/gram.y
 create mode 100644 src/test/subscription/t/027_row_filter.pl

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index c1d11be73f..af6b1f684c 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -6299,6 +6299,14 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        Reference to relation
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+      <structfield>prqual</structfield> <type>pg_node_tree</type>
+      </para>
+      <para>Expression tree (in <function>nodeToString()</function>
+      representation) for the relation's qualifying condition</para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index bb4ef5e5e2..5d9869c4f6 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -30,7 +30,7 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
 
 <phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
 
-    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
     ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
 </synopsis>
  </refsynopsisdiv>
@@ -52,7 +52,9 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
    remove one or more tables/schemas from the publication.  Note that adding
    tables/schemas to a publication that is already subscribed to will require a
    <literal>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</literal> action on the
-   subscribing side in order to become effective.
+   subscribing side in order to become effective. Note also that the combination
+   of <literal>DROP</literal> with <literal>WHERE</literal> clause is not
+   allowed.
   </para>
 
   <para>
@@ -109,7 +111,12 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
       table name, only that table is affected.  If <literal>ONLY</literal> is not
       specified, the table and all its descendant tables (if any) are
       affected.  Optionally, <literal>*</literal> can be specified after the table
-      name to explicitly indicate that descendant tables are included.
+      name to explicitly indicate that descendant tables are included. If the
+      optional <literal>WHERE</literal> clause is specified, rows that do not
+      satisfy the <replaceable class="parameter">expression</replaceable> will
+      not be published. Note that parentheses are required around the
+      expression. The <replaceable class="parameter">expression</replaceable>
+      is evaluated with the role used for the replication connection.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index d805e8e77a..b21595f955 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -28,7 +28,7 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
 
 <phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
 
-    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
     ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
 </synopsis>
  </refsynopsisdiv>
@@ -76,6 +76,10 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
       This does not apply to a partitioned table, however.  The partitions of
       a partitioned table are always implicitly considered part of the
       publication, so they are never explicitly added to the publication.
+      If the optional <literal>WHERE</literal> clause is specified, rows that do 
+      not satisfy the <replaceable class="parameter">expression</replaceable> 
+      will not be published. Note that parentheses are required around the 
+      expression. It has no effect on <literal>TRUNCATE</literal> commands.
      </para>
 
      <para>
@@ -225,6 +229,23 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
    disallowed on those tables.
   </para>
 
+  <para>
+   The <literal>WHERE</literal> clause should contain only columns that are
+   part of the primary key or be covered by <literal>REPLICA
+   IDENTITY</literal> otherwise, <command>UPDATE</command> and
+   <command>DELETE</command> operations will not be replicated. That's because
+   old row is used and it only contains primary key or columns that are part of
+   the <literal>REPLICA IDENTITY</literal>; the remaining columns are
+   <literal>NULL</literal>. For <command>INSERT</command> operations, any column
+   might be used in the <literal>WHERE</literal> clause. New row is used and it
+   contains all columns. A <literal>NULL</literal> value causes the expression
+   to evaluate to false; avoid using columns without not-null constraints in the
+   <literal>WHERE</literal> clause. The <literal>WHERE</literal> clause allows
+   simple expressions. The simple expression cannot contain any aggregate or
+   window functions, non-immutable functions, user-defined types, operators or
+   functions.
+  </para>
+
   <para>
    For an <command>INSERT ... ON CONFLICT</command> command, the publication will
    publish the operation that actually results from the command.  So depending
@@ -247,6 +268,11 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
   <para>
    <acronym>DDL</acronym> operations are not published.
   </para>
+
+  <para>
+   The <literal>WHERE</literal> clause expression is executed with the role used
+   for the replication connection.
+  </para>
  </refsect1>
 
  <refsect1>
@@ -259,6 +285,13 @@ CREATE PUBLICATION mypublication FOR TABLE users, departments;
 </programlisting>
   </para>
 
+  <para>
+   Create a publication that publishes all changes from active departments:
+<programlisting>
+CREATE PUBLICATION active_departments FOR TABLE departments WHERE (active IS TRUE);
+</programlisting>
+  </para>
+
   <para>
    Create a publication that publishes all changes in all tables:
 <programlisting>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 990a41f1a1..4baa7fd781 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -208,6 +208,13 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           that are being subscribed to when the replication starts.
           The default is <literal>true</literal>.
          </para>
+         <para>
+          If the publications contain conditional expressions, it will affect
+          what data is copied. Refer to the
+          <xref linkend="sql-createsubscription-notes" /> for details.
+          Row-filtering may also apply here and will affect what data is
+          copied. Refer to the Notes section below.
+         </para>
         </listitem>
        </varlistentry>
 
@@ -293,7 +300,7 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
   </variablelist>
  </refsect1>
 
- <refsect1>
+ <refsect1 id="sql-createsubscription-notes" xreflabel="Notes">
   <title>Notes</title>
 
   <para>
@@ -319,6 +326,20 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
    the parameter <literal>create_slot = false</literal>.  This is an
    implementation restriction that might be lifted in a future release.
   </para>
+
+  <para>
+   If any table in the publication has a <literal>WHERE</literal> clause, rows
+   that do not satisfy the <replaceable class="parameter">expression</replaceable>
+   will not be replicated. If the subscription has several publications in
+   which the same table has been published with different
+   <literal>WHERE</literal> clauses, a row will be replicated if any of the
+   expressions is satisfied. In this case of different <literal>WHERE</literal>
+   clauses, if one of the expressions is not defined, rows are always replicated
+   regardless of the definition of the other expressions. If the subscriber is a
+   <productname>PostgreSQL</productname> version before 15 then any row filtering
+   is ignored during the initial table synchronization phase.
+  </para>
+
  </refsect1>
 
  <refsect1>
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 63579b2f82..04e2270293 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -29,6 +29,7 @@
 #include "catalog/objectaddress.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_namespace.h"
+#include "catalog/pg_proc.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_namespace.h"
 #include "catalog/pg_publication_rel.h"
@@ -36,6 +37,10 @@
 #include "commands/publicationcmds.h"
 #include "funcapi.h"
 #include "miscadmin.h"
+#include "nodes/nodeFuncs.h"
+#include "parser/parse_clause.h"
+#include "parser/parse_collate.h"
+#include "parser/parse_relation.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/catcache.h"
@@ -45,6 +50,12 @@
 #include "utils/rel.h"
 #include "utils/syscache.h"
 
+typedef struct RowFilterCheckInfo
+{
+	Relation	rel;
+	PublicationActions pubactions;
+} RowFilterCheckInfo;
+
 /*
  * Check if relation can be in given publication and throws appropriate
  * error if not.
@@ -108,6 +119,140 @@ check_publication_add_schema(Oid schemaid)
 				 errdetail("Temporary schemas cannot be replicated.")));
 }
 
+/*
+ * This routine checks if the row filter expression is a "simple expression".
+ * By "simple expression" it means:
+ *
+ * - simple or compound expressions;
+ *   Examples:
+ *     (Var Op Const)
+ *     (Var Op Var)
+ *     (Var Op Const) Bool (Var Op Const)
+ * - user-defined operators are not allowed;
+ * - user-defined types are not allowed;
+ * - user-defined functions are not allowed;
+ * - non-immutable builtin functions are not allowed.
+ *
+ * NOTES:
+ *
+ * User-defined functions, operators or types are not allowed because
+ * (a) if a user drops a user-defined object used in a row filter expression,
+ * the logical decoding infrastructure won't be able to recover from such pilot
+ * error even if the object is recreated again because a historic snapshot is
+ * used to execute the row filter.
+ * (b) a user-defined function can be used to access tables which could have
+ * unpleasant results because a historic snapshot is used. That's why only
+ * non-immutable functions are allowed in row filter expressions.
+ */
+static bool
+publication_row_filter_walker(Node *node, RowFilterCheckInfo *rfinfo)
+{
+	char	   *errormsg = NULL;
+
+	if (node == NULL)
+		return false;
+
+	if (IsA(node, Var))
+	{
+		Var		   *var = (Var *) node;
+		AttrNumber	attnum = var->varattno;
+		char		replident = rfinfo->rel->rd_rel->relreplident;
+
+		/*
+		 * Check replica identity if the publication allows UPDATE and/or
+		 * DELETE as DML operations. REPLICA IDENTITY FULL is OK since it
+		 * includes all columns in the old tuple.
+		 */
+		if ((rfinfo->pubactions.pubupdate || rfinfo->pubactions.pubdelete) &&
+			replident != REPLICA_IDENTITY_FULL)
+		{
+			Bitmapset  *bms_replident = NULL;
+
+			if (replident == REPLICA_IDENTITY_DEFAULT)
+				bms_replident = RelationGetIndexAttrBitmap(rfinfo->rel, INDEX_ATTR_BITMAP_PRIMARY_KEY);
+			else if (replident == REPLICA_IDENTITY_INDEX)
+				bms_replident = RelationGetIndexAttrBitmap(rfinfo->rel, INDEX_ATTR_BITMAP_IDENTITY_KEY);
+
+			/*
+			 * REPLICA IDENTITY NOTHING does not contain columns in the old
+			 * tuple so it is not supported. The referenced column must be
+			 * contained by REPLICA IDENTITY DEFAULT (primary key) or REPLICA
+			 * IDENTITY INDEX (index columns).
+			 */
+			if (replident == REPLICA_IDENTITY_NOTHING ||
+				!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, bms_replident))
+			{
+				const char *colname = get_attname(RelationGetRelid(rfinfo->rel), attnum, false);
+
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+						 errmsg("cannot add relation \"%s\" to publication",
+								RelationGetRelationName(rfinfo->rel)),
+						 errdetail("Column \"%s\" used in the WHERE expression is not part of the replica identity.",
+								   colname)));
+			}
+
+			bms_free(bms_replident);
+		}
+		else if (var->vartype >= FirstNormalObjectId)
+		{
+			errormsg = _("User-defined types are not allowed.");
+		}
+	}
+	else if (IsA(node, List) || IsA(node, Const) || IsA(node, BoolExpr) || IsA(node, NullIfExpr) || IsA(node, NullTest) || IsA(node, BooleanTest) || IsA(node, CoalesceExpr) || IsA(node, CaseExpr) || IsA(node, CaseTestExpr) || IsA(node, MinMaxExpr) || IsA(node, ArrayExpr) || IsA(node, ScalarArrayOpExpr) || IsA(node, XmlExpr))
+	{
+		/* nodes are part of simple expressions */
+	}
+	else if (IsA(node, OpExpr))
+	{
+		if (((OpExpr *) node)->opno >= FirstNormalObjectId)
+			errormsg = _("User-defined operators are not allowed.");
+	}
+	else if (IsA(node, FuncExpr))
+	{
+		Oid			funcid = ((FuncExpr *) node)->funcid;
+		const char *funcname = get_func_name(funcid);
+
+		if (funcid >= FirstNormalObjectId)
+			errormsg = psprintf(_("User-defined functions are not allowed (%s)."), funcname);
+		else if (func_volatile(funcid) != PROVOLATILE_IMMUTABLE)
+			errormsg = psprintf(_("Non-immutable functions are not allowed (%s)."), funcname);
+	}
+	else
+	{
+		ereport(ERROR,
+				(errmsg("invalid publication WHERE expression for relation \"%s\"",
+						RelationGetRelationName(rfinfo->rel)),
+				 errdetail("Expressions only allows columns, constants and some builtin functions and operators.")));
+	}
+
+	if (errormsg)
+		ereport(ERROR,
+				(errmsg("invalid publication WHERE expression for relation \"%s\"",
+						RelationGetRelationName(rfinfo->rel)),
+				 errdetail("%s", errormsg)));
+
+	return expression_tree_walker(node, publication_row_filter_walker, (void *) rfinfo);
+}
+
+/*
+ * Validate the row filter with the following rules:
+ * (a) few node types are allowed in the expression. See the function
+ * publication_row_filter_walker for details.
+ * (b) If the publication publishes UPDATE and/or DELETE operations, all
+ * columns used in the row filter must be contained in the replica identity.
+ */
+static void
+check_publication_row_filter(PublicationActions pubactions, Relation rel, Node *rfnode)
+{
+	RowFilterCheckInfo rfinfo = {0};
+
+	rfinfo.rel = rel;
+	rfinfo.pubactions = pubactions;
+
+	publication_row_filter_walker(rfnode, &rfinfo);
+}
+
 /*
  * Returns if relation represented by oid and Form_pg_class entry
  * is publishable.
@@ -257,18 +402,22 @@ GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
  * Insert new publication / relation mapping.
  */
 ObjectAddress
-publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
+publication_add_relation(Oid pubid, PublicationRelInfo *pri,
 						 bool if_not_exists)
 {
 	Relation	rel;
 	HeapTuple	tup;
 	Datum		values[Natts_pg_publication_rel];
 	bool		nulls[Natts_pg_publication_rel];
-	Oid			relid = RelationGetRelid(targetrel->relation);
+	Relation	targetrel = pri->relation;
+	Oid			relid = RelationGetRelid(targetrel);
 	Oid			prrelid;
 	Publication *pub = GetPublication(pubid);
 	ObjectAddress myself,
 				referenced;
+	ParseState *pstate;
+	ParseNamespaceItem *nsitem;
+	Node	   *whereclause = NULL;
 	List	   *relids = NIL;
 
 	rel = table_open(PublicationRelRelationId, RowExclusiveLock);
@@ -289,10 +438,33 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 		ereport(ERROR,
 				(errcode(ERRCODE_DUPLICATE_OBJECT),
 				 errmsg("relation \"%s\" is already member of publication \"%s\"",
-						RelationGetRelationName(targetrel->relation), pub->name)));
+						RelationGetRelationName(targetrel), pub->name)));
 	}
 
-	check_publication_add_relation(targetrel->relation);
+	check_publication_add_relation(targetrel);
+
+	if (pri->whereClause != NULL)
+	{
+		/* Set up a ParseState to parse with */
+		pstate = make_parsestate(NULL);
+		pstate->p_sourcetext = nodeToString(pri->whereClause);
+
+		nsitem = addRangeTableEntryForRelation(pstate, targetrel,
+											   AccessShareLock,
+											   NULL, false, false);
+		addNSItemToQuery(pstate, nsitem, false, true, true);
+
+		whereclause = transformWhereClause(pstate,
+										   copyObject(pri->whereClause),
+										   EXPR_KIND_PUBLICATION_WHERE,
+										   "PUBLICATION");
+
+		/* Validate row filter expression */
+		check_publication_row_filter(pub->pubactions, targetrel, whereclause);
+
+		/* Fix up collation information */
+		assign_expr_collations(pstate, whereclause);
+	}
 
 	/* Form a tuple. */
 	memset(values, 0, sizeof(values));
@@ -306,6 +478,12 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	values[Anum_pg_publication_rel_prrelid - 1] =
 		ObjectIdGetDatum(relid);
 
+	/* Add qualifications, if available */
+	if (whereclause)
+		values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(whereclause));
+	else
+		nulls[Anum_pg_publication_rel_prqual - 1] = true;
+
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
 	/* Insert tuple into catalog. */
@@ -322,6 +500,13 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	ObjectAddressSet(referenced, RelationRelationId, relid);
 	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
 
+	/* Add dependency on the objects mentioned in the qualifications */
+	if (whereclause)
+	{
+		recordDependencyOnExpr(&myself, whereclause, pstate->p_rtable, DEPENDENCY_NORMAL);
+		free_parsestate(pstate);
+	}
+
 	/* Close the table. */
 	table_close(rel, RowExclusiveLock);
 
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 7d4a0e95f6..bade24f12a 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -529,40 +529,64 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
 		List	   *delrels = NIL;
 		ListCell   *oldlc;
 
+		/*
+		 * Check if the relation is member of the existing schema in the
+		 * publication or member of the schema list specified.
+		 */
 		CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
 											  PUBLICATIONOBJ_TABLE);
 
-		/* Calculate which relations to drop. */
+		/*
+		 * Remove tables that are not found in the new table list. Remove
+		 * tables that are being re-added with a different qual expression
+		 * (including a table that has no qual expression) because simply
+		 * updating the existing tuple is not enough due to qual expression
+		 * dependencies.
+		 */
 		foreach(oldlc, oldrelids)
 		{
 			Oid			oldrelid = lfirst_oid(oldlc);
+			PublicationRelInfo *oldrel;
 			ListCell   *newlc;
 			bool		found = false;
+			HeapTuple	rftuple;
+			bool		rfisnull = true;	/* default if tuple is not found */
+
+			rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(oldrelid),
+									  ObjectIdGetDatum(pubid));
+			if (HeapTupleIsValid(rftuple))
+			{
+				SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, Anum_pg_publication_rel_prqual,
+								&rfisnull);
+				ReleaseSysCache(rftuple);
+			}
 
 			foreach(newlc, rels)
 			{
 				PublicationRelInfo *newpubrel;
 
 				newpubrel = (PublicationRelInfo *) lfirst(newlc);
-				if (RelationGetRelid(newpubrel->relation) == oldrelid)
+
+				/*
+				 * Keep the table iif old and new table has no qual
+				 * expression. Otherwise, this table will be included in the
+				 * deletion list below.
+				 */
+				if (oldrelid == RelationGetRelid(newpubrel->relation) &&
+					newpubrel->whereClause == NULL && rfisnull)
 				{
 					found = true;
 					break;
 				}
 			}
-			/* Not yet in the list, open it and add to the list */
+
 			if (!found)
 			{
-				Relation	oldrel;
-				PublicationRelInfo *pubrel;
-
-				/* Wrap relation into PublicationRelInfo */
-				oldrel = table_open(oldrelid, ShareUpdateExclusiveLock);
-
-				pubrel = palloc(sizeof(PublicationRelInfo));
-				pubrel->relation = oldrel;
-
-				delrels = lappend(delrels, pubrel);
+				oldrel = palloc(sizeof(PublicationRelInfo));
+				oldrel->whereClause = NULL;
+				oldrel->relation = table_open(oldrelid,
+											  ShareUpdateExclusiveLock);
+				delrels = lappend(delrels, oldrel);
 			}
 		}
 
@@ -899,6 +923,7 @@ OpenTableList(List *tables)
 	List	   *relids = NIL;
 	List	   *rels = NIL;
 	ListCell   *lc;
+	List	   *relids_with_rf = NIL;
 
 	/*
 	 * Open, share-lock, and check all the explicitly-specified relations
@@ -926,15 +951,30 @@ OpenTableList(List *tables)
 		 */
 		if (list_member_oid(relids, myrelid))
 		{
+			char	   *relname = pstrdup(RelationGetRelationName(rel));
+
 			table_close(rel, ShareUpdateExclusiveLock);
+
+			/* Disallow duplicate tables if there are any WHERE clauses. */
+			if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
+				ereport(ERROR,
+						(errcode(ERRCODE_DUPLICATE_OBJECT),
+						 errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
+								relname)));
+
+			pfree(relname);
 			continue;
 		}
 
 		pub_rel = palloc(sizeof(PublicationRelInfo));
 		pub_rel->relation = rel;
+		pub_rel->whereClause = t->whereClause;
 		rels = lappend(rels, pub_rel);
 		relids = lappend_oid(relids, myrelid);
 
+		if (t->whereClause)
+			relids_with_rf = lappend_oid(relids_with_rf, myrelid);
+
 		/*
 		 * Add children of this rel, if requested, so that they too are added
 		 * to the publication.  A partitioned table can't have any inheritance
@@ -967,6 +1007,8 @@ OpenTableList(List *tables)
 				rel = table_open(childrelid, NoLock);
 				pub_rel = palloc(sizeof(PublicationRelInfo));
 				pub_rel->relation = rel;
+				/* child inherits WHERE clause from parent */
+				pub_rel->whereClause = t->whereClause;
 				rels = lappend(rels, pub_rel);
 				relids = lappend_oid(relids, childrelid);
 			}
@@ -974,6 +1016,7 @@ OpenTableList(List *tables)
 	}
 
 	list_free(relids);
+	list_free(relids_with_rf);
 
 	return rels;
 }
@@ -993,6 +1036,8 @@ CloseTableList(List *rels)
 		pub_rel = (PublicationRelInfo *) lfirst(lc);
 		table_close(pub_rel->relation, NoLock);
 	}
+
+	list_free_deep(rels);
 }
 
 /*
@@ -1088,6 +1133,11 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 							RelationGetRelationName(rel))));
 		}
 
+		if (pubrel->whereClause)
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("cannot use a WHERE clause when removing a table from publication")));
+
 		ObjectAddressSet(obj, PublicationRelRelationId, prid);
 		performDeletion(&obj, DROP_CASCADE, 0);
 	}
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 297b6ee715..be9c1fbf32 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4832,6 +4832,7 @@ _copyPublicationTable(const PublicationTable *from)
 	PublicationTable *newnode = makeNode(PublicationTable);
 
 	COPY_NODE_FIELD(relation);
+	COPY_NODE_FIELD(whereClause);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index f537d3eb96..57764470dc 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2312,6 +2312,7 @@ static bool
 _equalPublicationTable(const PublicationTable *a, const PublicationTable *b)
 {
 	COMPARE_NODE_FIELD(relation);
+	COMPARE_NODE_FIELD(whereClause);
 
 	return true;
 }
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
old mode 100644
new mode 100755
index 86ce33bd97..c9ccbf31af
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9654,12 +9654,13 @@ CreatePublicationStmt:
  * relation_expr here.
  */
 PublicationObjSpec:
-			TABLE relation_expr
+			TABLE relation_expr OptWhereClause
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_TABLE;
 					$$->pubtable = makeNode(PublicationTable);
 					$$->pubtable->relation = $2;
+					$$->pubtable->whereClause = $3;
 				}
 			| ALL TABLES IN_P SCHEMA ColId
 				{
@@ -9674,28 +9675,45 @@ PublicationObjSpec:
 					$$->pubobjtype = PUBLICATIONOBJ_TABLE_IN_CUR_SCHEMA;
 					$$->location = @5;
 				}
-			| ColId
+			| ColId OptWhereClause
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
-					$$->name = $1;
+					if ($2)
+					{
+						/*
+						 * The OptWhereClause must be stored here but it is
+						 * valid only for tables. If the ColId was mistakenly
+						 * not a table this will be detected later in
+						 * preprocess_pubobj_list() and an error is thrown.
+						 */
+						$$->pubtable = makeNode(PublicationTable);
+						$$->pubtable->relation = makeRangeVar(NULL, $1, @1);
+						$$->pubtable->whereClause = $2;
+					}
+					else
+					{
+						$$->name = $1;
+					}
 					$$->location = @1;
 				}
-			| ColId indirection
+			| ColId indirection OptWhereClause
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
 					$$->pubtable = makeNode(PublicationTable);
 					$$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner);
+					$$->pubtable->whereClause = $3;
 					$$->location = @1;
 				}
 			/* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */
-			| extended_relation_expr
+			| extended_relation_expr OptWhereClause
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
 					$$->pubtable = makeNode(PublicationTable);
 					$$->pubtable->relation = $1;
+					$$->pubtable->whereClause = $2;
 				}
 			| CURRENT_SCHEMA
 				{
@@ -17343,7 +17361,8 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
 						errcode(ERRCODE_SYNTAX_ERROR),
 						errmsg("invalid table name at or near"),
 						parser_errposition(pubobj->location));
-			else if (pubobj->name)
+
+			if (pubobj->name)
 			{
 				/* convert it to PublicationTable */
 				PublicationTable *pubtable = makeNode(PublicationTable);
@@ -17356,6 +17375,13 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
 		else if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLE_IN_SCHEMA ||
 				 pubobj->pubobjtype == PUBLICATIONOBJ_TABLE_IN_CUR_SCHEMA)
 		{
+			/* WHERE clause is not allowed on a schema object */
+			if (pubobj->pubtable && pubobj->pubtable->whereClause)
+				ereport(ERROR,
+						errcode(ERRCODE_SYNTAX_ERROR),
+						errmsg("WHERE clause for schema not allowed"),
+						parser_errposition(pubobj->location));
+
 			/*
 			 * We can distinguish between the different type of schema
 			 * objects based on whether name and pubtable is set.
diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c
index 7d829a05a9..7388bbdbd4 100644
--- a/src/backend/parser/parse_agg.c
+++ b/src/backend/parser/parse_agg.c
@@ -551,6 +551,9 @@ check_agglevels_and_constraints(ParseState *pstate, Node *expr)
 				err = _("grouping operations are not allowed in COPY FROM WHERE conditions");
 
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			/* okay (see function publication_row_filter_walker) */
+			break;
 
 		case EXPR_KIND_CYCLE_MARK:
 			errkind = true;
@@ -943,6 +946,9 @@ transformWindowFuncCall(ParseState *pstate, WindowFunc *wfunc,
 		case EXPR_KIND_CYCLE_MARK:
 			errkind = true;
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			/* okay (see function publication_row_filter_walker) */
+			break;
 
 			/*
 			 * There is intentionally no default: case here, so that the
diff --git a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c
index 2d1a477154..f1bb01c80d 100644
--- a/src/backend/parser/parse_expr.c
+++ b/src/backend/parser/parse_expr.c
@@ -504,6 +504,7 @@ transformColumnRef(ParseState *pstate, ColumnRef *cref)
 		case EXPR_KIND_COPY_WHERE:
 		case EXPR_KIND_GENERATED_COLUMN:
 		case EXPR_KIND_CYCLE_MARK:
+		case EXPR_KIND_PUBLICATION_WHERE:
 			/* okay */
 			break;
 
@@ -1764,6 +1765,9 @@ transformSubLink(ParseState *pstate, SubLink *sublink)
 		case EXPR_KIND_GENERATED_COLUMN:
 			err = _("cannot use subquery in column generation expression");
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			/* okay (see function publication_row_filter_walker) */
+			break;
 
 			/*
 			 * There is intentionally no default: case here, so that the
@@ -3084,6 +3088,8 @@ ParseExprKindName(ParseExprKind exprKind)
 			return "GENERATED AS";
 		case EXPR_KIND_CYCLE_MARK:
 			return "CYCLE";
+		case EXPR_KIND_PUBLICATION_WHERE:
+			return "publication expression";
 
 			/*
 			 * There is intentionally no default: case here, so that the
diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c
index 542f9167aa..ffdf86fc7e 100644
--- a/src/backend/parser/parse_func.c
+++ b/src/backend/parser/parse_func.c
@@ -2655,6 +2655,10 @@ check_srf_call_placement(ParseState *pstate, Node *last_srf, int location)
 		case EXPR_KIND_CYCLE_MARK:
 			errkind = true;
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			/* okay (see function publication_row_filter_walker) */
+			pstate->p_hasTargetSRFs = true;
+			break;
 
 			/*
 			 * There is intentionally no default: case here, so that the
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f07983a43c..9041847087 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -688,19 +688,23 @@ copy_read_data(void *outbuf, int minread, int maxread)
 
 /*
  * Get information about remote relation in similar fashion the RELATION
- * message provides during replication.
+ * message provides during replication. This function also returns the relation
+ * qualifications to be used in COPY command.
  */
 static void
 fetch_remote_table_info(char *nspname, char *relname,
-						LogicalRepRelation *lrel)
+						LogicalRepRelation *lrel, List **qual)
 {
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
 	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID};
 	Oid			attrRow[] = {TEXTOID, OIDOID, BOOLOID};
+	Oid			qualRow[] = {TEXTOID};
 	bool		isnull;
 	int			natt;
+	ListCell   *lc;
+	bool		first;
 
 	lrel->nspname = nspname;
 	lrel->relname = relname;
@@ -796,6 +800,82 @@ fetch_remote_table_info(char *nspname, char *relname,
 	lrel->natts = natt;
 
 	walrcv_clear_result(res);
+
+	/*
+	 * Get relation qual. DISTINCT avoids the same expression of a table in
+	 * multiple publications from being included multiple times in the final
+	 * expression.
+	 */
+	if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+	{
+		resetStringInfo(&cmd);
+		appendStringInfo(&cmd,
+						 "SELECT DISTINCT pg_get_expr(prqual, prrelid) "
+						 "  FROM pg_publication p "
+						 "  INNER JOIN pg_publication_rel pr "
+						 "       ON (p.oid = pr.prpubid) "
+						 " WHERE pr.prrelid = %u "
+						 "   AND p.pubname IN (", lrel->remoteid);
+
+		first = true;
+		foreach(lc, MySubscription->publications)
+		{
+			char	   *pubname = strVal(lfirst(lc));
+
+			if (first)
+				first = false;
+			else
+				appendStringInfoString(&cmd, ", ");
+
+			appendStringInfoString(&cmd, quote_literal_cstr(pubname));
+		}
+		appendStringInfoChar(&cmd, ')');
+
+		res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
+
+		if (res->status != WALRCV_OK_TUPLES)
+			ereport(ERROR,
+					(errmsg("could not fetch relation qualifications for table \"%s.%s\" from publisher: %s",
+							nspname, relname, res->err)));
+
+		/*
+		 * Multiple row filter expressions for the same table will be combined
+		 * by COPY using OR. If one of the multiple row filter expressions in
+		 * this table has no filter, it means the whole table will be copied.
+		 * Hence, it is not required to inform an unified row filter
+		 * expression at all.
+		 */
+		slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+		while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+		{
+			Datum		rf = slot_getattr(slot, 1, &isnull);
+
+			/*
+			 * One entry without a row filter expression means clean up
+			 * previous expressions (if there is any) and return with no
+			 * expressions.
+			 */
+			if (isnull)
+			{
+				if (*qual)
+				{
+					list_free(*qual);
+					*qual = NIL;
+				}
+				ExecClearTuple(slot);
+				break;
+			}
+			else
+			{
+				*qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
+				ExecClearTuple(slot);
+			}
+		}
+		ExecDropSingleTupleTableSlot(slot);
+
+		walrcv_clear_result(res);
+	}
+
 	pfree(cmd.data);
 }
 
@@ -809,6 +889,7 @@ copy_table(Relation rel)
 {
 	LogicalRepRelMapEntry *relmapentry;
 	LogicalRepRelation lrel;
+	List	   *qual = NIL;
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	CopyFromState cstate;
@@ -817,7 +898,7 @@ copy_table(Relation rel)
 
 	/* Get the publisher relation info. */
 	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
-							RelationGetRelationName(rel), &lrel);
+							RelationGetRelationName(rel), &lrel, &qual);
 
 	/* Put the relation into relmap. */
 	logicalrep_relmap_update(&lrel);
@@ -828,14 +909,18 @@ copy_table(Relation rel)
 
 	/* Start copy on the publisher. */
 	initStringInfo(&cmd);
-	if (lrel.relkind == RELKIND_RELATION)
+
+	/* Regular table with no row filter */
+	if (lrel.relkind == RELKIND_RELATION && qual == NIL)
 		appendStringInfo(&cmd, "COPY %s TO STDOUT",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
 	else
 	{
 		/*
 		 * For non-tables, we need to do COPY (SELECT ...), but we can't just
-		 * do SELECT * because we need to not copy generated columns.
+		 * do SELECT * because we need to not copy generated columns. For
+		 * tables with any row filters, build a SELECT query with OR'ed row
+		 * filters for COPY.
 		 */
 		appendStringInfoString(&cmd, "COPY (SELECT ");
 		for (int i = 0; i < lrel.natts; i++)
@@ -844,8 +929,29 @@ copy_table(Relation rel)
 			if (i < lrel.natts - 1)
 				appendStringInfoString(&cmd, ", ");
 		}
-		appendStringInfo(&cmd, " FROM %s) TO STDOUT",
+		appendStringInfo(&cmd, " FROM %s",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
+		/* list of OR'ed filters */
+		if (qual != NIL)
+		{
+			ListCell   *lc;
+			bool		first = true;
+
+			appendStringInfoString(&cmd, " WHERE ");
+			foreach(lc, qual)
+			{
+				char	   *q = strVal(lfirst(lc));
+
+				if (first)
+					first = false;
+				else
+					appendStringInfoString(&cmd, " OR ");
+				appendStringInfoString(&cmd, q);
+			}
+			list_free_deep(qual);
+		}
+
+		appendStringInfoString(&cmd, ") TO STDOUT");
 	}
 	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
 	pfree(cmd.data);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 6f6a203dea..cfd41526d4 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -13,18 +13,28 @@
 #include "postgres.h"
 
 #include "access/tupconvert.h"
+#include "access/xact.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel.h"
 #include "commands/defrem.h"
+#include "executor/executor.h"
 #include "fmgr.h"
+#include "nodes/nodeFuncs.h"
+#include "nodes/makefuncs.h"
+#include "optimizer/optimizer.h"
+#include "parser/parse_coerce.h"
 #include "replication/logical.h"
 #include "replication/logicalproto.h"
+#include "replication/logicalrelation.h"
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
+#include "utils/builtins.h"
 #include "utils/int8.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/snapmgr.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
 
@@ -115,6 +125,17 @@ typedef struct RelationSyncEntry
 	bool		replicate_valid;
 	PublicationActions pubactions;
 
+	/*
+	 * ExprState cannot be used to indicate no cache, invalid cache and valid
+	 * cache. The flag exprstate_valid indicates if the current cache is
+	 * valid. Multiple ExprState entries might be used if there are multiple
+	 * publications for a single table. Different publication actions don't
+	 * allow multiple expressions to always be combined into one.
+	 */
+	bool		exprstate_valid;
+	ExprState  *exprstate[3];	/* ExprState array for row filter */
+	TupleTableSlot *scantuple;	/* tuple table slot for row filter */
+
 	/*
 	 * OID of the relation to publish changes as.  For a partition, this may
 	 * be set to one of its ancestors whose schema will be used when
@@ -137,7 +158,7 @@ static HTAB *RelationSyncCache = NULL;
 
 static void init_rel_sync_cache(MemoryContext decoding_context);
 static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
-static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
+static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Relation relation);
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
 										  uint32 hashvalue);
@@ -146,6 +167,13 @@ static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
 static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
 											TransactionId xid);
 
+/* row filter routines */
+static EState *create_estate_for_relation(Relation rel);
+static ExprState *pgoutput_row_filter_init_expr(Node *rfnode);
+static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext);
+static bool pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple,
+								HeapTuple newtuple, RelationSyncEntry *entry, int action);
+
 /*
  * Specify output plugin callbacks
  */
@@ -620,6 +648,305 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
 	OutputPluginWrite(ctx, false);
 }
 
+/*
+ * Executor state preparation for evaluation of row filter expressions for the
+ * specified relation.
+ */
+static EState *
+create_estate_for_relation(Relation rel)
+{
+	EState	   *estate;
+	RangeTblEntry *rte;
+
+	estate = CreateExecutorState();
+
+	rte = makeNode(RangeTblEntry);
+	rte->rtekind = RTE_RELATION;
+	rte->relid = RelationGetRelid(rel);
+	rte->relkind = rel->rd_rel->relkind;
+	rte->rellockmode = AccessShareLock;
+	ExecInitRangeTable(estate, list_make1(rte));
+
+	estate->es_output_cid = GetCurrentCommandId(false);
+
+	return estate;
+}
+
+/*
+ * Initialize for row filter expression execution.
+ */
+static ExprState *
+pgoutput_row_filter_init_expr(Node *rfnode)
+{
+	ExprState  *exprstate;
+	Oid			exprtype;
+	Expr	   *expr;
+	MemoryContext oldctx;
+
+	/* Prepare expression for execution */
+	exprtype = exprType(rfnode);
+	expr = (Expr *) coerce_to_target_type(NULL, rfnode, exprtype, BOOLOID, -1, COERCION_ASSIGNMENT, COERCE_IMPLICIT_CAST, -1);
+
+	if (expr == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_CANNOT_COERCE),
+				 errmsg("row filter returns type %s that cannot be coerced to the expected type %s",
+						format_type_be(exprtype),
+						format_type_be(BOOLOID)),
+				 errhint("You will need to rewrite the row filter.")));
+
+	/*
+	 * Cache ExprState using CacheMemoryContext. This is the same code as
+	 * ExecPrepareExpr() but that is not used because it doesn't use an
+	 * EState. It should probably be another function in the executor to
+	 * handle the execution outside a normal Plan tree context.
+	 */
+	oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+	expr = expression_planner(expr);
+	exprstate = ExecInitExpr(expr, NULL);
+	MemoryContextSwitchTo(oldctx);
+
+	return exprstate;
+}
+
+/*
+ * Evaluates row filter.
+ *
+ * If the row filter evaluates to NULL, it is taken as false i.e. the change
+ * isn't replicated.
+ */
+static bool
+pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
+{
+	Datum		ret;
+	bool		isnull;
+
+	Assert(state != NULL);
+
+	ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
+
+	elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
+		 DatumGetBool(ret) ? "true" : "false",
+		 isnull ? "true" : "false");
+
+	if (isnull)
+		return false;
+
+	return DatumGetBool(ret);
+}
+
+/*
+ * Change is checked against the row filter, if any.
+ *
+ * If it returns true, the change is replicated, otherwise, it is not.
+ */
+static bool
+pgoutput_row_filter(PGOutputData *data, Relation relation, HeapTuple oldtuple, HeapTuple newtuple, RelationSyncEntry *entry, int action)
+{
+	EState	   *estate;
+	ExprContext *ecxt;
+	ListCell   *lc;
+	bool		result = false;
+	Oid			relid = RelationGetRelid(relation);
+
+	/*
+	 * The flag exprstate_valid indicates the row filter cache state. An
+	 * invalid state means there is no cache yet or there is no row filter for
+	 * this relation.
+	 *
+	 * Once the row filter is cached, it will be executed again only if there
+	 * is a relation entry invalidation. Hence, it seems fine to cache it
+	 * here.
+	 *
+	 * This code was not added to function get_rel_sync_entry() to avoid
+	 * updating the cache even if it was not changed. Besides that, it
+	 * postpones caching the expression near to the moment it will be used. It
+	 * means that it won't waste cycles in changes (such as truncate) that
+	 * won't use it or code paths that will eventually bail out without using
+	 * this cache.
+	 */
+	if (!entry->exprstate_valid)
+	{
+		MemoryContext oldctx;
+		TupleDesc	tupdesc = RelationGetDescr(relation);
+		List	   *rfnodes[3] = {NIL, NIL, NIL};	/* one Node List per
+													 * publication operation */
+		bool		rf_in_all_pubs[3] = {true, true, true}; /* row filter in all
+															 * publications? */
+		Node	   *rfnode;
+		int			i;
+
+		/*
+		 * Create a tuple table slot for row filter. TupleDesc must live as
+		 * long as the cache remains. Release the tuple table slot if it
+		 * already exists.
+		 */
+		if (entry->scantuple != NULL)
+		{
+			ExecDropSingleTupleTableSlot(entry->scantuple);
+			entry->scantuple = NULL;
+		}
+		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+		tupdesc = CreateTupleDescCopy(tupdesc);
+		entry->scantuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsHeapTuple);
+		MemoryContextSwitchTo(oldctx);
+
+		/*
+		 * NOTE: If the relation is a partition and pubviaroot is true, use
+		 * the row filter of the topmost partitioned table instead of the row
+		 * filter of its own partition.
+		 *
+		 * Multiple publications might have multiple row filters for this
+		 * relation. Since row filter usage depends on the DML operation,
+		 * there are multiple lists (one for each operation) which row filters
+		 * will be appended.
+		 */
+		foreach(lc, data->publications)
+		{
+			Publication *pub = lfirst(lc);
+			HeapTuple	rftuple;
+			Datum		rfdatum;
+			bool		rfisnull;
+
+			/*
+			 * Append multiple row filters according to the DML operation. If
+			 * an entry does not have a row filter, remember this information
+			 * (rf_in_all_pubs).  It is used to discard all row filter
+			 * expressions for that DML operation and, as a result, bail out
+			 * through a fast path before initializing the state to process
+			 * the row filter expression.
+			 */
+			rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(entry->publish_as_relid), ObjectIdGetDatum(pub->oid));
+			if (HeapTupleIsValid(rftuple))
+			{
+				rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, Anum_pg_publication_rel_prqual, &rfisnull);
+
+				if (rfisnull)
+				{
+					if (pub->pubactions.pubinsert)
+						rf_in_all_pubs[0] = false;	/* INSERT */
+					if (pub->pubactions.pubupdate)
+						rf_in_all_pubs[1] = false;	/* UPDATE */
+					if (pub->pubactions.pubdelete)
+						rf_in_all_pubs[2] = false;	/* DELETE */
+				}
+				else
+				{
+					rfnode = stringToNode(TextDatumGetCString(rfdatum));
+					if (pub->pubactions.pubinsert)
+						rfnodes[0] = lappend(rfnodes[0], rfnode);	/* INSERT */
+					if (pub->pubactions.pubupdate)
+						rfnodes[1] = lappend(rfnodes[1], rfnode);	/* UPDATE */
+					if (pub->pubactions.pubdelete)
+						rfnodes[2] = lappend(rfnodes[2], rfnode);	/* DELETE */
+				}
+
+				ReleaseSysCache(rftuple);
+			}
+		}
+
+		/*
+		 * For each publication operation stores a single row filter
+		 * expression. This expression might be used or not depending on the
+		 * rf_in_all_pubs value.
+		 */
+		for (i = 0; i < 3; i++)
+		{
+			int			n;
+
+			/*
+			 * All row filter expressions will be discarded if there is one
+			 * publication-relation entry without a row filter. That's because
+			 * all expressions are aggregated by the OR operator. The row
+			 * filter absence means replicate all rows so a single valid
+			 * expression means publish this row.
+			 */
+			if (!rf_in_all_pubs[i])
+			{
+				list_free(rfnodes[i]);
+				entry->exprstate[i] = NULL;
+				continue;
+			}
+
+			n = list_length(rfnodes[i]);
+			if (n == 1)
+				rfnode = linitial(rfnodes[i]);
+			else if (n > 1)
+				rfnode = (Node *) makeBoolExpr(OR_EXPR, rfnodes[i], -1);
+			else
+				rfnode = NULL;
+
+			oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+			if (n == 0)
+				entry->exprstate[i] = NULL;
+			else
+				entry->exprstate[i] = pgoutput_row_filter_init_expr(rfnode);
+			MemoryContextSwitchTo(oldctx);
+		}
+
+		entry->exprstate_valid = true;
+	}
+
+	/*
+	 * Bail out if for a certain operation there is no row filter to process.
+	 * This is a fast path optimization. Read the explanation above about
+	 * rf_in_all_pubs.
+	 */
+	if (action == REORDER_BUFFER_CHANGE_INSERT && entry->exprstate[0] == NULL)
+		return true;
+	if (action == REORDER_BUFFER_CHANGE_UPDATE && entry->exprstate[1] == NULL)
+		return true;
+	if (action == REORDER_BUFFER_CHANGE_DELETE && entry->exprstate[2] == NULL)
+		return true;
+
+	elog(DEBUG3, "table \"%s.%s\" has row filter",
+		 get_namespace_name(get_rel_namespace(relid)),
+		 get_rel_name(relid));
+
+	PushActiveSnapshot(GetTransactionSnapshot());
+
+	estate = create_estate_for_relation(relation);
+
+	/* Prepare context per tuple */
+	ecxt = GetPerTupleExprContext(estate);
+	ecxt->ecxt_scantuple = entry->scantuple;
+
+	/*
+	 * The default behavior for UPDATEs is to use the old tuple for row
+	 * filtering.
+	 * 1) The columns used in the expression is restricted to REPLICA IDENTITY.
+	 * It means that all column values are available to evaluate the
+	 * expression.
+	 * 2) If the old tuple satisfies the row filter but the new tuple doesn't,
+	 * there is a data consistency issue. That is worse when the new tuple is
+	 * used (keep old row that could eventually conflicts with a new row
+	 * inserted in the future) instead of the old tuple (modify the row on
+	 * subscriber that couldn't be changed by the replication again due to row
+	 * filter expression).
+	 */
+	ExecStoreHeapTuple(oldtuple ? oldtuple : newtuple, ecxt->ecxt_scantuple, false);
+
+	/*
+	 * Process the row filter. Multiple row filters were already combined
+	 * above.
+	 */
+	if (action == REORDER_BUFFER_CHANGE_INSERT)
+		result = pgoutput_row_filter_exec_expr(entry->exprstate[0], ecxt);
+	else if (action == REORDER_BUFFER_CHANGE_UPDATE)
+		result = pgoutput_row_filter_exec_expr(entry->exprstate[1], ecxt);
+	else if (action == REORDER_BUFFER_CHANGE_DELETE)
+		result = pgoutput_row_filter_exec_expr(entry->exprstate[2], ecxt);
+	else
+		Assert(false);
+
+	/* Cleanup allocated resources */
+	ResetExprContext(ecxt);
+	FreeExecutorState(estate);
+	PopActiveSnapshot();
+
+	return result;
+}
+
 /*
  * Sends the decoded DML over wire.
  *
@@ -647,7 +974,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	if (in_streaming)
 		xid = change->txn->xid;
 
-	relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
+	relentry = get_rel_sync_entry(data, relation);
 
 	/* First check the table filter */
 	switch (change->action)
@@ -671,8 +998,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
-	maybe_send_schema(ctx, change, relation, relentry);
-
 	/* Send the data */
 	switch (change->action)
 	{
@@ -680,6 +1005,16 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			{
 				HeapTuple	tuple = &change->data.tp.newtuple->tuple;
 
+				/* Check row filter. */
+				if (!pgoutput_row_filter(data, relation, NULL, tuple, relentry, change->action))
+					break;
+
+				/*
+				 * Schema should be sent before the logic that replaces the
+				 * relation because it also sends the ancestor's relation.
+				 */
+				maybe_send_schema(ctx, change, relation, relentry);
+
 				/* Switch relation if publishing via root. */
 				if (relentry->publish_as_relid != RelationGetRelid(relation))
 				{
@@ -703,6 +1038,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				&change->data.tp.oldtuple->tuple : NULL;
 				HeapTuple	newtuple = &change->data.tp.newtuple->tuple;
 
+				/* Check row filter. */
+				if (!pgoutput_row_filter(data, relation, oldtuple, newtuple, relentry, change->action))
+					break;
+
+				maybe_send_schema(ctx, change, relation, relentry);
+
 				/* Switch relation if publishing via root. */
 				if (relentry->publish_as_relid != RelationGetRelid(relation))
 				{
@@ -731,6 +1072,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			{
 				HeapTuple	oldtuple = &change->data.tp.oldtuple->tuple;
 
+				/* Check row filter. */
+				if (!pgoutput_row_filter(data, relation, oldtuple, NULL, relentry, change->action))
+					break;
+
+				maybe_send_schema(ctx, change, relation, relentry);
+
 				/* Switch relation if publishing via root. */
 				if (relentry->publish_as_relid != RelationGetRelid(relation))
 				{
@@ -794,7 +1141,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		if (!is_publishable_relation(relation))
 			continue;
 
-		relentry = get_rel_sync_entry(data, relid);
+		relentry = get_rel_sync_entry(data, relation);
 
 		if (!relentry->pubactions.pubtruncate)
 			continue;
@@ -1116,9 +1463,10 @@ set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
  * when publishing.
  */
 static RelationSyncEntry *
-get_rel_sync_entry(PGOutputData *data, Oid relid)
+get_rel_sync_entry(PGOutputData *data, Relation relation)
 {
 	RelationSyncEntry *entry;
+	Oid			relid = RelationGetRelid(relation);
 	bool		am_partition = get_rel_relispartition(relid);
 	char		relkind = get_rel_relkind(relid);
 	bool		found;
@@ -1139,8 +1487,11 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		entry->schema_sent = false;
 		entry->streamed_txns = NIL;
 		entry->replicate_valid = false;
+		entry->exprstate_valid = false;
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
 			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+		entry->scantuple = NULL;
+		entry->exprstate[0] = entry->exprstate[1] = entry->exprstate[2] = NULL;
 		entry->publish_as_relid = InvalidOid;
 		entry->map = NULL;		/* will be set by maybe_send_schema() if
 								 * needed */
@@ -1245,9 +1596,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
 			}
 
-			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
-				entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
-				break;
 		}
 
 		list_free(pubids);
@@ -1340,6 +1688,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 	 */
 	if (entry != NULL)
 	{
+		int			i;
+
 		entry->schema_sent = false;
 		list_free(entry->streamed_txns);
 		entry->streamed_txns = NIL;
@@ -1354,6 +1704,24 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 			free_conversion_map(entry->map);
 		}
 		entry->map = NULL;
+
+		/*
+		 * Row filter cache cleanups
+		 */
+		entry->exprstate_valid = false;
+		for (i = 0; i < 3; i++)
+		{
+			if (entry->exprstate[i] != NULL)
+			{
+				pfree(entry->exprstate[i]);
+				entry->exprstate[i] = NULL;
+			}
+		}
+		if (entry->scantuple != NULL)
+		{
+			ExecDropSingleTupleTableSlot(entry->scantuple);
+			entry->scantuple = NULL;
+		}
 	}
 }
 
@@ -1365,6 +1733,7 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 {
 	HASH_SEQ_STATUS status;
 	RelationSyncEntry *entry;
+	MemoryContext oldctx;
 
 	/*
 	 * We can get here if the plugin was used in SQL interface as the
@@ -1374,6 +1743,8 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 	if (RelationSyncCache == NULL)
 		return;
 
+	oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+
 	/*
 	 * There is no way to find which entry in our cache the hash belongs to so
 	 * mark the whole cache as invalid.
@@ -1392,6 +1763,8 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 		entry->pubactions.pubdelete = false;
 		entry->pubactions.pubtruncate = false;
 	}
+
+	MemoryContextSwitchTo(oldctx);
 }
 
 /* Send Replication origin */
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index c590003f18..6948ee0603 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4265,6 +4265,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 	int			i_oid;
 	int			i_prpubid;
 	int			i_prrelid;
+	int			i_prrelqual;
 	int			i,
 				j,
 				ntups;
@@ -4275,9 +4276,16 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 	query = createPQExpBuffer();
 
 	/* Collect all publication membership info. */
-	appendPQExpBufferStr(query,
-						 "SELECT tableoid, oid, prpubid, prrelid "
-						 "FROM pg_catalog.pg_publication_rel");
+	if (fout->remoteVersion >= 150000)
+		appendPQExpBufferStr(query,
+							 "SELECT tableoid, oid, prpubid, prrelid, "
+							 "pg_catalog.pg_get_expr(prqual, prrelid) AS prrelqual "
+							 "FROM pg_catalog.pg_publication_rel");
+	else
+		appendPQExpBufferStr(query,
+							 "SELECT tableoid, oid, prpubid, prrelid, "
+							 "NULL AS prrelqual "
+							 "FROM pg_catalog.pg_publication_rel");
 	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
 
 	ntups = PQntuples(res);
@@ -4286,6 +4294,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 	i_oid = PQfnumber(res, "oid");
 	i_prpubid = PQfnumber(res, "prpubid");
 	i_prrelid = PQfnumber(res, "prrelid");
+	i_prrelqual = PQfnumber(res, "prrelqual");
 
 	/* this allocation may be more than we need */
 	pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
@@ -4326,6 +4335,10 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 		pubrinfo[j].dobj.name = tbinfo->dobj.name;
 		pubrinfo[j].publication = pubinfo;
 		pubrinfo[j].pubtable = tbinfo;
+		if (PQgetisnull(res, i, i_prrelqual))
+			pubrinfo[j].pubrelqual = NULL;
+		else
+			pubrinfo[j].pubrelqual = pg_strdup(PQgetvalue(res, i, i_prrelqual));
 
 		/* Decide whether we want to dump it */
 		selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout);
@@ -4396,8 +4409,11 @@ dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo)
 
 	appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY",
 					  fmtId(pubinfo->dobj.name));
-	appendPQExpBuffer(query, " %s;\n",
+	appendPQExpBuffer(query, " %s",
 					  fmtQualifiedDumpable(tbinfo));
+	if (pubrinfo->pubrelqual)
+		appendPQExpBuffer(query, " WHERE %s", pubrinfo->pubrelqual);
+	appendPQExpBufferStr(query, ";\n");
 
 	/*
 	 * There is no point in creating a drop query as the drop is done by table
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index d1d8608e9c..0842a3c936 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -630,6 +630,7 @@ typedef struct _PublicationRelInfo
 	DumpableObject dobj;
 	PublicationInfo *publication;
 	TableInfo  *pubtable;
+	char	   *pubrelqual;
 } PublicationRelInfo;
 
 /*
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index ea721d963a..fb5cfc510f 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -3150,17 +3150,21 @@ describeOneTableDetails(const char *schemaname,
 			{
 				printfPQExpBuffer(&buf,
 								  "SELECT pubname\n"
+								  "		, NULL\n"
 								  "FROM pg_catalog.pg_publication p\n"
 								  "		JOIN pg_catalog.pg_publication_namespace pn ON p.oid = pn.pnpubid\n"
 								  "		JOIN pg_catalog.pg_class pc ON pc.relnamespace = pn.pnnspid\n"
 								  "WHERE pc.oid ='%s' and pg_catalog.pg_relation_is_publishable('%s')\n"
 								  "UNION\n"
 								  "SELECT pubname\n"
+								  "		, pg_get_expr(pr.prqual, c.oid)\n"
 								  "FROM pg_catalog.pg_publication p\n"
 								  "		JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n"
+								  "		JOIN pg_catalog.pg_class c ON c.oid = pr.prrelid\n"
 								  "WHERE pr.prrelid = '%s'\n"
 								  "UNION\n"
 								  "SELECT pubname\n"
+								  "		, NULL\n"
 								  "FROM pg_catalog.pg_publication p\n"
 								  "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n"
 								  "ORDER BY 1;",
@@ -3196,6 +3200,13 @@ describeOneTableDetails(const char *schemaname,
 				printfPQExpBuffer(&buf, "    \"%s\"",
 								  PQgetvalue(result, i, 0));
 
+				/* row filter (if any) */
+				if (pset.sversion >= 150000)
+				{
+					if (!PQgetisnull(result, i, 1))
+						appendPQExpBuffer(&buf, " WHERE %s", PQgetvalue(result, i, 1));
+				}
+
 				printTableAddFooter(&cont, buf.data);
 			}
 			PQclear(result);
@@ -6320,8 +6331,12 @@ addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg,
 	for (i = 0; i < count; i++)
 	{
 		if (!singlecol)
+		{
 			printfPQExpBuffer(buf, "    \"%s.%s\"", PQgetvalue(res, i, 0),
 							  PQgetvalue(res, i, 1));
+			if (!PQgetisnull(res, i, 2))
+				appendPQExpBuffer(buf, " WHERE %s", PQgetvalue(res, i, 2));
+		}
 		else
 			printfPQExpBuffer(buf, "    \"%s\"", PQgetvalue(res, i, 0));
 
@@ -6450,8 +6465,15 @@ describePublications(const char *pattern)
 		{
 			/* Get the tables for the specified publication */
 			printfPQExpBuffer(&buf,
-							  "SELECT n.nspname, c.relname\n"
-							  "FROM pg_catalog.pg_class c,\n"
+							  "SELECT n.nspname, c.relname");
+			if (pset.sversion >= 150000)
+				appendPQExpBufferStr(&buf,
+									 ", pg_get_expr(pr.prqual, c.oid)");
+			else
+				appendPQExpBufferStr(&buf,
+									 ", NULL");
+			appendPQExpBuffer(&buf,
+							  "\nFROM pg_catalog.pg_class c,\n"
 							  "     pg_catalog.pg_namespace n,\n"
 							  "     pg_catalog.pg_publication_rel pr\n"
 							  "WHERE c.relnamespace = n.oid\n"
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 2f412ca3db..e1be254b9a 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1654,6 +1654,22 @@ psql_completion(const char *text, int start, int end)
 	/* ALTER PUBLICATION <name> SET */
 	else if (Matches("ALTER", "PUBLICATION", MatchAny, "SET"))
 		COMPLETE_WITH("(", "ALL TABLES IN SCHEMA", "TABLE");
+	/* ALTER PUBLICATION <name> SET TABLE <name> */
+	/* ALTER PUBLICATION <name> ADD TABLE <name> */
+	else if (Matches("ALTER", "PUBLICATION", MatchAny, "SET|ADD", "TABLE", MatchAny) && !ends_with(prev_wd, ','))
+		COMPLETE_WITH("WHERE (");
+
+	/*
+	 * "ALTER PUBLICATION <name> SET TABLE <name> WHERE (" - complete with
+	 * table attributes
+	 */
+
+	/*
+	 * "ALTER PUBLICATION <name> ADD TABLE <name> WHERE (" - complete with
+	 * table attributes
+	 */
+	else if (HeadMatches("ALTER", "PUBLICATION", MatchAny) && TailMatches("WHERE", "("))
+		COMPLETE_WITH_ATTR(prev3_wd, "");
 	else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD|DROP|SET", "ALL", "TABLES", "IN", "SCHEMA"))
 		COMPLETE_WITH_QUERY(Query_for_list_of_schemas
 							" AND nspname != 'pg_catalog' "
@@ -2777,12 +2793,19 @@ psql_completion(const char *text, int start, int end)
 		COMPLETE_WITH("TABLES", "TABLES IN SCHEMA");
 	else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL", "TABLES"))
 		COMPLETE_WITH("IN SCHEMA", "WITH (");
-	else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "TABLE", MatchAny))
-		COMPLETE_WITH("WITH (");
+	else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "TABLE", MatchAny) && !ends_with(prev_wd, ','))
+		COMPLETE_WITH("WHERE (", "WITH (");
 	/* Complete "CREATE PUBLICATION <name> FOR TABLE" with "<table>, ..." */
 	else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "TABLE"))
 		COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables, NULL);
 
+	/*
+	 * "CREATE PUBLICATION <name> FOR TABLE <name> WHERE (" - complete with
+	 * table attributes
+	 */
+	else if (HeadMatches("CREATE", "PUBLICATION", MatchAny) && TailMatches("WHERE", "("))
+		COMPLETE_WITH_ATTR(prev3_wd, "");
+
 	/*
 	 * Complete "CREATE PUBLICATION <name> FOR ALL TABLES IN SCHEMA <schema>,
 	 * ..."
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 1ae439e6f3..fa23f09d69 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -86,6 +86,7 @@ typedef struct Publication
 typedef struct PublicationRelInfo
 {
 	Relation	relation;
+	Node	   *whereClause;
 } PublicationRelInfo;
 
 extern Publication *GetPublication(Oid pubid);
@@ -122,7 +123,7 @@ extern List *GetPubPartitionOptionRelations(List *result,
 											Oid relid);
 
 extern bool is_publishable_relation(Relation rel);
-extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
+extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *pri,
 											  bool if_not_exists);
 extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid,
 											bool if_not_exists);
diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h
index b5d5504cbb..154bb61777 100644
--- a/src/include/catalog/pg_publication_rel.h
+++ b/src/include/catalog/pg_publication_rel.h
@@ -31,6 +31,10 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
 	Oid			oid;			/* oid */
 	Oid			prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */
 	Oid			prrelid BKI_LOOKUP(pg_class);	/* Oid of the relation */
+
+#ifdef	CATALOG_VARLEN			/* variable-length fields start here */
+	pg_node_tree prqual;		/* qualifications */
+#endif
 } FormData_pg_publication_rel;
 
 /* ----------------
@@ -40,6 +44,8 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
  */
 typedef FormData_pg_publication_rel *Form_pg_publication_rel;
 
+DECLARE_TOAST(pg_publication_rel, 8287, 8288);
+
 DECLARE_UNIQUE_INDEX_PKEY(pg_publication_rel_oid_index, 6112, PublicationRelObjectIndexId, on pg_publication_rel using btree(oid oid_ops));
 DECLARE_UNIQUE_INDEX(pg_publication_rel_prrelid_prpubid_index, 6113, PublicationRelPrrelidPrpubidIndexId, on pg_publication_rel using btree(prrelid oid_ops, prpubid oid_ops));
 
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 067138e6b5..b38e6633fb 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3641,6 +3641,7 @@ typedef struct PublicationTable
 {
 	NodeTag		type;
 	RangeVar   *relation;		/* relation to be published */
+	Node	   *whereClause;	/* qualifications */
 } PublicationTable;
 
 /*
diff --git a/src/include/parser/parse_node.h b/src/include/parser/parse_node.h
index ee179082ce..d58ae6a63f 100644
--- a/src/include/parser/parse_node.h
+++ b/src/include/parser/parse_node.h
@@ -80,6 +80,7 @@ typedef enum ParseExprKind
 	EXPR_KIND_COPY_WHERE,		/* WHERE condition in COPY FROM */
 	EXPR_KIND_GENERATED_COLUMN, /* generation expression for a column */
 	EXPR_KIND_CYCLE_MARK,		/* cycle mark value */
+	EXPR_KIND_PUBLICATION_WHERE /* WHERE condition for a table in PUBLICATION */
 } ParseExprKind;
 
 
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 1feb558968..3bf795756b 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -239,6 +239,173 @@ ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
 UPDATE testpub_parted2 SET a = 2;
 DROP TABLE testpub_parted1, testpub_parted2;
 DROP PUBLICATION testpub_forparted, testpub_forparted1;
+CREATE TABLE testpub_rf_tbl1 (a integer, b text);
+CREATE TABLE testpub_rf_tbl2 (c text, d integer);
+CREATE TABLE testpub_rf_tbl3 (e integer);
+CREATE TABLE testpub_rf_tbl4 (g text);
+CREATE TABLE testpub_rf_tbl5 (a xml);
+CREATE SCHEMA testpub_rf_schema1;
+CREATE TABLE testpub_rf_schema1.testpub_rf_tbl5 (h integer);
+CREATE SCHEMA testpub_rf_schema2;
+CREATE TABLE testpub_rf_schema2.testpub_rf_tb16 (i integer);
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5) WITH (publish = 'insert');
+RESET client_min_messages;
+\dRp+ testpub5
+                                    Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | f       | f       | f         | f
+Tables:
+    "public.testpub_rf_tbl1"
+    "public.testpub_rf_tbl2" WHERE ((c <> 'test'::text) AND (d < 5))
+
+ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000);
+\dRp+ testpub5
+                                    Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | f       | f       | f         | f
+Tables:
+    "public.testpub_rf_tbl1"
+    "public.testpub_rf_tbl2" WHERE ((c <> 'test'::text) AND (d < 5))
+    "public.testpub_rf_tbl3" WHERE ((e > 1000) AND (e < 2000))
+
+ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2;
+\dRp+ testpub5
+                                    Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | f       | f       | f         | f
+Tables:
+    "public.testpub_rf_tbl1"
+    "public.testpub_rf_tbl3" WHERE ((e > 1000) AND (e < 2000))
+
+-- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression)
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500);
+\dRp+ testpub5
+                                    Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | f       | f       | f         | f
+Tables:
+    "public.testpub_rf_tbl3" WHERE ((e > 300) AND (e < 500))
+
+-- test \d+ (now it displays filter information)
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub5a FOR TABLE testpub_rf_tbl1 WHERE (a > 1) WITH (publish = 'insert');
+CREATE PUBLICATION testpub5b FOR TABLE testpub_rf_tbl1;
+CREATE PUBLICATION testpub5c FOR TABLE testpub_rf_tbl1 WHERE (a > 3) WITH (publish = 'insert');
+RESET client_min_messages;
+\d+ testpub_rf_tbl1
+                              Table "public.testpub_rf_tbl1"
+ Column |  Type   | Collation | Nullable | Default | Storage  | Stats target | Description 
+--------+---------+-----------+----------+---------+----------+--------------+-------------
+ a      | integer |           |          |         | plain    |              | 
+ b      | text    |           |          |         | extended |              | 
+Publications:
+    "testpub5a" WHERE (a > 1)
+    "testpub5b"
+    "testpub5c" WHERE (a > 3)
+
+DROP PUBLICATION testpub5a, testpub5b, testpub5c;
+-- some more syntax tests to exercise other parser pathways
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_syntax1 FOR TABLE testpub_rf_tbl1, ONLY testpub_rf_tbl3 WHERE (e < 999) WITH (publish = 'insert');
+RESET client_min_messages;
+\dRp+ testpub_syntax1
+                                Publication testpub_syntax1
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | f       | f       | f         | f
+Tables:
+    "public.testpub_rf_tbl1"
+    "public.testpub_rf_tbl3" WHERE (e < 999)
+
+DROP PUBLICATION testpub_syntax1;
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_syntax2 FOR TABLE testpub_rf_tbl1, testpub_rf_schema1.testpub_rf_tbl5 WHERE (h < 999) WITH (publish = 'insert');
+RESET client_min_messages;
+\dRp+ testpub_syntax2
+                                Publication testpub_syntax2
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | f       | f       | f         | f
+Tables:
+    "public.testpub_rf_tbl1"
+    "testpub_rf_schema1.testpub_rf_tbl5" WHERE (h < 999)
+
+DROP PUBLICATION testpub_syntax2;
+-- fail - schemas don't allow WHERE clause
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_syntax3 FOR ALL TABLES IN SCHEMA testpub_rf_schema1 WHERE (a = 123);
+ERROR:  syntax error at or near "WHERE"
+LINE 1: ...ntax3 FOR ALL TABLES IN SCHEMA testpub_rf_schema1 WHERE (a =...
+                                                             ^
+CREATE PUBLICATION testpub_syntax3 FOR ALL TABLES IN SCHEMA testpub_rf_schema1, testpub_rf_schema1 WHERE (a = 123);
+ERROR:  WHERE clause for schema not allowed
+LINE 1: ...tax3 FOR ALL TABLES IN SCHEMA testpub_rf_schema1, testpub_rf...
+                                                             ^
+RESET client_min_messages;
+-- fail - duplicate tables are not allowed if that table has any WHERE clause
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_dups FOR TABLE testpub_rf_tbl1 WHERE (a = 1), testpub_rf_tbl1 WITH (publish = 'insert');
+ERROR:  conflicting or redundant WHERE clauses for table "testpub_rf_tbl1"
+CREATE PUBLICATION testpub_dups FOR TABLE testpub_rf_tbl1, testpub_rf_tbl1 WHERE (a = 2) WITH (publish = 'insert');
+ERROR:  conflicting or redundant WHERE clauses for table "testpub_rf_tbl1"
+RESET client_min_messages;
+-- fail - user-defined operators are not allowed
+CREATE FUNCTION testpub_rf_func1(integer, integer) RETURNS boolean AS $$ SELECT hashint4($1) > $2 $$ LANGUAGE SQL;
+CREATE OPERATOR =#> (PROCEDURE = testpub_rf_func1, LEFTARG = integer, RIGHTARG = integer);
+CREATE PUBLICATION testpub6 FOR TABLE testpub_rf_tbl3 WHERE (e =#> 27);
+ERROR:  invalid publication WHERE expression for relation "testpub_rf_tbl3"
+DETAIL:  User-defined operators are not allowed.
+-- fail - user-defined functions are not allowed
+CREATE FUNCTION testpub_rf_func2() RETURNS integer AS $$ BEGIN RETURN 123; END; $$ LANGUAGE plpgsql;
+ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl1 WHERE (a >= testpub_rf_func2());
+ERROR:  invalid publication WHERE expression for relation "testpub_rf_tbl1"
+DETAIL:  User-defined functions are not allowed (testpub_rf_func2).
+-- fail - non-immutable functions are not allowed. random() is volatile.
+ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl1 WHERE (a < random());
+ERROR:  invalid publication WHERE expression for relation "testpub_rf_tbl1"
+DETAIL:  Non-immutable functions are not allowed (random).
+-- ok - builtin operators are allowed
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (a IS NULL);
+-- ok - immutable builtin functions are allowed
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl4 WHERE (length(g) < 6);
+-- ok - conditional expressions are allowed
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl5 WHERE (a IS DOCUMENT);
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl5 WHERE (xmlexists('//foo[text() = ''bar'']' PASSING BY VALUE a));
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (NULLIF(1, 2) = a);
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (CASE a WHEN 5 THEN true ELSE false END);
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (COALESCE(b, 'foo') = 'foo');
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (GREATEST(a, 10) > 10);
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (a IN (2, 4, 6));
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (ARRAY[a] <@ ARRAY[2, 4, 6]);
+-- fail - WHERE not allowed in DROP
+ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl1 WHERE (e < 27);
+ERROR:  cannot use a WHERE clause when removing a table from publication
+-- fail - cannot ALTER SET table which is a member of a pre-existing schema
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub7 FOR ALL TABLES IN SCHEMA testpub_rf_schema2;
+ALTER PUBLICATION testpub7 SET ALL TABLES IN SCHEMA testpub_rf_schema2, TABLE testpub_rf_schema2.testpub_rf_tb16;
+ERROR:  cannot add relation "testpub_rf_schema2.testpub_rf_tb16" to publication
+DETAIL:  Table's schema "testpub_rf_schema2" is already part of the publication or part of the specified schema list.
+RESET client_min_messages;
+DROP TABLE testpub_rf_tbl1;
+DROP TABLE testpub_rf_tbl2;
+DROP TABLE testpub_rf_tbl3;
+DROP TABLE testpub_rf_tbl4;
+DROP TABLE testpub_rf_tbl5;
+DROP TABLE testpub_rf_schema1.testpub_rf_tbl5;
+DROP TABLE testpub_rf_schema2.testpub_rf_tb16;
+DROP SCHEMA testpub_rf_schema1;
+DROP SCHEMA testpub_rf_schema2;
+DROP PUBLICATION testpub5;
+DROP PUBLICATION testpub7;
+DROP OPERATOR =#>(integer, integer);
+DROP FUNCTION testpub_rf_func1(integer, integer);
+DROP FUNCTION testpub_rf_func2();
 -- Test cache invalidation FOR ALL TABLES publication
 SET client_min_messages = 'ERROR';
 CREATE TABLE testpub_tbl4(a int);
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index 8fa0435c32..7ab0ef7e63 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -134,6 +134,100 @@ UPDATE testpub_parted2 SET a = 2;
 DROP TABLE testpub_parted1, testpub_parted2;
 DROP PUBLICATION testpub_forparted, testpub_forparted1;
 
+CREATE TABLE testpub_rf_tbl1 (a integer, b text);
+CREATE TABLE testpub_rf_tbl2 (c text, d integer);
+CREATE TABLE testpub_rf_tbl3 (e integer);
+CREATE TABLE testpub_rf_tbl4 (g text);
+CREATE TABLE testpub_rf_tbl5 (a xml);
+CREATE SCHEMA testpub_rf_schema1;
+CREATE TABLE testpub_rf_schema1.testpub_rf_tbl5 (h integer);
+CREATE SCHEMA testpub_rf_schema2;
+CREATE TABLE testpub_rf_schema2.testpub_rf_tb16 (i integer);
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5) WITH (publish = 'insert');
+RESET client_min_messages;
+\dRp+ testpub5
+ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000);
+\dRp+ testpub5
+ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2;
+\dRp+ testpub5
+-- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression)
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500);
+\dRp+ testpub5
+-- test \d+ (now it displays filter information)
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub5a FOR TABLE testpub_rf_tbl1 WHERE (a > 1) WITH (publish = 'insert');
+CREATE PUBLICATION testpub5b FOR TABLE testpub_rf_tbl1;
+CREATE PUBLICATION testpub5c FOR TABLE testpub_rf_tbl1 WHERE (a > 3) WITH (publish = 'insert');
+RESET client_min_messages;
+\d+ testpub_rf_tbl1
+DROP PUBLICATION testpub5a, testpub5b, testpub5c;
+-- some more syntax tests to exercise other parser pathways
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_syntax1 FOR TABLE testpub_rf_tbl1, ONLY testpub_rf_tbl3 WHERE (e < 999) WITH (publish = 'insert');
+RESET client_min_messages;
+\dRp+ testpub_syntax1
+DROP PUBLICATION testpub_syntax1;
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_syntax2 FOR TABLE testpub_rf_tbl1, testpub_rf_schema1.testpub_rf_tbl5 WHERE (h < 999) WITH (publish = 'insert');
+RESET client_min_messages;
+\dRp+ testpub_syntax2
+DROP PUBLICATION testpub_syntax2;
+-- fail - schemas don't allow WHERE clause
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_syntax3 FOR ALL TABLES IN SCHEMA testpub_rf_schema1 WHERE (a = 123);
+CREATE PUBLICATION testpub_syntax3 FOR ALL TABLES IN SCHEMA testpub_rf_schema1, testpub_rf_schema1 WHERE (a = 123);
+RESET client_min_messages;
+-- fail - duplicate tables are not allowed if that table has any WHERE clause
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_dups FOR TABLE testpub_rf_tbl1 WHERE (a = 1), testpub_rf_tbl1 WITH (publish = 'insert');
+CREATE PUBLICATION testpub_dups FOR TABLE testpub_rf_tbl1, testpub_rf_tbl1 WHERE (a = 2) WITH (publish = 'insert');
+RESET client_min_messages;
+-- fail - user-defined operators are not allowed
+CREATE FUNCTION testpub_rf_func1(integer, integer) RETURNS boolean AS $$ SELECT hashint4($1) > $2 $$ LANGUAGE SQL;
+CREATE OPERATOR =#> (PROCEDURE = testpub_rf_func1, LEFTARG = integer, RIGHTARG = integer);
+CREATE PUBLICATION testpub6 FOR TABLE testpub_rf_tbl3 WHERE (e =#> 27);
+-- fail - user-defined functions are not allowed
+CREATE FUNCTION testpub_rf_func2() RETURNS integer AS $$ BEGIN RETURN 123; END; $$ LANGUAGE plpgsql;
+ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl1 WHERE (a >= testpub_rf_func2());
+-- fail - non-immutable functions are not allowed. random() is volatile.
+ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl1 WHERE (a < random());
+-- ok - builtin operators are allowed
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (a IS NULL);
+-- ok - immutable builtin functions are allowed
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl4 WHERE (length(g) < 6);
+-- ok - conditional expressions are allowed
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl5 WHERE (a IS DOCUMENT);
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl5 WHERE (xmlexists('//foo[text() = ''bar'']' PASSING BY VALUE a));
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (NULLIF(1, 2) = a);
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (CASE a WHEN 5 THEN true ELSE false END);
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (COALESCE(b, 'foo') = 'foo');
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (GREATEST(a, 10) > 10);
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (a IN (2, 4, 6));
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl1 WHERE (ARRAY[a] <@ ARRAY[2, 4, 6]);
+-- fail - WHERE not allowed in DROP
+ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl1 WHERE (e < 27);
+-- fail - cannot ALTER SET table which is a member of a pre-existing schema
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub7 FOR ALL TABLES IN SCHEMA testpub_rf_schema2;
+ALTER PUBLICATION testpub7 SET ALL TABLES IN SCHEMA testpub_rf_schema2, TABLE testpub_rf_schema2.testpub_rf_tb16;
+RESET client_min_messages;
+
+DROP TABLE testpub_rf_tbl1;
+DROP TABLE testpub_rf_tbl2;
+DROP TABLE testpub_rf_tbl3;
+DROP TABLE testpub_rf_tbl4;
+DROP TABLE testpub_rf_tbl5;
+DROP TABLE testpub_rf_schema1.testpub_rf_tbl5;
+DROP TABLE testpub_rf_schema2.testpub_rf_tb16;
+DROP SCHEMA testpub_rf_schema1;
+DROP SCHEMA testpub_rf_schema2;
+DROP PUBLICATION testpub5;
+DROP PUBLICATION testpub7;
+DROP OPERATOR =#>(integer, integer);
+DROP FUNCTION testpub_rf_func1(integer, integer);
+DROP FUNCTION testpub_rf_func2();
+
 -- Test cache invalidation FOR ALL TABLES publication
 SET client_min_messages = 'ERROR';
 CREATE TABLE testpub_tbl4(a int);
diff --git a/src/test/subscription/t/027_row_filter.pl b/src/test/subscription/t/027_row_filter.pl
new file mode 100644
index 0000000000..5867cfd756
--- /dev/null
+++ b/src/test/subscription/t/027_row_filter.pl
@@ -0,0 +1,355 @@
+# Test logical replication behavior with row filtering
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 10;
+
+# create publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# setup structure on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_1 (a int primary key, b text)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE tab_rowfilter_1 REPLICA IDENTITY FULL");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_2 (c int primary key)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_3 (a int primary key, b boolean)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_4 (c int primary key)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_partitioned (a int primary key, b integer) PARTITION BY RANGE(a)"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_less_10k (LIKE tab_rowfilter_partitioned)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_less_10k FOR VALUES FROM (MINVALUE) TO (10000)"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_greater_10k (LIKE tab_rowfilter_partitioned)"
+);
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_greater_10k FOR VALUES FROM (10000) TO (MAXVALUE)"
+);
+
+# setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_1 (a int primary key, b text)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_2 (c int primary key)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_3 (a int primary key, b boolean)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_4 (c int primary key)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_partitioned (a int primary key, b integer) PARTITION BY RANGE(a)"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_less_10k (LIKE tab_rowfilter_partitioned)");
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_less_10k FOR VALUES FROM (MINVALUE) TO (10000)"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_greater_10k (LIKE tab_rowfilter_partitioned)"
+);
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_greater_10k FOR VALUES FROM (10000) TO (MAXVALUE)"
+);
+
+# setup logical replication
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_1 FOR TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered')"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_1 ADD TABLE tab_rowfilter_2 WHERE (c % 7 = 0)"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_1 SET TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered'), tab_rowfilter_2 WHERE (c % 2 = 0), tab_rowfilter_3"
+);
+
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_2 FOR TABLE tab_rowfilter_2 WHERE (c % 3 = 0)"
+);
+
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_3 FOR TABLE tab_rowfilter_partitioned WHERE (a < 5000)"
+);
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_3 ADD TABLE tab_rowfilter_less_10k WHERE (a < 6000)"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_not_used FOR TABLE tab_rowfilter_1 WHERE (a < 0)"
+);
+
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_4a FOR TABLE tab_rowfilter_4 WHERE (c % 2 = 0)"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_4b FOR TABLE tab_rowfilter_4");
+
+#
+# The following INSERTs are executed before the CREATE SUBSCRIPTION, so these
+# SQL commands are for testing the initial data copy using logical replication.
+#
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1, 'not replicated')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1500, 'filtered')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1980, 'not filtered')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) SELECT x, 'test ' || x FROM generate_series(990,1002) x"
+);
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_2 (c) SELECT generate_series(1, 20)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_3 (a, b) SELECT x, (x % 3 = 0) FROM generate_series(1, 10) x"
+);
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_4 (c) SELECT generate_series(1, 10)");
+
+# insert data into partitioned table and directly on the partition
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_partitioned (a, b) VALUES(1, 100),(7000, 101),(15000, 102),(5500, 300)"
+);
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_less_10k (a, b) VALUES(2, 200),(6005, 201)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_greater_10k (a, b) VALUES(16000, 103)");
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+my $appname           = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# wait for initial table synchronization to finish
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Check expected replicated rows for tab_rowfilter_1
+# tap_pub_1 filter is: (a > 1000 AND b <> 'filtered')
+# - INSERT (1, 'not replicated')   NO, because a is not > 1000
+# - INSERT (1500, 'filtered')      NO, because b == 'filtered'
+# - INSERT (1980, 'not filtered')  YES
+# - generate_series(990,1002)      YES, only for 1001,1002 because a > 1000
+#
+my $result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT a, b FROM tab_rowfilter_1 ORDER BY 1, 2");
+is( $result, qq(1001|test 1001
+1002|test 1002
+1980|not filtered), 'check initial data copy from table tab_rowfilter_1');
+
+# Check expected replicated rows for tab_rowfilter_2
+# tap_pub_1 filter is: (c % 2 = 0)
+# tap_pub_2 filter is: (c % 3 = 0)
+# When there are multiple publications for the same table, the filters
+# expressions are OR'ed together. In this case, rows are replicated if
+# c value is divided by 2 OR 3 (2, 3, 4, 6, 8, 9, 10, 12, 14, 15, 16, 18, 20)
+#
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(c), min(c), max(c) FROM tab_rowfilter_2");
+is($result, qq(13|2|20),
+	'check initial data copy from table tab_rowfilter_2');
+
+# Check expected replicated rows for tab_rowfilter_4
+# (same table in two publications but only one has a filter).
+# tap_pub_4a filter is: (c % 2 = 0)
+# tap_pub_4b filter is: <no filter>
+# Expressions are OR'ed together but when there is no filter it just means
+# OR everything - e.g. same as no filter at all.
+# Expect all rows: (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(c), min(c), max(c) FROM tab_rowfilter_4");
+is($result, qq(10|1|10),
+	'check initial data copy from table tab_rowfilter_4');
+
+# Check expected replicated rows for tab_rowfilter_3
+# There is no filter. 10 rows are inserted, so 10 rows are replicated.
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(a) FROM tab_rowfilter_3");
+is($result, qq(10), 'check initial data copy from table tab_rowfilter_3');
+
+# Check expected replicated rows for partitions
+# publication option publish_via_partition_root is false so use the row filter
+# from a partition
+# tab_rowfilter_partitioned filter: (a < 5000)
+# tab_rowfilter_less_10k filter:    (a < 6000)
+# tab_rowfilter_greater_10k filter: no filter
+#
+# INSERT into tab_rowfilter_partitioned:
+# - INSERT (1,100)       YES, because 1 < 6000
+# - INSERT (7000, 101)   NO,  because 7000 is not < 6000
+# - INSERT (15000, 102)  YES, because tab_rowfilter_greater_10k has no filter
+# - INSERT (5500, 300)   YES, because 5500 < 6000
+#
+# INSERT directly into tab_rowfilter_less_10k:
+# - INSERT (2, 200)      YES, because 2 < 6000
+# - INSERT (6005, 201)   NO, because 6005 is not < 6000
+#
+# INSERT directly into tab_rowfilter_greater_10k:
+# - INSERT (16000, 103)  YES, because tab_rowfilter_greater_10k has no filter
+#
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT a, b FROM tab_rowfilter_less_10k ORDER BY 1, 2");
+is( $result, qq(1|100
+2|200
+5500|300), 'check initial data copy from partition tab_rowfilter_less_10k');
+
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT a, b FROM tab_rowfilter_greater_10k ORDER BY 1, 2");
+is( $result, qq(15000|102
+16000|103), 'check initial data copy from partition tab_rowfilter_greater_10k'
+);
+
+# The following commands are executed after CREATE SUBSCRIPTION, so these SQL
+# commands are for testing normal logical replication behavior.
+#
+# test row filter (INSERT, UPDATE, DELETE)
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (800, 'test 800')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1600, 'test 1600')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1601, 'test 1601')");
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab_rowfilter_1 SET b = NULL WHERE a = 1600");
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab_rowfilter_1 SET b = 'test 1601 updated' WHERE a = 1601");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_2 (c) VALUES (21), (22), (23), (24), (25)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_4 (c) VALUES (0), (11), (12)");
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check expected replicated rows for tab_rowfilter_2
+# tap_pub_1 filter is: (c % 2 = 0)
+# tap_pub_2 filter is: (c % 3 = 0)
+# When there are multiple publications for the same table, the filters
+# expressions are OR'ed together. In this case, rows are replicated if
+# c value is divided by 2 OR 3.
+#
+# Expect original rows (2, 3, 4, 6, 8, 9, 10, 12, 14, 15, 16, 18, 20)
+# Plus (21, 22, 24)
+#
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(c), min(c), max(c) FROM tab_rowfilter_2");
+is($result, qq(16|2|24), 'check replicated rows to tab_rowfilter_2');
+
+# Check expected replicated rows for tab_rowfilter_4
+# (same table in two publications but only one has a filter).
+# tap_pub_4a filter is: (c % 2 = 0)
+# tap_pub_4b filter is: <no filter>
+# Expressions are OR'ed together but when there is no filter it just means
+# OR everything - e.g. same as no filter at all.
+# Expect all rows from initial copy: (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+# And also (0, 11, 12)
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(c), min(c), max(c) FROM tab_rowfilter_4");
+is($result, qq(13|0|12), 'check replicated rows to tab_rowfilter_4');
+
+# Check expected replicated rows for tab_rowfilter_1
+# tap_pub_1 filter is: (a > 1000 AND b <> 'filtered')
+#
+# - 1001, 1002, 1980 already exist from initial data copy
+# - INSERT (800, 'test 800')   NO, because 800 is not > 1000
+# - INSERT (1600, 'test 1600') YES, because 1600 > 1000 and 'test 1600' <> 'filtered'
+# - INSERT (1601, 'test 1601') YES, because 1601 > 1000 and 'test 1601' <> 'filtered'
+# - UPDATE (1600, NULL)        YES, because 1600 > 1000 and 'test 1600' <> 'filtered'
+# - UPDATE (1601, 'test 1601 updated') YES, because 1601 > 1000 and 'test 1601' <> 'filtered'
+#
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT a, b FROM tab_rowfilter_1 ORDER BY 1, 2");
+is( $result, qq(1001|test 1001
+1002|test 1002
+1600|
+1601|test 1601 updated
+1980|not filtered), 'check replicated rows to table tab_rowfilter_1');
+
+# Publish using root partitioned table
+# Use a different partitioned table layout (exercise publish_via_partition_root)
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_3 SET (publish_via_partition_root = true)");
+$node_subscriber->safe_psql('postgres',
+	"TRUNCATE TABLE tab_rowfilter_partitioned");
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION WITH (copy_data = true)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_partitioned (a, b) VALUES(4000, 400),(4001, 401),(4002, 402)"
+);
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_less_10k (a, b) VALUES(4500, 450)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_less_10k (a, b) VALUES(5600, 123)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_greater_10k (a, b) VALUES(14000, 1950)");
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab_rowfilter_less_10k SET b = 30 WHERE a = 4001");
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM tab_rowfilter_less_10k WHERE a = 4002");
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check expected replicated rows for partitions
+# publication option publish_via_partition_root is true so use the row filter
+# from the root partitioned table
+# tab_rowfilter_partitioned filter: (a < 5000)
+# tab_rowfilter_less_10k filter:    (a < 6000)
+# tab_rowfilter_greater_10k filter: no filter
+#
+# After TRUNCATE, REFRESH PUBLICATION, the initial data copy will apply the
+# partitioned table row filter.
+# - INSERT (1, 100)      YES, 1 < 5000
+# - INSERT (7000, 101)   NO, 7000 is not < 5000
+# - INSERT (15000, 102)  NO, 15000 is not < 5000
+# - INSERT (5500, 300)   NO, 5500 is not < 5000
+# - INSERT (2, 200)      YES, 2 < 5000
+# - INSERT (6005, 201)   NO, 6005 is not < 5000
+# - INSERT (16000, 103)  NO, 16000 is not < 5000
+#
+# Execute SQL commands after initial data copy for testing the logical
+# replication behavior.
+# - INSERT (4000, 400)    YES, 4000 < 5000
+# - INSERT (4001, 401)    YES, 4001 < 5000
+# - INSERT (4002, 402)    YES, 4002 < 5000
+# - INSERT (4500, 450)    YES, 4500 < 5000
+# - INSERT (5600, 123)    NO, 5600 is not < 5000
+# - INSERT (14000, 1950)  NO, 16000 is not < 5000
+# - UPDATE (4001)         YES, 4001 < 5000
+# - DELETE (4002)         YES, 4002 < 5000
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT a, b FROM tab_rowfilter_partitioned ORDER BY 1, 2");
+is( $result, qq(1|100
+2|200
+4000|400
+4001|30
+4500|450), 'check publish_via_partition_root behavior');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
-- 
2.20.1

