From 72eb76b32daa384074beaa3b3b1946db8fd154a8 Mon Sep 17 00:00:00 2001 From: amit Date: Thu, 7 Nov 2019 18:19:33 +0900 Subject: [PATCH v9] Support adding partitioned tables to publication --- doc/src/sgml/logical-replication.sgml | 18 +-- doc/src/sgml/ref/create_publication.sgml | 20 +++- src/backend/catalog/pg_publication.c | 164 ++++++++++++++++++++++--- src/backend/commands/publicationcmds.c | 16 ++- src/backend/replication/logical/tablesync.c | 1 + src/backend/replication/pgoutput/pgoutput.c | 19 ++- src/bin/pg_dump/pg_dump.c | 8 +- src/include/catalog/pg_publication.h | 2 +- src/test/regress/expected/publication.out | 30 ++++- src/test/regress/sql/publication.sql | 18 ++- src/test/subscription/t/013_partition.pl | 178 ++++++++++++++++++++++++++++ 11 files changed, 428 insertions(+), 46 deletions(-) create mode 100644 src/test/subscription/t/013_partition.pl diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index f657d1d06e..fa30ac27f7 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -402,13 +402,17 @@ - Replication is only possible from base tables to base tables. That is, - the tables on the publication and on the subscription side must be normal - tables, not views, materialized views, partition root tables, or foreign - tables. In the case of partitions, you can therefore replicate a - partition hierarchy one-to-one, but you cannot currently replicate to a - differently partitioned setup. Attempts to replicate tables other than - base tables will result in an error. + Replication is only supported by regular and partitioned tables, although + the type of the table must match between the two servers, that is, one + cannot replicate from a regular table into a partitioned able or vice + versa. Also, when replicating between partitioned tables, the actual + replication occurs between leaf partitions, so the partitions on the two + servers must match one-to-one. + + + + Attempts to replicate other types of relations such as views, materialized + views, or foreign tables, will result in an error. diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index 99f87ca393..a304f9b8c3 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -68,15 +68,23 @@ CREATE PUBLICATION name that table is added to the publication. If ONLY is not specified, the table and all its descendant tables (if any) are added. Optionally, * can be specified after the table name to - explicitly indicate that descendant tables are included. + explicitly indicate that descendant tables are included. However, adding + a partitioned table to a publication never explicitly adds its partitions, + because partitions are implicitly published due to the partitioned table + being added to the publication. - Only persistent base tables can be part of a publication. Temporary - tables, unlogged tables, foreign tables, materialized views, regular - views, and partitioned tables cannot be part of a publication. To - replicate a partitioned table, add the individual partitions to the - publication. + Only persistent base tables and partitioned tables can be part of a + publication. Temporary tables, unlogged tables, foreign tables, + materialized views, regular views cannot be part of a publication. + + + + When a partitioned table is added to a publication, all of its existing + and future partitions are also implicitly considered to be part of the + publication. So, even operations that are performed directly on a + partition are also published via its ancestors' publications. diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index c5eea7af3f..c05617dec9 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -24,8 +24,10 @@ #include "catalog/index.h" #include "catalog/indexing.h" #include "catalog/namespace.h" +#include "catalog/partition.h" #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" +#include "catalog/pg_inherits.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_type.h" @@ -40,6 +42,8 @@ #include "utils/rel.h" #include "utils/syscache.h" +static List *get_rel_publications(Oid relid); + /* * Check if relation can be in given publication and throws appropriate * error if not. @@ -47,17 +51,9 @@ static void check_publication_add_relation(Relation targetrel) { - /* Give more specific error for partitioned tables */ - if (RelationGetForm(targetrel)->relkind == RELKIND_PARTITIONED_TABLE) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("\"%s\" is a partitioned table", - RelationGetRelationName(targetrel)), - errdetail("Adding partitioned tables to publications is not supported."), - errhint("You can add the table partitions individually."))); - - /* Must be table */ - if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION) + /* Must be a regular or partitioned table */ + if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION && + RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("\"%s\" is not a table", @@ -103,7 +99,8 @@ check_publication_add_relation(Relation targetrel) static bool is_publishable_class(Oid relid, Form_pg_class reltuple) { - return reltuple->relkind == RELKIND_RELATION && + return (reltuple->relkind == RELKIND_RELATION || + reltuple->relkind == RELKIND_PARTITIONED_TABLE) && !IsCatalogRelationOid(relid) && reltuple->relpersistence == RELPERSISTENCE_PERMANENT && relid >= FirstNormalObjectId; @@ -165,6 +162,10 @@ publication_add_relation(Oid pubid, Relation targetrel, * Check for duplicates. Note that this does not really prevent * duplicates, it's here just to provide nicer error message in common * case. The real protection is the unique key on the catalog. + * + * We give special messages for when a partition is found to be implicitly + * published via an ancestor and when a partitioned tables's partitions + * are found to be published on their own. */ if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), ObjectIdGetDatum(pubid))) @@ -179,6 +180,71 @@ publication_add_relation(Oid pubid, Relation targetrel, errmsg("relation \"%s\" is already member of publication \"%s\"", RelationGetRelationName(targetrel), pub->name))); } + else if (targetrel->rd_rel->relispartition) + { + List *ancestors = get_partition_ancestors(relid); + ListCell *lc; + Oid ancestor; + bool found = false; + + foreach(lc, ancestors) + { + ancestor = lfirst_oid(lc); + if (SearchSysCacheExists2(PUBLICATIONRELMAP, + ObjectIdGetDatum(ancestor), + ObjectIdGetDatum(pubid))) + { + found = true; + break; + } + } + + if (found) + { + Assert(OidIsValid(ancestor)); + table_close(rel, RowExclusiveLock); + + if (if_not_exists) + return InvalidObjectAddress; + + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("relation \"%s\" is already member of publication \"%s\" via ancestor \"%s\"", + RelationGetRelationName(targetrel), pub->name, + get_rel_name(ancestor)))); + } + } + else if (targetrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + List *pub_rels = GetPublicationRelations(pubid, true); + List *parts = find_all_inheritors(relid, NoLock, NULL); + ListCell *lc; + Oid partition; + bool found = false; + + foreach(lc, parts) + { + partition = lfirst_oid(lc); + if (list_member_oid(pub_rels, partition)) + { + found = true; + break; + } + } + + if (found) + { + Assert(OidIsValid(partition)); + table_close(rel, RowExclusiveLock); + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("descendent table \"%s\" of \"%s\"is already member of publication \"%s\"", + get_rel_name(partition), + RelationGetRelationName(targetrel), pub->name), + errhint("Remove descendent tables of \"%s\" from publication before adding it to the publication.", + RelationGetRelationName(targetrel)))); + } + } check_publication_add_relation(targetrel); @@ -221,10 +287,35 @@ publication_add_relation(Oid pubid, Relation targetrel, /* - * Gets list of publication oids for a relation oid. + * Gets list of publication oids for a relation, plus those of ancestors, + * if any, if the relation is a partition. */ List * GetRelationPublications(Oid relid) +{ + List *result = NIL; + + result = get_rel_publications(relid); + if (get_rel_relispartition(relid)) + { + List *ancestors = get_partition_ancestors(relid); + ListCell *lc; + + foreach(lc, ancestors) + { + Oid ancestor = lfirst_oid(lc); + List *ancestor_pubs = get_rel_publications(ancestor); + + result = list_concat(result, ancestor_pubs); + } + } + + return result; +} + +/* Workhorse of GetRelationPublications() */ +static List * +get_rel_publications(Oid relid) { List *result = NIL; CatCList *pubrellist; @@ -251,9 +342,14 @@ GetRelationPublications(Oid relid) * * This should only be used for normal publications, the FOR ALL TABLES * should use GetAllTablesPublicationRelations(). + * + * Caller should pass true for 'include_partitions' so that for any + * partitioned tables that are in the publication its partitions are + * included too if the operation to be performed on the returned relations + * expects to see all relations that are affected by the publication. */ List * -GetPublicationRelations(Oid pubid) +GetPublicationRelations(Oid pubid, bool include_partitions) { List *result; Relation pubrelsrel; @@ -278,8 +374,12 @@ GetPublicationRelations(Oid pubid) Form_pg_publication_rel pubrel; pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); - - result = lappend_oid(result, pubrel->prrelid); + if (get_rel_relkind(pubrel->prrelid) == RELKIND_PARTITIONED_TABLE && + include_partitions) + result = list_concat(result, find_all_inheritors(pubrel->prrelid, + NoLock, NULL)); + else + result = lappend_oid(result, pubrel->prrelid); } systable_endscan(scan); @@ -480,10 +580,40 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); publication = GetPublicationByName(pubname, false); + + /* + * Publications support partitioned tables, although we need to filter + * them out from the result, because all changes are replicated using + * the leaf partition identity and schema. + */ if (publication->alltables) + { + /* + * GetAllTablesPublicationRelations() only ever returns leaf + * partitions. + */ tables = GetAllTablesPublicationRelations(); + } else - tables = GetPublicationRelations(publication->oid); + { + List *all_tables; + ListCell *lc; + + /* + * GetPublicationRelations() includes partitioned tables in its + * result which is required by other internal users of that + * function, which must be filtered out. + */ + all_tables = GetPublicationRelations(publication->oid, true); + tables = NIL; + foreach(lc, all_tables) + { + Oid relid = lfirst_oid(lc); + + if (get_rel_relkind(relid) != RELKIND_PARTITIONED_TABLE) + tables = lappend_oid(tables, relid); + } + } funcctx->user_fctx = (void *) tables; MemoryContextSwitchTo(oldcontext); diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index f96cb42adc..d4b43e7662 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -299,7 +299,7 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, } else { - List *relids = GetPublicationRelations(pubform->oid); + List *relids = GetPublicationRelations(pubform->oid, true); /* * We don't want to send too many individual messages, at some point @@ -356,7 +356,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, PublicationDropTables(pubid, rels, false); else /* DEFELEM_SET */ { - List *oldrelids = GetPublicationRelations(pubid); + List *oldrelids = GetPublicationRelations(pubid, false); List *delrels = NIL; ListCell *oldlc; @@ -498,7 +498,8 @@ RemovePublicationRelById(Oid proid) /* * Open relations specified by a RangeVar list. - * The returned tables are locked in ShareUpdateExclusiveLock mode. + * The returned tables are locked in ShareUpdateExclusiveLock mode in order to + * add them to a publication. */ static List * OpenTableList(List *tables) @@ -539,8 +540,13 @@ OpenTableList(List *tables) rels = lappend(rels, rel); relids = lappend_oid(relids, myrelid); - /* Add children of this rel, if requested */ - if (recurse) + /* + * Add children of this rel, if requested, so that they too are added + * to the publication. A partitioned table can't have any inheritance + * children other than its partitions, which need not be explicitly + * added to the publication. + */ + if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) { List *children; ListCell *child; diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index f8183cd488..98825f01e9 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -761,6 +761,7 @@ copy_table(Relation rel) /* Map the publisher relation to local one. */ relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); Assert(rel == relmapentry->localrel); + Assert(relmapentry->localrel->rd_rel->relkind == RELKIND_RELATION); /* Start copy on the publisher. */ initStringInfo(&cmd); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 752508213a..d6b9cbe1bd 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -50,7 +50,12 @@ static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); -/* Entry in the map used to remember which relation schemas we sent. */ +/* + * Entry in the map used to remember which relation schemas we sent. + * + * For partitions, 'pubactions' considers not only the table's own + * publications, but also those of all of its ancestors. + */ typedef struct RelationSyncEntry { Oid relid; /* relation oid */ @@ -406,6 +411,13 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (!relentry->pubactions.pubtruncate) continue; + /* + * Don't send partitioned tables, because partitions would be + * sent instead. + */ + if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + continue; + relids[nrelids++] = relid; maybe_send_schema(ctx, relation, relentry); } @@ -524,6 +536,11 @@ init_rel_sync_cache(MemoryContext cachectx) /* * Find or create entry in the relation schema cache. + * + * This looks up publications that given relation is directly or indirectly + * part of (latter if it's really the relation's ancestor that is part of a + * publication) and fills up the found entry with the information about + * which operations to publish. */ static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Oid relid) diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 799b6988b7..dc33c20048 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -3969,8 +3969,12 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) { TableInfo *tbinfo = &tblinfo[i]; - /* Only plain tables can be aded to publications. */ - if (tbinfo->relkind != RELKIND_RELATION) + /* + * Only regular and partitioned tables can be added to + * publications. + */ + if (tbinfo->relkind != RELKIND_RELATION && + tbinfo->relkind != RELKIND_PARTITIONED_TABLE) continue; /* diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 6cdc2b1197..04a8b87e78 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -80,7 +80,7 @@ typedef struct Publication extern Publication *GetPublication(Oid pubid); extern Publication *GetPublicationByName(const char *pubname, bool missing_ok); extern List *GetRelationPublications(Oid relid); -extern List *GetPublicationRelations(Oid pubid); +extern List *GetPublicationRelations(Oid pubid, bool include_partitions); extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(void); diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index feb51e4add..d1d9b90c50 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -116,6 +116,31 @@ Tables: DROP TABLE testpub_tbl3, testpub_tbl3a; DROP PUBLICATION testpub3, testpub4; +-- Tests for partitioned tables +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_forparted; +RESET client_min_messages; +-- should add only the parent to publication, not the partition +CREATE TABLE testpub_parted1 PARTITION OF testpub_parted FOR VALUES IN (1); +ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted; +\dRp+ testpub_forparted + Publication testpub_forparted + Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------------+------------+---------+---------+---------+----------- + regress_publication_user | f | t | t | t | t +Tables: + "public.testpub_parted" + +-- fail - can't re-add partition +ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted1; +ERROR: relation "testpub_parted1" is already member of publication "testpub_forparted" via ancestor "testpub_parted" +ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted; +ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted1; +-- fail - can't re-add partition +ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted; +ERROR: descendent table "testpub_parted1" of "testpub_parted"is already member of publication "testpub_forparted" +HINT: Remove descendent tables of "testpub_parted" from publication before adding it to the publication. +DROP PUBLICATION testpub_forparted; -- fail - view CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view; ERROR: "testpub_view" is not a table @@ -142,11 +167,6 @@ Tables: ALTER PUBLICATION testpub_default ADD TABLE testpub_view; ERROR: "testpub_view" is not a table DETAIL: Only tables can be added to publications. --- fail - partitioned table -ALTER PUBLICATION testpub_fortbl ADD TABLE testpub_parted; -ERROR: "testpub_parted" is a partitioned table -DETAIL: Adding partitioned tables to publications is not supported. -HINT: You can add the table partitions individually. ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1; ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1; ALTER PUBLICATION testpub_default ADD TABLE pub_test.testpub_nopk; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index 5773a755cf..7074c08efd 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -69,6 +69,22 @@ RESET client_min_messages; DROP TABLE testpub_tbl3, testpub_tbl3a; DROP PUBLICATION testpub3, testpub4; +-- Tests for partitioned tables +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_forparted; +RESET client_min_messages; +-- should add only the parent to publication, not the partition +CREATE TABLE testpub_parted1 PARTITION OF testpub_parted FOR VALUES IN (1); +ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted; +\dRp+ testpub_forparted +-- fail - can't re-add partition +ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted1; +ALTER PUBLICATION testpub_forparted DROP TABLE testpub_parted; +ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted1; +-- fail - can't re-add partition +ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted; +DROP PUBLICATION testpub_forparted; + -- fail - view CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view; SET client_min_messages = 'ERROR'; @@ -83,8 +99,6 @@ CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1; -- fail - view ALTER PUBLICATION testpub_default ADD TABLE testpub_view; --- fail - partitioned table -ALTER PUBLICATION testpub_fortbl ADD TABLE testpub_parted; ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1; ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1; diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl new file mode 100644 index 0000000000..1fa392b618 --- /dev/null +++ b/src/test/subscription/t/013_partition.pl @@ -0,0 +1,178 @@ +# Test PARTITION +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 15; + +# setup + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber1 = get_new_node('subscriber1'); +$node_subscriber1->init(allows_streaming => 'logical'); +$node_subscriber1->start; + +my $node_subscriber2 = get_new_node('subscriber2'); +$node_subscriber2->init(allows_streaming => 'logical'); +$node_subscriber2->start; + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +# publisher +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub1"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub_all FOR ALL TABLES"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1_1 (b text, a int NOT NULL)"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (5, 6)"); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION pub1 ADD TABLE tab1, tab1_1"); + +# subscriber1 +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, b text, c text) PARTITION BY LIST (a)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_1 (b text, c text DEFAULT 'sub1_tab1', a int NOT NULL)"); +$node_subscriber1->safe_psql('postgres', + "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3, 4)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_2 PARTITION OF tab1 (c DEFAULT 'sub1_tab1') FOR VALUES IN (5, 6)"); +$node_subscriber1->safe_psql('postgres', + "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"); + +# subscriber 2 +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1', b text)"); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_1', b text)"); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_2 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_2', b text)"); +$node_subscriber2->safe_psql('postgres', + "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub_all"); + +# Wait for initial sync of all subscriptions +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber1->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber2->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# insert +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1_1 (a) VALUES (3)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1_2 VALUES (5)"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +my $result = $node_subscriber1->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); +is($result, qq(sub1_tab1|3|1|5), 'insert into tab1_1, tab1_2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1"); +is($result, qq(sub2_tab1_1|2|1|3), 'inserts into tab1_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1"); +is($result, qq(sub2_tab1_2|1|5|5), 'inserts into tab1_2 replicated'); + +# update (no partition change) +$node_publisher->safe_psql('postgres', + "UPDATE tab1 SET a = 2 WHERE a = 1"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); +is($result, qq(sub1_tab1|3|2|5), 'update of tab1_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1"); +is($result, qq(sub2_tab1_1|2|2|3), 'update of tab1_1 replicated'); + +# update (partition changes) +$node_publisher->safe_psql('postgres', + "UPDATE tab1 SET a = 6 WHERE a = 2"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); +is($result, qq(sub1_tab1|3|3|6), 'update of tab1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1"); +is($result, qq(sub2_tab1_1|1|3|3), 'delete from tab1_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1"); +is($result, qq(sub2_tab1_2|2|5|6), 'insert into tab1_2 replicated'); + +# delete +$node_publisher->safe_psql('postgres', + "DELETE FROM tab1 WHERE a IN (3, 5)"); +$node_publisher->safe_psql('postgres', + "DELETE FROM tab1_2"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(0||), 'delete from tab1_1, tab1_2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1_1"); +is($result, qq(0||), 'delete from tab1_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1_2"); +is($result, qq(0||), 'delete from tab1_2 replicated'); + +# truncate +$node_subscriber1->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1), (2), (5)"); +$node_subscriber2->safe_psql('postgres', + "INSERT INTO tab1_2 VALUES (2)"); +$node_publisher->safe_psql('postgres', + "TRUNCATE tab1_2"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(2|1|2), 'truncate of tab1_2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1_2"); +is($result, qq(0||), 'truncate of tab1_2 replicated'); + +$node_publisher->safe_psql('postgres', + "TRUNCATE tab1"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(0||), 'truncate of tab1_1 replicated'); +$result = $node_subscriber2->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(0||), 'truncate of tab1_1 replicated'); -- 2.16.5