diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl index 561dcd33c3..c3c0e718c8 100644 --- a/src/test/recovery/t/035_standby_logical_decoding.pl +++ b/src/test/recovery/t/035_standby_logical_decoding.pl @@ -8,14 +8,18 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; -my ($stdin, $stdout, $stderr, $cascading_stdout, $cascading_stderr, $ret, $handle, $slot); +my ($stdin, $stdout, $stderr, $cascading_stdout, $cascading_stderr, $subscriber_stdin, $subscriber_stdout, $subscriber_stderr, $ret, $handle, $slot); my $node_primary = PostgreSQL::Test::Cluster->new('primary'); my $node_standby = PostgreSQL::Test::Cluster->new('standby'); my $node_cascading_standby = PostgreSQL::Test::Cluster->new('cascading_standby'); +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); my $default_timeout = $PostgreSQL::Test::Utils::timeout_default; +my $psql_timeout = + IPC::Run::timer(2 * $PostgreSQL::Test::Utils::timeout_default); my $res; + # Name for the physical slot on primary my $primary_slotname = 'primary_physical'; my $standby_physical_slotname = 'standby_physical'; @@ -263,6 +267,7 @@ $node_standby->init_from_backup( has_restoring => 1); $node_standby->append_conf('postgresql.conf', qq[primary_slot_name = '$primary_slotname']); +$node_standby->append_conf('postgresql.conf', 'max_replication_slots = 6'); $node_standby->start; $node_primary->wait_for_replay_catchup($node_standby); $node_standby->safe_psql('testdb', qq[SELECT * FROM pg_create_physical_replication_slot('$standby_physical_slotname');]); @@ -280,6 +285,20 @@ $node_cascading_standby->append_conf('postgresql.conf', $node_cascading_standby->start; $node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary); +####################### +# Initialize subscriber node +####################### +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf('postgresql.conf', 'max_replication_slots = 4'); +$node_subscriber->start; + +my %psql_subscriber = ('subscriber_stdin' => '', 'subscriber_stdout' => ''); +$psql_subscriber{run} = + $node_subscriber->background_psql('postgres', \$psql_subscriber{subscriber_stdin}, + \$psql_subscriber{subscriber_stdout}, + $psql_timeout); +$psql_subscriber{subscriber_stdout} = ''; + ################################################## # Test that logical decoding on the standby # behaves correctly. @@ -360,6 +379,43 @@ is( $node_primary->psql( 3, 'replaying logical slot from another database fails'); +################################################## +# Test that we can subscribe on the standby with the publication +# created on the primary. +################################################## + +# Create a table on the primary +$node_primary->safe_psql('postgres', "CREATE TABLE tab_rep (a int primary key)"); + +# Create a table (same structure) on the subscriber node +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_rep (a int primary key)"); + +# Create a publication on the primary +$node_primary->safe_psql('postgres', "CREATE PUBLICATION tap_pub"); +$node_primary->safe_psql('postgres', "ALTER PUBLICATION tap_pub ADD TABLE tab_rep"); + +my $standby_connstr = $node_standby->connstr . ' dbname=postgres'; + +# Subscribe on the standby +$psql_subscriber{stdin} .= qq[ + CREATE SUBSCRIPTION tap_sub CONNECTION '$standby_connstr' PUBLICATION tap_pub; + ]; +$psql_subscriber{run}->pump_nb; +$node_primary->safe_psql('postgres', 'SELECT pg_log_standby_snapshot()'); +$psql_subscriber{run}->finish; + +$node_primary->safe_psql('postgres', qq[INSERT INTO tab_rep select generate_series(1,10);]); +$node_primary->wait_for_replay_catchup($node_standby); + +# Wait for sync to finish +$node_subscriber->wait_for_subscription_sync($node_standby, 'tap_sub'); + +$result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep"); +is($result, qq(10), + 'check replicated inserts after subscription on standby'); + +$node_subscriber->stop; + ################################################## # Recovery conflict: Invalidate conflicting slots, including in-use slots # Scenario 1: hot_standby_feedback off and vacuum FULL @@ -402,7 +458,6 @@ check_slots_conflicting_status(1); ################################################## # Verify that invalidated logical slots do not lead to retaining WAL ################################################## -# XXXXX TODO ################################################## # Recovery conflict: Invalidate conflicting slots, including in-use slots