From 1ffd5327b0950a70ab1b8d65e8946c8a2882ac0a Mon Sep 17 00:00:00 2001 From: amit Date: Thu, 7 Nov 2019 18:19:33 +0900 Subject: [PATCH v11 1/4] Support adding partitioned tables to publication When a partitioned tables is added to a publication, changes of all of its current and future partitions are published via that publication. --- doc/src/sgml/logical-replication.sgml | 17 +-- doc/src/sgml/ref/create_publication.sgml | 20 +++- src/backend/catalog/pg_publication.c | 92 +++++++++++--- src/backend/commands/publicationcmds.c | 23 +++- src/backend/replication/logical/tablesync.c | 1 + src/backend/replication/pgoutput/pgoutput.c | 19 ++- src/bin/pg_dump/pg_dump.c | 8 +- src/include/catalog/pg_publication.h | 15 ++- src/test/regress/expected/publication.out | 34 +++++- src/test/regress/sql/publication.sql | 23 +++- src/test/subscription/t/013_partition.pl | 178 ++++++++++++++++++++++++++++ 11 files changed, 385 insertions(+), 45 deletions(-) create mode 100644 src/test/subscription/t/013_partition.pl diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index f657d1d06e..8bd7c9c8ac 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -402,13 +402,16 @@ - Replication is only possible from base tables to base tables. That is, - the tables on the publication and on the subscription side must be normal - tables, not views, materialized views, partition root tables, or foreign - tables. In the case of partitions, you can therefore replicate a - partition hierarchy one-to-one, but you cannot currently replicate to a - differently partitioned setup. Attempts to replicate tables other than - base tables will result in an error. + Replication is only supported by tables, partitioned or not, although a + given table must either be partitioned on both servers or not partitioned + at all. Also, when replicating between partitioned tables, the actual + replication occurs between leaf partitions, so partitions on the two + servers must match one-to-one. + + + + Attempts to replicate other types of relations such as views, materialized + views, or foreign tables, will result in an error. diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index 99f87ca393..a304f9b8c3 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -68,15 +68,23 @@ CREATE PUBLICATION name that table is added to the publication. If ONLY is not specified, the table and all its descendant tables (if any) are added. Optionally, * can be specified after the table name to - explicitly indicate that descendant tables are included. + explicitly indicate that descendant tables are included. However, adding + a partitioned table to a publication never explicitly adds its partitions, + because partitions are implicitly published due to the partitioned table + being added to the publication. - Only persistent base tables can be part of a publication. Temporary - tables, unlogged tables, foreign tables, materialized views, regular - views, and partitioned tables cannot be part of a publication. To - replicate a partitioned table, add the individual partitions to the - publication. + Only persistent base tables and partitioned tables can be part of a + publication. Temporary tables, unlogged tables, foreign tables, + materialized views, regular views cannot be part of a publication. + + + + When a partitioned table is added to a publication, all of its existing + and future partitions are also implicitly considered to be part of the + publication. So, even operations that are performed directly on a + partition are also published via its ancestors' publications. diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index c5eea7af3f..ea13cced79 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -24,8 +24,10 @@ #include "catalog/index.h" #include "catalog/indexing.h" #include "catalog/namespace.h" +#include "catalog/partition.h" #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" +#include "catalog/pg_inherits.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_type.h" @@ -40,6 +42,8 @@ #include "utils/rel.h" #include "utils/syscache.h" +static List *get_rel_publications(Oid relid); + /* * Check if relation can be in given publication and throws appropriate * error if not. @@ -47,17 +51,9 @@ static void check_publication_add_relation(Relation targetrel) { - /* Give more specific error for partitioned tables */ - if (RelationGetForm(targetrel)->relkind == RELKIND_PARTITIONED_TABLE) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("\"%s\" is a partitioned table", - RelationGetRelationName(targetrel)), - errdetail("Adding partitioned tables to publications is not supported."), - errhint("You can add the table partitions individually."))); - - /* Must be table */ - if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION) + /* Must be a regular or partitioned table */ + if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION && + RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("\"%s\" is not a table", @@ -103,7 +99,8 @@ check_publication_add_relation(Relation targetrel) static bool is_publishable_class(Oid relid, Form_pg_class reltuple) { - return reltuple->relkind == RELKIND_RELATION && + return (reltuple->relkind == RELKIND_RELATION || + reltuple->relkind == RELKIND_PARTITIONED_TABLE) && !IsCatalogRelationOid(relid) && reltuple->relpersistence == RELPERSISTENCE_PERMANENT && relid >= FirstNormalObjectId; @@ -165,6 +162,10 @@ publication_add_relation(Oid pubid, Relation targetrel, * Check for duplicates. Note that this does not really prevent * duplicates, it's here just to provide nicer error message in common * case. The real protection is the unique key on the catalog. + * + * We give special messages for when a partition is found to be implicitly + * published via an ancestor and when a partitioned tables's partitions + * are found to be published on their own. */ if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), ObjectIdGetDatum(pubid))) @@ -221,10 +222,35 @@ publication_add_relation(Oid pubid, Relation targetrel, /* - * Gets list of publication oids for a relation oid. + * Gets list of publication oids for a relation, plus those of ancestors, + * if any, if the relation is a partition. */ List * GetRelationPublications(Oid relid) +{ + List *result = NIL; + + result = get_rel_publications(relid); + if (get_rel_relispartition(relid)) + { + List *ancestors = get_partition_ancestors(relid); + ListCell *lc; + + foreach(lc, ancestors) + { + Oid ancestor = lfirst_oid(lc); + List *ancestor_pubs = get_rel_publications(ancestor); + + result = list_concat(result, ancestor_pubs); + } + } + + return result; +} + +/* Workhorse of GetRelationPublications() */ +static List * +get_rel_publications(Oid relid) { List *result = NIL; CatCList *pubrellist; @@ -251,9 +277,12 @@ GetRelationPublications(Oid relid) * * This should only be used for normal publications, the FOR ALL TABLES * should use GetAllTablesPublicationRelations(). + * + * See catalog/pg_publication.h for the values that are appropriate for + * 'pub_partopt'. */ List * -GetPublicationRelations(Oid pubid) +GetPublicationRelations(Oid pubid, int pub_partopt) { List *result; Relation pubrelsrel; @@ -279,7 +308,31 @@ GetPublicationRelations(Oid pubid) pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); - result = lappend_oid(result, pubrel->prrelid); + if (get_rel_relkind(pubrel->prrelid) == RELKIND_PARTITIONED_TABLE && + pub_partopt != PUBLICATION_PART_ROOT) + { + List *all_parts = find_all_inheritors(pubrel->prrelid, NoLock, + NULL); + + if (pub_partopt == PUBLICATION_PART_ALL) + result = list_concat(result, all_parts); + else if (pub_partopt == PUBLICATION_PART_LEAF) + { + ListCell *lc; + + foreach(lc, all_parts) + { + Oid partOid = lfirst_oid(lc); + + if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE) + result = lappend_oid(result, partOid); + } + } + else + Assert(false); + } + else + result = lappend_oid(result, pubrel->prrelid); } systable_endscan(scan); @@ -480,10 +533,17 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); publication = GetPublicationByName(pubname, false); + + /* + * Publications support partitioned tables, although all changes are + * replicated using leaf partition identity and schema, so we only + * need those. + */ if (publication->alltables) tables = GetAllTablesPublicationRelations(); else - tables = GetPublicationRelations(publication->oid); + tables = GetPublicationRelations(publication->oid, + PUBLICATION_PART_LEAF); funcctx->user_fctx = (void *) tables; MemoryContextSwitchTo(oldcontext); diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index f96cb42adc..23b9e1a5ae 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -299,7 +299,13 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, } else { - List *relids = GetPublicationRelations(pubform->oid); + /* + * For any partitioned tables contained in the publication, we must + * invalidate all partitions contained in the respective partition + * trees, not just those explicitly mentioned in the publication. + */ + List *relids = GetPublicationRelations(pubform->oid, + PUBLICATION_PART_ALL); /* * We don't want to send too many individual messages, at some point @@ -356,7 +362,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, PublicationDropTables(pubid, rels, false); else /* DEFELEM_SET */ { - List *oldrelids = GetPublicationRelations(pubid); + List *oldrelids = GetPublicationRelations(pubid, + PUBLICATION_PART_ROOT); List *delrels = NIL; ListCell *oldlc; @@ -498,7 +505,8 @@ RemovePublicationRelById(Oid proid) /* * Open relations specified by a RangeVar list. - * The returned tables are locked in ShareUpdateExclusiveLock mode. + * The returned tables are locked in ShareUpdateExclusiveLock mode in order to + * add them to a publication. */ static List * OpenTableList(List *tables) @@ -539,8 +547,13 @@ OpenTableList(List *tables) rels = lappend(rels, rel); relids = lappend_oid(relids, myrelid); - /* Add children of this rel, if requested */ - if (recurse) + /* + * Add children of this rel, if requested, so that they too are added + * to the publication. A partitioned table can't have any inheritance + * children other than its partitions, which need not be explicitly + * added to the publication. + */ + if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) { List *children; ListCell *child; diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index f8183cd488..98825f01e9 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -761,6 +761,7 @@ copy_table(Relation rel) /* Map the publisher relation to local one. */ relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); Assert(rel == relmapentry->localrel); + Assert(relmapentry->localrel->rd_rel->relkind == RELKIND_RELATION); /* Start copy on the publisher. */ initStringInfo(&cmd); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 752508213a..d6b9cbe1bd 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -50,7 +50,12 @@ static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); -/* Entry in the map used to remember which relation schemas we sent. */ +/* + * Entry in the map used to remember which relation schemas we sent. + * + * For partitions, 'pubactions' considers not only the table's own + * publications, but also those of all of its ancestors. + */ typedef struct RelationSyncEntry { Oid relid; /* relation oid */ @@ -406,6 +411,13 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (!relentry->pubactions.pubtruncate) continue; + /* + * Don't send partitioned tables, because partitions would be + * sent instead. + */ + if (relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + continue; + relids[nrelids++] = relid; maybe_send_schema(ctx, relation, relentry); } @@ -524,6 +536,11 @@ init_rel_sync_cache(MemoryContext cachectx) /* * Find or create entry in the relation schema cache. + * + * This looks up publications that given relation is directly or indirectly + * part of (latter if it's really the relation's ancestor that is part of a + * publication) and fills up the found entry with the information about + * which operations to publish. */ static RelationSyncEntry * get_rel_sync_entry(PGOutputData *data, Oid relid) diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 799b6988b7..dc33c20048 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -3969,8 +3969,12 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) { TableInfo *tbinfo = &tblinfo[i]; - /* Only plain tables can be aded to publications. */ - if (tbinfo->relkind != RELKIND_RELATION) + /* + * Only regular and partitioned tables can be added to + * publications. + */ + if (tbinfo->relkind != RELKIND_RELATION && + tbinfo->relkind != RELKIND_PARTITIONED_TABLE) continue; /* diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 6cdc2b1197..20d95e5914 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -80,7 +80,20 @@ typedef struct Publication extern Publication *GetPublication(Oid pubid); extern Publication *GetPublicationByName(const char *pubname, bool missing_ok); extern List *GetRelationPublications(Oid relid); -extern List *GetPublicationRelations(Oid pubid); + +/*--------- + * Expected values for pub_partopt parameter of GetRelationPublications(), + * which allows callers to specify which partitions of partitioned tables + * mentioned in the publication they expect to see. + * + * ROOT: only the table explicitly mentioned in the publication + * LEAF: only leaf partitions in given tree + * ALL: all partitions in given tree + */ +#define PUBLICATION_PART_ROOT 0 +#define PUBLICATION_PART_LEAF 1 +#define PUBLICATION_PART_ALL 2 +extern List *GetPublicationRelations(Oid pubid, int pub_partopt); extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(void); diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index feb51e4add..2634d2c1e1 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -116,6 +116,35 @@ Tables: DROP TABLE testpub_tbl3, testpub_tbl3a; DROP PUBLICATION testpub3, testpub4; +-- Tests for partitioned tables +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_forparted; +CREATE PUBLICATION testpub_forparted1; +RESET client_min_messages; +CREATE TABLE testpub_parted1 (LIKE testpub_parted); +ALTER PUBLICATION testpub_forparted1 SET (publish='insert'); +-- works despite missing REPLICA IDENTITY, because updates are not replicated +UPDATE testpub_parted1 SET a = 1; +ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1); +-- only parent is listed as being in publication, not the partition +ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted; +\dRp+ testpub_forparted + Publication testpub_forparted + Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------------+------------+---------+---------+---------+----------- + regress_publication_user | f | t | t | t | t +Tables: + "public.testpub_parted" + +-- should now fail, because parent's publication replicates updates +UPDATE testpub_parted1 SET a = 1; +ERROR: cannot update table "testpub_parted1" because it does not have a replica identity and publishes updates +HINT: To enable updating the table, set REPLICA IDENTITY using ALTER TABLE. +ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1; +-- works again, because parent's publication is no longer considered +UPDATE testpub_parted1 SET a = 1; +DROP TABLE testpub_parted1; +DROP PUBLICATION testpub_forparted, testpub_forparted1; -- fail - view CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view; ERROR: "testpub_view" is not a table @@ -142,11 +171,6 @@ Tables: ALTER PUBLICATION testpub_default ADD TABLE testpub_view; ERROR: "testpub_view" is not a table DETAIL: Only tables can be added to publications. --- fail - partitioned table -ALTER PUBLICATION testpub_fortbl ADD TABLE testpub_parted; -ERROR: "testpub_parted" is a partitioned table -DETAIL: Adding partitioned tables to publications is not supported. -HINT: You can add the table partitions individually. ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1; ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1; ALTER PUBLICATION testpub_default ADD TABLE pub_test.testpub_nopk; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index 5773a755cf..219e04129d 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -69,6 +69,27 @@ RESET client_min_messages; DROP TABLE testpub_tbl3, testpub_tbl3a; DROP PUBLICATION testpub3, testpub4; +-- Tests for partitioned tables +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_forparted; +CREATE PUBLICATION testpub_forparted1; +RESET client_min_messages; +CREATE TABLE testpub_parted1 (LIKE testpub_parted); +ALTER PUBLICATION testpub_forparted1 SET (publish='insert'); +-- works despite missing REPLICA IDENTITY, because updates are not replicated +UPDATE testpub_parted1 SET a = 1; +ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1); +-- only parent is listed as being in publication, not the partition +ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted; +\dRp+ testpub_forparted +-- should now fail, because parent's publication replicates updates +UPDATE testpub_parted1 SET a = 1; +ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1; +-- works again, because parent's publication is no longer considered +UPDATE testpub_parted1 SET a = 1; +DROP TABLE testpub_parted1; +DROP PUBLICATION testpub_forparted, testpub_forparted1; + -- fail - view CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view; SET client_min_messages = 'ERROR'; @@ -83,8 +104,6 @@ CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1; -- fail - view ALTER PUBLICATION testpub_default ADD TABLE testpub_view; --- fail - partitioned table -ALTER PUBLICATION testpub_fortbl ADD TABLE testpub_parted; ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1; ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1; diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl new file mode 100644 index 0000000000..1fa392b618 --- /dev/null +++ b/src/test/subscription/t/013_partition.pl @@ -0,0 +1,178 @@ +# Test PARTITION +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 15; + +# setup + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber1 = get_new_node('subscriber1'); +$node_subscriber1->init(allows_streaming => 'logical'); +$node_subscriber1->start; + +my $node_subscriber2 = get_new_node('subscriber2'); +$node_subscriber2->init(allows_streaming => 'logical'); +$node_subscriber2->start; + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +# publisher +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub1"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub_all FOR ALL TABLES"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1_1 (b text, a int NOT NULL)"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (5, 6)"); +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION pub1 ADD TABLE tab1, tab1_1"); + +# subscriber1 +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, b text, c text) PARTITION BY LIST (a)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_1 (b text, c text DEFAULT 'sub1_tab1', a int NOT NULL)"); +$node_subscriber1->safe_psql('postgres', + "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3, 4)"); +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_2 PARTITION OF tab1 (c DEFAULT 'sub1_tab1') FOR VALUES IN (5, 6)"); +$node_subscriber1->safe_psql('postgres', + "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"); + +# subscriber 2 +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1', b text)"); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_1 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_1', b text)"); +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_2 (a int PRIMARY KEY, c text DEFAULT 'sub2_tab1_2', b text)"); +$node_subscriber2->safe_psql('postgres', + "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub_all"); + +# Wait for initial sync of all subscriptions +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber1->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber2->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# insert +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1_1 (a) VALUES (3)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1_2 VALUES (5)"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +my $result = $node_subscriber1->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); +is($result, qq(sub1_tab1|3|1|5), 'insert into tab1_1, tab1_2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1"); +is($result, qq(sub2_tab1_1|2|1|3), 'inserts into tab1_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1"); +is($result, qq(sub2_tab1_2|1|5|5), 'inserts into tab1_2 replicated'); + +# update (no partition change) +$node_publisher->safe_psql('postgres', + "UPDATE tab1 SET a = 2 WHERE a = 1"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); +is($result, qq(sub1_tab1|3|2|5), 'update of tab1_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1"); +is($result, qq(sub2_tab1_1|2|2|3), 'update of tab1_1 replicated'); + +# update (partition changes) +$node_publisher->safe_psql('postgres', + "UPDATE tab1 SET a = 6 WHERE a = 2"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); +is($result, qq(sub1_tab1|3|3|6), 'update of tab1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1_1 GROUP BY 1"); +is($result, qq(sub2_tab1_1|1|3|3), 'delete from tab1_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT c, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1"); +is($result, qq(sub2_tab1_2|2|5|6), 'insert into tab1_2 replicated'); + +# delete +$node_publisher->safe_psql('postgres', + "DELETE FROM tab1 WHERE a IN (3, 5)"); +$node_publisher->safe_psql('postgres', + "DELETE FROM tab1_2"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(0||), 'delete from tab1_1, tab1_2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1_1"); +is($result, qq(0||), 'delete from tab1_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1_2"); +is($result, qq(0||), 'delete from tab1_2 replicated'); + +# truncate +$node_subscriber1->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1), (2), (5)"); +$node_subscriber2->safe_psql('postgres', + "INSERT INTO tab1_2 VALUES (2)"); +$node_publisher->safe_psql('postgres', + "TRUNCATE tab1_2"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(2|1|2), 'truncate of tab1_2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1_2"); +is($result, qq(0||), 'truncate of tab1_2 replicated'); + +$node_publisher->safe_psql('postgres', + "TRUNCATE tab1"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(0||), 'truncate of tab1_1 replicated'); +$result = $node_subscriber2->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(0||), 'truncate of tab1_1 replicated'); -- 2.16.5