From 1269c02d09e9cd7aceee54f97ad903acac7bbb41 Mon Sep 17 00:00:00 2001 From: wangw Date: Wed, 20 Jul 2022 09:47:14 +0800 Subject: [PATCH v19] Fix data replicated twice when specifying publish_via_partition_root option. If there are two publications - one of them publishing a parent table (using publish_via_partition_root = true) and the other is publishing one of the parent's child tables - then subscribing to both publications from one subscription results in the same initial child data being copied twice. It should only be copied once. To fix this, we exclude the partition table whose ancestor belongs to specified publications when getting the table list from publisher. --- src/backend/commands/subscriptioncmds.c | 45 ++++++++++++++++++---- src/test/subscription/t/013_partition.pl | 21 +++++++--- src/test/subscription/t/028_row_filter.pl | 13 ++++++- src/test/subscription/t/031_column_list.pl | 9 ++++- 4 files changed, 72 insertions(+), 16 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 83cb67e7a5..ac9013fe24 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1761,25 +1761,56 @@ static List * fetch_table_list(WalReceiverConn *wrconn, List *publications) { WalRcvExecResult *res; - StringInfoData cmd; + StringInfoData cmd, + pub_names; TupleTableSlot *slot; Oid tableRow[3] = {TEXTOID, TEXTOID, NAMEARRAYOID}; List *tablelist = NIL; bool check_columnlist = (walrcv_server_version(wrconn) >= 150000); + initStringInfo(&pub_names); + get_publications_str(publications, &pub_names, true); + initStringInfo(&cmd); - appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename \n"); + + /* + * Get the list of tables from publisher, the partition table whose + * ancestor is also in this list will be ignored, otherwise the initial + * data in the partition table would be replicated twice. 
+ */ + appendStringInfoString(&cmd, "WITH pub_tabs AS(\n" + " SELECT DISTINCT N.nspname, C.oid, C.relname, C.relispartition\n"); /* Get column lists for each relation if the publisher supports it */ if (check_columnlist) - appendStringInfoString(&cmd, ", t.attnames\n"); + appendStringInfoString(&cmd, ", ( SELECT array_agg(a.attname ORDER BY a.attnum)\n" + " FROM pg_attribute a\n" + " WHERE a.attrelid = GPT.relid AND a.attnum > 0 AND\n" + " NOT a.attisdropped AND\n" + " (a.attnum = ANY(GPT.attrs) OR GPT.attrs IS NULL)\n" + " ) AS attnames\n"); + + appendStringInfo(&cmd, " FROM pg_publication P,\n" + " LATERAL pg_get_publication_tables(P.pubname) GPT,\n" + " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n" + " WHERE C.oid = GPT.relid AND P.pubname IN ( %s )\n" + ")\n" + "SELECT DISTINCT pub_tabs.nspname, pub_tabs.relname\n", + pub_names.data); + + /* Get column lists for each relation if the publisher supports it */ + if (check_columnlist) + appendStringInfoString(&cmd, ", pub_tabs.attnames\n"); + + appendStringInfoString(&cmd, "FROM pub_tabs\n" + " WHERE (pub_tabs.relispartition IS FALSE\n" + " OR NOT EXISTS (SELECT 1 FROM pg_partition_ancestors(pub_tabs.oid) as PA\n" + " WHERE PA.relid IN (SELECT pub_tabs.oid FROM pub_tabs)\n" + " AND PA.relid != pub_tabs.oid))\n"); - appendStringInfoString(&cmd, "FROM pg_catalog.pg_publication_tables t\n" - " WHERE t.pubname IN ("); - get_publications_str(publications, &cmd, true); - appendStringInfoChar(&cmd, ')'); res = walrcv_exec(wrconn, cmd.data, check_columnlist ? 
3 : 2, tableRow); + pfree(pub_names.data); pfree(cmd.data); if (res->status != WALRCV_OK_TUPLES) diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index 8b33e4e7ae..821fc74706 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -409,10 +409,10 @@ $node_publisher->safe_psql('postgres', $node_publisher->safe_psql('postgres', "CREATE TABLE tab4 (a int PRIMARY KEY) PARTITION BY LIST (a)"); $node_publisher->safe_psql('postgres', - "CREATE TABLE tab4_1 PARTITION OF tab4 FOR VALUES IN (0, 1) PARTITION BY LIST (a)" + "CREATE TABLE tab4_1 PARTITION OF tab4 FOR VALUES IN (-1, 0, 1) PARTITION BY LIST (a)" ); $node_publisher->safe_psql('postgres', - "CREATE TABLE tab4_1_1 PARTITION OF tab4_1 FOR VALUES IN (0, 1)"); + "CREATE TABLE tab4_1_1 PARTITION OF tab4_1 FOR VALUES IN (-1, 0, 1)"); $node_publisher->safe_psql('postgres', "ALTER PUBLICATION pub_all SET (publish_via_partition_root = true)"); @@ -424,13 +424,17 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub_viaroot FOR TABLE tab2, tab2_1, tab3_1 WITH (publish_via_partition_root = true)" ); -# for tab4, we publish changes through the "middle" partitioned table +# If we subscribe only to pub_lower_level, changes for tab4 will be published +# through the "middle" partition table. However, since we will be subscribing +# to both pub_lower_level and pub_all (see subscription sub2 below), we will +# publish changes via the root table (tab4). 
$node_publisher->safe_psql('postgres', "CREATE PUBLICATION pub_lower_level FOR TABLE tab4_1 WITH (publish_via_partition_root = true)" ); # prepare data for the initial sync $node_publisher->safe_psql('postgres', "INSERT INTO tab2 VALUES (1)"); +$node_publisher->safe_psql('postgres', "INSERT INTO tab4 VALUES (-1)"); # subscriber 1 $node_subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION sub1"); @@ -479,6 +483,7 @@ $node_subscriber2->safe_psql('postgres', "CREATE TABLE tab4 (a int PRIMARY KEY)"); $node_subscriber2->safe_psql('postgres', "CREATE TABLE tab4_1 (a int PRIMARY KEY)"); + # Publication that sub2 points to now publishes via root, so must update # subscription target relations. We set the list of publications so that # the FOR ALL TABLES publication is second (the list order matters). @@ -492,6 +497,10 @@ $node_subscriber2->wait_for_subscription_sync; # check that data is synced correctly $result = $node_subscriber1->safe_psql('postgres', "SELECT c, a FROM tab2"); is($result, qq(sub1_tab2|1), 'initial data synced for pub_viaroot'); +$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab4 ORDER BY 1"); +is($result, qq(-1), 'initial data synced for pub_lower_level and pub_all'); +$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab4_1 ORDER BY 1"); +is($result, qq(), 'initial data synced for pub_lower_level and pub_all'); # insert $node_publisher->safe_psql('postgres', "INSERT INTO tab1 VALUES (1), (0)"); @@ -548,7 +557,8 @@ sub2_tab3|5), 'inserts into tab3 replicated'); # maps to the tab4 relation on subscriber. $result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab4 ORDER BY 1"); -is($result, qq(0), 'inserts into tab4 replicated'); +is( $result, qq(-1 +0), 'inserts into tab4 replicated'); $result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab4_1 ORDER BY 1"); @@ -574,7 +584,8 @@ $node_publisher->wait_for_catchup('sub2'); # maps to the tab4 relation on subscriber. 
$result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab4 ORDER BY 1"); -is( $result, qq(0 +is( $result, qq(-1 +0 1), 'inserts into tab4 replicated'); $result = diff --git a/src/test/subscription/t/028_row_filter.pl b/src/test/subscription/t/028_row_filter.pl index da52289dde..056ceb8096 100644 --- a/src/test/subscription/t/028_row_filter.pl +++ b/src/test/subscription/t/028_row_filter.pl @@ -386,6 +386,10 @@ $node_publisher->safe_psql('postgres', "INSERT INTO tab_rowfilter_child(a, b) VALUES(0,'0'),(30,'30'),(40,'40')" ); +# insert data into partitioned table. +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rowfilter_viaroot_part(a) VALUES(13), (17)"); + $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_1, tap_pub_2, tap_pub_3, tap_pub_4a, tap_pub_4b, tap_pub_5a, tap_pub_5b, tap_pub_toast, tap_pub_inherits, tap_pub_viaroot_2, tap_pub_viaroot_1" ); @@ -707,13 +711,18 @@ is($result, qq(t|1), 'check replicated rows to tab_rowfilter_toast'); # the row filter for the top-level ancestor: # # tab_rowfilter_viaroot_part filter is: (a > 15) +# - INSERT (13) NO, 13 < 15 # - INSERT (14) NO, 14 < 15 # - INSERT (15) NO, 15 = 15 # - INSERT (16) YES, 16 > 15 +# - INSERT (17) YES, 17 > 15 $result = $node_subscriber->safe_psql('postgres', - "SELECT a FROM tab_rowfilter_viaroot_part"); -is($result, qq(16), 'check replicated rows to tab_rowfilter_viaroot_part'); + "SELECT a FROM tab_rowfilter_viaroot_part ORDER BY 1"); +is($result, qq(16 +17), + 'check replicated rows to tab_rowfilter_viaroot_part' +); # Check there is no data in tab_rowfilter_viaroot_part_1 because rows are # replicated via the top most parent table tab_rowfilter_viaroot_part diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl index 2ca120f7a4..6b119d9c40 100644 --- a/src/test/subscription/t/031_column_list.pl +++ 
b/src/test/subscription/t/031_column_list.pl @@ -959,16 +959,21 @@ $node_publisher->safe_psql( CREATE TABLE test_root_1 PARTITION OF test_root FOR VALUES FROM (1) TO (10); CREATE TABLE test_root_2 PARTITION OF test_root FOR VALUES FROM (10) TO (20); - CREATE PUBLICATION pub_root_true FOR TABLE test_root (a) WITH (publish_via_partition_root = true); + CREATE PUBLICATION pub_root_true_1 FOR TABLE test_root (a) WITH (publish_via_partition_root = true); + CREATE PUBLICATION pub_root_true_2 FOR TABLE test_root_1 (a, b) WITH (publish_via_partition_root = true); -- initial data INSERT INTO test_root VALUES (1, 2, 3); INSERT INTO test_root VALUES (10, 20, 30); )); +# Subscribe to pub_root_true_1 and pub_root_true_2 at the same time, which +# means that the initial data will be synced once, and only the column list of +# the parent table (test_root) in the publication pub_root_true_1 will be used +# for both table sync and data replication. $node_subscriber->safe_psql( 'postgres', qq( - CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true; + CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub_root_true_1, pub_root_true_2; )); $node_subscriber->wait_for_subscription_sync; -- 2.39.1.windows.1