From f90dfcbd1968280feec6d116568697225854ac40 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Mon, 26 Dec 2022 08:13:07 +0000 Subject: [PATCH v2] Collect WAL read from buffers and file stats for WAL senders --- doc/src/sgml/monitoring.sgml | 61 +++++++++++++++ src/backend/access/transam/xlogreader.c | 56 +++++++++++++- src/backend/access/transam/xlogutils.c | 2 +- src/backend/catalog/system_views.sql | 8 +- src/backend/replication/walsender.c | 85 ++++++++++++++++++++- src/bin/pg_waldump/pg_waldump.c | 2 +- src/include/access/xlogreader.h | 30 +++++++- src/include/catalog/pg_proc.dat | 6 +- src/include/replication/walsender_private.h | 4 + src/test/regress/expected/rules.out | 10 ++- 10 files changed, 246 insertions(+), 18 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 363b183e5f..239e0b0db9 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2615,6 +2615,67 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i Send time of last reply message received from standby server + + + + wal_read bigint + + + Number of times WAL data is read from disk + + + + + + wal_read_bytes numeric + + + Total amount of WAL read from disk in bytes + + + + + + wal_read_time double precision + + + Total amount of time spent reading WAL from disk via + WALRead request, in milliseconds + (if <xref linkend="guc-track-wal-io-timing"/> is enabled, + otherwise zero). + + + + + + wal_read_buffers bigint + + + Number of times WAL data is read from WAL buffers + + + + + + wal_read_bytes_buffers numeric + + + Total amount of WAL read from WAL buffers in bytes + + + + + + wal_read_time_buffers double precision + + + Total amount of time spent reading WAL from WAL buffers via + WALRead request, in milliseconds + (if <xref linkend="guc-track-wal-io-timing"/> is enabled, + otherwise zero).
+ + + diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 4a2e7af169..b9dfd4fde7 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -31,6 +31,7 @@ #include "access/xlogrecord.h" #include "catalog/pg_control.h" #include "common/pg_lzcompress.h" +#include "portability/instr_time.h" #include "replication/origin.h" #ifndef FRONTEND @@ -1488,9 +1489,9 @@ err: * When possible, this function reads data directly from WAL buffers. */ bool -WALRead(XLogReaderState *state, - char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, - WALReadError *errinfo) +WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, + TimeLineID tli, WALReadError *errinfo, WALReadStats *stats, + bool capture_wal_io_timing) { char *p; XLogRecPtr recptr; @@ -1510,10 +1511,33 @@ WALRead(XLogReaderState *state, if (!RecoveryInProgress() && tli == GetWALInsertionTimeLine()) { + instr_time start; + + /* Measure I/O timing to read WAL data if requested by the caller. */ + if (stats != NULL && capture_wal_io_timing) + INSTR_TIME_SET_CURRENT(start); + pgstat_report_wait_start(WAIT_EVENT_WAL_READ); XLogReadFromBuffers(startptr, tli, count, buf, &read_bytes); pgstat_report_wait_end(); + /* Collect I/O stats if requested by the caller. */ + if (stats != NULL && read_bytes > 0) + { + stats->wal_read_buffers++; + stats->wal_read_bytes_buffers += read_bytes; + + /* Increment the I/O timing. */ + if (capture_wal_io_timing) + { + instr_time duration; + + INSTR_TIME_SET_CURRENT(duration); + INSTR_TIME_SUBTRACT(duration, start); + stats->wal_read_time_buffers += INSTR_TIME_GET_MICROSEC(duration); + } + } + /* * Check if we have read fully (hit), partially (partial hit) or * nothing (miss) from WAL buffers. 
If we have read either partially or @@ -1549,6 +1573,7 @@ WALRead(XLogReaderState *state, uint32 startoff; int segbytes; int readbytes; + instr_time start; startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize); @@ -1583,6 +1608,10 @@ WALRead(XLogReaderState *state, else segbytes = nbytes; + /* Measure I/O timing to read WAL data if requested by the caller. */ + if (stats != NULL && capture_wal_io_timing) + INSTR_TIME_SET_CURRENT(start); + #ifndef FRONTEND pgstat_report_wait_start(WAIT_EVENT_WAL_READ); #endif @@ -1595,6 +1624,27 @@ WALRead(XLogReaderState *state, pgstat_report_wait_end(); #endif + /* Collect I/O stats if requested by the caller. */ + if (stats != NULL) + { + /* Increment the number of times WAL is read from disk. */ + stats->wal_read++; + + /* Collect bytes read. */ + if (readbytes > 0) + stats->wal_read_bytes += readbytes; + + /* Increment the I/O timing. */ + if (capture_wal_io_timing) + { + instr_time duration; + + INSTR_TIME_SET_CURRENT(duration); + INSTR_TIME_SUBTRACT(duration, start); + stats->wal_read_time += INSTR_TIME_GET_MICROSEC(duration); + } + } + if (readbytes <= 0) { errinfo->wre_errno = errno; diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index 563cba258d..372de2c7d8 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -1027,7 +1027,7 @@ read_local_xlog_page_guts(XLogReaderState *state, XLogRecPtr targetPagePtr, * zero-padded up to the page boundary if it's incomplete. 
*/ if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli, - &errinfo)) + &errinfo, NULL, false)) WALReadRaiseError(&errinfo); /* number of valid bytes in the buffer */ diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 2d8104b090..bf6315df27 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -892,7 +892,13 @@ CREATE VIEW pg_stat_replication AS W.replay_lag, W.sync_priority, W.sync_state, - W.reply_time + W.reply_time, + W.wal_read, + W.wal_read_bytes, + W.wal_read_time, + W.wal_read_buffers, + W.wal_read_bytes_buffers, + W.wal_read_time_buffers FROM pg_stat_get_activity(NULL) AS S JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid) LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index c11bb3716f..d3393b2b63 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -259,7 +259,7 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p); - +static void WalSndAccumulateWalReadStats(WALReadStats *stats); /* Initialize walsender process before entering the main command loop */ void @@ -907,6 +907,7 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req WALReadError errinfo; XLogSegNo segno; TimeLineID currTLI = GetWALInsertionTimeLine(); + WALReadStats stats; /* * Since logical decoding is only permitted on a primary server, we know @@ -932,6 +933,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req else count = flushptr - targetPagePtr; /* part of the page available */ + MemSet(&stats, 0, sizeof(WALReadStats)); + /* now actually read the data, we know it's there */ if (!WALRead(state, cur_page, @@ -940,9 +943,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr 
targetPagePtr, int req state->seg.ws_tli, /* Pass the current TLI because only * WalSndSegmentOpen controls whether new * TLI is needed. */ - &errinfo)) + &errinfo, + &stats, + track_wal_io_timing)) WALReadRaiseError(&errinfo); + WalSndAccumulateWalReadStats(&stats); + /* * After reading into the buffer, check that what we read was valid. We do * this after reading, because even though the segment was present when we @@ -2610,6 +2617,12 @@ InitWalSenderSlot(void) walsnd->sync_standby_priority = 0; walsnd->latch = &MyProc->procLatch; walsnd->replyTime = 0; + walsnd->wal_read_stats.wal_read = 0; + walsnd->wal_read_stats.wal_read_bytes = 0; + walsnd->wal_read_stats.wal_read_time = 0; + walsnd->wal_read_stats.wal_read_buffers = 0; + walsnd->wal_read_stats.wal_read_bytes_buffers = 0; + walsnd->wal_read_stats.wal_read_time_buffers = 0; SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ MyWalSnd = (WalSnd *) walsnd; @@ -2730,6 +2743,7 @@ XLogSendPhysical(void) Size nbytes; XLogSegNo segno; WALReadError errinfo; + WALReadStats stats; /* If requested switch the WAL sender to the stopping state. */ if (got_STOPPING) @@ -2945,6 +2959,8 @@ XLogSendPhysical(void) enlargeStringInfo(&output_message, nbytes); retry: + MemSet(&stats, 0, sizeof(WALReadStats)); + if (!WALRead(xlogreader, &output_message.data[output_message.len], startptr, @@ -2952,9 +2968,13 @@ retry: xlogreader->seg.ws_tli, /* Pass the current TLI because * only WalSndSegmentOpen controls * whether new TLI is needed. */ - &errinfo)) + &errinfo, + &stats, + track_wal_io_timing)) WALReadRaiseError(&errinfo); + WalSndAccumulateWalReadStats(&stats); + /* See logical_read_xlog_page(). 
*/ XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize); CheckXLogRemoved(segno, xlogreader->seg.ws_tli); @@ -3458,7 +3478,7 @@ offset_to_interval(TimeOffset offset) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_SENDERS_COLS 12 +#define PG_STAT_GET_WAL_SENDERS_COLS 18 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; SyncRepStandbyData *sync_standbys; int num_standbys; @@ -3487,9 +3507,16 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) WalSndState state; TimestampTz replyTime; bool is_sync_standby; + int64 wal_read; + uint64 wal_read_bytes; + int64 wal_read_time; + int64 wal_read_buffers; + uint64 wal_read_bytes_buffers; + int64 wal_read_time_buffers; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0}; int j; + char buf[256]; /* Collect data from shared memory */ SpinLockAcquire(&walsnd->mutex); @@ -3509,6 +3536,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) applyLag = walsnd->applyLag; priority = walsnd->sync_standby_priority; replyTime = walsnd->replyTime; + wal_read = walsnd->wal_read_stats.wal_read; + wal_read_bytes = walsnd->wal_read_stats.wal_read_bytes; + wal_read_time = walsnd->wal_read_stats.wal_read_time; + wal_read_buffers = walsnd->wal_read_stats.wal_read_buffers; + wal_read_bytes_buffers = walsnd->wal_read_stats.wal_read_bytes_buffers; + wal_read_time_buffers = walsnd->wal_read_stats.wal_read_time_buffers; SpinLockRelease(&walsnd->mutex); /* @@ -3605,6 +3638,31 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) nulls[11] = true; else values[11] = TimestampTzGetDatum(replyTime); + + values[12] = Int64GetDatum(wal_read); + + /* Convert to numeric. */ + snprintf(buf, sizeof buf, UINT64_FORMAT, wal_read_bytes); + values[13] = DirectFunctionCall3(numeric_in, + CStringGetDatum(buf), + ObjectIdGetDatum(0), + Int32GetDatum(-1)); + + /* Convert counter from microsec to millisec for display. 
*/ + values[14] = Float8GetDatum(((double) wal_read_time) / 1000.0); + + values[15] = Int64GetDatum(wal_read_buffers); + + /* Convert to numeric. */ + MemSet(buf, '\0', sizeof buf); + snprintf(buf, sizeof buf, UINT64_FORMAT, wal_read_bytes_buffers); + values[16] = DirectFunctionCall3(numeric_in, + CStringGetDatum(buf), + ObjectIdGetDatum(0), + Int32GetDatum(-1)); + + /* Convert counter from microsec to millisec for display. */ + values[17] = Float8GetDatum(((double) wal_read_time_buffers) / 1000.0); } tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, @@ -3849,3 +3907,22 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now) Assert(time != 0); return now - time; } + +/* + * Function to accumulate WAL Read stats for WAL sender. + */ +static void +WalSndAccumulateWalReadStats(WALReadStats *stats) +{ + /* Collect I/O stats for walsender. */ + SpinLockAcquire(&MyWalSnd->mutex); + MyWalSnd->wal_read_stats.wal_read += stats->wal_read; + MyWalSnd->wal_read_stats.wal_read_bytes += stats->wal_read_bytes; + MyWalSnd->wal_read_stats.wal_read_time += stats->wal_read_time; + MyWalSnd->wal_read_stats.wal_read_buffers += stats->wal_read_buffers; + MyWalSnd->wal_read_stats.wal_read_bytes_buffers += + stats->wal_read_bytes_buffers; + MyWalSnd->wal_read_stats.wal_read_time_buffers += + stats->wal_read_time_buffers; + SpinLockRelease(&MyWalSnd->mutex); +} diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index 9993378ca5..698ce1e9f7 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -364,7 +364,7 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, } if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline, - &errinfo)) + &errinfo, NULL, false)) { WALOpenSegment *seg = &errinfo.wre_seg; char fname[MAXPGPATH]; diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index e87f91316a..9287114779 100644 --- a/src/include/access/xlogreader.h +++ 
b/src/include/access/xlogreader.h @@ -389,9 +389,33 @@ typedef struct WALReadError WALOpenSegment wre_seg; /* Segment we tried to read from. */ } WALReadError; -extern bool WALRead(XLogReaderState *state, - char *buf, XLogRecPtr startptr, Size count, - TimeLineID tli, WALReadError *errinfo); +/* + * WAL read stats from WALRead that the callers can use. + */ +typedef struct WALReadStats +{ + /* Number of times WAL read from disk. */ + int64 wal_read; + + /* Total amount of WAL read from disk in bytes. */ + uint64 wal_read_bytes; + + /* Total amount of time spent reading WAL from disk. */ + int64 wal_read_time; + + /* Number of times WAL read from WAL buffers. */ + int64 wal_read_buffers; + + /* Total amount of WAL read from WAL buffers in bytes. */ + uint64 wal_read_bytes_buffers; + + /* Total amount of time spent reading WAL from WAL buffers. */ + int64 wal_read_time_buffers; +} WALReadStats; + +extern bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, + Size count, TimeLineID tli, WALReadError *errinfo, + WALReadStats *stats, bool capture_wal_io_timing); /* Functions for decoding an XLogRecord */ diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 7056c95371..706a005c2b 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5391,9 +5391,9 @@ proname => 'pg_stat_get_wal_senders', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => '', - proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time}', + proallargtypes => '{int4,text,pg_lsn,pg_lsn,pg_lsn,pg_lsn,interval,interval,interval,int4,text,timestamptz,int8,numeric,float8,int8,numeric,float8}', + proargmodes => 
'{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{pid,state,sent_lsn,write_lsn,flush_lsn,replay_lsn,write_lag,flush_lag,replay_lag,sync_priority,sync_state,reply_time,wal_read,wal_read_bytes,wal_read_time,wal_read_buffers,wal_read_bytes_buffers,wal_read_time_buffers}', prosrc => 'pg_stat_get_wal_senders' }, { oid => '3317', descr => 'statistics: information about WAL receiver', proname => 'pg_stat_get_wal_receiver', proisstrict => 'f', provolatile => 's', diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 7897c74589..35413ea0d2 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -13,6 +13,7 @@ #define _WALSENDER_PRIVATE_H #include "access/xlog.h" +#include "access/xlogreader.h" #include "nodes/nodes.h" #include "replication/syncrep.h" #include "storage/latch.h" @@ -78,6 +79,9 @@ typedef struct WalSnd * Timestamp of the last message received from standby. */ TimestampTz replyTime; + + /* WAL read stats for walsender. 
*/ + WALReadStats wal_read_stats; } WalSnd; extern PGDLLIMPORT WalSnd *MyWalSnd; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index fb9f936d43..6ae65981c2 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2054,9 +2054,15 @@ pg_stat_replication| SELECT s.pid, w.replay_lag, w.sync_priority, w.sync_state, - w.reply_time + w.reply_time, + w.wal_read, + w.wal_read_bytes, + w.wal_read_time, + w.wal_read_buffers, + w.wal_read_bytes_buffers, + w.wal_read_time_buffers FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id) - JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid))) + JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time, wal_read, wal_read_bytes, wal_read_time, wal_read_buffers, wal_read_bytes_buffers, wal_read_time_buffers) ON ((s.pid = w.pid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); pg_stat_replication_slots| SELECT s.slot_name, s.spill_txns, -- 2.34.1