From 02cf5b9566a0887b8f86955c14789502f76e755a Mon Sep 17 00:00:00 2001 From: Ashutosh Sharma Date: Fri, 3 Jul 2026 05:25:17 +0000 Subject: [PATCH 2/2] Refactor replication slot decoding stats handling Replication slot decoding stats use several related counters. Previously, ReorderBuffer collected them as separate fields, while PgStat_StatReplSlotEntry stored the same set again. UpdateDecodingStats() then had to copy and clear each field by hand. Introduce PgStat_ReplSlotStats as the common structure for these logical decoding counters. Embed it in ReorderBuffer for collection and in PgStat_StatReplSlotEntry for storage, while keeping slot synchronization and reset metadata outside it. With the counters grouped, UpdateDecodingStats() can pass the collected struct directly to pgstat_report_replslot() and clear it with a single MemSet(). The pgstat reporting path now accumulates the shared struct, and pg_stat_get_replication_slot() reads counters from the nested decoding_stats member. Keep the shared type in a small replication header, so reorderbuffer.h does not need to include pgstat.h. This makes future logical decoding replication slot counters less error-prone to add by tying collection, reporting, and reset paths to one shared type. Per suggestion from Masahiko Sawada --- src/backend/replication/logical/logical.c | 55 ++++++------------- .../replication/logical/reorderbuffer.c | 33 ++++------- src/backend/utils/activity/pgstat_replslot.c | 6 +- src/backend/utils/adt/pgstatfuncs.c | 20 +++---- src/include/pgstat.h | 14 +---- src/include/replication/reorderbuffer.h | 29 +--------- src/include/replication/replslot_stats.h | 29 ++++++++++ src/tools/pgindent/typedefs.list | 1 + 8 files changed, 78 insertions(+), 109 deletions(-) create mode 100644 src/include/replication/replslot_stats.h diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 8435f4128e8..c83f0b9056f 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -705,7 +705,7 @@ OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write) outputBytes = ctx->out->len; ctx->write(ctx, ctx->write_location, ctx->write_xid, last_write); - ctx->reorder->outputBytes += outputBytes; + ctx->reorder->stats.output_bytes += outputBytes; ctx->prepared_write = false; } @@ -1956,11 +1956,10 @@ void UpdateDecodingStats(LogicalDecodingContext *ctx) { ReorderBuffer *rb = ctx->reorder; - PgStat_StatReplSlotEntry repSlotStat; /* Nothing to do if we don't have any replication stats to be sent. */ - if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0 && - rb->memExceededCount <= 0) + if (rb->stats.spill_bytes <= 0 && rb->stats.stream_bytes <= 0 && + rb->stats.total_bytes <= 0 && rb->stats.mem_exceeded_count <= 0) return; elog(DEBUG2, @@ -1971,40 +1970,20 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) " total_txns=%" PRId64 " total_bytes=%" PRId64 " output_bytes=%" PRId64, rb, - rb->spillTxns, - rb->spillCount, - rb->spillBytes, - rb->streamTxns, - rb->streamCount, - rb->streamBytes, - rb->memExceededCount, - rb->totalTxns, - rb->totalBytes, - rb->outputBytes); - - repSlotStat.spill_txns = rb->spillTxns; - repSlotStat.spill_count = rb->spillCount; - repSlotStat.spill_bytes = rb->spillBytes; - repSlotStat.stream_txns = rb->streamTxns; - repSlotStat.stream_count = rb->streamCount; - repSlotStat.stream_bytes = rb->streamBytes; - repSlotStat.mem_exceeded_count = rb->memExceededCount; - repSlotStat.total_txns = rb->totalTxns; - repSlotStat.total_bytes = rb->totalBytes; - repSlotStat.output_bytes = rb->outputBytes; - - pgstat_report_replslot(ctx->slot, &repSlotStat); - - rb->spillTxns = 0; - rb->spillCount = 0; - rb->spillBytes = 0; - rb->streamTxns = 0; - rb->streamCount = 0; - rb->streamBytes = 0; - rb->memExceededCount = 0; - rb->totalTxns = 0; - rb->totalBytes = 0; - rb->outputBytes = 0; + rb->stats.spill_txns, + rb->stats.spill_count, + rb->stats.spill_bytes, + rb->stats.stream_txns, + rb->stats.stream_count, + rb->stats.stream_bytes, + rb->stats.mem_exceeded_count, + rb->stats.total_txns, + rb->stats.total_bytes, + rb->stats.output_bytes); + + pgstat_report_replslot(ctx->slot, &rb->stats); + + MemSet(&rb->stats, 0, sizeof(PgStat_ReplSlotStats)); } /* diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index e735bf63f10..36e5dc767b8 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -385,16 +385,7 @@ ReorderBufferAllocate(void) /* txn_heap is ordered by transaction size */ buffer->txn_heap = pairingheap_allocate(ReorderBufferTXNSizeCompare, NULL); - buffer->spillTxns = 0; - buffer->spillCount = 0; - buffer->spillBytes = 0; - buffer->streamTxns = 0; - buffer->streamCount = 0; - buffer->streamBytes = 0; - buffer->memExceededCount = 0; - buffer->totalTxns = 0; - buffer->totalBytes = 0; - buffer->outputBytes = 0; + MemSet(&buffer->stats, 0, sizeof(PgStat_ReplSlotStats)); buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; @@ -1470,7 +1461,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) * releasing the current set of changes and restoring the new set of * changes. */ - rb->totalBytes += entry->txn->size; + rb->stats.total_bytes += entry->txn->size; if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file, &state->entries[off].segno)) { @@ -2614,9 +2605,9 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * which we have already accounted in ReorderBufferIterTXNNext. */ if (!rbtxn_is_streamed(txn)) - rb->totalTxns++; + rb->stats.total_txns++; - rb->totalBytes += txn->total_size; + rb->stats.total_bytes += txn->total_size; /* * Done with current changes, send the last message for this set of @@ -3903,7 +3894,7 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) * update the slot statistics altogether while streaming or * serializing transactions in most cases. */ - rb->memExceededCount += 1; + rb->stats.mem_exceeded_count += 1; } else if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_BUFFERED) { @@ -3974,7 +3965,7 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) Assert(txn->nentries_mem == 0); /* - * We've reported the memExceededCount update while streaming or + * We've reported the mem_exceeded_count update while streaming or * serializing the transaction. */ update_stats = false; @@ -4063,11 +4054,11 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* update the statistics iff we have spilled anything */ if (spilled) { - rb->spillCount += 1; - rb->spillBytes += size; + rb->stats.spill_count += 1; + rb->stats.spill_bytes += size; /* don't consider already serialized transactions */ - rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1; + rb->stats.spill_txns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1; /* update the decoding stats */ UpdateDecodingStats((LogicalDecodingContext *) rb->private_data); @@ -4435,11 +4426,11 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now, command_id, true); - rb->streamCount += 1; - rb->streamBytes += stream_bytes; + rb->stats.stream_count += 1; + rb->stats.stream_bytes += stream_bytes; /* Don't consider already streamed transaction. */ - rb->streamTxns += (txn_is_streamed) ? 0 : 1; + rb->stats.stream_txns += (txn_is_streamed) ? 0 : 1; /* update the decoding stats */ UpdateDecodingStats((LogicalDecodingContext *) rb->private_data); diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c index d5ffc1a7f36..9e42d05c70b 100644 --- a/src/backend/utils/activity/pgstat_replslot.c +++ b/src/backend/utils/activity/pgstat_replslot.c @@ -74,16 +74,16 @@ pgstat_reset_replslot(const char *name) * pgstat_acquire_replslot() have already been called. */ void -pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *repSlotStat) +pgstat_report_replslot(ReplicationSlot *slot, const PgStat_ReplSlotStats *repSlotStat) { PgStat_EntryRef *entry_ref; PgStatShared_ReplSlot *shstatent; - PgStat_StatReplSlotEntry *statent; + PgStat_ReplSlotStats *statent; entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_REPLSLOT, InvalidOid, ReplicationSlotIndex(slot), false); shstatent = (PgStatShared_ReplSlot *) entry_ref->shared_stats; - statent = &shstatent->stats; + statent = &shstatent->stats.decoding_stats; /* Update the replication slot statistics */ #define REPLSLOT_ACC(fld) statent->fld += repSlotStat->fld diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 6ea59f44749..6c4b7a26bc8 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2276,16 +2276,16 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) } values[0] = CStringGetTextDatum(NameStr(slotname)); - values[1] = Int64GetDatum(slotent->spill_txns); - values[2] = Int64GetDatum(slotent->spill_count); - values[3] = Int64GetDatum(slotent->spill_bytes); - values[4] = Int64GetDatum(slotent->stream_txns); - values[5] = Int64GetDatum(slotent->stream_count); - values[6] = Int64GetDatum(slotent->stream_bytes); - values[7] = Int64GetDatum(slotent->mem_exceeded_count); - values[8] = Int64GetDatum(slotent->total_txns); - values[9] = Int64GetDatum(slotent->total_bytes); - values[10] = Int64GetDatum(slotent->output_bytes); + values[1] = Int64GetDatum(slotent->decoding_stats.spill_txns); + values[2] = Int64GetDatum(slotent->decoding_stats.spill_count); + values[3] = Int64GetDatum(slotent->decoding_stats.spill_bytes); + values[4] = Int64GetDatum(slotent->decoding_stats.stream_txns); + values[5] = Int64GetDatum(slotent->decoding_stats.stream_count); + values[6] = Int64GetDatum(slotent->decoding_stats.stream_bytes); + values[7] = Int64GetDatum(slotent->decoding_stats.mem_exceeded_count); + values[8] = Int64GetDatum(slotent->decoding_stats.total_txns); + values[9] = Int64GetDatum(slotent->decoding_stats.total_bytes); + values[10] = Int64GetDatum(slotent->decoding_stats.output_bytes); values[11] = Int64GetDatum(slotent->slotsync_skip_count); if (slotent->slotsync_last_skip == 0) diff --git a/src/include/pgstat.h b/src/include/pgstat.h index c90f19b547c..f716872d198 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -15,6 +15,7 @@ #include "portability/instr_time.h" #include "postmaster/pgarch.h" /* for MAX_XFN_CHARS */ #include "replication/conflict.h" +#include "replication/replslot_stats.h" #include "storage/locktag.h" #include "utils/backend_progress.h" /* for backward compatibility */ /* IWYU pragma: export */ #include "utils/backend_status.h" /* for backward compatibility */ /* IWYU pragma: export */ @@ -413,16 +414,7 @@ typedef struct PgStat_StatFuncEntry typedef struct PgStat_StatReplSlotEntry { - PgStat_Counter spill_txns; - PgStat_Counter spill_count; - PgStat_Counter spill_bytes; - PgStat_Counter stream_txns; - PgStat_Counter stream_count; - PgStat_Counter stream_bytes; - PgStat_Counter mem_exceeded_count; - PgStat_Counter total_txns; - PgStat_Counter total_bytes; - PgStat_Counter output_bytes; + PgStat_ReplSlotStats decoding_stats; PgStat_Counter slotsync_skip_count; TimestampTz slotsync_last_skip; TimestampTz stat_reset_timestamp; @@ -788,7 +780,7 @@ extern PgStat_TableStatus *find_tabstat_entry(Oid rel_id); extern void pgstat_reset_replslot(const char *name); struct ReplicationSlot; -extern void pgstat_report_replslot(struct ReplicationSlot *slot, const PgStat_StatReplSlotEntry *repSlotStat); +extern void pgstat_report_replslot(struct ReplicationSlot *slot, const PgStat_ReplSlotStats *repSlotStat); extern void pgstat_report_replslotsync(struct ReplicationSlot *slot); extern void pgstat_create_replslot(struct ReplicationSlot *slot); extern void pgstat_acquire_replslot(struct ReplicationSlot *slot); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 79a8afb8006..a6f8cc4d100 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -12,6 +12,7 @@ #include "access/htup_details.h" #include "lib/ilist.h" #include "lib/pairingheap.h" +#include "replication/replslot_stats.h" #include "storage/sinval.h" #include "utils/hsearch.h" #include "utils/relcache.h" @@ -674,32 +675,8 @@ struct ReorderBuffer /* Max-heap for sizes of all top-level and sub transactions */ pairingheap *txn_heap; - /* - * Statistics about transactions spilled to disk. - * - * A single transaction may be spilled repeatedly, which is why we keep - * two different counters. For spilling, the transaction counter includes - * both toplevel transactions and subtransactions. - */ - int64 spillTxns; /* number of transactions spilled to disk */ - int64 spillCount; /* spill-to-disk invocation counter */ - int64 spillBytes; /* amount of data spilled to disk */ - - /* Statistics about transactions streamed to the decoding output plugin */ - int64 streamTxns; /* number of transactions streamed */ - int64 streamCount; /* streaming invocation counter */ - int64 streamBytes; /* amount of data decoded */ - - /* Number of times the logical_decoding_work_mem limit has been reached */ - int64 memExceededCount; - - /* - * Statistics about all the transactions sent to the decoding output - * plugin - */ - int64 totalTxns; /* total number of transactions sent */ - int64 totalBytes; /* total amount of data decoded */ - int64 outputBytes; /* amount of data produced for downstream */ + /* Replication slot statistics collected by logical decoding. */ + PgStat_ReplSlotStats stats; }; diff --git a/src/include/replication/replslot_stats.h b/src/include/replication/replslot_stats.h new file mode 100644 index 00000000000..a696d912caa --- /dev/null +++ b/src/include/replication/replslot_stats.h @@ -0,0 +1,29 @@ +/*------------------------------------------------------------------------- + * + * replslot_stats.h + * Replication slot statistics shared definitions. + * + * Copyright (c) 2026, PostgreSQL Global Development Group + * + * src/include/replication/replslot_stats.h + * + *------------------------------------------------------------------------- + */ +#ifndef REPLSLOT_STATS_H +#define REPLSLOT_STATS_H + +typedef struct PgStat_ReplSlotStats +{ + int64 spill_txns; + int64 spill_count; + int64 spill_bytes; + int64 stream_txns; + int64 stream_count; + int64 stream_bytes; + int64 mem_exceeded_count; + int64 total_txns; + int64 total_bytes; + int64 output_bytes; +} PgStat_ReplSlotStats; + +#endif /* REPLSLOT_STATS_H */ \ No newline at end of file diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 117e7379f10..3949f2dc5d0 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2351,6 +2351,7 @@ PgStat_LockEntry PgStat_PendingDroppedStatsItem PgStat_PendingIO PgStat_PendingLock +PgStat_ReplSlotStats PgStat_SLRUStats PgStat_ShmemControl PgStat_Snapshot -- 2.43.0