From 365b81efed48fb0b1cdb6708fc3f4a9b82a84a22 Mon Sep 17 00:00:00 2001 From: amit Date: Wed, 13 Nov 2019 17:18:51 +0900 Subject: [PATCH v8 4/4] Publish partitioned table inserts as its own --- doc/src/sgml/logical-replication.sgml | 11 +- src/backend/catalog/pg_publication.c | 11 +- src/backend/commands/subscriptioncmds.c | 103 +++++----- src/backend/executor/nodeModifyTable.c | 2 + src/backend/replication/logical/tablesync.c | 28 ++- src/backend/replication/logical/worker.c | 289 ++++++++++++++++++++++++++-- src/backend/replication/pgoutput/pgoutput.c | 191 ++++++++++++++---- src/include/catalog/pg_publication.h | 2 +- src/test/subscription/t/013_partition.pl | 48 ++++- 9 files changed, 558 insertions(+), 127 deletions(-) diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 4584cb82f6..1a4d5a9d25 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -402,14 +402,9 @@ - Replication is only supported by regular and partitioned tables, although - the type of the table must match between the two servers, that is, one - cannot replicate from a regular table into a partitioned able or vice - versa. Also, when replicating between partitioned tables, the actual - replication occurs between leaf partitions, so the partitions on the two - servers must match one-to-one. Attempts to replicate other types of - relations such as views, materialized views, or foreign tables, will - result in an error. + Replication is only supported by regular and partitioned tables. + Attempts to replicate other types of relations such as + views, materialized views, or foreign tables, will result in an error. diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 6d2911d18f..d47461f763 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -243,20 +243,29 @@ GetRelationPublications(Oid relid) /* * Finds all publications that publish changes to the input relation's * ancestors. + * + * *published_ancestors will contain one OID for each publication returned, + * of the ancestor which belongs to it. Values in this list can be repeated, + * because a given ancestor may belong to multiple publications. */ List * -GetRelationAncestorPublications(Oid relid) +GetRelationAncestorPublications(Oid relid, List **published_ancestors) { List *ancestors = get_partition_ancestors(relid); List *ancestor_pubids = NIL; ListCell *lc; + *published_ancestors = NIL; foreach(lc, ancestors) { Oid ancestor = lfirst_oid(lc); List *rel_publishers = GetRelationPublications(ancestor); + int n = list_length(rel_publishers), + i; ancestor_pubids = list_concat_copy(ancestor_pubids, rel_publishers); + for (i = 0; i < n; i++) + *published_ancestors = lappend_oid(*published_ancestors, ancestor); } return ancestor_pubids; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 786b15eb27..2a45aff445 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -54,6 +54,15 @@ typedef struct PublishedTable RangeVar *rv; char relkind; + + /* + * If the published table is partitioned, the following being true means + * its changes are published using own schema rather than the schema of + * its individual partitions. In the latter case, a separate + * PublicationTable instance (and hence pg_subscription_rel entry) for + * each partition will be needed. 
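+	 *
+	 * For example, with a hypothetical partitioned table parted_tab, a
+	 * publication created as
+	 *
+	 *     CREATE PUBLICATION pub FOR TABLE parted_tab
+	 *         WITH (publish_using_root_schema = true);
+	 *
+	 * publishes parted_tab's changes using parted_tab's own schema, so a
+	 * single subscriber-side entry for parted_tab itself suffices.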
+ */ + bool published_using_root_schema; } PublishedTable; static List *fetch_publication_tables(WalReceiverConn *wrconn, List *publications); @@ -481,24 +490,13 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) rv->schemaname, rv->relname); /* - * Currently, partitioned table replication occurs between leaf - * partitions, so both the source and the target tables must be - * partitioned. + * A partitioned table doesn't need local state if the state + * is managed for individual partitions, which is the case if + * the partitioned table is published using the schema of its + * partitions. */ - if (pt->relkind == RELKIND_RELATION && - local_relkind == RELKIND_PARTITIONED_TABLE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot use relation \"%s.%s\" as logical replication target", - rv->schemaname, rv->relname), - errdetail("\"%s.%s\" is a partitioned table whereas it is a regular table on publication server.", - rv->schemaname, rv->relname))); - - /* - * A partitioned table doesn't need local state, because the - * state is managed for individual partitions instead. - */ - if (pt->relkind == RELKIND_PARTITIONED_TABLE) + if (pt->relkind == RELKIND_PARTITIONED_TABLE && + !pt->published_using_root_schema) continue; AddSubscriptionRelState(subid, relid, table_state, @@ -614,24 +612,12 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) rv->schemaname, rv->relname); /* - * Currently, partitioned table replication occurs between leaf - * partitions, so both the source and the target tables must be - * partitioned. + * A partitioned table doesn't need local state if the state is + * managed for individual partitions, which is the case if the + * partitioned table is published using the schema of its partitions. */ - if (pt->relkind == RELKIND_RELATION && - local_relkind == RELKIND_PARTITIONED_TABLE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot use relation \"%s.%s\" as logical replication target", - rv->schemaname, rv->relname), - errdetail("\"%s.%s\" is a partitioned table whereas it is a regular table on publication server.", - rv->schemaname, rv->relname))); - - /* - * A partitioned table doesn't need local state, because the - * state is managed for individual partitions instead. 
- */ - if (pt->relkind == RELKIND_PARTITIONED_TABLE) + if (pt->relkind == RELKIND_PARTITIONED_TABLE && + !pt->published_using_root_schema) continue; pubrel_local_oids[off++] = relid; @@ -1191,7 +1177,7 @@ fetch_publication_tables(WalReceiverConn *wrconn, List *publications) WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[3] = {TEXTOID, TEXTOID, CHAROID}; + Oid tableRow[4] = {TEXTOID, TEXTOID, CHAROID, BOOLOID}; ListCell *lc; bool first; List *tablelist = NIL; @@ -1199,27 +1185,41 @@ fetch_publication_tables(WalReceiverConn *wrconn, List *publications) Assert(list_length(publications) > 0); initStringInfo(&cmd); - appendStringInfoString(&cmd, "SELECT DISTINCT s.schemaname, s.tablename, s.relkind FROM (\n" - " SELECT t.pubname, t.schemaname, t.tablename, c.relkind\n" - " FROM pg_catalog.pg_publication_tables t\n" - " JOIN pg_catalog.pg_class c \n" - " ON t.schemaname = c.relnamespace::pg_catalog.regnamespace::name\n" - " AND t.tablename = c.relname \n"); + appendStringInfoString(&cmd, "SELECT DISTINCT s.schemaname, s.tablename, s.relkind, s.pubasroot FROM (\n"); /* * As of v13, partitioned tables can be published, although their changes - * are published as their partitions', so we will need the partitions in - * the result. + * may be published either as their own or as their partitions', which is + * checked with pg_publication.pubasroot (whether the publication publishes + * using root partitioned table's schema). + */ + if (walrcv_server_version(wrconn) >= 130000) + appendStringInfoString(&cmd, " SELECT t.pubname, t.schemaname, t.tablename, c.relkind, p.pubasroot\n"); + else + appendStringInfoString(&cmd, " SELECT t.pubname, t.schemaname, t.tablename, c.relkind, false AS pubasroot\n"); + + appendStringInfoString(&cmd, " FROM pg_catalog.pg_publication_tables t\n" + " JOIN pg_catalog.pg_publication p ON t.pubname = p.pubname\n" + " JOIN pg_catalog.pg_class c\n" + " ON t.schemaname = c.relnamespace::pg_catalog.regnamespace::pg_catalog.name\n" + " AND t.tablename = c.relname\n"); + + /* + * If publication doesn't publish using the root table's schema, we will + * need partitions in the result. */ if (walrcv_server_version(wrconn) >= 130000) appendStringInfoString(&cmd, " UNION\n" - " SELECT t.pubname, s.schemaname, s.tablename, s.relkind\n" - " FROM pg_catalog.pg_publication_tables t,\n" - " LATERAL (SELECT c.relnamespace::regnamespace::name, c.relname, c.relkind\n" - " FROM pg_class c\n" - " JOIN pg_partition_tree(t.schemaname || '.' || t.tablename) p\n" - " ON p.relid = c.oid\n" - " WHERE p.level > 0) AS s(schemaname, tablename, relkind)\n"); + " SELECT DISTINCT t.pubname, s.schemaname, s.tablename, c.relkind, false AS pubasroot\n" + " FROM pg_catalog.pg_publication_tables t\n" + " JOIN pg_catalog.pg_publication p ON t.pubname = p.pubname AND NOT p.pubasroot,\n" + " LATERAL (SELECT c.relnamespace::pg_catalog.regnamespace::pg_catalog.name, c.relname\n" + " FROM pg_catalog.pg_class c\n" + " JOIN pg_catalog.pg_partition_tree(t.schemaname || '.' 
|| t.tablename) p\n" + " ON p.relid = c.oid\n" + " WHERE p.level > 0) AS s(schemaname, tablename)\n" + " JOIN pg_catalog.pg_class c ON s.schemaname = c.relnamespace::pg_catalog.regnamespace::pg_catalog.name\n" + " AND s.tablename = c.relname\n"); appendStringInfoString(&cmd, ") s WHERE s.pubname IN ("); @@ -1237,7 +1237,7 @@ fetch_publication_tables(WalReceiverConn *wrconn, List *publications) } appendStringInfoChar(&cmd, ')'); - res = walrcv_exec(wrconn, cmd.data, 3, tableRow); + res = walrcv_exec(wrconn, cmd.data, 4, tableRow); pfree(cmd.data); if (res->status != WALRCV_OK_TUPLES) @@ -1260,6 +1260,7 @@ fetch_publication_tables(WalReceiverConn *wrconn, List *publications) Assert(!isnull); pt->rv = makeRangeVar(pstrdup(nspname), pstrdup(relname), -1); pt->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull)); + pt->published_using_root_schema = DatumGetBool(slot_getattr(slot, 4, &isnull)); Assert(!isnull); tablelist = lappend(tablelist, pt); diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 63e108bb56..5b7265939f 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -2299,6 +2299,8 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) { mtstate->rootResultRelInfo = estate->es_root_result_relations + node->rootResultRelIndex; + CheckValidResultRel(mtstate->rootResultRelInfo, + mtstate->rootResultRelInfo, operation); rootResultRelInfo = mtstate->rootResultRelInfo; } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 98825f01e9..6a18b78f22 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -630,16 +630,17 @@ copy_read_data(void *outbuf, int minread, int maxread) /* * Get information about remote relation in similar fashion the RELATION - * message provides during replication. + * message provides during replication. XXX - while we fetch relkind too + * here, the RELATION message doesn't provide it */ static void fetch_remote_table_info(char *nspname, char *relname, - LogicalRepRelation *lrel) + LogicalRepRelation *lrel, char *relkind) { WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[2] = {OIDOID, CHAROID}; + Oid tableRow[3] = {OIDOID, CHAROID, CHAROID}; Oid attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID}; bool isnull; int natt; @@ -649,16 +650,16 @@ fetch_remote_table_info(char *nspname, char *relname, /* First fetch Oid and replica identity. 
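+	 * relkind is additionally fetched so that copy_table() can pick the
+	 * right form of COPY for the initial sync: a plain COPY of a
+	 * partitioned table fails with "cannot copy from partitioned table",
+	 * so the query form is used instead, roughly (public.tab being a
+	 * hypothetical partitioned table):
+	 *
+	 *     COPY (SELECT * FROM public.tab) TO STDOUT;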
*/ initStringInfo(&cmd); - appendStringInfo(&cmd, "SELECT c.oid, c.relreplident" + appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind" " FROM pg_catalog.pg_class c" " INNER JOIN pg_catalog.pg_namespace n" " ON (c.relnamespace = n.oid)" " WHERE n.nspname = %s" " AND c.relname = %s" - " AND c.relkind = 'r'", + " AND pg_relation_is_publishable(c.oid)", quote_literal_cstr(nspname), quote_literal_cstr(relname)); - res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + res = walrcv_exec(wrconn, cmd.data, 3, tableRow); if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, @@ -675,6 +676,8 @@ fetch_remote_table_info(char *nspname, char *relname, Assert(!isnull); lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull)); Assert(!isnull); + *relkind = DatumGetChar(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); @@ -750,10 +753,12 @@ copy_table(Relation rel) CopyState cstate; List *attnamelist; ParseState *pstate; + char remote_relkind; /* Get the publisher relation info. */ fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), - RelationGetRelationName(rel), &lrel); + RelationGetRelationName(rel), &lrel, + &remote_relkind); /* Put the relation into relmap. */ logicalrep_relmap_update(&lrel); @@ -761,12 +766,15 @@ copy_table(Relation rel) /* Map the publisher relation to local one. */ relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); Assert(rel == relmapentry->localrel); - Assert(relmapentry->localrel->rd_rel->relkind == RELKIND_RELATION); /* Start copy on the publisher. */ initStringInfo(&cmd); - appendStringInfo(&cmd, "COPY %s TO STDOUT", - quote_qualified_identifier(lrel.nspname, lrel.relname)); + if (remote_relkind == RELKIND_PARTITIONED_TABLE) + appendStringInfo(&cmd, "COPY (SELECT * FROM %s) TO STDOUT", + quote_qualified_identifier(lrel.nspname, lrel.relname)); + else + appendStringInfo(&cmd, "COPY %s TO STDOUT", + quote_qualified_identifier(lrel.nspname, lrel.relname)); res = walrcv_exec(wrconn, cmd.data, 0, NULL); pfree(cmd.data); if (res->status != WALRCV_OK_COPY_OUT) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 34b0ac78cc..ec34418f75 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -29,11 +29,14 @@ #include "access/xlog_internal.h" #include "catalog/catalog.h" #include "catalog/namespace.h" +#include "catalog/partition.h" +#include "catalog/pg_inherits.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "commands/tablecmds.h" #include "commands/trigger.h" #include "executor/executor.h" +#include "executor/execPartition.h" #include "executor/nodeModifyTable.h" #include "funcapi.h" #include "libpq/pqformat.h" @@ -722,6 +725,180 @@ apply_handle_do_delete(ResultRelInfo *relinfo, EState *estate, EvalPlanQualEnd(&epqstate); } +/* + * This handles insert, update, delete on a partitioned table. + */ +static void +apply_handle_tuple_routing(ResultRelInfo *relinfo, + LogicalRepRelMapEntry *relmapentry, + EState *estate, CmdType operation, + TupleTableSlot *remoteslot, + LogicalRepTupleData *newtup) +{ + Relation rel = relinfo->ri_RelationDesc; + ModifyTableState *mtstate = NULL; + PartitionTupleRouting *proute = NULL; + ResultRelInfo *partrelinfo; + TupleTableSlot *localslot; + PartitionRoutingInfo *partinfo; + TupleConversionMap *map; + MemoryContext oldctx; + + /* ModifyTableState is needed for ExecFindPartition(). 
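+	 * Only the handful of fields consulted on this path are filled in;
+	 * the node is never executed as part of a plan, so ps.plan stays
+	 * NULL.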
*/
+	mtstate = makeNode(ModifyTableState);
+	mtstate->ps.plan = NULL;
+	mtstate->ps.state = estate;
+	mtstate->operation = operation;
+	mtstate->resultRelInfo = relinfo;
+	proute = ExecSetupPartitionTupleRouting(estate, mtstate, rel);
+
+	/*
+	 * Find a partition for the tuple contained in remoteslot.
+	 *
+	 * For insert, remoteslot is the tuple to insert.  For update and
+	 * delete, it is the tuple to be replaced and deleted, respectively.
+	 */
+	Assert(remoteslot != NULL);
+	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+	/* The following throws an error if a suitable partition is not found. */
+	partrelinfo = ExecFindPartition(mtstate, relinfo, proute,
+									remoteslot, estate);
+	Assert(partrelinfo != NULL);
+	/* Convert the tuple to match the partition's rowtype. */
+	partinfo = partrelinfo->ri_PartitionInfo;
+	map = partinfo->pi_RootToPartitionMap;
+	if (map != NULL)
+	{
+		TupleTableSlot *part_slot = partinfo->pi_PartitionTupleSlot;
+
+		remoteslot = execute_attr_map_slot(map->attrMap, remoteslot,
+										   part_slot);
+	}
+	MemoryContextSwitchTo(oldctx);
+
+	switch (operation)
+	{
+		case CMD_INSERT:
+			/* Just insert into the partition. */
+			estate->es_result_relation_info = partrelinfo;
+			apply_handle_do_insert(partrelinfo, estate, remoteslot);
+			break;
+
+		case CMD_DELETE:
+			/* Just delete from the partition. */
+			estate->es_result_relation_info = partrelinfo;
+			apply_handle_do_delete(partrelinfo, estate, remoteslot,
+								   &relmapentry->remoterel);
+			break;
+
+		case CMD_UPDATE:
+			{
+				ResultRelInfo *partrelinfo_new;
+
+				/*
+				 * partrelinfo computed above is the partition which might
+				 * contain the search tuple.  Now find the partition for the
+				 * replacement tuple, which might not be the same as
+				 * partrelinfo.
+				 */
+				localslot = table_slot_create(rel, &estate->es_tupleTable);
+				oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+				slot_modify_cstrings(localslot, remoteslot,
+									 newtup->values, newtup->changed,
+									 relmapentry->attrmap,
+									 &relmapentry->remoterel,
+									 RelationGetRelid(rel));
+				partrelinfo_new = ExecFindPartition(mtstate, relinfo, proute,
+													localslot, estate);
+				MemoryContextSwitchTo(oldctx);
+
+				/*
+				 * If the search and replacement tuples fall in the same
+				 * partition, we can apply this as an UPDATE on the
+				 * partition.
+				 */
+				if (partrelinfo == partrelinfo_new)
+				{
+					AttrMap    *attrmap = relmapentry->attrmap,
+							   *new_attrmap = NULL;
+
+					/*
+					 * If the partition's attributes don't match the root
+					 * relation's, we'll need to make a new attrmap which
+					 * maps partition attribute numbers to remoterel's,
+					 * instead of the original, which maps the root
+					 * relation's attribute numbers to remoterel's.
+					 */
+					if (map)
+					{
+						TupleDesc	partdesc = RelationGetDescr(partrelinfo->ri_RelationDesc);
+						TupleDesc	rootdesc = RelationGetDescr(rel);
+						AttrMap    *partToRootMap;
+						AttrNumber	attno;
+
+						/* Need the reverse map here */
+						partToRootMap = build_attrmap_by_name(partdesc, rootdesc);
+						new_attrmap = make_attrmap(partdesc->natts);
+						memset(new_attrmap->attnums, -1,
+							   new_attrmap->maplen * sizeof(AttrNumber));
+						for (attno = 0; attno < new_attrmap->maplen; attno++)
+						{
+							AttrNumber	root_attno = partToRootMap->attnums[attno];
+
+							/* Dropped columns have no mapping; leave -1. */
+							if (root_attno == 0)
+								continue;
+							new_attrmap->attnums[attno] = attrmap->attnums[root_attno - 1];
+						}
+						attrmap = new_attrmap;
+					}
+
+					/* UPDATE partition.
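+					 * To illustrate the remapping above with a hypothetical
+					 * layout: if the partition stores its columns as (b, a)
+					 * while the root stores (a, b), and attrmap sends root
+					 * (a, b) to remote attnos (1, 2), the rebuilt map sends
+					 * partition attno 1 (column b) to remote attno 2 and
+					 * partition attno 2 (column a) to remote attno 1.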
*/ + estate->es_result_relation_info = partrelinfo; + apply_handle_do_update(partrelinfo, estate, remoteslot, + newtup, attrmap, + &relmapentry->remoterel); + if (new_attrmap) + free_attrmap(new_attrmap); + } + else + { + /* + * Different, so handle this as DELETE followed by INSERT. + */ + + /* DELETE from partition partrelinfo. */ + estate->es_result_relation_info = partrelinfo; + apply_handle_do_delete(partrelinfo, estate, remoteslot, + &relmapentry->remoterel); + + /* + * Convert the replacement tuple to match the destination + * partition rowtype. + */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + partinfo = partrelinfo_new->ri_PartitionInfo; + map = partinfo->pi_RootToPartitionMap; + if (map != NULL) + { + TupleTableSlot *part_slot = partinfo->pi_PartitionTupleSlot; + + localslot = execute_attr_map_slot(map->attrMap, localslot, + part_slot); + } + MemoryContextSwitchTo(oldctx); + /* INSERT into partition partrelinfo_new. */ + estate->es_result_relation_info = partrelinfo_new; + apply_handle_do_insert(partrelinfo_new, estate, + localslot); + } + } + break; + + default: + elog(ERROR, "unrecognized CmdType: %d", (int) operation); + break; + } + + ExecCleanupTupleRouting(mtstate, proute); +} + /* * Handle INSERT message. */ @@ -764,9 +941,13 @@ apply_handle_insert(StringInfo s) slot_fill_defaults(rel, estate, remoteslot); MemoryContextSwitchTo(oldctx); - Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION); - apply_handle_do_insert(estate->es_result_relation_info, estate, - remoteslot); + /* For a partitioned table, insert the tuple into a partition. */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + apply_handle_tuple_routing(estate->es_result_relation_info, rel, + estate, CMD_INSERT, remoteslot, NULL); + else + apply_handle_do_insert(estate->es_result_relation_info, estate, + remoteslot); PopActiveSnapshot(); @@ -879,10 +1060,14 @@ apply_handle_update(StringInfo s) has_oldtup ? oldtup.values : newtup.values); MemoryContextSwitchTo(oldctx); - Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION); - apply_handle_do_update(estate->es_result_relation_info, estate, - remoteslot, &newtup, rel->attrmap, - &rel->remoterel); + /* For a partitioned table, apply update to correct partition. */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + apply_handle_tuple_routing(estate->es_result_relation_info, rel, + estate, CMD_UPDATE, remoteslot, &newtup); + else + apply_handle_do_update(estate->es_result_relation_info, estate, + remoteslot, &newtup, rel->attrmap, + &rel->remoterel); PopActiveSnapshot(); @@ -944,9 +1129,13 @@ apply_handle_delete(StringInfo s) slot_fill_defaults(rel, estate, remoteslot); MemoryContextSwitchTo(oldctx); - Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION); - apply_handle_do_delete(estate->es_result_relation_info, estate, - remoteslot, &rel->remoterel); + /* For a partitioned table, apply delete to correct partition. 
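+	 * The search tuple in remoteslot is built from the replica identity
+	 * columns sent by the publisher, so routing it to a partition works
+	 * only if those columns include the partition key.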
*/ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + apply_handle_tuple_routing(estate->es_result_relation_info, rel, + estate, CMD_DELETE, remoteslot, NULL); + else + apply_handle_do_delete(estate->es_result_relation_info, estate, + remoteslot, &rel->remoterel); PopActiveSnapshot(); @@ -988,14 +1177,43 @@ apply_handle_truncate(StringInfo s) LogicalRepRelMapEntry *rel; rel = logicalrep_rel_open(relid, RowExclusiveLock); + if (!should_apply_changes_for_rel(rel)) { + bool really_skip = true; + + /* + * If we seem to have gotten sent a leaf partition because an + * ancestor was truncated, confirm before proceeding with + * truncating the partition that an ancestor indeed has a valid + * subscription state. + */ + if (rel->state == SUBREL_STATE_UNKNOWN && + rel->localrel->rd_rel->relispartition) + { + List *ancestors = get_partition_ancestors(rel->localreloid); + ListCell *lc1; + + foreach(lc1, ancestors) + { + Oid anc_oid = lfirst_oid(lc1); + LogicalRepRelMapEntry *anc_rel; + + anc_rel = logicalrep_rel_open(anc_oid, RowExclusiveLock); + really_skip &= !should_apply_changes_for_rel(anc_rel); + logicalrep_rel_close(anc_rel, RowExclusiveLock); + } + } + /* * The relation can't become interesting in the middle of the * transaction so it's safe to unlock it. */ - logicalrep_rel_close(rel, RowExclusiveLock); - continue; + if (really_skip) + { + logicalrep_rel_close(rel, RowExclusiveLock); + continue; + } } remote_rels = lappend(remote_rels, rel); @@ -1003,6 +1221,47 @@ apply_handle_truncate(StringInfo s) relids = lappend_oid(relids, rel->localreloid); if (RelationIsLogicallyLogged(rel->localrel)) relids_logged = lappend_oid(relids_logged, rel->localreloid); + + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + ListCell *child; + List *children = find_all_inheritors(rel->localreloid, + RowExclusiveLock, + NULL); + + foreach(child, children) + { + Oid childrelid = lfirst_oid(child); + Relation childrel; + + if (list_member_oid(relids, childrelid)) + continue; + + /* find_all_inheritors already got lock */ + childrel = table_open(childrelid, NoLock); + + /* + * It is possible that the parent table has children that are + * temp tables of other backends. We cannot safely access + * such tables (because of buffering issues), and the best + * thing to do is to silently ignore them. Note that this + * check is the same as one of the checks done in + * truncate_check_activity() called below, still it is kept + * here for simplicity. 
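+			 *
+			 * The net effect matches a local TRUNCATE of the parent: all
+			 * reachable leaf partitions are truncated, except such temp
+			 * children, which are silently skipped.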
+ */ + if (RELATION_IS_OTHER_TEMP(childrel)) + { + table_close(childrel, RowExclusiveLock); + continue; + } + + rels = lappend(rels, childrel); + relids = lappend_oid(relids, childrelid); + /* Log this relation only if needed for logical decoding */ + if (RelationIsLogicallyLogged(childrel)) + relids_logged = lappend_oid(relids_logged, childrelid); + } + } } /* @@ -1012,11 +1271,11 @@ apply_handle_truncate(StringInfo s) */ ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs); - foreach(lc, remote_rels) + foreach(lc, rels) { - LogicalRepRelMapEntry *rel = lfirst(lc); + Relation rel = lfirst(lc); - logicalrep_rel_close(rel, NoLock); + table_close(rel, NoLock); } CommandCounterIncrement(); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 059d2c9194..99ceae0d5f 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -12,6 +12,7 @@ */ #include "postgres.h" +#include "access/tupconvert.h" #include "catalog/pg_publication.h" #include "fmgr.h" #include "replication/logical.h" @@ -49,6 +50,7 @@ static bool publications_valid; static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); +static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx); /* * Entry in the map used to remember which relation schemas we sent. @@ -59,9 +61,22 @@ static void publication_invalidation_cb(Datum arg, int cacheid, typedef struct RelationSyncEntry { Oid relid; /* relation oid */ - bool schema_sent; /* did we send the schema? */ + + /* + * Did we send the schema? If ancestor relid is set, its schema must also + * have been sent for this to be true. + */ + bool schema_sent; bool replicate_valid; PublicationActions pubactions; + + /* + * Valid if publishing relation's changes as changes to some ancestor, + * that is, if relation is a partition. The map, if any, will be used to + * convert the tuples from partition's type to the ancestor's. + */ + Oid replicate_as_relid; + TupleConversionMap *map; } RelationSyncEntry; /* Map used to remember which relation schemas we sent. */ @@ -259,47 +274,72 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } /* - * Write the relation schema if the current schema hasn't been sent yet. + * Write the current schema of the relation and its ancestor (if any) if not + * done yet. */ static void maybe_send_schema(LogicalDecodingContext *ctx, Relation relation, RelationSyncEntry *relentry) { - if (!relentry->schema_sent) + if (relentry->schema_sent) + return; + + /* If needed, send the ancestor's schema first. */ + if (OidIsValid(relentry->replicate_as_relid)) { - TupleDesc desc; - int i; + Relation ancestor = + RelationIdGetRelation(relentry->replicate_as_relid); + TupleDesc indesc = RelationGetDescr(relation); + TupleDesc outdesc = RelationGetDescr(ancestor); + MemoryContext oldctx; + + /* Map must live as long as the session does. */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + relentry->map = convert_tuples_by_name(indesc, outdesc); + MemoryContextSwitchTo(oldctx); + send_relation_and_attrs(ancestor, ctx); + RelationClose(ancestor); + } - desc = RelationGetDescr(relation); + send_relation_and_attrs(relation, ctx); + relentry->schema_sent = true; +} - /* - * Write out type info if needed. We do that only for user-created - * types. 
We use FirstGenbkiObjectId as the cutoff, so that we only - * consider objects with hand-assigned OIDs to be "built in", not for - * instance any function or type defined in the information_schema. - * This is important because only hand-assigned OIDs can be expected - * to remain stable across major versions. - */ - for (i = 0; i < desc->natts; i++) - { - Form_pg_attribute att = TupleDescAttr(desc, i); +/* + * Sends a relation + */ +static void +send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx) +{ + TupleDesc desc = RelationGetDescr(relation); + int i; - if (att->attisdropped || att->attgenerated) - continue; + /* + * Write out type info if needed. We do that only for user-created types. + * We use FirstGenbkiObjectId as the cutoff, so that we only consider + * objects with hand-assigned OIDs to be "built in", not for instance any + * function or type defined in the information_schema. This is important + * because only hand-assigned OIDs can be expected to remain stable across + * major versions. + */ + for (i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); - if (att->atttypid < FirstGenbkiObjectId) - continue; + if (att->attisdropped || att->attgenerated) + continue; - OutputPluginPrepareWrite(ctx, false); - logicalrep_write_typ(ctx->out, att->atttypid); - OutputPluginWrite(ctx, false); - } + if (att->atttypid < FirstGenbkiObjectId) + continue; OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, relation); + logicalrep_write_typ(ctx->out, att->atttypid); OutputPluginWrite(ctx, false); - relentry->schema_sent = true; } + + OutputPluginPrepareWrite(ctx, false); + logicalrep_write_rel(ctx->out, relation); + OutputPluginWrite(ctx, false); } /* @@ -346,28 +386,65 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, switch (change->action) { case REORDER_BUFFER_CHANGE_INSERT: - OutputPluginPrepareWrite(ctx, true); - logicalrep_write_insert(ctx->out, relation, - &change->data.tp.newtuple->tuple); - OutputPluginWrite(ctx, true); - break; + { + HeapTuple tuple = &change->data.tp.newtuple->tuple; + + /* Publish as root relation change if requested. */ + if (OidIsValid(relentry->replicate_as_relid)) + { + Assert(relation->rd_rel->relispartition); + relation = RelationIdGetRelation(relentry->replicate_as_relid); + /* Convert tuple if needed. */ + if (relentry->map) + tuple = execute_attr_map_tuple(tuple, relentry->map); + } + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_insert(ctx->out, relation, tuple); + OutputPluginWrite(ctx, true); + break; + } case REORDER_BUFFER_CHANGE_UPDATE: { HeapTuple oldtuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL; + HeapTuple newtuple = &change->data.tp.newtuple->tuple; + + /* Publish as root relation change if requested. */ + if (OidIsValid(relentry->replicate_as_relid)) + { + Assert(relation->rd_rel->relispartition); + relation = RelationIdGetRelation(relentry->replicate_as_relid); + /* Convert tuples if needed. 
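+					 * Note that oldtuple is only present when the replica
+					 * identity columns changed or REPLICA IDENTITY FULL is
+					 * in use, so it may be NULL here.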
*/
+					if (relentry->map)
+					{
+						if (oldtuple)
+							oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
+						newtuple = execute_attr_map_tuple(newtuple, relentry->map);
+					}
+				}

 				OutputPluginPrepareWrite(ctx, true);
-				logicalrep_write_update(ctx->out, relation, oldtuple,
-										&change->data.tp.newtuple->tuple);
+				logicalrep_write_update(ctx->out, relation, oldtuple, newtuple);
 				OutputPluginWrite(ctx, true);
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_DELETE:
 			if (change->data.tp.oldtuple)
 			{
+				HeapTuple	oldtuple = &change->data.tp.oldtuple->tuple;
+
+				/* Publish as root relation change if requested. */
+				if (OidIsValid(relentry->replicate_as_relid))
+				{
+					Assert(relation->rd_rel->relispartition);
+					relation = RelationIdGetRelation(relentry->replicate_as_relid);
+					/* Convert tuple if needed. */
+					if (relentry->map)
+						oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
+				}
+
 				OutputPluginPrepareWrite(ctx, true);
-				logicalrep_write_delete(ctx->out, relation,
-										&change->data.tp.oldtuple->tuple);
+				logicalrep_write_delete(ctx->out, relation, oldtuple);
 				OutputPluginWrite(ctx, true);
 			}
 			else
@@ -411,6 +488,28 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		if (!relentry->pubactions.pubtruncate)
 			continue;

+		/*
+		 * If this partition was not *directly* truncated, don't bother
+		 * sending it to the subscriber.
+		 */
+		if (OidIsValid(relentry->replicate_as_relid))
+		{
+			int			j;
+			bool		can_skip_part_trunc = false;
+
+			for (j = 0; j < nrelids; j++)
+			{
+				if (relentry->replicate_as_relid == relids[j])
+				{
+					can_skip_part_trunc = true;
+					break;
+				}
+			}
+
+			if (can_skip_part_trunc)
+				continue;
+		}
+
 		relids[nrelids++] = relid;
 		maybe_send_schema(ctx, relation, relentry);
 	}
@@ -529,6 +628,11 @@ init_rel_sync_cache(MemoryContext cachectx)

 /*
  * Find or create entry in the relation schema cache.
+ *
+ * For a partition, the schema of the top-most published ancestor will be
+ * used in some cases instead of the partition's own, so the information
+ * about the ancestor's publications is looked up here and saved in the
+ * schema cache entry.
  */
 static RelationSyncEntry *
 get_rel_sync_entry(PGOutputData *data, Relation rel)
@@ -553,8 +657,11 @@ get_rel_sync_entry(PGOutputData *data, Relation rel)
 	{
 		List	   *pubids = GetRelationPublications(relid);
 		ListCell   *lc,
-				   *lc1;
+				   *lc1,
+				   *lc2;
 		List	   *ancestor_pubids = NIL;
+		List	   *published_ancestors = NIL;
+		Oid			topmost_published_ancestor = InvalidOid;

 		/* Reload publications if needed before use. */
 		if (!publications_valid)
@@ -579,7 +686,9 @@ get_rel_sync_entry(PGOutputData *data, Relation rel)
 		/* For partitions, also consider publications of ancestors.
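+		 * For example (hypothetical names): if p1 is a partition of parted
+		 * and parted belongs to a publication created with
+		 *
+		 *     CREATE PUBLICATION pub FOR TABLE parted
+		 *         WITH (publish_using_root_schema = true);
+		 *
+		 * then p1's changes must be published using parted's schema, which
+		 * is arranged by setting replicate_as_relid to parted's OID below.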
*/ if (rel->rd_rel->relispartition) ancestor_pubids = - GetRelationAncestorPublications(RelationGetRelid(rel)); + GetRelationAncestorPublications(RelationGetRelid(rel), + &published_ancestors); + Assert(list_length(ancestor_pubids) == list_length(published_ancestors)); foreach(lc, data->publications) { @@ -597,7 +706,7 @@ get_rel_sync_entry(PGOutputData *data, Relation rel) entry->pubactions.pubdelete && entry->pubactions.pubtruncate) break; - foreach(lc1, ancestor_pubids) + forboth(lc1, ancestor_pubids, lc2, published_ancestors) { if (lfirst_oid(lc1) == pub->oid) { @@ -605,6 +714,8 @@ get_rel_sync_entry(PGOutputData *data, Relation rel) entry->pubactions.pubupdate |= pub->pubactions.pubupdate; entry->pubactions.pubdelete |= pub->pubactions.pubdelete; entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; + if (pub->publish_using_root_schema) + topmost_published_ancestor = lfirst_oid(lc2); } } @@ -615,7 +726,9 @@ get_rel_sync_entry(PGOutputData *data, Relation rel) list_free(pubids); list_free(ancestor_pubids); + list_free(published_ancestors); + entry->replicate_as_relid = topmost_published_ancestor; entry->replicate_valid = true; } diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 9d13e5c735..0a45c11d7d 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -83,7 +83,7 @@ typedef struct Publication extern Publication *GetPublication(Oid pubid); extern Publication *GetPublicationByName(const char *pubname, bool missing_ok); extern List *GetRelationPublications(Oid relid); -extern List *GetRelationAncestorPublications(Oid relid); +extern List *GetRelationAncestorPublications(Oid relid, List **published_ancestors); extern List *GetPublicationRelations(Oid pubid); extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(void); diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index eb0f1cd6a8..957c7b4be1 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 10; +use Test::More tests => 16; # setup @@ -41,21 +41,38 @@ $node_publisher->safe_psql('postgres', "CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (5, 6)"); $node_subscriber1->safe_psql('postgres', - "CREATE TABLE tab1_2 PARTITION OF tab1 (c DEFAULT 'sub1_tab1') FOR VALUES IN (5, 6)"); + "CREATE TABLE tab1_2 PARTITION OF tab1 (c DEFAULT 'sub1_tab1') FOR VALUES IN (5, 6) PARTITION BY LIST (a)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_2_1 PARTITION OF tab1_2 FOR VALUES IN (5)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_2_2 PARTITION OF tab1_2 FOR VALUES IN (6)"); $node_subscriber2->safe_psql('postgres', "CREATE TABLE tab1_2 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_2', b text)"); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1', b text) PARTITION BY HASH (a)"); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_part1 (b text, c text, a int NOT NULL)"); +$node_subscriber2->safe_psql('postgres', + "ALTER TABLE tab1 ATTACH PARTITION tab1_part1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)"); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_part2 PARTITION OF tab1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)"); + $node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub1 FOR TABLE tab1, 
tab1_1"); $node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub2 FOR TABLE tab1_2"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub3 FOR TABLE tab1 WITH (publish_using_root_schema = true)"); $node_subscriber1->safe_psql('postgres', "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"); $node_subscriber2->safe_psql('postgres', "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2"); +$node_subscriber2->safe_psql('postgres', + "CREATE SUBSCRIPTION sub3 CONNECTION '$publisher_connstr' PUBLICATION pub3"); # Wait for initial sync of all subscriptions my $synced_query = @@ -85,17 +102,26 @@ $result = $node_subscriber2->safe_psql('postgres', "SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1"); is($result, qq(sub2_tab1_2|1|5|5), 'inserts into tab1_2 replicated'); +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); +is($result, qq(sub2_tab1|3|1|5), 'inserts into tab1_2 replicated'); + # update a row (no partition change) $node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 2 WHERE a = 1"); $node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); $result = $node_subscriber1->safe_psql('postgres', "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); is($result, qq(sub1_tab1|3|2|5), 'update of tab1_1 replicated'); +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); +is($result, qq(sub2_tab1|3|2|5), 'update of tab1_1 replicated'); + # update a row (partition changes) $node_publisher->safe_psql('postgres', @@ -112,6 +138,10 @@ $result = $node_subscriber2->safe_psql('postgres', "SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1"); is($result, qq(sub2_tab1_2|2|5|6), 'insert into tab1_2 replicated'); +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); +is($result, qq(sub2_tab1|3|3|6), 'delete from tab1_1 replicated'); + # delete rows (some from the root parent, some directly from the partition) $node_publisher->safe_psql('postgres', @@ -130,12 +160,18 @@ $result = $node_subscriber2->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab1_2"); is($result, qq(0||), 'delete from tab1_2 replicated'); +$result = $node_subscriber2->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(0||), 'delete from tab1_1, tab_2 replicated'); + # truncate (root parent and partition directly) $node_subscriber1->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (2), (5)"); $node_subscriber2->safe_psql('postgres', "INSERT INTO tab1_2 VALUES (5)"); +$node_subscriber2->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1), (2), (5)"); $node_publisher->safe_psql('postgres', "TRUNCATE tab1_2"); @@ -151,6 +187,10 @@ $result = $node_subscriber2->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab1_2"); is($result, qq(0||), 'truncate of tab1_2 replicated'); +$result = $node_subscriber2->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(3|1|5), 'no change, because only truncate of tab1 will be replicated'); + $node_publisher->safe_psql('postgres', "TRUNCATE tab1"); @@ -159,3 +199,7 @@ $node_publisher->wait_for_catchup('sub1'); $result = $node_subscriber1->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab1"); is($result, qq(0||), 'truncate of tab1_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + 
"SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(0||), 'truncate of tab1_1 replicated'); -- 2.16.5