From 62b52d0011f08d472f3d60b67110453b07f1017d Mon Sep 17 00:00:00 2001 From: Nikolay Samokhvalov Date: Thu, 16 Apr 2026 19:25:55 -0700 Subject: [PATCH] Do not count logical decoding cleanup aborts in xact_rollback ReorderBufferProcessTXN() aborts the current transaction after each decoded commit to release locks and clean up catalog access. In a logical walsender that is a top-level abort, so every decoded commit bumps pg_stat_database.xact_rollback; the counts surface as a spike on walsender exit (e.g. when a subscription is disabled). Add AbortCurrentTransactionWithoutXactStats(), a wrapper that suppresses only the DB-level xact_commit/xact_rollback counter and leaves per-relation and subxact stat handling intact. Use it from the top-level cleanup paths in ReorderBufferProcessTXN() (gated on !using_subtxn) and SnapBuildClearExportedSnapshot(). A TAP test asserts a publisher xact_rollback delta of 0 across five decoded transactions (delta is 5 without the fix). Reported-by: Rafael Thofehrn Castro Discussion: https://postgr.es/m/CAM527d_EbU5Li4a5FdKQjYsdF-4Lqr_i3jXmZOm7Wbb%3DQ2KzTw%40mail.gmail.com --- src/backend/access/transam/twophase.c | 2 +- src/backend/access/transam/xact.c | 39 ++++++++++- .../replication/logical/reorderbuffer.c | 16 +++-- src/backend/replication/logical/snapbuild.c | 9 ++- src/backend/utils/activity/pgstat_xact.c | 11 ++- src/include/access/xact.h | 1 + src/include/pgstat.h | 2 +- src/test/subscription/meson.build | 1 + .../t/039_publisher_xact_rollback.pl | 70 +++++++++++++++++++ 9 files changed, 139 insertions(+), 12 deletions(-) create mode 100644 src/test/subscription/t/039_publisher_xact_rollback.pl diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 1035e8b3fc7..3aa31e7f805 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1676,7 +1676,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit) LWLockRelease(TwoPhaseStateLock); /* Count the prepared xact as committed or aborted */ - AtEOXact_PgStat(isCommit, false); + AtEOXact_PgStat(isCommit, false, true); /* * And now we can clean up any files we may have left. diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 48bc90c9673..35d5e174011 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -316,6 +316,15 @@ typedef struct XactCallbackItem static XactCallbackItem *Xact_callbacks = NULL; +/* + * When true, suppress the pg_stat_database xact_commit/xact_rollback bump + * for the current transaction end. Must only be set via + * AbortCurrentTransactionWithoutXactStats(); assertions in + * StartTransaction() and in the wrapper itself guard against the flag + * leaking across transactions. + */ +static bool xactSkipXactStats = false; + /* * List of add-on start- and end-of-subxact callbacks */ @@ -2118,6 +2127,7 @@ StartTransaction(void) /* check the current transaction state */ Assert(s->state == TRANS_DEFAULT); + Assert(!xactSkipXactStats); /* * Set the current transaction state information appropriately during @@ -2514,7 +2524,7 @@ CommitTransaction(void) AtEOXact_Files(true); AtEOXact_ComboCid(); AtEOXact_HashTables(true); - AtEOXact_PgStat(true, is_parallel_worker); + AtEOXact_PgStat(true, is_parallel_worker, true); AtEOXact_Snapshot(true, false); AtEOXact_ApplyLauncher(true); AtEOXact_LogicalRepWorkers(true); @@ -3039,7 +3049,7 @@ AbortTransaction(void) AtEOXact_Files(false); AtEOXact_ComboCid(); AtEOXact_HashTables(false); - AtEOXact_PgStat(false, is_parallel_worker); + AtEOXact_PgStat(false, is_parallel_worker, !xactSkipXactStats); AtEOXact_ApplyLauncher(false); AtEOXact_LogicalRepWorkers(false); AtEOXact_LogicalCtl(); @@ -3509,6 +3519,31 @@ AbortCurrentTransaction(void) } } +/* + * AbortCurrentTransactionWithoutXactStats + * + * Like AbortCurrentTransaction(), but do not count the transaction abort in + * pg_stat_database.xact_rollback. This is for internal cleanup aborts that + * release transaction-local resources but do not represent a user-visible + * transaction rollback. + */ +void +AbortCurrentTransactionWithoutXactStats(void) +{ + Assert(!xactSkipXactStats); + + xactSkipXactStats = true; + PG_TRY(); + { + AbortCurrentTransaction(); + } + PG_FINALLY(); + { + xactSkipXactStats = false; + } + PG_END_TRY(); +} + /* * AbortCurrentTransactionInternal - a function doing an iteration of work * regarding handling the current transaction abort. In the case of diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 682d13c9f22..f953676bfe1 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2674,9 +2674,13 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * Aborting the current (sub-)transaction as a whole has the right * semantics. We want all locks acquired in here to be released, not * reassigned to the parent and we do not want any database access - * have persistent effects. + * have persistent effects. In the !using_subtxn case this is a + * top-level abort; keep it out of pg_stat_database.xact_rollback. */ - AbortCurrentTransaction(); + if (using_subtxn) + AbortCurrentTransaction(); + else + AbortCurrentTransactionWithoutXactStats(); /* make sure there's no cache pollution */ if (rbtxn_distr_inval_overflowed(txn)) @@ -2737,9 +2741,13 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, /* * Force cache invalidation to happen outside of a valid transaction - * to prevent catalog access as we just caught an error. + * to prevent catalog access as we just caught an error. As above, + * keep the top-level abort out of pg_stat_database.xact_rollback. */ - AbortCurrentTransaction(); + if (using_subtxn) + AbortCurrentTransaction(); + else + AbortCurrentTransactionWithoutXactStats(); /* make sure there's no cache pollution */ if (rbtxn_distr_inval_overflowed(txn)) diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index c8309b96ed4..ba9af88c505 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -631,8 +631,13 @@ SnapBuildClearExportedSnapshot(void) */ tmpResOwner = SavedResourceOwnerDuringExport; - /* make sure nothing could have ever happened */ - AbortCurrentTransaction(); + /* + * Make sure nothing could have ever happened. Keep this cleanup abort + * out of pg_stat_database.xact_rollback; we must be at top level so + * the abort reaches AtEOXact_PgStat_Database. + */ + Assert(!IsSubTransaction()); + AbortCurrentTransactionWithoutXactStats(); CurrentResourceOwner = tmpResOwner; } diff --git a/src/backend/utils/activity/pgstat_xact.c b/src/backend/utils/activity/pgstat_xact.c index 5e2d69e6297..ea9f703c088 100644 --- a/src/backend/utils/activity/pgstat_xact.c +++ b/src/backend/utils/activity/pgstat_xact.c @@ -37,11 +37,18 @@ static PgStat_SubXactStatus *pgStatXactStack = NULL; * Called from access/transam/xact.c at top-level transaction commit/abort. */ void -AtEOXact_PgStat(bool isCommit, bool parallel) +AtEOXact_PgStat(bool isCommit, bool parallel, bool count_xact_stats) { PgStat_SubXactStatus *xact_state; - AtEOXact_PgStat_Database(isCommit, parallel); + /* + * Only the database-level xact_commit/xact_rollback counter is gated + * here. Per-relation and subxact stat handling below must still run + * unconditionally so any stats accumulated during the transaction are + * not lost. + */ + if (count_xact_stats) + AtEOXact_PgStat_Database(isCommit, parallel); /* handle transactional stats information */ xact_state = pgStatXactStack; diff --git a/src/include/access/xact.h b/src/include/access/xact.h index a8cbdf247c8..a5ec9a027d4 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -467,6 +467,7 @@ extern void SaveTransactionCharacteristics(SavedTransactionCharacteristics *s); extern void RestoreTransactionCharacteristics(const SavedTransactionCharacteristics *s); extern void CommitTransactionCommand(void); extern void AbortCurrentTransaction(void); +extern void AbortCurrentTransactionWithoutXactStats(void); extern void BeginTransactionBlock(void); extern bool EndTransactionBlock(bool chain); extern bool PrepareTransactionBlock(const char *gid); diff --git a/src/include/pgstat.h b/src/include/pgstat.h index dfa2e837638..8509a590bbf 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -814,7 +814,7 @@ extern PgStat_StatSubEntry *pgstat_fetch_stat_subscription(Oid subid); * Functions in pgstat_xact.c */ -extern void AtEOXact_PgStat(bool isCommit, bool parallel); +extern void AtEOXact_PgStat(bool isCommit, bool parallel, bool count_xact_stats); extern void AtEOSubXact_PgStat(bool isCommit, int nestDepth); extern void AtPrepare_PgStat(void); extern void PostPrepare_PgStat(void); diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index e71e95c6297..268fa8c3e9c 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -48,6 +48,7 @@ tests += { 't/036_sequences.pl', 't/037_except.pl', 't/038_walsnd_shutdown_timeout.pl', + 't/039_publisher_xact_rollback.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/039_publisher_xact_rollback.pl b/src/test/subscription/t/039_publisher_xact_rollback.pl new file mode 100644 index 00000000000..1fa72a19712 --- /dev/null +++ b/src/test/subscription/t/039_publisher_xact_rollback.pl @@ -0,0 +1,70 @@ +# Copyright (c) 2026, PostgreSQL Global Development Group + +# Check that pg_stat_database.xact_rollback on a logical-replication +# publisher is not inflated by the walsender's internal catalog-cleanup +# aborts. ReorderBufferProcessTXN() ends each decoded transaction with +# AbortCurrentTransaction(); in the walsender that is a top-level abort +# whose counter increment flushes to shared stats on walsender exit. +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->start; + +$node_publisher->safe_psql('postgres', + 'CREATE TABLE t (id int PRIMARY KEY)'); +$node_subscriber->safe_psql('postgres', + 'CREATE TABLE t (id int PRIMARY KEY)'); + +$node_publisher->safe_psql('postgres', 'CREATE PUBLICATION p FOR TABLE t'); + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION s CONNECTION '$publisher_connstr' PUBLICATION p"); + +$node_subscriber->wait_for_subscription_sync($node_publisher, 's'); + +# Use a baseline-delta rather than pg_stat_reset() to tolerate ambient +# rollback activity. +my $base = $node_publisher->safe_psql('postgres', + "SELECT xact_rollback FROM pg_stat_database WHERE datname = 'postgres'"); +chomp $base; + +# Five autocommit INSERTs: each becomes one decoded committed txn on the +# walsender. Without the fix, that's five spurious rollbacks after DISABLE. +my $n = 5; +$node_publisher->safe_psql('postgres', + join('', map { "INSERT INTO t VALUES ($_);\n" } 1 .. $n)); + +$node_publisher->wait_for_catchup('s'); + +# Disabling the subscription terminates the walsender; its shutdown hook +# flushes pgstat counters to shared stats. +$node_subscriber->safe_psql('postgres', 'ALTER SUBSCRIPTION s DISABLE'); + +# Wait for this subscription's walsender (filter by application_name). +$node_publisher->poll_query_until( + 'postgres', q{ + SELECT count(*) = 0 FROM pg_stat_activity + WHERE backend_type = 'walsender' AND application_name = 's' +}) + or die 's walsender did not exit'; + +my $final = $node_publisher->safe_psql('postgres', + "SELECT xact_rollback FROM pg_stat_database WHERE datname = 'postgres'"); +chomp $final; + +cmp_ok( + $final - $base, '==', 0, + 'walsender does not inflate publisher xact_rollback for decoded transactions' +); + +done_testing(); -- 2.50.1 (Apple Git-155)