From 57686a159d357f08b2d948f5d00ab43edb19cb10 Mon Sep 17 00:00:00 2001 From: Lukas Fittl Date: Sun, 31 May 2026 10:01:32 -0700 Subject: [PATCH v1 2/4] Unify parallel worker handling for instrumentation Introduce helpers to estimate the shared memory required for instrumentation, and for allocating and storing it in shared memory. Author: Lukas Fittl Reviewed-by: Discussion: --- src/backend/access/brin/brin.c | 21 ++++-------------- src/backend/access/gin/gininsert.c | 21 ++++-------------- src/backend/access/nbtree/nbtsort.c | 21 ++++-------------- src/backend/access/transam/parallel.c | 32 +++++++++++++++++++++++++++ src/backend/commands/vacuumparallel.c | 25 +++++---------------- src/backend/executor/execParallel.c | 27 +++++----------------- src/include/access/parallel.h | 4 ++++ 7 files changed, 58 insertions(+), 93 deletions(-) diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c index b04028a3858..8bd032c8669 100644 --- a/src/backend/access/brin/brin.c +++ b/src/backend/access/brin/brin.c @@ -2426,16 +2426,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, shm_toc_estimate_keys(&pcxt->estimator, 2); - /* - * Estimate space for Instrumentation -- PARALLEL_KEY_INSTRUMENTATION. - * - * If there are no extensions loaded that care, we could skip this. We - * have no way of knowing whether anyone's looking at pgWalUsage or - * pgBufferUsage, so do it unconditionally. - */ - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(Instrumentation), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for the per-worker Instrumentation array. */ + EstimateParallelInstrumentation(pcxt); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ if (debug_query_string) @@ -2506,13 +2498,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery); } - /* - * Allocate space for each worker's Instrumentation; no need to - * initialize. - */ - instr = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(Instrumentation), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instr); + /* Allocate space for each worker's Instrumentation. */ + instr = StoreParallelInstrumentation(pcxt, PARALLEL_KEY_INSTRUMENTATION); /* Launch workers, saving status for leader/caller */ LaunchParallelWorkers(pcxt); diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c index c6d144d12f5..97a8f38be5d 100644 --- a/src/backend/access/gin/gininsert.c +++ b/src/backend/access/gin/gininsert.c @@ -983,16 +983,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, shm_toc_estimate_keys(&pcxt->estimator, 2); - /* - * Estimate space for Instrumentation -- PARALLEL_KEY_INSTRUMENTATION. - * - * If there are no extensions loaded that care, we could skip this. We - * have no way of knowing whether anyone's looking at pgWalUsage or - * pgBufferUsage, so do it unconditionally. - */ - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(Instrumentation), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for the per-worker Instrumentation array. */ + EstimateParallelInstrumentation(pcxt); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ if (debug_query_string) @@ -1058,13 +1050,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery); } - /* - * Allocate space for each worker's Instrumentation; no need to - * initialize. - */ - instr = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(Instrumentation), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instr); + /* Allocate space for each worker's Instrumentation. */ + instr = StoreParallelInstrumentation(pcxt, PARALLEL_KEY_INSTRUMENTATION); /* Launch workers, saving status for leader/caller */ LaunchParallelWorkers(pcxt); diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index 0e5fa86cf17..7f0a1b88062 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -1458,16 +1458,8 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) shm_toc_estimate_keys(&pcxt->estimator, 3); } - /* - * Estimate space for Instrumentation -- PARALLEL_KEY_INSTRUMENTATION. - * - * If there are no extensions loaded that care, we could skip this. We - * have no way of knowing whether anyone's looking at pgWalUsage or - * pgBufferUsage, so do it unconditionally. - */ - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(Instrumentation), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for the per-worker Instrumentation array. */ + EstimateParallelInstrumentation(pcxt); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ if (debug_query_string) @@ -1552,13 +1544,8 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery); } - /* - * Allocate space for each worker's Instrumentation; no need to - * initialize. - */ - instr = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(Instrumentation), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instr); + /* Allocate space for each worker's Instrumentation. */ + instr = StoreParallelInstrumentation(pcxt, PARALLEL_KEY_INSTRUMENTATION); /* Launch workers, saving status for leader/caller */ LaunchParallelWorkers(pcxt); diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 89e9d224eec..17fb7c15aab 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -28,6 +28,7 @@ #include "commands/async.h" #include "commands/vacuum.h" #include "executor/execParallel.h" +#include "executor/instrument.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "libpq/pqmq.h" @@ -1026,6 +1027,37 @@ DestroyParallelContext(ParallelContext *pcxt) pfree(pcxt); } +/* + * Helpers for managing the per-worker Instrumentation array that parallel + * leaders allocate in DSM. The worker side fills in its own slot directly via + * InstrEndParallelQuery. + */ + +/* Reserve DSM space for the per-worker Instrumentation array. */ +void +EstimateParallelInstrumentation(ParallelContext *pcxt) +{ + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(Instrumentation), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +/* + * Allocate the per-worker Instrumentation array in DSM and publish it under + * the given key. No need to initialize; each worker fills in its own slot. + * Returns the array for the leader's convenience. + */ +Instrumentation * +StoreParallelInstrumentation(ParallelContext *pcxt, uint64 key) +{ + Instrumentation *instr; + + instr = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(Instrumentation), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, key, instr); + return instr; +} + /* * Are there any parallel contexts currently active? */ diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c index b2cdab310d6..5ffae66260d 100644 --- a/src/backend/commands/vacuumparallel.c +++ b/src/backend/commands/vacuumparallel.c @@ -307,7 +307,6 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, PVShared *shared; TidStore *dead_items; PVIndStats *indstats; - Instrumentation *instr; bool *will_parallel_vacuum; Size est_indstats_len; Size est_shared_len; @@ -359,17 +358,8 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, shm_toc_estimate_chunk(&pcxt->estimator, est_shared_len); shm_toc_estimate_keys(&pcxt->estimator, 1); - /* - * Estimate space for Instrumentation -- - * PARALLEL_VACUUM_KEY_INSTRUMENTATION. - * - * If there are no extensions loaded that care, we could skip this. We - * have no way of knowing whether anyone's looking at pgBufferUsage or - * pgWalUsage, so do it unconditionally. - */ - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(Instrumentation), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for the per-worker Instrumentation array. */ + EstimateParallelInstrumentation(pcxt); /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */ if (debug_query_string) @@ -465,14 +455,9 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared); pvs->shared = shared; - /* - * Allocate space for each worker's Instrumentation; no need to - * initialize. - */ - instr = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(Instrumentation), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_INSTRUMENTATION, instr); - pvs->instr = instr; + /* Allocate space for each worker's Instrumentation. */ + pvs->instr = StoreParallelInstrumentation(pcxt, + PARALLEL_VACUUM_KEY_INSTRUMENTATION); /* Store query string for workers */ if (debug_query_string) diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 89e717a1c50..4f202f544b3 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -723,16 +723,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len); shm_toc_estimate_keys(&pcxt->estimator, 1); - /* - * Estimate space for Instrumentation. - * - * If EXPLAIN is not in use and there are no extensions loaded that care, - * we could skip this. But we have no way of knowing whether anyone's - * looking at pgBufferUsage, so do it unconditionally. - */ - shm_toc_estimate_chunk(&pcxt->estimator, - mul_size(sizeof(Instrumentation), pcxt->nworkers)); - shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for the per-worker Instrumentation array. */ + EstimateParallelInstrumentation(pcxt); /* Estimate space for tuple queues. */ shm_toc_estimate_chunk(&pcxt->estimator, @@ -817,18 +809,9 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space); SerializeParamList(estate->es_param_list_info, ¶mlistinfo_space); - /* - * Allocate space for each worker's Instrumentation; no need to - * initialize. - */ - { - Instrumentation *instr; - - instr = shm_toc_allocate(pcxt->toc, - mul_size(sizeof(Instrumentation), pcxt->nworkers)); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instr); - pei->instrumentation = instr; - } + /* Allocate space for each worker's Instrumentation. */ + pei->instrumentation = StoreParallelInstrumentation(pcxt, + PARALLEL_KEY_INSTRUMENTATION); /* Set up the tuple queues that the workers will write into. */ pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false); diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h index 60f857675e0..3c85924e85d 100644 --- a/src/include/access/parallel.h +++ b/src/include/access/parallel.h @@ -72,6 +72,10 @@ extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt); extern void DestroyParallelContext(ParallelContext *pcxt); extern bool ParallelContextActive(void); +extern void EstimateParallelInstrumentation(ParallelContext *pcxt); +extern struct Instrumentation *StoreParallelInstrumentation(ParallelContext *pcxt, + uint64 key); + extern void HandleParallelMessageInterrupt(void); extern void ProcessParallelMessages(void); extern void AtEOXact_Parallel(bool isCommit); -- 2.47.1