From 61e983f85ccd309fd8aea2ae2119db278812b119 Mon Sep 17 00:00:00 2001 From: amit Date: Fri, 29 Nov 2019 17:40:11 +0900 Subject: [PATCH v10 4/4] Publish partitioned table inserts as its own To control whether partition changes are replicated using their own identity (and schema) or an ancestor's, add a new parameter that cab be set per publication named 'publish_using_root_schema'. --- doc/src/sgml/logical-replication.sgml | 12 +- doc/src/sgml/ref/create_publication.sgml | 17 +++ src/backend/catalog/partition.c | 9 ++ src/backend/catalog/pg_publication.c | 98 ++++++++++--- src/backend/commands/publicationcmds.c | 95 ++++++++----- src/backend/commands/tablecmds.c | 2 +- src/backend/executor/nodeModifyTable.c | 4 + src/backend/replication/pgoutput/pgoutput.c | 211 ++++++++++++++++++++++------ src/backend/utils/cache/relcache.c | 7 +- src/bin/pg_dump/pg_dump.c | 22 ++- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 17 ++- src/include/catalog/partition.h | 1 + src/include/catalog/pg_publication.h | 7 +- src/test/regress/expected/publication.out | 103 ++++++++------ src/test/regress/sql/publication.sql | 3 + src/test/subscription/t/013_partition.pl | 170 +++++++++++++++++++++- 17 files changed, 611 insertions(+), 168 deletions(-) diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index fa30ac27f7..98da594eeb 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -402,16 +402,8 @@ - Replication is only supported by regular and partitioned tables, although - the type of the table must match between the two servers, that is, one - cannot replicate from a regular table into a partitioned able or vice - versa. Also, when replicating between partitioned tables, the actual - replication occurs between leaf partitions, so the partitions on the two - servers must match one-to-one. - - - - Attempts to replicate other types of relations such as views, materialized + Replication is only supported by regular and partitioned tables. + Attempts to replicate other types of relations such as view, materialized views, or foreign tables, will result in an error. diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index a304f9b8c3..b51701a623 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -122,6 +122,23 @@ CREATE PUBLICATION name + + + publish_using_root_schema (boolean) + + + This parameter determines whether DML operations on a partitioned + table (or on its partitions) contained in the publication will be + published using its own schema rather than of the individual + partitions which are actually changed; the latter is the default. + Setting it to true allows the changes to be + replicated into a non-partitioned table or a partitioned table + consisting of a different set of partitions. However, + TRUNCATE operations performed directly on + partitions are not replicated. + + + diff --git a/src/backend/catalog/partition.c b/src/backend/catalog/partition.c index 239ac017fa..07853b85d5 100644 --- a/src/backend/catalog/partition.c +++ b/src/backend/catalog/partition.c @@ -28,6 +28,7 @@ #include "partitioning/partbounds.h" #include "rewrite/rewriteManip.h" #include "utils/fmgroids.h" +#include "utils/lsyscache.h" #include "utils/partcache.h" #include "utils/rel.h" #include "utils/syscache.h" @@ -126,6 +127,14 @@ get_partition_ancestors(Oid relid) return result; } +/* Is given relation a leaf partition? */ +bool +is_leaf_partition(Oid relid) +{ + return get_rel_relkind(relid) != RELKIND_PARTITIONED_TABLE && + get_rel_relispartition(relid); +} + /* * get_partition_ancestors_worker * recursive worker for get_partition_ancestors diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 14d4ad3abd..6fac401ed4 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -224,13 +224,30 @@ publication_add_relation(Oid pubid, Relation targetrel, /* * Gets list of publication oids for a relation, plus those of ancestors, * if any, if the relation is a partition. + * + * *published_rels, if asked for, will contain the OID of the relation for + * each publication returned, that is, of the relation that is actually + * published. Examining this list allows the caller, for instance, to + * distinguish publications that it is directly part from those that it is + * indirectly part of via an ancestor. */ List * -GetRelationPublications(Oid relid) +GetRelationPublications(Oid relid, List **published_rels) { List *result = NIL; + int i, + num; + + if (published_rels) + *published_rels = NIL; result = get_rel_publications(relid); + if (published_rels) + { + num = list_length(result); + for (i = 0; i < num; i++) + *published_rels = lappend_oid(*published_rels, relid); + } if (get_rel_relispartition(relid)) { List *ancestors = get_partition_ancestors(relid); @@ -242,6 +259,12 @@ GetRelationPublications(Oid relid) List *ancestor_pubs = get_rel_publications(ancestor); result = list_concat(result, ancestor_pubs); + if (published_rels) + { + num = list_length(ancestor_pubs); + for (i = 0; i < num; i++) + *published_rels = lappend_oid(*published_rels, ancestor); + } } } @@ -362,9 +385,13 @@ GetAllTablesPublications(void) /* * Gets list of all relation published by FOR ALL TABLES publication(s). + * + * If the publication publishes partition changes via their respective root + * partitioned tables, we must exclude partitions in favor of including the + * root partitioned tables. */ List * -GetAllTablesPublicationRelations(void) +GetAllTablesPublicationRelations(bool pubasroot) { Relation classRel; ScanKeyData key[1]; @@ -386,12 +413,35 @@ GetAllTablesPublicationRelations(void) Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); Oid relid = relForm->oid; - if (is_publishable_class(relid, relForm)) + if (is_publishable_class(relid, relForm) && + !(relForm->relispartition && pubasroot)) result = lappend_oid(result, relid); } table_endscan(scan); - table_close(classRel, AccessShareLock); + + if (pubasroot) + { + ScanKeyInit(&key[0], + Anum_pg_class_relkind, + BTEqualStrategyNumber, F_CHAREQ, + CharGetDatum(RELKIND_PARTITIONED_TABLE)); + + scan = table_beginscan_catalog(classRel, 1, key); + + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); + Oid relid = relForm->oid; + + if (is_publishable_class(relid, relForm) && + !relForm->relispartition) + result = lappend_oid(result, relid); + } + + table_endscan(scan); + table_close(classRel, AccessShareLock); + } return result; } @@ -422,6 +472,7 @@ GetPublication(Oid pubid) pub->pubactions.pubupdate = pubform->pubupdate; pub->pubactions.pubdelete = pubform->pubdelete; pub->pubactions.pubtruncate = pubform->pubtruncate; + pub->pubasroot = pubform->pubasroot; ReleaseSysCache(tup); @@ -518,36 +569,45 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) /* * Publications support partitioned tables, although we need to filter - * them out from the result, because all changes are replicated using - * the leaf partition identity and schema. + * them out from the result unless the publication replicates changes + * using the root schema. In other cases, we return only their leaf + * partitions, because all changes are replicated using the leaf + * partition identity and schema. */ if (publication->alltables) { - /* - * GetAllTablesPublicationRelations() only ever returns leaf - * partitions. - */ - tables = GetAllTablesPublicationRelations(); + tables = GetAllTablesPublicationRelations(publication->pubasroot); } else { List *all_tables; ListCell *lc; + /* + * Only need partitions if not replicating partitioned table + * changes using the root schema. + */ + all_tables = GetPublicationRelations(publication->oid, + !publication->pubasroot); + /* * GetPublicationRelations() includes partitioned tables in its * result which is required by other internal users of that - * function, which must be filtered out. + * function, which must be filtered out if needed. */ - all_tables = GetPublicationRelations(publication->oid, true); - tables = NIL; - foreach(lc, all_tables) + if (!publication->pubasroot) { - Oid relid = lfirst_oid(lc); - - if (get_rel_relkind(relid) != RELKIND_PARTITIONED_TABLE) - tables = lappend_oid(tables, relid); + tables = NIL; + foreach(lc, all_tables) + { + Oid relid = lfirst_oid(lc); + + if (get_rel_relkind(relid) != RELKIND_PARTITIONED_TABLE) + tables = lappend_oid(tables, relid); + } } + else + tables = all_tables; } funcctx->user_fctx = (void *) tables; diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index d4b43e7662..309ee77650 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -23,6 +23,7 @@ #include "catalog/namespace.h" #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" +#include "catalog/partition.h" #include "catalog/pg_inherits.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" @@ -55,20 +56,23 @@ static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok); static void parse_publication_options(List *options, bool *publish_given, - bool *publish_insert, - bool *publish_update, - bool *publish_delete, - bool *publish_truncate) + PublicationActions *pubactions, + bool *publish_using_root_schema_given, + bool *publish_using_root_schema) { ListCell *lc; + *publish_using_root_schema_given = false; *publish_given = false; /* Defaults are true */ - *publish_insert = true; - *publish_update = true; - *publish_delete = true; - *publish_truncate = true; + pubactions->pubinsert = true; + pubactions->pubupdate = true; + pubactions->pubdelete = true; + pubactions->pubtruncate = true; + + /* Relation changes published as of itself by default. */ + *publish_using_root_schema = false; /* Parse options */ foreach(lc, options) @@ -90,10 +94,10 @@ parse_publication_options(List *options, * If publish option was given only the explicitly listed actions * should be published. */ - *publish_insert = false; - *publish_update = false; - *publish_delete = false; - *publish_truncate = false; + pubactions->pubinsert = false; + pubactions->pubupdate = false; + pubactions->pubdelete = false; + pubactions->pubtruncate = false; *publish_given = true; publish = defGetString(defel); @@ -109,19 +113,28 @@ parse_publication_options(List *options, char *publish_opt = (char *) lfirst(lc); if (strcmp(publish_opt, "insert") == 0) - *publish_insert = true; + pubactions->pubinsert = true; else if (strcmp(publish_opt, "update") == 0) - *publish_update = true; + pubactions->pubupdate = true; else if (strcmp(publish_opt, "delete") == 0) - *publish_delete = true; + pubactions->pubdelete = true; else if (strcmp(publish_opt, "truncate") == 0) - *publish_truncate = true; + pubactions->pubtruncate = true; else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("unrecognized \"publish\" value: \"%s\"", publish_opt))); } } + else if (strcmp(defel->defname, "publish_using_root_schema") == 0) + { + if (*publish_using_root_schema_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + *publish_using_root_schema_given = true; + *publish_using_root_schema = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -142,10 +155,9 @@ CreatePublication(CreatePublicationStmt *stmt) Datum values[Natts_pg_publication]; HeapTuple tup; bool publish_given; - bool publish_insert; - bool publish_update; - bool publish_delete; - bool publish_truncate; + PublicationActions pubactions; + bool publish_using_root_schema_given; + bool publish_using_root_schema; AclResult aclresult; /* must have CREATE privilege on database */ @@ -182,9 +194,9 @@ CreatePublication(CreatePublicationStmt *stmt) values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId()); parse_publication_options(stmt->options, - &publish_given, &publish_insert, - &publish_update, &publish_delete, - &publish_truncate); + &publish_given, &pubactions, + &publish_using_root_schema_given, + &publish_using_root_schema); puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId, Anum_pg_publication_oid); @@ -192,13 +204,15 @@ CreatePublication(CreatePublicationStmt *stmt) values[Anum_pg_publication_puballtables - 1] = BoolGetDatum(stmt->for_all_tables); values[Anum_pg_publication_pubinsert - 1] = - BoolGetDatum(publish_insert); + BoolGetDatum(pubactions.pubinsert); values[Anum_pg_publication_pubupdate - 1] = - BoolGetDatum(publish_update); + BoolGetDatum(pubactions.pubupdate); values[Anum_pg_publication_pubdelete - 1] = - BoolGetDatum(publish_delete); + BoolGetDatum(pubactions.pubdelete); values[Anum_pg_publication_pubtruncate - 1] = - BoolGetDatum(publish_truncate); + BoolGetDatum(pubactions.pubtruncate); + values[Anum_pg_publication_pubasroot - 1] = + BoolGetDatum(publish_using_root_schema); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -250,17 +264,16 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, bool replaces[Natts_pg_publication]; Datum values[Natts_pg_publication]; bool publish_given; - bool publish_insert; - bool publish_update; - bool publish_delete; - bool publish_truncate; + PublicationActions pubactions; + bool publish_using_root_schema_given; + bool publish_using_root_schema; ObjectAddress obj; Form_pg_publication pubform; parse_publication_options(stmt->options, - &publish_given, &publish_insert, - &publish_update, &publish_delete, - &publish_truncate); + &publish_given, &pubactions, + &publish_using_root_schema_given, + &publish_using_root_schema); /* Everything ok, form a new tuple. */ memset(values, 0, sizeof(values)); @@ -269,19 +282,25 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, if (publish_given) { - values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(publish_insert); + values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert); replaces[Anum_pg_publication_pubinsert - 1] = true; - values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(publish_update); + values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate); replaces[Anum_pg_publication_pubupdate - 1] = true; - values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(publish_delete); + values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete); replaces[Anum_pg_publication_pubdelete - 1] = true; - values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(publish_truncate); + values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate); replaces[Anum_pg_publication_pubtruncate - 1] = true; } + if (publish_using_root_schema_given) + { + values[Anum_pg_publication_pubasroot - 1] = BoolGetDatum(publish_using_root_schema); + replaces[Anum_pg_publication_pubasroot - 1] = true; + } + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces); diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 7c23968f2d..5ba7dde845 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -14625,7 +14625,7 @@ ATPrepChangePersistence(Relation rel, bool toLogged) * UNLOGGED as UNLOGGED tables can't be published. */ if (!toLogged && - list_length(GetRelationPublications(RelationGetRelid(rel))) > 0) + list_length(GetRelationPublications(RelationGetRelid(rel), NULL)) > 0) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot change table \"%s\" to unlogged because it is part of a publication", diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 59d1a31c97..f88377a0c2 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -2295,8 +2295,12 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) /* If modifying a partitioned table, initialize the root table info */ if (node->rootResultRelIndex >= 0) + { mtstate->rootResultRelInfo = estate->es_root_result_relations + node->rootResultRelIndex; + /* Only necessary to check replication identity. */ + CheckValidResultRel(mtstate->rootResultRelInfo, operation); + } mtstate->mt_arowmarks = (List **) palloc0(sizeof(List *) * nplans); mtstate->mt_nplans = nplans; diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index d6b9cbe1bd..ac88ba4f83 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -12,6 +12,8 @@ */ #include "postgres.h" +#include "access/tupconvert.h" +#include "catalog/partition.h" #include "catalog/pg_publication.h" #include "fmgr.h" #include "replication/logical.h" @@ -20,6 +22,7 @@ #include "replication/pgoutput.h" #include "utils/int8.h" #include "utils/inval.h" +#include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" #include "utils/varlena.h" @@ -49,6 +52,7 @@ static bool publications_valid; static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); +static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx); /* * Entry in the map used to remember which relation schemas we sent. @@ -59,9 +63,33 @@ static void publication_invalidation_cb(Datum arg, int cacheid, typedef struct RelationSyncEntry { Oid relid; /* relation oid */ - bool schema_sent; /* did we send the schema? */ + + /* + * Did we send the schema? If ancestor relid is set, its schema must also + * have been sent for this to be true. + */ + bool schema_sent; bool replicate_valid; PublicationActions pubactions; + + /* + * True when publication that is matched by get_rel_sync_entry for this + * relation is configured as such. + */ + bool pubasroot; + + /* + * OID of the ancestor whose schema will be used when replicating changes + * to a partition; InvalidOid if pubasroot is false. + */ + Oid replicate_as_relid; + + /* + * Map, if any, used when replicating using an ancestor's schema to + * convert the tuples from partition's type to the ancestor's; NULL if + * pubasroot is false. + */ + TupleConversionMap *map; } RelationSyncEntry; /* Map used to remember which relation schemas we sent. */ @@ -259,47 +287,72 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } /* - * Write the relation schema if the current schema hasn't been sent yet. + * Write the current schema of the relation and its ancestor (if any) if not + * done yet. */ static void maybe_send_schema(LogicalDecodingContext *ctx, Relation relation, RelationSyncEntry *relentry) { - if (!relentry->schema_sent) + if (relentry->schema_sent) + return; + + /* If needed, send the ancestor's schema first. */ + if (OidIsValid(relentry->replicate_as_relid)) { - TupleDesc desc; - int i; + Relation ancestor = + RelationIdGetRelation(relentry->replicate_as_relid); + TupleDesc indesc = RelationGetDescr(relation); + TupleDesc outdesc = RelationGetDescr(ancestor); + MemoryContext oldctx; + + /* Map must live as long as the session does. */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + relentry->map = convert_tuples_by_name(indesc, outdesc); + MemoryContextSwitchTo(oldctx); + send_relation_and_attrs(ancestor, ctx); + RelationClose(ancestor); + } - desc = RelationGetDescr(relation); + send_relation_and_attrs(relation, ctx); + relentry->schema_sent = true; +} - /* - * Write out type info if needed. We do that only for user-created - * types. We use FirstGenbkiObjectId as the cutoff, so that we only - * consider objects with hand-assigned OIDs to be "built in", not for - * instance any function or type defined in the information_schema. - * This is important because only hand-assigned OIDs can be expected - * to remain stable across major versions. - */ - for (i = 0; i < desc->natts; i++) - { - Form_pg_attribute att = TupleDescAttr(desc, i); +/* + * Sends a relation + */ +static void +send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx) +{ + TupleDesc desc = RelationGetDescr(relation); + int i; - if (att->attisdropped || att->attgenerated) - continue; + /* + * Write out type info if needed. We do that only for user-created types. + * We use FirstGenbkiObjectId as the cutoff, so that we only consider + * objects with hand-assigned OIDs to be "built in", not for instance any + * function or type defined in the information_schema. This is important + * because only hand-assigned OIDs can be expected to remain stable across + * major versions. + */ + for (i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); - if (att->atttypid < FirstGenbkiObjectId) - continue; + if (att->attisdropped || att->attgenerated) + continue; - OutputPluginPrepareWrite(ctx, false); - logicalrep_write_typ(ctx->out, att->atttypid); - OutputPluginWrite(ctx, false); - } + if (att->atttypid < FirstGenbkiObjectId) + continue; OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, relation); + logicalrep_write_typ(ctx->out, att->atttypid); OutputPluginWrite(ctx, false); - relentry->schema_sent = true; } + + OutputPluginPrepareWrite(ctx, false); + logicalrep_write_rel(ctx->out, relation); + OutputPluginWrite(ctx, false); } /* @@ -346,28 +399,68 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, switch (change->action) { case REORDER_BUFFER_CHANGE_INSERT: - OutputPluginPrepareWrite(ctx, true); - logicalrep_write_insert(ctx->out, relation, - &change->data.tp.newtuple->tuple); - OutputPluginWrite(ctx, true); - break; + { + HeapTuple tuple = &change->data.tp.newtuple->tuple; + + /* Publish as root relation change if requested. */ + if (OidIsValid(relentry->replicate_as_relid)) + { + Assert(relentry->pubasroot); + Assert(relation->rd_rel->relispartition); + relation = RelationIdGetRelation(relentry->replicate_as_relid); + /* Convert tuple if needed. */ + if (relentry->map) + tuple = execute_attr_map_tuple(tuple, relentry->map); + } + + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_insert(ctx->out, relation, tuple); + OutputPluginWrite(ctx, true); + break; + } case REORDER_BUFFER_CHANGE_UPDATE: { HeapTuple oldtuple = change->data.tp.oldtuple ? &change->data.tp.oldtuple->tuple : NULL; + HeapTuple newtuple = &change->data.tp.newtuple->tuple; + + /* Publish as root relation change if requested. */ + if (OidIsValid(relentry->replicate_as_relid)) + { + Assert(relentry->pubasroot); + Assert(relation->rd_rel->relispartition); + relation = RelationIdGetRelation(relentry->replicate_as_relid); + /* Convert tuples if needed. */ + if (relentry->map) + { + oldtuple = execute_attr_map_tuple(oldtuple, relentry->map); + newtuple = execute_attr_map_tuple(newtuple, relentry->map); + } + } OutputPluginPrepareWrite(ctx, true); - logicalrep_write_update(ctx->out, relation, oldtuple, - &change->data.tp.newtuple->tuple); + logicalrep_write_update(ctx->out, relation, oldtuple, newtuple); OutputPluginWrite(ctx, true); break; } case REORDER_BUFFER_CHANGE_DELETE: if (change->data.tp.oldtuple) { + HeapTuple oldtuple = &change->data.tp.oldtuple->tuple; + + /* Publish as root relation change if requested. */ + if (OidIsValid(relentry->replicate_as_relid)) + { + Assert(relentry->pubasroot); + Assert(relation->rd_rel->relispartition); + relation = RelationIdGetRelation(relentry->replicate_as_relid); + /* Convert tuple if needed. */ + if (relentry->map) + oldtuple = execute_attr_map_tuple(oldtuple, relentry->map); + } + OutputPluginPrepareWrite(ctx, true); - logicalrep_write_delete(ctx->out, relation, - &change->data.tp.oldtuple->tuple); + logicalrep_write_delete(ctx->out, relation, oldtuple); OutputPluginWrite(ctx, true); } else @@ -413,9 +506,10 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* * Don't send partitioned tables, because partitions would be - * sent instead. + * sent instead, unless user specified to send the former. */ - if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE && + !relentry->pubasroot) continue; relids[nrelids++] = relid; @@ -540,7 +634,8 @@ init_rel_sync_cache(MemoryContext cachectx) * This looks up publications that given relation is directly or indirectly * part of (latter if it's really the relation's ancestor that is part of a * publication) and fills up the found entry with the information about - * which operations to publish. + * which operations to publish and whether to use an ancestor's schema + * when publishing. */ static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Oid relid) @@ -562,8 +657,10 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) /* Not found means schema wasn't sent */ if (!found || !entry->replicate_valid) { - List *pubids = GetRelationPublications(relid); + List *published_rels = NIL; + List *pubids = GetRelationPublications(relid, &published_rels); ListCell *lc; + Oid ancestor = InvalidOid; /* Reload publications if needed before use. */ if (!publications_valid) @@ -588,13 +685,42 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) foreach(lc, data->publications) { Publication *pub = lfirst(lc); + bool publish = false; + + if (pub->alltables) + { + publish = true; + if (pub->pubasroot && get_rel_relispartition(relid)) + ancestor = llast_oid(get_partition_ancestors(relid)); + } + + if (!publish) + { + ListCell *lc1, + *lc2; + + forboth(lc1, pubids, lc2, published_rels) + { + Oid pubid = lfirst_oid(lc1); + Oid pub_relid = lfirst_oid(lc2); + if (pubid == pub->oid) + { + publish = true; + if (pub->pubasroot && pub_relid != relid) + ancestor = pub_relid; + break; + } + } + } - if (pub->alltables || list_member_oid(pubids, pub->oid)) + if (publish) { entry->pubactions.pubinsert |= pub->pubactions.pubinsert; entry->pubactions.pubupdate |= pub->pubactions.pubupdate; entry->pubactions.pubdelete |= pub->pubactions.pubdelete; - entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; + if (!OidIsValid(ancestor)) + entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; + entry->pubasroot = pub->pubasroot; } if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && @@ -604,6 +730,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) list_free(pubids); + entry->replicate_as_relid = ancestor; entry->replicate_valid = true; } diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index df025a5a30..cf5736b311 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -43,6 +43,7 @@ #include "catalog/catalog.h" #include "catalog/indexing.h" #include "catalog/namespace.h" +#include "catalog/partition.h" #include "catalog/pg_am.h" #include "catalog/pg_amproc.h" #include "catalog/pg_attrdef.h" @@ -5138,7 +5139,7 @@ GetRelationPublicationActions(Relation relation) sizeof(PublicationActions)); /* Fetch the publication membership info. */ - puboids = GetRelationPublications(RelationGetRelid(relation)); + puboids = GetRelationPublications(RelationGetRelid(relation), NULL); puboids = list_concat_unique_oid(puboids, GetAllTablesPublications()); foreach(lc, puboids) @@ -5157,7 +5158,9 @@ GetRelationPublicationActions(Relation relation) pubactions->pubinsert |= pubform->pubinsert; pubactions->pubupdate |= pubform->pubupdate; pubactions->pubdelete |= pubform->pubdelete; - pubactions->pubtruncate |= pubform->pubtruncate; + if (!pubform->pubasroot || + !is_leaf_partition(RelationGetRelid(relation))) + pubactions->pubtruncate |= pubform->pubtruncate; ReleaseSysCache(tup); diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index dc33c20048..bdbd1f823b 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -3780,6 +3780,7 @@ getPublications(Archive *fout) int i_pubupdate; int i_pubdelete; int i_pubtruncate; + int i_pubasroot; int i, ntups; @@ -3791,11 +3792,18 @@ getPublications(Archive *fout) resetPQExpBuffer(query); /* Get the publications. */ - if (fout->remoteVersion >= 110000) + if (fout->remoteVersion >= 130000) + appendPQExpBuffer(query, + "SELECT p.tableoid, p.oid, p.pubname, " + "(%s p.pubowner) AS rolname, " + "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubasroot " + "FROM pg_publication p", + username_subquery); + else if (fout->remoteVersion >= 110000) appendPQExpBuffer(query, "SELECT p.tableoid, p.oid, p.pubname, " "(%s p.pubowner) AS rolname, " - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate " + "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false as pubasroot " "FROM pg_publication p", username_subquery); else @@ -3819,6 +3827,7 @@ getPublications(Archive *fout) i_pubupdate = PQfnumber(res, "pubupdate"); i_pubdelete = PQfnumber(res, "pubdelete"); i_pubtruncate = PQfnumber(res, "pubtruncate"); + i_pubasroot = PQfnumber(res, "pubasroot"); pubinfo = pg_malloc(ntups * sizeof(PublicationInfo)); @@ -3841,6 +3850,8 @@ getPublications(Archive *fout) (strcmp(PQgetvalue(res, i, i_pubdelete), "t") == 0); pubinfo[i].pubtruncate = (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0); + pubinfo[i].pubasroot = + (strcmp(PQgetvalue(res, i, i_pubasroot), "t") == 0); if (strlen(pubinfo[i].rolname) == 0) pg_log_warning("owner of publication \"%s\" appears to be invalid", @@ -3917,7 +3928,12 @@ dumpPublication(Archive *fout, PublicationInfo *pubinfo) first = false; } - appendPQExpBufferStr(query, "');\n"); + appendPQExpBufferStr(query, "'"); + + if (pubinfo->pubasroot) + appendPQExpBufferStr(query, ", publish_using_root_schema = true"); + + appendPQExpBufferStr(query, ");\n"); ArchiveEntry(fout, pubinfo->dobj.catId, pubinfo->dobj.dumpId, ARCHIVE_OPTS(.tag = pubinfo->dobj.name, diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 21004e5078..90e47dd1f3 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -600,6 +600,7 @@ typedef struct _PublicationInfo bool pubupdate; bool pubdelete; bool pubtruncate; + bool pubasroot; } PublicationInfo; /* diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index f3c7eb96fa..3f6ce713af 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -5706,7 +5706,7 @@ listPublications(const char *pattern) PQExpBufferData buf; PGresult *res; printQueryOpt myopt = pset.popt; - static const bool translate_columns[] = {false, false, false, false, false, false, false}; + static const bool translate_columns[] = {false, false, false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -5737,6 +5737,10 @@ listPublications(const char *pattern) appendPQExpBuffer(&buf, ",\n pubtruncate AS \"%s\"", gettext_noop("Truncates")); + if (pset.sversion >= 130000) + appendPQExpBuffer(&buf, + ",\n pubasroot AS \"%s\"", + gettext_noop("Publishes Using Root Schema")); appendPQExpBufferStr(&buf, "\nFROM pg_catalog.pg_publication\n"); @@ -5778,6 +5782,7 @@ describePublications(const char *pattern) int i; PGresult *res; bool has_pubtruncate; + bool has_pubasroot; if (pset.sversion < 100000) { @@ -5790,6 +5795,7 @@ describePublications(const char *pattern) } has_pubtruncate = (pset.sversion >= 110000); + has_pubasroot = (pset.sversion >= 130000); initPQExpBuffer(&buf); @@ -5800,6 +5806,9 @@ describePublications(const char *pattern) if (has_pubtruncate) appendPQExpBufferStr(&buf, ", pubtruncate"); + if (has_pubasroot) + appendPQExpBufferStr(&buf, + ", pubasroot"); appendPQExpBufferStr(&buf, "\nFROM pg_catalog.pg_publication\n"); @@ -5849,6 +5858,8 @@ describePublications(const char *pattern) if (has_pubtruncate) ncols++; + if (has_pubasroot) + ncols++; initPQExpBuffer(&title); printfPQExpBuffer(&title, _("Publication %s"), pubname); @@ -5861,6 +5872,8 @@ describePublications(const char *pattern) printTableAddHeader(&cont, gettext_noop("Deletes"), true, align); if (has_pubtruncate) printTableAddHeader(&cont, gettext_noop("Truncates"), true, align); + if (has_pubasroot) + printTableAddHeader(&cont, gettext_noop("Publishes Using Root Schema"), true, align); printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false); printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false); @@ -5869,6 +5882,8 @@ describePublications(const char *pattern) printTableAddCell(&cont, PQgetvalue(res, i, 6), false, false); if (has_pubtruncate) printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false); + if (has_pubasroot) + printTableAddCell(&cont, PQgetvalue(res, i, 8), false, false); if (!puballtables) { diff --git a/src/include/catalog/partition.h b/src/include/catalog/partition.h index 27873aff6e..c6c19119ca 100644 --- a/src/include/catalog/partition.h +++ b/src/include/catalog/partition.h @@ -21,6 +21,7 @@ extern Oid get_partition_parent(Oid relid); extern List *get_partition_ancestors(Oid relid); +extern bool is_leaf_partition(Oid relid); extern Oid index_get_partition(Relation partition, Oid indexId); extern List *map_partition_varattnos(List *expr, int fromrel_varno, Relation to_rel, Relation from_rel); diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 04a8b87e78..ea4210c1c2 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -52,6 +52,8 @@ CATALOG(pg_publication,6104,PublicationRelationId) /* true if truncates are published */ bool pubtruncate; + /* true if partition changes are published using root schema */ + bool pubasroot; } FormData_pg_publication; /* ---------------- @@ -74,15 +76,16 @@ typedef struct Publication Oid oid; char *name; bool alltables; + bool pubasroot; PublicationActions pubactions; } Publication; extern Publication *GetPublication(Oid pubid); extern Publication *GetPublicationByName(const char *pubname, bool missing_ok); -extern List *GetRelationPublications(Oid relid); +extern List *GetRelationPublications(Oid relid, List **published_rels); extern List *GetPublicationRelations(Oid pubid, bool include_partitions); extern List *GetAllTablesPublications(void); -extern List *GetAllTablesPublicationRelations(void); +extern List *GetAllTablesPublicationRelations(bool pubasroot); extern bool is_publishable_relation(Relation rel); extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index e3fabe70f9..da22ca3c6a 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -25,21 +25,23 @@ CREATE PUBLICATION testpub_xxx WITH (foo); ERROR: unrecognized publication parameter: "foo" CREATE PUBLICATION testpub_xxx WITH (publish = 'cluster, vacuum'); ERROR: unrecognized "publish" value: "cluster" +CREATE PUBLICATION testpub_xxx WITH (publish_using_root_schema = 'true', publish_using_root_schema = '0'); +ERROR: conflicting or redundant options \dRp - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates ---------------------+--------------------------+------------+---------+---------+---------+----------- - testpib_ins_trunct | regress_publication_user | f | t | f | f | f - testpub_default | regress_publication_user | f | f | t | f | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Publishes Using Root Schema +--------------------+--------------------------+------------+---------+---------+---------+-----------+----------------------------- + testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f + testpub_default | regress_publication_user | f | f | t | f | f | f (2 rows) ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete'); \dRp - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates ---------------------+--------------------------+------------+---------+---------+---------+----------- - testpib_ins_trunct | regress_publication_user | f | t | f | f | f - testpub_default | regress_publication_user | f | t | t | t | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Publishes Using Root Schema +--------------------+--------------------------+------------+---------+---------+---------+-----------+----------------------------- + testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f + testpub_default | regress_publication_user | f | t | t | t | f | f (2 rows) --- adding tables @@ -83,10 +85,10 @@ Publications: "testpub_foralltables" \dRp+ testpub_foralltables - Publication testpub_foralltables - Owner | All tables | Inserts | Updates | Deletes | Truncates ---------------------------+------------+---------+---------+---------+----------- - regress_publication_user | t | t | t | f | f + Publication testpub_foralltables + Owner | All tables | Inserts | Updates | Deletes | Truncates | Publishes Using Root Schema +--------------------------+------------+---------+---------+---------+-----------+----------------------------- + regress_publication_user | t | t | t | f | f | f (1 row) DROP TABLE testpub_tbl2; @@ -98,19 +100,19 @@ CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3; CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3; RESET client_min_messages; \dRp+ testpub3 - Publication testpub3 - Owner | All tables | Inserts | Updates | Deletes | Truncates ---------------------------+------------+---------+---------+---------+----------- - regress_publication_user | f | t | t | t | t + Publication testpub3 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Publishes Using Root Schema +--------------------------+------------+---------+---------+---------+-----------+----------------------------- + regress_publication_user | f | t | t | t | t | f Tables: "public.testpub_tbl3" "public.testpub_tbl3a" \dRp+ testpub4 - Publication testpub4 - Owner | All tables | Inserts | Updates | Deletes | Truncates ---------------------------+------------+---------+---------+---------+----------- - regress_publication_user | f | t | t | t | t + Publication testpub4 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Publishes Using Root Schema +--------------------------+------------+---------+---------+---------+-----------+----------------------------- + regress_publication_user | f | t | t | t | t | f Tables: "public.testpub_tbl3" @@ -124,10 +126,19 @@ RESET client_min_messages; CREATE TABLE testpub_parted1 PARTITION OF testpub_parted FOR VALUES IN (1); ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted; \dRp+ testpub_forparted - Publication testpub_forparted - Owner | All tables | Inserts | Updates | Deletes | Truncates ---------------------------+------------+---------+---------+---------+----------- - regress_publication_user | f | t | t | t | t + Publication testpub_forparted + Owner | All tables | Inserts | Updates | Deletes | Truncates | Publishes Using Root Schema +--------------------------+------------+---------+---------+---------+-----------+----------------------------- + regress_publication_user | f | t | t | t | t | f +Tables: + "public.testpub_parted" + +ALTER PUBLICATION testpub_forparted SET (publish_using_root_schema = true); +\dRp+ testpub_forparted + Publication testpub_forparted + Owner | All tables | Inserts | Updates | Deletes | Truncates | Publishes Using Root Schema +--------------------------+------------+---------+---------+---------+-----------+----------------------------- + regress_publication_user | f | t | t | t | t | t Tables: "public.testpub_parted" @@ -146,10 +157,10 @@ ERROR: relation "testpub_tbl1" is already member of publication "testpub_fortbl CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1; ERROR: publication "testpub_fortbl" already exists \dRp+ testpub_fortbl - Publication testpub_fortbl - Owner | All tables | Inserts | Updates | Deletes | Truncates ---------------------------+------------+---------+---------+---------+----------- - regress_publication_user | f | t | t | t | t + Publication testpub_fortbl + Owner | All tables | Inserts | Updates | Deletes | Truncates | Publishes Using Root Schema +--------------------------+------------+---------+---------+---------+-----------+----------------------------- + regress_publication_user | f | t | t | t | t | f Tables: "pub_test.testpub_nopk" "public.testpub_tbl1" @@ -187,10 +198,10 @@ Publications: "testpub_fortbl" \dRp+ testpub_default - Publication testpub_default - Owner | All tables | Inserts | Updates | Deletes | Truncates ---------------------------+------------+---------+---------+---------+----------- - regress_publication_user | f | t | t | t | f + Publication testpub_default + Owner | All tables | Inserts | Updates | Deletes | Truncates | Publishes Using Root Schema +--------------------------+------------+---------+---------+---------+-----------+----------------------------- + regress_publication_user | f | t | t | t | f | f Tables: "pub_test.testpub_nopk" "public.testpub_tbl1" @@ -234,10 +245,10 @@ DROP TABLE testpub_parted; DROP VIEW testpub_view; DROP TABLE testpub_tbl1; \dRp+ testpub_default - Publication testpub_default - Owner | All tables | Inserts | Updates | Deletes | Truncates ---------------------------+------------+---------+---------+---------+----------- - regress_publication_user | f | t | t | t | f + Publication testpub_default + Owner | All tables | Inserts | Updates | Deletes | Truncates | Publishes Using Root Schema +--------------------------+------------+---------+---------+---------+-----------+----------------------------- + regress_publication_user | f | t | t | t | f | f (1 row) -- fail - must be owner of publication @@ -247,20 +258,20 @@ ERROR: must be owner of publication testpub_default RESET ROLE; ALTER PUBLICATION testpub_default RENAME TO testpub_foo; \dRp testpub_foo - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates --------------+--------------------------+------------+---------+---------+---------+----------- - testpub_foo | regress_publication_user | f | t | t | t | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Publishes Using Root Schema +-------------+--------------------------+------------+---------+---------+---------+-----------+----------------------------- + testpub_foo | regress_publication_user | f | t | t | t | f | f (1 row) -- rename back to keep the rest simple ALTER PUBLICATION testpub_foo RENAME TO testpub_default; ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2; \dRp testpub_default - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates ------------------+---------------------------+------------+---------+---------+---------+----------- - testpub_default | regress_publication_user2 | f | t | t | t | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Publishes Using Root Schema +-----------------+---------------------------+------------+---------+---------+---------+-----------+----------------------------- + testpub_default | regress_publication_user2 | f | t | t | t | f | f (1 row) DROP PUBLICATION testpub_default; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index b79a3f8f8f..7ddca1b974 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -23,6 +23,7 @@ ALTER PUBLICATION testpub_default SET (publish = update); -- error cases CREATE PUBLICATION testpub_xxx WITH (foo); CREATE PUBLICATION testpub_xxx WITH (publish = 'cluster, vacuum'); +CREATE PUBLICATION testpub_xxx WITH (publish_using_root_schema = 'true', publish_using_root_schema = '0'); \dRp @@ -77,6 +78,8 @@ RESET client_min_messages; CREATE TABLE testpub_parted1 PARTITION OF testpub_parted FOR VALUES IN (1); ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted; \dRp+ testpub_forparted +ALTER PUBLICATION testpub_forparted SET (publish_using_root_schema = true); +\dRp+ testpub_forparted DROP PUBLICATION testpub_forparted; -- fail - view diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index 1ec487154b..6cb484aded 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 15; +use Test::More tests => 34; # setup @@ -25,7 +25,11 @@ my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; $node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub1"); $node_publisher->safe_psql('postgres', - "CREATE PUBLICATION pub_all FOR ALL TABLES"); + "CREATE PUBLICATION pub_all FOR ALL TABLES WITH (publish_using_root_schema = true)"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub2"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub3 WITH (publish_using_root_schema = true)"); $node_publisher->safe_psql('postgres', "CREATE TABLE tab1 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)"); $node_publisher->safe_psql('postgres', @@ -34,8 +38,24 @@ $node_publisher->safe_psql('postgres', "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3)"); $node_publisher->safe_psql('postgres', "CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (5, 6)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab2_1 (b text, a int NOT NULL)"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab2 ATTACH PARTITION tab2_1 FOR VALUES IN (1, 2, 3)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab2_2 PARTITION OF tab2 FOR VALUES IN (5, 6)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab3 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab3_1 PARTITION OF tab3 FOR VALUES IN (1, 2, 3, 5, 6)"); $node_publisher->safe_psql('postgres', "ALTER PUBLICATION pub1 ADD TABLE tab1, tab1_1"); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION pub2 ADD TABLE tab1_1, tab1_2"); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION pub3 ADD TABLE tab2, tab3_1"); # subscriber1 $node_subscriber1->safe_psql('postgres', @@ -51,18 +71,42 @@ $node_subscriber1->safe_psql('postgres', "CREATE TABLE tab1_2_1 PARTITION OF tab1_2 FOR VALUES IN (5)"); $node_subscriber1->safe_psql('postgres', "CREATE TABLE tab1_2_2 PARTITION OF tab1_2 FOR VALUES IN (6)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY, c text DEFAULT 'sub1_tab2', b text) PARTITION BY RANGE (a)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab2_1 (c text DEFAULT 'sub1_tab2', b text, a int NOT NULL)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab3_1 (c text DEFAULT 'sub1_tab3_1', b text, a int NOT NULL PRIMARY KEY)"); +$node_subscriber1->safe_psql('postgres', + "ALTER TABLE tab2 ATTACH PARTITION tab2_1 FOR VALUES FROM (1) TO (10)"); $node_subscriber1->safe_psql('postgres', "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"); +$node_subscriber1->safe_psql('postgres', + "CREATE SUBSCRIPTION sub4 CONNECTION '$publisher_connstr' PUBLICATION pub3"); # subscriber 2 $node_subscriber2->safe_psql('postgres', - "CREATE TABLE tab1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1', b text)"); + "CREATE TABLE tab1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1', b text) PARTITION BY HASH (a)"); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_part1 (b text, c text, a int NOT NULL)"); +$node_subscriber2->safe_psql('postgres', + "ALTER TABLE tab1 ATTACH PARTITION tab1_part1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)"); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_part2 PARTITION OF tab1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)"); $node_subscriber2->safe_psql('postgres', "CREATE TABLE tab1_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_1', b text)"); $node_subscriber2->safe_psql('postgres', "CREATE TABLE tab1_2 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_2', b text)"); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab2 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab2', b text)"); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab3 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_1', b text)"); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab3_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_2', b text)"); $node_subscriber2->safe_psql('postgres', "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub_all"); +$node_subscriber2->safe_psql('postgres', + "CREATE SUBSCRIPTION sub3 CONNECTION '$publisher_connstr' PUBLICATION pub2"); # Wait for initial sync of all subscriptions my $synced_query = @@ -79,14 +123,28 @@ $node_publisher->safe_psql('postgres', "INSERT INTO tab1_1 (a) VALUES (3)"); $node_publisher->safe_psql('postgres', "INSERT INTO tab1_2 VALUES (5)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab2 VALUES (1), (3), (5)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab3 VALUES (1), (3), (5)"); $node_publisher->wait_for_catchup('sub1'); $node_publisher->wait_for_catchup('sub2'); +$node_publisher->wait_for_catchup('sub3'); +$node_publisher->wait_for_catchup('sub4'); my $result = $node_subscriber1->safe_psql('postgres', "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); is($result, qq(sub1_tab1|3|1|5), 'insert into tab1_1, tab1_2 replicated'); +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab2 GROUP BY 1"); +is($result, qq(sub1_tab2|3|1|5), 'insert into tab2 replicated'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab3_1 GROUP BY 1"); +is($result, qq(sub1_tab3_1|3|1|5), 'insert into tab3_1 replicated'); + $result = $node_subscriber2->safe_psql('postgres', "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1"); is($result, qq(sub2_tab1_1|2|1|3), 'inserts into tab1_1 replicated'); @@ -95,32 +153,68 @@ $result = $node_subscriber2->safe_psql('postgres', "SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1"); is($result, qq(sub2_tab1_2|1|5|5), 'inserts into tab1_2 replicated'); +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); +is($result, qq(sub2_tab1|3|1|5), 'inserts into tab1 replicated'); + # update (no partition change) $node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 2 WHERE a = 1"); +$node_publisher->safe_psql('postgres', + "UPDATE tab2 SET a = 2 WHERE a = 1"); +$node_publisher->safe_psql('postgres', + "UPDATE tab3 SET a = 2 WHERE a = 1"); $node_publisher->wait_for_catchup('sub1'); $node_publisher->wait_for_catchup('sub2'); +$node_publisher->wait_for_catchup('sub3'); +$node_publisher->wait_for_catchup('sub4'); $result = $node_subscriber1->safe_psql('postgres', "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); is($result, qq(sub1_tab1|3|2|5), 'update of tab1_1 replicated'); +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab2 GROUP BY 1"); +is($result, qq(sub1_tab2|3|2|5), 'update of tab2 replicated'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab3_1 GROUP BY 1"); +is($result, qq(sub1_tab3_1|3|2|5), 'update of tab3_1 replicated'); + $result = $node_subscriber2->safe_psql('postgres', "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1"); is($result, qq(sub2_tab1_1|2|2|3), 'update of tab1_1 replicated'); +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); +is($result, qq(sub2_tab1|3|2|5), 'update of tab1 replicated'); + # update (partition changes) $node_publisher->safe_psql('postgres', "UPDATE tab1 SET a = 6 WHERE a = 2"); +$node_publisher->safe_psql('postgres', + "UPDATE tab2 SET a = 6 WHERE a = 2"); +$node_publisher->safe_psql('postgres', + "UPDATE tab3 SET a = 6 WHERE a = 2"); $node_publisher->wait_for_catchup('sub1'); $node_publisher->wait_for_catchup('sub2'); +$node_publisher->wait_for_catchup('sub3'); +$node_publisher->wait_for_catchup('sub4'); $result = $node_subscriber1->safe_psql('postgres', "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); is($result, qq(sub1_tab1|3|3|6), 'update of tab1 replicated'); +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab2 GROUP BY 1"); +is($result, qq(sub1_tab2|3|3|6), 'update of tab2 replicated'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab3_1 GROUP BY 1"); +is($result, qq(sub1_tab3_1|3|3|6), 'update of tab3_1 replicated'); + $result = $node_subscriber2->safe_psql('postgres', "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1"); is($result, qq(sub2_tab1_1|1|3|3), 'delete from tab1_1 replicated'); @@ -129,19 +223,41 @@ $result = $node_subscriber2->safe_psql('postgres', "SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1"); is($result, qq(sub2_tab1_2|2|5|6), 'insert into tab1_2 replicated'); +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1"); +is($result, qq(sub2_tab1_1|1|3|3), 'delete from tab1_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); +is($result, qq(sub2_tab1|3|3|6), 'update of tab1 replicated'); + # delete $node_publisher->safe_psql('postgres', "DELETE FROM tab1 WHERE a IN (3, 5)"); $node_publisher->safe_psql('postgres', "DELETE FROM tab1_2"); +$node_publisher->safe_psql('postgres', + "DELETE FROM tab2 WHERE a IN (3, 5)"); +$node_publisher->safe_psql('postgres', + "DELETE FROM tab3 WHERE a IN (3, 5)"); $node_publisher->wait_for_catchup('sub1'); $node_publisher->wait_for_catchup('sub2'); +$node_publisher->wait_for_catchup('sub3'); +$node_publisher->wait_for_catchup('sub4'); $result = $node_subscriber1->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab1"); is($result, qq(0||), 'delete from tab1_1, tab1_2 replicated'); +$result = $node_subscriber1->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab2"); +is($result, qq(1|6|6), 'delete from tab2 replicated'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab3_1"); +is($result, qq(1|6|6), 'delete from tab3_1 replicated'); + $result = $node_subscriber2->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab1_1"); is($result, qq(0||), 'delete from tab1_1 replicated'); @@ -150,34 +266,80 @@ $result = $node_subscriber2->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab1_2"); is($result, qq(0||), 'delete from tab1_2 replicated'); +$result = $node_subscriber2->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1_1"); +is($result, qq(0||), 'delete from tab1_1, tab_2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(0||), 'delete from tab1 replicated'); + # truncate $node_subscriber1->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (2), (5)"); +$node_subscriber1->safe_psql('postgres', + "INSERT INTO tab2 VALUES (1), (2), (5)"); +$node_subscriber1->safe_psql('postgres', + "INSERT INTO tab3_1 (a) VALUES (1), (2), (5)"); $node_subscriber2->safe_psql('postgres', "INSERT INTO tab1_2 VALUES (2)"); +$node_subscriber2->safe_psql('postgres', + "INSERT INTO tab1_1 VALUES (1)"); +$node_subscriber2->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1), (2), (5)"); + $node_publisher->safe_psql('postgres', "TRUNCATE tab1_2"); +$node_publisher->safe_psql('postgres', + "TRUNCATE tab2_1"); $node_publisher->wait_for_catchup('sub1'); $node_publisher->wait_for_catchup('sub2'); +$node_publisher->wait_for_catchup('sub3'); +$node_publisher->wait_for_catchup('sub4'); $result = $node_subscriber1->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab1"); is($result, qq(2|1|2), 'truncate of tab1_2 replicated'); +$result = $node_subscriber1->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab2"); +is($result, qq(4|1|6), 'truncate of tab2_2 NOT replicated'); + $result = $node_subscriber2->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab1_2"); is($result, qq(0||), 'truncate of tab1_2 replicated'); +$node_subscriber2->safe_psql('postgres', + "DROP SUBSCRIPTION sub3"); +$node_subscriber2->safe_psql('postgres', + "INSERT INTO tab1_2 VALUES (2)"); $node_publisher->safe_psql('postgres', "TRUNCATE tab1"); +$node_publisher->safe_psql('postgres', + "TRUNCATE tab2"); +$node_publisher->safe_psql('postgres', + "TRUNCATE tab3"); $node_publisher->wait_for_catchup('sub1'); $node_publisher->wait_for_catchup('sub2'); +$node_publisher->wait_for_catchup('sub4'); $result = $node_subscriber1->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab1"); is($result, qq(0||), 'truncate of tab1_1 replicated'); $result = $node_subscriber2->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab1"); -is($result, qq(0||), 'truncate of tab1_1 replicated'); +is($result, qq(0||), 'truncate of tab1 replicated'); +$result = $node_subscriber2->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1_1"); +is($result, qq(1|1|1), 'tab1_1 unchanged'); +$result = $node_subscriber1->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab2"); +is($result, qq(0||), 'truncate of tab2 replicated'); +$result = $node_subscriber1->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab3_1"); +is($result, qq(0||), 'truncate of tab3_1 replicated'); +$result = $node_subscriber2->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1_2"); +is($result, qq(1|2|2), 'tab1_2 unchanged'); -- 2.16.5