From 533aaedb1ecd13e9935c824cb49d23b12608d18d Mon Sep 17 00:00:00 2001 From: Lukas Fittl Date: Sun, 31 May 2026 09:59:33 -0700 Subject: [PATCH v1 3/4] Unify parallel worker handling for query text Multiple callers (parallel index builds, parallel vacuum, parallel query) all had the same implementation for passing query text to the parallel workers. To reduce duplicated code, introduce new helper functions to: (1) estimate the needed shared memory space for the query text, (2) store the query text in shared memory, and (3) restore the query text in the worker. Author: Lukas Fittl Reviewed-by: Discussion: --- src/backend/access/brin/brin.c | 28 +++--------------- src/backend/access/gin/gininsert.c | 28 +++--------------- src/backend/access/nbtree/nbtsort.c | 28 +++--------------- src/backend/access/transam/parallel.c | 42 +++++++++++++++++++++++++++ src/backend/commands/vacuumparallel.c | 26 ++--------------- src/include/access/parallel.h | 3 ++ 6 files changed, 60 insertions(+), 95 deletions(-) diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c index 8bd032c8669..c237449123d 100644 --- a/src/backend/access/brin/brin.c +++ b/src/backend/access/brin/brin.c @@ -2387,7 +2387,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, BrinLeader *brinleader = palloc0_object(BrinLeader); Instrumentation *instr; bool leaderparticipates = true; - int querylen; #ifdef DISABLE_LEADER_PARTICIPATION leaderparticipates = false; @@ -2430,14 +2429,7 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, EstimateParallelInstrumentation(pcxt); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ - if (debug_query_string) - { - querylen = strlen(debug_query_string); - shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); - shm_toc_estimate_keys(&pcxt->estimator, 1); - } - else - querylen = 0; /* keep compiler quiet */ + EstimateParallelQueryText(pcxt); /* Everyone's had a chance to ask for space, so now create the DSM */ InitializeParallelDSM(pcxt); @@ -2489,14 +2481,7 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort); /* Store query string for workers */ - if (debug_query_string) - { - char *sharedquery; - - sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); - memcpy(sharedquery, debug_query_string, querylen + 1); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery); - } + StoreParallelQueryText(pcxt, PARALLEL_KEY_QUERY_TEXT); /* Allocate space for each worker's Instrumentation. */ instr = StoreParallelInstrumentation(pcxt, PARALLEL_KEY_INSTRUMENTATION); @@ -2853,7 +2838,6 @@ _brin_parallel_scan_and_build(BrinBuildState *state, void _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) { - char *sharedquery; BrinShared *brinshared; Sharedsort *sharedsort; BrinBuildState *buildstate; @@ -2871,12 +2855,8 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) Assert((MyProc->statusFlags == 0) || (MyProc->statusFlags == PROC_IN_SAFE_IC)); - /* Set debug_query_string for individual workers first */ - sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true); - debug_query_string = sharedquery; - - /* Report the query string from leader */ - pgstat_report_activity(STATE_RUNNING, debug_query_string); + /* Set debug_query_string and report the query string from leader */ + RestoreParallelQueryText(toc, PARALLEL_KEY_QUERY_TEXT); /* Look up brin shared state */ brinshared = shm_toc_lookup(toc, PARALLEL_KEY_BRIN_SHARED, false); diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c index 97a8f38be5d..57e51c574b9 100644 --- a/src/backend/access/gin/gininsert.c +++ b/src/backend/access/gin/gininsert.c @@ -945,7 +945,6 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, GinLeader *ginleader = palloc0_object(GinLeader); Instrumentation *instr; bool leaderparticipates = true; - int querylen; #ifdef DISABLE_LEADER_PARTICIPATION leaderparticipates = false; @@ -987,14 +986,7 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, EstimateParallelInstrumentation(pcxt); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ - if (debug_query_string) - { - querylen = strlen(debug_query_string); - shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); - shm_toc_estimate_keys(&pcxt->estimator, 1); - } - else - querylen = 0; /* keep compiler quiet */ + EstimateParallelQueryText(pcxt); /* Everyone's had a chance to ask for space, so now create the DSM */ InitializeParallelDSM(pcxt); @@ -1041,14 +1033,7 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLESORT, sharedsort); /* Store query string for workers */ - if (debug_query_string) - { - char *sharedquery; - - sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); - memcpy(sharedquery, debug_query_string, querylen + 1); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery); - } + StoreParallelQueryText(pcxt, PARALLEL_KEY_QUERY_TEXT); /* Allocate space for each worker's Instrumentation. */ instr = StoreParallelInstrumentation(pcxt, PARALLEL_KEY_INSTRUMENTATION); @@ -2084,7 +2069,6 @@ _gin_parallel_scan_and_build(GinBuildState *state, void _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) { - char *sharedquery; GinBuildShared *ginshared; Sharedsort *sharedsort; GinBuildState buildstate; @@ -2102,12 +2086,8 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) Assert((MyProc->statusFlags == 0) || (MyProc->statusFlags == PROC_IN_SAFE_IC)); - /* Set debug_query_string for individual workers first */ - sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true); - debug_query_string = sharedquery; - - /* Report the query string from leader */ - pgstat_report_activity(STATE_RUNNING, debug_query_string); + /* Set debug_query_string and report the query string from leader */ + RestoreParallelQueryText(toc, PARALLEL_KEY_QUERY_TEXT); /* Look up gin shared state */ ginshared = shm_toc_lookup(toc, PARALLEL_KEY_GIN_SHARED, false); diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index 7f0a1b88062..f43bb939081 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -1408,7 +1408,6 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) BTLeader *btleader = palloc0_object(BTLeader); Instrumentation *instr; bool leaderparticipates = true; - int querylen; #ifdef DISABLE_LEADER_PARTICIPATION leaderparticipates = false; @@ -1462,14 +1461,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) EstimateParallelInstrumentation(pcxt); /* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */ - if (debug_query_string) - { - querylen = strlen(debug_query_string); - shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); - shm_toc_estimate_keys(&pcxt->estimator, 1); - } - else - querylen = 0; /* keep compiler quiet */ + EstimateParallelQueryText(pcxt); /* Everyone's had a chance to ask for space, so now create the DSM */ InitializeParallelDSM(pcxt); @@ -1535,14 +1527,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) } /* Store query string for workers */ - if (debug_query_string) - { - char *sharedquery; - - sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); - memcpy(sharedquery, debug_query_string, querylen + 1); - shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery); - } + StoreParallelQueryText(pcxt, PARALLEL_KEY_QUERY_TEXT); /* Allocate space for each worker's Instrumentation. */ instr = StoreParallelInstrumentation(pcxt, PARALLEL_KEY_INSTRUMENTATION); @@ -1717,7 +1702,6 @@ _bt_leader_participate_as_worker(BTBuildState *buildstate) void _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) { - char *sharedquery; BTSpool *btspool; BTSpool *btspool2; BTShared *btshared; @@ -1742,12 +1726,8 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) Assert((MyProc->statusFlags == 0) || (MyProc->statusFlags == PROC_IN_SAFE_IC)); - /* Set debug_query_string for individual workers first */ - sharedquery = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, true); - debug_query_string = sharedquery; - - /* Report the query string from leader */ - pgstat_report_activity(STATE_RUNNING, debug_query_string); + /* Set debug_query_string and report the query string from leader */ + RestoreParallelQueryText(toc, PARALLEL_KEY_QUERY_TEXT); /* Look up nbtree shared state */ btshared = shm_toc_lookup(toc, PARALLEL_KEY_BTREE_SHARED, false); diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 17fb7c15aab..f55cfa6853f 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -1027,6 +1027,48 @@ DestroyParallelContext(ParallelContext *pcxt) pfree(pcxt); } +/* + * Helpers for passing the current query text down to parallel workers. + */ + +/* Reserve DSM space for the query text, if any. */ +void +EstimateParallelQueryText(ParallelContext *pcxt) +{ + if (debug_query_string) + { + shm_toc_estimate_chunk(&pcxt->estimator, strlen(debug_query_string) + 1); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } +} + +/* Copy the query text into DSM under the given key, if any. */ +void +StoreParallelQueryText(ParallelContext *pcxt, uint64 key) +{ + if (debug_query_string) + { + Size querylen = strlen(debug_query_string); + char *sharedquery; + + sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); + memcpy(sharedquery, debug_query_string, querylen + 1); + shm_toc_insert(pcxt->toc, key, sharedquery); + } +} + +/* + * Restore the query text in a worker: set debug_query_string and report it as + * the current activity. The key is looked up with missing_ok, so this is a + * no-op (leaving debug_query_string NULL) when the leader stored no text. + */ +void +RestoreParallelQueryText(shm_toc *toc, uint64 key) +{ + debug_query_string = shm_toc_lookup(toc, key, true); + pgstat_report_activity(STATE_RUNNING, debug_query_string); +} + /* * Helpers for managing the per-worker Instrumentation array that parallel * leaders allocate in DSM. The worker side fills in its own slot directly via diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c index 5ffae66260d..f0726752f9d 100644 --- a/src/backend/commands/vacuumparallel.c +++ b/src/backend/commands/vacuumparallel.c @@ -312,7 +312,6 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, Size est_shared_len; int nindexes_mwm = 0; int parallel_workers = 0; - int querylen; /* * A parallel vacuum must be requested and there must be indexes on the @@ -362,14 +361,7 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, EstimateParallelInstrumentation(pcxt); /* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */ - if (debug_query_string) - { - querylen = strlen(debug_query_string); - shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1); - shm_toc_estimate_keys(&pcxt->estimator, 1); - } - else - querylen = 0; /* keep compiler quiet */ + EstimateParallelQueryText(pcxt); InitializeParallelDSM(pcxt); @@ -460,16 +452,7 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, PARALLEL_VACUUM_KEY_INSTRUMENTATION); /* Store query string for workers */ - if (debug_query_string) - { - char *sharedquery; - - sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1); - memcpy(sharedquery, debug_query_string, querylen + 1); - sharedquery[querylen] = '\0'; - shm_toc_insert(pcxt->toc, - PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery); - } + StoreParallelQueryText(pcxt, PARALLEL_VACUUM_KEY_QUERY_TEXT); /* Success -- return parallel vacuum state */ return pvs; @@ -1177,7 +1160,6 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) TidStore *dead_items; Instrumentation *worker_instr; int nindexes; - char *sharedquery; ErrorContextCallback errcallback; /* @@ -1191,9 +1173,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) shared = (PVShared *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_SHARED, false); /* Set debug_query_string for individual workers */ - sharedquery = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT, true); - debug_query_string = sharedquery; - pgstat_report_activity(STATE_RUNNING, debug_query_string); + RestoreParallelQueryText(toc, PARALLEL_VACUUM_KEY_QUERY_TEXT); /* Track query ID */ pgstat_report_query_id(shared->queryid, false); diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h index 3c85924e85d..20e7e7673b9 100644 --- a/src/include/access/parallel.h +++ b/src/include/access/parallel.h @@ -72,6 +72,9 @@ extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt); extern void DestroyParallelContext(ParallelContext *pcxt); extern bool ParallelContextActive(void); +extern void EstimateParallelQueryText(ParallelContext *pcxt); +extern void StoreParallelQueryText(ParallelContext *pcxt, uint64 key); +extern void RestoreParallelQueryText(shm_toc *toc, uint64 key); extern void EstimateParallelInstrumentation(ParallelContext *pcxt); extern struct Instrumentation *StoreParallelInstrumentation(ParallelContext *pcxt, uint64 key); -- 2.47.1