From eb5f9c83a0f248f2def60ff6e93e998149047f06 Mon Sep 17 00:00:00 2001 From: Ilmar Yunusov Date: Sat, 9 May 2026 03:44:24 +0500 Subject: [RFC PATCH v1 2/7] Aggregate EXPLAIN WAITS from parallel workers --- src/backend/commands/explain.c | 4 + src/backend/executor/execMain.c | 1 + src/backend/executor/execParallel.c | 129 ++++++++++++++++++++++++ src/backend/executor/execUtils.c | 1 + src/backend/utils/activity/wait_event.c | 22 +++- src/include/executor/execParallel.h | 2 + src/include/nodes/execnodes.h | 2 + src/include/utils/wait_event.h | 3 + src/test/regress/expected/explain.out | 17 ++++ src/test/regress/sql/explain.sql | 12 +++ 10 files changed, 190 insertions(+), 3 deletions(-) diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 0b7cc5c15c6..9d7372f5415 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -594,6 +594,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, waitEventUsagePtr = &waitEventUsage; pgstat_begin_wait_event_usage(waitEventUsagePtr, queryDesc->estate->es_query_cxt); + queryDesc->estate->es_wait_event_usage = waitEventUsagePtr; } /* run the plan */ @@ -607,7 +608,10 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, PG_FINALLY(); { if (waitEventUsagePtr) + { pgstat_end_wait_event_usage(waitEventUsagePtr); + queryDesc->estate->es_wait_event_usage = NULL; + } } PG_END_TRY(); diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 4b30f768680..86ab124c1c0 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -3066,6 +3066,7 @@ EvalPlanQualStart(EPQState *epqstate, Plan *planTree) /* es_trig_target_relations must NOT be copied */ rcestate->es_top_eflags = parentestate->es_top_eflags; rcestate->es_instrument = parentestate->es_instrument; + rcestate->es_wait_event_usage = parentestate->es_wait_event_usage; /* es_auxmodifytables must NOT be copied */ /* diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 81b87d82fab..8213565a708 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -51,6 +51,7 @@ #include "utils/dsa.h" #include "utils/lsyscache.h" #include "utils/snapmgr.h" +#include "utils/wait_event.h" /* * Magic numbers for parallel executor communication. We use constants @@ -67,6 +68,7 @@ #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009) #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A) +#define PARALLEL_KEY_WAIT_EVENT_USAGE UINT64CONST(0xE00000000000000B) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -114,6 +116,18 @@ struct SharedExecutorInstrumentation (StaticAssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \ (NodeInstrumentation *) (((char *) sei) + sei->instrument_offset)) +typedef struct SharedWaitEventUsageWorker +{ + int nentries; + dsa_pointer entries; +} SharedWaitEventUsageWorker; + +struct SharedWaitEventUsage +{ + int num_workers; + SharedWaitEventUsageWorker worker_usage[FLEXIBLE_ARRAY_MEMBER]; +}; + /* Context object for ExecParallelEstimate. */ typedef struct ExecParallelEstimateContext { @@ -141,6 +155,10 @@ static bool ExecParallelReInitializeDSM(PlanState *planstate, ParallelContext *pcxt); static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, SharedExecutorInstrumentation *instrumentation); +static void ExecParallelRetrieveWaitEventUsage(ParallelExecutorInfo *pei); +static void ExecParallelReportWaitEventUsage(SharedWaitEventUsage *shared, + dsa_area *area, + const WaitEventUsage *usage); /* Helper function that runs in the parallel worker. */ static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc); @@ -664,10 +682,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, char *paramlistinfo_space; BufferUsage *bufusage_space; WalUsage *walusage_space; + SharedWaitEventUsage *wait_event_usage = NULL; SharedExecutorInstrumentation *instrumentation = NULL; SharedJitInstrumentation *jit_instrumentation = NULL; int pstmt_len; int paramlistinfo_len; + int wait_event_usage_len = 0; int instrumentation_len = 0; int jit_instrumentation_len = 0; int instrument_offset = 0; @@ -744,6 +764,16 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, mul_size(sizeof(WalUsage), pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for per-worker wait event usage metadata. */ + if (estate->es_wait_event_usage != NULL) + { + wait_event_usage_len = + offsetof(SharedWaitEventUsage, worker_usage) + + mul_size(sizeof(SharedWaitEventUsageWorker), pcxt->nworkers); + shm_toc_estimate_chunk(&pcxt->estimator, wait_event_usage_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } + /* Estimate space for tuple queues. */ shm_toc_estimate_chunk(&pcxt->estimator, mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers)); @@ -839,6 +869,21 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space); pei->wal_usage = walusage_space; + /* Allocate metadata for each worker's wait event usage, if requested. */ + if (estate->es_wait_event_usage != NULL) + { + wait_event_usage = shm_toc_allocate(pcxt->toc, wait_event_usage_len); + wait_event_usage->num_workers = nworkers; + for (int i = 0; i < nworkers; i++) + { + wait_event_usage->worker_usage[i].nentries = 0; + wait_event_usage->worker_usage[i].entries = InvalidDsaPointer; + } + shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAIT_EVENT_USAGE, + wait_event_usage); + pei->wait_event_usage = wait_event_usage; + } + /* Set up the tuple queues that the workers will write into. */ pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false); @@ -1213,6 +1258,68 @@ ExecParallelRetrieveJitInstrumentation(PlanState *planstate, memcpy(planstate->worker_jit_instrument, shared_jit, ibytes); } +static void +ExecParallelRetrieveWaitEventUsage(ParallelExecutorInfo *pei) +{ + SharedWaitEventUsage *shared = pei->wait_event_usage; + WaitEventUsage *usage; + + if (shared == NULL) + return; + + usage = pei->planstate->state->es_wait_event_usage; + if (usage == NULL) + return; + + for (int i = 0; i < shared->num_workers; i++) + { + SharedWaitEventUsageWorker *worker = &shared->worker_usage[i]; + WaitEventUsageEntry *entries; + + if (worker->nentries <= 0 || !DsaPointerIsValid(worker->entries)) + continue; + + entries = dsa_get_address(pei->area, worker->entries); + pgstat_accumulate_wait_event_usage(usage, + entries, + worker->nentries); + dsa_free(pei->area, worker->entries); + worker->nentries = 0; + worker->entries = InvalidDsaPointer; + } +} + +static void +ExecParallelReportWaitEventUsage(SharedWaitEventUsage *shared, + dsa_area *area, + const WaitEventUsage *usage) +{ + SharedWaitEventUsageWorker *worker; + WaitEventUsageEntry *entries; + dsa_pointer entries_dsa; + Size entries_size; + + Assert(shared != NULL); + Assert(area != NULL); + Assert(usage != NULL); + Assert(IsParallelWorker()); + Assert(ParallelWorkerNumber < shared->num_workers); + + if (usage->nentries <= 0) + return; + + worker = &shared->worker_usage[ParallelWorkerNumber]; + entries_size = mul_size(sizeof(WaitEventUsageEntry), usage->nentries); + entries_dsa = dsa_allocate(area, entries_size); + entries = dsa_get_address(area, entries_dsa); + memcpy(entries, usage->entries, entries_size); + + if (DsaPointerIsValid(worker->entries)) + dsa_free(area, worker->entries); + worker->nentries = usage->nentries; + worker->entries = entries_dsa; +} + /* * Finish parallel execution. We wait for parallel workers to finish, and * accumulate their buffer/WAL usage. @@ -1261,6 +1368,9 @@ ExecParallelFinish(ParallelExecutorInfo *pei) for (i = 0; i < nworkers; i++) InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]); + /* Accumulate wait event usage, if requested. */ + ExecParallelRetrieveWaitEventUsage(pei); + pei->finished = true; } @@ -1516,10 +1626,13 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) FixedParallelExecutorState *fpes; BufferUsage *buffer_usage; WalUsage *wal_usage; + SharedWaitEventUsage *wait_event_usage; DestReceiver *receiver; QueryDesc *queryDesc; SharedExecutorInstrumentation *instrumentation; SharedJitInstrumentation *jit_instrumentation; + WaitEventUsage waitEventUsage; + WaitEventUsage *waitEventUsagePtr = NULL; int instrument_options = 0; void *area_space; dsa_area *area; @@ -1535,6 +1648,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) instrument_options = instrumentation->instrument_options; jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION, true); + wait_event_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAIT_EVENT_USAGE, true); queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options); /* Setting debug_query_string for individual workers */ @@ -1576,6 +1690,13 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) */ InstrStartParallelQuery(); + if (wait_event_usage != NULL) + { + waitEventUsagePtr = &waitEventUsage; + pgstat_begin_wait_event_usage(waitEventUsagePtr, + queryDesc->estate->es_query_cxt); + } + /* * Run the plan. If we specified a tuple bound, be careful not to demand * more tuples than that. @@ -1587,6 +1708,14 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) /* Shut down the executor */ ExecutorFinish(queryDesc); + if (waitEventUsagePtr != NULL) + { + pgstat_end_wait_event_usage(waitEventUsagePtr); + ExecParallelReportWaitEventUsage(wait_event_usage, + area, + waitEventUsagePtr); + } + /* Report buffer/WAL usage during parallel execution. */ buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false); diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c index 1eb6b9f1f40..80ea777632b 100644 --- a/src/backend/executor/execUtils.c +++ b/src/backend/executor/execUtils.c @@ -151,6 +151,7 @@ CreateExecutorState(void) estate->es_top_eflags = 0; estate->es_instrument = 0; + estate->es_wait_event_usage = NULL; estate->es_finished = false; estate->es_exprcontexts = NIL; diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c index 60d37ccbb73..eb01bc3d88c 100644 --- a/src/backend/utils/activity/wait_event.c +++ b/src/backend/utils/activity/wait_event.c @@ -37,7 +37,7 @@ static const char *pgstat_get_wait_ipc(WaitEventIPC w); static const char *pgstat_get_wait_timeout(WaitEventTimeout w); static const char *pgstat_get_wait_io(WaitEventIO w); static void WaitEventUsageAdd(WaitEventUsage *usage, uint32 wait_event_info, - const instr_time *elapsed); + uint64 calls, const instr_time *elapsed); static uint32 local_my_wait_event_info; @@ -442,15 +442,31 @@ pgstat_count_wait_event_end(void) WaitEventUsageAdd(pgstat_wait_event_usage, pgstat_wait_event_usage_current, + 1, &elapsed); pgstat_wait_event_usage_current = 0; INSTR_TIME_SET_ZERO(pgstat_wait_event_usage_start); } +void +pgstat_accumulate_wait_event_usage(WaitEventUsage *usage, + const WaitEventUsageEntry *entries, + int nentries) +{ + Assert(usage != NULL); + Assert(nentries == 0 || entries != NULL); + + for (int i = 0; i < nentries; i++) + WaitEventUsageAdd(usage, + entries[i].wait_event_info, + entries[i].calls, + &entries[i].time); +} + static void WaitEventUsageAdd(WaitEventUsage *usage, uint32 wait_event_info, - const instr_time *elapsed) + uint64 calls, const instr_time *elapsed) { WaitEventUsageEntry *entry = NULL; @@ -494,7 +510,7 @@ WaitEventUsageAdd(WaitEventUsage *usage, uint32 wait_event_info, INSTR_TIME_SET_ZERO(entry->time); } - entry->calls++; + entry->calls += calls; INSTR_TIME_ADD(entry->time, *elapsed); } diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index 5a2034811d5..71df2c2511c 100644 --- a/src/include/executor/execParallel.h +++ b/src/include/executor/execParallel.h @@ -20,6 +20,7 @@ #include "utils/dsa.h" typedef struct SharedExecutorInstrumentation SharedExecutorInstrumentation; +typedef struct SharedWaitEventUsage SharedWaitEventUsage; typedef struct ParallelExecutorInfo { @@ -27,6 +28,7 @@ typedef struct ParallelExecutorInfo ParallelContext *pcxt; /* parallel context we're using */ BufferUsage *buffer_usage; /* points to bufusage area in DSM */ WalUsage *wal_usage; /* walusage area in DSM */ + SharedWaitEventUsage *wait_event_usage; /* optional */ SharedExecutorInstrumentation *instrumentation; /* optional */ struct SharedJitInstrumentation *jit_instrumentation; /* optional */ dsa_area *area; /* points to DSA area in DSM */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 13359180d25..781c8c79132 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -69,6 +69,7 @@ typedef struct Tuplestorestate Tuplestorestate; typedef struct TupleTableSlot TupleTableSlot; typedef struct TupleTableSlotOps TupleTableSlotOps; typedef struct WalUsage WalUsage; +typedef struct WaitEventUsage WaitEventUsage; typedef struct WorkerNodeInstrumentation WorkerNodeInstrumentation; @@ -754,6 +755,7 @@ typedef struct EState int es_top_eflags; /* eflags passed to ExecutorStart */ int es_instrument; /* OR of InstrumentOption flags */ + WaitEventUsage *es_wait_event_usage; /* EXPLAIN WAITS accumulator */ bool es_finished; /* true when ExecutorFinish is done */ List *es_exprcontexts; /* List of ExprContexts within EState */ diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h index f7fab5736bb..63992137ee7 100644 --- a/src/include/utils/wait_event.h +++ b/src/include/utils/wait_event.h @@ -39,6 +39,9 @@ extern void pgstat_reset_wait_event_storage(void); extern void pgstat_begin_wait_event_usage(WaitEventUsage *usage, MemoryContext memcontext); extern void pgstat_end_wait_event_usage(WaitEventUsage *usage); +extern void pgstat_accumulate_wait_event_usage(WaitEventUsage *usage, + const WaitEventUsageEntry *entries, + int nentries); extern void pgstat_count_wait_event_start(uint32 wait_event_info); extern void pgstat_count_wait_event_end(void); diff --git a/src/test/regress/expected/explain.out b/src/test/regress/expected/explain.out index 2c7a7e1d4c6..e3847e222be 100644 --- a/src/test/regress/expected/explain.out +++ b/src/test/regress/expected/explain.out @@ -114,6 +114,23 @@ select explain_filter_to_json('explain (analyze, waits, costs off, summary off, {"Time": 0.0, "Calls": 0, "Wait Event": "PgSleep", "Wait Event Type": "Timeout"} (1 row) +begin; +create function pg_temp.parallel_pg_sleep(float8) returns void + language internal volatile parallel safe as 'pg_sleep'; +set local debug_parallel_query = on; +set local max_parallel_workers_per_gather = 1; +select jsonb_path_query_first( + explain_filter_to_json('explain (analyze, waits, costs off, summary off, timing off, buffers off, format json) + select pg_temp.parallel_pg_sleep(0.01) + from tenk1 where unique1 = 1') #> '{0,Wait Events}', + '$[*] ? (@."Wait Event" == "PgSleep")' +); + jsonb_path_query_first +---------------------------------------------------------------------------------- + {"Time": 0.0, "Calls": 0, "Wait Event": "PgSleep", "Wait Event Type": "Timeout"} +(1 row) + +rollback; explain (waits) select 1; ERROR: EXPLAIN option WAITS requires ANALYZE \a diff --git a/src/test/regress/sql/explain.sql b/src/test/regress/sql/explain.sql index fe025ddeac5..8821250bcef 100644 --- a/src/test/regress/sql/explain.sql +++ b/src/test/regress/sql/explain.sql @@ -71,6 +71,18 @@ select explain_filter('explain (buffers, format text) select * from int8_tbl i8' -- WAITS option select explain_filter('explain (analyze, waits, costs off, summary off, timing off, buffers off) select pg_sleep(0.01)'); select explain_filter_to_json('explain (analyze, waits, costs off, summary off, timing off, buffers off, format json) select pg_sleep(0.01)') #> '{0,Wait Events,0}'; +begin; +create function pg_temp.parallel_pg_sleep(float8) returns void + language internal volatile parallel safe as 'pg_sleep'; +set local debug_parallel_query = on; +set local max_parallel_workers_per_gather = 1; +select jsonb_path_query_first( + explain_filter_to_json('explain (analyze, waits, costs off, summary off, timing off, buffers off, format json) + select pg_temp.parallel_pg_sleep(0.01) + from tenk1 where unique1 = 1') #> '{0,Wait Events}', + '$[*] ? (@."Wait Event" == "PgSleep")' +); +rollback; explain (waits) select 1; \a -- 2.52.0