From faa8656da138079873c5f2dba48b2c7568fea546 Mon Sep 17 00:00:00 2001
From: Kirill Bychik
Date: Tue, 17 Mar 2020 14:41:50 +0100
Subject: [PATCH v9 3/6] Add infrastructure to track WAL usage.

Author: Kirill Bychik, Julien Rouhaud
Reviewed-by: Fujii Masao
Discussion: https://postgr.es/m/CAB-hujrP8ZfUkvL5OYETipQwA=e3n7oqHFU=4ZLxWS_Cza3kQQ@mail.gmail.com
---
 src/backend/access/heap/vacuumlazy.c | 32 ++++++++++++----
 src/backend/access/nbtree/nbtsort.c | 36 +++++++++++++----
 src/backend/access/transam/xlog.c | 12 +++++-
 src/backend/access/transam/xloginsert.c | 13 +++++--
 src/backend/executor/execParallel.c | 38 ++++++++++++++----
 src/backend/executor/instrument.c | 50 +++++++++++++++++++++----
 src/include/access/xlog.h | 3 +-
 src/include/executor/execParallel.h | 1 +
 src/include/executor/instrument.h | 18 ++++++++-
 src/tools/pgindent/typedefs.list | 1 +
 10 files changed, 163 insertions(+), 41 deletions(-)

diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 9f9596c718..27e163f5b3 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -139,6 +139,7 @@
 #define PARALLEL_VACUUM_KEY_DEAD_TUPLES 2
 #define PARALLEL_VACUUM_KEY_QUERY_TEXT 3
 #define PARALLEL_VACUUM_KEY_BUFFER_USAGE 4
+#define PARALLEL_VACUUM_KEY_WAL_USAGE 5

 /*
 * Macro to check if we are in a parallel vacuum. If true, we are in the
@@ -274,6 +275,8 @@ typedef struct LVParallelState

 /* Points to buffer usage area in DSM */
 BufferUsage *buffer_usage;
+ /* Points to WAL usage area in DSM */
+ WalUsage *wal_usage;

 /*
 * The number of indexes that support parallel index bulk-deletion and
@@ -2154,7 +2157,7 @@ lazy_parallel_vacuum_indexes(Relation *Irel, IndexBulkDeleteResult **stats,
 WaitForParallelWorkersToFinish(lps->pcxt);

 for (i = 0; i < lps->pcxt->nworkers_launched; i++)
- InstrAccumParallelQuery(&lps->buffer_usage[i]);
+ InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]);
 }

 /*
@@ -3171,6 +3174,7 @@ begin_parallel_vacuum(Oid relid, Relation *Irel, LVRelStats *vacrelstats,
 LVShared *shared;
 LVDeadTuples *dead_tuples;
 BufferUsage *buffer_usage;
+ WalUsage *wal_usage;
 bool *can_parallel_vacuum;
 long maxtuples;
 char *sharedquery;
@@ -3255,15 +3259,19 @@ begin_parallel_vacuum(Oid relid, Relation *Irel, LVRelStats *vacrelstats,
 shm_toc_estimate_keys(&pcxt->estimator, 1);

 /*
- * Estimate space for BufferUsage -- PARALLEL_VACUUM_KEY_BUFFER_USAGE.
+ * Estimate space for BufferUsage and WalUsage --
+ * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
 *
 * If there are no extensions loaded that care, we could skip this. We
- * have no way of knowing whether anyone's looking at pgBufferUsage, so do
- * it unconditionally.
+ * have no way of knowing whether anyone's looking at pgBufferUsage or
+ * pgWalUsage, so do it unconditionally.
*/ shm_toc_estimate_chunk(&pcxt->estimator, mul_size(sizeof(BufferUsage), pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */ querylen = strlen(debug_query_string); @@ -3299,11 +3307,18 @@ begin_parallel_vacuum(Oid relid, Relation *Irel, LVRelStats *vacrelstats, shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_TUPLES, dead_tuples); vacrelstats->dead_tuples = dead_tuples; - /* Allocate space for each worker's BufferUsage; no need to initialize */ + /* + * Allocate space for each worker's BufferUsage and WalUsage; no need to + * initialize + */ buffer_usage = shm_toc_allocate(pcxt->toc, mul_size(sizeof(BufferUsage), pcxt->nworkers)); shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage); lps->buffer_usage = buffer_usage; + wal_usage = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage); + lps->wal_usage = wal_usage; /* Store query string for workers */ sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); @@ -3435,6 +3450,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) LVShared *lvshared; LVDeadTuples *dead_tuples; BufferUsage *buffer_usage; + WalUsage *wal_usage; int nindexes; char *sharedquery; IndexBulkDeleteResult **stats; @@ -3511,9 +3527,11 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) parallel_vacuum_index(indrels, stats, lvshared, dead_tuples, nindexes, &vacrelstats); - /* Report buffer usage during parallel execution */ + /* Report buffer/WAL usage during parallel execution */ buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false); - InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]); + wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false); + InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], + &wal_usage[ParallelWorkerNumber]); /* Pop the error context stack */ error_context_stack = errcallback.previous; diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index ba48b7e9f9..d7a1b95c9f 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -83,6 +83,7 @@ #define PARALLEL_KEY_TUPLESORT_SPOOL2 UINT64CONST(0xA000000000000003) #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xA000000000000004) #define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xA000000000000005) +#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xA000000000000006) /* * DISABLE_LEADER_PARTICIPATION disables the leader's participation in @@ -206,6 +207,7 @@ typedef struct BTLeader Sharedsort *sharedsort2; Snapshot snapshot; BufferUsage *bufferusage; + WalUsage *walusage; } BTLeader; /* @@ -1480,6 +1482,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) BTSpool *btspool = buildstate->spool; BTLeader *btleader = (BTLeader *) palloc0(sizeof(BTLeader)); BufferUsage *bufferusage; + WalUsage *walusage; bool leaderparticipates = true; char *sharedquery; int querylen; @@ -1533,16 +1536,20 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) } /* - * Estimate space for BufferUsage -- PARALLEL_KEY_BUFFER_USAGE + * Estimate space for BufferUsage and WalUsage -- PARALLEL_KEY_BUFFER_USAGE + * and PARALLEL_KEY_WAL_USAGE. 
* - * BufferUsage during executing maintenance command can be used by an - * extension that reports the buffer usage, such as pg_stat_statements. - * We have no way of knowing whether anyone's looking at pgBufferUsage, - * so do it unconditionally. + * BufferUsage and WalUsage during executing maintenance command can be + * used by an extension that reports the buffer or WAL usage, such as + * pg_stat_statements. We have no way of knowing whether anyone's looking + * at pgBufferUsage or pgWalUsage, so do it unconditionally. */ shm_toc_estimate_chunk(&pcxt->estimator, mul_size(sizeof(BufferUsage), pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ querylen = strlen(debug_query_string); @@ -1615,10 +1622,16 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) memcpy(sharedquery, debug_query_string, querylen + 1); shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery); - /* Allocate space for each worker's BufferUsage; no need to initialize */ + /* + * Allocate space for each worker's BufferUsage and WalUsage; no need to + * initialize + */ bufferusage = shm_toc_allocate(pcxt->toc, mul_size(sizeof(BufferUsage), pcxt->nworkers)); shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufferusage); + walusage = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage); /* Launch workers, saving status for leader/caller */ LaunchParallelWorkers(pcxt); @@ -1631,6 +1644,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) btleader->sharedsort2 = sharedsort2; btleader->snapshot = snapshot; btleader->bufferusage = bufferusage; + btleader->walusage = walusage; /* If no workers were successfully launched, back out (do serial build) */ if (pcxt->nworkers_launched == 0) @@ -1669,7 +1683,8 @@ _bt_end_parallel(BTLeader *btleader) * finish, or we might get incomplete data.) 
*/ for (i = 0; i < btleader->pcxt->nworkers_launched; i++) - InstrAccumParallelQuery(&btleader->bufferusage[i]); + InstrAccumParallelQuery(&btleader->bufferusage[i], + &btleader->walusage[i]); /* Free last reference to MVCC snapshot, if one was used */ if (IsMVCCSnapshot(btleader->snapshot)) @@ -1802,6 +1817,7 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) LOCKMODE heapLockmode; LOCKMODE indexLockmode; BufferUsage *bufferusage; + WalUsage *walusage; int sortmem; #ifdef BTREE_BUILD_STATS @@ -1871,9 +1887,11 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) _bt_parallel_scan_and_sort(btspool, btspool2, btshared, sharedsort, sharedsort2, sortmem, false); - /* Report buffer usage during parallel execution */ + /* Report buffer and WAL usage during parallel execution */ bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); - InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber]); + walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); + InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber], + &walusage[ParallelWorkerNumber]); #ifdef BTREE_BUILD_STATS if (log_btree_build_stats) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 977d448f50..444886bf0c 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -43,6 +43,7 @@ #include "commands/progress.h" #include "commands/tablespace.h" #include "common/controldata_utils.h" +#include "executor/instrument.h" #include "miscadmin.h" #include "pg_trace.h" #include "pgstat.h" @@ -996,7 +997,8 @@ static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt); XLogRecPtr XLogInsertRecord(XLogRecData *rdata, XLogRecPtr fpw_lsn, - uint8 flags) + uint8 flags, + int num_fpw) { XLogCtlInsert *Insert = &XLogCtl->Insert; pg_crc32c rdata_crc; @@ -1252,6 +1254,14 @@ XLogInsertRecord(XLogRecData *rdata, ProcLastRecPtr = StartPos; XactLastRecEnd = EndPos; + /* Provide WAL update data to the instrumentation */ + if (inserted) + { + pgWalUsage.wal_bytes += rechdr->xl_tot_len; + pgWalUsage.wal_records++; + pgWalUsage.wal_num_fpw += num_fpw; + } + return EndPos; } diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index a618dec776..413750948b 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -25,6 +25,7 @@ #include "access/xloginsert.h" #include "catalog/pg_control.h" #include "common/pg_lzcompress.h" +#include "executor/instrument.h" #include "miscadmin.h" #include "pg_trace.h" #include "replication/origin.h" @@ -108,7 +109,7 @@ static MemoryContext xloginsert_cxt; static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info, XLogRecPtr RedoRecPtr, bool doPageWrites, - XLogRecPtr *fpw_lsn); + XLogRecPtr *fpw_lsn, int *num_fpw); static bool XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length, char *dest, uint16 *dlen); @@ -448,6 +449,7 @@ XLogInsert(RmgrId rmid, uint8 info) bool doPageWrites; XLogRecPtr fpw_lsn; XLogRecData *rdt; + int num_fpw = 0; /* * Get values needed to decide whether to do full-page writes. 
Since
@@ -457,9 +459,9 @@ XLogInsert(RmgrId rmid, uint8 info)
 GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);

 rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
- &fpw_lsn);
+ &fpw_lsn, &num_fpw);

- EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags);
+ EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpw);
 } while (EndPos == InvalidXLogRecPtr);

 XLogResetInsertion();
@@ -482,7 +484,7 @@
 static XLogRecData *
 XLogRecordAssemble(RmgrId rmid, uint8 info,
 XLogRecPtr RedoRecPtr, bool doPageWrites,
- XLogRecPtr *fpw_lsn)
+ XLogRecPtr *fpw_lsn, int *num_fpw)
 {
 XLogRecData *rdt;
 uint32 total_len = 0;
@@ -635,6 +637,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 */
 bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;

+ /* Report a full page image constructed for the WAL record */
+ *num_fpw += 1;
+
 /*
 * Construct XLogRecData entries for the page content.
 */
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index a753d6efa0..7d9ca66fc8 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -12,7 +12,7 @@
 * workers and ensuring that their state generally matches that of the
 * leader; see src/backend/access/transam/README.parallel for details.
 * However, we must save and restore relevant executor state, such as
- * any ParamListInfo associated with the query, buffer usage info, and
+ * any ParamListInfo associated with the query, buffer/WAL usage info, and
 * the actual plan to be passed down to the worker.
 *
 * IDENTIFICATION
@@ -62,6 +62,7 @@
 #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
 #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
 #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
+#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)

 #define PARALLEL_TUPLE_QUEUE_SIZE 65536
@@ -573,6 +574,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 char *pstmt_space;
 char *paramlistinfo_space;
 BufferUsage *bufusage_space;
+ WalUsage *walusage_space;
 SharedExecutorInstrumentation *instrumentation = NULL;
 SharedJitInstrumentation *jit_instrumentation = NULL;
 int pstmt_len;
@@ -646,6 +648,13 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 mul_size(sizeof(BufferUsage), pcxt->nworkers));
 shm_toc_estimate_keys(&pcxt->estimator, 1);

+ /*
+ * Same thing for WalUsage.
+ */
+ shm_toc_estimate_chunk(&pcxt->estimator,
+ mul_size(sizeof(WalUsage), pcxt->nworkers));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
 /* Estimate space for tuple queues. */
 shm_toc_estimate_chunk(&pcxt->estimator,
 mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
@@ -728,6 +737,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
 pei->buffer_usage = bufusage_space;

+ /* Same for WalUsage. */
+ walusage_space = shm_toc_allocate(pcxt->toc,
+ mul_size(sizeof(WalUsage), pcxt->nworkers));
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
+ pei->wal_usage = walusage_space;
+
 /* Set up the tuple queues that the workers will write into. */
 pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
@@ -1069,7 +1084,7 @@ ExecParallelRetrieveJitInstrumentation(PlanState *planstate,

 /*
 * Finish parallel execution. We wait for parallel workers to finish, and
- * accumulate their buffer usage.
+ * accumulate their buffer/WAL usage.
*/ void ExecParallelFinish(ParallelExecutorInfo *pei) @@ -1109,11 +1124,11 @@ ExecParallelFinish(ParallelExecutorInfo *pei) WaitForParallelWorkersToFinish(pei->pcxt); /* - * Next, accumulate buffer usage. (This must wait for the workers to - * finish, or we might get incomplete data.) + * Next, accumulate buffer/WAL usage. (This must wait for the workers + * to finish, or we might get incomplete data.) */ for (i = 0; i < nworkers; i++) - InstrAccumParallelQuery(&pei->buffer_usage[i]); + InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]); pei->finished = true; } @@ -1333,6 +1348,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) { FixedParallelExecutorState *fpes; BufferUsage *buffer_usage; + WalUsage *wal_usage; DestReceiver *receiver; QueryDesc *queryDesc; SharedExecutorInstrumentation *instrumentation; @@ -1386,11 +1402,11 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate); /* - * Prepare to track buffer usage during query execution. + * Prepare to track buffer/WAL usage during query execution. * * We do this after starting up the executor to match what happens in the - * leader, which also doesn't count buffer accesses that occur during - * executor startup. + * leader, which also doesn't count buffer accesses and WAL activity that + * occur during executor startup. */ InstrStartParallelQuery(); @@ -1406,9 +1422,11 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) /* Shut down the executor */ ExecutorFinish(queryDesc); - /* Report buffer usage during parallel execution. */ + /* Report buffer/WAL usage during parallel execution. */ buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); - InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]); + wal_usage = shm_toc_lookup (toc, PARALLEL_KEY_WAL_USAGE, false); + InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], + &wal_usage[ParallelWorkerNumber]); /* Report instrumentation data if any instrumentation options are set. 
*/ if (instrumentation != NULL) diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c index 042e10f96b..e0cf458314 100644 --- a/src/backend/executor/instrument.c +++ b/src/backend/executor/instrument.c @@ -19,9 +19,11 @@ BufferUsage pgBufferUsage; static BufferUsage save_pgBufferUsage; +WalUsage pgWalUsage; +static WalUsage save_pgWalUsage; static void BufferUsageAdd(BufferUsage *dst, const BufferUsage *add); - +static void WalUsageAdd(WalUsage *dst, WalUsage *add); /* Allocate new instrumentation structure(s) */ Instrumentation * @@ -31,15 +33,17 @@ InstrAlloc(int n, int instrument_options) /* initialize all fields to zeroes, then modify as needed */ instr = palloc0(n * sizeof(Instrumentation)); - if (instrument_options & (INSTRUMENT_BUFFERS | INSTRUMENT_TIMER)) + if (instrument_options & (INSTRUMENT_BUFFERS | INSTRUMENT_TIMER | INSTRUMENT_WAL)) { bool need_buffers = (instrument_options & INSTRUMENT_BUFFERS) != 0; + bool need_wal = (instrument_options & INSTRUMENT_WAL) != 0; bool need_timer = (instrument_options & INSTRUMENT_TIMER) != 0; int i; for (i = 0; i < n; i++) { instr[i].need_bufusage = need_buffers; + instr[i].need_walusage = need_wal; instr[i].need_timer = need_timer; } } @@ -53,6 +57,7 @@ InstrInit(Instrumentation *instr, int instrument_options) { memset(instr, 0, sizeof(Instrumentation)); instr->need_bufusage = (instrument_options & INSTRUMENT_BUFFERS) != 0; + instr->need_walusage = (instrument_options & INSTRUMENT_WAL) != 0; instr->need_timer = (instrument_options & INSTRUMENT_TIMER) != 0; } @@ -67,6 +72,9 @@ InstrStartNode(Instrumentation *instr) /* save buffer usage totals at node entry, if needed */ if (instr->need_bufusage) instr->bufusage_start = pgBufferUsage; + + if (instr->need_walusage) + instr->walusage_start = pgWalUsage; } /* Exit from a plan node */ @@ -95,6 +103,10 @@ InstrStopNode(Instrumentation *instr, double nTuples) BufferUsageAccumDiff(&instr->bufusage, &pgBufferUsage, &instr->bufusage_start); + if (instr->need_walusage) + WalUsageAccumDiff(&instr->walusage, + &pgWalUsage, &instr->walusage_start); + /* Is this the first tuple of this cycle? 
*/ if (!instr->running) { @@ -158,6 +170,9 @@ InstrAggNode(Instrumentation *dst, Instrumentation *add) /* Add delta of buffer usage since entry to node's totals */ if (dst->need_bufusage) BufferUsageAdd(&dst->bufusage, &add->bufusage); + + if (dst->need_walusage) + WalUsageAdd(&dst->walusage, &add->walusage); } /* note current values during parallel executor startup */ @@ -165,21 +180,25 @@ void InstrStartParallelQuery(void) { save_pgBufferUsage = pgBufferUsage; + save_pgWalUsage = pgWalUsage; } /* report usage after parallel executor shutdown */ void -InstrEndParallelQuery(BufferUsage *result) +InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage) { - memset(result, 0, sizeof(BufferUsage)); - BufferUsageAccumDiff(result, &pgBufferUsage, &save_pgBufferUsage); + memset(bufusage, 0, sizeof(BufferUsage)); + memset(walusage, 0, sizeof(WalUsage)); + BufferUsageAccumDiff(bufusage, &pgBufferUsage, &save_pgBufferUsage); + WalUsageAccumDiff(walusage, &pgWalUsage, &save_pgWalUsage); } /* accumulate work done by workers in leader's stats */ void -InstrAccumParallelQuery(BufferUsage *result) +InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage) { - BufferUsageAdd(&pgBufferUsage, result); + BufferUsageAdd(&pgBufferUsage, bufusage); + WalUsageAdd(&pgWalUsage, walusage); } /* dst += add */ @@ -221,3 +240,20 @@ BufferUsageAccumDiff(BufferUsage *dst, INSTR_TIME_ACCUM_DIFF(dst->blk_write_time, add->blk_write_time, sub->blk_write_time); } + +/* helper functions for WAL usage accumulation */ +static void +WalUsageAdd(WalUsage *dst, WalUsage *add) +{ + dst->wal_bytes += add->wal_bytes; + dst->wal_records += add->wal_records; + dst->wal_num_fpw += add->wal_num_fpw; +} + +void +WalUsageAccumDiff(WalUsage *dst, const WalUsage *add, const WalUsage *sub) +{ + dst->wal_bytes += add->wal_bytes - sub->wal_bytes; + dst->wal_records += add->wal_records - sub->wal_records; + dst->wal_num_fpw += add->wal_num_fpw - sub->wal_num_fpw; +} diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 9ec7b31cce..b91e724b2d 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -259,7 +259,8 @@ struct XLogRecData; extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata, XLogRecPtr fpw_lsn, - uint8 flags); + uint8 flags, + int num_fpw); extern void XLogFlush(XLogRecPtr RecPtr); extern bool XLogBackgroundFlush(void); extern bool XLogNeedsFlush(XLogRecPtr RecPtr); diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index 17d07cf020..1cc5b524fd 100644 --- a/src/include/executor/execParallel.h +++ b/src/include/executor/execParallel.h @@ -26,6 +26,7 @@ typedef struct ParallelExecutorInfo PlanState *planstate; /* plan subtree we're running in parallel */ ParallelContext *pcxt; /* parallel context we're using */ BufferUsage *buffer_usage; /* points to bufusage area in DSM */ + WalUsage *wal_usage; /* walusage area in DSM */ SharedExecutorInstrumentation *instrumentation; /* optional */ struct SharedJitInstrumentation *jit_instrumentation; /* optional */ dsa_area *area; /* points to DSA area in DSM */ diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h index 3825a5ac1f..e8875a8e9b 100644 --- a/src/include/executor/instrument.h +++ b/src/include/executor/instrument.h @@ -32,12 +32,20 @@ typedef struct BufferUsage instr_time blk_write_time; /* time spent writing */ } BufferUsage; +typedef struct WalUsage +{ + long wal_records; /* # of WAL records produced */ + long wal_num_fpw; /* # of WAL full page image produced 
*/ + uint64 wal_bytes; /* size of WAL records produced */ +} WalUsage; + /* Flag bits included in InstrAlloc's instrument_options bitmask */ typedef enum InstrumentOption { INSTRUMENT_TIMER = 1 << 0, /* needs timer (and row counts) */ INSTRUMENT_BUFFERS = 1 << 1, /* needs buffer usage */ INSTRUMENT_ROWS = 1 << 2, /* needs row count */ + INSTRUMENT_WAL = 1 << 3, /* needs WAL usage */ INSTRUMENT_ALL = PG_INT32_MAX } InstrumentOption; @@ -46,6 +54,7 @@ typedef struct Instrumentation /* Parameters set at node creation: */ bool need_timer; /* true if we need timer data */ bool need_bufusage; /* true if we need buffer usage data */ + bool need_walusage; /* true if we need WAL usage data */ /* Info about current plan cycle: */ bool running; /* true if we've completed first tuple */ instr_time starttime; /* start time of current iteration of node */ @@ -53,6 +62,7 @@ typedef struct Instrumentation double firsttuple; /* time for first tuple of this cycle */ double tuplecount; /* # of tuples emitted so far this cycle */ BufferUsage bufusage_start; /* buffer usage at start */ + WalUsage walusage_start; /* WAL usage at start */ /* Accumulated statistics across all completed cycles: */ double startup; /* total startup time (in seconds) */ double total; /* total time (in seconds) */ @@ -62,6 +72,7 @@ typedef struct Instrumentation double nfiltered1; /* # of tuples removed by scanqual or joinqual */ double nfiltered2; /* # of tuples removed by "other" quals */ BufferUsage bufusage; /* total buffer usage */ + WalUsage walusage; /* total WAL usage */ } Instrumentation; typedef struct WorkerInstrumentation @@ -71,6 +82,7 @@ typedef struct WorkerInstrumentation } WorkerInstrumentation; extern PGDLLIMPORT BufferUsage pgBufferUsage; +extern PGDLLIMPORT WalUsage pgWalUsage; extern Instrumentation *InstrAlloc(int n, int instrument_options); extern void InstrInit(Instrumentation *instr, int instrument_options); @@ -79,9 +91,11 @@ extern void InstrStopNode(Instrumentation *instr, double nTuples); extern void InstrEndLoop(Instrumentation *instr); extern void InstrAggNode(Instrumentation *dst, Instrumentation *add); extern void InstrStartParallelQuery(void); -extern void InstrEndParallelQuery(BufferUsage *result); -extern void InstrAccumParallelQuery(BufferUsage *result); +extern void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage); +extern void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage); extern void BufferUsageAccumDiff(BufferUsage *dst, const BufferUsage *add, const BufferUsage *sub); +extern void WalUsageAccumDiff(WalUsage *dst, const WalUsage *add, + const WalUsage *sub); #endif /* INSTRUMENT_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 587b040532..64ae983661 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2643,6 +2643,7 @@ WalSndCtlData WalSndSendDataCallback WalSndState WalTimeSample +WalUsage WalWriteMethod Walfile WindowAgg -- 2.20.1
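
Illustration only, not part of the patch: a minimal sketch of how a consumer (for
example an extension similar to pg_stat_statements) might read the counters this
patch introduces, using only the symbols defined above (WalUsage, pgWalUsage,
WalUsageAccumDiff). The function name report_wal_usage_example is hypothetical;
the snapshot-then-diff pattern mirrors what InstrEndParallelQuery does in
instrument.c.

#include "postgres.h"

#include "executor/instrument.h"

/* Hypothetical helper: measure WAL generated by a stretch of backend work. */
static void
report_wal_usage_example(void)
{
	WalUsage	start = pgWalUsage; /* snapshot global counters at entry */
	WalUsage	delta;

	/* ... perform some WAL-generating work here ... */

	/*
	 * Accumulate (pgWalUsage - start) into a zeroed struct, the same pattern
	 * InstrEndParallelQuery uses when reporting a parallel worker's activity.
	 */
	memset(&delta, 0, sizeof(WalUsage));
	WalUsageAccumDiff(&delta, &pgWalUsage, &start);

	elog(LOG, "WAL usage: %ld records, %ld full page writes, " UINT64_FORMAT " bytes",
		 delta.wal_records, delta.wal_num_fpw, delta.wal_bytes);
}

A parallel leader would instead rely on InstrStartParallelQuery(),
InstrEndParallelQuery() and InstrAccumParallelQuery() as wired up in
execParallel.c, vacuumlazy.c and nbtsort.c above.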