From 82a3f4c6ced148c19d77d1acb3119d4f0b158b00 Mon Sep 17 00:00:00 2001 From: Ashutosh Sharma Date: Mon, 22 Jun 2026 11:01:13 +0000 Subject: [PATCH] Report output bytes in pg_stat_replication_slots pg_stat_replication_slots currently tracks bytes added to the reorder buffer, but it does not track bytes produced by the output plugin for downstream consumers. Add output_bytes to expose that metric and help diagnose replication throughput issues caused by a slow or stalled downstream consumer. Author: Ashutosh Bapat Author: Ashutosh Sharma Reviewed-by: Shveta Malik Reviewed-by: Bertrand Drouvot Reviewed-by: Amit Kapila Reviewed-by: Ashutosh Sharma Reviewed-by: Andres Freund Discussion: https://www.postgresql.org/message-id/CAExHW5s6KntzUyUoMbKR5dgwRmdV2Ay_2+AnTgYGAzo=Qv61wA@mail.gmail.com --- contrib/test_decoding/expected/stats.out | 48 +++++++++---------- contrib/test_decoding/sql/stats.sql | 7 +-- contrib/test_decoding/t/001_repl_stats.pl | 13 ++--- doc/src/sgml/monitoring.sgml | 24 ++++++++++ src/backend/catalog/system_views.sql | 1 + src/backend/replication/logical/logical.c | 7 ++- .../replication/logical/logicalfuncs.c | 8 ++++ .../replication/logical/reorderbuffer.c | 1 + src/backend/replication/walsender.c | 3 ++ src/backend/utils/activity/pgstat_replslot.c | 1 + src/backend/utils/adt/pgstatfuncs.c | 21 ++++---- src/include/catalog/pg_proc.dat | 6 +-- src/include/pgstat.h | 1 + src/include/replication/reorderbuffer.h | 1 + src/test/recovery/t/006_logical_decoding.pl | 12 ++--- src/test/regress/expected/rules.out | 3 +- 16 files changed, 103 insertions(+), 54 deletions(-) diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out index a9ead3c41aa..98748841905 100644 --- a/contrib/test_decoding/expected/stats.out +++ b/contrib/test_decoding/expected/stats.out @@ -37,12 +37,12 @@ SELECT pg_stat_force_next_flush(); (1 row) -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; - slot_name | spill_txns | spill_count | total_txns | total_bytes | mem_exceeded_count -------------------------+------------+-------------+------------+-------------+-------------------- - regression_slot_stats1 | t | t | t | t | t - regression_slot_stats2 | t | t | t | t | t - regression_slot_stats3 | t | t | t | t | t +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, output_bytes > 0 AS output_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; + slot_name | spill_txns | spill_count | total_txns | total_bytes | output_bytes | mem_exceeded_count +------------------------+------------+-------------+------------+-------------+--------------+-------------------- + regression_slot_stats1 | t | t | t | t | t | t + regression_slot_stats2 | t | t | t | t | t | t + regression_slot_stats3 | t | t | t | t | t | t (3 rows) RESET logical_decoding_work_mem; @@ -53,12 +53,12 @@ SELECT pg_stat_reset_replication_slot('regression_slot_stats1'); (1 row) -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; - slot_name | spill_txns | spill_count | total_txns | total_bytes | mem_exceeded_count -------------------------+------------+-------------+------------+-------------+-------------------- - regression_slot_stats1 | t | t | f | f | t - regression_slot_stats2 | t | t | t | t | t - regression_slot_stats3 | t | t | t | t | t +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, output_bytes > 0 AS output_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; + slot_name | spill_txns | spill_count | total_txns | total_bytes | output_bytes | mem_exceeded_count +------------------------+------------+-------------+------------+-------------+--------------+-------------------- + regression_slot_stats1 | t | t | f | f | f | t + regression_slot_stats2 | t | t | t | t | t | t + regression_slot_stats3 | t | t | t | t | t | t (3 rows) -- reset stats for all slots @@ -68,27 +68,27 @@ SELECT pg_stat_reset_replication_slot(NULL); (1 row) -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; - slot_name | spill_txns | spill_count | total_txns | total_bytes | mem_exceeded_count -------------------------+------------+-------------+------------+-------------+-------------------- - regression_slot_stats1 | t | t | f | f | t - regression_slot_stats2 | t | t | f | f | t - regression_slot_stats3 | t | t | f | f | t +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, output_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; + slot_name | spill_txns | spill_count | total_txns | total_bytes | output_bytes | mem_exceeded_count +------------------------+------------+-------------+------------+-------------+--------------+-------------------- + regression_slot_stats1 | t | t | f | f | 0 | t + regression_slot_stats2 | t | t | f | f | 0 | t + regression_slot_stats3 | t | t | f | f | 0 | t (3 rows) -- verify accessing/resetting stats for non-existent slot does something reasonable SELECT * FROM pg_stat_get_replication_slot('do-not-exist'); - slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | slotsync_skip_count | slotsync_last_skip | stats_reset ---------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+---------------------+--------------------+------------- - do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | + slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | output_bytes | slotsync_skip_count | slotsync_last_skip | stats_reset +--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+--------------+---------------------+--------------------+------------- + do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | (1 row) SELECT pg_stat_reset_replication_slot('do-not-exist'); ERROR: replication slot "do-not-exist" does not exist SELECT * FROM pg_stat_get_replication_slot('do-not-exist'); - slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | slotsync_skip_count | slotsync_last_skip | stats_reset ---------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+---------------------+--------------------+------------- - do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | + slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | output_bytes | slotsync_skip_count | slotsync_last_skip | stats_reset +--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+--------------+---------------------+--------------------+------------- + do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | (1 row) -- spilling the xact diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql index 6661dbcb85c..2f95cadf7c5 100644 --- a/contrib/test_decoding/sql/stats.sql +++ b/contrib/test_decoding/sql/stats.sql @@ -15,16 +15,17 @@ SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats1', NULL, SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats2', NULL, NULL, 'skip-empty-xacts', '1'); SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats3', NULL, NULL, 'skip-empty-xacts', '1'); SELECT pg_stat_force_next_flush(); -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; + +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, output_bytes > 0 AS output_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; RESET logical_decoding_work_mem; -- reset stats for one slot, others should be unaffected SELECT pg_stat_reset_replication_slot('regression_slot_stats1'); -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, output_bytes > 0 AS output_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; -- reset stats for all slots SELECT pg_stat_reset_replication_slot(NULL); -SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, output_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name; -- verify accessing/resetting stats for non-existent slot does something reasonable SELECT * FROM pg_stat_get_replication_slot('do-not-exist'); diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl index 6814c792e2b..f081cf8417a 100644 --- a/contrib/test_decoding/t/001_repl_stats.pl +++ b/contrib/test_decoding/t/001_repl_stats.pl @@ -26,7 +26,8 @@ sub test_slot_stats my $result = $node->safe_psql( 'postgres', qq[ SELECT slot_name, total_txns > 0 AS total_txn, - total_bytes > 0 AS total_bytes + total_bytes > 0 AS total_bytes, + output_bytes > 0 AS output_bytes FROM pg_stat_replication_slots ORDER BY slot_name]); is($result, $expected, $msg); @@ -80,9 +81,9 @@ $node->start; # restart. test_slot_stats( $node, - qq(regression_slot1|t|t -regression_slot2|t|t -regression_slot3|t|t), + qq(regression_slot1|t|t|t +regression_slot2|t|t|t +regression_slot3|t|t|t), 'check replication statistics are updated'); # Test to remove one of the replication slots and adjust @@ -104,8 +105,8 @@ $node->start; # restart. test_slot_stats( $node, - qq(regression_slot1|t|t -regression_slot2|t|t), + qq(regression_slot1|t|t|t +regression_slot2|t|t|t), 'check replication statistics after removing the slot file'); # cleanup diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 08d5b824552..a5eed73693b 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1692,6 +1692,30 @@ description | Waiting for a newly initialized WAL file to reach durable storage + + + output_bytesbigint + + + Amount of decoded data produced for this slot's consumer by the output + plugin, after applying any output plugin filters and converting the + changes into the output plugin's format. This counts the transaction + changes together with the messages that delimit them (such as the + begin and commit messages), but not connection-management messages + such as keepalives, which are generated by the server rather than the + output plugin and are therefore not included. + + + This value can differ from total_bytes: it + may be smaller because filtered changes are not output, or larger + because the output plugin's format can be more verbose than the + decoded changes. For these reasons + output_bytes is not directly comparable to + total_bytes. + + + + slotsync_skip_countbigint diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 8f129baec90..9943706ca80 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1139,6 +1139,7 @@ CREATE VIEW pg_stat_replication_slots AS s.mem_exceeded_count, s.total_txns, s.total_bytes, + s.output_bytes, s.slotsync_skip_count, s.slotsync_last_skip, s.stats_reset diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 3541fc793e4..1638c336929 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1958,7 +1958,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) rb->memExceededCount <= 0) return; - elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64, + elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64, rb, rb->spillTxns, rb->spillCount, @@ -1968,7 +1968,8 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) rb->streamBytes, rb->memExceededCount, rb->totalTxns, - rb->totalBytes); + rb->totalBytes, + rb->outputBytes); repSlotStat.spill_txns = rb->spillTxns; repSlotStat.spill_count = rb->spillCount; @@ -1979,6 +1980,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) repSlotStat.mem_exceeded_count = rb->memExceededCount; repSlotStat.total_txns = rb->totalTxns; repSlotStat.total_bytes = rb->totalBytes; + repSlotStat.output_bytes = rb->outputBytes; pgstat_report_replslot(ctx->slot, &repSlotStat); @@ -1991,6 +1993,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) rb->memExceededCount = 0; rb->totalTxns = 0; rb->totalBytes = 0; + rb->outputBytes = 0; } /* diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 71fbaf72269..3cad69de782 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -65,6 +65,7 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi Datum values[3]; bool nulls[3]; DecodingOutputState *p; + int64 outputBytes = 0; /* SQL Datums can only be of a limited length... */ if (ctx->out->len > MaxAllocSize - VARHDRSZ) @@ -74,7 +75,9 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi memset(nulls, 0, sizeof(nulls)); values[0] = LSNGetDatum(lsn); + outputBytes += sizeof(XLogRecPtr); values[1] = TransactionIdGetDatum(xid); + outputBytes += sizeof(TransactionId); /* * Assert ctx->out is in database encoding when we're writing textual @@ -87,8 +90,13 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi /* ick, but cstring_to_text_with_len works for bytea perfectly fine */ values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len)); + outputBytes += ctx->out->len; tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls); + + /* Update the amount of data sent downstream. */ + ctx->reorder->outputBytes += outputBytes; + p->returned_rows++; } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 059ed860314..e735bf63f10 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -394,6 +394,7 @@ ReorderBufferAllocate(void) buffer->memExceededCount = 0; buffer->totalTxns = 0; buffer->totalBytes = 0; + buffer->outputBytes = 0; buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index c931d9b4fa8..1382dd2ad7f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1650,6 +1650,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, /* output previously gathered data in a CopyData packet */ pq_putmessage_noblock(PqMsg_CopyData, ctx->out->data, ctx->out->len); + /* Update the amount of data sent downstream. */ + ctx->reorder->outputBytes += ctx->out->len + 1; /* +1 for the 'd' */ + CHECK_FOR_INTERRUPTS(); /* Try to flush pending output to the client */ diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c index a32b70a0373..d5ffc1a7f36 100644 --- a/src/backend/utils/activity/pgstat_replslot.c +++ b/src/backend/utils/activity/pgstat_replslot.c @@ -96,6 +96,7 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re REPLSLOT_ACC(mem_exceeded_count); REPLSLOT_ACC(total_txns); REPLSLOT_ACC(total_bytes); + REPLSLOT_ACC(output_bytes); #undef REPLSLOT_ACC pgstat_unlock_entry(entry_ref); diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 6f9c9c72de5..ef0016cb35e 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2181,7 +2181,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS) Datum pg_stat_get_replication_slot(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_REPLICATION_SLOT_COLS 13 +#define PG_STAT_GET_REPLICATION_SLOT_COLS 14 text *slotname_text = PG_GETARG_TEXT_P(0); NameData slotname; TupleDesc tupdesc; @@ -2212,11 +2212,13 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 10, "total_bytes", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 11, "slotsync_skip_count", + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "output_bytes", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 12, "slotsync_last_skip", + TupleDescInitEntry(tupdesc, (AttrNumber) 12, "slotsync_skip_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 13, "slotsync_last_skip", TIMESTAMPTZOID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 13, "stats_reset", + TupleDescInitEntry(tupdesc, (AttrNumber) 14, "stats_reset", TIMESTAMPTZOID, -1, 0); TupleDescFinalize(tupdesc); BlessTupleDesc(tupdesc); @@ -2243,17 +2245,18 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) values[7] = Int64GetDatum(slotent->mem_exceeded_count); values[8] = Int64GetDatum(slotent->total_txns); values[9] = Int64GetDatum(slotent->total_bytes); - values[10] = Int64GetDatum(slotent->slotsync_skip_count); + values[10] = Int64GetDatum(slotent->output_bytes); + values[11] = Int64GetDatum(slotent->slotsync_skip_count); if (slotent->slotsync_last_skip == 0) - nulls[11] = true; + nulls[12] = true; else - values[11] = TimestampTzGetDatum(slotent->slotsync_last_skip); + values[12] = TimestampTzGetDatum(slotent->slotsync_last_skip); if (slotent->stat_reset_timestamp == 0) - nulls[12] = true; + nulls[13] = true; else - values[12] = TimestampTzGetDatum(slotent->stat_reset_timestamp); + values[13] = TimestampTzGetDatum(slotent->stat_reset_timestamp); /* Returns the record as Datum */ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 402d869710b..34fe893361c 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5735,9 +5735,9 @@ { oid => '6169', descr => 'statistics: information about replication slot', proname => 'pg_stat_get_replication_slot', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'text', - proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz,timestamptz}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,mem_exceeded_count,total_txns,total_bytes,slotsync_skip_count,slotsync_last_skip,stats_reset}', + proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz,timestamptz}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,mem_exceeded_count,total_txns,total_bytes,output_bytes,slotsync_skip_count,slotsync_last_skip,stats_reset}', prosrc => 'pg_stat_get_replication_slot' }, { oid => '6230', descr => 'statistics: check if a stats object exists', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index dfa2e837638..cb48d37418a 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -422,6 +422,7 @@ typedef struct PgStat_StatReplSlotEntry PgStat_Counter mem_exceeded_count; PgStat_Counter total_txns; PgStat_Counter total_bytes; + PgStat_Counter output_bytes; PgStat_Counter slotsync_skip_count; TimestampTz slotsync_last_skip; TimestampTz stat_reset_timestamp; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index ff825e4b7b2..79a8afb8006 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -699,6 +699,7 @@ struct ReorderBuffer */ int64 totalTxns; /* total number of transactions sent */ int64 totalBytes; /* total amount of data decoded */ + int64 outputBytes; /* amount of data produced for downstream */ }; diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index 97d11f98b59..812bdd2b9ed 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -214,10 +214,10 @@ my $stats_test_slot2 = 'logical_slot'; # Stats exist for stats test slot 1 is( $node_primary->safe_psql( 'postgres', - qq(SELECT total_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') + qq(SELECT total_bytes > 0, output_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') ), - qq(t|t), - qq(Total bytes is > 0 and stats_reset is NULL for slot '$stats_test_slot1'.) + qq(t|t|t), + qq(Total bytes and sent bytes are both > 0 and stats_reset is NULL for slot '$stats_test_slot1'.) ); # Do reset of stats for stats test slot 1 @@ -235,10 +235,10 @@ $node_primary->safe_psql('postgres', is( $node_primary->safe_psql( 'postgres', - qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0 FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') + qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0, output_bytes = 0 FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') ), - qq(t|t), - qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes was set to 0.) + qq(t|t|t), + qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes and output_bytes were set to 0.) ); # Check that test slot 2 has NULL in reset timestamp diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index a65a5bf0c4f..ed61b20cb5c 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2260,11 +2260,12 @@ pg_stat_replication_slots| SELECT s.slot_name, s.mem_exceeded_count, s.total_txns, s.total_bytes, + s.output_bytes, s.slotsync_skip_count, s.slotsync_last_skip, s.stats_reset FROM pg_replication_slots r, - LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, mem_exceeded_count, total_txns, total_bytes, slotsync_skip_count, slotsync_last_skip, stats_reset) + LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, mem_exceeded_count, total_txns, total_bytes, output_bytes, slotsync_skip_count, slotsync_last_skip, stats_reset) WHERE (r.datoid IS NOT NULL); pg_stat_slru| SELECT name, blks_zeroed, -- 2.43.0