From 9fd14e817ba569654dc32ef75aadde7300ad1b0f Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Thu, 26 Sep 2019 19:15:35 +0200 Subject: [PATCH 16/17] Add TAP test for streaming vs. DDL --- .../subscription/t/014_stream_through_ddl.pl | 98 +++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 src/test/subscription/t/014_stream_through_ddl.pl diff --git a/src/test/subscription/t/014_stream_through_ddl.pl b/src/test/subscription/t/014_stream_through_ddl.pl new file mode 100644 index 0000000000..b8d78b1972 --- /dev/null +++ b/src/test/subscription/t/014_stream_through_ddl.pl @@ -0,0 +1,98 @@ +# Test streaming of large transaction with DDL, subtransactions and rollbacks. +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 2; + +sub wait_for_caught_up +{ + my ($node, $appname) = @_; + + $node->poll_query_until('postgres', +"SELECT pg_current_wal_lsn() <= replay_lsn FROM pg_stat_replication WHERE application_name = '$appname';" + ) or die "Timed out while waiting for subscriber to catch up"; +} + +# Create publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c INT, d text, e INT)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', +"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming=true)" +); + +wait_for_caught_up($node_publisher, $appname); + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d) FROM test_tab"); +is($result, qq(2|0|0), 'check initial data was copied to subscriber'); + + +# large (streamed) transaction with DDL and DML +$node_publisher->safe_psql('postgres', q{ +BEGIN; +SAVEPOINT s1; +INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 1000) s(i); +SAVEPOINT s2; +ALTER TABLE test_tab ADD COLUMN c INT; +INSERT INTO test_tab SELECT i, md5(i::text), i FROM generate_series(1001, 2000) s(i); +SAVEPOINT s3; +ALTER TABLE test_tab ADD COLUMN d text; +SAVEPOINT s4; +SAVEPOINT s5; +INSERT INTO test_tab SELECT i, md5(i::text), i, md5(i::text) FROM generate_series(2001, 3000) s(i); +ALTER TABLE test_tab ADD COLUMN e INT; +INSERT INTO test_tab SELECT i, md5(i::text), i, md5(i::text), i FROM generate_series(3001, 4000) s(i); +SAVEPOINT s10; +ALTER TABLE test_tab DROP d; +INSERT INTO test_tab SELECT i, md5(i::text), i, i FROM generate_series(4001, 5000) s(i); +ALTER TABLE test_tab ADD COLUMN d text; +ROLLBACK TO SAVEPOINT s10; +RELEASE SAVEPOINT s10; +SAVEPOINT s10; +INSERT INTO test_tab SELECT i, md5(i::text), i, md5(i::text), i FROM generate_series(5001, 6000) s(i); +SAVEPOINT s6; +ALTER TABLE test_tab DROP d; +INSERT INTO test_tab SELECT i, md5(i::text), i, i FROM generate_series(6001, 7000) s(i); +SAVEPOINT s7; +ALTER TABLE test_tab ADD COLUMN d text; +INSERT INTO test_tab (a, b, c, d, e) SELECT i, md5(i::text), i, md5(i::text), i FROM generate_series(7001, 8000) s(i); +COMMIT; +}); + +wait_for_caught_up($node_publisher, $appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*), count(a), count(b), count(c), count(d), count(e) FROM test_tab"); +is($result, qq(7000|7000|7000|6000|4000|4000), 'check extra columns contain local defaults'); + +$node_subscriber->stop; +$node_publisher->stop; -- 2.21.0