From 8a1b409f3217e53d557288fac3c0843e53d0710e Mon Sep 17 00:00:00 2001 From: amit Date: Thu, 7 Nov 2019 18:19:33 +0900 Subject: [PATCH v8 1/4] Support adding partitioned tables to publication --- doc/src/sgml/logical-replication.sgml | 15 +-- doc/src/sgml/ref/create_publication.sgml | 27 +++-- src/backend/catalog/pg_publication.c | 42 +++++--- src/backend/commands/copy.c | 2 +- src/backend/commands/publicationcmds.c | 12 ++- src/backend/commands/subscriptioncmds.c | 117 +++++++++++++++++--- src/backend/executor/execMain.c | 7 +- src/backend/executor/execPartition.c | 5 +- src/backend/executor/execReplication.c | 47 ++++---- src/backend/executor/nodeModifyTable.c | 6 +- src/backend/replication/logical/tablesync.c | 1 + src/backend/replication/pgoutput/pgoutput.c | 41 +++++-- src/bin/pg_dump/pg_dump.c | 8 +- src/include/catalog/pg_publication.h | 1 + src/include/executor/executor.h | 8 +- src/test/regress/expected/publication.out | 21 +++- src/test/regress/sql/publication.sql | 12 ++- src/test/subscription/t/013_partition.pl | 161 ++++++++++++++++++++++++++++ 18 files changed, 447 insertions(+), 86 deletions(-) create mode 100644 src/test/subscription/t/013_partition.pl diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index f657d1d06e..4584cb82f6 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -402,13 +402,14 @@ - Replication is only possible from base tables to base tables. That is, - the tables on the publication and on the subscription side must be normal - tables, not views, materialized views, partition root tables, or foreign - tables. In the case of partitions, you can therefore replicate a - partition hierarchy one-to-one, but you cannot currently replicate to a - differently partitioned setup. Attempts to replicate tables other than - base tables will result in an error. + Replication is only supported by regular and partitioned tables, although + the type of the table must match between the two servers, that is, one + cannot replicate from a regular table into a partitioned able or vice + versa. Also, when replicating between partitioned tables, the actual + replication occurs between leaf partitions, so the partitions on the two + servers must match one-to-one. Attempts to replicate other types of + relations such as views, materialized views, or foreign tables, will + result in an error. diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index 99f87ca393..848779a00f 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -68,15 +68,25 @@ CREATE PUBLICATION name that table is added to the publication. If ONLY is not specified, the table and all its descendant tables (if any) are added. Optionally, * can be specified after the table name to - explicitly indicate that descendant tables are included. + explicitly indicate that descendant tables are included. However, adding + a partitioned table to a publication never explicitly adds its partitions, + because partitions are implicitly published due to the partitioned table + being added to the publication. - Only persistent base tables can be part of a publication. Temporary - tables, unlogged tables, foreign tables, materialized views, regular - views, and partitioned tables cannot be part of a publication. To - replicate a partitioned table, add the individual partitions to the - publication. + Only persistent base tables and partitioned tables can be part of a + publication. Temporary tables, unlogged tables, foreign tables, + materialized views, regular views cannot be part of a publication. + + + + When a partitioned table is added to a publication, all of its existing + and future partitions are also implicitly considered to be part of the + publication. So, any INSERT, UPDATE, + and DELETE, and TRUNCATE operations + that are directly applied to a partition are also published via its + ancestors' publications. @@ -132,6 +142,11 @@ CREATE PUBLICATION name empty set of tables. That is useful if tables are to be added later. + + Partitioned tables are not considered when FOR ALL TABLES + is specified. + + The creation of a publication does not start replication. It only defines a grouping and filtering logic for future subscribers. diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index c5eea7af3f..fb369dbe17 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -26,6 +26,7 @@ #include "catalog/namespace.h" #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" +#include "catalog/partition.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_type.h" @@ -47,17 +48,9 @@ static void check_publication_add_relation(Relation targetrel) { - /* Give more specific error for partitioned tables */ - if (RelationGetForm(targetrel)->relkind == RELKIND_PARTITIONED_TABLE) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("\"%s\" is a partitioned table", - RelationGetRelationName(targetrel)), - errdetail("Adding partitioned tables to publications is not supported."), - errhint("You can add the table partitions individually."))); - - /* Must be table */ - if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION) + /* Must be a regular or partitioned table */ + if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION && + RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("\"%s\" is not a table", @@ -103,7 +96,8 @@ check_publication_add_relation(Relation targetrel) static bool is_publishable_class(Oid relid, Form_pg_class reltuple) { - return reltuple->relkind == RELKIND_RELATION && + return (reltuple->relkind == RELKIND_RELATION || + reltuple->relkind == RELKIND_PARTITIONED_TABLE) && !IsCatalogRelationOid(relid) && reltuple->relpersistence == RELPERSISTENCE_PERMANENT && relid >= FirstNormalObjectId; @@ -230,7 +224,7 @@ GetRelationPublications(Oid relid) CatCList *pubrellist; int i; - /* Find all publications associated with the relation. */ + /* Finds all publications associated with the relation. */ pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP, ObjectIdGetDatum(relid)); for (i = 0; i < pubrellist->n_members; i++) @@ -246,6 +240,28 @@ GetRelationPublications(Oid relid) return result; } +/* + * Finds all publications that publish changes to the input relation's + * ancestors. + */ +List * +GetRelationAncestorPublications(Oid relid) +{ + List *ancestors = get_partition_ancestors(relid); + List *ancestor_pubids = NIL; + ListCell *lc; + + foreach(lc, ancestors) + { + Oid ancestor = lfirst_oid(lc); + List *rel_publishers = GetRelationPublications(ancestor); + + ancestor_pubids = list_concat_copy(ancestor_pubids, rel_publishers); + } + + return ancestor_pubids; +} + /* * Gets list of relation oids for a publication. * diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index c93a788798..5a75419caf 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -2837,7 +2837,7 @@ CopyFrom(CopyState cstate) target_resultRelInfo = resultRelInfo; /* Verify the named relation is a valid target for INSERT */ - CheckValidResultRel(resultRelInfo, CMD_INSERT); + CheckValidResultRel(resultRelInfo, NULL, CMD_INSERT); ExecOpenIndices(resultRelInfo, false); diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index f96cb42adc..8f38c63ad2 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -498,7 +498,8 @@ RemovePublicationRelById(Oid proid) /* * Open relations specified by a RangeVar list. - * The returned tables are locked in ShareUpdateExclusiveLock mode. + * The returned tables are locked in ShareUpdateExclusiveLock mode in order to + * add them to a publication. */ static List * OpenTableList(List *tables) @@ -539,8 +540,13 @@ OpenTableList(List *tables) rels = lappend(rels, rel); relids = lappend_oid(relids, myrelid); - /* Add children of this rel, if requested */ - if (recurse) + /* + * Add children of this rel, if requested, so that they too are added + * to the publication. A partitioned table can't have any inheritance + * children other than its partitions, which need not be explicitly + * added to the publication. + */ + if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) { List *children; ListCell *child; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 95962b4a3e..786b15eb27 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -44,7 +44,19 @@ #include "utils/memutils.h" #include "utils/syscache.h" -static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +/* + * Structure used by fetch_publication_tables to describe a published table. + * The information is used by the callers of fetch_publication_tables to + * generate a pg_subscription_rel catalog entry for the table. + */ +typedef struct PublishedTable +{ + RangeVar *rv; + + char relkind; +} PublishedTable; + +static List *fetch_publication_tables(WalReceiverConn *wrconn, List *publications); /* * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. @@ -453,18 +465,42 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * Get the table list from publisher and build local table status * info. */ - tables = fetch_table_list(wrconn, publications); + tables = fetch_publication_tables(wrconn, publications); foreach(lc, tables) { - RangeVar *rv = (RangeVar *) lfirst(lc); + PublishedTable *pt = (PublishedTable *) lfirst(lc); + RangeVar *rv = pt->rv; Oid relid; + char local_relkind; relid = RangeVarGetRelid(rv, AccessShareLock, false); + local_relkind = get_rel_relkind(relid); /* Check for supported relkind. */ - CheckSubscriptionRelkind(get_rel_relkind(relid), + CheckSubscriptionRelkind(local_relkind, rv->schemaname, rv->relname); + /* + * Currently, partitioned table replication occurs between leaf + * partitions, so both the source and the target tables must be + * partitioned. + */ + if (pt->relkind == RELKIND_RELATION && + local_relkind == RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot use relation \"%s.%s\" as logical replication target", + rv->schemaname, rv->relname), + errdetail("\"%s.%s\" is a partitioned table whereas it is a regular table on publication server.", + rv->schemaname, rv->relname))); + + /* + * A partitioned table doesn't need local state, because the + * state is managed for individual partitions instead. + */ + if (pt->relkind == RELKIND_PARTITIONED_TABLE) + continue; + AddSubscriptionRelState(subid, relid, table_state, InvalidXLogRecPtr); } @@ -530,7 +566,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) (errmsg("could not connect to the publisher: %s", err))); /* Get the table list from publisher. */ - pubrel_names = fetch_table_list(wrconn, sub->publications); + pubrel_names = fetch_publication_tables(wrconn, sub->publications); /* We are done with the remote side, close connection. */ walrcv_disconnect(wrconn); @@ -565,15 +601,39 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) foreach(lc, pubrel_names) { - RangeVar *rv = (RangeVar *) lfirst(lc); + PublishedTable *pt = (PublishedTable *) lfirst(lc); + RangeVar *rv = pt->rv; Oid relid; + char local_relkind; relid = RangeVarGetRelid(rv, AccessShareLock, false); + local_relkind = get_rel_relkind(relid); /* Check for supported relkind. */ - CheckSubscriptionRelkind(get_rel_relkind(relid), + CheckSubscriptionRelkind(local_relkind, rv->schemaname, rv->relname); + /* + * Currently, partitioned table replication occurs between leaf + * partitions, so both the source and the target tables must be + * partitioned. + */ + if (pt->relkind == RELKIND_RELATION && + local_relkind == RELKIND_PARTITIONED_TABLE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot use relation \"%s.%s\" as logical replication target", + rv->schemaname, rv->relname), + errdetail("\"%s.%s\" is a partitioned table whereas it is a regular table on publication server.", + rv->schemaname, rv->relname))); + + /* + * A partitioned table doesn't need local state, because the + * state is managed for individual partitions instead. + */ + if (pt->relkind == RELKIND_PARTITIONED_TABLE) + continue; + pubrel_local_oids[off++] = relid; if (!bsearch(&relid, subrel_local_oids, @@ -1121,15 +1181,17 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) /* * Get the list of tables which belong to specified publications on the - * publisher connection. + * publisher connection to create a subscription state (pg_subscription_rel + * entry) for each. For partitioned tables, subscription state is maintained + * per partition, so partitions are fetched too. */ static List * -fetch_table_list(WalReceiverConn *wrconn, List *publications) +fetch_publication_tables(WalReceiverConn *wrconn, List *publications) { WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[2] = {TEXTOID, TEXTOID}; + Oid tableRow[3] = {TEXTOID, TEXTOID, CHAROID}; ListCell *lc; bool first; List *tablelist = NIL; @@ -1137,9 +1199,30 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) Assert(list_length(publications) > 0); initStringInfo(&cmd); - appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n" + appendStringInfoString(&cmd, "SELECT DISTINCT s.schemaname, s.tablename, s.relkind FROM (\n" + " SELECT t.pubname, t.schemaname, t.tablename, c.relkind\n" " FROM pg_catalog.pg_publication_tables t\n" - " WHERE t.pubname IN ("); + " JOIN pg_catalog.pg_class c \n" + " ON t.schemaname = c.relnamespace::pg_catalog.regnamespace::name\n" + " AND t.tablename = c.relname \n"); + + /* + * As of v13, partitioned tables can be published, although their changes + * are published as their partitions', so we will need the partitions in + * the result. + */ + if (walrcv_server_version(wrconn) >= 130000) + appendStringInfoString(&cmd, " UNION\n" + " SELECT t.pubname, s.schemaname, s.tablename, s.relkind\n" + " FROM pg_catalog.pg_publication_tables t,\n" + " LATERAL (SELECT c.relnamespace::regnamespace::name, c.relname, c.relkind\n" + " FROM pg_class c\n" + " JOIN pg_partition_tree(t.schemaname || '.' || t.tablename) p\n" + " ON p.relid = c.oid\n" + " WHERE p.level > 0) AS s(schemaname, tablename, relkind)\n"); + + appendStringInfoString(&cmd, ") s WHERE s.pubname IN ("); + first = true; foreach(lc, publications) { @@ -1154,7 +1237,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) } appendStringInfoChar(&cmd, ')'); - res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + res = walrcv_exec(wrconn, cmd.data, 3, tableRow); pfree(cmd.data); if (res->status != WALRCV_OK_TUPLES) @@ -1169,15 +1252,17 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) char *nspname; char *relname; bool isnull; - RangeVar *rv; + PublishedTable *pt = palloc(sizeof(PublishedTable)); nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); Assert(!isnull); relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); Assert(!isnull); + pt->rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1); + pt->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); - rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1); - tablelist = lappend(tablelist, rv); + tablelist = lappend(tablelist, pt); ExecClearTuple(slot); } diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 4181a7e343..96671ca49e 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -1073,7 +1073,9 @@ InitPlan(QueryDesc *queryDesc, int eflags) * CheckValidRowMarkRel. */ void -CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation) +CheckValidResultRel(ResultRelInfo *resultRelInfo, + ResultRelInfo *rootResultRelInfo, + CmdType operation) { Relation resultRel = resultRelInfo->ri_RelationDesc; TriggerDesc *trigDesc = resultRel->trigdesc; @@ -1083,7 +1085,8 @@ CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation) { case RELKIND_RELATION: case RELKIND_PARTITIONED_TABLE: - CheckCmdReplicaIdentity(resultRel, operation); + CheckCmdReplicaIdentity(resultRelInfo, rootResultRelInfo, + operation); break; case RELKIND_SEQUENCE: ereport(ERROR, diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c index c13b1d3501..2a639011b8 100644 --- a/src/backend/executor/execPartition.c +++ b/src/backend/executor/execPartition.c @@ -384,7 +384,8 @@ ExecFindPartition(ModifyTableState *mtstate, rri = elem->rri; /* Verify this ResultRelInfo allows INSERTs */ - CheckValidResultRel(rri, CMD_INSERT); + CheckValidResultRel(rri, rootResultRelInfo, + CMD_INSERT); /* Set up the PartitionRoutingInfo for it */ ExecInitRoutingInfo(mtstate, estate, proute, dispatch, @@ -529,7 +530,7 @@ ExecInitPartitionInfo(ModifyTableState *mtstate, EState *estate, * partition-key becomes a DELETE+INSERT operation, so this check is still * required when the operation is CMD_UPDATE. */ - CheckValidResultRel(leaf_part_rri, CMD_INSERT); + CheckValidResultRel(leaf_part_rri, rootResultRelInfo, CMD_INSERT); /* * Open partition indices. The user may have asked to check for conflicts diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 582b0cb017..65bfb05df5 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -396,10 +396,10 @@ ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot) ResultRelInfo *resultRelInfo = estate->es_result_relation_info; Relation rel = resultRelInfo->ri_RelationDesc; - /* For now we support only tables. */ + /* For now we support only regular tables. */ Assert(rel->rd_rel->relkind == RELKIND_RELATION); - CheckCmdReplicaIdentity(rel, CMD_INSERT); + CheckCmdReplicaIdentity(resultRelInfo, NULL, CMD_INSERT); /* BEFORE ROW INSERT Triggers */ if (resultRelInfo->ri_TrigDesc && @@ -463,7 +463,7 @@ ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, /* For now we support only tables. */ Assert(rel->rd_rel->relkind == RELKIND_RELATION); - CheckCmdReplicaIdentity(rel, CMD_UPDATE); + CheckCmdReplicaIdentity(resultRelInfo, NULL, CMD_UPDATE); /* BEFORE ROW UPDATE Triggers */ if (resultRelInfo->ri_TrigDesc && @@ -521,7 +521,7 @@ ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, Relation rel = resultRelInfo->ri_RelationDesc; ItemPointer tid = &searchslot->tts_tid; - CheckCmdReplicaIdentity(rel, CMD_DELETE); + CheckCmdReplicaIdentity(resultRelInfo, NULL, CMD_DELETE); /* BEFORE ROW DELETE Triggers */ if (resultRelInfo->ri_TrigDesc && @@ -544,12 +544,17 @@ ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, } /* - * Check if command can be executed with current replica identity. + * Check if command can be executed on 'target_rel' with its (or the + * ancestor's) current replica identity. */ void -CheckCmdReplicaIdentity(Relation rel, CmdType cmd) +CheckCmdReplicaIdentity(ResultRelInfo *target_rel, + ResultRelInfo *root_target_rel, + CmdType cmd) { PublicationActions *pubactions; + Relation rel = target_rel->ri_RelationDesc; + Relation rootrel = root_target_rel ? root_target_rel->ri_RelationDesc : NULL; /* We only need to do checks for UPDATE and DELETE. */ if (cmd != CMD_UPDATE && cmd != CMD_DELETE) @@ -563,9 +568,18 @@ CheckCmdReplicaIdentity(Relation rel, CmdType cmd) /* * This is either UPDATE OR DELETE and there is no replica identity. * - * Check if the table publishes UPDATES or DELETES. + * Check if the table or its root ancestor publishes UPDATES or DELETES. */ pubactions = GetRelationPublicationActions(rel); + if (rootrel) + { + PublicationActions *root_pubactions; + + root_pubactions = GetRelationPublicationActions(rootrel); + pubactions->pubupdate |= root_pubactions->pubupdate; + pubactions->pubdelete |= root_pubactions->pubdelete; + } + if (cmd == CMD_UPDATE && pubactions->pubupdate) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -591,17 +605,10 @@ CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname) { /* - * We currently only support writing to regular tables. However, give a - * more specific error for partitioned and foreign tables. + * We currently only support writing to regular and partitioned tables. + * However, give a more specific error for foreign tables. */ - if (relkind == RELKIND_PARTITIONED_TABLE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot use relation \"%s.%s\" as logical replication target", - nspname, relname), - errdetail("\"%s.%s\" is a partitioned table.", - nspname, relname))); - else if (relkind == RELKIND_FOREIGN_TABLE) + if (relkind == RELKIND_FOREIGN_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot use relation \"%s.%s\" as logical replication target", @@ -609,7 +616,11 @@ CheckSubscriptionRelkind(char relkind, const char *nspname, errdetail("\"%s.%s\" is a foreign table.", nspname, relname))); - if (relkind != RELKIND_RELATION) + /* + * There are some unsupported cases with partitioned tables, but we leave + * it for the caller to report them. + */ + if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot use relation \"%s.%s\" as logical replication target", diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 59d1a31c97..63e108bb56 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -2268,6 +2268,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) int nplans = list_length(node->plans); ResultRelInfo *saved_resultRelInfo; ResultRelInfo *resultRelInfo; + ResultRelInfo *rootResultRelInfo = NULL; Plan *subplan; ListCell *l; int i; @@ -2295,8 +2296,11 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) /* If modifying a partitioned table, initialize the root table info */ if (node->rootResultRelIndex >= 0) + { mtstate->rootResultRelInfo = estate->es_root_result_relations + node->rootResultRelIndex; + rootResultRelInfo = mtstate->rootResultRelInfo; + } mtstate->mt_arowmarks = (List **) palloc0(sizeof(List *) * nplans); mtstate->mt_nplans = nplans; @@ -2330,7 +2334,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) /* * Verify result relation is a valid target for the current operation */ - CheckValidResultRel(resultRelInfo, operation); + CheckValidResultRel(resultRelInfo, rootResultRelInfo, operation); /* * If there are indices on the result relation, open them and save diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index f8183cd488..98825f01e9 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -761,6 +761,7 @@ copy_table(Relation rel) /* Map the publisher relation to local one. */ relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); Assert(rel == relmapentry->localrel); + Assert(relmapentry->localrel->rd_rel->relkind == RELKIND_RELATION); /* Start copy on the publisher. */ initStringInfo(&cmd); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 752508213a..059d2c9194 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -50,7 +50,12 @@ static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); -/* Entry in the map used to remember which relation schemas we sent. */ +/* + * Entry in the map used to remember which relation schemas we sent. + * + * For partitions, 'pubactions' considers not only the table's own + * publications, but also those of all of its ancestors. + */ typedef struct RelationSyncEntry { Oid relid; /* relation oid */ @@ -63,7 +68,7 @@ typedef struct RelationSyncEntry static HTAB *RelationSyncCache = NULL; static void init_rel_sync_cache(MemoryContext decoding_context); -static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid); +static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Relation rel); static void rel_sync_cache_relation_cb(Datum arg, Oid relid); static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue); @@ -311,7 +316,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (!is_publishable_relation(relation)) return; - relentry = get_rel_sync_entry(data, RelationGetRelid(relation)); + relentry = get_rel_sync_entry(data, relation); /* First check the table filter */ switch (change->action) @@ -401,7 +406,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (!is_publishable_relation(relation)) continue; - relentry = get_rel_sync_entry(data, relid); + relentry = get_rel_sync_entry(data, relation); if (!relentry->pubactions.pubtruncate) continue; @@ -526,8 +531,9 @@ init_rel_sync_cache(MemoryContext cachectx) * Find or create entry in the relation schema cache. */ static RelationSyncEntry * -get_rel_sync_entry(PGOutputData *data, Oid relid) +get_rel_sync_entry(PGOutputData *data, Relation rel) { + Oid relid = RelationGetRelid(rel); RelationSyncEntry *entry; bool found; MemoryContext oldctx; @@ -546,7 +552,9 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) if (!found || !entry->replicate_valid) { List *pubids = GetRelationPublications(relid); - ListCell *lc; + ListCell *lc, + *lc1; + List *ancestor_pubids = NIL; /* Reload publications if needed before use. */ if (!publications_valid) @@ -568,6 +576,11 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; + /* For partitions, also consider publications of ancestors. */ + if (rel->rd_rel->relispartition) + ancestor_pubids = + GetRelationAncestorPublications(RelationGetRelid(rel)); + foreach(lc, data->publications) { Publication *pub = lfirst(lc); @@ -580,12 +593,28 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; } + if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && + entry->pubactions.pubdelete && entry->pubactions.pubtruncate) + break; + + foreach(lc1, ancestor_pubids) + { + if (lfirst_oid(lc1) == pub->oid) + { + entry->pubactions.pubinsert |= pub->pubactions.pubinsert; + entry->pubactions.pubupdate |= pub->pubactions.pubupdate; + entry->pubactions.pubdelete |= pub->pubactions.pubdelete; + entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; + } + } + if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && entry->pubactions.pubdelete && entry->pubactions.pubtruncate) break; } list_free(pubids); + list_free(ancestor_pubids); entry->replicate_valid = true; } diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 799b6988b7..dc33c20048 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -3969,8 +3969,12 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) { TableInfo *tbinfo = &tblinfo[i]; - /* Only plain tables can be aded to publications. */ - if (tbinfo->relkind != RELKIND_RELATION) + /* + * Only regular and partitioned tables can be added to + * publications. + */ + if (tbinfo->relkind != RELKIND_RELATION && + tbinfo->relkind != RELKIND_PARTITIONED_TABLE) continue; /* diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 6cdc2b1197..3cfb31c2e6 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -80,6 +80,7 @@ typedef struct Publication extern Publication *GetPublication(Oid pubid); extern Publication *GetPublicationByName(const char *pubname, bool missing_ok); extern List *GetRelationPublications(Oid relid); +extern List *GetRelationAncestorPublications(Oid relid); extern List *GetPublicationRelations(Oid pubid); extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(void); diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 6ef3e1fe06..5b97bb5d57 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -179,7 +179,9 @@ extern void ExecutorEnd(QueryDesc *queryDesc); extern void standard_ExecutorEnd(QueryDesc *queryDesc); extern void ExecutorRewind(QueryDesc *queryDesc); extern bool ExecCheckRTPerms(List *rangeTable, bool ereport_on_violation); -extern void CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation); +extern void CheckValidResultRel(ResultRelInfo *resultRelInfo, + ResultRelInfo *rootResultRelInfo, + CmdType operation); extern void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, @@ -592,7 +594,9 @@ extern void ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot); extern void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot); -extern void CheckCmdReplicaIdentity(Relation rel, CmdType cmd); +extern void CheckCmdReplicaIdentity(ResultRelInfo *target_rel, + ResultRelInfo *root_target_rel, + CmdType cmd); extern void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname); diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index feb51e4add..e3fabe70f9 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -116,6 +116,22 @@ Tables: DROP TABLE testpub_tbl3, testpub_tbl3a; DROP PUBLICATION testpub3, testpub4; +-- Tests for partitioned tables +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_forparted; +RESET client_min_messages; +-- should add only the parent to publication, not the partition +CREATE TABLE testpub_parted1 PARTITION OF testpub_parted FOR VALUES IN (1); +ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted; +\dRp+ testpub_forparted + Publication testpub_forparted + Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------------+------------+---------+---------+---------+----------- + regress_publication_user | f | t | t | t | t +Tables: + "public.testpub_parted" + +DROP PUBLICATION testpub_forparted; -- fail - view CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view; ERROR: "testpub_view" is not a table @@ -142,11 +158,6 @@ Tables: ALTER PUBLICATION testpub_default ADD TABLE testpub_view; ERROR: "testpub_view" is not a table DETAIL: Only tables can be added to publications. --- fail - partitioned table -ALTER PUBLICATION testpub_fortbl ADD TABLE testpub_parted; -ERROR: "testpub_parted" is a partitioned table -DETAIL: Adding partitioned tables to publications is not supported. -HINT: You can add the table partitions individually. ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1; ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1; ALTER PUBLICATION testpub_default ADD TABLE pub_test.testpub_nopk; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index 5773a755cf..b79a3f8f8f 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -69,6 +69,16 @@ RESET client_min_messages; DROP TABLE testpub_tbl3, testpub_tbl3a; DROP PUBLICATION testpub3, testpub4; +-- Tests for partitioned tables +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_forparted; +RESET client_min_messages; +-- should add only the parent to publication, not the partition +CREATE TABLE testpub_parted1 PARTITION OF testpub_parted FOR VALUES IN (1); +ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted; +\dRp+ testpub_forparted +DROP PUBLICATION testpub_forparted; + -- fail - view CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view; SET client_min_messages = 'ERROR'; @@ -83,8 +93,6 @@ CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1; -- fail - view ALTER PUBLICATION testpub_default ADD TABLE testpub_view; --- fail - partitioned table -ALTER PUBLICATION testpub_fortbl ADD TABLE testpub_parted; ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1; ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1; diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl new file mode 100644 index 0000000000..eb0f1cd6a8 --- /dev/null +++ b/src/test/subscription/t/013_partition.pl @@ -0,0 +1,161 @@ +# Test PARTITION +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 10; + +# setup + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber1 = get_new_node('subscriber1'); +$node_subscriber1->init(allows_streaming => 'logical'); +$node_subscriber1->start; + +my $node_subscriber2 = get_new_node('subscriber2'); +$node_subscriber2->init(allows_streaming => 'logical'); +$node_subscriber2->start; + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)"); + +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, b text, c text) PARTITION BY LIST (a)"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1_1 (b text, a int NOT NULL)"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3)"); + +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_1 (b text, c text DEFAULT 'sub1_tab1', a int NOT NULL)"); +$node_subscriber1->safe_psql('postgres', + "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3, 4)"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (5, 6)"); + +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_2 PARTITION OF tab1 (c DEFAULT 'sub1_tab1') FOR VALUES IN (5, 6)"); + +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_2 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_2', b text)"); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub1 FOR TABLE tab1, tab1_1"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub2 FOR TABLE tab1_2"); + +$node_subscriber1->safe_psql('postgres', + "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"); + +$node_subscriber2->safe_psql('postgres', + "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2"); + +# Wait for initial sync of all subscriptions +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber1->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber2->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# insert data (some into the root parent and some directly into partitions) + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1_1 (a) VALUES (3)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1_2 VALUES (5)"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +my $result = $node_subscriber1->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); +is($result, qq(sub1_tab1|3|1|5), 'insert into tab1_1, tab1_2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1"); +is($result, qq(sub2_tab1_2|1|5|5), 'inserts into tab1_2 replicated'); + +# update a row (no partition change) + +$node_publisher->safe_psql('postgres', + "UPDATE tab1 SET a = 2 WHERE a = 1"); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); +is($result, qq(sub1_tab1|3|2|5), 'update of tab1_1 replicated'); + +# update a row (partition changes) + +$node_publisher->safe_psql('postgres', + "UPDATE tab1 SET a = 6 WHERE a = 2"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); +is($result, qq(sub1_tab1|3|3|6), 'delete from tab1_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1"); +is($result, qq(sub2_tab1_2|2|5|6), 'insert into tab1_2 replicated'); + +# delete rows (some from the root parent, some directly from the partition) + +$node_publisher->safe_psql('postgres', + "DELETE FROM tab1 WHERE a IN (3, 5)"); +$node_publisher->safe_psql('postgres', + "DELETE FROM tab1_2"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(0||), 'delete from tab1_1, tab_2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1_2"); +is($result, qq(0||), 'delete from tab1_2 replicated'); + +# truncate (root parent and partition directly) + +$node_subscriber1->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1), (2), (5)"); +$node_subscriber2->safe_psql('postgres', + "INSERT INTO tab1_2 VALUES (5)"); + +$node_publisher->safe_psql('postgres', + "TRUNCATE tab1_2"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(2|1|2), 'truncate of tab_2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1_2"); +is($result, qq(0||), 'truncate of tab1_2 replicated'); + +$node_publisher->safe_psql('postgres', + "TRUNCATE tab1"); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(0||), 'truncate of tab1_1 replicated'); -- 2.16.5