From ad98280be52f37acb23d8cd14fc26dd04e1cd3e9 Mon Sep 17 00:00:00 2001 From: Fujii Masao Date: Thu, 28 May 2026 23:19:42 +0900 Subject: [PATCH v1] pg_recvlogical: send final feedback on SIGINT/SIGTERM shutdown Previously, when pg_recvlogical exited due to SIGINT or SIGTERM, it could terminate without sending final feedback for the last decoded changes it had already written locally. So, if pg_recvlogical was restarted afterwards, the server-side logical replication slot could still point behind those changes, causing them to be sent again. Make pg_recvlogical send final feedback once more during SIGINT/SIGTERM shutdown, before sending CopyDone. This gives the server one more chance to advance the slot far enough to avoid resending already-written data, so users are less likely to see duplicate decoded output after stopping and restarting pg_recvlogical. This remains a best-effort improvement rather than a guarantee. Depending on when the signal arrives, pg_recvlogical can already have written decoded output that the server cannot yet safely treat as confirmed, so a later restart can still receive duplicate data. --- src/bin/pg_basebackup/pg_recvlogical.c | 23 ++++++ src/bin/pg_basebackup/t/030_pg_recvlogical.pl | 74 +++++++++++++++++++ 2 files changed, 97 insertions(+) diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c index 2fdf64bcadb..11abdbc274e 100644 --- a/src/bin/pg_basebackup/pg_recvlogical.c +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -1071,6 +1071,29 @@ static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason, XLogRecPtr lsn) { + /* + * If pg_recvlogical is terminated by a signal, we can reach here without + * sending final feedback. In that case, send feedback once more before + * sending CopyDone so the replication slot can advance far enough to + * reduce the chance of resending duplicate data when pg_recvlogical is + * restarted. + * + * This is still only a best-effort attempt. Depending on when the signal + * arrives, the receiver may have written decoded output that the server + * cannot yet safely treat as confirmed, so a later restart can still see + * duplicate data. + * + * For other termination cases, such as STREAM_STOP_KEEPALIVE and + * STREAM_STOP_END_OF_WAL, feedback has already been sent before reaching + * here, so there is no need to call flushAndSendFeedback() again. + */ + if (reason == STREAM_STOP_SIGNAL) + { + TimestampTz now = feGetCurrentTimestamp(); + + (void) flushAndSendFeedback(conn, &now); + } + (void) PQputCopyEnd(conn, NULL); (void) PQflush(conn); diff --git a/src/bin/pg_basebackup/t/030_pg_recvlogical.pl b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl index 945a242bdad..5e3e36cc4f3 100644 --- a/src/bin/pg_basebackup/t/030_pg_recvlogical.pl +++ b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl @@ -282,6 +282,80 @@ SKIP: 'pg_recvlogical output file respects group permissions (0640)'); } +SKIP: +{ + skip "signals not supported on Windows", 4 + if ($Config{osname} eq 'MSWin32' || $Config{osname} eq 'cygwin'); + + my $signal_outfile = $node->basedir . '/signal_shutdown.out'; + + $node->command_ok( + [ + 'pg_recvlogical', + '--slot' => 'signal_shutdown_test', + '--dbname' => $node->connstr('postgres'), + '--create-slot', + ], + 'slot created for signal shutdown test'); + + @pg_recvlogical_cmd = ( + 'pg_recvlogical', + '--slot' => 'signal_shutdown_test', + '--dbname' => $node->connstr('postgres'), + '--start', + '--file' => $signal_outfile, + '--fsync-interval' => '100', + '--status-interval' => '100'); + + $recv = IPC::Run::start( + [@pg_recvlogical_cmd], + '>' => \$stdout, + '2>' => \$stderr); + + $node->safe_psql('postgres', 'INSERT INTO test_table VALUES (42)'); + + # Wait for not only INSERT but also COMMIT because the inserted + # change might not yet be safely confirmable by final feedback until + # the transaction has committed. + wait_for_file($signal_outfile, + qr/test_table: INSERT: x\[integer\]:42\b.*?\bCOMMIT\b/s); + + $recv->signal('TERM'); + $recv->finish(); + + $nextlsn = + $node->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn()'); + chomp($nextlsn); + + $node->command_ok( + [ + 'pg_recvlogical', + '--slot' => 'signal_shutdown_test', + '--dbname' => $node->connstr('postgres'), + '--start', + '--endpos' => $nextlsn, + '--no-loop', + '--file' => $signal_outfile, + ], + 'pg_recvlogical exits after signal without replaying flushed data'); + + my $signal_data = slurp_file($signal_outfile); + my $signal_count = + (() = $signal_data =~ /test_table: INSERT: x\[integer\]:42\b/g); + is($signal_count, 1, + 'pg_recvlogical does not duplicate decoded changes after signal shutdown' + ); + + $node->command_ok( + [ + 'pg_recvlogical', + '--slot' => 'signal_shutdown_test', + '--dbname' => $node->connstr('postgres'), + '--drop-slot' + ], + 'signal_shutdown_test slot dropped'); +} + $node->command_ok( [ 'pg_recvlogical', -- 2.53.0