From d3df4e9e798ffa370170b5596821e19567d0bbb5 Mon Sep 17 00:00:00 2001
From: vignesh <vignesh21@gmail.com>
Date: Mon, 14 Jun 2021 10:23:23 +0530
Subject: [PATCH v7 1/2] Added schema level support for publication.

This patch adds schema level support for publication.  User can specify multiple
schemas with schema option. When user specifies schema option, then the tables
present in the schema specified will be selected by publisher for sending the
data to subscriber.

pg_publication maintains the information about the publication. puballtables
bool column was used to indicate if the publication is "FOR ALL TABLES" or
"FOR TABLE" type currently. With the introduction of "FOR SCHEMA" publication
type, it is not easy to determine the publication type, hence a new column
pubtype was added to pg_publication relation to indicate the publication type.
There is a possibility to do without addition of new column, but that will
require checking puballtables of pg_publication and checking pg_publication_rel
for table type publication and then checking pg_publication_schema for schema
type publication. I preferred to use introduce pubtype which makes things
easier, this also will help for supporting new options in the future. New
system table pg_publication_schema was added which will maintain the schemas
that user wanted to publish through the publication. The
schema/publication/publication_schema dependency was created to handle the
corresponding renaming/removal of schema to the publication/publication_schema
when the schema is renamed/dropped. Decoder identifies if the relation is part
of the publication and replicates it to the subscriber. Changes were done in
pg_dump to handle pubtype updation in pg_publication table while the database
gets upgraded.

Prototypes present in pg_publication.h were moved to publicationcmds.h so
that minimal datastructures can be exported to pg_dump and psql clients and the
rest of the information need not be exported.

CATALOG_VERSION_NO needs to be updated while committing as this feature
involves catalog change.

TODO: version checks for psql/pg_dump need to be changed from 140000 to 150000
once the ongoing release is completed.
---
 src/backend/catalog/Makefile                |   4 +-
 src/backend/catalog/aclchk.c                |   2 +
 src/backend/catalog/dependency.c            |   9 +
 src/backend/catalog/objectaddress.c         | 145 ++++++++++
 src/backend/catalog/pg_publication.c        | 170 ++++++++++-
 src/backend/commands/alter.c                |   1 +
 src/backend/commands/event_trigger.c        |   4 +
 src/backend/commands/lockcmds.c             |   1 +
 src/backend/commands/publicationcmds.c      | 296 +++++++++++++++++++-
 src/backend/commands/seclabel.c             |   1 +
 src/backend/commands/tablecmds.c            |   2 +
 src/backend/parser/gram.y                   | 120 ++++++--
 src/backend/replication/pgoutput/pgoutput.c |  16 +-
 src/backend/utils/cache/relcache.c          |   1 +
 src/backend/utils/cache/syscache.c          |  23 ++
 src/bin/pg_dump/common.c                    |   3 +
 src/bin/pg_dump/pg_backup_archiver.c        |   3 +-
 src/bin/pg_dump/pg_dump.c                   | 165 ++++++++++-
 src/bin/pg_dump/pg_dump.h                   |  16 ++
 src/bin/pg_dump/pg_dump_sort.c              |   7 +
 src/bin/psql/describe.c                     | 218 +++++++++++---
 src/bin/psql/tab-complete.c                 |  22 +-
 src/include/catalog/dependency.h            |   1 +
 src/include/catalog/pg_publication.h        |  41 +--
 src/include/catalog/pg_publication_schema.h |  49 ++++
 src/include/commands/publicationcmds.h      |  21 ++
 src/include/nodes/nodes.h                   |   1 +
 src/include/nodes/parsenodes.h              |  20 ++
 src/include/utils/syscache.h                |   2 +
 src/test/regress/expected/oidjoins.out      |   2 +
 src/test/regress/expected/publication.out   | 100 +++----
 src/test/regress/expected/sanity_check.out  |   1 +
 src/tools/pgindent/typedefs.list            |   3 +
 33 files changed, 1304 insertions(+), 166 deletions(-)
 create mode 100644 src/include/catalog/pg_publication_schema.h

diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile
index 69f9dd51a7..30026a967b 100644
--- a/src/backend/catalog/Makefile
+++ b/src/backend/catalog/Makefile
@@ -67,8 +67,8 @@ CATALOG_HEADERS := \
 	pg_foreign_table.h pg_policy.h pg_replication_origin.h \
 	pg_default_acl.h pg_init_privs.h pg_seclabel.h pg_shseclabel.h \
 	pg_collation.h pg_partitioned_table.h pg_range.h pg_transform.h \
-	pg_sequence.h pg_publication.h pg_publication_rel.h pg_subscription.h \
-	pg_subscription_rel.h
+	pg_sequence.h pg_publication.h pg_publication_rel.h pg_publication_schema.h \
+	pg_subscription.h pg_subscription_rel.h
 
 GENERATED_HEADERS := $(CATALOG_HEADERS:%.h=%_d.h) schemapg.h system_fk_info.h
 
diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c
index 53392414f1..59600fc98d 100644
--- a/src/backend/catalog/aclchk.c
+++ b/src/backend/catalog/aclchk.c
@@ -3433,6 +3433,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype,
 					case OBJECT_DEFACL:
 					case OBJECT_DOMCONSTRAINT:
 					case OBJECT_PUBLICATION_REL:
+					case OBJECT_PUBLICATION_SCHEMA:
 					case OBJECT_ROLE:
 					case OBJECT_RULE:
 					case OBJECT_TABCONSTRAINT:
@@ -3572,6 +3573,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype,
 					case OBJECT_DEFACL:
 					case OBJECT_DOMCONSTRAINT:
 					case OBJECT_PUBLICATION_REL:
+					case OBJECT_PUBLICATION_SCHEMA:
 					case OBJECT_ROLE:
 					case OBJECT_TRANSFORM:
 					case OBJECT_TSPARSER:
diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c
index 0c37fc1d53..c0a9fb0c7e 100644
--- a/src/backend/catalog/dependency.c
+++ b/src/backend/catalog/dependency.c
@@ -49,6 +49,7 @@
 #include "catalog/pg_proc.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
+#include "catalog/pg_publication_schema.h"
 #include "catalog/pg_rewrite.h"
 #include "catalog/pg_statistic_ext.h"
 #include "catalog/pg_subscription.h"
@@ -179,6 +180,7 @@ static const Oid object_classes[] = {
 	PolicyRelationId,			/* OCLASS_POLICY */
 	PublicationRelationId,		/* OCLASS_PUBLICATION */
 	PublicationRelRelationId,	/* OCLASS_PUBLICATION_REL */
+	PublicationSchemaRelationId, /* OCLASS_PUBLICATION_SCHEMA */
 	SubscriptionRelationId,		/* OCLASS_SUBSCRIPTION */
 	TransformRelationId			/* OCLASS_TRANSFORM */
 };
@@ -1470,6 +1472,10 @@ doDeletion(const ObjectAddress *object, int flags)
 			RemovePublicationRelById(object->objectId);
 			break;
 
+		case OCLASS_PUBLICATION_SCHEMA:
+			RemovePublicationSchemaById(object->objectId);
+			break;
+
 		case OCLASS_CAST:
 		case OCLASS_COLLATION:
 		case OCLASS_CONVERSION:
@@ -2863,6 +2869,9 @@ getObjectClass(const ObjectAddress *object)
 		case PublicationRelRelationId:
 			return OCLASS_PUBLICATION_REL;
 
+		case PublicationSchemaRelationId:
+			return OCLASS_PUBLICATION_SCHEMA;
+
 		case SubscriptionRelationId:
 			return OCLASS_SUBSCRIPTION;
 
diff --git a/src/backend/catalog/objectaddress.c b/src/backend/catalog/objectaddress.c
index 9882e549c4..b4e31f8157 100644
--- a/src/backend/catalog/objectaddress.c
+++ b/src/backend/catalog/objectaddress.c
@@ -49,6 +49,7 @@
 #include "catalog/pg_proc.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
+#include "catalog/pg_publication_schema.h"
 #include "catalog/pg_rewrite.h"
 #include "catalog/pg_statistic_ext.h"
 #include "catalog/pg_subscription.h"
@@ -67,6 +68,7 @@
 #include "commands/extension.h"
 #include "commands/policy.h"
 #include "commands/proclang.h"
+#include "commands/publicationcmds.h"
 #include "commands/tablespace.h"
 #include "commands/trigger.h"
 #include "foreign/foreign.h"
@@ -829,6 +831,10 @@ static const struct object_type_map
 	{
 		"publication relation", OBJECT_PUBLICATION_REL
 	},
+	/* OCLASS_PUBLICATION_SCHEMA */
+	{
+		"publication schema", OBJECT_PUBLICATION_SCHEMA
+	},
 	/* OCLASS_SUBSCRIPTION */
 	{
 		"subscription", OBJECT_SUBSCRIPTION
@@ -875,6 +881,9 @@ static ObjectAddress get_object_address_usermapping(List *object,
 static ObjectAddress get_object_address_publication_rel(List *object,
 														Relation *relp,
 														bool missing_ok);
+static ObjectAddress get_object_address_publication_schema(List *object,
+														   bool missing_ok);
+
 static ObjectAddress get_object_address_defacl(List *object,
 											   bool missing_ok);
 static const ObjectPropertyType *get_object_property_data(Oid class_id);
@@ -1118,6 +1127,10 @@ get_object_address(ObjectType objtype, Node *object,
 															 &relation,
 															 missing_ok);
 				break;
+			case OBJECT_PUBLICATION_SCHEMA:
+				address = get_object_address_publication_schema(castNode(List, object),
+																missing_ok);
+				break;
 			case OBJECT_DEFACL:
 				address = get_object_address_defacl(castNode(List, object),
 													missing_ok);
@@ -1935,6 +1948,51 @@ get_object_address_publication_rel(List *object,
 	return address;
 }
 
+/*
+ * Find the ObjectAddress for a publication schema.  The first element of
+ * the object parameter is the schema name, the second is the
+ * publication name.
+ */
+static ObjectAddress
+get_object_address_publication_schema(List *object, bool missing_ok)
+{
+	ObjectAddress address;
+	char	   *pubname;
+	Publication *pub;
+	char	   *schemaname;
+	Oid			schemaoid;
+
+	ObjectAddressSet(address, PublicationSchemaRelationId, InvalidOid);
+
+	/* fetch publication name and schema oid from input list */
+	schemaname = strVal(linitial(object));
+	pubname = strVal(lsecond(object));
+
+	schemaoid = get_namespace_oid(schemaname, false);
+
+	/* Now look up the pg_publication tuple */
+	pub = GetPublicationByName(pubname, missing_ok);
+	if (!pub)
+		return address;
+
+	/* Find the publication schema mapping in syscache. */
+	address.objectId =
+		GetSysCacheOid2(PUBLICATIONSCHEMAMAP, Anum_pg_publication_schema_oid,
+						ObjectIdGetDatum(schemaoid),
+						ObjectIdGetDatum(pub->oid));
+	if (!OidIsValid(address.objectId))
+	{
+		if (!missing_ok)
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_OBJECT),
+					 errmsg("publication schema \"%s\" in publication \"%s\" does not exist",
+							schemaname, pubname)));
+		return address;
+	}
+
+	return address;
+}
+
 /*
  * Find the ObjectAddress for a default ACL.
  */
@@ -2207,6 +2265,7 @@ pg_get_object_address(PG_FUNCTION_ARGS)
 		case OBJECT_CAST:
 		case OBJECT_USER_MAPPING:
 		case OBJECT_PUBLICATION_REL:
+		case OBJECT_PUBLICATION_SCHEMA:
 		case OBJECT_DEFACL:
 		case OBJECT_TRANSFORM:
 			if (list_length(args) != 1)
@@ -2299,6 +2358,9 @@ pg_get_object_address(PG_FUNCTION_ARGS)
 		case OBJECT_PUBLICATION_REL:
 			objnode = (Node *) list_make2(name, linitial(args));
 			break;
+		case OBJECT_PUBLICATION_SCHEMA:
+			objnode = (Node *) list_make2(linitial(name), linitial(args));
+			break;
 		case OBJECT_USER_MAPPING:
 			objnode = (Node *) list_make2(linitial(name), linitial(args));
 			break;
@@ -3902,6 +3964,44 @@ getObjectDescription(const ObjectAddress *object, bool missing_ok)
 				break;
 			}
 
+		case OCLASS_PUBLICATION_SCHEMA:
+			{
+				HeapTuple	tup;
+				char	   *pubname;
+				Form_pg_publication_schema psform;
+				char	   *nspname;
+
+				tup = SearchSysCache1(PUBLICATIONSCHEMA,
+									  ObjectIdGetDatum(object->objectId));
+				if (!HeapTupleIsValid(tup))
+				{
+					if (!missing_ok)
+						elog(ERROR, "cache lookup failed for publication schema %u",
+							 object->objectId);
+					break;
+				}
+
+				psform = (Form_pg_publication_schema) GETSTRUCT(tup);
+				pubname = get_publication_name(psform->pspubid, false);
+				nspname = get_namespace_name(psform->psnspcid);
+				if (!nspname)
+				{
+					pfree(pubname);
+					ReleaseSysCache(tup);
+					if (!missing_ok)
+						elog(ERROR, "cache lookup failed for schema %u",
+							 psform->psnspcid);
+					break;
+				}
+
+				appendStringInfo(&buffer, _("publication of schema %s in publication %s"),
+								 nspname, pubname);
+				pfree(pubname);
+				pfree(nspname);
+				ReleaseSysCache(tup);
+				break;
+			}
+
 		case OCLASS_SUBSCRIPTION:
 			{
 				char	   *subname = get_subscription_name(object->objectId,
@@ -4476,6 +4576,10 @@ getObjectTypeDescription(const ObjectAddress *object, bool missing_ok)
 			appendStringInfoString(&buffer, "publication relation");
 			break;
 
+		case OCLASS_PUBLICATION_SCHEMA:
+			appendStringInfoString(&buffer, "publication schema");
+			break;
+
 		case OCLASS_SUBSCRIPTION:
 			appendStringInfoString(&buffer, "subscription");
 			break;
@@ -5711,6 +5815,47 @@ getObjectIdentityParts(const ObjectAddress *object,
 				break;
 			}
 
+		case OCLASS_PUBLICATION_SCHEMA:
+			{
+				HeapTuple	tup;
+				char	   *pubname;
+				char	   *nspname;
+				Form_pg_publication_schema psform;
+
+				tup = SearchSysCache1(PUBLICATIONSCHEMA,
+									  ObjectIdGetDatum(object->objectId));
+				if (!HeapTupleIsValid(tup))
+				{
+					if (!missing_ok)
+						elog(ERROR, "cache lookup failed for publication schema %u",
+							 object->objectId);
+					break;
+				}
+
+				psform = (Form_pg_publication_schema) GETSTRUCT(tup);
+				pubname = get_publication_name(psform->pspubid, false);
+				nspname = get_namespace_name(psform->psnspcid);
+				if (!nspname)
+				{
+					pfree(pubname);
+					ReleaseSysCache(tup);
+					if (!missing_ok)
+						elog(ERROR, "cache lookup failed for schema %u",
+							 psform->psnspcid);
+					break;
+				}
+
+				appendStringInfo(&buffer, "%s in publication %s", nspname, pubname);
+
+				if (objargs)
+					*objargs = list_make1(pubname);
+				if (objname)
+					*objname = list_make1(nspname);
+
+				ReleaseSysCache(tup);
+				break;
+			}
+
 		case OCLASS_SUBSCRIPTION:
 			{
 				char	   *subname;
diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 86e415af89..aeb2df8e5c 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -28,9 +28,12 @@
 #include "catalog/objectaccess.h"
 #include "catalog/objectaddress.h"
 #include "catalog/pg_inherits.h"
+#include "catalog/pg_namespace.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
+#include "catalog/pg_publication_schema.h"
 #include "catalog/pg_type.h"
+#include "commands/publicationcmds.h"
 #include "funcapi.h"
 #include "miscadmin.h"
 #include "utils/array.h"
@@ -214,6 +217,76 @@ publication_add_relation(Oid pubid, Relation targetrel,
 	return myself;
 }
 
+/*
+ * Insert new publication / schema mapping.
+ */
+ObjectAddress
+publication_add_schema(Oid pubid, Oid schemaoid, bool if_not_exists)
+{
+	Relation	rel;
+	HeapTuple	tup;
+	Datum		values[Natts_pg_publication_schema];
+	bool		nulls[Natts_pg_publication_schema];
+	Oid			psschid;
+	Publication *pub = GetPublication(pubid);
+	ObjectAddress myself,
+				referenced;
+
+	rel = table_open(PublicationSchemaRelationId, RowExclusiveLock);
+
+	/*
+	 * Check for duplicates. Note that this does not really prevent
+	 * duplicates, it's here just to provide nicer error message in common
+	 * case. The real protection is the unique key on the catalog.
+	 */
+	if (SearchSysCacheExists2(PUBLICATIONSCHEMAMAP, ObjectIdGetDatum(schemaoid),
+							  ObjectIdGetDatum(pubid)))
+	{
+		table_close(rel, RowExclusiveLock);
+
+		if (if_not_exists)
+			return InvalidObjectAddress;
+
+		ereport(ERROR,
+				(errcode(ERRCODE_DUPLICATE_OBJECT),
+				 errmsg("schema \"%s\" is already member of publication \"%s\"",
+						get_namespace_name(schemaoid), pub->name)));
+	}
+
+	/* Form a tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+
+	psschid = GetNewOidWithIndex(rel, PublicationSchemaObjectIndexId,
+								 Anum_pg_publication_schema_oid);
+	values[Anum_pg_publication_schema_oid - 1] = ObjectIdGetDatum(psschid);
+	values[Anum_pg_publication_schema_pspubid - 1] =
+		ObjectIdGetDatum(pubid);
+	values[Anum_pg_publication_schema_psnspcid - 1] =
+		ObjectIdGetDatum(schemaoid);
+
+	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
+
+	/* Insert tuple into catalog. */
+	CatalogTupleInsert(rel, tup);
+	heap_freetuple(tup);
+
+	ObjectAddressSet(myself, PublicationSchemaRelationId, psschid);
+
+	/* Add dependency on the publication */
+	ObjectAddressSet(referenced, PublicationRelationId, pubid);
+	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
+
+	/* Add dependency on the schema */
+	ObjectAddressSet(referenced, NamespaceRelationId, schemaoid);
+	recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
+
+	/* Close the table. */
+	table_close(rel, RowExclusiveLock);
+
+	return myself;
+}
+
 /* Gets list of publication oids for a relation */
 List *
 GetRelationPublications(Oid relid)
@@ -304,6 +377,45 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt)
 	return result;
 }
 
+/*
+ * Gets list of schema oids for a publication.
+ *
+ * This should only be used for normal publications.
+ */
+List *
+GetPublicationSchemas(Oid pubid)
+{
+	List	   *result = NIL;
+	Relation	pubschsrel;
+	ScanKeyData scankey;
+	SysScanDesc scan;
+	HeapTuple	tup;
+
+	/* Find all publications associated with the schema. */
+	pubschsrel = table_open(PublicationSchemaRelationId, AccessShareLock);
+
+	ScanKeyInit(&scankey,
+				Anum_pg_publication_schema_pspubid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(pubid));
+
+	scan = systable_beginscan(pubschsrel, PublicationSchemaPsnspcidPspubidIndexId,
+							  true, NULL, 1, &scankey);
+	while (HeapTupleIsValid(tup = systable_getnext(scan)))
+	{
+		Form_pg_publication_schema pubsch;
+
+		pubsch = (Form_pg_publication_schema) GETSTRUCT(tup);
+
+		result = lappend_oid(result, pubsch->psnspcid);
+	}
+
+	systable_endscan(scan);
+	table_close(pubschsrel, AccessShareLock);
+
+	return result;
+}
+
 /*
  * Gets list of publication oids for publications marked as FOR ALL TABLES.
  */
@@ -342,29 +454,37 @@ GetAllTablesPublications(void)
 }
 
 /*
- * Gets list of all relation published by FOR ALL TABLES publication(s).
+ * Gets list of relations published.
  *
  * If the publication publishes partition changes via their respective root
  * partitioned tables, we must exclude partitions in favor of including the
- * root partitioned tables.
+ * root partitioned tables. If schemaOid is specified, get the relations present
+ * in the schema specified.
  */
 List *
-GetAllTablesPublicationRelations(bool pubviaroot)
+GetAllTablesPublicationRelations(bool pubviaroot, Oid schemaOid)
 {
 	Relation	classRel;
-	ScanKeyData key[1];
+	ScanKeyData key[2];
 	TableScanDesc scan;
 	HeapTuple	tuple;
 	List	   *result = NIL;
+	int			keycount = 0;
 
 	classRel = table_open(RelationRelationId, AccessShareLock);
 
-	ScanKeyInit(&key[0],
+	ScanKeyInit(&key[keycount++],
 				Anum_pg_class_relkind,
 				BTEqualStrategyNumber, F_CHAREQ,
 				CharGetDatum(RELKIND_RELATION));
 
-	scan = table_beginscan_catalog(classRel, 1, key);
+	if (schemaOid != InvalidOid)
+		ScanKeyInit(&key[keycount++],
+					Anum_pg_class_relnamespace,
+					BTEqualStrategyNumber, F_OIDEQ,
+					schemaOid);
+
+	scan = table_beginscan_catalog(classRel, keycount, key);
 
 	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
 	{
@@ -380,12 +500,13 @@ GetAllTablesPublicationRelations(bool pubviaroot)
 
 	if (pubviaroot)
 	{
-		ScanKeyInit(&key[0],
+		ScanKeyData skey[1];
+		ScanKeyInit(&skey[0],
 					Anum_pg_class_relkind,
 					BTEqualStrategyNumber, F_CHAREQ,
 					CharGetDatum(RELKIND_PARTITIONED_TABLE));
 
-		scan = table_beginscan_catalog(classRel, 1, key);
+		scan = table_beginscan_catalog(classRel, 1, skey);
 
 		while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
 		{
@@ -404,6 +525,29 @@ GetAllTablesPublicationRelations(bool pubviaroot)
 	return result;
 }
 
+/*
+ * Gets list of all relation published by FOR SCHEMA publication(s).
+ */
+List *
+GetAllSchemasPublicationRelations(Publication *publication)
+{
+	List	   *result = NIL;
+	List	   *pubschemalist = GetPublicationSchemas(publication->oid);
+	ListCell   *cell;
+
+	foreach(cell, pubschemalist)
+	{
+		Oid			schemaOid = lfirst_oid(cell);
+		List	   *schemaRels = NIL;
+
+		schemaRels = GetAllTablesPublicationRelations(publication->pubviaroot,
+													  schemaOid);
+		result = list_concat(result, schemaRels);
+	}
+
+	return result;
+}
+
 /*
  * Get publication using oid
  *
@@ -431,6 +575,7 @@ GetPublication(Oid pubid)
 	pub->pubactions.pubdelete = pubform->pubdelete;
 	pub->pubactions.pubtruncate = pubform->pubtruncate;
 	pub->pubviaroot = pubform->pubviaroot;
+	pub->pubtype = pubform->pubtype;
 
 	ReleaseSysCache(tup);
 
@@ -530,13 +675,16 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 		 * replicated using leaf partition identity and schema, so we only
 		 * need those.
 		 */
-		if (publication->alltables)
-			tables = GetAllTablesPublicationRelations(publication->pubviaroot);
-		else
+		if (publication->pubtype == PUBTYPE_ALLTABLES)
+			tables = GetAllTablesPublicationRelations(publication->pubviaroot,
+													  InvalidOid);
+		else if (publication->pubtype == PUBTYPE_TABLE)
 			tables = GetPublicationRelations(publication->oid,
 											 publication->pubviaroot ?
 											 PUBLICATION_PART_ROOT :
 											 PUBLICATION_PART_LEAF);
+		else if (publication->pubtype == PUBTYPE_SCHEMA)
+			tables = GetAllSchemasPublicationRelations(publication);
 		funcctx->user_fctx = (void *) tables;
 
 		MemoryContextSwitchTo(oldcontext);
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 29249498a9..e7c27459d8 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -661,6 +661,7 @@ AlterObjectNamespace_oid(Oid classId, Oid objid, Oid nspOid,
 		case OCLASS_POLICY:
 		case OCLASS_PUBLICATION:
 		case OCLASS_PUBLICATION_REL:
+		case OCLASS_PUBLICATION_SCHEMA:
 		case OCLASS_SUBSCRIPTION:
 		case OCLASS_TRANSFORM:
 			/* ignore object types that don't have schema-qualified names */
diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c
index 9c31c9e763..34cf049632 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -974,6 +974,7 @@ EventTriggerSupportsObjectType(ObjectType obtype)
 		case OBJECT_PROCEDURE:
 		case OBJECT_PUBLICATION:
 		case OBJECT_PUBLICATION_REL:
+		case OBJECT_PUBLICATION_SCHEMA:
 		case OBJECT_ROUTINE:
 		case OBJECT_RULE:
 		case OBJECT_SCHEMA:
@@ -1051,6 +1052,7 @@ EventTriggerSupportsObjectClass(ObjectClass objclass)
 		case OCLASS_POLICY:
 		case OCLASS_PUBLICATION:
 		case OCLASS_PUBLICATION_REL:
+		case OCLASS_PUBLICATION_SCHEMA:
 		case OCLASS_SUBSCRIPTION:
 		case OCLASS_TRANSFORM:
 			return true;
@@ -2131,6 +2133,7 @@ stringify_grant_objtype(ObjectType objtype)
 		case OBJECT_POLICY:
 		case OBJECT_PUBLICATION:
 		case OBJECT_PUBLICATION_REL:
+		case OBJECT_PUBLICATION_SCHEMA:
 		case OBJECT_ROLE:
 		case OBJECT_RULE:
 		case OBJECT_STATISTIC_EXT:
@@ -2213,6 +2216,7 @@ stringify_adefprivs_objtype(ObjectType objtype)
 		case OBJECT_POLICY:
 		case OBJECT_PUBLICATION:
 		case OBJECT_PUBLICATION_REL:
+		case OBJECT_PUBLICATION_SCHEMA:
 		case OBJECT_ROLE:
 		case OBJECT_RULE:
 		case OBJECT_STATISTIC_EXT:
diff --git a/src/backend/commands/lockcmds.c b/src/backend/commands/lockcmds.c
index 34f2270ced..3732f3727d 100644
--- a/src/backend/commands/lockcmds.c
+++ b/src/backend/commands/lockcmds.c
@@ -17,6 +17,7 @@
 #include "access/table.h"
 #include "access/xact.h"
 #include "catalog/namespace.h"
+#include "catalog/objectaddress.h"
 #include "catalog/pg_inherits.h"
 #include "commands/lockcmds.h"
 #include "miscadmin.h"
diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c
index 95c253c8e0..5fe4b1ad6f 100644
--- a/src/backend/commands/publicationcmds.c
+++ b/src/backend/commands/publicationcmds.c
@@ -25,8 +25,10 @@
 #include "catalog/objectaddress.h"
 #include "catalog/partition.h"
 #include "catalog/pg_inherits.h"
+#include "catalog/pg_namespace.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
+#include "catalog/pg_publication_schema.h"
 #include "catalog/pg_type.h"
 #include "commands/dbcommands.h"
 #include "commands/defrem.h"
@@ -53,6 +55,9 @@ static void CloseTableList(List *rels);
 static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
 								 AlterPublicationStmt *stmt);
 static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
+static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
+								  AlterPublicationStmt *stmt);
+static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
 
 static void
 parse_publication_options(List *options,
@@ -141,6 +146,52 @@ parse_publication_options(List *options,
 	}
 }
 
+/*
+ * Convert the SchemaSpec list into an Oid list.
+ */
+static List *
+ConvertSchemaSpecListToOidList(List *schemas)
+{
+	List	   *schemaoidlist = NIL;
+	ListCell   *cell;
+
+	foreach(cell, schemas)
+	{
+		SchemaSpec *schema = (SchemaSpec *) lfirst(cell);
+		Oid			schemoid;
+
+		switch (schema->schematype)
+		{
+			List	   *search_path;
+			char	   *nspname;
+
+			case SCHEMASPEC_CURRENT_SCHEMA:
+				search_path = fetch_search_path(false);
+				if (search_path == NIL) /* nothing valid in search_path? */
+					ereport(ERROR,
+							errcode(ERRCODE_UNDEFINED_SCHEMA),
+							errmsg("no schema has been selected"));
+
+				nspname = get_namespace_name(linitial_oid(search_path));
+				if (nspname == NULL)	/* recently-deleted namespace? */
+					ereport(ERROR,
+							errcode(ERRCODE_UNDEFINED_SCHEMA),
+							errmsg("no schema has been selected"));
+
+				schemoid = get_namespace_oid(nspname, false);
+				break;
+
+			default:
+				schemoid = get_namespace_oid(schema->schemaname, false);
+				break;
+		}
+
+		schemaoidlist = lappend_oid(schemaoidlist, schemoid);
+	}
+
+	return schemaoidlist;
+}
+
 /*
  * Create new publication.
  */
@@ -213,6 +264,15 @@ CreatePublication(CreatePublicationStmt *stmt)
 	values[Anum_pg_publication_pubviaroot - 1] =
 		BoolGetDatum(publish_via_partition_root);
 
+	if (stmt->schemas)
+		values[Anum_pg_publication_pubtype - 1] = PUBTYPE_SCHEMA;
+	else if (stmt->tables)
+		values[Anum_pg_publication_pubtype - 1] = PUBTYPE_TABLE;
+	else if (stmt->for_all_tables)
+		values[Anum_pg_publication_pubtype - 1] = PUBTYPE_ALLTABLES;
+	else
+		values[Anum_pg_publication_pubtype - 1] = PUBTYPE_EMPTY;
+
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
 	/* Insert tuple into catalog. */
@@ -226,6 +286,20 @@ CreatePublication(CreatePublicationStmt *stmt)
 	/* Make the changes visible. */
 	CommandCounterIncrement();
 
+	if (stmt->schemas)
+	{
+		List	   *schemaoidlist = NIL;
+		Relation	nspcrel;
+
+		Assert(list_length(stmt->schemas) > 0);
+
+		schemaoidlist = ConvertSchemaSpecListToOidList(stmt->schemas);
+
+		nspcrel = table_open(NamespaceRelationId, ShareUpdateExclusiveLock);
+		PublicationAddSchemas(puboid, schemaoidlist, true, NULL);
+		table_close(nspcrel, ShareUpdateExclusiveLock);
+	}
+
 	if (stmt->tables)
 	{
 		List	   *rels;
@@ -252,6 +326,32 @@ CreatePublication(CreatePublicationStmt *stmt)
 	return myself;
 }
 
+static void
+UpdatePublicationTypeTupleValue(Relation rel, HeapTuple tup, int col,
+								char pubtype)
+{
+	bool		nulls[Natts_pg_publication];
+	bool		replaces[Natts_pg_publication];
+	Datum		values[Natts_pg_publication];
+
+
+	/* Everything ok, form a new tuple. */
+	memset(values, 0, sizeof(values));
+	memset(nulls, false, sizeof(nulls));
+	memset(replaces, false, sizeof(replaces));
+
+	values[col - 1] = pubtype;
+	replaces[col - 1] = true;
+
+	tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
+							replaces);
+
+	/* Update the catalog. */
+	CatalogTupleUpdate(rel, &tup->t_self, tup);
+
+	CommandCounterIncrement();
+}
+
 /*
  * Change options of a publication.
  */
@@ -311,7 +411,7 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
 	pubform = (Form_pg_publication) GETSTRUCT(tup);
 
 	/* Invalidate the relcache. */
-	if (pubform->puballtables)
+	if (pubform->pubtype == PUBTYPE_ALLTABLES)
 	{
 		CacheInvalidateRelcacheAll();
 	}
@@ -363,19 +463,31 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 	Oid			pubid = pubform->oid;
 
 	/* Check that user is allowed to manipulate the publication tables. */
-	if (pubform->puballtables)
+	if (pubform->pubtype == PUBTYPE_ALLTABLES)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
 						NameStr(pubform->pubname)),
 				 errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
 
+	if (pubform->pubtype == PUBTYPE_SCHEMA)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("publication \"%s\" is defined as FOR SCHEMA",
+						NameStr(pubform->pubname)),
+				 errdetail("Tables cannot be added to or dropped from FOR SCHEMA publications.")));
+
 	Assert(list_length(stmt->tables) > 0);
 
 	rels = OpenTableList(stmt->tables);
 
 	if (stmt->tableAction == DEFELEM_ADD)
+	{
 		PublicationAddTables(pubid, rels, false, stmt);
+		if (pubform->pubtype == PUBTYPE_EMPTY)
+			UpdatePublicationTypeTupleValue(rel, tup, Anum_pg_publication_pubtype,
+											PUBTYPE_TABLE);
+	}
 	else if (stmt->tableAction == DEFELEM_DROP)
 		PublicationDropTables(pubid, rels, false);
 	else						/* DEFELEM_SET */
@@ -427,11 +539,90 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel,
 	CloseTableList(rels);
 }
 
+/*
+ * Alter the publication schemas.
+ *
+ * Add/Remove/Set the schemas to/from publication.
+ */
+static void
+AlterPublicationSchemas(AlterPublicationStmt *stmt, Relation rel,
+						HeapTuple tup, Form_pg_publication pubform)
+{
+	List	   *schemaoidlist = NIL;
+
+	/* Check that user is allowed to manipulate the publication tables. */
+	if (pubform->pubtype == PUBTYPE_ALLTABLES)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
+						NameStr(pubform->pubname)),
+				 errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
+
+	if (pubform->pubtype == PUBTYPE_TABLE)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("publication \"%s\" is defined as FOR TABLE",
+						NameStr(pubform->pubname)),
+				 errdetail("Schemas cannot be added to or dropped from FOR TABLE publications.")));
+
+	/* Convert the text list into oid list. */
+	schemaoidlist = ConvertSchemaSpecListToOidList(stmt->schemas);
+
+	if (stmt->tableAction == DEFELEM_ADD)
+	{
+		PublicationAddSchemas(pubform->oid, schemaoidlist, false, stmt);
+		if (pubform->pubtype == PUBTYPE_EMPTY)
+			UpdatePublicationTypeTupleValue(rel, tup, Anum_pg_publication_pubtype,
+											PUBTYPE_SCHEMA);
+	}
+	else if (stmt->tableAction == DEFELEM_DROP)
+		PublicationDropSchemas(pubform->oid, schemaoidlist, false);
+	else
+	{
+		List	   *oldschemaids = GetPublicationSchemas(pubform->oid);
+		List	   *delschemas = NIL;
+		ListCell   *oldlc;
+
+		/* Identify which schemas should be dropped. */
+		foreach(oldlc, oldschemaids)
+		{
+			Oid			oldschemaid = lfirst_oid(oldlc);
+			ListCell   *newlc;
+			bool		found = false;
+
+			foreach(newlc, schemaoidlist)
+			{
+				Oid			newschemaid = lfirst_oid(newlc);
+
+				if (newschemaid == oldschemaid)
+				{
+					found = true;
+					break;
+				}
+			}
+
+			if (!found)
+				delschemas = lappend_oid(delschemas, oldschemaid);
+		}
+
+		/* And drop them. */
+		PublicationDropSchemas(pubform->oid, delschemas, true);
+
+		/*
+		 * Don't bother calculating the difference for adding, we'll catch and
+		 * skip existing ones when doing catalog update.
+		 */
+		PublicationAddSchemas(pubform->oid, schemaoidlist, true, stmt);
+	}
+
+	return;
+}
+
 /*
  * Alter the existing publication.
  *
- * This is dispatcher function for AlterPublicationOptions and
- * AlterPublicationTables.
+ * This is dispatcher function for AlterPublicationOptions,
+ * AlterPublicationSchemas and AlterPublicationTables.
  */
 void
 AlterPublication(AlterPublicationStmt *stmt)
@@ -460,6 +651,8 @@ AlterPublication(AlterPublicationStmt *stmt)
 
 	if (stmt->options)
 		AlterPublicationOptions(stmt, rel, tup);
+	else if (stmt->schemas)
+		AlterPublicationSchemas(stmt, rel, tup, pubform);
 	else
 		AlterPublicationTables(stmt, rel, tup);
 
@@ -498,6 +691,30 @@ RemovePublicationRelById(Oid proid)
 	table_close(rel, RowExclusiveLock);
 }
 
+/*
+ * Remove schema from publication by mapping OID.
+ */
+void
+RemovePublicationSchemaById(Oid psoid)
+{
+	Relation	rel;
+	HeapTuple	tup;
+
+	rel = table_open(PublicationSchemaRelationId, RowExclusiveLock);
+
+	tup = SearchSysCache1(PUBLICATIONSCHEMA, ObjectIdGetDatum(psoid));
+
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "cache lookup failed for publication schema %u",
+			 psoid);
+
+	CatalogTupleDelete(rel, &tup->t_self);
+
+	ReleaseSysCache(tup);
+
+	table_close(rel, RowExclusiveLock);
+}
+
 /*
  * Open relations specified by a RangeVar list.
  * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
@@ -608,7 +825,7 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
 {
 	ListCell   *lc;
 
-	Assert(!stmt || !stmt->for_all_tables);
+	Assert(!stmt || !stmt->for_all_tables || !stmt->schemas);
 
 	foreach(lc, rels)
 	{
@@ -632,6 +849,39 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
 	}
 }
 
+/*
+ * Add listed schemas to the publication.
+ */
+static void
+PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
+					  AlterPublicationStmt *stmt)
+{
+	ListCell   *lc;
+
+	Assert(!stmt || !stmt->for_all_tables || !stmt->tables);
+
+	foreach(lc, schemas)
+	{
+		Oid			schemaoid = lfirst_oid(lc);
+		ObjectAddress obj;
+
+		/* Must be owner of the schema or superuser. */
+		if (!pg_namespace_ownercheck(schemaoid, GetUserId()))
+			aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SCHEMA,
+						   get_namespace_name(schemaoid));
+
+		obj = publication_add_schema(pubid, schemaoid, if_not_exists);
+		if (stmt)
+		{
+			EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
+											 (Node *) stmt);
+
+			InvokeObjectPostCreateHook(PublicationSchemaRelationId,
+									   obj.objectId, 0);
+		}
+	}
+}
+
 /*
  * Remove listed tables from the publication.
  */
@@ -666,6 +916,40 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
 	}
 }
 
+/*
+ * Remove listed schemas from the publication.
+ */
+static void
+PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
+{
+	ObjectAddress obj;
+	ListCell   *lc;
+	Oid			prid;
+
+	foreach(lc, schemas)
+	{
+		Oid			schemaoid = lfirst_oid(lc);
+
+		prid = GetSysCacheOid2(PUBLICATIONSCHEMAMAP,
+							   Anum_pg_publication_schema_oid,
+							   ObjectIdGetDatum(schemaoid),
+							   ObjectIdGetDatum(pubid));
+		if (!OidIsValid(prid))
+		{
+			if (missing_ok)
+				continue;
+
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_OBJECT),
+					 errmsg("schema \"%s\" is not part of the publication",
+							get_namespace_name(schemaoid))));
+		}
+
+		ObjectAddressSet(obj, PublicationSchemaRelationId, prid);
+		performDeletion(&obj, DROP_CASCADE, 0);
+	}
+}
+
 /*
  * Internal workhorse for changing a publication owner
  */
@@ -697,7 +981,7 @@ AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
 			aclcheck_error(aclresult, OBJECT_DATABASE,
 						   get_database_name(MyDatabaseId));
 
-		if (form->puballtables && !superuser_arg(newOwnerId))
+		if (form->pubtype == PUBTYPE_ALLTABLES && !superuser_arg(newOwnerId))
 			ereport(ERROR,
 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
 					 errmsg("permission denied to change owner of publication \"%s\"",
diff --git a/src/backend/commands/seclabel.c b/src/backend/commands/seclabel.c
index 6906714298..b108b641c5 100644
--- a/src/backend/commands/seclabel.c
+++ b/src/backend/commands/seclabel.c
@@ -80,6 +80,7 @@ SecLabelSupportsObjectType(ObjectType objtype)
 		case OBJECT_OPFAMILY:
 		case OBJECT_POLICY:
 		case OBJECT_PUBLICATION_REL:
+		case OBJECT_PUBLICATION_SCHEMA:
 		case OBJECT_RULE:
 		case OBJECT_STATISTIC_EXT:
 		case OBJECT_TABCONSTRAINT:
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 4e23c7fce5..cffc3c263c 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -52,6 +52,7 @@
 #include "commands/defrem.h"
 #include "commands/event_trigger.h"
 #include "commands/policy.h"
+#include "commands/publicationcmds.h"
 #include "commands/sequence.h"
 #include "commands/tablecmds.h"
 #include "commands/tablespace.h"
@@ -12138,6 +12139,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
 			case OCLASS_EVENT_TRIGGER:
 			case OCLASS_PUBLICATION:
 			case OCLASS_PUBLICATION_REL:
+			case OCLASS_PUBLICATION_SCHEMA:
 			case OCLASS_SUBSCRIPTION:
 			case OCLASS_TRANSFORM:
 
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index eb24195438..81e508b7a5 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -169,6 +169,7 @@ static Node *makeNullAConst(int location);
 static Node *makeAConst(Value *v, int location);
 static Node *makeBoolAConst(bool state, int location);
 static RoleSpec *makeRoleSpec(RoleSpecType type, int location);
+static SchemaSpec *makeSchemaSpec(SchemaSpecType type, int location);
 static void check_qualified_name(List *names, core_yyscan_t yyscanner);
 static List *check_func_name(List *names, core_yyscan_t yyscanner);
 static List *check_indirection(List *indirection, core_yyscan_t yyscanner);
@@ -257,6 +258,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	PartitionSpec		*partspec;
 	PartitionBoundSpec	*partboundspec;
 	RoleSpec			*rolespec;
+	SchemaSpec			*schemaspec;
 	struct SelectLimit	*selectlimit;
 	SetQuantifier	 setquantifier;
 	struct GroupClause  *groupclause;
@@ -426,14 +428,13 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 				transform_element_list transform_type_list
 				TriggerTransitions TriggerReferencing
 				vacuum_relation_list opt_vacuum_relation_list
-				drop_option_list
+				drop_option_list schema_list
 
 %type <node>	opt_routine_body
 %type <groupclause> group_clause
 %type <list>	group_by_list
 %type <node>	group_by_item empty_grouping_set rollup_clause cube_clause
 %type <node>	grouping_sets_clause
-%type <node>	opt_publication_for_tables publication_for_tables
 
 %type <list>	opt_fdw_options fdw_options
 %type <defelt>	fdw_option
@@ -554,6 +555,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <str>		createdb_opt_name plassign_target
 %type <node>	var_value zone_value
 %type <rolespec> auth_ident RoleSpec opt_granted_by
+%type <schemaspec> SchemaSpec
 
 %type <keyword> unreserved_keyword type_func_name_keyword
 %type <keyword> col_name_keyword reserved_keyword
@@ -9583,45 +9585,68 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec
 
 /*****************************************************************************
  *
- * CREATE PUBLICATION name [ FOR TABLE ] [ WITH options ]
+ * CREATE PUBLICATION name [WITH options]
  *
+ * CREATE PUBLICATION FOR ALL TABLES [WITH options]
+ *
+ * CREATE PUBLICATION FOR TABLE [WITH options]
+ *
+ * CREATE PUBLICATION FOR SCHEMA [WITH options]
  *****************************************************************************/
 
 CreatePublicationStmt:
-			CREATE PUBLICATION name opt_publication_for_tables opt_definition
+			CREATE PUBLICATION name opt_definition
 				{
 					CreatePublicationStmt *n = makeNode(CreatePublicationStmt);
 					n->pubname = $3;
-					n->options = $5;
-					if ($4 != NULL)
-					{
-						/* FOR TABLE */
-						if (IsA($4, List))
-							n->tables = (List *)$4;
-						/* FOR ALL TABLES */
-						else
-							n->for_all_tables = true;
-					}
+					n->options = $4;
 					$$ = (Node *)n;
 				}
-		;
-
-opt_publication_for_tables:
-			publication_for_tables					{ $$ = $1; }
-			| /* EMPTY */							{ $$ = NULL; }
-		;
-
-publication_for_tables:
-			FOR TABLE relation_expr_list
+			| CREATE PUBLICATION name FOR ALL TABLES opt_definition
 				{
-					$$ = (Node *) $3;
+					CreatePublicationStmt *n = makeNode(CreatePublicationStmt);
+					n->pubname = $3;
+					n->options = $7;
+					n->for_all_tables = true;
+					$$ = (Node *)n;
 				}
-			| FOR ALL TABLES
+			| CREATE PUBLICATION name FOR TABLE relation_expr_list opt_definition
 				{
-					$$ = (Node *) makeInteger(true);
+					CreatePublicationStmt *n = makeNode(CreatePublicationStmt);
+					n->pubname = $3;
+					n->options = $7;
+					n->tables = (List *)$6;
+					$$ = (Node *)n;
+				}
+			| CREATE PUBLICATION name FOR SCHEMA schema_list opt_definition
+				{
+					CreatePublicationStmt *n = makeNode(CreatePublicationStmt);
+					n->pubname = $3;
+					n->options = $7;
+					n->schemas = (List *)$6;
+					$$ = (Node *)n;
 				}
 		;
 
+/* Schema specifications */
+SchemaSpec:	ColId
+					{
+						SchemaSpec *n;
+						n = makeSchemaSpec(SCHEMASPEC_CSTRING, @1);
+						n->schemaname = pstrdup($1);
+						$$ = n;
+					}
+			| CURRENT_SCHEMA
+					{
+						$$ = makeSchemaSpec(SCHEMASPEC_CURRENT_SCHEMA, @1);
+					}
+		;
+
+schema_list:	SchemaSpec
+					{ $$ = list_make1($1); }
+			| schema_list ',' SchemaSpec
+					{ $$ = lappend($1, $3); }
+		;
 
 /*****************************************************************************
  *
@@ -9633,6 +9658,11 @@ publication_for_tables:
  *
  * ALTER PUBLICATION name SET TABLE table [, table2]
  *
+ * ALTER PUBLICATION name ADD SCHEMA schema [, schema2]
+ *
+ * ALTER PUBLICATION name DROP SCHEMA schema [, schema2]
+ *
+ * ALTER PUBLICATION name SET SCHEMA schema [, schema2]
  *****************************************************************************/
 
 AlterPublicationStmt:
@@ -9667,6 +9697,30 @@ AlterPublicationStmt:
 					n->tableAction = DEFELEM_DROP;
 					$$ = (Node *)n;
 				}
+			| ALTER PUBLICATION name ADD_P SCHEMA schema_list
+				{
+					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+					n->pubname = $3;
+					n->schemas = $6;
+					n->tableAction = DEFELEM_ADD;
+					$$ = (Node *)n;
+				}
+			| ALTER PUBLICATION name SET SCHEMA schema_list
+				{
+					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+					n->pubname = $3;
+					n->schemas = $6;
+					n->tableAction = DEFELEM_SET;
+					$$ = (Node *)n;
+				}
+			| ALTER PUBLICATION name DROP SCHEMA schema_list
+				{
+					AlterPublicationStmt *n = makeNode(AlterPublicationStmt);
+					n->pubname = $3;
+					n->schemas = $6;
+					n->tableAction = DEFELEM_DROP;
+					$$ = (Node *)n;
+				}
 		;
 
 /*****************************************************************************
@@ -16613,6 +16667,20 @@ makeRoleSpec(RoleSpecType type, int location)
 	return spec;
 }
 
+/* makeSchemaSpec
+ * Create a SchemaSpec with the given type
+ */
+static SchemaSpec *
+makeSchemaSpec(SchemaSpecType type, int location)
+{
+	SchemaSpec *spec = makeNode(SchemaSpec);
+
+	spec->schematype = type;
+	spec->location = location;
+
+	return spec;
+}
+
 /* check_qualified_name --- check the result of qualified_name production
  *
  * It's easiest to let the grammar production for qualified_name allow
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 63f108f960..1057acd62e 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -16,6 +16,7 @@
 #include "catalog/partition.h"
 #include "catalog/pg_publication.h"
 #include "commands/defrem.h"
+#include "commands/publicationcmds.h"
 #include "fmgr.h"
 #include "replication/logical.h"
 #include "replication/logicalproto.h"
@@ -1063,13 +1064,26 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 			Publication *pub = lfirst(lc);
 			bool		publish = false;
 
-			if (pub->alltables)
+			if (pub->pubtype == PUBTYPE_ALLTABLES)
 			{
 				publish = true;
 				if (pub->pubviaroot && am_partition)
 					publish_as_relid = llast_oid(get_partition_ancestors(relid));
 			}
 
+			if (pub->pubtype == PUBTYPE_SCHEMA)
+			{
+				Oid			schemaId = get_rel_namespace(relid);
+				List	   *pubschemas = GetPublicationSchemas(pub->oid);
+
+				if (list_member_oid(pubschemas, schemaId))
+				{
+					publish = true;
+					if (pub->pubviaroot && am_partition)
+						publish_as_relid = llast_oid(get_partition_ancestors(relid));
+				}
+			}
+
 			if (!publish)
 			{
 				bool		ancestor_published = false;
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index d55ae016d0..671737acb2 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -66,6 +66,7 @@
 #include "catalog/schemapg.h"
 #include "catalog/storage.h"
 #include "commands/policy.h"
+#include "commands/publicationcmds.h"
 #include "commands/trigger.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c
index e4dc4ee34e..b2f8b8add8 100644
--- a/src/backend/utils/cache/syscache.c
+++ b/src/backend/utils/cache/syscache.c
@@ -51,6 +51,7 @@
 #include "catalog/pg_proc.h"
 #include "catalog/pg_publication.h"
 #include "catalog/pg_publication_rel.h"
+#include "catalog/pg_publication_schema.h"
 #include "catalog/pg_range.h"
 #include "catalog/pg_replication_origin.h"
 #include "catalog/pg_rewrite.h"
@@ -650,6 +651,28 @@ static const struct cachedesc cacheinfo[] = {
 		},
 		64
 	},
+	{PublicationSchemaRelationId,	/* PUBLICATIONSCHEMA */
+		PublicationSchemaObjectIndexId,
+		1,
+		{
+			Anum_pg_publication_schema_oid,
+			0,
+			0,
+			0
+		},
+		64
+	},
+	{PublicationSchemaRelationId,	/* PUBLICATIONSCHEMAMAP */
+		PublicationSchemaPsnspcidPspubidIndexId,
+		2,
+		{
+			Anum_pg_publication_schema_psnspcid,
+			Anum_pg_publication_schema_pspubid,
+			0,
+			0
+		},
+		64
+	},
 	{RangeRelationId,			/* RANGEMULTIRANGE */
 		RangeMultirangeTypidIndexId,
 		1,
diff --git a/src/bin/pg_dump/common.c b/src/bin/pg_dump/common.c
index 1f24e79665..773f038b24 100644
--- a/src/bin/pg_dump/common.c
+++ b/src/bin/pg_dump/common.c
@@ -257,6 +257,9 @@ getSchemaData(Archive *fout, int *numTablesPtr)
 	pg_log_info("reading publication membership");
 	getPublicationTables(fout, tblinfo, numTables);
 
+	pg_log_info("reading publciation schemas");
+	getPublicationSchemas(fout, nspinfo, numNamespaces);
+
 	pg_log_info("reading subscriptions");
 	getSubscriptions(fout);
 
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 24cc096255..719d537497 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -2788,7 +2788,8 @@ _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH)
 	 */
 	if (ropt->no_publications &&
 		(strcmp(te->desc, "PUBLICATION") == 0 ||
-		 strcmp(te->desc, "PUBLICATION TABLE") == 0))
+		 strcmp(te->desc, "PUBLICATION TABLE") == 0 ||
+		 strcmp(te->desc, "PUBLICATION SCHEMA") == 0))
 		return 0;
 
 	/* If it's a security label, maybe ignore it */
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 8f53cc7c3b..dd049da209 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -50,6 +50,7 @@
 #include "catalog/pg_largeobject_d.h"
 #include "catalog/pg_largeobject_metadata_d.h"
 #include "catalog/pg_proc_d.h"
+#include "catalog/pg_publication.h"
 #include "catalog/pg_trigger_d.h"
 #include "catalog/pg_type_d.h"
 #include "common/connect.h"
@@ -3958,6 +3959,7 @@ getPublications(Archive *fout, int *numPublications)
 	int			i_pubdelete;
 	int			i_pubtruncate;
 	int			i_pubviaroot;
+	int			i_pubtype;
 	int			i,
 				ntups;
 
@@ -3972,25 +3974,37 @@ getPublications(Archive *fout, int *numPublications)
 	resetPQExpBuffer(query);
 
 	/* Get the publications. */
-	if (fout->remoteVersion >= 130000)
+	if (fout->remoteVersion >= 140000)
 		appendPQExpBuffer(query,
 						  "SELECT p.tableoid, p.oid, p.pubname, "
 						  "(%s p.pubowner) AS rolname, "
-						  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot "
+						  "p.puballtables, p.pubinsert, p.pubupdate, "
+						  "p.pubdelete, p.pubtruncate, p.pubviaroot, p.pubtype "
 						  "FROM pg_publication p",
 						  username_subquery);
+	else if (fout->remoteVersion >= 130000)
+		appendPQExpBuffer(query,
+						  "SELECT p.tableoid, p.oid, p.pubname, "
+						  "(%s p.pubowner) AS rolname, "
+						  "p.puballtables, p.pubinsert, p.pubupdate, "
+						  "p.pubdelete, p.pubtruncate, p.pubviaroot, "
+						  "NULL AS pubtype FROM pg_publication p",
+						  username_subquery);
 	else if (fout->remoteVersion >= 110000)
 		appendPQExpBuffer(query,
 						  "SELECT p.tableoid, p.oid, p.pubname, "
 						  "(%s p.pubowner) AS rolname, "
-						  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot "
-						  "FROM pg_publication p",
+						  "p.puballtables, p.pubinsert, p.pubupdate, "
+						  "p.pubdelete, p.pubtruncate, false AS pubviaroot, "
+						  "NULL AS pubtype FROM pg_publication p",
 						  username_subquery);
 	else
 		appendPQExpBuffer(query,
 						  "SELECT p.tableoid, p.oid, p.pubname, "
 						  "(%s p.pubowner) AS rolname, "
-						  "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot "
+						  "p.puballtables, p.pubinsert, p.pubupdate, "
+						  "p.pubdelete, false AS pubtruncate, "
+						  "false AS pubviaroot, NULL AS pubtype "
 						  "FROM pg_publication p",
 						  username_subquery);
 
@@ -4008,6 +4022,7 @@ getPublications(Archive *fout, int *numPublications)
 	i_pubdelete = PQfnumber(res, "pubdelete");
 	i_pubtruncate = PQfnumber(res, "pubtruncate");
 	i_pubviaroot = PQfnumber(res, "pubviaroot");
+	i_pubtype = PQfnumber(res, "pubtype");
 
 	pubinfo = pg_malloc(ntups * sizeof(PublicationInfo));
 
@@ -4032,6 +4047,7 @@ getPublications(Archive *fout, int *numPublications)
 			(strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0);
 		pubinfo[i].pubviaroot =
 			(strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0);
+		pubinfo[i].pubtype = get_publication_type(PQgetvalue(res, i, i_pubtype));
 
 		if (strlen(pubinfo[i].rolname) == 0)
 			pg_log_warning("owner of publication \"%s\" appears to be invalid",
@@ -4074,7 +4090,7 @@ dumpPublication(Archive *fout, const PublicationInfo *pubinfo)
 	appendPQExpBuffer(query, "CREATE PUBLICATION %s",
 					  qpubname);
 
-	if (pubinfo->puballtables)
+	if (pubinfo->puballtables || pubinfo->pubtype == PUBTYPE_ALLTABLES)
 		appendPQExpBufferStr(query, " FOR ALL TABLES");
 
 	appendPQExpBufferStr(query, " WITH (publish = '");
@@ -4141,6 +4157,101 @@ dumpPublication(Archive *fout, const PublicationInfo *pubinfo)
 	free(qpubname);
 }
 
+/*
+ * getPublicationSchemas
+ *	  get information about publication membership for dumpable schemas.
+ */
+void
+getPublicationSchemas(Archive *fout, NamespaceInfo nspinfo[], int numSchemas)
+{
+	PQExpBuffer query;
+	PGresult   *res;
+	PublicationSchemaInfo *pubrinfo;
+	DumpOptions *dopt = fout->dopt;
+	int			i_schemaoid;
+	int			i_oid;
+	int			i_pubname;
+	int			i_pubid;
+	int			i,
+				j,
+				ntups;
+
+	if (dopt->no_publications || fout->remoteVersion < 140000)
+		return;
+
+	query = createPQExpBuffer();
+
+	for (i = 0; i < numSchemas; i++)
+	{
+		NamespaceInfo *nsinfo = &nspinfo[i];
+		PublicationInfo *pubinfo;
+
+		/*
+		 * Ignore publication membership of schemas whose definitions are not
+		 * to be dumped.
+		 */
+		if (!(nsinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
+			continue;
+
+		pg_log_info("reading publication membership for schema \"%s\"",
+					nsinfo->dobj.name);
+
+		resetPQExpBuffer(query);
+
+		/* Get the publication membership for the table. */
+		appendPQExpBuffer(query,
+						  "SELECT ps.psnspcid, ps.oid, p.pubname, p.oid AS pubid "
+						  "FROM pg_publication_schema ps, pg_publication p "
+						  "WHERE ps.psnspcid = '%u' "
+						  "AND p.oid = ps.pspubid",
+						  nsinfo->dobj.catId.oid);
+		res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
+
+		ntups = PQntuples(res);
+
+		if (ntups == 0)
+		{
+			/*
+			 * Schema is not member of any publications. Clean up and return.
+			 */
+			PQclear(res);
+			continue;
+		}
+
+		i_schemaoid = PQfnumber(res, "psnspcid");
+		i_oid = PQfnumber(res, "oid");
+		i_pubname = PQfnumber(res, "pubname");
+		i_pubid = PQfnumber(res, "pubid");
+
+		pubrinfo = pg_malloc(ntups * sizeof(PublicationSchemaInfo));
+
+		for (j = 0; j < ntups; j++)
+		{
+			Oid			pspubid = atooid(PQgetvalue(res, j, i_pubid));
+
+			pubinfo = findPublicationByOid(pspubid);
+			if (pubinfo == NULL)
+				continue;
+
+			pubrinfo[j].dobj.objType = DO_PUBLICATION_SCHEMA;
+			pubrinfo[j].dobj.catId.tableoid =
+				atooid(PQgetvalue(res, j, i_schemaoid));
+			pubrinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, j, i_oid));
+			AssignDumpId(&pubrinfo[j].dobj);
+			pubrinfo[j].dobj.namespace = nsinfo->dobj.namespace;
+			pubrinfo[j].dobj.name = nsinfo->dobj.name;
+			pubrinfo[j].pubname = pg_strdup(PQgetvalue(res, j, i_pubname));
+			pubrinfo[j].pubschema = nsinfo;
+			pubrinfo[j].publication = pubinfo;
+
+			/* Decide whether we want to dump it */
+			selectDumpablePublicationTable(&(pubrinfo[j].dobj), fout);
+		}
+		PQclear(res);
+	}
+	destroyPQExpBuffer(query);
+}
+
 /*
  * getPublicationTables
  *	  get information about publication membership for dumpable tables.
@@ -4228,6 +4339,44 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
 	destroyPQExpBuffer(query);
 }
 
+/*
+ * dumpPublicationSchema
+ *	  dump the definition of the given publication schema mapping
+ */
+static void
+dumpPublicationSchema(Archive *fout, PublicationSchemaInfo *pubrinfo)
+{
+	NamespaceInfo *schemainfo = pubrinfo->pubschema;
+	PublicationInfo *pubinfo = pubrinfo->publication;
+	PQExpBuffer query;
+	char	   *tag;
+
+	if (!(pubrinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
+		return;
+
+	tag = psprintf("%s %s", pubrinfo->pubname, schemainfo->dobj.name);
+
+	query = createPQExpBuffer();
+
+	appendPQExpBuffer(query, "ALTER PUBLICATION %s ", fmtId(pubrinfo->pubname));
+	appendPQExpBuffer(query, "ADD SCHEMA %s;\n", fmtId(schemainfo->dobj.name));
+
+	/*
+	 * There is no point in creating drop query as the drop is done by schema
+	 * drop.
+	 */
+	ArchiveEntry(fout, pubrinfo->dobj.catId, pubrinfo->dobj.dumpId,
+				 ARCHIVE_OPTS(.tag = tag,
+							  .namespace = schemainfo->dobj.name,
+							  .owner = pubinfo->rolname,
+							  .description = "PUBLICATION SCHEMA",
+							  .section = SECTION_POST_DATA,
+							  .createStmt = query->data));
+
+	free(tag);
+	destroyPQExpBuffer(query);
+}
+
 /*
  * dumpPublicationTable
  *	  dump the definition of the given publication table mapping
@@ -10332,6 +10481,9 @@ dumpDumpableObject(Archive *fout, const DumpableObject *dobj)
 		case DO_PUBLICATION_REL:
 			dumpPublicationTable(fout, (const PublicationRelInfo *) dobj);
 			break;
+		case DO_PUBLICATION_SCHEMA:
+			dumpPublicationSchema(fout, (PublicationSchemaInfo *) dobj);
+			break;
 		case DO_SUBSCRIPTION:
 			dumpSubscription(fout, (const SubscriptionInfo *) dobj);
 			break;
@@ -18528,6 +18680,7 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs,
 			case DO_POLICY:
 			case DO_PUBLICATION:
 			case DO_PUBLICATION_REL:
+			case DO_PUBLICATION_SCHEMA:
 			case DO_SUBSCRIPTION:
 				/* Post-data objects: must come after the post-data boundary */
 				addObjectDependency(dobj, postDataBound->dumpId);
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 49e1b0a09c..a9db477f25 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -81,6 +81,7 @@ typedef enum
 	DO_POLICY,
 	DO_PUBLICATION,
 	DO_PUBLICATION_REL,
+	DO_PUBLICATION_SCHEMA,
 	DO_SUBSCRIPTION
 } DumpableObjectType;
 
@@ -613,6 +614,7 @@ typedef struct _PublicationInfo
 	bool		pubdelete;
 	bool		pubtruncate;
 	bool		pubviaroot;
+	char		pubtype;
 } PublicationInfo;
 
 /*
@@ -626,6 +628,18 @@ typedef struct _PublicationRelInfo
 	TableInfo  *pubtable;
 } PublicationRelInfo;
 
+/*
+ * The PublicationSchemaInfo struct is used to represent publication schema
+ * mapping.
+ */
+typedef struct PublicationSchemaInfo
+{
+	DumpableObject 	dobj;
+	NamespaceInfo  *pubschema;
+	char		   *pubname;
+	PublicationInfo *publication;
+} PublicationSchemaInfo;
+
 /*
  * The SubscriptionInfo struct is used to represent subscription.
  */
@@ -731,6 +745,8 @@ extern PublicationInfo *getPublications(Archive *fout,
 										int *numPublications);
 extern void getPublicationTables(Archive *fout, TableInfo tblinfo[],
 								 int numTables);
+extern void getPublicationSchemas(Archive *fout, NamespaceInfo nspinfo[],
+								 int numSchemas);
 extern void getSubscriptions(Archive *fout);
 
 #endif							/* PG_DUMP_H */
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index 46461fb6a1..13a6fcd660 100644
--- a/src/bin/pg_dump/pg_dump_sort.c
+++ b/src/bin/pg_dump/pg_dump_sort.c
@@ -82,6 +82,7 @@ enum dbObjectTypePriorities
 	PRIO_POLICY,
 	PRIO_PUBLICATION,
 	PRIO_PUBLICATION_REL,
+	PRIO_PUBLICATION_SCHEMA,
 	PRIO_SUBSCRIPTION,
 	PRIO_DEFAULT_ACL,			/* done in ACL pass */
 	PRIO_EVENT_TRIGGER,			/* must be next to last! */
@@ -135,6 +136,7 @@ static const int dbObjectTypePriority[] =
 	PRIO_POLICY,				/* DO_POLICY */
 	PRIO_PUBLICATION,			/* DO_PUBLICATION */
 	PRIO_PUBLICATION_REL,		/* DO_PUBLICATION_REL */
+	PRIO_PUBLICATION_SCHEMA,	/* DO_PUBLICATION_SCHEMA */
 	PRIO_SUBSCRIPTION			/* DO_SUBSCRIPTION */
 };
 
@@ -1477,6 +1479,11 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize)
 					 "PUBLICATION TABLE (ID %d OID %u)",
 					 obj->dumpId, obj->catId.oid);
 			return;
+		case DO_PUBLICATION_SCHEMA:
+			snprintf(buf, bufsize,
+					 "PUBLICATION SCHEMA (ID %d OID %u)",
+					 obj->dumpId, obj->catId.oid);
+			return;
 		case DO_SUBSCRIPTION:
 			snprintf(buf, bufsize,
 					 "SUBSCRIPTION (ID %d OID %u)",
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 2abf255798..73417cbfc4 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -19,6 +19,7 @@
 #include "catalog/pg_cast_d.h"
 #include "catalog/pg_class_d.h"
 #include "catalog/pg_default_acl_d.h"
+#include "catalog/pg_publication.h"
 #include "common.h"
 #include "common/logging.h"
 #include "describe.h"
@@ -3147,17 +3148,40 @@ describeOneTableDetails(const char *schemaname,
 		/* print any publications */
 		if (pset.sversion >= 100000)
 		{
-			printfPQExpBuffer(&buf,
-							  "SELECT pubname\n"
-							  "FROM pg_catalog.pg_publication p\n"
-							  "JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n"
-							  "WHERE pr.prrelid = '%s'\n"
-							  "UNION ALL\n"
-							  "SELECT pubname\n"
-							  "FROM pg_catalog.pg_publication p\n"
-							  "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n"
-							  "ORDER BY 1;",
-							  oid, oid);
+			if (pset.sversion >= 14000)
+			{
+				printfPQExpBuffer(&buf,
+								  "SELECT p.pubname\n"
+								  "FROM pg_catalog.pg_publication p\n"
+								  "		JOIN pg_catalog.pg_publication_schema ps ON p.oid = ps.pspubid AND p.pubtype = 's'\n"
+								  "		JOIN pg_catalog.pg_class pc ON pc.relnamespace = ps.psnspcid AND pc.oid = '%s'\n"
+								  "UNION ALL\n"
+								  "SELECT pubname\n"
+								  "FROM pg_catalog.pg_publication p\n"
+								  "		JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n"
+								  "WHERE p.pubtype = 't' AND pr.prrelid = '%s'\n"
+								  "UNION ALL\n"
+								  "SELECT pubname\n"
+								  "FROM pg_catalog.pg_publication p\n"
+								  "WHERE p.pubtype = 'a' \n"
+								  "		AND pg_catalog.pg_relation_is_publishable('%s')\n"
+								  "ORDER BY 1;",
+								  oid, oid, oid);
+			}
+			else
+			{
+				printfPQExpBuffer(&buf,
+								  "SELECT pubname\n"
+								  "FROM pg_catalog.pg_publication p\n"
+								  "JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n"
+								  "WHERE pr.prrelid = '%s'\n"
+								  "UNION ALL\n"
+								  "SELECT pubname\n"
+								  "FROM pg_catalog.pg_publication p\n"
+								  "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n"
+								  "ORDER BY 1;",
+								  oid, oid);
+			}
 
 			result = PSQLexec(buf.data);
 			if (!result)
@@ -5021,6 +5045,8 @@ listSchemas(const char *pattern, bool verbose, bool showSystem)
 	PQExpBufferData buf;
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
+	int			pub_schema_tuples = 0;
+	char	  **footers = NULL;
 
 	initPQExpBuffer(&buf);
 	printfPQExpBuffer(&buf,
@@ -5061,9 +5087,66 @@ listSchemas(const char *pattern, bool verbose, bool showSystem)
 	myopt.title = _("List of schemas");
 	myopt.translate_header = true;
 
+	if (pattern && pset.sversion >= 140000)
+	{
+		PGresult   *result;
+		int			i;
+
+		printfPQExpBuffer(&buf,
+						  "SELECT p.pubname FROM pg_catalog.pg_publication p,\n"
+						  "pg_catalog.pg_namespace n,\n"
+						  "pg_catalog.pg_publication_schema ps\n"
+						  "WHERE n.oid = ps.psnspcid AND\n"
+						  "p.oid = ps.pspubid AND n.nspname = '%s'\n"
+						  "ORDER BY 1",
+						  pattern);
+		result = PSQLexec(buf.data);
+		if (!result)
+			return true;
+		else
+			pub_schema_tuples = PQntuples(result);
+
+		if (pub_schema_tuples > 0)
+		{
+			/*
+			 * Allocate memory for footers. Size of footers will be 1 (for
+			 * storing "Publications:" string) + Schema count +  1 (for storing
+			 * NULL)
+			 */
+			footers = (char **) palloc((1 + pub_schema_tuples + 1) * sizeof(char *));
+			footers[0] = pstrdup("Publications:");
+
+			/* Might be an empty set - that's ok */
+			for (i = 0; i < pub_schema_tuples; i++)
+			{
+				printfPQExpBuffer(&buf, "    \"%s\"",
+								  PQgetvalue(result, i, 0));
+
+				footers[i + 1] = pstrdup(buf.data);
+			}
+
+			footers[i + 1] = NULL;
+			myopt.footers = footers;
+		}
+
+		PQclear(result);
+	}
+
 	printQuery(res, &myopt, pset.queryFout, false, pset.logfile);
 
 	PQclear(res);
+
+	/* free the memory allocated for the footer */
+	if (footers)
+	{
+		char	  **footer = NULL;
+
+		for (footer = footers; *footer; footer++)
+			pfree(*footer);
+
+		pfree(footers);
+	}
+
 	return true;
 }
 
@@ -6147,7 +6230,7 @@ listPublications(const char *pattern)
 	PQExpBufferData buf;
 	PGresult   *res;
 	printQueryOpt myopt = pset.popt;
-	static const bool translate_columns[] = {false, false, false, false, false, false, false, false};
+	static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6182,6 +6265,10 @@ listPublications(const char *pattern)
 		appendPQExpBuffer(&buf,
 						  ",\n  pubviaroot AS \"%s\"",
 						  gettext_noop("Via root"));
+	if (pset.sversion >= 140000)
+		appendPQExpBuffer(&buf,
+						  ",\n  pubtype AS \"%s\"",
+						  gettext_noop("PubType"));
 
 	appendPQExpBufferStr(&buf,
 						 "\nFROM pg_catalog.pg_publication\n");
@@ -6210,6 +6297,39 @@ listPublications(const char *pattern)
 	return true;
 }
 
+static bool
+addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg,
+						   bool singlecol, printTableContent *cont)
+{
+	PGresult   *res;
+	int			count = 0;
+	int 		i = 0;
+
+	res = PSQLexec(buf->data);
+	if (!res)
+		return false;
+	else
+		count = PQntuples(res);
+
+	if (count > 0)
+		printTableAddFooter(cont, _(footermsg));
+
+	for (i = 0; i < count; i++)
+	{
+		if (!singlecol)
+			printfPQExpBuffer(buf, "    \"%s.%s\"", PQgetvalue(res, i, 0),
+							  PQgetvalue(res, i, 1));
+		else
+			printfPQExpBuffer(buf, "    \"%s\"", PQgetvalue(res, i, 0));
+
+		printTableAddFooter(cont, buf->data);
+	}
+
+	PQclear(res);
+	termPQExpBuffer(buf);
+	return true;
+}
+
 /*
  * \dRp+
  * Describes publications including the contents.
@@ -6224,6 +6344,9 @@ describePublications(const char *pattern)
 	PGresult   *res;
 	bool		has_pubtruncate;
 	bool		has_pubviaroot;
+	bool		has_pubtype;
+	PQExpBufferData title;
+	printTableContent cont;
 
 	if (pset.sversion < 100000)
 	{
@@ -6237,6 +6360,7 @@ describePublications(const char *pattern)
 
 	has_pubtruncate = (pset.sversion >= 110000);
 	has_pubviaroot = (pset.sversion >= 130000);
+	has_pubtype = (pset.sversion >= 140000);
 
 	initPQExpBuffer(&buf);
 
@@ -6250,6 +6374,10 @@ describePublications(const char *pattern)
 	if (has_pubviaroot)
 		appendPQExpBufferStr(&buf,
 							 ", pubviaroot");
+	if (has_pubtype)
+		appendPQExpBufferStr(&buf,
+							 ", pubtype");
+
 	appendPQExpBufferStr(&buf,
 						 "\nFROM pg_catalog.pg_publication\n");
 
@@ -6287,20 +6415,18 @@ describePublications(const char *pattern)
 		const char	align = 'l';
 		int			ncols = 5;
 		int			nrows = 1;
-		int			tables = 0;
-		PGresult   *tabres;
 		char	   *pubid = PQgetvalue(res, i, 0);
 		char	   *pubname = PQgetvalue(res, i, 1);
 		bool		puballtables = strcmp(PQgetvalue(res, i, 3), "t") == 0;
-		int			j;
-		PQExpBufferData title;
+		char		pubtype;
 		printTableOpt myopt = pset.popt.topt;
-		printTableContent cont;
 
 		if (has_pubtruncate)
 			ncols++;
 		if (has_pubviaroot)
 			ncols++;
+		if (has_pubtype)
+			ncols++;
 
 		initPQExpBuffer(&title);
 		printfPQExpBuffer(&title, _("Publication %s"), pubname);
@@ -6315,6 +6441,8 @@ describePublications(const char *pattern)
 			printTableAddHeader(&cont, gettext_noop("Truncates"), true, align);
 		if (has_pubviaroot)
 			printTableAddHeader(&cont, gettext_noop("Via root"), true, align);
+		if (has_pubtype)
+			printTableAddHeader(&cont, gettext_noop("Pubtype"), true, align);
 
 		printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false);
 		printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false);
@@ -6325,8 +6453,16 @@ describePublications(const char *pattern)
 			printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false);
 		if (has_pubviaroot)
 			printTableAddCell(&cont, PQgetvalue(res, i, 8), false, false);
+		if (has_pubtype)
+		{
+			char	   *type = PQgetvalue(res, i, 9);
+			pubtype = get_publication_type(type);
+			printTableAddCell(&cont, type, false, false);
+		}
 
-		if (!puballtables)
+		/* Prior to version 14 check was based on all tables */
+		if ((has_pubtype && pubtype == PUBTYPE_TABLE) ||
+			(!has_pubtype && !puballtables))
 		{
 			printfPQExpBuffer(&buf,
 							  "SELECT n.nspname, c.relname\n"
@@ -6337,31 +6473,20 @@ describePublications(const char *pattern)
 							  "  AND c.oid = pr.prrelid\n"
 							  "  AND pr.prpubid = '%s'\n"
 							  "ORDER BY 1,2", pubid);
-
-			tabres = PSQLexec(buf.data);
-			if (!tabres)
-			{
-				printTableCleanup(&cont);
-				PQclear(res);
-				termPQExpBuffer(&buf);
-				termPQExpBuffer(&title);
-				return false;
-			}
-			else
-				tables = PQntuples(tabres);
-
-			if (tables > 0)
-				printTableAddFooter(&cont, _("Tables:"));
-
-			for (j = 0; j < tables; j++)
-			{
-				printfPQExpBuffer(&buf, "    \"%s.%s\"",
-								  PQgetvalue(tabres, j, 0),
-								  PQgetvalue(tabres, j, 1));
-
-				printTableAddFooter(&cont, buf.data);
-			}
-			PQclear(tabres);
+			if (!addFooterToPublicationDesc(&buf, "Tables:", false, &cont))
+				goto error_return;
+		}
+		else if (has_pubtype && pubtype == PUBTYPE_SCHEMA)
+		{
+			printfPQExpBuffer(&buf,
+							  "SELECT n.nspname\n"
+							  "FROM pg_catalog.pg_namespace n,\n"
+							  "     pg_catalog.pg_publication_schema ps\n"
+							  "WHERE n.oid = ps.psnspcid\n"
+							  "  AND ps.pspubid = '%s'\n"
+							  "ORDER BY 1", pubid);
+			if (!addFooterToPublicationDesc(&buf, "Schemas:", true, &cont))
+				goto error_return;
 		}
 
 		printTable(&cont, pset.queryFout, false, pset.logfile);
@@ -6374,6 +6499,13 @@ describePublications(const char *pattern)
 	PQclear(res);
 
 	return true;
+
+error_return:
+	printTableCleanup(&cont);
+	PQclear(res);
+	termPQExpBuffer(&buf);
+	termPQExpBuffer(&title);
+	return false;
 }
 
 /*
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 38af5682f2..f14543792e 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1640,10 +1640,19 @@ psql_completion(const char *text, int start, int end)
 
 	/* ALTER PUBLICATION <name> */
 	else if (Matches("ALTER", "PUBLICATION", MatchAny))
-		COMPLETE_WITH("ADD TABLE", "DROP TABLE", "OWNER TO", "RENAME TO", "SET");
+		COMPLETE_WITH("ADD", "DROP", "OWNER TO", "RENAME TO", "SET");
+	/* ALTER PUBLICATION <name> ADD */
+	else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD"))
+		COMPLETE_WITH("SCHEMA", "TABLE");
+	/* ALTER PUBLICATION <name> DROP */
+	else if (Matches("ALTER", "PUBLICATION", MatchAny, "DROP"))
+		COMPLETE_WITH("SCHEMA", "TABLE");
 	/* ALTER PUBLICATION <name> SET */
 	else if (Matches("ALTER", "PUBLICATION", MatchAny, "SET"))
-		COMPLETE_WITH("(", "TABLE");
+		COMPLETE_WITH("(", "SCHEMA", "TABLE");
+	else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD|DROP|SET", "SCHEMA"))
+		COMPLETE_WITH_QUERY(Query_for_list_of_schemas
+							" UNION SELECT 'CURRENT_SCHEMA'");
 	/* ALTER PUBLICATION <name> SET ( */
 	else if (HeadMatches("ALTER", "PUBLICATION", MatchAny) && TailMatches("SET", "("))
 		COMPLETE_WITH("publish", "publish_via_partition_root");
@@ -2630,15 +2639,20 @@ psql_completion(const char *text, int start, int end)
 
 /* CREATE PUBLICATION */
 	else if (Matches("CREATE", "PUBLICATION", MatchAny))
-		COMPLETE_WITH("FOR TABLE", "FOR ALL TABLES", "WITH (");
+		COMPLETE_WITH("FOR TABLE", "FOR ALL TABLES", "FOR SCHEMA", "WITH (");
 	else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR"))
-		COMPLETE_WITH("TABLE", "ALL TABLES");
+		COMPLETE_WITH("TABLE", "ALL TABLES", "SCHEMA");
 	/* Complete "CREATE PUBLICATION <name> FOR TABLE <table>, ..." */
 	else if (HeadMatches("CREATE", "PUBLICATION", MatchAny, "FOR", "TABLE"))
 		COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables, NULL);
 	/* Complete "CREATE PUBLICATION <name> [...] WITH" */
 	else if (HeadMatches("CREATE", "PUBLICATION") && TailMatches("WITH", "("))
 		COMPLETE_WITH("publish", "publish_via_partition_root");
+	/* Complete "CREATE PUBLICATION <name> FOR SCHEMA <schema>, ..." */
+	else if (HeadMatches("CREATE", "PUBLICATION", MatchAny, "FOR", "SCHEMA"))
+		COMPLETE_WITH_QUERY(Query_for_list_of_schemas
+							" UNION SELECT 'CURRENT_SCHEMA' "
+							"UNION SELECT 'WITH ('");
 
 /* CREATE RULE */
 	/* Complete "CREATE [ OR REPLACE ] RULE <sth>" with "AS ON" */
diff --git a/src/include/catalog/dependency.h b/src/include/catalog/dependency.h
index fd44081e74..08ec4c79f1 100644
--- a/src/include/catalog/dependency.h
+++ b/src/include/catalog/dependency.h
@@ -131,6 +131,7 @@ typedef enum ObjectClass
 	OCLASS_POLICY,				/* pg_policy */
 	OCLASS_PUBLICATION,			/* pg_publication */
 	OCLASS_PUBLICATION_REL,		/* pg_publication_rel */
+	OCLASS_PUBLICATION_SCHEMA,	/* pg_publication_schema */
 	OCLASS_SUBSCRIPTION,		/* pg_subscription */
 	OCLASS_TRANSFORM			/* pg_transform */
 } ObjectClass;
diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h
index 1b31fee9e3..f67f92f918 100644
--- a/src/include/catalog/pg_publication.h
+++ b/src/include/catalog/pg_publication.h
@@ -18,7 +18,6 @@
 #define PG_PUBLICATION_H
 
 #include "catalog/genbki.h"
-#include "catalog/objectaddress.h"
 #include "catalog/pg_publication_d.h"
 
 /* ----------------
@@ -54,6 +53,9 @@ CATALOG(pg_publication,6104,PublicationRelationId)
 
 	/* true if partition changes are published using root schema */
 	bool		pubviaroot;
+
+	/* see PUBTYPE_xxx constants below */
+	char		pubtype;
 } FormData_pg_publication;
 
 /* ----------------
@@ -83,12 +85,9 @@ typedef struct Publication
 	bool		alltables;
 	bool		pubviaroot;
 	PublicationActions pubactions;
+	char 		pubtype;
 } Publication;
 
-extern Publication *GetPublication(Oid pubid);
-extern Publication *GetPublicationByName(const char *pubname, bool missing_ok);
-extern List *GetRelationPublications(Oid relid);
-
 /*---------
  * Expected values for pub_partopt parameter of GetRelationPublications(),
  * which allows callers to specify which partitions of partitioned tables
@@ -105,16 +104,26 @@ typedef enum PublicationPartOpt
 	PUBLICATION_PART_ALL,
 } PublicationPartOpt;
 
-extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
-extern List *GetAllTablesPublications(void);
-extern List *GetAllTablesPublicationRelations(bool pubviaroot);
-
-extern bool is_publishable_relation(Relation rel);
-extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
-											  bool if_not_exists);
-
-extern Oid	get_publication_oid(const char *pubname, bool missing_ok);
-extern char *get_publication_name(Oid pubid, bool missing_ok);
-
+/* Publication types */
+#define                  PUBTYPE_ALLTABLES               'a'   /* all tables publication */
+#define                  PUBTYPE_TABLE                   't'   /* table publication */
+#define                  PUBTYPE_SCHEMA                  's'   /* schema publication */
+#define                  PUBTYPE_EMPTY                   'e'   /* empty publication */
+
+/*
+ * Return the publication type.
+*/
+static inline char
+get_publication_type(char *strpubtype)
+{
+	if (strcmp(strpubtype,"a") == 0)
+		return PUBTYPE_ALLTABLES;
+	else if(strcmp(strpubtype,"t") == 0)
+		return PUBTYPE_TABLE;
+	else if (strcmp(strpubtype,"s") == 0)
+		return PUBTYPE_SCHEMA;
+
+	return PUBTYPE_EMPTY;
+}
 
 #endif							/* PG_PUBLICATION_H */
diff --git a/src/include/catalog/pg_publication_schema.h b/src/include/catalog/pg_publication_schema.h
new file mode 100644
index 0000000000..b0c9361d91
--- /dev/null
+++ b/src/include/catalog/pg_publication_schema.h
@@ -0,0 +1,49 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_publication_schema.h
+ *	  definition of the system catalog for mappings between schemas and
+ *	  publications (pg_publication_schema)
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_publication_schema.h
+ *
+ * NOTES
+ *	  The Catalog.pm module reads this file and derives schema
+ *	  information.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_PUBLICATION_SCHEMA_H
+#define PG_PUBLICATION_SCHEMA_H
+
+#include "catalog/genbki.h"
+#include "catalog/pg_publication_schema_d.h"
+
+
+/* ----------------
+ *		pg_publication_schema definition.  cpp turns this into
+ *		typedef struct FormData_pg_publication_schema
+ * ----------------
+ */
+CATALOG(pg_publication_schema,8901,PublicationSchemaRelationId)
+{
+	Oid			oid;			/* oid */
+	Oid			pspubid BKI_LOOKUP(pg_publication);	/* Oid of the publication */
+	Oid			psnspcid BKI_LOOKUP(pg_class);		/* Oid of the schema */
+} FormData_pg_publication_schema;
+
+/* ----------------
+ *		Form_pg_publication_schema corresponds to a pointer to a tuple with
+ *		the format of pg_publication_schema relation.
+ * ----------------
+ */
+typedef FormData_pg_publication_schema *Form_pg_publication_schema;
+
+DECLARE_UNIQUE_INDEX_PKEY(pg_publication_schema_oid_index, 8902, on pg_publication_schema using btree(oid oid_ops));
+#define PublicationSchemaObjectIndexId 8902
+DECLARE_UNIQUE_INDEX(pg_publication_schema_psnspcid_pspubid_index, 8903, on pg_publication_schema using btree(psnspcid oid_ops, pspubid oid_ops));
+#define PublicationSchemaPsnspcidPspubidIndexId 8903
+
+#endif							/* PG_PUBLICATION_SCHEMA_H */
diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h
index 00e2e626e6..3c2a77d0b0 100644
--- a/src/include/commands/publicationcmds.h
+++ b/src/include/commands/publicationcmds.h
@@ -16,13 +16,34 @@
 #define PUBLICATIONCMDS_H
 
 #include "catalog/objectaddress.h"
+#include "catalog/pg_publication.h"
 #include "nodes/parsenodes.h"
 
 extern ObjectAddress CreatePublication(CreatePublicationStmt *stmt);
 extern void AlterPublication(AlterPublicationStmt *stmt);
 extern void RemovePublicationRelById(Oid proid);
+extern void RemovePublicationSchemaById(Oid psoid);
 
 extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId);
 extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId);
 
+extern Publication *GetPublication(Oid pubid);
+extern Publication *GetPublicationByName(const char *pubname, bool missing_ok);
+extern List *GetRelationPublications(Oid relid);
+
+extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt);
+extern List *GetPublicationSchemas(Oid pubid);
+extern List *GetAllTablesPublications(void);
+extern List *GetAllTablesPublicationRelations(bool pubviaroot, Oid schemaOid);
+extern List *GetAllSchemasPublicationRelations(Publication *publication);
+
+extern bool is_publishable_relation(Relation rel);
+extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel,
+											  bool if_not_exists);
+extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaoid,
+											bool if_not_exists);
+
+extern Oid	get_publication_oid(const char *pubname, bool missing_ok);
+extern char *get_publication_name(Oid pubid, bool missing_ok);
+
 #endif							/* PUBLICATIONCMDS_H */
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index d9e417bcd7..dfeade5bf8 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -484,6 +484,7 @@ typedef enum NodeTag
 	T_CTECycleClause,
 	T_CommonTableExpr,
 	T_RoleSpec,
+	T_SchemaSpec,
 	T_TriggerTransition,
 	T_PartitionElem,
 	T_PartitionSpec,
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index def9651b34..169bdce07c 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -341,6 +341,23 @@ typedef struct RoleSpec
 	int			location;		/* token location, or -1 if unknown */
 } RoleSpec;
 
+/*
+ * SchemaSpec - a schema name or CURRENT_SCHEMA.
+ */
+typedef enum SchemaSpecType
+{
+	SCHEMASPEC_CSTRING,				/* schema name is stored as a C string */
+	SCHEMASPEC_CURRENT_SCHEMA		/* schema spec is CURRENT_SCHEMA */
+} SchemaSpecType;
+
+typedef struct SchemaSpec
+{
+	NodeTag			type;
+	SchemaSpecType 	schematype;		/* Type of this rolespec */
+	char	   	   *schemaname;		/* filled only for ROLESPEC_CSTRING */
+	int				location;		/* token location, or -1 if unknown */
+} SchemaSpec;
+
 /*
  * FuncCall - a function or aggregate invocation
  *
@@ -1805,6 +1822,7 @@ typedef enum ObjectType
 	OBJECT_PROCEDURE,
 	OBJECT_PUBLICATION,
 	OBJECT_PUBLICATION_REL,
+	OBJECT_PUBLICATION_SCHEMA,
 	OBJECT_ROLE,
 	OBJECT_ROUTINE,
 	OBJECT_RULE,
@@ -3631,6 +3649,7 @@ typedef struct CreatePublicationStmt
 	List	   *options;		/* List of DefElem nodes */
 	List	   *tables;			/* Optional list of tables to add */
 	bool		for_all_tables; /* Special publication for all tables in db */
+	List	   *schemas;		/* Optional list of schemas */
 } CreatePublicationStmt;
 
 typedef struct AlterPublicationStmt
@@ -3645,6 +3664,7 @@ typedef struct AlterPublicationStmt
 	List	   *tables;			/* List of tables to add/drop */
 	bool		for_all_tables; /* Special publication for all tables in db */
 	DefElemAction tableAction;	/* What action to perform with the tables */
+	List	   *schemas;		/* Optional list of schemas */
 } AlterPublicationStmt;
 
 typedef struct CreateSubscriptionStmt
diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h
index d74a348600..1ba295206a 100644
--- a/src/include/utils/syscache.h
+++ b/src/include/utils/syscache.h
@@ -79,6 +79,8 @@ enum SysCacheIdentifier
 	PUBLICATIONOID,
 	PUBLICATIONREL,
 	PUBLICATIONRELMAP,
+	PUBLICATIONSCHEMA,
+	PUBLICATIONSCHEMAMAP,
 	RANGEMULTIRANGE,
 	RANGETYPE,
 	RELNAMENSP,
diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out
index 1461e947cd..ddb421c394 100644
--- a/src/test/regress/expected/oidjoins.out
+++ b/src/test/regress/expected/oidjoins.out
@@ -260,6 +260,8 @@ NOTICE:  checking pg_sequence {seqtypid} => pg_type {oid}
 NOTICE:  checking pg_publication {pubowner} => pg_authid {oid}
 NOTICE:  checking pg_publication_rel {prpubid} => pg_publication {oid}
 NOTICE:  checking pg_publication_rel {prrelid} => pg_class {oid}
+NOTICE:  checking pg_publication_schema {pspubid} => pg_publication {oid}
+NOTICE:  checking pg_publication_schema {psnspcid} => pg_class {oid}
 NOTICE:  checking pg_subscription {subdbid} => pg_database {oid}
 NOTICE:  checking pg_subscription {subowner} => pg_authid {oid}
 NOTICE:  checking pg_subscription_rel {srsubid} => pg_subscription {oid}
diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out
index 63d6ab7a4e..28bf8daa64 100644
--- a/src/test/regress/expected/publication.out
+++ b/src/test/regress/expected/publication.out
@@ -28,20 +28,20 @@ ERROR:  unrecognized "publish" value: "cluster"
 CREATE PUBLICATION testpub_xxx WITH (publish_via_partition_root = 'true', publish_via_partition_root = '0');
 ERROR:  conflicting or redundant options
 \dRp
-                                              List of publications
-        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------+--------------------------+------------+---------+---------+---------+-----------+----------
- testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f
- testpub_default    | regress_publication_user | f          | f       | t       | f       | f         | f
+                                                   List of publications
+        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | PubType 
+--------------------+--------------------------+------------+---------+---------+---------+-----------+----------+---------
+ testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f        | e
+ testpub_default    | regress_publication_user | f          | f       | t       | f       | f         | f        | e
 (2 rows)
 
 ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete');
 \dRp
-                                              List of publications
-        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------+--------------------------+------------+---------+---------+---------+-----------+----------
- testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f
- testpub_default    | regress_publication_user | f          | t       | t       | t       | f         | f
+                                                   List of publications
+        Name        |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | PubType 
+--------------------+--------------------------+------------+---------+---------+---------+-----------+----------+---------
+ testpib_ins_trunct | regress_publication_user | f          | t       | f       | f       | f         | f        | e
+ testpub_default    | regress_publication_user | f          | t       | t       | t       | f         | f        | e
 (2 rows)
 
 --- adding tables
@@ -85,10 +85,10 @@ Publications:
     "testpub_foralltables"
 
 \dRp+ testpub_foralltables
-                              Publication testpub_foralltables
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | t          | t       | t       | f       | f         | f
+                                   Publication testpub_foralltables
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtype 
+--------------------------+------------+---------+---------+---------+-----------+----------+---------
+ regress_publication_user | t          | t       | t       | f       | f         | f        | a
 (1 row)
 
 DROP TABLE testpub_tbl2;
@@ -100,19 +100,19 @@ CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3;
 CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3;
 RESET client_min_messages;
 \dRp+ testpub3
-                                    Publication testpub3
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub3
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtype 
+--------------------------+------------+---------+---------+---------+-----------+----------+---------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | t
 Tables:
     "public.testpub_tbl3"
     "public.testpub_tbl3a"
 
 \dRp+ testpub4
-                                    Publication testpub4
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                         Publication testpub4
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtype 
+--------------------------+------------+---------+---------+---------+-----------+----------+---------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | t
 Tables:
     "public.testpub_tbl3"
 
@@ -131,10 +131,10 @@ ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1);
 -- only parent is listed as being in publication, not the partition
 ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted;
 \dRp+ testpub_forparted
-                               Publication testpub_forparted
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                    Publication testpub_forparted
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtype 
+--------------------------+------------+---------+---------+---------+-----------+----------+---------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | t
 Tables:
     "public.testpub_parted"
 
@@ -147,10 +147,10 @@ ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1;
 UPDATE testpub_parted1 SET a = 1;
 ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true);
 \dRp+ testpub_forparted
-                               Publication testpub_forparted
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | t
+                                    Publication testpub_forparted
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtype 
+--------------------------+------------+---------+---------+---------+-----------+----------+---------
+ regress_publication_user | f          | t       | t       | t       | t         | t        | t
 Tables:
     "public.testpub_parted"
 
@@ -170,10 +170,10 @@ ERROR:  relation "testpub_tbl1" is already member of publication "testpub_fortbl
 CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1;
 ERROR:  publication "testpub_fortbl" already exists
 \dRp+ testpub_fortbl
-                                 Publication testpub_fortbl
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | t         | f
+                                      Publication testpub_fortbl
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtype 
+--------------------------+------------+---------+---------+---------+-----------+----------+---------
+ regress_publication_user | f          | t       | t       | t       | t         | f        | t
 Tables:
     "pub_test.testpub_nopk"
     "public.testpub_tbl1"
@@ -211,10 +211,10 @@ Publications:
     "testpub_fortbl"
 
 \dRp+ testpub_default
-                                Publication testpub_default
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | f         | f
+                                     Publication testpub_default
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtype 
+--------------------------+------------+---------+---------+---------+-----------+----------+---------
+ regress_publication_user | f          | t       | t       | t       | f         | f        | t
 Tables:
     "pub_test.testpub_nopk"
     "public.testpub_tbl1"
@@ -258,10 +258,10 @@ DROP TABLE testpub_parted;
 DROP VIEW testpub_view;
 DROP TABLE testpub_tbl1;
 \dRp+ testpub_default
-                                Publication testpub_default
-          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
---------------------------+------------+---------+---------+---------+-----------+----------
- regress_publication_user | f          | t       | t       | t       | f         | f
+                                     Publication testpub_default
+          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtype 
+--------------------------+------------+---------+---------+---------+-----------+----------+---------
+ regress_publication_user | f          | t       | t       | t       | f         | f        | t
 (1 row)
 
 -- fail - must be owner of publication
@@ -271,20 +271,20 @@ ERROR:  must be owner of publication testpub_default
 RESET ROLE;
 ALTER PUBLICATION testpub_default RENAME TO testpub_foo;
 \dRp testpub_foo
-                                           List of publications
-    Name     |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
--------------+--------------------------+------------+---------+---------+---------+-----------+----------
- testpub_foo | regress_publication_user | f          | t       | t       | t       | f         | f
+                                                List of publications
+    Name     |          Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | PubType 
+-------------+--------------------------+------------+---------+---------+---------+-----------+----------+---------
+ testpub_foo | regress_publication_user | f          | t       | t       | t       | f         | f        | t
 (1 row)
 
 -- rename back to keep the rest simple
 ALTER PUBLICATION testpub_foo RENAME TO testpub_default;
 ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2;
 \dRp testpub_default
-                                             List of publications
-      Name       |           Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root 
------------------+---------------------------+------------+---------+---------+---------+-----------+----------
- testpub_default | regress_publication_user2 | f          | t       | t       | t       | f         | f
+                                                  List of publications
+      Name       |           Owner           | All tables | Inserts | Updates | Deletes | Truncates | Via root | PubType 
+-----------------+---------------------------+------------+---------+---------+---------+-----------+----------+---------
+ testpub_default | regress_publication_user2 | f          | t       | t       | t       | f         | f        | t
 (1 row)
 
 DROP PUBLICATION testpub_default;
diff --git a/src/test/regress/expected/sanity_check.out b/src/test/regress/expected/sanity_check.out
index d9ce961be2..fe5a038824 100644
--- a/src/test/regress/expected/sanity_check.out
+++ b/src/test/regress/expected/sanity_check.out
@@ -140,6 +140,7 @@ pg_policy|t
 pg_proc|t
 pg_publication|t
 pg_publication_rel|t
+pg_publication_schema|t
 pg_range|t
 pg_replication_origin|t
 pg_rewrite|t
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index abdb08319c..b4d1c81898 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2031,6 +2031,7 @@ PublicationActions
 PublicationInfo
 PublicationPartOpt
 PublicationRelInfo
+PublicationSchemaInfo
 PullFilter
 PullFilterOps
 PushFilter
@@ -2322,6 +2323,8 @@ ScanState
 ScanTypeControl
 ScannerCallbackState
 SchemaQuery
+SchemaSpec
+SchemaSpecType
 SecBuffer
 SecBufferDesc
 SecLabelItem
-- 
2.25.1

