From 9d664d24e1c4bcd036c661ad0f9e88095847229c Mon Sep 17 00:00:00 2001 From: Lukas Fittl Date: Sun, 31 May 2026 11:34:29 -0700 Subject: [PATCH v1 4/4] Unify parallel index build infrastructure The three in-tree index AMs that currently support parallel index builds (nbtree, gin and brin) each share very similar code for initializing parallel workers, allocating shared memory and communicating between the leader and the workers. To reduce duplication, introduce shared helpers for parallel index builds, unifiying the existing code. This also fixes an oversight in parallel GIN index builds that forgot to pass the queryid to the parallel workers, like other AMs do. Author: Lukas Fittl Reviewed-by: Discussion: --- src/backend/access/brin/brin.c | 188 +++---------- src/backend/access/common/Makefile | 1 + src/backend/access/common/meson.build | 1 + .../access/common/parallel_index_build.c | 246 ++++++++++++++++++ src/backend/access/gin/gininsert.c | 182 +++---------- src/backend/access/nbtree/nbtsort.c | 202 +++----------- src/include/access/parallel_index_build.h | 119 +++++++++ 7 files changed, 465 insertions(+), 474 deletions(-) create mode 100644 src/backend/access/common/parallel_index_build.c create mode 100644 src/include/access/parallel_index_build.h diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c index c237449123d..dc651383d63 100644 --- a/src/backend/access/brin/brin.c +++ b/src/backend/access/brin/brin.c @@ -19,6 +19,7 @@ #include "access/brin_page.h" #include "access/brin_pageops.h" #include "access/brin_xlog.h" +#include "access/parallel_index_build.h" #include "access/relation.h" #include "access/reloptions.h" #include "access/relscan.h" @@ -59,49 +60,13 @@ */ typedef struct BrinShared { - /* - * These fields are not modified during the build. They primarily exist - * for the benefit of worker processes that need to create state - * corresponding to that used by the leader. - */ - Oid heaprelid; - Oid indexrelid; - bool isconcurrent; - BlockNumber pagesPerRange; - int scantuplesortstates; - - /* Query ID, for report in worker processes */ - int64 queryid; + /* Common parallel index build state (must be first) */ + ParallelIndexBuildShared base; /* - * workersdonecv is used to monitor the progress of workers. All parallel - * participants must indicate that they are done before leader can use - * results built by the workers (and before leader can write the data into - * the index). + * BRIN-specific immutable state, not modified during the build. */ - ConditionVariable workersdonecv; - - /* - * mutex protects all fields before heapdesc. - * - * These fields contain status information of interest to BRIN index - * builds that must work just the same when an index is built in parallel. - */ - slock_t mutex; - - /* - * Mutable state that is maintained by workers, and reported back to - * leader at end of the scans. - * - * nparticipantsdone is number of worker processes finished. - * - * reltuples is the total number of input heap tuples. - * - * indtuples is the total number of tuples that made it into the index. - */ - int nparticipantsdone; - double reltuples; - double indtuples; + BlockNumber pagesPerRange; /* * ParallelTableScanDescData data follows. Can't directly embed here, as @@ -233,7 +198,6 @@ static void brin_fill_empty_ranges(BrinBuildState *state, static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, bool isconcurrent, int request); static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state); -static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot); static double _brin_parallel_heapscan(BrinBuildState *state); static double _brin_parallel_merge(BrinBuildState *state); static void _brin_leader_participate_as_worker(BrinBuildState *buildstate, @@ -2396,29 +2360,18 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, * Enter parallel mode, and create context for parallel build of brin * index */ - EnterParallelMode(); - Assert(request > 0); - pcxt = CreateParallelContext("postgres", "_brin_parallel_build_main", - request); + pcxt = ParallelIndexBuildCreateContext("_brin_parallel_build_main", request); scantuplesortstates = leaderparticipates ? request + 1 : request; - /* - * Prepare for scan of the base relation. In a normal index build, we use - * SnapshotAny because we must retrieve all tuples and do our own time - * qual checks (because we have to index RECENTLY_DEAD tuples). In a - * concurrent build, we take a regular MVCC snapshot and index whatever's - * live according to that. - */ - if (!isconcurrent) - snapshot = SnapshotAny; - else - snapshot = RegisterSnapshot(GetTransactionSnapshot()); + /* Prepare for scan of the base relation. */ + snapshot = ParallelIndexBuildGetSnapshot(isconcurrent); /* * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace. */ - estbrinshared = _brin_parallel_estimate_shared(heap, snapshot); + estbrinshared = ParallelIndexBuildEstimateShared(heap, snapshot, + sizeof(BrinShared)); shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared); estsort = tuplesort_estimate_shared(scantuplesortstates); shm_toc_estimate_chunk(&pcxt->estimator, estsort); @@ -2446,24 +2399,13 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, /* Store shared build state, for which we reserved space */ brinshared = (BrinShared *) shm_toc_allocate(pcxt->toc, estbrinshared); - /* Initialize immutable state */ - brinshared->heaprelid = RelationGetRelid(heap); - brinshared->indexrelid = RelationGetRelid(index); - brinshared->isconcurrent = isconcurrent; - brinshared->scantuplesortstates = scantuplesortstates; + /* Initialize BRIN-specific immutable state */ brinshared->pagesPerRange = buildstate->bs_pagesPerRange; - brinshared->queryid = pgstat_get_my_query_id(); - ConditionVariableInit(&brinshared->workersdonecv); - SpinLockInit(&brinshared->mutex); - - /* Initialize mutable state */ - brinshared->nparticipantsdone = 0; - brinshared->reltuples = 0.0; - brinshared->indtuples = 0.0; - - table_parallelscan_initialize(heap, - ParallelTableScanFromBrinShared(brinshared), - snapshot); + /* Initialize common state, and the parallel scan that follows the struct */ + ParallelIndexBuildInitShared(&brinshared->base, heap, index, isconcurrent, + scantuplesortstates, + ParallelTableScanFromBrinShared(brinshared), + snapshot); /* * Store shared tuplesort-private state, for which we reserved space. @@ -2524,21 +2466,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state) { - /* Shutdown worker processes */ - WaitForParallelWorkersToFinish(brinleader->pcxt); - - /* - * Next, accumulate WAL usage. (This must wait for the workers to finish, - * or we might get incomplete data.) - */ - InstrAccumParallelQuery(brinleader->instr, - brinleader->pcxt->nworkers_launched); - - /* Free last reference to MVCC snapshot, if one was used */ - if (IsMVCCSnapshot(brinleader->snapshot)) - UnregisterSnapshot(brinleader->snapshot); - DestroyParallelContext(brinleader->pcxt); - ExitParallelMode(); + ParallelIndexBuildEnd(brinleader->pcxt, brinleader->instr, + brinleader->snapshot); } /* @@ -2554,28 +2483,12 @@ static double _brin_parallel_heapscan(BrinBuildState *state) { BrinShared *brinshared = state->bs_leader->brinshared; - int nparticipanttuplesorts; - nparticipanttuplesorts = state->bs_leader->nparticipanttuplesorts; - for (;;) - { - SpinLockAcquire(&brinshared->mutex); - if (brinshared->nparticipantsdone == nparticipanttuplesorts) - { - /* copy the data into leader state */ - state->bs_reltuples = brinshared->reltuples; - state->bs_numtuples = brinshared->indtuples; - - SpinLockRelease(&brinshared->mutex); - break; - } - SpinLockRelease(&brinshared->mutex); - - ConditionVariableSleep(&brinshared->workersdonecv, - WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN); - } - - ConditionVariableCancelSleep(); + /* Wait for all participants to finish, and collect their results */ + ParallelIndexBuildWaitForWorkers(&brinshared->base, + state->bs_leader->nparticipanttuplesorts, + &state->bs_reltuples, &state->bs_numtuples, + NULL, NULL); return state->bs_reltuples; } @@ -2734,18 +2647,6 @@ _brin_parallel_merge(BrinBuildState *state) return reltuples; } -/* - * Returns size of shared memory required to store state for a parallel - * brin index build based on the snapshot its parallel scan will use. - */ -static Size -_brin_parallel_estimate_shared(Relation heap, Snapshot snapshot) -{ - /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */ - return add_size(BUFFERALIGN(sizeof(BrinShared)), - table_parallelscan_estimate(heap, snapshot)); -} - /* * Within leader, participate as a parallel worker. */ @@ -2800,7 +2701,7 @@ _brin_parallel_scan_and_build(BrinBuildState *state, /* Join parallel scan */ indexInfo = BuildIndexInfo(index); - indexInfo->ii_Concurrent = brinshared->isconcurrent; + indexInfo->ii_Concurrent = brinshared->base.isconcurrent; scan = table_beginscan_parallel(heap, ParallelTableScanFromBrinShared(brinshared), @@ -2817,17 +2718,9 @@ _brin_parallel_scan_and_build(BrinBuildState *state, state->bs_reltuples += reltuples; - /* - * Done. Record ambuild statistics. - */ - SpinLockAcquire(&brinshared->mutex); - brinshared->nparticipantsdone++; - brinshared->reltuples += state->bs_reltuples; - brinshared->indtuples += state->bs_numtuples; - SpinLockRelease(&brinshared->mutex); - - /* Notify leader */ - ConditionVariableSignal(&brinshared->workersdonecv); + /* Done. Record ambuild statistics, and notify leader. */ + ParallelIndexBuildReportScanDone(&brinshared->base, state->bs_reltuples, + state->bs_numtuples, false, false); tuplesort_end(state->bs_sortstate); } @@ -2861,24 +2754,9 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) /* Look up brin shared state */ brinshared = shm_toc_lookup(toc, PARALLEL_KEY_BRIN_SHARED, false); - /* Open relations using lock modes known to be obtained by index.c */ - if (!brinshared->isconcurrent) - { - heapLockmode = ShareLock; - indexLockmode = AccessExclusiveLock; - } - else - { - heapLockmode = ShareUpdateExclusiveLock; - indexLockmode = RowExclusiveLock; - } - - /* Track query ID */ - pgstat_report_query_id(brinshared->queryid, false); - - /* Open relations within worker */ - heapRel = table_open(brinshared->heaprelid, heapLockmode); - indexRel = index_open(brinshared->indexrelid, indexLockmode); + /* Open relations within worker, using the leader's lock modes */ + ParallelIndexBuildOpenRelations(&brinshared->base, &heapRel, &indexRel, + &heapLockmode, &indexLockmode); buildstate = initialize_brin_buildstate(indexRel, NULL, brinshared->pagesPerRange, @@ -2896,7 +2774,7 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) * (when requested number of workers were not launched, this will be * somewhat higher than it is for other workers). */ - sortmem = maintenance_work_mem / brinshared->scantuplesortstates; + sortmem = maintenance_work_mem / brinshared->base.scantuplesortstates; _brin_parallel_scan_and_build(buildstate, brinshared, sharedsort, heapRel, indexRel, sortmem, false); @@ -2905,8 +2783,8 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc) worker_instr = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, false); InstrEndParallelQuery(&worker_instr[ParallelWorkerNumber]); - index_close(indexRel, indexLockmode); - table_close(heapRel, heapLockmode); + ParallelIndexBuildCloseRelations(heapRel, indexRel, heapLockmode, + indexLockmode); } /* diff --git a/src/backend/access/common/Makefile b/src/backend/access/common/Makefile index e78de312659..0d4769b96af 100644 --- a/src/backend/access/common/Makefile +++ b/src/backend/access/common/Makefile @@ -18,6 +18,7 @@ OBJS = \ detoast.o \ heaptuple.o \ indextuple.o \ + parallel_index_build.o \ printsimple.o \ printtup.o \ relation.o \ diff --git a/src/backend/access/common/meson.build b/src/backend/access/common/meson.build index 35e89b5ea67..7d6247b9754 100644 --- a/src/backend/access/common/meson.build +++ b/src/backend/access/common/meson.build @@ -6,6 +6,7 @@ backend_sources += files( 'detoast.c', 'heaptuple.c', 'indextuple.c', + 'parallel_index_build.c', 'printsimple.c', 'printtup.c', 'relation.c', diff --git a/src/backend/access/common/parallel_index_build.c b/src/backend/access/common/parallel_index_build.c new file mode 100644 index 00000000000..adce8c547e5 --- /dev/null +++ b/src/backend/access/common/parallel_index_build.c @@ -0,0 +1,246 @@ +/*------------------------------------------------------------------------- + * + * parallel_index_build.c + * Shared infrastructure for parallel index builds. + * + * This file contains the access-method-independent parts of the parallel + * index build lifecycle shared by different index access methods: setting up the + * parallel context and shared state, and opening/closing relations and tearing + * everything down again in the worker and leader. + * + * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/common/parallel_index_build.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/parallel_index_build.h" +#include "access/genam.h" +#include "access/table.h" +#include "access/tableam.h" +#include "access/xact.h" +#include "executor/instrument.h" +#include "pgstat.h" +#include "utils/rel.h" +#include "utils/snapmgr.h" +#include "utils/wait_event.h" + +/* + * Enter parallel mode and create a parallel context for an index build, using + * the named worker entry point. + */ +ParallelContext * +ParallelIndexBuildCreateContext(const char *worker_function, int nworkers) +{ + Assert(nworkers > 0); + + EnterParallelMode(); + + return CreateParallelContext("postgres", worker_function, nworkers); +} + +/* + * Choose the snapshot for the heap scan of an index build. + * + * In a normal index build, we use SnapshotAny because we must retrieve all + * tuples and do our own time qual checks (because we have to index + * RECENTLY_DEAD tuples). In a concurrent build, we take a regular MVCC + * snapshot and index whatever's live according to that. The caller is + * responsible for unregistering an MVCC snapshot by calling ParallelIndexBuildEnd. + */ +Snapshot +ParallelIndexBuildGetSnapshot(bool isconcurrent) +{ + if (!isconcurrent) + return SnapshotAny; + + return RegisterSnapshot(GetTransactionSnapshot()); +} + +/* + * Estimate the DSM space for the shared state struct of a parallel index + * build, including the parallel table scan descriptor that trails it. + * + * am_shared_size is sizeof() the access method's whole shared struct (which + * embeds ParallelIndexBuildShared as its first member). + */ +Size +ParallelIndexBuildEstimateShared(Relation heap, Snapshot snapshot, + Size am_shared_size) +{ + /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */ + return add_size(BUFFERALIGN(am_shared_size), + table_parallelscan_estimate(heap, snapshot)); +} + +/* + * Initialize the common shared state for a parallel index build. + * + * The caller has already allocated the embedding shared struct in DSM; this + * fills in the common header and initializes the parallel heap scan + * descriptor that follows the (whole) embedding struct, which the caller + * passes as pscan. AM-specific fields are the caller's responsibility. + */ +void +ParallelIndexBuildInitShared(ParallelIndexBuildShared * shared, + Relation heap, Relation index, + bool isconcurrent, int scantuplesortstates, + ParallelTableScanDesc pscan, Snapshot snapshot) +{ + /* Initialize immutable state */ + shared->heaprelid = RelationGetRelid(heap); + shared->indexrelid = RelationGetRelid(index); + shared->isconcurrent = isconcurrent; + shared->scantuplesortstates = scantuplesortstates; + shared->queryid = pgstat_get_my_query_id(); + ConditionVariableInit(&shared->workersdonecv); + SpinLockInit(&shared->mutex); + + /* Initialize mutable state */ + shared->nparticipantsdone = 0; + shared->reltuples = 0.0; + shared->indtuples = 0.0; + shared->havedead = false; + shared->brokenhotchain = false; + + table_parallelscan_initialize(heap, pscan, snapshot); +} + +/* + * Wait, in the leader, for all participants to finish their portion of the + * scan, then read back the accumulated per-build results. + * + * reltuples and indtuples receive the totals. havedead and brokenhotchain are + * optional (pass NULL when the AM does not care): they report whether any + * worker saw RECENTLY_DEAD tuples or a broken HOT chain. + */ +void +ParallelIndexBuildWaitForWorkers(ParallelIndexBuildShared * shared, + int nparticipants, + double *reltuples, double *indtuples, + bool *havedead, bool *brokenhotchain) +{ + for (;;) + { + SpinLockAcquire(&shared->mutex); + if (shared->nparticipantsdone == nparticipants) + { + *reltuples = shared->reltuples; + *indtuples = shared->indtuples; + if (havedead) + *havedead = shared->havedead; + if (brokenhotchain) + *brokenhotchain = shared->brokenhotchain; + SpinLockRelease(&shared->mutex); + break; + } + SpinLockRelease(&shared->mutex); + + ConditionVariableSleep(&shared->workersdonecv, + WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN); + } + + ConditionVariableCancelSleep(); +} + +/* + * Shut down the workers and tear down a parallel index build. + * + * Waits for all workers to finish, accumulates their buffer/WAL usage into the + * leader's stats, releases the MVCC snapshot if one was used, and exits + * parallel mode. instr is the per-worker Instrumentation array previously + * allocated with StoreParallelInstrumentation. + */ +void +ParallelIndexBuildEnd(ParallelContext *pcxt, struct Instrumentation *instr, + Snapshot snapshot) +{ + /* Shutdown worker processes */ + WaitForParallelWorkersToFinish(pcxt); + + /* + * Next, accumulate instrumentation. This must wait for the workers to + * finish, or we might get incomplete data. + */ + InstrAccumParallelQuery(instr, pcxt->nworkers_launched); + + /* Free last reference to MVCC snapshot, if one was used */ + if (IsMVCCSnapshot(snapshot)) + UnregisterSnapshot(snapshot); + DestroyParallelContext(pcxt); + ExitParallelMode(); +} + +/* + * Open the heap and index relations in a parallel index build worker. + * + * Selects the lock modes used by the leader (which differ for concurrent + * builds), reports the query ID, and opens both relations. The chosen lock + * modes are returned so they can be passed to ParallelIndexBuildCloseRelations. + */ +void +ParallelIndexBuildOpenRelations(ParallelIndexBuildShared * shared, + Relation *heapRel, Relation *indexRel, + LOCKMODE *heapLockmode, LOCKMODE *indexLockmode) +{ + /* Open relations using lock modes known to be obtained by index.c */ + if (!shared->isconcurrent) + { + *heapLockmode = ShareLock; + *indexLockmode = AccessExclusiveLock; + } + else + { + *heapLockmode = ShareUpdateExclusiveLock; + *indexLockmode = RowExclusiveLock; + } + + /* Track query ID */ + pgstat_report_query_id(shared->queryid, false); + + /* Open relations within worker */ + *heapRel = table_open(shared->heaprelid, *heapLockmode); + *indexRel = index_open(shared->indexrelid, *indexLockmode); +} + +/* + * Close the relations opened by ParallelIndexBuildOpenRelations. + */ +void +ParallelIndexBuildCloseRelations(Relation heapRel, Relation indexRel, + LOCKMODE heapLockmode, LOCKMODE indexLockmode) +{ + index_close(indexRel, indexLockmode); + table_close(heapRel, heapLockmode); +} + +/* + * Report, from a worker, that it has finished its portion of the scan. + * + * Accumulates the worker's tuple counts into the shared totals, ORs in the + * havedead/brokenhotchain flags (AMs that don't track these pass false), and + * signals the leader. Must be paired with ParallelIndexBuildWaitForWorkers in + * the leader. + */ +void +ParallelIndexBuildReportScanDone(ParallelIndexBuildShared * shared, + double reltuples, double indtuples, + bool havedead, bool brokenhotchain) +{ + SpinLockAcquire(&shared->mutex); + shared->nparticipantsdone++; + shared->reltuples += reltuples; + shared->indtuples += indtuples; + if (havedead) + shared->havedead = true; + if (brokenhotchain) + shared->brokenhotchain = true; + SpinLockRelease(&shared->mutex); + + /* Notify leader */ + ConditionVariableSignal(&shared->workersdonecv); +} diff --git a/src/backend/access/gin/gininsert.c b/src/backend/access/gin/gininsert.c index 57e51c574b9..f0386f6bdd3 100644 --- a/src/backend/access/gin/gininsert.c +++ b/src/backend/access/gin/gininsert.c @@ -17,6 +17,7 @@ #include "access/gin_private.h" #include "access/gin_tuple.h" #include "access/parallel.h" +#include "access/parallel_index_build.h" #include "access/table.h" #include "access/tableam.h" #include "access/xloginsert.h" @@ -53,45 +54,8 @@ */ typedef struct GinBuildShared { - /* - * These fields are not modified during the build. They primarily exist - * for the benefit of worker processes that need to create state - * corresponding to that used by the leader. - */ - Oid heaprelid; - Oid indexrelid; - bool isconcurrent; - int scantuplesortstates; - - /* - * workersdonecv is used to monitor the progress of workers. All parallel - * participants must indicate that they are done before leader can use - * results built by the workers (and before leader can write the data into - * the index). - */ - ConditionVariable workersdonecv; - - /* - * mutex protects all following fields - * - * These fields contain status information of interest to GIN index builds - * that must work just the same when an index is built in parallel. - */ - slock_t mutex; - - /* - * Mutable state that is maintained by workers, and reported back to - * leader at end of the scans. - * - * nparticipantsdone is number of worker processes finished. - * - * reltuples is the total number of input heap tuples. - * - * indtuples is the total number of tuples that made it into the index. - */ - int nparticipantsdone; - double reltuples; - double indtuples; + /* Common parallel index build state (must be first) */ + ParallelIndexBuildShared base; /* * ParallelTableScanDescData data follows. Can't directly embed here, as @@ -185,7 +149,6 @@ typedef struct static void _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, bool isconcurrent, int request); static void _gin_end_parallel(GinLeader *ginleader, GinBuildState *state); -static Size _gin_parallel_estimate_shared(Relation heap, Snapshot snapshot); static double _gin_parallel_heapscan(GinBuildState *state); static double _gin_parallel_merge(GinBuildState *state); static void _gin_leader_participate_as_worker(GinBuildState *buildstate, @@ -953,29 +916,18 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, /* * Enter parallel mode, and create context for parallel build of gin index */ - EnterParallelMode(); - Assert(request > 0); - pcxt = CreateParallelContext("postgres", "_gin_parallel_build_main", - request); + pcxt = ParallelIndexBuildCreateContext("_gin_parallel_build_main", request); scantuplesortstates = leaderparticipates ? request + 1 : request; - /* - * Prepare for scan of the base relation. In a normal index build, we use - * SnapshotAny because we must retrieve all tuples and do our own time - * qual checks (because we have to index RECENTLY_DEAD tuples). In a - * concurrent build, we take a regular MVCC snapshot and index whatever's - * live according to that. - */ - if (!isconcurrent) - snapshot = SnapshotAny; - else - snapshot = RegisterSnapshot(GetTransactionSnapshot()); + /* Prepare for scan of the base relation. */ + snapshot = ParallelIndexBuildGetSnapshot(isconcurrent); /* * Estimate size for our own PARALLEL_KEY_GIN_SHARED workspace. */ - estginshared = _gin_parallel_estimate_shared(heap, snapshot); + estginshared = ParallelIndexBuildEstimateShared(heap, snapshot, + sizeof(GinBuildShared)); shm_toc_estimate_chunk(&pcxt->estimator, estginshared); estsort = tuplesort_estimate_shared(scantuplesortstates); shm_toc_estimate_chunk(&pcxt->estimator, estsort); @@ -1003,23 +955,11 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, /* Store shared build state, for which we reserved space */ ginshared = (GinBuildShared *) shm_toc_allocate(pcxt->toc, estginshared); - /* Initialize immutable state */ - ginshared->heaprelid = RelationGetRelid(heap); - ginshared->indexrelid = RelationGetRelid(index); - ginshared->isconcurrent = isconcurrent; - ginshared->scantuplesortstates = scantuplesortstates; - - ConditionVariableInit(&ginshared->workersdonecv); - SpinLockInit(&ginshared->mutex); - - /* Initialize mutable state */ - ginshared->nparticipantsdone = 0; - ginshared->reltuples = 0.0; - ginshared->indtuples = 0.0; - - table_parallelscan_initialize(heap, - ParallelTableScanFromGinBuildShared(ginshared), - snapshot); + /* Initialize common state, and the parallel scan that follows the struct */ + ParallelIndexBuildInitShared(&ginshared->base, heap, index, isconcurrent, + scantuplesortstates, + ParallelTableScanFromGinBuildShared(ginshared), + snapshot); /* * Store shared tuplesort-private state, for which we reserved space. @@ -1076,21 +1016,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index, static void _gin_end_parallel(GinLeader *ginleader, GinBuildState *state) { - /* Shutdown worker processes */ - WaitForParallelWorkersToFinish(ginleader->pcxt); - - /* - * Next, accumulate WAL usage. (This must wait for the workers to finish, - * or we might get incomplete data.) - */ - InstrAccumParallelQuery(ginleader->instr, - ginleader->pcxt->nworkers_launched); - - /* Free last reference to MVCC snapshot, if one was used */ - if (IsMVCCSnapshot(ginleader->snapshot)) - UnregisterSnapshot(ginleader->snapshot); - DestroyParallelContext(ginleader->pcxt); - ExitParallelMode(); + ParallelIndexBuildEnd(ginleader->pcxt, ginleader->instr, + ginleader->snapshot); } /* @@ -1106,28 +1033,12 @@ static double _gin_parallel_heapscan(GinBuildState *state) { GinBuildShared *ginshared = state->bs_leader->ginshared; - int nparticipanttuplesorts; - - nparticipanttuplesorts = state->bs_leader->nparticipanttuplesorts; - for (;;) - { - SpinLockAcquire(&ginshared->mutex); - if (ginshared->nparticipantsdone == nparticipanttuplesorts) - { - /* copy the data into leader state */ - state->bs_reltuples = ginshared->reltuples; - state->bs_numtuples = ginshared->indtuples; - SpinLockRelease(&ginshared->mutex); - break; - } - SpinLockRelease(&ginshared->mutex); - - ConditionVariableSleep(&ginshared->workersdonecv, - WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN); - } - - ConditionVariableCancelSleep(); + /* Wait for all participants to finish, and collect their results */ + ParallelIndexBuildWaitForWorkers(&ginshared->base, + state->bs_leader->nparticipanttuplesorts, + &state->bs_reltuples, &state->bs_numtuples, + NULL, NULL); return state->bs_reltuples; } @@ -1761,17 +1672,6 @@ _gin_parallel_merge(GinBuildState *state) return reltuples; } -/* - * Returns size of shared memory required to store state for a parallel - * gin index build based on the snapshot its parallel scan will use. - */ -static Size -_gin_parallel_estimate_shared(Relation heap, Snapshot snapshot) -{ - /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */ - return add_size(BUFFERALIGN(sizeof(GinBuildShared)), - table_parallelscan_estimate(heap, snapshot)); -} /* * Within leader, participate as a parallel worker. @@ -2008,7 +1908,7 @@ _gin_parallel_scan_and_build(GinBuildState *state, state->work_mem = (sortmem / 2); /* remember how many workers participate in the build */ - state->bs_num_workers = ginshared->scantuplesortstates; + state->bs_num_workers = ginshared->base.scantuplesortstates; /* Begin "partial" tuplesort */ state->bs_sortstate = tuplesort_begin_index_gin(heap, index, @@ -2024,7 +1924,7 @@ _gin_parallel_scan_and_build(GinBuildState *state, /* Join parallel scan */ indexInfo = BuildIndexInfo(index); - indexInfo->ii_Concurrent = ginshared->isconcurrent; + indexInfo->ii_Concurrent = ginshared->base.isconcurrent; scan = table_beginscan_parallel(heap, ParallelTableScanFromGinBuildShared(ginshared), @@ -2048,17 +1948,9 @@ _gin_parallel_scan_and_build(GinBuildState *state, state->bs_reltuples += reltuples; - /* - * Done. Record ambuild statistics. - */ - SpinLockAcquire(&ginshared->mutex); - ginshared->nparticipantsdone++; - ginshared->reltuples += state->bs_reltuples; - ginshared->indtuples += state->bs_numtuples; - SpinLockRelease(&ginshared->mutex); - - /* Notify leader */ - ConditionVariableSignal(&ginshared->workersdonecv); + /* Done. Record ambuild statistics, and notify leader. */ + ParallelIndexBuildReportScanDone(&ginshared->base, state->bs_reltuples, + state->bs_numtuples, false, false); tuplesort_end(state->bs_sortstate); } @@ -2092,21 +1984,9 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) /* Look up gin shared state */ ginshared = shm_toc_lookup(toc, PARALLEL_KEY_GIN_SHARED, false); - /* Open relations using lock modes known to be obtained by index.c */ - if (!ginshared->isconcurrent) - { - heapLockmode = ShareLock; - indexLockmode = AccessExclusiveLock; - } - else - { - heapLockmode = ShareUpdateExclusiveLock; - indexLockmode = RowExclusiveLock; - } - - /* Open relations within worker */ - heapRel = table_open(ginshared->heaprelid, heapLockmode); - indexRel = index_open(ginshared->indexrelid, indexLockmode); + /* Open relations within worker, using the leader's lock modes */ + ParallelIndexBuildOpenRelations(&ginshared->base, &heapRel, &indexRel, + &heapLockmode, &indexLockmode); /* initialize the GIN build state */ initGinState(&buildstate.ginstate, indexRel); @@ -2146,7 +2026,7 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) * (when requested number of workers were not launched, this will be * somewhat higher than it is for other workers). */ - sortmem = maintenance_work_mem / ginshared->scantuplesortstates; + sortmem = maintenance_work_mem / ginshared->base.scantuplesortstates; _gin_parallel_scan_and_build(&buildstate, ginshared, sharedsort, heapRel, indexRel, sortmem, false); @@ -2155,8 +2035,8 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc) worker_instr = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, false); InstrEndParallelQuery(&worker_instr[ParallelWorkerNumber]); - index_close(indexRel, indexLockmode); - table_close(heapRel, heapLockmode); + ParallelIndexBuildCloseRelations(heapRel, indexRel, heapLockmode, + indexLockmode); } /* diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index f43bb939081..c953805576c 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -42,6 +42,7 @@ #include "access/nbtree.h" #include "access/parallel.h" +#include "access/parallel_index_build.h" #include "access/relscan.h" #include "access/table.h" #include "access/tableam.h" @@ -95,58 +96,12 @@ typedef struct BTSpool */ typedef struct BTShared { - /* - * These fields are not modified during the sort. They primarily exist - * for the benefit of worker processes that need to create BTSpool state - * corresponding to that used by the leader. - */ - Oid heaprelid; - Oid indexrelid; + /* Common parallel index build state (must be first) */ + ParallelIndexBuildShared base; + + /* nbtree-specific immutable state, not modified during the sort */ bool isunique; bool nulls_not_distinct; - bool isconcurrent; - int scantuplesortstates; - - /* Query ID, for report in worker processes */ - int64 queryid; - - /* - * workersdonecv is used to monitor the progress of workers. All parallel - * participants must indicate that they are done before leader can use - * mutable state that workers maintain during scan (and before leader can - * proceed to tuplesort_performsort()). - */ - ConditionVariable workersdonecv; - - /* - * mutex protects all fields before heapdesc. - * - * These fields contain status information of interest to B-Tree index - * builds that must work just the same when an index is built in parallel. - */ - slock_t mutex; - - /* - * Mutable state that is maintained by workers, and reported back to - * leader at end of parallel scan. - * - * nparticipantsdone is number of worker processes finished. - * - * reltuples is the total number of input heap tuples. - * - * havedead indicates if RECENTLY_DEAD tuples were encountered during - * build. - * - * indtuples is the total number of tuples that made it into the index. - * - * brokenhotchain indicates if any worker detected a broken HOT chain - * during build. - */ - int nparticipantsdone; - double reltuples; - bool havedead; - double indtuples; - bool brokenhotchain; /* * ParallelTableScanDescData data follows. Can't directly embed here, as @@ -280,7 +235,6 @@ static void _bt_load(BTWriteState *wstate, static void _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request); static void _bt_end_parallel(BTLeader *btleader); -static Size _bt_parallel_estimate_shared(Relation heap, Snapshot snapshot); static double _bt_parallel_heapscan(BTBuildState *buildstate, bool *brokenhotchain); static void _bt_leader_participate_as_worker(BTBuildState *buildstate); @@ -1417,30 +1371,19 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) * Enter parallel mode, and create context for parallel build of btree * index */ - EnterParallelMode(); - Assert(request > 0); - pcxt = CreateParallelContext("postgres", "_bt_parallel_build_main", - request); + pcxt = ParallelIndexBuildCreateContext("_bt_parallel_build_main", request); scantuplesortstates = leaderparticipates ? request + 1 : request; - /* - * Prepare for scan of the base relation. In a normal index build, we use - * SnapshotAny because we must retrieve all tuples and do our own time - * qual checks (because we have to index RECENTLY_DEAD tuples). In a - * concurrent build, we take a regular MVCC snapshot and index whatever's - * live according to that. - */ - if (!isconcurrent) - snapshot = SnapshotAny; - else - snapshot = RegisterSnapshot(GetTransactionSnapshot()); + /* Prepare for scan of the base relation. */ + snapshot = ParallelIndexBuildGetSnapshot(isconcurrent); /* * Estimate size for our own PARALLEL_KEY_BTREE_SHARED workspace, and * PARALLEL_KEY_TUPLESORT tuplesort workspace */ - estbtshared = _bt_parallel_estimate_shared(btspool->heap, snapshot); + estbtshared = ParallelIndexBuildEstimateShared(btspool->heap, snapshot, + sizeof(BTShared)); shm_toc_estimate_chunk(&pcxt->estimator, estbtshared); estsort = tuplesort_estimate_shared(scantuplesortstates); shm_toc_estimate_chunk(&pcxt->estimator, estsort); @@ -1478,25 +1421,14 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) /* Store shared build state, for which we reserved space */ btshared = (BTShared *) shm_toc_allocate(pcxt->toc, estbtshared); - /* Initialize immutable state */ - btshared->heaprelid = RelationGetRelid(btspool->heap); - btshared->indexrelid = RelationGetRelid(btspool->index); + /* Initialize nbtree-specific immutable state */ btshared->isunique = btspool->isunique; btshared->nulls_not_distinct = btspool->nulls_not_distinct; - btshared->isconcurrent = isconcurrent; - btshared->scantuplesortstates = scantuplesortstates; - btshared->queryid = pgstat_get_my_query_id(); - ConditionVariableInit(&btshared->workersdonecv); - SpinLockInit(&btshared->mutex); - /* Initialize mutable state */ - btshared->nparticipantsdone = 0; - btshared->reltuples = 0.0; - btshared->havedead = false; - btshared->indtuples = 0.0; - btshared->brokenhotchain = false; - table_parallelscan_initialize(btspool->heap, - ParallelTableScanFromBTShared(btshared), - snapshot); + /* Initialize common state, and the parallel scan that follows the struct */ + ParallelIndexBuildInitShared(&btshared->base, btspool->heap, btspool->index, + isconcurrent, scantuplesortstates, + ParallelTableScanFromBTShared(btshared), + snapshot); /* * Store shared tuplesort-private state, for which we reserved space. @@ -1571,33 +1503,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) static void _bt_end_parallel(BTLeader *btleader) { - /* Shutdown worker processes */ - WaitForParallelWorkersToFinish(btleader->pcxt); - - /* - * Next, accumulate WAL usage. (This must wait for the workers to finish, - * or we might get incomplete data.) - */ - InstrAccumParallelQuery(btleader->instr, - btleader->pcxt->nworkers_launched); - - /* Free last reference to MVCC snapshot, if one was used */ - if (IsMVCCSnapshot(btleader->snapshot)) - UnregisterSnapshot(btleader->snapshot); - DestroyParallelContext(btleader->pcxt); - ExitParallelMode(); -} - -/* - * Returns size of shared memory required to store state for a parallel - * btree index build based on the snapshot its parallel scan will use. - */ -static Size -_bt_parallel_estimate_shared(Relation heap, Snapshot snapshot) -{ - /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */ - return add_size(BUFFERALIGN(sizeof(BTShared)), - table_parallelscan_estimate(heap, snapshot)); + ParallelIndexBuildEnd(btleader->pcxt, btleader->instr, btleader->snapshot); } /* @@ -1616,29 +1522,13 @@ static double _bt_parallel_heapscan(BTBuildState *buildstate, bool *brokenhotchain) { BTShared *btshared = buildstate->btleader->btshared; - int nparticipanttuplesorts; double reltuples; - nparticipanttuplesorts = buildstate->btleader->nparticipanttuplesorts; - for (;;) - { - SpinLockAcquire(&btshared->mutex); - if (btshared->nparticipantsdone == nparticipanttuplesorts) - { - buildstate->havedead = btshared->havedead; - buildstate->indtuples = btshared->indtuples; - *brokenhotchain = btshared->brokenhotchain; - reltuples = btshared->reltuples; - SpinLockRelease(&btshared->mutex); - break; - } - SpinLockRelease(&btshared->mutex); - - ConditionVariableSleep(&btshared->workersdonecv, - WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN); - } - - ConditionVariableCancelSleep(); + /* Wait for all participants to finish, and collect their results */ + ParallelIndexBuildWaitForWorkers(&btshared->base, + buildstate->btleader->nparticipanttuplesorts, + &reltuples, &buildstate->indtuples, + &buildstate->havedead, brokenhotchain); return reltuples; } @@ -1732,24 +1622,9 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) /* Look up nbtree shared state */ btshared = shm_toc_lookup(toc, PARALLEL_KEY_BTREE_SHARED, false); - /* Open relations using lock modes known to be obtained by index.c */ - if (!btshared->isconcurrent) - { - heapLockmode = ShareLock; - indexLockmode = AccessExclusiveLock; - } - else - { - heapLockmode = ShareUpdateExclusiveLock; - indexLockmode = RowExclusiveLock; - } - - /* Track query ID */ - pgstat_report_query_id(btshared->queryid, false); - - /* Open relations within worker */ - heapRel = table_open(btshared->heaprelid, heapLockmode); - indexRel = index_open(btshared->indexrelid, indexLockmode); + /* Open relations within worker, using the leader's lock modes */ + ParallelIndexBuildOpenRelations(&btshared->base, &heapRel, &indexRel, + &heapLockmode, &indexLockmode); /* Initialize worker's own spool */ btspool = palloc0_object(BTSpool); @@ -1784,7 +1659,7 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) InstrStartParallelQuery(); /* Perform sorting of spool, and possibly a spool2 */ - sortmem = maintenance_work_mem / btshared->scantuplesortstates; + sortmem = maintenance_work_mem / btshared->base.scantuplesortstates; _bt_parallel_scan_and_sort(btspool, btspool2, btshared, sharedsort, sharedsort2, sortmem, false); @@ -1800,8 +1675,8 @@ _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc) } #endif /* BTREE_BUILD_STATS */ - index_close(indexRel, indexLockmode); - table_close(heapRel, heapLockmode); + ParallelIndexBuildCloseRelations(heapRel, indexRel, heapLockmode, + indexLockmode); } /* @@ -1877,7 +1752,7 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2, /* Join parallel scan */ indexInfo = BuildIndexInfo(btspool->index); - indexInfo->ii_Concurrent = btshared->isconcurrent; + indexInfo->ii_Concurrent = btshared->base.isconcurrent; scan = table_beginscan_parallel(btspool->heap, ParallelTableScanFromBTShared(btshared), SO_NONE); @@ -1899,21 +1774,12 @@ _bt_parallel_scan_and_sort(BTSpool *btspool, BTSpool *btspool2, } /* - * Done. Record ambuild statistics, and whether we encountered a broken - * HOT chain. + * Done. Record ambuild statistics (including whether we encountered a + * broken HOT chain), and notify the leader. */ - SpinLockAcquire(&btshared->mutex); - btshared->nparticipantsdone++; - btshared->reltuples += reltuples; - if (buildstate.havedead) - btshared->havedead = true; - btshared->indtuples += buildstate.indtuples; - if (indexInfo->ii_BrokenHotChain) - btshared->brokenhotchain = true; - SpinLockRelease(&btshared->mutex); - - /* Notify leader */ - ConditionVariableSignal(&btshared->workersdonecv); + ParallelIndexBuildReportScanDone(&btshared->base, reltuples, + buildstate.indtuples, buildstate.havedead, + indexInfo->ii_BrokenHotChain); /* We can end tuplesorts immediately */ tuplesort_end(btspool->sortstate); diff --git a/src/include/access/parallel_index_build.h b/src/include/access/parallel_index_build.h new file mode 100644 index 00000000000..f6d311218a4 --- /dev/null +++ b/src/include/access/parallel_index_build.h @@ -0,0 +1,119 @@ +/*------------------------------------------------------------------------- + * + * parallel_index_build.h + * Shared infrastructure for parallel index builds. + * + * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/parallel_index_build.h + * + *------------------------------------------------------------------------- + */ +#ifndef PARALLEL_INDEX_BUILD_H +#define PARALLEL_INDEX_BUILD_H + +#include "access/parallel.h" +#include "access/relscan.h" +#include "storage/condition_variable.h" +#include "storage/lockdefs.h" +#include "storage/spin.h" +#include "utils/relcache.h" +#include "utils/snapshot.h" + +struct Instrumentation; + +/* + * Shared state common to all parallel index builds. + * + * Access methods embed this as the first member of their own shared state + * and append any AM-specific fields, followed by the ParallelTableScanDescData + * for the heap scan. The immutable fields are set once by the leader before + * workers are launched; the mutable fields are maintained by the workers + * under mutex and read back by the leader once all workers have signalled + * completion. + */ +typedef struct ParallelIndexBuildShared +{ + /* Immutable state, set by the leader before launching workers */ + Oid heaprelid; + Oid indexrelid; + bool isconcurrent; + int scantuplesortstates; + + /* Query ID, for report in worker processes */ + int64 queryid; + + /* + * workersdonecv is used to monitor the progress of workers. All parallel + * participants must indicate that they are done before the leader can use + * the results built by the workers. + */ + ConditionVariable workersdonecv; + + /* + * mutex protects the mutable fields below. + */ + slock_t mutex; + + /* + * Mutable state that is maintained by workers for all index types, and + * reported back to leader at end of the scans. + * + * nparticipantsdone is the number of worker processes finished. + * + * reltuples is the total number of input heap tuples. + * + * indtuples is the total number of tuples that made it into the index. + */ + int nparticipantsdone; + double reltuples; + double indtuples; + + /* + * Mutable state that is maintained for exact index AMs (e.g. nbtree), and + * unused for lossy AMs. + * + * havedead indicates if RECENTLY_DEAD tuples were encountered during + * build. + * + * brokenhotchain indicates if any worker detected a broken HOT chain + * during build. + */ + bool havedead; + bool brokenhotchain; +} ParallelIndexBuildShared; + +/* Leader-side helpers */ +extern ParallelContext *ParallelIndexBuildCreateContext(const char *worker_function, + int nworkers); +extern Snapshot ParallelIndexBuildGetSnapshot(bool isconcurrent); +extern Size ParallelIndexBuildEstimateShared(Relation heap, Snapshot snapshot, + Size am_shared_size); +extern void ParallelIndexBuildInitShared(ParallelIndexBuildShared * shared, + Relation heap, Relation index, + bool isconcurrent, + int scantuplesortstates, + ParallelTableScanDesc pscan, + Snapshot snapshot); +extern void ParallelIndexBuildWaitForWorkers(ParallelIndexBuildShared * shared, + int nparticipants, + double *reltuples, double *indtuples, + bool *havedead, bool *brokenhotchain); +extern void ParallelIndexBuildEnd(ParallelContext *pcxt, + struct Instrumentation *instr, + Snapshot snapshot); + +/* Worker-side helpers */ +extern void ParallelIndexBuildReportScanDone(ParallelIndexBuildShared * shared, + double reltuples, double indtuples, + bool havedead, bool brokenhotchain); +extern void ParallelIndexBuildOpenRelations(ParallelIndexBuildShared * shared, + Relation *heapRel, Relation *indexRel, + LOCKMODE *heapLockmode, + LOCKMODE *indexLockmode); +extern void ParallelIndexBuildCloseRelations(Relation heapRel, Relation indexRel, + LOCKMODE heapLockmode, + LOCKMODE indexLockmode); + +#endif /* PARALLEL_INDEX_BUILD_H */ -- 2.47.1