From 6517e50f482f88ea5185609ff4dcf0e0256475d5 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Mon, 26 Dec 2022 08:14:11 +0000 Subject: [PATCH v2] Collect WAL read from file stats for WAL senders --- doc/src/sgml/monitoring.sgml | 31 +++++++++++ src/backend/access/transam/xlogreader.c | 33 ++++++++++-- src/backend/access/transam/xlogutils.c | 2 +- src/backend/catalog/system_views.sql | 5 +- src/backend/replication/walsender.c | 58 +++++++++++++++++++-- src/bin/pg_waldump/pg_waldump.c | 2 +- src/include/access/xlogreader.h | 21 ++++++-- src/include/catalog/pg_proc.dat | 6 +-- src/include/replication/walsender_private.h | 4 ++ src/test/regress/expected/rules.out | 7 ++- 10 files changed, 151 insertions(+), 18 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 363b183e5f..fdf4c7d774 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2615,6 +2615,37 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i Send time of last reply message received from standby server + + + + wal_read bigint + + + Number of times WAL data is read from disk + + + + + + wal_read_bytes numeric + + + Total amount of WAL read from disk in bytes + + + + + + wal_read_time double precision + + + Total amount of time spent reading WAL from disk via + WALRead request, in milliseconds + (if track_wal_io_timing is enabled, + otherwise zero). + + + diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index a38a80e049..7453724a07 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -31,6 +31,7 @@ #include "access/xlogrecord.h" #include "catalog/pg_control.h" #include "common/pg_lzcompress.h" +#include "portability/instr_time.h" #include "replication/origin.h" #ifndef FRONTEND @@ -1489,9 +1490,9 @@ err: * WAL buffers when possible. 
*/ bool -WALRead(XLogReaderState *state, - char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, - WALReadError *errinfo) +WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, + TimeLineID tli, WALReadError *errinfo, WALReadStats *stats, + bool capture_wal_io_timing) { char *p; XLogRecPtr recptr; @@ -1506,6 +1507,7 @@ WALRead(XLogReaderState *state, uint32 startoff; int segbytes; int readbytes; + instr_time start; startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize); @@ -1540,6 +1542,10 @@ WALRead(XLogReaderState *state, else segbytes = nbytes; + /* Take the start timestamp only if the caller wants I/O timing. */ + if (stats != NULL && capture_wal_io_timing) + INSTR_TIME_SET_CURRENT(start); + #ifndef FRONTEND pgstat_report_wait_start(WAIT_EVENT_WAL_READ); #endif @@ -1552,6 +1558,27 @@ WALRead(XLogReaderState *state, pgstat_report_wait_end(); #endif + /* Collect I/O stats if the caller passed a stats struct. */ + if (stats != NULL) + { + /* Count every read attempt, including failed or empty reads. */ + stats->wal_read++; + + /* Add bytes read; skip error (negative) and zero returns. */ + if (readbytes > 0) + stats->wal_read_bytes += readbytes; + + /* Accumulate the elapsed read time, in microseconds. */ + if (capture_wal_io_timing) + { + instr_time duration; + + INSTR_TIME_SET_CURRENT(duration); + INSTR_TIME_SUBTRACT(duration, start); + stats->wal_read_time += INSTR_TIME_GET_MICROSEC(duration); + } + } + if (readbytes <= 0) { errinfo->wre_errno = errno; diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 563cba258d..372de2c7d8 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -1027,7 +1027,7 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr, * zero-padded up to the page boundary if it's incomplete. 
*/ if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli, - &errinfo)) + &errinfo, NULL, false)) WALReadRaiseError(&errinfo); /* number of valid bytes in the buffer */ diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 2d8104b090..b47f44a852 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -892,7 +892,10 @@ CREATE VIEW pg_stat_replication AS W.replay_lag, W.sync_priority, W.sync_state, - W.reply_time + W.reply_time, + W.wal_read, + W.wal_read_bytes, + W.wal_read_time FROM pg_stat_get_activity(NULL) AS S JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid) LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index c11bb3716f..fa02e327f2 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -259,7 +259,7 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p); - +static void WalSndAccumulateWalReadStats(WALReadStats *stats); /* Initialize walsender process before entering the main command loop */ void @@ -907,6 +907,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req WALReadError errinfo; XLogSegNo segno; TimeLineID currTLI = GetWALInsertionTimeLine(); + WALReadStats stats; /* * Since logical decoding is only permitted on a primary server, we know @@ -932,6 +933,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req else count = flushptr - targetPagePtr; /* part of the page available */ + MemSet(&stats, 0, sizeof(WALReadStats)); + /* now actually read the data, we know it's there */ if (!WALRead(state, cur_page, @@ -940,9 +943,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req state->seg.ws_tli, /* Pass the current TLI because only * 
WalSndSegmentOpen controls whether new * TLI is needed. */ - &errinfo)) + &errinfo, + &stats, + track_wal_io_timing)) WALReadRaiseError(&errinfo); + WalSndAccumulateWalReadStats(&stats); + /* * After reading into the buffer, check that what we read was valid. We do * this after reading, because even though the segment was present when we @@ -2610,6 +2617,9 @@ InitWalSenderSlot(void) walsnd->sync_standby_priority = 0; walsnd->latch = &MyProc->procLatch; walsnd->replyTime = 0; + walsnd->wal_read_stats.wal_read = 0; + walsnd->wal_read_stats.wal_read_bytes = 0; + walsnd->wal_read_stats.wal_read_time = 0; SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ MyWalSnd = (WalSnd *) walsnd; @@ -2730,6 +2740,7 @@ XLogSendPhysical(void) Size nbytes; XLogSegNo segno; WALReadError errinfo; + WALReadStats stats; /* If requested switch the WAL sender to the stopping state. */ if (got_STOPPING) @@ -2945,6 +2956,8 @@ XLogSendPhysical(void) enlargeStringInfo(&output_message, nbytes); retry: + MemSet(&stats, 0, sizeof(WALReadStats)); + if (!WALRead(xlogreader, &output_message.data[output_message.len], startptr, @@ -2952,9 +2965,13 @@ retry: xlogreader->seg.ws_tli, /* Pass the current TLI because * only WalSndSegmentOpen controls * whether new TLI is needed. */ - &errinfo)) + &errinfo, + &stats, + track_wal_io_timing)) WALReadRaiseError(&errinfo); + WalSndAccumulateWalReadStats(&stats); + /* See logical_read_xlog_page(). 
*/ XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize); CheckXLogRemoved(segno, xlogreader->seg.ws_tli); @@ -3458,7 +3475,7 @@ offset_to_interval(TimeOffset offset) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_SENDERS_COLS 12 +#define PG_STAT_GET_WAL_SENDERS_COLS 15 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; SyncRepStandbyData *sync_standbys; int num_standbys; @@ -3487,9 +3504,13 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) WalSndState state; TimestampTz replyTime; bool is_sync_standby; + int64 wal_read; + uint64 wal_read_bytes; + int64 wal_read_time; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0}; int j; + char buf[256]; /* Collect data from shared memory */ SpinLockAcquire(&walsnd->mutex); @@ -3509,6 +3530,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) applyLag = walsnd->applyLag; priority = walsnd->sync_standby_priority; replyTime = walsnd->replyTime; + wal_read = walsnd->wal_read_stats.wal_read; + wal_read_bytes = walsnd->wal_read_stats.wal_read_bytes; + wal_read_time = walsnd->wal_read_stats.wal_read_time; SpinLockRelease(&walsnd->mutex); /* @@ -3605,6 +3629,18 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) nulls[11] = true; else values[11] = TimestampTzGetDatum(replyTime); + + values[12] = Int64GetDatum(wal_read); + + /* uint64 byte count is returned as numeric, built from its text form. */ + snprintf(buf, sizeof buf, UINT64_FORMAT, wal_read_bytes); + values[13] = DirectFunctionCall3(numeric_in, + CStringGetDatum(buf), + ObjectIdGetDatum(0), + Int32GetDatum(-1)); + + /* Convert counter from microsec to millisec for display. */ + values[14] = Float8GetDatum(((double) wal_read_time) / 1000.0); } tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, @@ -3849,3 +3885,17 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now) Assert(time != 0); return now - time; } + +/* + * Add one WALRead call's stats to this WAL sender's running totals. 
+ */ +static void +WalSndAccumulateWalReadStats(WALReadStats *stats) +{ + /* Fold the caller's stats into MyWalSnd while holding its spinlock. */ + SpinLockAcquire(&MyWalSnd->mutex); + MyWalSnd->wal_read_stats.wal_read += stats->wal_read; + MyWalSnd->wal_read_stats.wal_read_bytes += stats->wal_read_bytes; + MyWalSnd->wal_read_stats.wal_read_time += stats->wal_read_time; + SpinLockRelease(&MyWalSnd->mutex); +} diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index 9993378ca5..698ce1e9f7 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -364,7 +364,7 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, } if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline, - &errinfo)) + &errinfo, NULL, false)) { WALOpenSegment *seg = &errinfo.wre_seg; char fname[MAXPGPATH]; diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index e87f91316a..26a2c975de 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -389,9 +389,24 @@ typedef struct WALReadError WALOpenSegment wre_seg; /* Segment we tried to read from. */ } WALReadError; -extern bool WALRead(XLogReaderState *state, - char *buf, XLogRecPtr startptr, Size count, - TimeLineID tli, WALReadError *errinfo); +/* + * WAL read stats that WALRead adds to when the caller supplies this struct. + */ +typedef struct WALReadStats +{ + /* Number of times WAL was read from disk. */ + int64 wal_read; + + /* Total amount of WAL read from disk in bytes. */ + uint64 wal_read_bytes; + + /* Total time spent reading WAL from disk, in microseconds. 
*/ + int64 wal_read_time; +} WALReadStats; + +extern bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, + Size count, TimeLineID tli, WALReadError *errinfo, + WALReadStats *stats, bool capture_wal_io_timing); /* Functions for decoding an XLogRecord */ diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 7056c95371..18320cf846 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5391,9 +5391,9 @@ proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => '', - proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time}', + proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz,int8,numeric,float8}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time,wal_read,wal_read_bytes,wal_read_time}', prosrc => 'pg_stat_get_wal_senders' }, { oid => '3317', descr => 'statistics: information about WAL receiver', proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's', diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 7897c74589..35413ea0d2 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -13,6 +13,7 @@ #define _WALSENDER_PRIVATE_H #include "access/xlog.h" +#include "access/xlogreader.h" #include "nodes/nodes.h" #include "replication/syncrep.h" #include "storage/latch.h" @@ -78,6 +79,9 @@ typedef struct WalSnd * Timestamp of the last 
message received from standby. */ TimestampTz replyTime; + + /* WAL read stats for walsender. */ + WALReadStats wal_read_stats; } WalSnd; extern PGDLLIMPORT WalSnd *MyWalSnd; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index fb9f936d43..fd9d298e79 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2054,9 +2054,12 @@ pg_stat_replication| SELECT s.pid, w.replay_lag, w.sync_priority, w.sync_state, - w.reply_time + w.reply_time, + w.wal_read, + w.wal_read_bytes, + w.wal_read_time FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id) - JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid))) + JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time, wal_read, wal_read_bytes, wal_read_time) ON ((s.pid = w.pid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); pg_stat_replication_slots| SELECT s.slot_name, s.spill_txns, -- 2.34.1