| From: | Ilmar Yunusov <tanswis42(at)gmail(dot)com> |
|---|---|
| To: | pgsql-hackers(at)postgresql(dot)org |
| Cc: | Ilmar Yunusov <tanswis42(at)gmail(dot)com> |
| Subject: | [RFC PATCH v0 2/7] Aggregate EXPLAIN WAITS from parallel workers |
| Date: | 2026-05-08 23:22:32 |
| Message-ID: | eb5f9c83a0f248f2def60ff6e93e998149047f06.1778280923.git.tanswis42@gmail.com |
| Views: | Whole Thread | Raw Message | Download mbox | Resend email |
| Thread: | |
| Lists: | pgsql-hackers |
---
src/backend/commands/explain.c | 4 +
src/backend/executor/execMain.c | 1 +
src/backend/executor/execParallel.c | 129 ++++++++++++++++++++++++
src/backend/executor/execUtils.c | 1 +
src/backend/utils/activity/wait_event.c | 22 +++-
src/include/executor/execParallel.h | 2 +
src/include/nodes/execnodes.h | 2 +
src/include/utils/wait_event.h | 3 +
src/test/regress/expected/explain.out | 17 ++++
src/test/regress/sql/explain.sql | 12 +++
10 files changed, 190 insertions(+), 3 deletions(-)
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 0b7cc5c15c6..9d7372f5415 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -594,6 +594,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
waitEventUsagePtr = &waitEventUsage;
pgstat_begin_wait_event_usage(waitEventUsagePtr,
queryDesc->estate->es_query_cxt);
+ queryDesc->estate->es_wait_event_usage = waitEventUsagePtr;
}
/* run the plan */
@@ -607,7 +608,10 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
PG_FINALLY();
{
if (waitEventUsagePtr)
+ {
pgstat_end_wait_event_usage(waitEventUsagePtr);
+ queryDesc->estate->es_wait_event_usage = NULL;
+ }
}
PG_END_TRY();
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 4b30f768680..86ab124c1c0 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -3066,6 +3066,7 @@ EvalPlanQualStart(EPQState *epqstate, Plan *planTree)
/* es_trig_target_relations must NOT be copied */
rcestate->es_top_eflags = parentestate->es_top_eflags;
rcestate->es_instrument = parentestate->es_instrument;
+ rcestate->es_wait_event_usage = parentestate->es_wait_event_usage;
/* es_auxmodifytables must NOT be copied */
/*
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 81b87d82fab..8213565a708 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -51,6 +51,7 @@
#include "utils/dsa.h"
#include "utils/lsyscache.h"
#include "utils/snapmgr.h"
+#include "utils/wait_event.h"
/*
* Magic numbers for parallel executor communication. We use constants
@@ -67,6 +68,7 @@
#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
#define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
+#define PARALLEL_KEY_WAIT_EVENT_USAGE UINT64CONST(0xE00000000000000B)
#define PARALLEL_TUPLE_QUEUE_SIZE 65536
@@ -114,6 +116,18 @@ struct SharedExecutorInstrumentation
(StaticAssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
(NodeInstrumentation *) (((char *) sei) + sei->instrument_offset))
+typedef struct SharedWaitEventUsageWorker
+{
+ int nentries;
+ dsa_pointer entries;
+} SharedWaitEventUsageWorker;
+
+struct SharedWaitEventUsage
+{
+ int num_workers;
+ SharedWaitEventUsageWorker worker_usage[FLEXIBLE_ARRAY_MEMBER];
+};
+
/* Context object for ExecParallelEstimate. */
typedef struct ExecParallelEstimateContext
{
@@ -141,6 +155,10 @@ static bool ExecParallelReInitializeDSM(PlanState *planstate,
ParallelContext *pcxt);
static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
SharedExecutorInstrumentation *instrumentation);
+static void ExecParallelRetrieveWaitEventUsage(ParallelExecutorInfo *pei);
+static void ExecParallelReportWaitEventUsage(SharedWaitEventUsage *shared,
+ dsa_area *area,
+ const WaitEventUsage *usage);
/* Helper function that runs in the parallel worker. */
static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
@@ -664,10 +682,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
char *paramlistinfo_space;
BufferUsage *bufusage_space;
WalUsage *walusage_space;
+ SharedWaitEventUsage *wait_event_usage = NULL;
SharedExecutorInstrumentation *instrumentation = NULL;
SharedJitInstrumentation *jit_instrumentation = NULL;
int pstmt_len;
int paramlistinfo_len;
+ int wait_event_usage_len = 0;
int instrumentation_len = 0;
int jit_instrumentation_len = 0;
int instrument_offset = 0;
@@ -744,6 +764,16 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
mul_size(sizeof(WalUsage), pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);
+ /* Estimate space for per-worker wait event usage metadata. */
+ if (estate->es_wait_event_usage != NULL)
+ {
+ wait_event_usage_len =
+ offsetof(SharedWaitEventUsage, worker_usage) +
+ mul_size(sizeof(SharedWaitEventUsageWorker), pcxt->nworkers);
+ shm_toc_estimate_chunk(&pcxt->estimator, wait_event_usage_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+ }
+
/* Estimate space for tuple queues. */
shm_toc_estimate_chunk(&pcxt->estimator,
mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
@@ -839,6 +869,21 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
pei->wal_usage = walusage_space;
+ /* Allocate metadata for each worker's wait event usage, if requested. */
+ if (estate->es_wait_event_usage != NULL)
+ {
+ wait_event_usage = shm_toc_allocate(pcxt->toc, wait_event_usage_len);
+ wait_event_usage->num_workers = nworkers;
+ for (int i = 0; i < nworkers; i++)
+ {
+ wait_event_usage->worker_usage[i].nentries = 0;
+ wait_event_usage->worker_usage[i].entries = InvalidDsaPointer;
+ }
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAIT_EVENT_USAGE,
+ wait_event_usage);
+ pei->wait_event_usage = wait_event_usage;
+ }
+
/* Set up the tuple queues that the workers will write into. */
pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
@@ -1213,6 +1258,68 @@ ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
}
+static void
+ExecParallelRetrieveWaitEventUsage(ParallelExecutorInfo *pei)
+{
+ SharedWaitEventUsage *shared = pei->wait_event_usage;
+ WaitEventUsage *usage;
+
+ if (shared == NULL)
+ return;
+
+ usage = pei->planstate->state->es_wait_event_usage;
+ if (usage == NULL)
+ return;
+
+ for (int i = 0; i < shared->num_workers; i++)
+ {
+ SharedWaitEventUsageWorker *worker = &shared->worker_usage[i];
+ WaitEventUsageEntry *entries;
+
+ if (worker->nentries <= 0 || !DsaPointerIsValid(worker->entries))
+ continue;
+
+ entries = dsa_get_address(pei->area, worker->entries);
+ pgstat_accumulate_wait_event_usage(usage,
+ entries,
+ worker->nentries);
+ dsa_free(pei->area, worker->entries);
+ worker->nentries = 0;
+ worker->entries = InvalidDsaPointer;
+ }
+}
+
+static void
+ExecParallelReportWaitEventUsage(SharedWaitEventUsage *shared,
+ dsa_area *area,
+ const WaitEventUsage *usage)
+{
+ SharedWaitEventUsageWorker *worker;
+ WaitEventUsageEntry *entries;
+ dsa_pointer entries_dsa;
+ Size entries_size;
+
+ Assert(shared != NULL);
+ Assert(area != NULL);
+ Assert(usage != NULL);
+ Assert(IsParallelWorker());
+ Assert(ParallelWorkerNumber < shared->num_workers);
+
+ if (usage->nentries <= 0)
+ return;
+
+ worker = &shared->worker_usage[ParallelWorkerNumber];
+ entries_size = mul_size(sizeof(WaitEventUsageEntry), usage->nentries);
+ entries_dsa = dsa_allocate(area, entries_size);
+ entries = dsa_get_address(area, entries_dsa);
+ memcpy(entries, usage->entries, entries_size);
+
+ if (DsaPointerIsValid(worker->entries))
+ dsa_free(area, worker->entries);
+ worker->nentries = usage->nentries;
+ worker->entries = entries_dsa;
+}
+
/*
* Finish parallel execution. We wait for parallel workers to finish, and
* accumulate their buffer/WAL usage.
@@ -1261,6 +1368,9 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
for (i = 0; i < nworkers; i++)
InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
+ /* Accumulate wait event usage, if requested. */
+ ExecParallelRetrieveWaitEventUsage(pei);
+
pei->finished = true;
}
@@ -1516,10 +1626,13 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
FixedParallelExecutorState *fpes;
BufferUsage *buffer_usage;
WalUsage *wal_usage;
+ SharedWaitEventUsage *wait_event_usage;
DestReceiver *receiver;
QueryDesc *queryDesc;
SharedExecutorInstrumentation *instrumentation;
SharedJitInstrumentation *jit_instrumentation;
+ WaitEventUsage waitEventUsage;
+ WaitEventUsage *waitEventUsagePtr = NULL;
int instrument_options = 0;
void *area_space;
dsa_area *area;
@@ -1535,6 +1648,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
instrument_options = instrumentation->instrument_options;
jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
true);
+ wait_event_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAIT_EVENT_USAGE, true);
queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
/* Setting debug_query_string for individual workers */
@@ -1576,6 +1690,13 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
*/
InstrStartParallelQuery();
+ if (wait_event_usage != NULL)
+ {
+ waitEventUsagePtr = &waitEventUsage;
+ pgstat_begin_wait_event_usage(waitEventUsagePtr,
+ queryDesc->estate->es_query_cxt);
+ }
+
/*
* Run the plan. If we specified a tuple bound, be careful not to demand
* more tuples than that.
@@ -1587,6 +1708,14 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
/* Shut down the executor */
ExecutorFinish(queryDesc);
+ if (waitEventUsagePtr != NULL)
+ {
+ pgstat_end_wait_event_usage(waitEventUsagePtr);
+ ExecParallelReportWaitEventUsage(wait_event_usage,
+ area,
+ waitEventUsagePtr);
+ }
+
/* Report buffer/WAL usage during parallel execution. */
buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c
index 1eb6b9f1f40..80ea777632b 100644
--- a/src/backend/executor/execUtils.c
+++ b/src/backend/executor/execUtils.c
@@ -151,6 +151,7 @@ CreateExecutorState(void)
estate->es_top_eflags = 0;
estate->es_instrument = 0;
+ estate->es_wait_event_usage = NULL;
estate->es_finished = false;
estate->es_exprcontexts = NIL;
diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c
index 60d37ccbb73..eb01bc3d88c 100644
--- a/src/backend/utils/activity/wait_event.c
+++ b/src/backend/utils/activity/wait_event.c
@@ -37,7 +37,7 @@ static const char *pgstat_get_wait_ipc(WaitEventIPC w);
static const char *pgstat_get_wait_timeout(WaitEventTimeout w);
static const char *pgstat_get_wait_io(WaitEventIO w);
static void WaitEventUsageAdd(WaitEventUsage *usage, uint32 wait_event_info,
- const instr_time *elapsed);
+ uint64 calls, const instr_time *elapsed);
static uint32 local_my_wait_event_info;
@@ -442,15 +442,31 @@ pgstat_count_wait_event_end(void)
WaitEventUsageAdd(pgstat_wait_event_usage,
pgstat_wait_event_usage_current,
+ 1,
&elapsed);
pgstat_wait_event_usage_current = 0;
INSTR_TIME_SET_ZERO(pgstat_wait_event_usage_start);
}
+void
+pgstat_accumulate_wait_event_usage(WaitEventUsage *usage,
+ const WaitEventUsageEntry *entries,
+ int nentries)
+{
+ Assert(usage != NULL);
+ Assert(nentries == 0 || entries != NULL);
+
+ for (int i = 0; i < nentries; i++)
+ WaitEventUsageAdd(usage,
+ entries[i].wait_event_info,
+ entries[i].calls,
+ &entries[i].time);
+}
+
static void
WaitEventUsageAdd(WaitEventUsage *usage, uint32 wait_event_info,
- const instr_time *elapsed)
+ uint64 calls, const instr_time *elapsed)
{
WaitEventUsageEntry *entry = NULL;
@@ -494,7 +510,7 @@ WaitEventUsageAdd(WaitEventUsage *usage, uint32 wait_event_info,
INSTR_TIME_SET_ZERO(entry->time);
}
- entry->calls++;
+ entry->calls += calls;
INSTR_TIME_ADD(entry->time, *elapsed);
}
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 5a2034811d5..71df2c2511c 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -20,6 +20,7 @@
#include "utils/dsa.h"
typedef struct SharedExecutorInstrumentation SharedExecutorInstrumentation;
+typedef struct SharedWaitEventUsage SharedWaitEventUsage;
typedef struct ParallelExecutorInfo
{
@@ -27,6 +28,7 @@ typedef struct ParallelExecutorInfo
ParallelContext *pcxt; /* parallel context we're using */
BufferUsage *buffer_usage; /* points to bufusage area in DSM */
WalUsage *wal_usage; /* walusage area in DSM */
+ SharedWaitEventUsage *wait_event_usage; /* optional */
SharedExecutorInstrumentation *instrumentation; /* optional */
struct SharedJitInstrumentation *jit_instrumentation; /* optional */
dsa_area *area; /* points to DSA area in DSM */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 13359180d25..781c8c79132 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -69,6 +69,7 @@ typedef struct Tuplestorestate Tuplestorestate;
typedef struct TupleTableSlot TupleTableSlot;
typedef struct TupleTableSlotOps TupleTableSlotOps;
typedef struct WalUsage WalUsage;
+typedef struct WaitEventUsage WaitEventUsage;
typedef struct WorkerNodeInstrumentation WorkerNodeInstrumentation;
@@ -754,6 +755,7 @@ typedef struct EState
int es_top_eflags; /* eflags passed to ExecutorStart */
int es_instrument; /* OR of InstrumentOption flags */
+ WaitEventUsage *es_wait_event_usage; /* EXPLAIN WAITS accumulator */
bool es_finished; /* true when ExecutorFinish is done */
List *es_exprcontexts; /* List of ExprContexts within EState */
diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h
index f7fab5736bb..63992137ee7 100644
--- a/src/include/utils/wait_event.h
+++ b/src/include/utils/wait_event.h
@@ -39,6 +39,9 @@ extern void pgstat_reset_wait_event_storage(void);
extern void pgstat_begin_wait_event_usage(WaitEventUsage *usage,
MemoryContext memcontext);
extern void pgstat_end_wait_event_usage(WaitEventUsage *usage);
+extern void pgstat_accumulate_wait_event_usage(WaitEventUsage *usage,
+ const WaitEventUsageEntry *entries,
+ int nentries);
extern void pgstat_count_wait_event_start(uint32 wait_event_info);
extern void pgstat_count_wait_event_end(void);
diff --git a/src/test/regress/expected/explain.out b/src/test/regress/expected/explain.out
index 2c7a7e1d4c6..e3847e222be 100644
--- a/src/test/regress/expected/explain.out
+++ b/src/test/regress/expected/explain.out
@@ -114,6 +114,23 @@ select explain_filter_to_json('explain (analyze, waits, costs off, summary off,
{"Time": 0.0, "Calls": 0, "Wait Event": "PgSleep", "Wait Event Type": "Timeout"}
(1 row)
+begin;
+create function pg_temp.parallel_pg_sleep(float8) returns void
+ language internal volatile parallel safe as 'pg_sleep';
+set local debug_parallel_query = on;
+set local max_parallel_workers_per_gather = 1;
+select jsonb_path_query_first(
+ explain_filter_to_json('explain (analyze, waits, costs off, summary off, timing off, buffers off, format json)
+ select pg_temp.parallel_pg_sleep(0.01)
+ from tenk1 where unique1 = 1') #> '{0,Wait Events}',
+ '$[*] ? (@."Wait Event" == "PgSleep")'
+);
+ jsonb_path_query_first
+----------------------------------------------------------------------------------
+ {"Time": 0.0, "Calls": 0, "Wait Event": "PgSleep", "Wait Event Type": "Timeout"}
+(1 row)
+
+rollback;
explain (waits) select 1;
ERROR: EXPLAIN option WAITS requires ANALYZE
\a
diff --git a/src/test/regress/sql/explain.sql b/src/test/regress/sql/explain.sql
index fe025ddeac5..8821250bcef 100644
--- a/src/test/regress/sql/explain.sql
+++ b/src/test/regress/sql/explain.sql
@@ -71,6 +71,18 @@ select explain_filter('explain (buffers, format text) select * from int8_tbl i8'
-- WAITS option
select explain_filter('explain (analyze, waits, costs off, summary off, timing off, buffers off) select pg_sleep(0.01)');
select explain_filter_to_json('explain (analyze, waits, costs off, summary off, timing off, buffers off, format json) select pg_sleep(0.01)') #> '{0,Wait Events,0}';
+begin;
+create function pg_temp.parallel_pg_sleep(float8) returns void
+ language internal volatile parallel safe as 'pg_sleep';
+set local debug_parallel_query = on;
+set local max_parallel_workers_per_gather = 1;
+select jsonb_path_query_first(
+ explain_filter_to_json('explain (analyze, waits, costs off, summary off, timing off, buffers off, format json)
+ select pg_temp.parallel_pg_sleep(0.01)
+ from tenk1 where unique1 = 1') #> '{0,Wait Events}',
+ '$[*] ? (@."Wait Event" == "PgSleep")'
+);
+rollback;
explain (waits) select 1;
\a
--
2.52.0
| From | Date | Subject | |
|---|---|---|---|
| Next Message | Ilmar Yunusov | 2026-05-08 23:22:33 | [RFC PATCH v0 3/7] Attribute EXPLAIN WAITS to plan nodes |
| Previous Message | Ilmar Yunusov | 2026-05-08 23:22:31 | [RFC PATCH v0 1/7] Add EXPLAIN WAITS statement reporting |