From 2bfa6de147aecac125883c23413f0e82614223ea Mon Sep 17 00:00:00 2001 From: Ashutosh Sharma Date: Tue, 30 Jun 2026 11:56:03 +0000 Subject: [PATCH 1/2] Refactor replication slot decoding stats handling The logical decoding stats reported for replication slots already use several closely-related counters. ReorderBuffer collected those values as separate fields, while PgStat_StatReplSlotEntry stored the same set again, requiring UpdateDecodingStats() to copy and clear every field individually. Introduce PgStat_ReplSlotStats as the common structure for these logical decoding counters. Embed it in ReorderBuffer for collection and in PgStat_StatReplSlotEntry for storage, keeping slot synchronization and reset metadata outside the shared struct because those fields are not collected by logical decoding. With the counters grouped this way, UpdateDecodingStats() can pass the collected struct directly to pgstat_report_replslot() and clear it with a single MemSet(). The pgstat reporting path now accumulates only the shared counter struct, and pg_stat_get_replication_slot() reads the same counters from the nested decoding_stats member. This makes future additions to the logical decoding replication slot counters less error-prone by keeping the collection, reporting, and reset paths tied to a single shared type. Per suggestion from Masahiko Sawada --- src/backend/replication/logical/logical.c | 49 ++++++------------- .../replication/logical/reorderbuffer.c | 32 +++++------- src/backend/utils/activity/pgstat_replslot.c | 6 +-- src/backend/utils/adt/pgstatfuncs.c | 18 +++---- src/include/pgstat.h | 9 +++- src/include/replication/reorderbuffer.h | 28 ++--------- src/tools/pgindent/typedefs.list | 1 + 7 files changed, 50 insertions(+), 93 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 3541fc793e4..87d1303d6c9 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1951,46 +1951,27 @@ void UpdateDecodingStats(LogicalDecodingContext *ctx) { ReorderBuffer *rb = ctx->reorder; - PgStat_StatReplSlotEntry repSlotStat; /* Nothing to do if we don't have any replication stats to be sent. */ - if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0 && - rb->memExceededCount <= 0) + if (rb->stats.spill_bytes <= 0 && rb->stats.stream_bytes <= 0 && + rb->stats.total_bytes <= 0 && rb->stats.mem_exceeded_count <= 0) return; elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64, rb, - rb->spillTxns, - rb->spillCount, - rb->spillBytes, - rb->streamTxns, - rb->streamCount, - rb->streamBytes, - rb->memExceededCount, - rb->totalTxns, - rb->totalBytes); - - repSlotStat.spill_txns = rb->spillTxns; - repSlotStat.spill_count = rb->spillCount; - repSlotStat.spill_bytes = rb->spillBytes; - repSlotStat.stream_txns = rb->streamTxns; - repSlotStat.stream_count = rb->streamCount; - repSlotStat.stream_bytes = rb->streamBytes; - repSlotStat.mem_exceeded_count = rb->memExceededCount; - repSlotStat.total_txns = rb->totalTxns; - repSlotStat.total_bytes = rb->totalBytes; - - pgstat_report_replslot(ctx->slot, &repSlotStat); - - rb->spillTxns = 0; - rb->spillCount = 0; - rb->spillBytes = 0; - rb->streamTxns = 0; - rb->streamCount = 0; - rb->streamBytes = 0; - rb->memExceededCount = 0; - rb->totalTxns = 0; - rb->totalBytes = 0; + rb->stats.spill_txns, + rb->stats.spill_count, + rb->stats.spill_bytes, + rb->stats.stream_txns, + rb->stats.stream_count, + rb->stats.stream_bytes, + rb->stats.mem_exceeded_count, + rb->stats.total_txns, + rb->stats.total_bytes); + + pgstat_report_replslot(ctx->slot, &rb->stats); + + MemSet(&rb->stats, 0, sizeof(PgStat_ReplSlotStats)); } /* diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 059ed860314..36e5dc767b8 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -385,15 +385,7 @@ ReorderBufferAllocate(void) /* txn_heap is ordered by transaction size */ buffer->txn_heap = pairingheap_allocate(ReorderBufferTXNSizeCompare, NULL); - buffer->spillTxns = 0; - buffer->spillCount = 0; - buffer->spillBytes = 0; - buffer->streamTxns = 0; - buffer->streamCount = 0; - buffer->streamBytes = 0; - buffer->memExceededCount = 0; - buffer->totalTxns = 0; - buffer->totalBytes = 0; + MemSet(&buffer->stats, 0, sizeof(PgStat_ReplSlotStats)); buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; @@ -1469,7 +1461,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) * releasing the current set of changes and restoring the new set of * changes. */ - rb->totalBytes += entry->txn->size; + rb->stats.total_bytes += entry->txn->size; if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file, &state->entries[off].segno)) { @@ -2613,9 +2605,9 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, * which we have already accounted in ReorderBufferIterTXNNext. */ if (!rbtxn_is_streamed(txn)) - rb->totalTxns++; + rb->stats.total_txns++; - rb->totalBytes += txn->total_size; + rb->stats.total_bytes += txn->total_size; /* * Done with current changes, send the last message for this set of @@ -3902,7 +3894,7 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) * update the slot statistics altogether while streaming or * serializing transactions in most cases. */ - rb->memExceededCount += 1; + rb->stats.mem_exceeded_count += 1; } else if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_BUFFERED) { @@ -3973,7 +3965,7 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) Assert(txn->nentries_mem == 0); /* - * We've reported the memExceededCount update while streaming or + * We've reported the mem_exceeded_count update while streaming or * serializing the transaction. */ update_stats = false; @@ -4062,11 +4054,11 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) /* update the statistics iff we have spilled anything */ if (spilled) { - rb->spillCount += 1; - rb->spillBytes += size; + rb->stats.spill_count += 1; + rb->stats.spill_bytes += size; /* don't consider already serialized transactions */ - rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1; + rb->stats.spill_txns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1; /* update the decoding stats */ UpdateDecodingStats((LogicalDecodingContext *) rb->private_data); @@ -4434,11 +4426,11 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now, command_id, true); - rb->streamCount += 1; - rb->streamBytes += stream_bytes; + rb->stats.stream_count += 1; + rb->stats.stream_bytes += stream_bytes; /* Don't consider already streamed transaction. */ - rb->streamTxns += (txn_is_streamed) ? 0 : 1; + rb->stats.stream_txns += (txn_is_streamed) ? 0 : 1; /* update the decoding stats */ UpdateDecodingStats((LogicalDecodingContext *) rb->private_data); diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c index a32b70a0373..5283dfe6344 100644 --- a/src/backend/utils/activity/pgstat_replslot.c +++ b/src/backend/utils/activity/pgstat_replslot.c @@ -74,16 +74,16 @@ pgstat_reset_replslot(const char *name) * pgstat_acquire_replslot() have already been called. */ void -pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *repSlotStat) +pgstat_report_replslot(ReplicationSlot *slot, const PgStat_ReplSlotStats *repSlotStat) { PgStat_EntryRef *entry_ref; PgStatShared_ReplSlot *shstatent; - PgStat_StatReplSlotEntry *statent; + PgStat_ReplSlotStats *statent; entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_REPLSLOT, InvalidOid, ReplicationSlotIndex(slot), false); shstatent = (PgStatShared_ReplSlot *) entry_ref->shared_stats; - statent = &shstatent->stats; + statent = &shstatent->stats.decoding_stats; /* Update the replication slot statistics */ #define REPLSLOT_ACC(fld) statent->fld += repSlotStat->fld diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 0c6a20843a5..06ad40f2907 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2274,15 +2274,15 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) } values[0] = CStringGetTextDatum(NameStr(slotname)); - values[1] = Int64GetDatum(slotent->spill_txns); - values[2] = Int64GetDatum(slotent->spill_count); - values[3] = Int64GetDatum(slotent->spill_bytes); - values[4] = Int64GetDatum(slotent->stream_txns); - values[5] = Int64GetDatum(slotent->stream_count); - values[6] = Int64GetDatum(slotent->stream_bytes); - values[7] = Int64GetDatum(slotent->mem_exceeded_count); - values[8] = Int64GetDatum(slotent->total_txns); - values[9] = Int64GetDatum(slotent->total_bytes); + values[1] = Int64GetDatum(slotent->decoding_stats.spill_txns); + values[2] = Int64GetDatum(slotent->decoding_stats.spill_count); + values[3] = Int64GetDatum(slotent->decoding_stats.spill_bytes); + values[4] = Int64GetDatum(slotent->decoding_stats.stream_txns); + values[5] = Int64GetDatum(slotent->decoding_stats.stream_count); + values[6] = Int64GetDatum(slotent->decoding_stats.stream_bytes); + values[7] = Int64GetDatum(slotent->decoding_stats.mem_exceeded_count); + values[8] = Int64GetDatum(slotent->decoding_stats.total_txns); + values[9] = Int64GetDatum(slotent->decoding_stats.total_bytes); values[10] = Int64GetDatum(slotent->slotsync_skip_count); if (slotent->slotsync_last_skip == 0) diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 58a44857f13..0410cf649b0 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -411,7 +411,7 @@ typedef struct PgStat_StatFuncEntry TimestampTz stat_reset_timestamp; } PgStat_StatFuncEntry; -typedef struct PgStat_StatReplSlotEntry +typedef struct PgStat_ReplSlotStats { PgStat_Counter spill_txns; PgStat_Counter spill_count; @@ -422,6 +422,11 @@ typedef struct PgStat_StatReplSlotEntry PgStat_Counter mem_exceeded_count; PgStat_Counter total_txns; PgStat_Counter total_bytes; +} PgStat_ReplSlotStats; + +typedef struct PgStat_StatReplSlotEntry +{ + PgStat_ReplSlotStats decoding_stats; PgStat_Counter slotsync_skip_count; TimestampTz slotsync_last_skip; TimestampTz stat_reset_timestamp; @@ -787,7 +792,7 @@ extern PgStat_TableStatus *find_tabstat_entry(Oid rel_id); extern void pgstat_reset_replslot(const char *name); struct ReplicationSlot; -extern void pgstat_report_replslot(struct ReplicationSlot *slot, const PgStat_StatReplSlotEntry *repSlotStat); +extern void pgstat_report_replslot(struct ReplicationSlot *slot, const PgStat_ReplSlotStats *repSlotStat); extern void pgstat_report_replslotsync(struct ReplicationSlot *slot); extern void pgstat_create_replslot(struct ReplicationSlot *slot); extern void pgstat_acquire_replslot(struct ReplicationSlot *slot); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index ff825e4b7b2..3a84b8c46e8 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -12,6 +12,7 @@ #include "access/htup_details.h" #include "lib/ilist.h" #include "lib/pairingheap.h" +#include "pgstat.h" #include "storage/sinval.h" #include "utils/hsearch.h" #include "utils/relcache.h" @@ -674,31 +675,8 @@ struct ReorderBuffer /* Max-heap for sizes of all top-level and sub transactions */ pairingheap *txn_heap; - /* - * Statistics about transactions spilled to disk. - * - * A single transaction may be spilled repeatedly, which is why we keep - * two different counters. For spilling, the transaction counter includes - * both toplevel transactions and subtransactions. - */ - int64 spillTxns; /* number of transactions spilled to disk */ - int64 spillCount; /* spill-to-disk invocation counter */ - int64 spillBytes; /* amount of data spilled to disk */ - - /* Statistics about transactions streamed to the decoding output plugin */ - int64 streamTxns; /* number of transactions streamed */ - int64 streamCount; /* streaming invocation counter */ - int64 streamBytes; /* amount of data decoded */ - - /* Number of times the logical_decoding_work_mem limit has been reached */ - int64 memExceededCount; - - /* - * Statistics about all the transactions sent to the decoding output - * plugin - */ - int64 totalTxns; /* total number of transactions sent */ - int64 totalBytes; /* total amount of data decoded */ + /* Replication slot statistics collected by logical decoding. */ + PgStat_ReplSlotStats stats; }; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 3a2720fb5f9..07e4dcc94c3 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2348,6 +2348,7 @@ PgStat_LockEntry PgStat_PendingDroppedStatsItem PgStat_PendingIO PgStat_PendingLock +PgStat_ReplSlotStats PgStat_SLRUStats PgStat_ShmemControl PgStat_Snapshot -- 2.43.0