From 85494784ca3e37cedcdc714c1f216b229a9e112f Mon Sep 17 00:00:00 2001
From: bdrouvotAWS
Date: Tue, 7 Feb 2023 09:04:12 +0000
Subject: [PATCH v49 5/6] New TAP test for logical decoding on standby.

In addition to the new TAP test, this commit introduces a new
pg_log_standby_snapshot() function. The idea is to be able to take a
snapshot of running transactions and write it to WAL without requesting
a (costly) checkpoint.

Author: Craig Ringer (in an older version), Amit Khandekar, Bertrand Drouvot
Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello
---
 doc/src/sgml/func.sgml                        |  15 +
 src/backend/access/transam/xlogfuncs.c        |  32 +
 src/backend/catalog/system_functions.sql      |   2 +
 src/include/catalog/pg_proc.dat               |   3 +
 src/test/perl/PostgreSQL/Test/Cluster.pm      |  37 +
 src/test/recovery/meson.build                 |   1 +
 .../t/034_standby_logical_decoding.pl         | 710 ++++++++++++++++++
 7 files changed, 800 insertions(+)
 3.0% src/backend/
 3.9% src/test/perl/PostgreSQL/Test/
 89.9% src/test/recovery/t/

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index e09e289a43..59334dd422 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -26534,6 +26534,21 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         prepared with .
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_log_standby_snapshot</primary>
+        </indexterm>
+        <function>pg_log_standby_snapshot</function> ()
+        <returnvalue>pg_lsn</returnvalue>
+       </para>
+       <para>
+        Take a snapshot of running transactions and write it to WAL, without
+        having to wait for bgwriter or checkpointer to log one. This is useful
+        for logical decoding on standby, where logical slot creation hangs
+        until such a record has been replayed on the standby.
+       </para></entry>
+      </row>
diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index c07daa874f..481e9a47da 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -38,6 +38,7 @@
 #include "utils/pg_lsn.h"
 #include "utils/timestamp.h"
 #include "utils/tuplestore.h"
+#include "storage/standby.h"
 /*
  * Backup-related variables.
  */
@@ -196,6 +197,37 @@ pg_switch_wal(PG_FUNCTION_ARGS)
 	PG_RETURN_LSN(switchpoint);
 }
 
+/*
+ * pg_log_standby_snapshot: call LogStandbySnapshot()
+ *
+ * Permission checking for this function is managed through the normal
+ * GRANT system.
+ */ +Datum +pg_log_standby_snapshot(PG_FUNCTION_ARGS) +{ + XLogRecPtr recptr; + + if (RecoveryInProgress()) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is in progress"), + errhint("pg_log_standby_snapshot() cannot be executed during recovery."))); + + if (!XLogStandbyInfoActive()) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("wal_level is not in desired state"), + errhint("wal_level has to be >= WAL_LEVEL_REPLICA."))); + + recptr = LogStandbySnapshot(); + + /* + * As a convenience, return the WAL location of the last inserted record + */ + PG_RETURN_LSN(recptr); +} + /* * pg_create_restore_point: a named point for restore * diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index 83ca893444..b7c65ea37d 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -644,6 +644,8 @@ REVOKE EXECUTE ON FUNCTION pg_create_restore_point(text) FROM public; REVOKE EXECUTE ON FUNCTION pg_switch_wal() FROM public; +REVOKE EXECUTE ON FUNCTION pg_log_standby_snapshot() FROM public; + REVOKE EXECUTE ON FUNCTION pg_wal_replay_pause() FROM public; REVOKE EXECUTE ON FUNCTION pg_wal_replay_resume() FROM public; diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index c8e11ab710..48d7be075b 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -6384,6 +6384,9 @@ { oid => '2848', descr => 'switch to new wal file', proname => 'pg_switch_wal', provolatile => 'v', prorettype => 'pg_lsn', proargtypes => '', prosrc => 'pg_switch_wal' }, +{ oid => '9658', descr => 'log details of the current snapshot to WAL', + proname => 'pg_log_standby_snapshot', provolatile => 'v', prorettype => 'pg_lsn', + proargtypes => '', prosrc => 'pg_log_standby_snapshot' }, { oid => '3098', descr => 'create a named restore point', proname => 'pg_create_restore_point', provolatile => 'v', prorettype => 'pg_lsn', proargtypes => 'text', diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm index 04921ca3a3..247265d328 100644 --- a/src/test/perl/PostgreSQL/Test/Cluster.pm +++ b/src/test/perl/PostgreSQL/Test/Cluster.pm @@ -3037,6 +3037,43 @@ $SIG{TERM} = $SIG{INT} = sub { =pod +=item $node->create_logical_slot_on_standby(self, primary, slot_name, dbname) + +Create logical replication slot on given standby + +=cut + +sub create_logical_slot_on_standby +{ + my ($self, $primary, $slot_name, $dbname) = @_; + my ($stdout, $stderr); + + my $handle; + + $handle = IPC::Run::start(['pg_recvlogical', '-d', $self->connstr($dbname), '-P', 'test_decoding', '-S', $slot_name, '--create-slot'], '>', \$stdout, '2>', \$stderr); + + # Once slot restart_lsn is created, the standby looks for xl_running_xacts + # WAL record from the restart_lsn onwards. So firstly, wait until the slot + # restart_lsn is evaluated. + + $self->poll_query_until( + 'postgres', qq[ + SELECT restart_lsn IS NOT NULL + FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name' + ]) or die "timed out waiting for logical slot to calculate its restart_lsn"; + + # Now arrange for the xl_running_xacts record for which pg_recvlogical + # is waiting. + $primary->safe_psql('postgres', 'SELECT pg_log_standby_snapshot()'); + + $handle->finish(); + + is($self->slot($slot_name)->{'slot_type'}, 'logical', $slot_name . ' on standby created') + or die "could not create slot" . 
$slot_name; +} + +=pod + =back =cut diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index 209118a639..eca90c5c8c 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -39,6 +39,7 @@ tests += { 't/031_recovery_conflict.pl', 't/032_relfilenode_reuse.pl', 't/033_replay_tsp_drops.pl', + 't/034_standby_logical_decoding.pl', ], }, } diff --git a/src/test/recovery/t/034_standby_logical_decoding.pl b/src/test/recovery/t/034_standby_logical_decoding.pl new file mode 100644 index 0000000000..cf1277bd1b --- /dev/null +++ b/src/test/recovery/t/034_standby_logical_decoding.pl @@ -0,0 +1,710 @@ +# logical decoding on standby : test logical decoding, +# recovery conflict and standby promotion. + +use strict; +use warnings; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More tests => 67; + +my ($stdin, $stdout, $stderr, $cascading_stdout, $cascading_stderr, $ret, $handle, $slot); + +my $node_primary = PostgreSQL::Test::Cluster->new('primary'); +my $node_standby = PostgreSQL::Test::Cluster->new('standby'); +my $node_cascading_standby = PostgreSQL::Test::Cluster->new('cascading_standby'); +my $default_timeout = $PostgreSQL::Test::Utils::timeout_default; +my $res; + +# Name for the physical slot on primary +my $primary_slotname = 'primary_physical'; +my $standby_physical_slotname = 'standby_physical'; + +# find $pat in logfile of $node after $off-th byte +sub find_in_log +{ + my ($node, $pat, $off) = @_; + + $off = 0 unless defined $off; + my $log = PostgreSQL::Test::Utils::slurp_file($node->logfile); + return 0 if (length($log) <= $off); + + $log = substr($log, $off); + + return $log =~ m/$pat/; +} + +# Fetch xmin columns from slot's pg_replication_slots row, after waiting for +# given boolean condition to be true to ensure we've reached a quiescent state. +sub wait_for_xmins +{ + my ($node, $slotname, $check_expr) = @_; + + $node->poll_query_until( + 'postgres', qq[ + SELECT $check_expr + FROM pg_catalog.pg_replication_slots + WHERE slot_name = '$slotname'; + ]) or die "Timed out waiting for slot xmins to advance"; +} + +# Create the required logical slots on standby. +sub create_logical_slots +{ + my ($node) = @_; + $node->create_logical_slot_on_standby($node_primary, 'inactiveslot', 'testdb'); + $node->create_logical_slot_on_standby($node_primary, 'activeslot', 'testdb'); +} + +# Drop the logical slots on standby. +sub drop_logical_slots +{ + $node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('inactiveslot')]); + $node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('activeslot')]); +} + +# Acquire one of the standby logical slots created by create_logical_slots(). +# In case wait is true we are waiting for an active pid on the 'activeslot' slot. +# If wait is not true it means we are testing a known failure scenario. 
+sub make_slot_active
+{
+	my ($node, $wait, $to_stdout, $to_stderr) = @_;
+	my $slot_user_handle;
+
+	$slot_user_handle = IPC::Run::start(['pg_recvlogical', '-d', $node->connstr('testdb'), '-S', 'activeslot', '-o', 'include-xids=0', '-o', 'skip-empty-xacts=1', '--no-loop', '--start', '-f', '-'], '>', $to_stdout, '2>', $to_stderr);
+
+	if ($wait)
+	{
+		# make sure activeslot is in use
+		$node->poll_query_until('testdb',
+			"SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'activeslot' AND active_pid IS NOT NULL)"
+		) or die "slot never became active";
+	}
+	return $slot_user_handle;
+}
+
+# Check pg_recvlogical stderr
+sub check_pg_recvlogical_stderr
+{
+	my ($slot_user_handle, $check_stderr) = @_;
+	my $return;
+
+	# our client should've terminated in response to the walsender error
+	$slot_user_handle->finish;
+	$return = $?;
+	cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero");
+	if ($return) {
+		like($stderr, qr/$check_stderr/, 'slot has been invalidated');
+	}
+
+	return 0;
+}
+
+# Check if all the slots on standby are dropped. These include the 'activeslot'
+# that was acquired by make_slot_active(), and the non-active 'inactiveslot'.
+sub check_slots_dropped
+{
+	my ($slot_user_handle) = @_;
+
+	is($node_standby->slot('inactiveslot')->{'slot_type'}, '', 'inactiveslot on standby dropped');
+	is($node_standby->slot('activeslot')->{'slot_type'}, '', 'activeslot on standby dropped');
+
+	check_pg_recvlogical_stderr($slot_user_handle, "conflict with recovery");
+}
+
+# Change hot_standby_feedback on the standby, and wait for the xmin and
+# catalog_xmin of the primary physical slot to reach the expected state.
+sub change_hot_standby_feedback_and_wait_for_xmins
+{
+	my ($hsf, $invalidated) = @_;
+
+	$node_standby->append_conf('postgresql.conf',qq[
+	hot_standby_feedback = $hsf
+	]);
+
+	$node_standby->reload;
+
+	if ($hsf && $invalidated)
+	{
+		# With hot_standby_feedback on, xmin should advance, but catalog_xmin
+		# should still remain NULL since the logical slots have been invalidated.
+		wait_for_xmins($node_primary, $primary_slotname,
+			"xmin IS NOT NULL AND catalog_xmin IS NULL");
+	}
+	elsif ($hsf)
+	{
+		# With hot_standby_feedback on, xmin and catalog_xmin should advance.
+		wait_for_xmins($node_primary, $primary_slotname,
+			"xmin IS NOT NULL AND catalog_xmin IS NOT NULL");
+	}
+	else
+	{
+		# Both should be NULL since hs_feedback is off
+		wait_for_xmins($node_primary, $primary_slotname,
+			"xmin IS NULL AND catalog_xmin IS NULL");
+
+	}
+}
+
+# Check conflicting status in pg_replication_slots.
+sub check_slots_conflicting_status +{ + my ($conflicting) = @_; + + if ($conflicting) + { + $res = $node_standby->safe_psql( + 'postgres', qq( + select bool_and(conflicting) from pg_replication_slots;)); + + is($res, 't', + "Logical slots are reported as conflicting"); + } + else + { + $res = $node_standby->safe_psql( + 'postgres', qq( + select bool_or(conflicting) from pg_replication_slots;)); + + is($res, 'f', + "Logical slots are reported as non conflicting"); + } +} + +######################## +# Initialize primary node +######################## + +$node_primary->init(allows_streaming => 1, has_archiving => 1); +$node_primary->append_conf('postgresql.conf', q{ +wal_level = 'logical' +max_replication_slots = 4 +max_wal_senders = 4 +log_min_messages = 'debug2' +log_error_verbosity = verbose +}); +$node_primary->dump_info; +$node_primary->start; + +$node_primary->psql('postgres', q[CREATE DATABASE testdb]); + +$node_primary->safe_psql('testdb', qq[SELECT * FROM pg_create_physical_replication_slot('$primary_slotname');]); + +# Check conflicting is NULL for physical slot +$res = $node_primary->safe_psql( + 'postgres', qq[ + SELECT conflicting is null FROM pg_replication_slots where slot_name = '$primary_slotname';]); + +is($res, 't', + "Physical slot reports conflicting as NULL"); + +my $backup_name = 'b1'; +$node_primary->backup($backup_name); + +####################### +# Initialize standby node +####################### + +$node_standby->init_from_backup( + $node_primary, $backup_name, + has_streaming => 1, + has_restoring => 1); +$node_standby->append_conf('postgresql.conf', + qq[primary_slot_name = '$primary_slotname']); +$node_standby->start; +$node_primary->wait_for_catchup($node_standby, 'replay', $node_primary->lsn('flush')); +$node_standby->safe_psql('testdb', qq[SELECT * FROM pg_create_physical_replication_slot('$standby_physical_slotname');]); + +####################### +# Initialize cascading standby node +####################### +$node_standby->backup($backup_name); +$node_cascading_standby->init_from_backup( + $node_standby, $backup_name, + has_streaming => 1, + has_restoring => 1); +$node_cascading_standby->append_conf('postgresql.conf', + qq[primary_slot_name = '$standby_physical_slotname']); +$node_cascading_standby->start; +$node_standby->wait_for_catchup($node_cascading_standby, 'replay', $node_primary->lsn('flush')); + +################################################## +# Test that logical decoding on the standby +# behaves correctly. +################################################## + +# create the logical slots +create_logical_slots($node_standby); + +$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y text);]); +$node_primary->safe_psql('testdb', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]); + +$node_primary->wait_for_catchup($node_standby, 'replay', $node_primary->lsn('flush')); + +my $result = $node_standby->safe_psql('testdb', + qq[SELECT pg_logical_slot_get_changes('activeslot', NULL, NULL);]); + +# test if basic decoding works +is(scalar(my @foobar = split /^/m, $result), + 14, 'Decoding produced 14 rows (2 BEGIN/COMMIT and 10 rows)'); + +# Insert some rows and verify that we get the same results from pg_recvlogical +# and the SQL interface. 
+$node_primary->safe_psql('testdb', + qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;] +); + +my $expected = q{BEGIN +table public.decoding_test: INSERT: x[integer]:1 y[text]:'1' +table public.decoding_test: INSERT: x[integer]:2 y[text]:'2' +table public.decoding_test: INSERT: x[integer]:3 y[text]:'3' +table public.decoding_test: INSERT: x[integer]:4 y[text]:'4' +COMMIT}; + +$node_primary->wait_for_catchup($node_standby, 'replay', $node_primary->lsn('flush')); + +my $stdout_sql = $node_standby->safe_psql('testdb', + qq[SELECT data FROM pg_logical_slot_peek_changes('activeslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');] +); + +is($stdout_sql, $expected, 'got expected output from SQL decoding session'); + +my $endpos = $node_standby->safe_psql('testdb', + "SELECT lsn FROM pg_logical_slot_peek_changes('activeslot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;" +); + +# Insert some rows after $endpos, which we won't read. +$node_primary->safe_psql('testdb', + qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,50) s;] +); + +$node_primary->wait_for_catchup($node_standby, 'replay', $node_primary->lsn('flush')); + +my $stdout_recv = $node_standby->pg_recvlogical_upto( + 'testdb', 'activeslot', $endpos, $default_timeout, + 'include-xids' => '0', + 'skip-empty-xacts' => '1'); +chomp($stdout_recv); +is($stdout_recv, $expected, + 'got same expected output from pg_recvlogical decoding session'); + +$node_standby->poll_query_until('testdb', + "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'activeslot' AND active_pid IS NULL)" +) or die "slot never became inactive"; + +$stdout_recv = $node_standby->pg_recvlogical_upto( + 'testdb', 'activeslot', $endpos, $default_timeout, + 'include-xids' => '0', + 'skip-empty-xacts' => '1'); +chomp($stdout_recv); +is($stdout_recv, '', 'pg_recvlogical acknowledged changes'); + +$node_primary->safe_psql('postgres', 'CREATE DATABASE otherdb'); + +is( $node_primary->psql( + 'otherdb', + "SELECT lsn FROM pg_logical_slot_peek_changes('activeslot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;" + ), + 3, + 'replaying logical slot from another database fails'); + +# drop the logical slots +drop_logical_slots(); + +################################################## +# Recovery conflict: Invalidate conflicting slots, including in-use slots +# Scenario 1: hot_standby_feedback off and vacuum FULL +################################################## + +# create the logical slots +create_logical_slots($node_standby); + +# One way to produce recovery conflict is to create/drop a relation and +# launch a vacuum full on pg_class with hot_standby_feedback turned off on +# the standby. 
+change_hot_standby_feedback_and_wait_for_xmins(0,1); + +$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr); + +# This should trigger the conflict +$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y text);]); +$node_primary->safe_psql('testdb', qq[DROP TABLE conflict_test;]); +$node_primary->safe_psql('testdb', 'VACUUM full pg_class;'); + +$node_primary->wait_for_catchup($node_standby, 'replay', $node_primary->lsn('flush')); + +# message should be issued +ok( find_in_log( + $node_standby, + "invalidating slot \"inactiveslot\" because it conflicts with recovery"), + 'inactiveslot slot invalidation is logged with vacuum FULL on pg_class'); + +ok( find_in_log( + $node_standby, + "invalidating slot \"activeslot\" because it conflicts with recovery"), + 'activeslot slot invalidation is logged with vacuum FULL on pg_class'); + +# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated +ok( $node_standby->poll_query_until( + 'postgres', + "select (confl_active_logicalslot = 1) from pg_stat_database_conflicts where datname = 'testdb'", 't'), + 'confl_active_logicalslot updated') or die "Timed out waiting confl_active_logicalslot to be updated"; + +# Verify slots are reported as conflicting in pg_replication_slots +check_slots_conflicting_status(1); + +$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr); + +# We are not able to read from the slot as it has been invalidated +check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"activeslot\""); + +# Turn hot_standby_feedback back on +change_hot_standby_feedback_and_wait_for_xmins(1,1); + +################################################## +# Recovery conflict: Invalidate conflicting slots, including in-use slots +# Scenario 2: conflict due to row removal with hot_standby_feedback off. +################################################## + +# get the position to search from in the standby logfile +my $logstart = -s $node_standby->logfile; + +# drop the logical slots +drop_logical_slots(); + +# create the logical slots +create_logical_slots($node_standby); + +# One way to produce recovery conflict is to create/drop a relation and +# launch a vacuum on pg_class with hot_standby_feedback turned off on the standby. 
+change_hot_standby_feedback_and_wait_for_xmins(0,1); + +$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr); + +# This should trigger the conflict +$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y text);]); +$node_primary->safe_psql('testdb', qq[DROP TABLE conflict_test;]); +$node_primary->safe_psql('testdb', 'VACUUM pg_class;'); + +$node_primary->wait_for_catchup($node_standby, 'replay', $node_primary->lsn('flush')); + +# message should be issued +ok( find_in_log( + $node_standby, + "invalidating slot \"inactiveslot\" because it conflicts with recovery", $logstart), + 'inactiveslot slot invalidation is logged with vacuum on pg_class'); + +ok( find_in_log( + $node_standby, + "invalidating slot \"activeslot\" because it conflicts with recovery", $logstart), + 'activeslot slot invalidation is logged with vacuum on pg_class'); + +# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated +# we now expect 2 conflicts reported as the counter persist across reloads +ok( $node_standby->poll_query_until( + 'postgres', + "select (confl_active_logicalslot = 2) from pg_stat_database_conflicts where datname = 'testdb'", 't'), + 'confl_active_logicalslot updated') or die "Timed out waiting confl_active_logicalslot to be updated"; + +# Verify slots are reported as conflicting in pg_replication_slots +check_slots_conflicting_status(1); + +$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr); + +# We are not able to read from the slot as it has been invalidated +check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"activeslot\""); + +################################################## +# Recovery conflict: Same as Scenario 2 but on a non catalog table +# Scenario 3: No conflict expected. 
+################################################## + +# get the position to search from in the standby logfile +$logstart = -s $node_standby->logfile; + +# drop the logical slots +drop_logical_slots(); + +# create the logical slots +create_logical_slots($node_standby); + +# put hot standby feedback to off +change_hot_standby_feedback_and_wait_for_xmins(0,1); + +$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr); + +# This should not trigger a conflict +$node_primary->safe_psql('testdb', qq[CREATE TABLE conflict_test(x integer, y text);]); +$node_primary->safe_psql('testdb', qq[INSERT INTO conflict_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]); +$node_primary->safe_psql('testdb', qq[UPDATE conflict_test set x=1, y=1;]); +$node_primary->safe_psql('testdb', 'VACUUM conflict_test;'); + +$node_primary->wait_for_catchup($node_standby, 'replay', $node_primary->lsn('flush')); + +# message should not be issued +ok( !find_in_log( + $node_standby, + "invalidating slot \"inactiveslot\" because it conflicts with recovery", $logstart), + 'inactiveslot slot invalidation is not logged with vacuum on conflict_test'); + +ok( !find_in_log( + $node_standby, + "invalidating slot \"activeslot\" because it conflicts with recovery", $logstart), + 'activeslot slot invalidation is not logged with vacuum on conflict_test'); + +# Verify that pg_stat_database_conflicts.confl_active_logicalslot has not been updated +# we now still expect 2 conflicts reported as the counter persist across reloads +ok( $node_standby->poll_query_until( + 'postgres', + "select (confl_active_logicalslot = 2) from pg_stat_database_conflicts where datname = 'testdb'", 't'), + 'confl_active_logicalslot not updated') or die "Timed out waiting confl_active_logicalslot to be updated"; + +# Verify slots are reported as non conflicting in pg_replication_slots +check_slots_conflicting_status(0); + +# Turn hot_standby_feedback back on +change_hot_standby_feedback_and_wait_for_xmins(1, 0); + +################################################## +# Recovery conflict: Invalidate conflicting slots, including in-use slots +# Scenario 4: conflict due to on-access pruning. +################################################## + +# get the position to search from in the standby logfile +$logstart = -s $node_standby->logfile; + +# drop the logical slots +drop_logical_slots(); + +# create the logical slots +create_logical_slots($node_standby); + +# One way to produce recovery conflict is to trigger an on-access pruning +# on a relation marked as user_catalog_table. 
+change_hot_standby_feedback_and_wait_for_xmins(0,0); + +$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr); + +# This should trigger the conflict +$node_primary->safe_psql('testdb', qq[CREATE TABLE prun(id integer, s char(2000)) WITH (fillfactor = 75, user_catalog_table = true);]); +$node_primary->safe_psql('testdb', qq[INSERT INTO prun VALUES (1, 'A');]); +$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'B';]); +$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'C';]); +$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'D';]); +$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'E';]); + +$node_primary->wait_for_catchup($node_standby, 'replay', $node_primary->lsn('flush')); + +# message should be issued +ok( find_in_log( + $node_standby, + "invalidating slot \"inactiveslot\" because it conflicts with recovery", $logstart), + 'inactiveslot slot invalidation is logged with on-access pruning'); + +ok( find_in_log( + $node_standby, + "invalidating slot \"activeslot\" because it conflicts with recovery", $logstart), + 'activeslot slot invalidation is logged with on-access pruning'); + +# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated +# we now expect 3 conflicts reported as the counter persist across reloads +ok( $node_standby->poll_query_until( + 'postgres', + "select (confl_active_logicalslot = 3) from pg_stat_database_conflicts where datname = 'testdb'", 't'), + 'confl_active_logicalslot updated') or die "Timed out waiting confl_active_logicalslot to be updated"; + +# Verify slots are reported as conflicting in pg_replication_slots +check_slots_conflicting_status(1); + +$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr); + +# We are not able to read from the slot as it has been invalidated +check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"activeslot\""); + +# Turn hot_standby_feedback back on +change_hot_standby_feedback_and_wait_for_xmins(1, 1); + +################################################## +# Recovery conflict: Invalidate conflicting slots, including in-use slots +# Scenario 5: incorrect wal_level on primary. +################################################## + +# get the position to search from in the standby logfile +$logstart = -s $node_standby->logfile; + +# drop the logical slots +drop_logical_slots(); + +# create the logical slots +create_logical_slots($node_standby); + +$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr); + +# Make primary wal_level replica. This will trigger slot conflict. 
+$node_primary->append_conf('postgresql.conf',q[
+wal_level = 'replica'
+]);
+$node_primary->restart;
+
+$node_primary->wait_for_catchup($node_standby, 'replay', $node_primary->lsn('flush'));
+
+# message should be issued
+ok( find_in_log(
+	$node_standby,
+	"invalidating slot \"inactiveslot\" because it conflicts with recovery", $logstart),
+	'inactiveslot slot invalidation is logged due to wal_level');
+
+ok( find_in_log(
+	$node_standby,
+	"invalidating slot \"activeslot\" because it conflicts with recovery", $logstart),
+	'activeslot slot invalidation is logged due to wal_level');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated
+# we now expect 4 conflicts reported as the counter persists across reloads
+ok( $node_standby->poll_query_until(
+	'postgres',
+	"select (confl_active_logicalslot = 4) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+	'confl_active_logicalslot updated') or die "Timed out waiting for confl_active_logicalslot to be updated";
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+# We are not able to read from the slot as it requires wal_level to be at least logical on the primary server
+check_pg_recvlogical_stderr($handle, "logical decoding on standby requires wal_level to be at least logical on the primary server");
+
+# Restore primary wal_level
+$node_primary->append_conf('postgresql.conf',q[
+wal_level = 'logical'
+]);
+$node_primary->restart;
+$node_primary->wait_for_catchup($node_standby, 'replay', $node_primary->lsn('flush'));
+
+$handle = make_slot_active($node_standby, 0, \$stdout, \$stderr);
+# as the slot has been invalidated we should not be able to read
+check_pg_recvlogical_stderr($handle, "cannot read from logical replication slot \"activeslot\"");
+
+##################################################
+# DROP DATABASE should drop its slots, including active slots.
+##################################################
+
+# drop the logical slots
+drop_logical_slots();
+
+# create the logical slots
+create_logical_slots($node_standby);
+
+$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr);
+# Create a slot on a database that will not be dropped. This slot should not
+# get dropped.
+$node_standby->create_logical_slot_on_standby($node_primary, 'otherslot', 'postgres');
+
+# dropdb on the primary to verify slots are dropped on standby
+$node_primary->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_primary->wait_for_catchup($node_standby, 'replay', $node_primary->lsn('flush'));
+
+is($node_standby->safe_psql('postgres',
+	q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f',
+	'database dropped on standby');
+
+check_slots_dropped($handle);
+
+is($node_standby->slot('otherslot')->{'slot_type'}, 'logical',
+	'otherslot on standby not dropped');
+
+# Cleanup: manually drop the slot that was not dropped.
+$node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('otherslot')]);
+
+##################################################
+# Test standby promotion and logical decoding behavior
+# after the standby gets promoted.
+################################################## + +# reduce wal_sender_timeout to not wait too long after promotion +$node_standby->append_conf('postgresql.conf',qq[ + wal_sender_timeout = 1s +]); + +$node_standby->reload; + +$node_primary->psql('postgres', q[CREATE DATABASE testdb]); +$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y text);]); + +# create the logical slots +create_logical_slots($node_standby); + +# create the logical slots on the cascading standby too +create_logical_slots($node_cascading_standby); + +# Make slots actives +$handle = make_slot_active($node_standby, 1, \$stdout, \$stderr); +my $cascading_handle = make_slot_active($node_cascading_standby, 1, \$cascading_stdout, \$cascading_stderr); + +# Insert some rows before the promotion +$node_primary->safe_psql('testdb', + qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;] +); + +# Wait for both standbys to catchup +$node_primary->wait_for_catchup($node_standby, 'replay', $node_primary->lsn('flush')); +$node_standby->wait_for_catchup($node_cascading_standby, 'replay', $node_primary->lsn('flush')); + +# promote +$node_standby->promote; + +# insert some rows on promoted standby +$node_standby->safe_psql('testdb', + qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,7) s;] +); + +# Wait for the cascading standby to catchup +$node_standby->wait_for_catchup($node_cascading_standby, 'replay', $node_standby->lsn('flush')); + +$expected = q{BEGIN +table public.decoding_test: INSERT: x[integer]:1 y[text]:'1' +table public.decoding_test: INSERT: x[integer]:2 y[text]:'2' +table public.decoding_test: INSERT: x[integer]:3 y[text]:'3' +table public.decoding_test: INSERT: x[integer]:4 y[text]:'4' +COMMIT +BEGIN +table public.decoding_test: INSERT: x[integer]:5 y[text]:'5' +table public.decoding_test: INSERT: x[integer]:6 y[text]:'6' +table public.decoding_test: INSERT: x[integer]:7 y[text]:'7' +COMMIT}; + +# check that we are decoding pre and post promotion inserted rows +$stdout_sql = $node_standby->safe_psql('testdb', + qq[SELECT data FROM pg_logical_slot_peek_changes('inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');] +); + +is($stdout_sql, $expected, 'got expected output from SQL decoding session on promoted standby'); + +# check that we are decoding pre and post promotion inserted rows +# with pg_recvlogical that has started before the promotion +my $pump_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default); + +ok( pump_until( + $handle, $pump_timeout, \$stdout, qr/^.*COMMIT.*COMMIT$/s), + 'got 2 COMMIT from pg_recvlogical output'); + +chomp($stdout); +is($stdout, $expected, + 'got same expected output from pg_recvlogical decoding session'); + +# check that we are decoding pre and post promotion inserted rows on the cascading standby +$stdout_sql = $node_cascading_standby->safe_psql('testdb', + qq[SELECT data FROM pg_logical_slot_peek_changes('inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');] +); + +is($stdout_sql, $expected, 'got expected output from SQL decoding session on cascading standby'); + +# check that we are decoding pre and post promotion inserted rows +# with pg_recvlogical that has started before the promotion on the cascading standby +ok( pump_until( + $cascading_handle, $pump_timeout, \$cascading_stdout, qr/^.*COMMIT.*COMMIT$/s), + 'got 2 COMMIT from pg_recvlogical output'); + +chomp($cascading_stdout); +is($cascading_stdout, $expected, + 'got same expected output 
from pg_recvlogical decoding session on cascading standby'); -- 2.34.1
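
Usage sketch (illustrative only, not part of the diff): the SQL below shows the
workflow that the new Cluster.pm helper create_logical_slot_on_standby()
automates, assuming a primary with wal_level = 'logical' and a connected
standby; the slot name and output plugin are arbitrary examples.

  -- On the standby: logical slot creation blocks until a running-transactions
  -- (xl_running_xacts) record taken on the primary has been replayed.
  SELECT pg_create_logical_replication_slot('myslot', 'test_decoding');

  -- On the primary: write such a record right away, without waiting for the
  -- bgwriter or checkpointer (or requesting a checkpoint), so that the
  -- standby's slot creation can finish.
  SELECT pg_log_standby_snapshot();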
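
Also for illustration, and not part of the diff: the invalidation state that
the test asserts through check_slots_conflicting_status() can be inspected
directly on the standby; only the column names are taken from the test, no
particular output is implied.

  -- After a recovery conflict has been replayed on the standby, logical slots
  -- are reported as conflicting; physical slots report NULL for this column.
  SELECT slot_name, slot_type, conflicting FROM pg_replication_slots;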