From 4d22158a63a05e990b04af1d9c12992e31aa08e2 Mon Sep 17 00:00:00 2001
From: Peter Smith <peter.b.smith@fujitsu.com>
Date: Fri, 17 Dec 2021 18:59:42 +1100
Subject: [PATCH v50 1/8] Row-filter for logical replication.

This feature adds row filtering for publication tables. When a publication
is defined or modified, rows that don't satisfy an optional WHERE clause
will be filtered out. This allows a database or set of tables to be
partially replicated. The row-filter is per table. A new row-filter can
be added simply by specifying a WHERE clause after the table name. The
WHERE clause must be enclosed by parentheses.

The row-filter WHERE clause for a table added to a publication that publishes
UPDATE and/or DELETE operations must contain only columns that are covered by
REPLICA IDENTITY. The row-filter WHERE clause for a table added to a publication
that publishes INSERT can use any column. If the row-filter evaluates to NULL,
it returns false. The WHERE clause allows simple expressions. Simple expressions
cannot contain any aggregate or window functions, non-immutable functions,
user-defined types, operators or functions.  This restriction could possibly be
addressed in the future.

If you choose to do the initial table synchronization, only data that satisfies
the row-filters is pulled by the subscriber. If the subscription has several
publications in which a table has been published with different WHERE clauses,
rows which satisfy ANY of the expressions will be copied. If a subscriber is a
pre-15 version, the initial table synchronization won't use row-filters even
if they are defined in the publisher.

If your publication contains a partitioned table, the publication parameter
publish_via_partition_root determines if it uses the partition row-filter (if
the parameter is false, the default) or the root partitioned table row-filter.

Psql commands \dRp+ and \d+ will display any row-filters.

Author: Euler Taveira, Peter Smith
Discussion: https://www.postgresql.org/message-id/flat/CAHE3wggb715X%2BmK_DitLXF25B%3DjE6xyNCH4YOwM860JR7HarGQ%40mail.gmail.com

Combining multiple row-filters
==============================

The subscription is treated "as a union of all the publications" [1], so the
row-filters are combined with OR.

If the subscription has several publications in which the same table has been
published with different filters, those expressions get OR'ed together so that
rows satisfying any of the expressions will be replicated.

Notice this means if one of the publications has no filter at all then all other
filters become redundant.

Author: Peter Smith
[1] https://www.postgresql.org/message-id/574b4e78-2f35-acf3-4bdc-4b872582e739%40enterprisedb.com

Row-filter caching
==================

The cached row-filters (e.g. ExprState *) are invalidated only in function
rel_sync_cache_relation_cb, so it means the ALTER PUBLICATION for one table
will not cause row-filters of other tables to also become invalidated.

The code related to caching row-filters is done just before they are needed
(in the pgoutput_row_filter function).

If there are multiple publication row-filters for a given table these are all
combined/flattened into a single filter.

Author: Peter Smith, Greg Nancarrow
The filter caching is based on a suggestions from Amit [1] [2], and Houz [3]
[1] https://www.postgresql.org/message-id/CAA4eK1%2BxQb06NGs6Y7OzwMtKYYixEqR8tdWV5THAVE4SAqNrDg%40mail.gmail.com
[2] https://www.postgresql.org/message-id/CAA4eK1%2Btio46goUKBUfAKFsFVxtgk8nOty%3DTxKoKH-gdLzHD2g%40mail.gmail.com
[3] https://www.postgresql.org/message-id/OS0PR01MB5716090A70A73ADF58C58950948D9%40OS0PR01MB5716.jpnprd01.prod.outlook.com

Cache ExprState per pubaction.

If a subscriber has multiple publications and these publications include the
same table then there can be multiple filters that apply to that table.

These filters are stored per-pubactions of the publications. There are 4 kinds
of pubaction ("insert", "update", "delete", "truncate"), but row-filters are
not applied for "truncate".

Filters for the same pubaction are all combined (OR'ed) and cached as one, so
at the end there are at most 3 cached filters per table.

The appropriate (pubaction) filter is executed according to the DML operation.

Author: Peter Smith
Discussion: https://www.postgresql.org/message-id/CAA4eK1%2BhVXfOSScbf5LUB%3D5is%3DwYaC6NBhLxuvetbWQnZRnsVQ%40mail.gmail.com
---
 doc/src/sgml/catalogs.sgml                  |   8 +
 doc/src/sgml/ref/alter_publication.sgml     |  13 +-
 doc/src/sgml/ref/create_publication.sgml    |  37 +-
 doc/src/sgml/ref/create_subscription.sgml   |  24 +-
 src/backend/catalog/pg_publication.c        |  69 +++-
 src/backend/commands/publicationcmds.c      | 108 +++++-
 src/backend/nodes/copyfuncs.c               |   1 +
 src/backend/nodes/equalfuncs.c              |   1 +
 src/backend/parser/gram.y                   |  38 +-
 src/backend/parser/parse_agg.c              |  10 +
 src/backend/parser/parse_expr.c             |  21 +-
 src/backend/parser/parse_func.c             |   3 +
 src/backend/parser/parse_oper.c             |   7 +
 src/backend/parser/parse_relation.c         |   9 +
 src/backend/replication/logical/tablesync.c | 118 +++++-
 src/backend/replication/pgoutput/pgoutput.c | 410 +++++++++++++++++++-
 src/bin/psql/describe.c                     |  26 +-
 src/include/catalog/pg_publication.h        |   7 +-
 src/include/catalog/pg_publication_rel.h    |   6 +
 src/include/nodes/parsenodes.h              |   1 +
 src/include/parser/parse_node.h             |   1 +
 src/test/regress/expected/publication.out   | 151 +++++++
 src/test/regress/sql/publication.sql        |  76 ++++
 src/test/subscription/t/027_row_filter.pl   | 357 +++++++++++++++++
 24 files changed, 1451 insertions(+), 51 deletions(-)
 create mode 100644 src/test/subscription/t/027_row_filter.pl

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 03e2537b07..2f1f9132c6 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -6311,6 +6311,14 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        Reference to relation
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+      <structfield>prqual</structfield> <type>pg_node_tree</type>
+      </para>
+      <para>Expression tree (in <function>nodeToString()</function>
+      representation) for the relation's qualifying condition</para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml
index bb4ef5e5e2..5d9869c4f6 100644
--- a/doc/src/sgml/ref/alter_publication.sgml
+++ b/doc/src/sgml/ref/alter_publication.sgml
@@ -30,7 +30,7 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
 
 <phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
 
-    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
     ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
 </synopsis>
  </refsynopsisdiv>
@@ -52,7 +52,9 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
    remove one or more tables/schemas from the publication.  Note that adding
    tables/schemas to a publication that is already subscribed to will require a
    <literal>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</literal> action on the
-   subscribing side in order to become effective.
+   subscribing side in order to become effective. Note also that the combination
+   of <literal>DROP</literal> with <literal>WHERE</literal> clause is not
+   allowed.
   </para>
 
   <para>
@@ -109,7 +111,12 @@ ALTER PUBLICATION <replaceable class="parameter">name</replaceable> RENAME TO <r
       table name, only that table is affected.  If <literal>ONLY</literal> is not
       specified, the table and all its descendant tables (if any) are
       affected.  Optionally, <literal>*</literal> can be specified after the table
-      name to explicitly indicate that descendant tables are included.
+      name to explicitly indicate that descendant tables are included. If the
+      optional <literal>WHERE</literal> clause is specified, rows that do not
+      satisfy the <replaceable class="parameter">expression</replaceable> will
+      not be published. Note that parentheses are required around the
+      expression. The <replaceable class="parameter">expression</replaceable>
+      is evaluated with the role used for the replication connection.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml
index d805e8e77a..5aeee2309d 100644
--- a/doc/src/sgml/ref/create_publication.sgml
+++ b/doc/src/sgml/ref/create_publication.sgml
@@ -28,7 +28,7 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
 
 <phrase>where <replaceable class="parameter">publication_object</replaceable> is one of:</phrase>
 
-    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [, ... ]
+    TABLE [ ONLY ] <replaceable class="parameter">table_name</replaceable> [ * ] [ WHERE ( <replaceable class="parameter">expression</replaceable> ) ] [, ... ]
     ALL TABLES IN SCHEMA { <replaceable class="parameter">schema_name</replaceable> | CURRENT_SCHEMA } [, ... ]
 </synopsis>
  </refsynopsisdiv>
@@ -78,6 +78,13 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
       publication, so they are never explicitly added to the publication.
      </para>
 
+     <para>
+      If the optional <literal>WHERE</literal> clause is specified, only rows
+      that satisfy the <replaceable class="parameter">expression</replaceable> 
+      will be published. Note that parentheses are required around the 
+      expression. It has no effect on <literal>TRUNCATE</literal> commands.
+     </para>
+
      <para>
       Only persistent base tables and partitioned tables can be part of a
       publication.  Temporary tables, unlogged tables, foreign tables,
@@ -225,6 +232,22 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
    disallowed on those tables.
   </para>
 
+  <para>
+   A <literal>WHERE</literal> clause must contain only columns that are
+   part of the primary key or are covered by the <literal>REPLICA
+   IDENTITY</literal>, in order for <command>UPDATE</command> and
+   <command>DELETE</command> operations to be published. 
+   For publication of <command>INSERT</command> operations, any column
+   may be used in the <literal>WHERE</literal> clause.
+   If nullable columns are present in the <literal>WHERE</literal> clause,
+   possible NULL values should be accounted for in expressions, to avoid
+   unexpected results, because <literal>NULL</literal> values can cause 
+   those expressions to evaluate to false. 
+   A <literal>WHERE</literal> clause allows simple expressions. The simple
+   expression cannot contain any aggregate or window functions, non-immutable
+   functions, user-defined types, operators or functions.
+  </para>
+
   <para>
    For an <command>INSERT ... ON CONFLICT</command> command, the publication will
    publish the operation that actually results from the command.  So depending
@@ -247,6 +270,11 @@ CREATE PUBLICATION <replaceable class="parameter">name</replaceable>
   <para>
    <acronym>DDL</acronym> operations are not published.
   </para>
+
+  <para>
+   The <literal>WHERE</literal> clause expression is executed with the role used
+   for the replication connection.
+  </para>
  </refsect1>
 
  <refsect1>
@@ -259,6 +287,13 @@ CREATE PUBLICATION mypublication FOR TABLE users, departments;
 </programlisting>
   </para>
 
+  <para>
+   Create a publication that publishes all changes from active departments:
+<programlisting>
+CREATE PUBLICATION active_departments FOR TABLE departments WHERE (active IS TRUE);
+</programlisting>
+  </para>
+
   <para>
    Create a publication that publishes all changes in all tables:
 <programlisting>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 990a41f1a1..db255f323a 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -208,6 +208,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           that are being subscribed to when the replication starts.
           The default is <literal>true</literal>.
          </para>
+         <para>
+          If the publications contain conditional expressions, it will affect
+          what data is copied. Refer to the
+          <xref linkend="sql-createsubscription-notes" /> for details.
+         </para>
         </listitem>
        </varlistentry>
 
@@ -293,7 +298,7 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
   </variablelist>
  </refsect1>
 
- <refsect1>
+ <refsect1 id="sql-createsubscription-notes" xreflabel="Notes">
   <title>Notes</title>
 
   <para>
@@ -319,6 +324,23 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
    the parameter <literal>create_slot = false</literal>.  This is an
    implementation restriction that might be lifted in a future release.
   </para>
+
+  <para>
+   If any table in the publication has a <literal>WHERE</literal> clause, rows
+   that do not satisfy the <replaceable class="parameter">expression</replaceable>
+   will not be published (i.e. they will be filtered out).
+   If the subscription has several publications in which the same table has been
+   published with different <literal>WHERE</literal> clauses, those expressions
+   (for the same publish operation) get OR'ed together so that rows satisfying any
+   of the expressions will be published. Also, if one of the publications for the
+   same table has no <literal>WHERE</literal> clause at all, or is a <literal>FOR
+   ALL TABLES</literal> or <literal>FOR ALL TABLES IN SCHEMA</literal> publication,
+   then all other <literal>WHERE</literal> clauses (for the same publish operation)
+   become redundant.
+   If the subscriber is a <productname>PostgreSQL</productname> version before 15
+   then any row filtering is ignored during the initial data synchronization phase.
+  </para>
+
  </refsect1>
 
  <refsect1>
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 62f10bcbd2..0929aa0a35 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -36,6 +36,9 @@
 #include "commands/publicationcmds.h"
 #include "funcapi.h"
 #include "miscadmin.h"
+#include "parser/parse_clause.h"
+#include "parser/parse_collate.h"
+#include "parser/parse_relation.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/catcache.h"
@@ -275,22 +278,55 @@ GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt,
 	return result;
 }
 
+/*
+ * Transform a publication WHERE clause, ensuring it is coerced to boolean and
+ * necessary collation information is added if required, and add a new
+ * nsitem/RTE for the associated relation to the ParseState's namespace list.
+ */
+Node *
+GetTransformedWhereClause(ParseState *pstate, PublicationRelInfo *pri,
+						  bool fixup_collation)
+{
+	ParseNamespaceItem *nsitem;
+	Node			   *whereclause = NULL;
+
+	pstate->p_sourcetext = nodeToString(pri->whereClause);
+
+	nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
+										   AccessShareLock, NULL, false, false);
+
+	addNSItemToQuery(pstate, nsitem, false, true, true);
+
+	whereclause = transformWhereClause(pstate, copyObject(pri->whereClause),
+									   EXPR_KIND_PUBLICATION_WHERE,
+									   "PUBLICATION WHERE");
+
+	/* Fix up collation information */
+	if (fixup_collation)
+		assign_expr_collations(pstate, whereclause);
+
+	return whereclause;
+}
+
 /*
  * Insert new publication / relation mapping.
  */
 ObjectAddress
-publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
+publication_add_relation(Oid pubid, PublicationRelInfo *pri,
 						 bool if_not_exists)
 {
 	Relation	rel;
 	HeapTuple	tup;
 	Datum		values[Natts_pg_publication_rel];
 	bool		nulls[Natts_pg_publication_rel];
-	Oid			relid = RelationGetRelid(targetrel->relation);
+	Relation	targetrel = pri->relation;
+	Oid			relid = RelationGetRelid(targetrel);
 	Oid			prrelid;
 	Publication *pub = GetPublication(pubid);
 	ObjectAddress myself,
 				referenced;
+	ParseState *pstate;
+	Node	   *whereclause = NULL;
 	List	   *relids = NIL;
 
 	rel = table_open(PublicationRelRelationId, RowExclusiveLock);
@@ -311,10 +347,22 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 		ereport(ERROR,
 				(errcode(ERRCODE_DUPLICATE_OBJECT),
 				 errmsg("relation \"%s\" is already member of publication \"%s\"",
-						RelationGetRelationName(targetrel->relation), pub->name)));
+						RelationGetRelationName(targetrel), pub->name)));
 	}
 
-	check_publication_add_relation(targetrel->relation);
+	check_publication_add_relation(targetrel);
+
+	if (pri->whereClause != NULL)
+	{
+		/* Set up a ParseState to parse with */
+		pstate = make_parsestate(NULL);
+
+		/*
+		 * Get the transformed WHERE clause, of boolean type, with necessary
+		 * collation information.
+		 */
+		whereclause = GetTransformedWhereClause(pstate, pri, true);
+	}
 
 	/* Form a tuple. */
 	memset(values, 0, sizeof(values));
@@ -328,6 +376,12 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	values[Anum_pg_publication_rel_prrelid - 1] =
 		ObjectIdGetDatum(relid);
 
+	/* Add qualifications, if available */
+	if (whereclause)
+		values[Anum_pg_publication_rel_prqual - 1] = CStringGetTextDatum(nodeToString(whereclause));
+	else
+		nulls[Anum_pg_publication_rel_prqual - 1] = true;
+
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
 	/* Insert tuple into catalog. */
@@ -344,6 +398,13 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
 	ObjectAddressSet(referenced, RelationRelationId, relid);
 	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
 
+	/* Add dependency on the objects mentioned in the qualifications */
+	if (whereclause)
+	{
+		recordDependencyOnExpr(&myself, whereclause, pstate->p_rtable, DEPENDENCY_NORMAL);
+		free_parsestate(pstate);
+	}
+
 	/* Close the table. */
 	table_close(rel, RowExclusiveLock);
 
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 404bb5d0c8..9ca743c6d2 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -529,40 +529,96 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
 		List	   *delrels = NIL;
 		ListCell   *oldlc;
 
+		/*
+		 * Check if the relation is member of the existing schema in the
+		 * publication or member of the schema list specified.
+		 */
 		CheckObjSchemaNotAlreadyInPublication(rels, schemaidlist,
 											  PUBLICATIONOBJ_TABLE);
 
-		/* Calculate which relations to drop. */
+		/*
+		 * In order to recreate the relation list for the publication,
+		 * look for existing relations that need not be dropped.
+		 */
 		foreach(oldlc, oldrelids)
 		{
 			Oid			oldrelid = lfirst_oid(oldlc);
-			ListCell   *newlc;
+			ListCell	*newlc;
+			PublicationRelInfo *oldrel;
 			bool		found = false;
+			HeapTuple	rftuple;
+			bool		rfisnull = true;
+			Node		*oldrelwhereclause = NULL;
+
+			/* look up the cache for the old relmap */
+			rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(oldrelid),
+									  ObjectIdGetDatum(pubid));
+			if (HeapTupleIsValid(rftuple))
+			{
+				Datum		whereClauseDatum;
+
+				whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, Anum_pg_publication_rel_prqual,
+												   &rfisnull);
+				if (!rfisnull)
+					oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
+
+				ReleaseSysCache(rftuple);
+			}
 
 			foreach(newlc, rels)
 			{
 				PublicationRelInfo *newpubrel;
 
 				newpubrel = (PublicationRelInfo *) lfirst(newlc);
+
+				/*
+				 * Look if any of the new set of relations match with
+				 * the existing relations in the publication. Additionally,
+				 * if the relation has an associated where-clause, check the
+				 * where-clauses also match. Drop the rest.
+				 */
 				if (RelationGetRelid(newpubrel->relation) == oldrelid)
 				{
-					found = true;
-					break;
+					if (rfisnull && !newpubrel->whereClause)
+					{
+						found = true;
+						break;
+					}
+
+					if (!rfisnull && newpubrel->whereClause)
+					{
+						ParseState *pstate = make_parsestate(NULL);
+						Node *whereclause;
+
+						whereclause = GetTransformedWhereClause(pstate,
+																newpubrel,
+																false);
+						if (equal(oldrelwhereclause, whereclause))
+						{
+							free_parsestate(pstate);
+							found = true;
+							break;
+						}
+
+						free_parsestate(pstate);
+					}
 				}
 			}
-			/* Not yet in the list, open it and add to the list */
+
+			if (oldrelwhereclause)
+				pfree(oldrelwhereclause);
+
+			/*
+			 * Add the non-matched relations to a list so that they can
+			 * be dropped.
+			 */
 			if (!found)
 			{
-				Relation	oldrel;
-				PublicationRelInfo *pubrel;
-
-				/* Wrap relation into PublicationRelInfo */
-				oldrel = table_open(oldrelid, ShareUpdateExclusiveLock);
-
-				pubrel = palloc(sizeof(PublicationRelInfo));
-				pubrel->relation = oldrel;
-
-				delrels = lappend(delrels, pubrel);
+				oldrel = palloc(sizeof(PublicationRelInfo));
+				oldrel->whereClause = NULL;
+				oldrel->relation = table_open(oldrelid,
+											  ShareUpdateExclusiveLock);
+				delrels = lappend(delrels, oldrel);
 			}
 		}
 
@@ -899,6 +955,7 @@ OpenTableList(List *tables)
 	List	   *relids = NIL;
 	List	   *rels = NIL;
 	ListCell   *lc;
+	List	   *relids_with_rf = NIL;
 
 	/*
 	 * Open, share-lock, and check all the explicitly-specified relations
@@ -926,15 +983,26 @@ OpenTableList(List *tables)
 		 */
 		if (list_member_oid(relids, myrelid))
 		{
+			/* Disallow duplicate tables if there are any with row-filters. */
+			if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
+				ereport(ERROR,
+						(errcode(ERRCODE_DUPLICATE_OBJECT),
+						 errmsg("conflicting or redundant row-filters for \"%s\"",
+								RelationGetRelationName(rel))));
+
 			table_close(rel, ShareUpdateExclusiveLock);
 			continue;
 		}
 
 		pub_rel = palloc(sizeof(PublicationRelInfo));
 		pub_rel->relation = rel;
+		pub_rel->whereClause = t->whereClause;
 		rels = lappend(rels, pub_rel);
 		relids = lappend_oid(relids, myrelid);
 
+		if (t->whereClause)
+			relids_with_rf = lappend_oid(relids_with_rf, myrelid);
+
 		/*
 		 * Add children of this rel, if requested, so that they too are added
 		 * to the publication.  A partitioned table can't have any inheritance
@@ -967,6 +1035,8 @@ OpenTableList(List *tables)
 				rel = table_open(childrelid, NoLock);
 				pub_rel = palloc(sizeof(PublicationRelInfo));
 				pub_rel->relation = rel;
+				/* child inherits WHERE clause from parent */
+				pub_rel->whereClause = t->whereClause;
 				rels = lappend(rels, pub_rel);
 				relids = lappend_oid(relids, childrelid);
 			}
@@ -974,6 +1044,7 @@ OpenTableList(List *tables)
 	}
 
 	list_free(relids);
+	list_free(relids_with_rf);
 
 	return rels;
 }
@@ -993,6 +1064,8 @@ CloseTableList(List *rels)
 		pub_rel = (PublicationRelInfo *) lfirst(lc);
 		table_close(pub_rel->relation, NoLock);
 	}
+
+	list_free_deep(rels);
 }
 
 /*
@@ -1088,6 +1161,11 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 							RelationGetRelationName(rel))));
 		}
 
+		if (pubrel->whereClause)
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("invalid use of WHERE row-filter in ALTER PUBLICATION ... DROP TABLE")));
+
 		ObjectAddressSet(obj, PublicationRelRelationId, prid);
 		performDeletion(&obj, DROP_CASCADE, 0);
 	}
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index df0b747883..bd55ea667f 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4833,6 +4833,7 @@ _copyPublicationTable(const PublicationTable *from)
 	PublicationTable *newnode = makeNode(PublicationTable);
 
 	COPY_NODE_FIELD(relation);
+	COPY_NODE_FIELD(whereClause);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index cb7ddd463c..028b8e5dc0 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -2312,6 +2312,7 @@ static bool
 _equalPublicationTable(const PublicationTable *a, const PublicationTable *b)
 {
 	COMPARE_NODE_FIELD(relation);
+	COMPARE_NODE_FIELD(whereClause);
 
 	return true;
 }
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 3d4dd43e47..9da93a01a1 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -9742,12 +9742,13 @@ CreatePublicationStmt:
  * relation_expr here.
  */
 PublicationObjSpec:
-			TABLE relation_expr
+			TABLE relation_expr OptWhereClause
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_TABLE;
 					$$->pubtable = makeNode(PublicationTable);
 					$$->pubtable->relation = $2;
+					$$->pubtable->whereClause = $3;
 				}
 			| ALL TABLES IN_P SCHEMA ColId
 				{
@@ -9762,28 +9763,45 @@ PublicationObjSpec:
 					$$->pubobjtype = PUBLICATIONOBJ_TABLE_IN_CUR_SCHEMA;
 					$$->location = @5;
 				}
-			| ColId
+			| ColId OptWhereClause
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
-					$$->name = $1;
+					if ($2)
+					{
+						/*
+						 * The OptWhereClause must be stored here but it is
+						 * valid only for tables. If the ColId was mistakenly
+						 * not a table this will be detected later in
+						 * preprocess_pubobj_list() and an error is thrown.
+						 */
+						$$->pubtable = makeNode(PublicationTable);
+						$$->pubtable->relation = makeRangeVar(NULL, $1, @1);
+						$$->pubtable->whereClause = $2;
+					}
+					else
+					{
+						$$->name = $1;
+					}
 					$$->location = @1;
 				}
-			| ColId indirection
+			| ColId indirection OptWhereClause
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
 					$$->pubtable = makeNode(PublicationTable);
 					$$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner);
+					$$->pubtable->whereClause = $3;
 					$$->location = @1;
 				}
 			/* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */
-			| extended_relation_expr
+			| extended_relation_expr OptWhereClause
 				{
 					$$ = makeNode(PublicationObjSpec);
 					$$->pubobjtype = PUBLICATIONOBJ_CONTINUATION;
 					$$->pubtable = makeNode(PublicationTable);
 					$$->pubtable->relation = $1;
+					$$->pubtable->whereClause = $2;
 				}
 			| CURRENT_SCHEMA
 				{
@@ -17431,7 +17449,8 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
 						errcode(ERRCODE_SYNTAX_ERROR),
 						errmsg("invalid table name at or near"),
 						parser_errposition(pubobj->location));
-			else if (pubobj->name)
+
+			if (pubobj->name)
 			{
 				/* convert it to PublicationTable */
 				PublicationTable *pubtable = makeNode(PublicationTable);
@@ -17444,6 +17463,13 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner)
 		else if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLE_IN_SCHEMA ||
 				 pubobj->pubobjtype == PUBLICATIONOBJ_TABLE_IN_CUR_SCHEMA)
 		{
+			/* WHERE clause is not allowed on a schema object */
+			if (pubobj->pubtable && pubobj->pubtable->whereClause)
+				ereport(ERROR,
+						errcode(ERRCODE_SYNTAX_ERROR),
+						errmsg("WHERE clause for schema not allowed"),
+						parser_errposition(pubobj->location));
+
 			/*
 			 * We can distinguish between the different type of schema
 			 * objects based on whether name and pubtable is set.
diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c
index 7d829a05a9..193c87d8b7 100644
--- a/src/backend/parser/parse_agg.c
+++ b/src/backend/parser/parse_agg.c
@@ -551,6 +551,13 @@ check_agglevels_and_constraints(ParseState *pstate, Node *expr)
 				err = _("grouping operations are not allowed in COPY FROM WHERE conditions");
 
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			if (isAgg)
+				err = _("aggregate functions are not allowed in publication WHERE expressions");
+			else
+				err = _("grouping operations are not allowed in publication WHERE expressions");
+
+			break;
 
 		case EXPR_KIND_CYCLE_MARK:
 			errkind = true;
@@ -943,6 +950,9 @@ transformWindowFuncCall(ParseState *pstate, WindowFunc *wfunc,
 		case EXPR_KIND_CYCLE_MARK:
 			errkind = true;
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			err = _("window functions are not allowed in publication WHERE expressions");
+			break;
 
 			/*
 			 * There is intentionally no default: case here, so that the
diff --git a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c
index 2d1a477154..3d43839b35 100644
--- a/src/backend/parser/parse_expr.c
+++ b/src/backend/parser/parse_expr.c
@@ -200,8 +200,19 @@ transformExprRecurse(ParseState *pstate, Node *expr)
 			break;
 
 		case T_FuncCall:
-			result = transformFuncCall(pstate, (FuncCall *) expr);
-			break;
+			{
+				/*
+				 * Forbid functions in publication WHERE condition
+				 */
+				if (pstate->p_expr_kind == EXPR_KIND_PUBLICATION_WHERE)
+					ereport(ERROR,
+							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							 errmsg("functions are not allowed in publication WHERE expressions"),
+							 parser_errposition(pstate, exprLocation(expr))));
+
+				result = transformFuncCall(pstate, (FuncCall *) expr);
+				break;
+			}
 
 		case T_MultiAssignRef:
 			result = transformMultiAssignRef(pstate, (MultiAssignRef *) expr);
@@ -504,6 +515,7 @@ transformColumnRef(ParseState *pstate, ColumnRef *cref)
 		case EXPR_KIND_COPY_WHERE:
 		case EXPR_KIND_GENERATED_COLUMN:
 		case EXPR_KIND_CYCLE_MARK:
+		case EXPR_KIND_PUBLICATION_WHERE:
 			/* okay */
 			break;
 
@@ -1764,6 +1776,9 @@ transformSubLink(ParseState *pstate, SubLink *sublink)
 		case EXPR_KIND_GENERATED_COLUMN:
 			err = _("cannot use subquery in column generation expression");
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			err = _("cannot use subquery in publication WHERE expression");
+			break;
 
 			/*
 			 * There is intentionally no default: case here, so that the
@@ -3084,6 +3099,8 @@ ParseExprKindName(ParseExprKind exprKind)
 			return "GENERATED AS";
 		case EXPR_KIND_CYCLE_MARK:
 			return "CYCLE";
+		case EXPR_KIND_PUBLICATION_WHERE:
+			return "publication expression";
 
 			/*
 			 * There is intentionally no default: case here, so that the
diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c
index 542f9167aa..29bebb73eb 100644
--- a/src/backend/parser/parse_func.c
+++ b/src/backend/parser/parse_func.c
@@ -2655,6 +2655,9 @@ check_srf_call_placement(ParseState *pstate, Node *last_srf, int location)
 		case EXPR_KIND_CYCLE_MARK:
 			errkind = true;
 			break;
+		case EXPR_KIND_PUBLICATION_WHERE:
+			err = _("set-returning functions are not allowed in publication WHERE expressions");
+			break;
 
 			/*
 			 * There is intentionally no default: case here, so that the
diff --git a/src/backend/parser/parse_oper.c b/src/backend/parser/parse_oper.c
index bc34a23afc..29f8835ce1 100644
--- a/src/backend/parser/parse_oper.c
+++ b/src/backend/parser/parse_oper.c
@@ -718,6 +718,13 @@ make_op(ParseState *pstate, List *opname, Node *ltree, Node *rtree,
 											opform->oprright)),
 				 parser_errposition(pstate, location)));
 
+	/* Check it's not a custom operator for publication WHERE expressions */
+	if (pstate->p_expr_kind == EXPR_KIND_PUBLICATION_WHERE && opform->oid >= FirstNormalObjectId)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("user-defined operators are not allowed in publication WHERE expressions"),
+				 parser_errposition(pstate, location)));
+
 	/* Do typecasting and build the expression tree */
 	if (ltree == NULL)
 	{
diff --git a/src/backend/parser/parse_relation.c b/src/backend/parser/parse_relation.c
index c5c3f26ecf..036d9c6d26 100644
--- a/src/backend/parser/parse_relation.c
+++ b/src/backend/parser/parse_relation.c
@@ -3538,11 +3538,20 @@ errorMissingRTE(ParseState *pstate, RangeVar *relation)
 						  rte->eref->aliasname)),
 				 parser_errposition(pstate, relation->location)));
 	else
+	{
+		if (pstate->p_expr_kind == EXPR_KIND_PUBLICATION_WHERE)
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_TABLE),
+					 errmsg("publication WHERE expression invalid reference to table \"%s\"",
+							relation->relname),
+					 parser_errposition(pstate, relation->location)));
+
 		ereport(ERROR,
 				(errcode(ERRCODE_UNDEFINED_TABLE),
 				 errmsg("missing FROM-clause entry for table \"%s\"",
 						relation->relname),
 				 parser_errposition(pstate, relation->location)));
+	}
 }
 
 /*
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f07983a43c..c20c2219fc 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -687,20 +687,24 @@ copy_read_data(void *outbuf, int minread, int maxread)
 
 
 /*
- * Get information about remote relation in similar fashion the RELATION
- * message provides during replication.
+ * Get information about a remote relation, in a similar fashion to how the
+ * RELATION message provides information during replication. This function also
+ * returns the relation qualifications to be used in the COPY command.
  */
 static void
 fetch_remote_table_info(char *nspname, char *relname,
-						LogicalRepRelation *lrel)
+						LogicalRepRelation *lrel, List **qual)
 {
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	TupleTableSlot *slot;
 	Oid			tableRow[] = {OIDOID, CHAROID, CHAROID};
 	Oid			attrRow[] = {TEXTOID, OIDOID, BOOLOID};
+	Oid			qualRow[] = {TEXTOID};
 	bool		isnull;
 	int			natt;
+	ListCell   *lc;
+	bool		first;
 
 	lrel->nspname = nspname;
 	lrel->relname = relname;
@@ -796,6 +800,80 @@ fetch_remote_table_info(char *nspname, char *relname,
 	lrel->natts = natt;
 
 	walrcv_clear_result(res);
+
+	/*
+	 * Get relation qual. DISTINCT avoids the same expression of a table in
+	 * multiple publications from being included multiple times in the final
+	 * expression.
+	 */
+	if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+	{
+		resetStringInfo(&cmd);
+		appendStringInfo(&cmd,
+						 "SELECT DISTINCT pg_get_expr(prqual, prrelid) "
+						 "  FROM pg_publication p "
+						 "  INNER JOIN pg_publication_rel pr "
+						 "       ON (p.oid = pr.prpubid) "
+						 " WHERE pr.prrelid = %u "
+						 "   AND p.pubname IN (", lrel->remoteid);
+
+		first = true;
+		foreach(lc, MySubscription->publications)
+		{
+			char	   *pubname = strVal(lfirst(lc));
+
+			if (first)
+				first = false;
+			else
+				appendStringInfoString(&cmd, ", ");
+
+			appendStringInfoString(&cmd, quote_literal_cstr(pubname));
+		}
+		appendStringInfoChar(&cmd, ')');
+
+		res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 1, qualRow);
+
+		if (res->status != WALRCV_OK_TUPLES)
+			ereport(ERROR,
+					(errmsg("could not fetch relation qualifications for table \"%s.%s\" from publisher: %s",
+							nspname, relname, res->err)));
+
+		/*
+		 * Multiple row filter expressions for the same table will be combined
+		 * by COPY using OR. If any of the filter expressions for this table are
+		 * null, it means the whole table will be copied. In this case it is not
+		 * necessary to construct a unified row filter expression at all.
+		 */
+		slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+		while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+		{
+			Datum		rf = slot_getattr(slot, 1, &isnull);
+
+			if (!isnull)
+				*qual = lappend(*qual, makeString(TextDatumGetCString(rf)));
+
+			ExecClearTuple(slot);
+
+			/*
+			 * One entry without a row filter expression means clean up
+			 * previous expressions (if there are any) and return with no
+			 * expressions.
+			 */
+			if (isnull)
+			{
+				if (*qual)
+				{
+					list_free_deep(*qual);
+					*qual = NIL;
+				}
+				break;
+			}
+		}
+		ExecDropSingleTupleTableSlot(slot);
+
+		walrcv_clear_result(res);
+	}
+
 	pfree(cmd.data);
 }
 
@@ -809,6 +887,7 @@ copy_table(Relation rel)
 {
 	LogicalRepRelMapEntry *relmapentry;
 	LogicalRepRelation lrel;
+	List	   *qual = NIL;
 	WalRcvExecResult *res;
 	StringInfoData cmd;
 	CopyFromState cstate;
@@ -817,7 +896,7 @@ copy_table(Relation rel)
 
 	/* Get the publisher relation info. */
 	fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
-							RelationGetRelationName(rel), &lrel);
+							RelationGetRelationName(rel), &lrel, &qual);
 
 	/* Put the relation into relmap. */
 	logicalrep_relmap_update(&lrel);
@@ -828,14 +907,18 @@ copy_table(Relation rel)
 
 	/* Start copy on the publisher. */
 	initStringInfo(&cmd);
-	if (lrel.relkind == RELKIND_RELATION)
+
+	/* Regular table with no row filter */
+	if (lrel.relkind == RELKIND_RELATION && qual == NIL)
 		appendStringInfo(&cmd, "COPY %s TO STDOUT",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
 	else
 	{
 		/*
 		 * For non-tables, we need to do COPY (SELECT ...), but we can't just
-		 * do SELECT * because we need to not copy generated columns.
+		 * do SELECT * because we need to not copy generated columns. For
+		 * tables with any row filters, build a SELECT query with OR'ed row
+		 * filters for COPY.
 		 */
 		appendStringInfoString(&cmd, "COPY (SELECT ");
 		for (int i = 0; i < lrel.natts; i++)
@@ -844,8 +927,29 @@ copy_table(Relation rel)
 			if (i < lrel.natts - 1)
 				appendStringInfoString(&cmd, ", ");
 		}
-		appendStringInfo(&cmd, " FROM %s) TO STDOUT",
+		appendStringInfo(&cmd, " FROM %s",
 						 quote_qualified_identifier(lrel.nspname, lrel.relname));
+		/* list of OR'ed filters */
+		if (qual != NIL)
+		{
+			ListCell   *lc;
+			bool		first = true;
+
+			appendStringInfoString(&cmd, " WHERE ");
+			foreach(lc, qual)
+			{
+				char	   *q = strVal(lfirst(lc));
+
+				if (first)
+					first = false;
+				else
+					appendStringInfoString(&cmd, " OR ");
+				appendStringInfoString(&cmd, q);
+			}
+			list_free_deep(qual);
+		}
+
+		appendStringInfoString(&cmd, ") TO STDOUT");
 	}
 	res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
 	pfree(cmd.data);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 6f6a203dea..2fa08e7278 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -15,16 +15,24 @@
 #include "access/tupconvert.h"
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
+#include "catalog/pg_publication_rel.h"
 #include "commands/defrem.h"
+#include "executor/executor.h"
 #include "fmgr.h"
+#include "nodes/nodeFuncs.h"
+#include "nodes/makefuncs.h"
+#include "optimizer/optimizer.h"
+#include "parser/parse_coerce.h"
 #include "replication/logical.h"
 #include "replication/logicalproto.h"
 #include "replication/origin.h"
 #include "replication/pgoutput.h"
+#include "utils/builtins.h"
 #include "utils/int8.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/snapmgr.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
 
@@ -115,6 +123,24 @@ typedef struct RelationSyncEntry
 	bool		replicate_valid;
 	PublicationActions pubactions;
 
+	/*
+	 * ExprState cannot be used to indicate no cache, invalid cache and valid
+	 * cache, so the flag exprstate_valid indicates if the current cache is
+	 * valid.
+	 *
+	 * Multiple ExprState entries might be used if there are multiple
+	 * publications for a single table. Different publication actions don't
+	 * allow multiple expressions to always be combined into one, so there is
+	 * one ExprSTate per publication action. Only 3 publication actions are used
+	 * for row filtering ("insert", "update", "delete"). The exprstate array is
+	 * indexed by ReorderBufferChangeType.
+	 */
+	bool		exprstate_valid;
+#define IDX_PUBACTION_n		 	3
+	ExprState	   *exprstate[IDX_PUBACTION_n];	/* ExprState array for row filter.
+												   One per publication action. */
+	TupleTableSlot *scantuple;		/* tuple table slot for row filter */
+
 	/*
 	 * OID of the relation to publish changes as.  For a partition, this may
 	 * be set to one of its ancestors whose schema will be used when
@@ -137,7 +163,7 @@ static HTAB *RelationSyncCache = NULL;
 
 static void init_rel_sync_cache(MemoryContext decoding_context);
 static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
-static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
+static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Relation relation);
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
 										  uint32 hashvalue);
@@ -146,6 +172,14 @@ static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
 static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
 											TransactionId xid);
 
+/* row filter routines */
+static EState *create_estate_for_relation(Relation rel);
+static ExprState *pgoutput_row_filter_init_expr(Node *rfnode);
+static bool pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext);
+static bool pgoutput_row_filter(enum ReorderBufferChangeType changetype, PGOutputData *data,
+								Relation relation, HeapTuple oldtuple,
+								HeapTuple newtuple, RelationSyncEntry *entry);
+
 /*
  * Specify output plugin callbacks
  */
@@ -620,6 +654,316 @@ send_relation_and_attrs(Relation relation, TransactionId xid,
 	OutputPluginWrite(ctx, false);
 }
 
+/*
+ * Executor state preparation for evaluation of row filter expressions for the
+ * specified relation.
+ */
+static EState *
+create_estate_for_relation(Relation rel)
+{
+	EState			*estate;
+	RangeTblEntry	*rte;
+
+	estate = CreateExecutorState();
+
+	rte = makeNode(RangeTblEntry);
+	rte->rtekind = RTE_RELATION;
+	rte->relid = RelationGetRelid(rel);
+	rte->relkind = rel->rd_rel->relkind;
+	rte->rellockmode = AccessShareLock;
+	ExecInitRangeTable(estate, list_make1(rte));
+
+	estate->es_output_cid = GetCurrentCommandId(false);
+
+	return estate;
+}
+
+/*
+ * Initialize for row filter expression execution.
+ */
+static ExprState *
+pgoutput_row_filter_init_expr(Node *rfnode)
+{
+	ExprState	   *exprstate;
+	Oid				exprtype;
+	Expr		   *expr;
+
+	/* Cache ExprState using CacheMemoryContext. */
+	Assert(CurrentMemoryContext = CacheMemoryContext);
+
+	/* Prepare expression for execution */
+	exprtype = exprType(rfnode);
+	expr = (Expr *) coerce_to_target_type(NULL, rfnode, exprtype, BOOLOID, -1, COERCION_ASSIGNMENT, COERCE_IMPLICIT_CAST, -1);
+
+	if (expr == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_CANNOT_COERCE),
+				 errmsg("row filter returns type %s that cannot be cast to the expected type %s",
+						format_type_be(exprtype),
+						format_type_be(BOOLOID)),
+				 errhint("You will need to rewrite the row filter.")));
+
+	/*
+	 * This is the same code as ExecPrepareExpr() but that is not used because
+	 * we have no EState to pass it. There should probably be another function
+	 * in the executor to handle the execution outside a normal Plan tree
+	 * context.
+	 */
+	expr = expression_planner(expr);
+	exprstate = ExecInitExpr(expr, NULL);
+
+	return exprstate;
+}
+
+/*
+ * Evaluates row filter.
+ *
+ * If the row filter evaluates to NULL, it is taken as false i.e. the change
+ * isn't replicated.
+ */
+static bool
+pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext)
+{
+	Datum		ret;
+	bool		isnull;
+
+	Assert(state != NULL);
+
+	ret = ExecEvalExprSwitchContext(state, econtext, &isnull);
+
+	elog(DEBUG3, "row filter evaluates to %s (isnull: %s)",
+		 DatumGetBool(ret) ? "true" : "false",
+		 isnull ? "true" : "false");
+
+	if (isnull)
+		return false;
+
+	return DatumGetBool(ret);
+}
+
+/*
+ * Change is checked against the row filter, if any.
+ *
+ * If it returns true, the change is replicated, otherwise, it is not.
+ */
+static bool
+pgoutput_row_filter(enum ReorderBufferChangeType changetype, PGOutputData *data,
+					Relation relation, HeapTuple oldtuple, HeapTuple newtuple,
+					RelationSyncEntry *entry)
+{
+	EState	   *estate;
+	ExprContext *ecxt;
+	ListCell   *lc;
+	bool		result = true;
+	Oid			relid = RelationGetRelid(relation);
+	List	   *rfnodes[] = {NIL, NIL, NIL}; /* One per pubaction */
+	bool		no_filter[] = {false, false, false}; /* One per pubaction */
+
+	Assert(changetype == REORDER_BUFFER_CHANGE_INSERT ||
+		   changetype == REORDER_BUFFER_CHANGE_UPDATE ||
+		   changetype == REORDER_BUFFER_CHANGE_DELETE);
+	/*
+	 * If the row filter caching is currently flagged "invalid" then it means we
+	 * don't know yet if there is/isn't any row filters for this relation.
+	 *
+	 * This code is usually one-time execution.
+	 *
+	 * NOTE: The ExprState cache could have been created up-front in the
+	 * function get_rel_sync_entry() instead of the deferred on-the-fly
+	 * assignment below. The reason for choosing to do it here is because there
+	 * are some scenarios where the get_rel_sync_entry() is called but where a
+	 * row will not be published. For example, for truncate, we may not need
+	 * any row evaluation, so there is no need to compute it. It would also be
+	 * a waste if any error happens before actually evaluating the filter. And
+	 * tomorrow there could be other operations (which use get_rel_sync_entry)
+	 * but which don't need to build ExprState. Furthermore, because the
+	 * decision to publish or not is made AFTER the call to get_rel_sync_entry
+	 * it may be that the filter evaluation is not necessary at all. So the
+	 * decision was to defer this logic to last moment when we know it will be
+	 * needed.
+	 */
+	if (!entry->exprstate_valid)
+	{
+		MemoryContext	oldctx;
+		int				idx;
+		bool			found_filters = false;
+		int				idx_ins = REORDER_BUFFER_CHANGE_INSERT;
+		int				idx_upd = REORDER_BUFFER_CHANGE_UPDATE;
+		int				idx_del = REORDER_BUFFER_CHANGE_DELETE;
+
+		/*
+		 * Find if there are any row filters for this relation. If there are,
+		 * then prepare the necessary ExprState and cache it in entry->exprstate.
+		 *
+		 * NOTE: All publication-table mappings must be checked.
+		 *
+		 * NOTE: If the relation is a partition and pubviaroot is true, use
+		 * the row filter of the topmost partitioned table instead of the row
+		 * filter of its own partition.
+		 *
+		 * NOTE: Multiple publications might have multiple row filters for this
+		 * relation. Since row filter usage depends on the DML operation,
+		 * there are multiple lists (one for each operation) which row filters
+		 * will be appended.
+		 */
+		foreach(lc, data->publications)
+		{
+			Publication *pub = lfirst(lc);
+			HeapTuple	rftuple;
+			Datum		rfdatum;
+			bool		rfisnull;
+
+			/*
+			 * Lookup if there is a row-filter, and if yes remember it in a list (per
+			 * pubaction). If no, then remember there was no filter for this pubaction.
+			 * Code following this 'publications' loop will combine all filters.
+			 */
+			rftuple = SearchSysCache2(PUBLICATIONRELMAP, ObjectIdGetDatum(entry->publish_as_relid), ObjectIdGetDatum(pub->oid));
+			if (HeapTupleIsValid(rftuple))
+			{
+				rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, Anum_pg_publication_rel_prqual, &rfisnull);
+
+				if (!rfisnull)
+				{
+					Node   *rfnode;
+
+					oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+					/* Gather the rfnodes per pubaction of this publiaction. */
+					if (pub->pubactions.pubinsert)
+					{
+						rfnode = stringToNode(TextDatumGetCString(rfdatum));
+						rfnodes[idx_ins] = lappend(rfnodes[idx_ins], rfnode);
+					}
+					if (pub->pubactions.pubupdate)
+					{
+						rfnode = stringToNode(TextDatumGetCString(rfdatum));
+						rfnodes[idx_upd] = lappend(rfnodes[idx_upd], rfnode);
+					}
+					if (pub->pubactions.pubdelete)
+					{
+						rfnode = stringToNode(TextDatumGetCString(rfdatum));
+						rfnodes[idx_del] = lappend(rfnodes[idx_del], rfnode);
+					}
+					MemoryContextSwitchTo(oldctx);
+				}
+				else
+				{
+					/* Remember which pubactions have no row-filter. */
+					if (pub->pubactions.pubinsert)
+						no_filter[idx_ins] = true;
+					if (pub->pubactions.pubupdate)
+						no_filter[idx_upd] = true;
+					if (pub->pubactions.pubdelete)
+						no_filter[idx_del] = true;
+
+					/* Quick exit loop if all pubactions have no row-filter. */
+					if (no_filter[idx_ins] && no_filter[idx_upd] && no_filter[idx_del])
+					{
+						ReleaseSysCache(rftuple);
+						break;
+					}
+				}
+
+				ReleaseSysCache(rftuple);
+			}
+
+		} /* loop all subscribed publications */
+
+		/*
+		 * Now all the filters for all pubactions are known. Combine them when
+		 * their pubactions are same.
+		 *
+		 * All row filter expressions will be discarded if there is one
+		 * publication-relation entry without a row filter. That's because all
+		 * expressions are aggregated by the OR operator. The row filter absence
+		 * means replicate all rows so a single valid expression means publish
+		 * this row.
+		 */
+		for (idx = 0; idx < IDX_PUBACTION_n; idx++)
+		{
+			int n_filters;
+
+			if (no_filter[idx])
+			{
+				if (rfnodes[idx])
+				{
+					list_free_deep(rfnodes[idx]);
+					rfnodes[idx] = NIL;
+				}
+			}
+
+			/*
+			 * If there was one or more filter for this pubaction then combine them
+			 * (if necessary) and cache the ExprState.
+			 */
+			n_filters = list_length(rfnodes[idx]);
+			if (n_filters > 0)
+			{
+				Node	   *rfnode;
+
+				oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+				rfnode = n_filters > 1 ? makeBoolExpr(OR_EXPR, rfnodes[idx], -1) : linitial(rfnodes[idx]);
+				entry->exprstate[idx] = pgoutput_row_filter_init_expr(rfnode);
+				MemoryContextSwitchTo(oldctx);
+
+				found_filters = true; /* flag that we will need slots made */
+			}
+		} /* for each pubaction */
+
+		if (found_filters)
+		{
+			TupleDesc	tupdesc = RelationGetDescr(relation);
+
+			/*
+			 * Create tuple table slots for row filter. Create a copy of the
+			 * TupleDesc as it needs to live as long as the cache remains.
+			 */
+			oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+			tupdesc = CreateTupleDescCopy(tupdesc);
+			entry->scantuple = MakeSingleTupleTableSlot(tupdesc, &TTSOpsHeapTuple);
+			MemoryContextSwitchTo(oldctx);
+		}
+
+		entry->exprstate_valid = true;
+	}
+
+	/* Bail out if there is no row filter */
+	if (!entry->exprstate[changetype])
+		return true;
+
+	if (message_level_is_interesting(DEBUG3))
+		elog(DEBUG3, "table \"%s.%s\" has row filter",
+			 get_namespace_name(get_rel_namespace(relid)),
+			 get_rel_name(relid));
+
+	PushActiveSnapshot(GetTransactionSnapshot());
+
+	estate = create_estate_for_relation(relation);
+
+	/* Prepare context per tuple */
+	ecxt = GetPerTupleExprContext(estate);
+	ecxt->ecxt_scantuple = entry->scantuple;
+
+	ExecStoreHeapTuple(newtuple ? newtuple : oldtuple, ecxt->ecxt_scantuple, false);
+
+	/*
+	 * NOTE: Multiple publication row-filters have already been combined to a
+	 * single exprstate (for this pubaction).
+	 */
+	if (entry->exprstate[changetype])
+	{
+		/* Evaluates row filter */
+		result = pgoutput_row_filter_exec_expr(entry->exprstate[changetype], ecxt);
+	}
+
+	/* Cleanup allocated resources */
+	ResetExprContext(ecxt);
+	FreeExecutorState(estate);
+	PopActiveSnapshot();
+
+	return result;
+}
+
 /*
  * Sends the decoded DML over wire.
  *
@@ -647,7 +991,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	if (in_streaming)
 		xid = change->txn->xid;
 
-	relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
+	relentry = get_rel_sync_entry(data, relation);
 
 	/* First check the table filter */
 	switch (change->action)
@@ -671,8 +1015,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
-	maybe_send_schema(ctx, change, relation, relentry);
-
 	/* Send the data */
 	switch (change->action)
 	{
@@ -680,6 +1022,16 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			{
 				HeapTuple	tuple = &change->data.tp.newtuple->tuple;
 
+				/* Check row filter. */
+				if (!pgoutput_row_filter(change->action, data, relation, NULL, tuple, relentry))
+					break;
+
+				/*
+				 * Schema should be sent before the logic that replaces the
+				 * relation because it also sends the ancestor's relation.
+				 */
+				maybe_send_schema(ctx, change, relation, relentry);
+
 				/* Switch relation if publishing via root. */
 				if (relentry->publish_as_relid != RelationGetRelid(relation))
 				{
@@ -703,6 +1055,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				&change->data.tp.oldtuple->tuple : NULL;
 				HeapTuple	newtuple = &change->data.tp.newtuple->tuple;
 
+				/* Check row filter. */
+				if (!pgoutput_row_filter(change->action, data, relation, oldtuple, newtuple, relentry))
+					break;
+
+				maybe_send_schema(ctx, change, relation, relentry);
+
 				/* Switch relation if publishing via root. */
 				if (relentry->publish_as_relid != RelationGetRelid(relation))
 				{
@@ -731,6 +1089,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			{
 				HeapTuple	oldtuple = &change->data.tp.oldtuple->tuple;
 
+				/* Check row filter. */
+				if (!pgoutput_row_filter(change->action, data, relation, oldtuple, NULL, relentry))
+					break;
+
+				maybe_send_schema(ctx, change, relation, relentry);
+
 				/* Switch relation if publishing via root. */
 				if (relentry->publish_as_relid != RelationGetRelid(relation))
 				{
@@ -794,7 +1158,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		if (!is_publishable_relation(relation))
 			continue;
 
-		relentry = get_rel_sync_entry(data, relid);
+		relentry = get_rel_sync_entry(data, relation);
 
 		if (!relentry->pubactions.pubtruncate)
 			continue;
@@ -1116,9 +1480,10 @@ set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
  * when publishing.
  */
 static RelationSyncEntry *
-get_rel_sync_entry(PGOutputData *data, Oid relid)
+get_rel_sync_entry(PGOutputData *data, Relation relation)
 {
 	RelationSyncEntry *entry;
+	Oid			relid = RelationGetRelid(relation);
 	bool		am_partition = get_rel_relispartition(relid);
 	char		relkind = get_rel_relkind(relid);
 	bool		found;
@@ -1139,8 +1504,13 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		entry->schema_sent = false;
 		entry->streamed_txns = NIL;
 		entry->replicate_valid = false;
+		entry->exprstate_valid = false;
 		entry->pubactions.pubinsert = entry->pubactions.pubupdate =
 			entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false;
+		entry->scantuple = NULL;
+		entry->exprstate[REORDER_BUFFER_CHANGE_INSERT] = NULL;
+		entry->exprstate[REORDER_BUFFER_CHANGE_UPDATE] = NULL;
+		entry->exprstate[REORDER_BUFFER_CHANGE_DELETE] = NULL;
 		entry->publish_as_relid = InvalidOid;
 		entry->map = NULL;		/* will be set by maybe_send_schema() if
 								 * needed */
@@ -1245,9 +1615,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 				entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate;
 			}
 
-			if (entry->pubactions.pubinsert && entry->pubactions.pubupdate &&
-				entry->pubactions.pubdelete && entry->pubactions.pubtruncate)
-				break;
 		}
 
 		list_free(pubids);
@@ -1310,6 +1677,7 @@ static void
 rel_sync_cache_relation_cb(Datum arg, Oid relid)
 {
 	RelationSyncEntry *entry;
+	int	idx;
 
 	/*
 	 * We can get here if the plugin was used in SQL interface as the
@@ -1354,6 +1722,25 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 			free_conversion_map(entry->map);
 		}
 		entry->map = NULL;
+
+		/*
+		 * Row filter cache cleanups. (Will be rebuilt later if needed).
+		 */
+		entry->exprstate_valid = false;
+		if (entry->scantuple != NULL)
+		{
+			ExecDropSingleTupleTableSlot(entry->scantuple);
+			entry->scantuple = NULL;
+		}
+		/* Cleanup the ExprState for each of the pubactions. */
+		for (idx = 0; idx < IDX_PUBACTION_n; idx++)
+		{
+			if (entry->exprstate[idx] != NULL)
+			{
+				pfree(entry->exprstate[idx]);
+				entry->exprstate[idx] = NULL;
+			}
+		}
 	}
 }
 
@@ -1365,6 +1752,7 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 {
 	HASH_SEQ_STATUS status;
 	RelationSyncEntry *entry;
+	MemoryContext oldctx;
 
 	/*
 	 * We can get here if the plugin was used in SQL interface as the
@@ -1374,6 +1762,8 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 	if (RelationSyncCache == NULL)
 		return;
 
+	oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+
 	/*
 	 * There is no way to find which entry in our cache the hash belongs to so
 	 * mark the whole cache as invalid.
@@ -1392,6 +1782,8 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 		entry->pubactions.pubdelete = false;
 		entry->pubactions.pubtruncate = false;
 	}
+
+	MemoryContextSwitchTo(oldctx);
 }
 
 /* Send Replication origin */
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index c28788e84f..929b2f5388 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -2868,17 +2868,21 @@ describeOneTableDetails(const char *schemaname,
 			{
 				printfPQExpBuffer(&buf,
 								  "SELECT pubname\n"
+								  "		, NULL\n"
 								  "FROM pg_catalog.pg_publication p\n"
 								  "		JOIN pg_catalog.pg_publication_namespace pn ON p.oid = pn.pnpubid\n"
 								  "		JOIN pg_catalog.pg_class pc ON pc.relnamespace = pn.pnnspid\n"
 								  "WHERE pc.oid ='%s' and pg_catalog.pg_relation_is_publishable('%s')\n"
 								  "UNION\n"
 								  "SELECT pubname\n"
+								  "		, pg_get_expr(pr.prqual, c.oid)\n"
 								  "FROM pg_catalog.pg_publication p\n"
 								  "		JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n"
+								  "		JOIN pg_catalog.pg_class c ON c.oid = pr.prrelid\n"
 								  "WHERE pr.prrelid = '%s'\n"
 								  "UNION\n"
 								  "SELECT pubname\n"
+								  "		, NULL\n"
 								  "FROM pg_catalog.pg_publication p\n"
 								  "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n"
 								  "ORDER BY 1;",
@@ -2914,6 +2918,13 @@ describeOneTableDetails(const char *schemaname,
 				printfPQExpBuffer(&buf, "    \"%s\"",
 								  PQgetvalue(result, i, 0));
 
+				/* row filter (if any) */
+				if (pset.sversion >= 150000)
+				{
+					if (!PQgetisnull(result, i, 1))
+						appendPQExpBuffer(&buf, " WHERE %s", PQgetvalue(result, i, 1));
+				}
+
 				printTableAddFooter(&cont, buf.data);
 			}
 			PQclear(result);
@@ -5833,8 +5844,12 @@ addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg,
 	for (i = 0; i < count; i++)
 	{
 		if (!singlecol)
+		{
 			printfPQExpBuffer(buf, "    \"%s.%s\"", PQgetvalue(res, i, 0),
 							  PQgetvalue(res, i, 1));
+			if (!PQgetisnull(res, i, 2))
+				appendPQExpBuffer(buf, " WHERE %s", PQgetvalue(res, i, 2));
+		}
 		else
 			printfPQExpBuffer(buf, "    \"%s\"", PQgetvalue(res, i, 0));
 
@@ -5963,8 +5978,15 @@ describePublications(const char *pattern)
 		{
 			/* Get the tables for the specified publication */
 			printfPQExpBuffer(&buf,
-							  "SELECT n.nspname, c.relname\n"
-							  "FROM pg_catalog.pg_class c,\n"
+							  "SELECT n.nspname, c.relname");
+			if (pset.sversion >= 150000)
+				appendPQExpBufferStr(&buf,
+									 ", pg_get_expr(pr.prqual, c.oid)");
+			else
+				appendPQExpBufferStr(&buf,
+									 ", NULL");
+			appendPQExpBuffer(&buf,
+							  "\nFROM pg_catalog.pg_class c,\n"
 							  "     pg_catalog.pg_namespace n,\n"
 							  "     pg_catalog.pg_publication_rel pr\n"
 							  "WHERE c.relnamespace = n.oid\n"
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 902f2f2f0d..96c55f627d 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -20,6 +20,7 @@
 #include "catalog/genbki.h"
 #include "catalog/objectaddress.h"
 #include "catalog/pg_publication_d.h"
+#include "parser/parse_node.h"
 
 /* ----------------
  *		pg_publication definition.  cpp turns this into
@@ -86,6 +87,7 @@ typedef struct Publication
 typedef struct PublicationRelInfo
 {
 	Relation	relation;
+	Node	   *whereClause;
 } PublicationRelInfo;
 
 extern Publication *GetPublication(Oid pubid);
@@ -123,13 +125,16 @@ extern List *GetPubPartitionOptionRelations(List *result,
 
 extern bool is_publishable_relation(Relation rel);
 extern bool is_schema_publication(Oid pubid);
-extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *targetrel,
+extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *pri,
 											  bool if_not_exists);
 extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid,
 											bool if_not_exists);
 
 extern Oid	get_publication_oid(const char *pubname, bool missing_ok);
 extern char *get_publication_name(Oid pubid, bool missing_ok);
+extern Node *GetTransformedWhereClause(ParseState *pstate,
+									   PublicationRelInfo *pri,
+									   bool bfixupcollation);
 
 
 #endif							/* PG_PUBLICATION_H */
diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h
index b5d5504cbb..154bb61777 100644
--- a/src/include/catalog/pg_publication_rel.h
+++ b/src/include/catalog/pg_publication_rel.h
@@ -31,6 +31,10 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
 	Oid			oid;			/* oid */
 	Oid			prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */
 	Oid			prrelid BKI_LOOKUP(pg_class);	/* Oid of the relation */
+
+#ifdef	CATALOG_VARLEN			/* variable-length fields start here */
+	pg_node_tree prqual;		/* qualifications */
+#endif
 } FormData_pg_publication_rel;
 
 /* ----------------
@@ -40,6 +44,8 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId)
  */
 typedef FormData_pg_publication_rel *Form_pg_publication_rel;
 
+DECLARE_TOAST(pg_publication_rel, 8287, 8288);
+
 DECLARE_UNIQUE_INDEX_PKEY(pg_publication_rel_oid_index, 6112, PublicationRelObjectIndexId, on pg_publication_rel using btree(oid oid_ops));
 DECLARE_UNIQUE_INDEX(pg_publication_rel_prrelid_prpubid_index, 6113, PublicationRelPrrelidPrpubidIndexId, on pg_publication_rel using btree(prrelid oid_ops, prpubid oid_ops));
 
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 4c5a8a39bf..e437a55bb2 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3642,6 +3642,7 @@ typedef struct PublicationTable
 {
 	NodeTag		type;
 	RangeVar   *relation;		/* relation to be published */
+	Node	   *whereClause;	/* qualifications */
 } PublicationTable;
 
 /*
diff --git a/src/include/parser/parse_node.h b/src/include/parser/parse_node.h
index ee179082ce..d58ae6a63f 100644
--- a/src/include/parser/parse_node.h
+++ b/src/include/parser/parse_node.h
@@ -80,6 +80,7 @@ typedef enum ParseExprKind
 	EXPR_KIND_COPY_WHERE,		/* WHERE condition in COPY FROM */
 	EXPR_KIND_GENERATED_COLUMN, /* generation expression for a column */
 	EXPR_KIND_CYCLE_MARK,		/* cycle mark value */
+	EXPR_KIND_PUBLICATION_WHERE /* WHERE condition for a table in PUBLICATION */
 } ParseExprKind;
 
 
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 5ac2d666a2..5a49003ae4 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -239,6 +239,157 @@ ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted;
 UPDATE testpub_parted2 SET a = 2;
 DROP TABLE testpub_parted1, testpub_parted2;
 DROP PUBLICATION testpub_forparted, testpub_forparted1;
+CREATE TABLE testpub_rf_tbl1 (a integer, b text);
+CREATE TABLE testpub_rf_tbl2 (c text, d integer);
+CREATE TABLE testpub_rf_tbl3 (e integer);
+CREATE TABLE testpub_rf_tbl4 (g text);
+CREATE SCHEMA testpub_rf_myschema;
+CREATE TABLE testpub_rf_myschema.testpub_rf_tbl5(h integer);
+CREATE SCHEMA testpub_rf_myschema1;
+CREATE TABLE testpub_rf_myschema1.testpub_rf_tbl6(i integer);
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5);
+RESET client_min_messages;
+\dRp+ testpub5
+                                    Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | t         | f
+Tables:
+    "public.testpub_rf_tbl1"
+    "public.testpub_rf_tbl2" WHERE ((c <> 'test'::text) AND (d < 5))
+
+ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000);
+\dRp+ testpub5
+                                    Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | t         | f
+Tables:
+    "public.testpub_rf_tbl1"
+    "public.testpub_rf_tbl2" WHERE ((c <> 'test'::text) AND (d < 5))
+    "public.testpub_rf_tbl3" WHERE ((e > 1000) AND (e < 2000))
+
+ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2;
+\dRp+ testpub5
+                                    Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | t         | f
+Tables:
+    "public.testpub_rf_tbl1"
+    "public.testpub_rf_tbl3" WHERE ((e > 1000) AND (e < 2000))
+
+-- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression)
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500);
+\dRp+ testpub5
+                                    Publication testpub5
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | t         | f
+Tables:
+    "public.testpub_rf_tbl3" WHERE ((e > 300) AND (e < 500))
+
+-- test \d+ (now it displays filter information)
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_dplus_rf_yes FOR TABLE testpub_rf_tbl1 WHERE (a > 1) WITH (publish = 'insert');
+CREATE PUBLICATION testpub_dplus_rf_no FOR TABLE testpub_rf_tbl1;
+RESET client_min_messages;
+\d+ testpub_rf_tbl1
+                              Table "public.testpub_rf_tbl1"
+ Column |  Type   | Collation | Nullable | Default | Storage  | Stats target | Description 
+--------+---------+-----------+----------+---------+----------+--------------+-------------
+ a      | integer |           |          |         | plain    |              | 
+ b      | text    |           |          |         | extended |              | 
+Publications:
+    "testpub_dplus_rf_no"
+    "testpub_dplus_rf_yes" WHERE (a > 1)
+
+DROP PUBLICATION testpub_dplus_rf_yes, testpub_dplus_rf_no;
+-- some more syntax tests to exercise other parser pathways
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_syntax1 FOR TABLE testpub_rf_tbl1, ONLY testpub_rf_tbl3 WHERE (e < 999);
+RESET client_min_messages;
+\dRp+ testpub_syntax1
+                                Publication testpub_syntax1
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | t         | f
+Tables:
+    "public.testpub_rf_tbl1"
+    "public.testpub_rf_tbl3" WHERE (e < 999)
+
+DROP PUBLICATION testpub_syntax1;
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_syntax2 FOR TABLE testpub_rf_tbl1, testpub_rf_myschema.testpub_rf_tbl5 WHERE (h < 999);
+RESET client_min_messages;
+\dRp+ testpub_syntax2
+                                Publication testpub_syntax2
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
+--------------------------+------------+---------+---------+---------+-----------+----------
+ regress_publication_user | f          | t       | t       | t       | t         | f
+Tables:
+    "public.testpub_rf_tbl1"
+    "testpub_rf_myschema.testpub_rf_tbl5" WHERE (h < 999)
+
+DROP PUBLICATION testpub_syntax2;
+-- fail - schemas are not allowed WHERE row-filter
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_syntax3 FOR ALL TABLES IN SCHEMA testpub_rf_myschema WHERE (a = 123);
+ERROR:  syntax error at or near "WHERE"
+LINE 1: ...tax3 FOR ALL TABLES IN SCHEMA testpub_rf_myschema WHERE (a =...
+                                                             ^
+CREATE PUBLICATION testpub_syntax3 FOR ALL TABLES IN SCHEMA testpub_rf_myschema, testpub_rf_myschema WHERE (a = 123);
+ERROR:  WHERE clause for schema not allowed
+LINE 1: ...ax3 FOR ALL TABLES IN SCHEMA testpub_rf_myschema, testpub_rf...
+                                                             ^
+RESET client_min_messages;
+-- fail - duplicate tables are not allowed if that table has any WHERE row-filters
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_dups FOR TABLE testpub_rf_tbl1 WHERE (a = 1), testpub_rf_tbl1 WITH (publish = 'insert');
+ERROR:  conflicting or redundant row-filters for "testpub_rf_tbl1"
+CREATE PUBLICATION testpub_dups FOR TABLE testpub_rf_tbl1, testpub_rf_tbl1 WHERE (a = 2) WITH (publish = 'insert');
+ERROR:  conflicting or redundant row-filters for "testpub_rf_tbl1"
+RESET client_min_messages;
+-- fail - aggregate functions not allowed in WHERE clause
+ALTER PUBLICATION  testpub5 SET TABLE testpub_rf_tbl3 WHERE (e < AVG(e));
+ERROR:  functions are not allowed in publication WHERE expressions
+LINE 1: ...TION  testpub5 SET TABLE testpub_rf_tbl3 WHERE (e < AVG(e));
+                                                               ^
+-- fail - functions disallowed
+ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl4 WHERE (length(g) < 6);
+ERROR:  functions are not allowed in publication WHERE expressions
+LINE 1: ...ICATION testpub5 ADD TABLE testpub_rf_tbl4 WHERE (length(g) ...
+                                                             ^
+-- fail - user-defined operators disallowed
+CREATE FUNCTION testpub_rf_func(integer, integer) RETURNS boolean AS $$ SELECT hashint4($1) > $2 $$ LANGUAGE SQL;
+CREATE OPERATOR =#> (PROCEDURE = testpub_rf_func, LEFTARG = integer, RIGHTARG = integer);
+CREATE PUBLICATION testpub6 FOR TABLE testpub_rf_tbl3 WHERE (e =#> 27);
+ERROR:  user-defined operators are not allowed in publication WHERE expressions
+LINE 1: ...ICATION testpub6 FOR TABLE testpub_rf_tbl3 WHERE (e =#> 27);
+                                                               ^
+-- fail - WHERE not allowed in DROP
+ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl3 WHERE (e < 27);
+ERROR:  invalid use of WHERE row-filter in ALTER PUBLICATION ... DROP TABLE
+-- fail - cannot ALTER SET table which is a member of a pre-existing schema
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub7 FOR ALL TABLES IN SCHEMA testpub_rf_myschema1;
+ALTER PUBLICATION testpub7 SET ALL TABLES IN SCHEMA testpub_rf_myschema1, TABLE testpub_rf_myschema1.testpub_rf_tbl6 WHERE (i < 99);
+ERROR:  cannot add relation "testpub_rf_myschema1.testpub_rf_tbl6" to publication
+DETAIL:  Table's schema "testpub_rf_myschema1" is already part of the publication or part of the specified schema list.
+RESET client_min_messages;
+DROP TABLE testpub_rf_tbl1;
+DROP TABLE testpub_rf_tbl2;
+DROP TABLE testpub_rf_tbl3;
+DROP TABLE testpub_rf_tbl4;
+DROP TABLE testpub_rf_myschema.testpub_rf_tbl5;
+DROP TABLE testpub_rf_myschema1.testpub_rf_tbl6;
+DROP SCHEMA testpub_rf_myschema;
+DROP SCHEMA testpub_rf_myschema1;
+DROP PUBLICATION testpub5;
+DROP PUBLICATION testpub7;
+DROP OPERATOR =#>(integer, integer);
+DROP FUNCTION testpub_rf_func(integer, integer);
 -- Test cache invalidation FOR ALL TABLES publication
 SET client_min_messages = 'ERROR';
 CREATE TABLE testpub_tbl4(a int);
diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql
index 56dd358554..47bdba86ac 100644
--- a/src/test/regress/sql/publication.sql
+++ b/src/test/regress/sql/publication.sql
@@ -134,6 +134,82 @@ UPDATE testpub_parted2 SET a = 2;
 DROP TABLE testpub_parted1, testpub_parted2;
 DROP PUBLICATION testpub_forparted, testpub_forparted1;
 
+CREATE TABLE testpub_rf_tbl1 (a integer, b text);
+CREATE TABLE testpub_rf_tbl2 (c text, d integer);
+CREATE TABLE testpub_rf_tbl3 (e integer);
+CREATE TABLE testpub_rf_tbl4 (g text);
+CREATE SCHEMA testpub_rf_myschema;
+CREATE TABLE testpub_rf_myschema.testpub_rf_tbl5(h integer);
+CREATE SCHEMA testpub_rf_myschema1;
+CREATE TABLE testpub_rf_myschema1.testpub_rf_tbl6(i integer);
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5);
+RESET client_min_messages;
+\dRp+ testpub5
+ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000);
+\dRp+ testpub5
+ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2;
+\dRp+ testpub5
+-- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression)
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500);
+\dRp+ testpub5
+-- test \d+ (now it displays filter information)
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_dplus_rf_yes FOR TABLE testpub_rf_tbl1 WHERE (a > 1) WITH (publish = 'insert');
+CREATE PUBLICATION testpub_dplus_rf_no FOR TABLE testpub_rf_tbl1;
+RESET client_min_messages;
+\d+ testpub_rf_tbl1
+DROP PUBLICATION testpub_dplus_rf_yes, testpub_dplus_rf_no;
+-- some more syntax tests to exercise other parser pathways
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_syntax1 FOR TABLE testpub_rf_tbl1, ONLY testpub_rf_tbl3 WHERE (e < 999);
+RESET client_min_messages;
+\dRp+ testpub_syntax1
+DROP PUBLICATION testpub_syntax1;
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_syntax2 FOR TABLE testpub_rf_tbl1, testpub_rf_myschema.testpub_rf_tbl5 WHERE (h < 999);
+RESET client_min_messages;
+\dRp+ testpub_syntax2
+DROP PUBLICATION testpub_syntax2;
+-- fail - schemas are not allowed WHERE row-filter
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_syntax3 FOR ALL TABLES IN SCHEMA testpub_rf_myschema WHERE (a = 123);
+CREATE PUBLICATION testpub_syntax3 FOR ALL TABLES IN SCHEMA testpub_rf_myschema, testpub_rf_myschema WHERE (a = 123);
+RESET client_min_messages;
+-- fail - duplicate tables are not allowed if that table has any WHERE row-filters
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub_dups FOR TABLE testpub_rf_tbl1 WHERE (a = 1), testpub_rf_tbl1 WITH (publish = 'insert');
+CREATE PUBLICATION testpub_dups FOR TABLE testpub_rf_tbl1, testpub_rf_tbl1 WHERE (a = 2) WITH (publish = 'insert');
+RESET client_min_messages;
+-- fail - aggregate functions not allowed in WHERE clause
+ALTER PUBLICATION  testpub5 SET TABLE testpub_rf_tbl3 WHERE (e < AVG(e));
+-- fail - functions disallowed
+ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl4 WHERE (length(g) < 6);
+-- fail - user-defined operators disallowed
+CREATE FUNCTION testpub_rf_func(integer, integer) RETURNS boolean AS $$ SELECT hashint4($1) > $2 $$ LANGUAGE SQL;
+CREATE OPERATOR =#> (PROCEDURE = testpub_rf_func, LEFTARG = integer, RIGHTARG = integer);
+CREATE PUBLICATION testpub6 FOR TABLE testpub_rf_tbl3 WHERE (e =#> 27);
+-- fail - WHERE not allowed in DROP
+ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl3 WHERE (e < 27);
+-- fail - cannot ALTER SET table which is a member of a pre-existing schema
+SET client_min_messages = 'ERROR';
+CREATE PUBLICATION testpub7 FOR ALL TABLES IN SCHEMA testpub_rf_myschema1;
+ALTER PUBLICATION testpub7 SET ALL TABLES IN SCHEMA testpub_rf_myschema1, TABLE testpub_rf_myschema1.testpub_rf_tbl6 WHERE (i < 99);
+RESET client_min_messages;
+
+DROP TABLE testpub_rf_tbl1;
+DROP TABLE testpub_rf_tbl2;
+DROP TABLE testpub_rf_tbl3;
+DROP TABLE testpub_rf_tbl4;
+DROP TABLE testpub_rf_myschema.testpub_rf_tbl5;
+DROP TABLE testpub_rf_myschema1.testpub_rf_tbl6;
+DROP SCHEMA testpub_rf_myschema;
+DROP SCHEMA testpub_rf_myschema1;
+DROP PUBLICATION testpub5;
+DROP PUBLICATION testpub7;
+DROP OPERATOR =#>(integer, integer);
+DROP FUNCTION testpub_rf_func(integer, integer);
+
 -- Test cache invalidation FOR ALL TABLES publication
 SET client_min_messages = 'ERROR';
 CREATE TABLE testpub_tbl4(a int);
diff --git a/src/test/subscription/t/027_row_filter.pl b/src/test/subscription/t/027_row_filter.pl
new file mode 100644
index 0000000000..64e71d0adb
--- /dev/null
+++ b/src/test/subscription/t/027_row_filter.pl
@@ -0,0 +1,357 @@
+# Test logical replication behavior with row filtering
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 10;
+
+# create publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# setup structure on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_1 (a int primary key, b text)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_2 (c int primary key)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_3 (a int primary key, b boolean)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_4 (c int primary key)");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_partitioned (a int primary key, b integer) PARTITION BY RANGE(a)"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_less_10k (LIKE tab_rowfilter_partitioned)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_less_10k FOR VALUES FROM (MINVALUE) TO (10000)"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_greater_10k (LIKE tab_rowfilter_partitioned)"
+);
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_greater_10k FOR VALUES FROM (10000) TO (MAXVALUE)"
+);
+
+# setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_1 (a int primary key, b text)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_2 (c int primary key)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_3 (a int primary key, b boolean)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_4 (c int primary key)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_partitioned (a int primary key, b integer) PARTITION BY RANGE(a)"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_less_10k (LIKE tab_rowfilter_partitioned)");
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_less_10k FOR VALUES FROM (MINVALUE) TO (10000)"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_rowfilter_greater_10k (LIKE tab_rowfilter_partitioned)"
+);
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE tab_rowfilter_partitioned ATTACH PARTITION tab_rowfilter_greater_10k FOR VALUES FROM (10000) TO (MAXVALUE)"
+);
+
+# setup logical replication
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_1 FOR TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered')"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_1 ADD TABLE tab_rowfilter_2 WHERE (c % 7 = 0)"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_1 SET TABLE tab_rowfilter_1 WHERE (a > 1000 AND b <> 'filtered'), tab_rowfilter_2 WHERE (c % 2 = 0), tab_rowfilter_3"
+);
+
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_2 FOR TABLE tab_rowfilter_2 WHERE (c % 3 = 0)"
+);
+
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_3 FOR TABLE tab_rowfilter_partitioned WHERE (a < 5000)"
+);
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_3 ADD TABLE tab_rowfilter_less_10k WHERE (a < 6000)"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_not_used FOR TABLE tab_rowfilter_1 WHERE (a < 0)"
+);
+
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_4a FOR TABLE tab_rowfilter_4 WHERE (c % 2 = 0)"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_4b FOR TABLE tab_rowfilter_4"
+);
+
+#
+# The following INSERTs are executed before the CREATE SUBSCRIPTION, so these
+# SQL commands are for testing the initial data copy using logical replication.
+#
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1, 'not replicated')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1500, 'filtered')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1980, 'not filtered')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) SELECT x, 'test ' || x FROM generate_series(990,1002) x"
+);
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_2 (c) SELECT generate_series(1, 20)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_3 (a, b) SELECT x, (x % 3 = 0) FROM generate_series(1, 10) x");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_4 (c) SELECT generate_series(1, 10)");
+
+# insert data into partitioned table and directly on the partition
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_partitioned (a, b) VALUES(1, 100),(7000, 101),(15000, 102),(5500, 300)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_less_10k (a, b) VALUES(2, 200),(6005, 201)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_greater_10k (a, b) VALUES(16000, 103)");
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+my $appname           = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# wait for initial table synchronization to finish
+my $synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Check expected replicated rows for tab_rowfilter_1
+# tap_pub_1 filter is: (a > 1000 AND b <> 'filtered')
+# - INSERT (1, 'not replicated')   NO, because a is not > 1000
+# - INSERT (1500, 'filtered')      NO, because b == 'filtered'
+# - INSERT (1980, 'not filtered')  YES
+# - generate_series(990,1002)      YES, only for 1001,1002 because a > 1000
+#
+my $result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT a, b FROM tab_rowfilter_1 ORDER BY 1, 2");
+is( $result, qq(1001|test 1001
+1002|test 1002
+1980|not filtered), 'check initial data copy from table tab_rowfilter_1');
+
+# Check expected replicated rows for tab_rowfilter_2
+# tap_pub_1 filter is: (c % 2 = 0)
+# tap_pub_2 filter is: (c % 3 = 0)
+# When there are multiple publications for the same table, the filters
+# expressions are OR'ed together. In this case, rows are replicated if
+# c value is divided by 2 OR 3 (2, 3, 4, 6, 8, 9, 10, 12, 14, 15, 16, 18, 20)
+#
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(c), min(c), max(c) FROM tab_rowfilter_2");
+is($result, qq(13|2|20), 'check initial data copy from table tab_rowfilter_2');
+
+# Check expected replicated rows for tab_rowfilter_4
+# (same table in two publications but only one has a filter).
+# tap_pub_4a filter is: (c % 2 = 0)
+# tap_pub_4b filter is: <no filter>
+# Expressions are OR'ed together but when there is no filter it just means
+# OR everything - e.g. same as no filter at all.
+# Expect all rows: (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(c), min(c), max(c) FROM tab_rowfilter_4");
+is($result, qq(10|1|10), 'check initial data copy from table tab_rowfilter_4');
+
+# Check expected replicated rows for tab_rowfilter_3
+# There is no filter. 10 rows are inserted, so 10 rows are replicated.
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(a) FROM tab_rowfilter_3");
+is($result, qq(10), 'check initial data copy from table tab_rowfilter_3');
+
+# Check expected replicated rows for partitions
+# publication option publish_via_partition_root is false so use the row filter
+# from a partition
+# tab_rowfilter_partitioned filter: (a < 5000)
+# tab_rowfilter_less_10k filter:    (a < 6000)
+# tab_rowfilter_greater_10k filter: no filter
+#
+# INSERT into tab_rowfilter_partitioned:
+# - INSERT (1,100)       YES, because 1 < 6000
+# - INSERT (7000, 101)   NO,  because 7000 is not < 6000
+# - INSERT (15000, 102)  YES, because tab_rowfilter_greater_10k has no filter
+# - INSERT (5500, 300)   YES, because 5500 < 6000
+#
+# INSERT directly into tab_rowfilter_less_10k:
+# - INSERT (2, 200)      YES, because 2 < 6000
+# - INSERT (6005, 201)   NO, because 6005 is not < 6000
+#
+# INSERT directly into tab_rowfilter_greater_10k:
+# - INSERT (16000, 103)  YES, because tab_rowfilter_greater_10k has no filter
+#
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT a, b FROM tab_rowfilter_less_10k ORDER BY 1, 2");
+is($result, qq(1|100
+2|200
+5500|300), 'check initial data copy from partition tab_rowfilter_less_10k');
+
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT a, b FROM tab_rowfilter_greater_10k ORDER BY 1, 2");
+is($result, qq(15000|102
+16000|103), 'check initial data copy from partition tab_rowfilter_greater_10k');
+
+# The following commands are executed after CREATE SUBSCRIPTION, so these SQL
+# commands are for testing normal logical replication behavior.
+#
+# test row filter (INSERT, UPDATE, DELETE)
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (800, 'test 800')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1600, 'test 1600')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1601, 'test 1601')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_1 (a, b) VALUES (1700, 'test 1700')");
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab_rowfilter_1 SET b = NULL WHERE a = 1600");
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab_rowfilter_1 SET b = 'test 1601 updated' WHERE a = 1601");
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM tab_rowfilter_1 WHERE a = 1700");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_2 (c) VALUES (21), (22), (23), (24), (25)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_4 (c) VALUES (0), (11), (12)");
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check expected replicated rows for tab_rowfilter_2
+# tap_pub_1 filter is: (c % 2 = 0)
+# tap_pub_2 filter is: (c % 3 = 0)
+# When there are multiple publications for the same table, the filters
+# expressions are OR'ed together. In this case, rows are replicated if
+# c value is divided by 2 OR 3.
+#
+# Expect original rows (2, 3, 4, 6, 8, 9, 10, 12, 14, 15, 16, 18, 20)
+# Plus (21, 22, 24)
+#
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(c), min(c), max(c) FROM tab_rowfilter_2");
+is($result, qq(16|2|24), 'check replicated rows to tab_rowfilter_2');
+
+# Check expected replicated rows for tab_rowfilter_4
+# (same table in two publications but only one has a filter).
+# tap_pub_4a filter is: (c % 2 = 0)
+# tap_pub_4b filter is: <no filter>
+# Expressions are OR'ed together but when there is no filter it just means
+# OR everything - e.g. same as no filter at all.
+# Expect all rows from initial copy: (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+# And also (0, 11, 12)
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT count(c), min(c), max(c) FROM tab_rowfilter_4");
+is($result, qq(13|0|12), 'check replicated rows to tab_rowfilter_4');
+
+# Check expected replicated rows for tab_rowfilter_1
+# tap_pub_1 filter is: (a > 1000 AND b <> 'filtered')
+#
+# - 1001, 1002, 1980 already exist from initial data copy
+# - INSERT (800, 'test 800')   NO, because 800 is not > 1000
+# - INSERT (1600, 'test 1600') YES, because 1600 > 1000 and 'test 1600' <> 'filtered'
+# - INSERT (1601, 'test 1601') YES, because 1601 > 1000 and 'test 1601' <> 'filtered'
+# - INSERT (1700, 'test 1700') YES, because 1700 > 1000 and 'test 1700' <> 'filtered'
+# - UPDATE (1600, NULL)        NO, row filter evaluates to false because NULL is not <> 'filtered'
+# - UPDATE (1601, 'test 1601 updated') YES, because 1601 > 1000 and 'test 1601 updated' <> 'filtered'
+# - DELETE (1700)              NO, row filter contains column b that is not part of
+# the PK or REPLICA IDENTITY and old tuple contains b = NULL, hence, row filter
+# evaluates to false
+#
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT a, b FROM tab_rowfilter_1 ORDER BY 1, 2");
+is($result, qq(1001|test 1001
+1002|test 1002
+1600|test 1600
+1601|test 1601 updated
+1700|test 1700
+1980|not filtered), 'check replicated rows to table tab_rowfilter_1');
+
+# Publish using root partitioned table
+# Use a different partitioned table layout (exercise publish_via_partition_root)
+$node_publisher->safe_psql('postgres',
+	"ALTER PUBLICATION tap_pub_3 SET (publish_via_partition_root = true)");
+$node_subscriber->safe_psql('postgres',
+	"TRUNCATE TABLE tab_rowfilter_partitioned");
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION WITH (copy_data = true)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_partitioned (a, b) VALUES(4000, 400),(4001, 401),(4002, 402)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_less_10k (a, b) VALUES(4500, 450)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_less_10k (a, b) VALUES(5600, 123)");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_rowfilter_greater_10k (a, b) VALUES(14000, 1950)");
+$node_publisher->safe_psql('postgres',
+	"UPDATE tab_rowfilter_less_10k SET b = 30 WHERE a = 4001");
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM tab_rowfilter_less_10k WHERE a = 4002");
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check expected replicated rows for partitions
+# publication option publish_via_partition_root is true so use the row filter
+# from the root partitioned table
+# tab_rowfilter_partitioned filter: (a < 5000)
+# tab_rowfilter_less_10k filter:    (a < 6000)
+# tab_rowfilter_greater_10k filter: no filter
+#
+# After TRUNCATE, REFRESH PUBLICATION, the initial data copy will apply the
+# partitioned table row filter.
+# - INSERT (1, 100)      YES, 1 < 5000
+# - INSERT (7000, 101)   NO, 7000 is not < 5000
+# - INSERT (15000, 102)  NO, 15000 is not < 5000
+# - INSERT (5500, 300)   NO, 5500 is not < 5000
+# - INSERT (2, 200)      YES, 2 < 5000
+# - INSERT (6005, 201)   NO, 6005 is not < 5000
+# - INSERT (16000, 103)  NO, 16000 is not < 5000
+#
+# Execute SQL commands after initial data copy for testing the logical
+# replication behavior.
+# - INSERT (4000, 400)    YES, 4000 < 5000
+# - INSERT (4001, 401)    YES, 4001 < 5000
+# - INSERT (4002, 402)    YES, 4002 < 5000
+# - INSERT (4500, 450)    YES, 4500 < 5000
+# - INSERT (5600, 123)    NO, 5600 is not < 5000
+# - INSERT (14000, 1950)  NO, 16000 is not < 5000
+# - UPDATE (4001)         YES, 4001 < 5000
+# - DELETE (4002)         YES, 4002 < 5000
+$result =
+  $node_subscriber->safe_psql('postgres',
+	"SELECT a, b FROM tab_rowfilter_partitioned ORDER BY 1, 2");
+is( $result, qq(1|100
+2|200
+4000|400
+4001|30
+4500|450), 'check publish_via_partition_root behavior');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
-- 
2.20.1

