From 15333302b07bac64e2eb0b984c2b4e037e2566ab Mon Sep 17 00:00:00 2001 From: alterego655 <824662526@qq.com> Date: Mon, 8 Jun 2026 20:47:56 +0800 Subject: [PATCH v1] Reproducer for logical decoding choosing the wrong timeline while a standby is being promoted. --- src/backend/replication/walsender.c | 5 + src/test/recovery/t/099_repro.pl | 139 ++++++++++++++++++++++++++++ 2 files changed, 144 insertions(+) create mode 100644 src/test/recovery/t/099_repro.pl diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 04aa770d981..dbd48d17250 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -94,6 +94,7 @@ #include "utils/acl.h" #include "utils/builtins.h" #include "utils/guc.h" +#include "utils/injection_point.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/pg_lsn.h" @@ -1103,6 +1104,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req */ am_cascading_walsender = RecoveryInProgress(); + INJECTION_POINT("logical-read-xlog-page-before-tli", NULL); + if (am_cascading_walsender) GetXLogReplayRecPtr(&currTLI); else @@ -1501,6 +1504,8 @@ StartLogicalReplication(StartReplicationCmd *cmd) ReplicationSlotAcquire(cmd->slotname, true, true); + INJECTION_POINT("logical-walsender-after-slot-acquire", NULL); + /* * Force a disconnect, so that the decoding code doesn't need to care * about an eventual switch from running in recovery, to running in a diff --git a/src/test/recovery/t/099_repro.pl b/src/test/recovery/t/099_repro.pl new file mode 100644 index 00000000000..83a42ae507e --- /dev/null +++ b/src/test/recovery/t/099_repro.pl @@ -0,0 +1,139 @@ +# Copyright (c) 2026, PostgreSQL Global Development Group + +# Reproducer for logical decoding choosing the wrong timeline while a standby +# is being promoted. + +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +if ($ENV{enable_injection_points} ne 'yes') +{ + plan skip_all => 'Injection points not supported by this build'; +} + +my ($stdout, $stderr); + +my $node_primary = PostgreSQL::Test::Cluster->new('primary'); +my $node_standby = PostgreSQL::Test::Cluster->new('standby'); +my $default_timeout = $PostgreSQL::Test::Utils::timeout_default; + +my $primary_slotname = 'primary_physical'; + +$node_primary->init(allows_streaming => 1, has_archiving => 1); +$node_primary->append_conf( + 'postgresql.conf', q[ +wal_level = 'logical' +max_replication_slots = 4 +max_wal_senders = 4 +autovacuum = off +]); +$node_primary->start; + +if (!$node_primary->check_extension('injection_points')) +{ + plan skip_all => 'Extension injection_points not installed'; +} + +$node_primary->safe_psql('postgres', 'CREATE EXTENSION injection_points;'); + +$node_primary->safe_psql('postgres', + qq[SELECT * FROM pg_create_physical_replication_slot('$primary_slotname');] +); +$node_primary->safe_psql('postgres', + 'CREATE TABLE decoding_test(x integer, y text);'); + +my $backup_name = 'b1'; +$node_primary->backup($backup_name); + +$node_standby->init_from_backup( + $node_primary, $backup_name, + has_streaming => 1, + has_restoring => 1); +$node_standby->append_conf( + 'postgresql.conf', + qq[primary_slot_name = '$primary_slotname' +max_replication_slots = 4]); +$node_standby->start; +$node_primary->wait_for_replay_catchup($node_standby); + +$node_standby->create_logical_slot_on_standby($node_primary, 'testslot', + 'postgres'); + +$node_standby->safe_psql('postgres', + "SELECT injection_points_attach('logical-walsender-after-slot-acquire', 'wait');" +); + +my $handle = IPC::Run::start( + [ + 'pg_recvlogical', + '--dbname' => $node_standby->connstr('postgres'), + '--slot' => 'testslot', + '--option' => 'include-xids=0', + '--option' => 'skip-empty-xacts=1', + '--file' => '-', + '--no-loop', + '--start', + ], + '>' => \$stdout, + '2>' => \$stderr, + IPC::Run::timeout($default_timeout)); + +$node_standby->wait_for_event('walsender', + 'logical-walsender-after-slot-acquire'); + +$node_primary->safe_psql('postgres', + qq[INSERT INTO decoding_test(x,y) + SELECT s, s::text FROM generate_series(1,4) s;]); +$node_primary->wait_for_replay_catchup($node_standby); + +$node_standby->safe_psql('postgres', + qq[SELECT injection_points_attach('startup-logical-decoding-status-change-end-of-recovery', 'wait');] +); + +$node_standby->safe_psql('postgres', 'SELECT pg_promote(false);'); +$node_standby->wait_for_event('startup', + 'startup-logical-decoding-status-change-end-of-recovery'); + +$node_standby->safe_psql('postgres', + "SELECT injection_points_wakeup('logical-walsender-after-slot-acquire');" +); + +my $pump_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default); +ok(pump_until($handle, $pump_timeout, \$stdout, qr/^.*COMMIT$/s), + 'decoded pre-promotion transaction while promotion is in progress'); + +$node_standby->safe_psql('postgres', + qq[SELECT injection_points_wakeup('startup-logical-decoding-status-change-end-of-recovery');] +); + +$node_standby->poll_query_until('postgres', + "SELECT NOT pg_is_in_recovery();") + or die "standby did not finish promotion"; + +$node_standby->safe_psql('postgres', + qq[INSERT INTO decoding_test(x,y) + SELECT s, s::text FROM generate_series(5,7) s;]); + +ok(pump_until($handle, $pump_timeout, \$stdout, qr/^.*COMMIT.*COMMIT$/s), + 'decoded pre- and post-promotion transactions'); + +my $expected = q{BEGIN +table public.decoding_test: INSERT: x[integer]:1 y[text]:'1' +table public.decoding_test: INSERT: x[integer]:2 y[text]:'2' +table public.decoding_test: INSERT: x[integer]:3 y[text]:'3' +table public.decoding_test: INSERT: x[integer]:4 y[text]:'4' +COMMIT +BEGIN +table public.decoding_test: INSERT: x[integer]:5 y[text]:'5' +table public.decoding_test: INSERT: x[integer]:6 y[text]:'6' +table public.decoding_test: INSERT: x[integer]:7 y[text]:'7' +COMMIT}; + +chomp($stdout); +is($stdout, $expected, 'got expected output from pg_recvlogical'); + +done_testing(); -- 2.51.0