From d7545b676e39edf77a6944d2b63c6dea25bb7569 Mon Sep 17 00:00:00 2001 From: Amit Langote Date: Thu, 23 Jan 2020 11:49:01 +0900 Subject: [PATCH v10 3/4] Add subscription support to replicate into partitioned tables Mainly, this adds support code in logical/worker.c for applying replicated operations whose target is a partitioned table to its relevant partitions. --- src/backend/executor/execReplication.c | 14 +- src/backend/replication/logical/relation.c | 161 +++++++++++++++++++ src/backend/replication/logical/tablesync.c | 28 ++-- src/backend/replication/logical/worker.c | 232 ++++++++++++++++++++++++++-- src/include/replication/logicalrelation.h | 2 + src/test/subscription/t/013_partition.pl | 7 +- 6 files changed, 413 insertions(+), 31 deletions(-) diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 582b0cb017..635b29d050 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -591,17 +591,9 @@ CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname) { /* - * We currently only support writing to regular tables. However, give a - * more specific error for partitioned and foreign tables. + * Give a more specific error for foreign tables. */ - if (relkind == RELKIND_PARTITIONED_TABLE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot use relation \"%s.%s\" as logical replication target", - nspname, relname), - errdetail("\"%s.%s\" is a partitioned table.", - nspname, relname))); - else if (relkind == RELKIND_FOREIGN_TABLE) + if (relkind == RELKIND_FOREIGN_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot use relation \"%s.%s\" as logical replication target", @@ -609,7 +601,7 @@ CheckSubscriptionRelkind(char relkind, const char *nspname, errdetail("\"%s.%s\" is a foreign table.", nspname, relname))); - if (relkind != RELKIND_RELATION) + if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot use relation \"%s.%s\" as logical replication target", diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 3d7291b970..54189d7965 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -34,6 +34,7 @@ static MemoryContext LogicalRepRelMapContext = NULL; static HTAB *LogicalRepRelMap = NULL; static HTAB *LogicalRepTypMap = NULL; +static HTAB *LogicalRepPartMap = NULL; /* @@ -472,3 +473,163 @@ logicalrep_typmap_gettypname(Oid remoteid) Assert(OidIsValid(entry->remoteid)); return psprintf("%s.%s", entry->nspname, entry->typname); } + +/* + * Partition cache: look up partition LogicalRepRelMapEntry's + * + * Unlike relation map cache, this is keyed by partition OID, not remote + * relation OID, because we only have to use this cache in the case where + * partitions are not directly mapped to any remote relation, such as when + * replication is occurring with one of their ancestors as target. + */ + +/* + * Relcache invalidation callback + */ +static void +logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid) +{ + LogicalRepRelMapEntry *entry; + + /* Just to be sure. */ + if (LogicalRepPartMap == NULL) + return; + + if (reloid != InvalidOid) + { + HASH_SEQ_STATUS status; + + hash_seq_init(&status, LogicalRepPartMap); + + /* TODO, use inverse lookup hashtable? */ + while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) + { + if (entry->localreloid == reloid) + { + entry->localreloid = InvalidOid; + hash_seq_term(&status); + break; + } + } + } + else + { + /* invalidate all cache entries */ + HASH_SEQ_STATUS status; + + hash_seq_init(&status, LogicalRepPartMap); + + while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) + entry->localreloid = InvalidOid; + } +} + +/* + * Initialize the partition map cache. + */ +static void +logicalrep_partmap_init(void) +{ + HASHCTL ctl; + + if (!LogicalRepRelMapContext) + LogicalRepRelMapContext = + AllocSetContextCreate(CacheMemoryContext, + "LogicalRepPartMapContext", + ALLOCSET_DEFAULT_SIZES); + + /* Initialize the relation hash table. */ + MemSet(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(Oid); /* partition OID */ + ctl.entrysize = sizeof(LogicalRepRelMapEntry); + ctl.hcxt = LogicalRepRelMapContext; + + LogicalRepPartMap = hash_create("logicalrep partition map cache", 64, &ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + + /* Watch for invalidation events. */ + CacheRegisterRelcacheCallback(logicalrep_partmap_invalidate_cb, + (Datum) 0); +} + +/* + * logicalrep_partition_open + * + * Returned entry reuses most of the values of the root table's entry, save + * the attribute map, which can be different for the partition. + */ +LogicalRepRelMapEntry * +logicalrep_partition_open(LogicalRepRelMapEntry *root, + Relation partrel, AttrMap *map) +{ + LogicalRepRelMapEntry *entry; + LogicalRepRelation *remoterel = &root->remoterel; + Oid partOid = RelationGetRelid(partrel); + AttrMap *attrmap = root->attrmap; + bool found; + int i; + MemoryContext oldctx; + + if (LogicalRepPartMap == NULL) + logicalrep_partmap_init(); + + /* Search for existing entry. */ + entry = hash_search(LogicalRepPartMap, (void *) &partOid, + HASH_ENTER, &found); + + if (found) + return entry; + + memset(entry, 0, sizeof(LogicalRepRelMapEntry)); + + /* Make cached copy of the data */ + oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); + + /* Remote relation is used as-is from the root's entry. */ + entry->remoterel.remoteid = remoterel->remoteid; + entry->remoterel.nspname = pstrdup(remoterel->nspname); + entry->remoterel.relname = pstrdup(remoterel->relname); + entry->remoterel.natts = remoterel->natts; + entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *)); + entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid)); + for (i = 0; i < remoterel->natts; i++) + { + entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]); + entry->remoterel.atttyps[i] = remoterel->atttyps[i]; + } + entry->remoterel.replident = remoterel->replident; + entry->remoterel.attkeys = bms_copy(remoterel->attkeys); + + entry->localrel = partrel; + entry->localreloid = partOid; + + /* + * If the partition's attributes don't match the root relation's, we'll + * need to make a new attrmap which maps partition attribute numbers to + * remoterel's, instead the original which maps root relation's attribute + * numbers to remoterel's. + */ + if (map) + { + AttrNumber attno; + + entry->attrmap = make_attrmap(map->maplen); + memset(entry->attrmap->attnums, -1, + entry->attrmap->maplen * sizeof(AttrNumber)); + for (attno = 0; attno < entry->attrmap->maplen; attno++) + { + AttrNumber root_attno = map->attnums[attno]; + + entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1]; + } + } + else + entry->attrmap = attrmap; + + entry->updatable = root->updatable; + + /* state and statelsn are left set to 0. */ + MemoryContextSwitchTo(oldctx); + + return entry; +} diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 98825f01e9..6a18b78f22 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -630,16 +630,17 @@ copy_read_data(void *outbuf, int minread, int maxread) /* * Get information about remote relation in similar fashion the RELATION - * message provides during replication. + * message provides during replication. XXX - while we fetch relkind too + * here, the RELATION message doesn't provide it */ static void fetch_remote_table_info(char *nspname, char *relname, - LogicalRepRelation *lrel) + LogicalRepRelation *lrel, char *relkind) { WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[2] = {OIDOID, CHAROID}; + Oid tableRow[3] = {OIDOID, CHAROID, CHAROID}; Oid attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID}; bool isnull; int natt; @@ -649,16 +650,16 @@ fetch_remote_table_info(char *nspname, char *relname, /* First fetch Oid and replica identity. */ initStringInfo(&cmd); - appendStringInfo(&cmd, "SELECT c.oid, c.relreplident" + appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind" " FROM pg_catalog.pg_class c" " INNER JOIN pg_catalog.pg_namespace n" " ON (c.relnamespace = n.oid)" " WHERE n.nspname = %s" " AND c.relname = %s" - " AND c.relkind = 'r'", + " AND pg_relation_is_publishable(c.oid)", quote_literal_cstr(nspname), quote_literal_cstr(relname)); - res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + res = walrcv_exec(wrconn, cmd.data, 3, tableRow); if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, @@ -675,6 +676,8 @@ fetch_remote_table_info(char *nspname, char *relname, Assert(!isnull); lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull)); Assert(!isnull); + *relkind = DatumGetChar(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); @@ -750,10 +753,12 @@ copy_table(Relation rel) CopyState cstate; List *attnamelist; ParseState *pstate; + char remote_relkind; /* Get the publisher relation info. */ fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), - RelationGetRelationName(rel), &lrel); + RelationGetRelationName(rel), &lrel, + &remote_relkind); /* Put the relation into relmap. */ logicalrep_relmap_update(&lrel); @@ -761,12 +766,15 @@ copy_table(Relation rel) /* Map the publisher relation to local one. */ relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); Assert(rel == relmapentry->localrel); - Assert(relmapentry->localrel->rd_rel->relkind == RELKIND_RELATION); /* Start copy on the publisher. */ initStringInfo(&cmd); - appendStringInfo(&cmd, "COPY %s TO STDOUT", - quote_qualified_identifier(lrel.nspname, lrel.relname)); + if (remote_relkind == RELKIND_PARTITIONED_TABLE) + appendStringInfo(&cmd, "COPY (SELECT * FROM %s) TO STDOUT", + quote_qualified_identifier(lrel.nspname, lrel.relname)); + else + appendStringInfo(&cmd, "COPY %s TO STDOUT", + quote_qualified_identifier(lrel.nspname, lrel.relname)); res = walrcv_exec(wrconn, cmd.data, 0, NULL); pfree(cmd.data); if (res->status != WALRCV_OK_COPY_OUT) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 86601f6e8f..a48537db0c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -29,11 +29,14 @@ #include "access/xlog_internal.h" #include "catalog/catalog.h" #include "catalog/namespace.h" +#include "catalog/partition.h" +#include "catalog/pg_inherits.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "commands/tablecmds.h" #include "commands/trigger.h" #include "executor/executor.h" +#include "executor/execPartition.h" #include "executor/nodeModifyTable.h" #include "funcapi.h" #include "libpq/pqformat.h" @@ -720,6 +723,152 @@ apply_handle_do_delete(ResultRelInfo *relinfo, EState *estate, EvalPlanQualEnd(&epqstate); } +/* + * This handles insert, update, delete on a partitioned table. + */ +static void +apply_handle_tuple_routing(ResultRelInfo *relinfo, + EState *estate, + TupleTableSlot *remoteslot, + LogicalRepTupleData *newtup, + LogicalRepRelMapEntry *relmapentry, + CmdType operation) +{ + Relation rel = relinfo->ri_RelationDesc; + ModifyTableState *mtstate = NULL; + PartitionTupleRouting *proute = NULL; + ResultRelInfo *partrelinfo; + TupleTableSlot *localslot; + PartitionRoutingInfo *partinfo; + TupleConversionMap *map; + MemoryContext oldctx; + + /* ModifyTableState is needed for ExecFindPartition(). */ + mtstate = makeNode(ModifyTableState); + mtstate->ps.plan = NULL; + mtstate->ps.state = estate; + mtstate->operation = operation; + mtstate->resultRelInfo = relinfo; + proute = ExecSetupPartitionTupleRouting(estate, mtstate, rel); + + /* + * Find a partition for the tuple contained in remoteslot. + * + * For insert, remoteslot is tuple to insert. For update and delete, it + * is the tuple to be replaced and deleted, respectively. + */ + Assert(remoteslot != NULL); + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + /* The following throws an error if a suitable partition is not found. */ + partrelinfo = ExecFindPartition(mtstate, relinfo, proute, + remoteslot, estate); + Assert(partrelinfo != NULL); + /* Convert the tuple to match the partition's rowtype. */ + partinfo = partrelinfo->ri_PartitionInfo; + map = partinfo->pi_RootToPartitionMap; + if (map != NULL) + { + TupleTableSlot *part_slot = partinfo->pi_PartitionTupleSlot; + + remoteslot = execute_attr_map_slot(map->attrMap, remoteslot, + part_slot); + } + MemoryContextSwitchTo(oldctx); + + switch (operation) + { + case CMD_INSERT: + /* Just insert into the partition. */ + estate->es_result_relation_info = partrelinfo; + apply_handle_do_insert(partrelinfo, estate, remoteslot); + break; + + case CMD_DELETE: + /* Just delete from the partition. */ + estate->es_result_relation_info = partrelinfo; + apply_handle_do_delete(partrelinfo, estate, remoteslot, + &relmapentry->remoterel); + break; + + case CMD_UPDATE: + { + ResultRelInfo *partrelinfo_new; + + /* + * partrelinfo computed above is the partition which might + * contain the search tuple. Now find the partition for the + * replacement tuple, which might not be the same as + * partrelinfo. + */ + localslot = table_slot_create(rel, &estate->es_tupleTable); + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + slot_modify_cstrings(localslot, remoteslot, relmapentry, + newtup->values, newtup->changed); + partrelinfo_new = ExecFindPartition(mtstate, relinfo, proute, + localslot, estate); + + MemoryContextSwitchTo(oldctx); + + /* + * If both search and replacement tuple would be in the same + * partition, we can apply this as an UPDATE on the parttion. + */ + if (partrelinfo == partrelinfo_new) + { + Relation partrel = partrelinfo->ri_RelationDesc; + AttrMap *attrmap = map ? map->attrMap : NULL; + LogicalRepRelMapEntry *part_entry; + + part_entry = logicalrep_partition_open(relmapentry, + partrel, attrmap); + + /* UPDATE partition. */ + estate->es_result_relation_info = partrelinfo; + apply_handle_do_update(partrelinfo, estate, remoteslot, + newtup, part_entry); + } + else + { + /* + * Different, so handle this as DELETE followed by INSERT. + */ + + /* DELETE from partition partrelinfo. */ + estate->es_result_relation_info = partrelinfo; + apply_handle_do_delete(partrelinfo, estate, remoteslot, + &relmapentry->remoterel); + + /* + * Convert the replacement tuple to match the destination + * partition rowtype. + */ + oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + partinfo = partrelinfo_new->ri_PartitionInfo; + map = partinfo->pi_RootToPartitionMap; + if (map != NULL) + { + TupleTableSlot *part_slot = partinfo->pi_PartitionTupleSlot; + + localslot = execute_attr_map_slot(map->attrMap, localslot, + part_slot); + } + MemoryContextSwitchTo(oldctx); + /* INSERT into partition partrelinfo_new. */ + estate->es_result_relation_info = partrelinfo_new; + apply_handle_do_insert(partrelinfo_new, estate, + localslot); + } + } + break; + + default: + elog(ERROR, "unrecognized CmdType: %d", (int) operation); + break; + } + + ExecCleanupTupleRouting(mtstate, proute); +} + /* * Handle INSERT message. */ @@ -762,9 +911,13 @@ apply_handle_insert(StringInfo s) slot_fill_defaults(rel, estate, remoteslot); MemoryContextSwitchTo(oldctx); - Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION); - apply_handle_do_insert(estate->es_result_relation_info, estate, - remoteslot); + /* For a partitioned table, insert the tuple into a partition. */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + apply_handle_tuple_routing(estate->es_result_relation_info, estate, + remoteslot, NULL, rel, CMD_INSERT); + else + apply_handle_do_insert(estate->es_result_relation_info, estate, + remoteslot); PopActiveSnapshot(); @@ -877,9 +1030,13 @@ apply_handle_update(StringInfo s) has_oldtup ? oldtup.values : newtup.values); MemoryContextSwitchTo(oldctx); - Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION); - apply_handle_do_update(estate->es_result_relation_info, estate, - remoteslot, &newtup, rel); + /* For a partitioned table, apply update to correct partition. */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + apply_handle_tuple_routing(estate->es_result_relation_info, estate, + remoteslot, &newtup, rel, CMD_UPDATE); + else + apply_handle_do_update(estate->es_result_relation_info, estate, + remoteslot, &newtup, rel); PopActiveSnapshot(); @@ -940,9 +1097,13 @@ apply_handle_delete(StringInfo s) slot_store_cstrings(remoteslot, rel, oldtup.values); MemoryContextSwitchTo(oldctx); - Assert(rel->localrel->rd_rel->relkind == RELKIND_RELATION); - apply_handle_do_delete(estate->es_result_relation_info, estate, - remoteslot, &rel->remoterel); + /* For a partitioned table, apply delete to correct partition. */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + apply_handle_tuple_routing(estate->es_result_relation_info, estate, + remoteslot, NULL, rel, CMD_DELETE); + else + apply_handle_do_delete(estate->es_result_relation_info, estate, + remoteslot, &rel->remoterel); PopActiveSnapshot(); @@ -970,6 +1131,7 @@ apply_handle_truncate(StringInfo s) List *remote_relids = NIL; List *remote_rels = NIL; List *rels = NIL; + List *part_rels = NIL; List *relids = NIL; List *relids_logged = NIL; ListCell *lc; @@ -999,6 +1161,52 @@ apply_handle_truncate(StringInfo s) relids = lappend_oid(relids, rel->localreloid); if (RelationIsLogicallyLogged(rel->localrel)) relids_logged = lappend_oid(relids_logged, rel->localreloid); + + /* + * Truncate partitions if we got a message to truncate a partitioned + * table. + */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + ListCell *child; + List *children = find_all_inheritors(rel->localreloid, + RowExclusiveLock, + NULL); + + foreach(child, children) + { + Oid childrelid = lfirst_oid(child); + Relation childrel; + + if (list_member_oid(relids, childrelid)) + continue; + + /* find_all_inheritors already got lock */ + childrel = table_open(childrelid, NoLock); + + /* + * It is possible that the parent table has children that are + * temp tables of other backends. We cannot safely access + * such tables (because of buffering issues), and the best + * thing to do is to silently ignore them. Note that this + * check is the same as one of the checks done in + * truncate_check_activity() called below, still it is kept + * here for simplicity. + */ + if (RELATION_IS_OTHER_TEMP(childrel)) + { + table_close(childrel, RowExclusiveLock); + continue; + } + + rels = lappend(rels, childrel); + part_rels = lappend(part_rels, childrel); + relids = lappend_oid(relids, childrelid); + /* Log this relation only if needed for logical decoding */ + if (RelationIsLogicallyLogged(childrel)) + relids_logged = lappend_oid(relids_logged, childrelid); + } + } } /* @@ -1014,6 +1222,12 @@ apply_handle_truncate(StringInfo s) logicalrep_rel_close(rel, NoLock); } + foreach(lc, part_rels) + { + Relation rel = lfirst(lc); + + table_close(rel, NoLock); + } CommandCounterIncrement(); } diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index 9971a8028c..4650b4f9e1 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -34,6 +34,8 @@ extern void logicalrep_relmap_update(LogicalRepRelation *remoterel); extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode); +extern LogicalRepRelMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *root, + Relation partrel, AttrMap *map); extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode); diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index 1fa392b618..1ec487154b 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -42,10 +42,15 @@ $node_subscriber1->safe_psql('postgres', "CREATE TABLE tab1 (a int PRIMARY KEY, b text, c text) PARTITION BY LIST (a)"); $node_subscriber1->safe_psql('postgres', "CREATE TABLE tab1_1 (b text, c text DEFAULT 'sub1_tab1', a int NOT NULL)"); + $node_subscriber1->safe_psql('postgres', "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3, 4)"); $node_subscriber1->safe_psql('postgres', - "CREATE TABLE tab1_2 PARTITION OF tab1 (c DEFAULT 'sub1_tab1') FOR VALUES IN (5, 6)"); + "CREATE TABLE tab1_2 PARTITION OF tab1 (c DEFAULT 'sub1_tab1') FOR VALUES IN (5, 6) PARTITION BY LIST (a)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_2_1 PARTITION OF tab1_2 FOR VALUES IN (5)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_2_2 PARTITION OF tab1_2 FOR VALUES IN (6)"); $node_subscriber1->safe_psql('postgres', "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"); -- 2.16.5