From fc79ec8084837e1792441b1dae1594986dba0caa Mon Sep 17 00:00:00 2001 From: nkey Date: Mon, 2 Dec 2024 01:33:21 +0100 Subject: [PATCH v2 4/4] Allow snapshot resets during parallel concurrent index builds Previously, non-unique concurrent index builds in parallel mode required a consistent MVCC snapshot throughout the build, which could hold back the xmin horizon and prevent dead tuple cleanup. This patch extends the previous work on snapshot resets (introduced for non-parallel builds) to also support parallel builds. Key changes: - Add infrastructure to track snapshot restoration in parallel workers - Extend parallel scan initialization to support periodic snapshot resets - Wait for parallel workers to restore their initial snapshots before proceeding with scan - Add regression tests to verify behavior with various index types The snapshot reset approach is safe for non-unique indexes since they don't need snapshot consistency across the entire scan. For unique indexes, we continue to maintain a consistent snapshot to properly enforce uniqueness constraints. This helps reduce the xmin horizon impact of long-running concurrent index builds in parallel mode, improving VACUUM's ability to clean up dead tuples. --- src/backend/access/brin/brin.c | 43 +++++++++------- src/backend/access/heap/heapam_handler.c | 12 +++-- src/backend/access/nbtree/nbtsort.c | 38 ++++++++++++-- src/backend/access/table/tableam.c | 37 ++++++++++++-- src/backend/access/transam/parallel.c | 50 +++++++++++++++++-- src/backend/executor/nodeSeqscan.c | 3 +- src/backend/utils/time/snapmgr.c | 8 --- src/include/access/parallel.h | 3 +- src/include/access/relscan.h | 1 + src/include/access/tableam.h | 9 ++-- .../expected/cic_reset_snapshots.out | 23 ++++++++- .../sql/cic_reset_snapshots.sql | 7 ++- 12 files changed, 178 insertions(+), 56 deletions(-) diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c index d69859ac4df..0782bd64a6a 100644 --- a/src/backend/access/brin/brin.c +++ b/src/backend/access/brin/brin.c @@ -143,7 +143,6 @@ typedef struct BrinLeader */ BrinShared *brinshared; Sharedsort *sharedsort; - Snapshot snapshot; WalUsage *walusage; BufferUsage *bufferusage; } BrinLeader; @@ -231,7 +230,7 @@ static void brin_fill_empty_ranges(BrinBuildState *state, static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, bool isconcurrent, int request); static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state); -static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot); +static Size _brin_parallel_estimate_shared(Relation heap); static double _brin_parallel_heapscan(BrinBuildState *state); static double _brin_parallel_merge(BrinBuildState *state); static void _brin_leader_participate_as_worker(BrinBuildState *buildstate, @@ -2357,7 +2356,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, { ParallelContext *pcxt; int scantuplesortstates; - Snapshot snapshot; Size estbrinshared; Size estsort; BrinShared *brinshared; @@ -2367,6 +2365,7 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, BufferUsage *bufferusage; bool leaderparticipates = true; bool need_pop_active_snapshot = true; + bool wait_for_snapshot_attach; int querylen; #ifdef DISABLE_LEADER_PARTICIPATION @@ -2388,25 +2387,25 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, * Prepare for scan of the base relation. In a normal index build, we use * SnapshotAny because we must retrieve all tuples and do our own time * qual checks (because we have to index RECENTLY_DEAD tuples). In a - * concurrent build, we take a regular MVCC snapshot and index whatever's - * live according to that. + * concurrent build, we take a regular MVCC snapshot and push it as active. + * Later we index whatever's live according to that snapshot while that + * snapshot is reset periodically. */ if (!isconcurrent) { Assert(ActiveSnapshotSet()); - snapshot = SnapshotAny; need_pop_active_snapshot = false; } else { - snapshot = RegisterSnapshot(GetTransactionSnapshot()); + Assert(!ActiveSnapshotSet()); PushActiveSnapshot(GetTransactionSnapshot()); } /* * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace. */ - estbrinshared = _brin_parallel_estimate_shared(heap, snapshot); + estbrinshared = _brin_parallel_estimate_shared(heap); shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared); estsort = tuplesort_estimate_shared(scantuplesortstates); shm_toc_estimate_chunk(&pcxt->estimator, estsort); @@ -2446,8 +2445,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, { if (need_pop_active_snapshot) PopActiveSnapshot(); - if (IsMVCCSnapshot(snapshot)) - UnregisterSnapshot(snapshot); DestroyParallelContext(pcxt); ExitParallelMode(); return; @@ -2472,7 +2469,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, table_parallelscan_initialize(heap, ParallelTableScanFromBrinShared(brinshared), - snapshot); + isconcurrent ? InvalidSnapshot : SnapshotAny, + isconcurrent); /* * Store shared tuplesort-private state, for which we reserved space. @@ -2518,7 +2516,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, brinleader->nparticipanttuplesorts++; brinleader->brinshared = brinshared; brinleader->sharedsort = sharedsort; - brinleader->snapshot = snapshot; brinleader->walusage = walusage; brinleader->bufferusage = bufferusage; @@ -2534,6 +2531,16 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, /* Save leader state now that it's clear build will be parallel */ buildstate->bs_leader = brinleader; + /* + * In case of concurrent build snapshots are going to be reset periodically. + * In case when leader going to reset own active snapshot as well - we need to + * wait until all workers imported initial snapshot. + */ + wait_for_snapshot_attach = isconcurrent && leaderparticipates; + + if (wait_for_snapshot_attach) + WaitForParallelWorkersToAttach(pcxt, true); + /* Join heap scan ourselves */ if (leaderparticipates) _brin_leader_participate_as_worker(buildstate, heap, index); @@ -2542,7 +2549,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index, * Caller needs to wait for all launched workers when we return. Make * sure that the failure-to-start case will not hang forever. */ - WaitForParallelWorkersToAttach(pcxt); + if (!wait_for_snapshot_attach) + WaitForParallelWorkersToAttach(pcxt, false); if (need_pop_active_snapshot) PopActiveSnapshot(); } @@ -2565,9 +2573,6 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state) for (i = 0; i < brinleader->pcxt->nworkers_launched; i++) InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]); - /* Free last reference to MVCC snapshot, if one was used */ - if (IsMVCCSnapshot(brinleader->snapshot)) - UnregisterSnapshot(brinleader->snapshot); DestroyParallelContext(brinleader->pcxt); ExitParallelMode(); } @@ -2767,14 +2772,14 @@ _brin_parallel_merge(BrinBuildState *state) /* * Returns size of shared memory required to store state for a parallel - * brin index build based on the snapshot its parallel scan will use. + * brin index build. */ static Size -_brin_parallel_estimate_shared(Relation heap, Snapshot snapshot) +_brin_parallel_estimate_shared(Relation heap) { /* c.f. shm_toc_allocate as to why BUFFERALIGN is used */ return add_size(BUFFERALIGN(sizeof(BrinShared)), - table_parallelscan_estimate(heap, snapshot)); + table_parallelscan_estimate(heap, InvalidSnapshot)); } /* diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index 980c51e32b9..2e5163609c1 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -1231,14 +1231,13 @@ heapam_index_build_range_scan(Relation heapRelation, * SnapshotAny because we must retrieve all tuples and do our own time * qual checks (because we have to index RECENTLY_DEAD tuples). In a * concurrent build, or during bootstrap, we take a regular MVCC snapshot - * and index whatever's live according to that. + * and index whatever's live according to that while that snapshot is reset + * every so often (in case of non-unique index). */ OldestXmin = InvalidTransactionId; /* * For unique index we need consistent snapshot for the whole scan. - * In case of parallel scan some additional infrastructure required - * to perform scan with SO_RESET_SNAPSHOT which is not yet ready. */ reset_snapshots = indexInfo->ii_Concurrent && !indexInfo->ii_Unique && @@ -1300,8 +1299,11 @@ heapam_index_build_range_scan(Relation heapRelation, Assert(!IsBootstrapProcessingMode()); Assert(allow_sync); snapshot = scan->rs_snapshot; - PushActiveSnapshot(snapshot); - need_pop_active_snapshot = true; + if (!reset_snapshots) + { + PushActiveSnapshot(snapshot); + need_pop_active_snapshot = true; + } } hscan = (HeapScanDesc) scan; diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index 5c4581afb1a..2acbf121745 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -1411,6 +1411,8 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) BufferUsage *bufferusage; bool leaderparticipates = true; bool need_pop_active_snapshot = true; + bool reset_snapshot; + bool wait_for_snapshot_attach; int querylen; #ifdef DISABLE_LEADER_PARTICIPATION @@ -1428,12 +1430,21 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) scantuplesortstates = leaderparticipates ? request + 1 : request; + /* + * For concurrent non-unique index builds, we can periodically reset snapshots + * to allow the xmin horizon to advance. This is safe since these builds don't + * require a consistent view across the entire scan. Unique indexes still need + * a stable snapshot to properly enforce uniqueness constraints. + */ + reset_snapshot = isconcurrent && !btspool->isunique; + /* * Prepare for scan of the base relation. In a normal index build, we use * SnapshotAny because we must retrieve all tuples and do our own time * qual checks (because we have to index RECENTLY_DEAD tuples). In a * concurrent build, we take a regular MVCC snapshot and index whatever's - * live according to that. + * live according to that, while that snapshot may be reset periodically in + * case of non-unique index. */ if (!isconcurrent) { @@ -1441,6 +1452,11 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) snapshot = SnapshotAny; need_pop_active_snapshot = false; } + else if (reset_snapshot) + { + snapshot = InvalidSnapshot; + PushActiveSnapshot(GetTransactionSnapshot()); + } else { snapshot = RegisterSnapshot(GetTransactionSnapshot()); @@ -1501,7 +1517,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) { if (need_pop_active_snapshot) PopActiveSnapshot(); - if (IsMVCCSnapshot(snapshot)) + if (snapshot != InvalidSnapshot && IsMVCCSnapshot(snapshot)) UnregisterSnapshot(snapshot); DestroyParallelContext(pcxt); ExitParallelMode(); @@ -1528,7 +1544,8 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) btshared->brokenhotchain = false; table_parallelscan_initialize(btspool->heap, ParallelTableScanFromBTShared(btshared), - snapshot); + snapshot, + reset_snapshot); /* * Store shared tuplesort-private state, for which we reserved space. @@ -1604,6 +1621,16 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) /* Save leader state now that it's clear build will be parallel */ buildstate->btleader = btleader; + /* + * In case of concurrent build snapshots are going to be reset periodically. + * In case when leader going to reset own active snapshot as well - we need to + * wait until all workers imported initial snapshot. + */ + wait_for_snapshot_attach = reset_snapshot && leaderparticipates; + + if (wait_for_snapshot_attach) + WaitForParallelWorkersToAttach(pcxt, true); + /* Join heap scan ourselves */ if (leaderparticipates) _bt_leader_participate_as_worker(buildstate); @@ -1612,7 +1639,8 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request) * Caller needs to wait for all launched workers when we return. Make * sure that the failure-to-start case will not hang forever. */ - WaitForParallelWorkersToAttach(pcxt); + if (!wait_for_snapshot_attach) + WaitForParallelWorkersToAttach(pcxt, false); if (need_pop_active_snapshot) PopActiveSnapshot(); } @@ -1636,7 +1664,7 @@ _bt_end_parallel(BTLeader *btleader) InstrAccumParallelQuery(&btleader->bufferusage[i], &btleader->walusage[i]); /* Free last reference to MVCC snapshot, if one was used */ - if (IsMVCCSnapshot(btleader->snapshot)) + if (btleader->snapshot != InvalidSnapshot && IsMVCCSnapshot(btleader->snapshot)) UnregisterSnapshot(btleader->snapshot); DestroyParallelContext(btleader->pcxt); ExitParallelMode(); diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index bd8715b6797..cac7a9ea88a 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -131,10 +131,10 @@ table_parallelscan_estimate(Relation rel, Snapshot snapshot) { Size sz = 0; - if (IsMVCCSnapshot(snapshot)) + if (snapshot != InvalidSnapshot && IsMVCCSnapshot(snapshot)) sz = add_size(sz, EstimateSnapshotSpace(snapshot)); else - Assert(snapshot == SnapshotAny); + Assert(snapshot == SnapshotAny || snapshot == InvalidSnapshot); sz = add_size(sz, rel->rd_tableam->parallelscan_estimate(rel)); @@ -143,21 +143,36 @@ table_parallelscan_estimate(Relation rel, Snapshot snapshot) void table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan, - Snapshot snapshot) + Snapshot snapshot, bool reset_snapshot) { Size snapshot_off = rel->rd_tableam->parallelscan_initialize(rel, pscan); pscan->phs_snapshot_off = snapshot_off; - if (IsMVCCSnapshot(snapshot)) + /* + * Initialize parallel scan description. For normal scans with a regular + * MVCC snapshot, serialize the snapshot info. For scans that use periodic + * snapshot resets, mark the scan accordingly. + */ + if (reset_snapshot) + { + Assert(snapshot == InvalidSnapshot); + pscan->phs_snapshot_any = false; + pscan->phs_reset_snapshot = true; + INJECTION_POINT("table_parallelscan_initialize"); + } + else if (IsMVCCSnapshot(snapshot)) { SerializeSnapshot(snapshot, (char *) pscan + pscan->phs_snapshot_off); pscan->phs_snapshot_any = false; + pscan->phs_reset_snapshot = false; } else { Assert(snapshot == SnapshotAny); + Assert(!reset_snapshot); pscan->phs_snapshot_any = true; + pscan->phs_reset_snapshot = false; } } @@ -170,7 +185,19 @@ table_beginscan_parallel(Relation relation, ParallelTableScanDesc pscan) Assert(RelFileLocatorEquals(relation->rd_locator, pscan->phs_locator)); - if (!pscan->phs_snapshot_any) + /* + * For scans that + * use periodic snapshot resets, mark the scan accordingly and use the active + * snapshot as the initial state. + */ + if (pscan->phs_reset_snapshot) + { + Assert(ActiveSnapshotSet()); + flags |= SO_RESET_SNAPSHOT; + /* Start with current active snapshot. */ + snapshot = GetActiveSnapshot(); + } + else if (!pscan->phs_snapshot_any) { /* Snapshot was serialized -- restore it */ snapshot = RestoreSnapshot((char *) pscan + pscan->phs_snapshot_off); diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 0a1e089ec1d..d49c6ee410f 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -76,6 +76,7 @@ #define PARALLEL_KEY_RELMAPPER_STATE UINT64CONST(0xFFFFFFFFFFFF000D) #define PARALLEL_KEY_UNCOMMITTEDENUMS UINT64CONST(0xFFFFFFFFFFFF000E) #define PARALLEL_KEY_CLIENTCONNINFO UINT64CONST(0xFFFFFFFFFFFF000F) +#define PARALLEL_KEY_SNAPSHOT_RESTORED UINT64CONST(0xFFFFFFFFFFFF0010) /* Fixed-size parallel state. */ typedef struct FixedParallelState @@ -301,6 +302,10 @@ InitializeParallelDSM(ParallelContext *pcxt) pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); + shm_toc_estimate_chunk(&pcxt->estimator, mul_size(sizeof(bool), + pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate how much we'll need for the entrypoint info. */ shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) + strlen(pcxt->function_name) + 2); @@ -372,6 +377,7 @@ InitializeParallelDSM(ParallelContext *pcxt) char *entrypointstate; char *uncommittedenumsspace; char *clientconninfospace; + bool *snapshot_set_flag_space; Size lnamelen; /* Serialize shared libraries we have loaded. */ @@ -487,6 +493,19 @@ InitializeParallelDSM(ParallelContext *pcxt) strcpy(entrypointstate, pcxt->library_name); strcpy(entrypointstate + lnamelen + 1, pcxt->function_name); shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate); + + /* + * Establish dynamic shared memory to pass information about importing + * of snapshot. + */ + snapshot_set_flag_space = + shm_toc_allocate(pcxt->toc, mul_size(sizeof(bool), pcxt->nworkers)); + for (i = 0; i < pcxt->nworkers; ++i) + { + pcxt->worker[i].snapshot_restored = snapshot_set_flag_space + i * sizeof(bool); + *pcxt->worker[i].snapshot_restored = false; + } + shm_toc_insert(pcxt->toc, PARALLEL_KEY_SNAPSHOT_RESTORED, snapshot_set_flag_space); } /* Update nworkers_to_launch, in case we changed nworkers above. */ @@ -542,6 +561,17 @@ ReinitializeParallelDSM(ParallelContext *pcxt) pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL); } } + + /* Set snapshot restored flag to false. */ + if (pcxt->nworkers > 0) + { + bool *snapshot_restored_space; + int i; + snapshot_restored_space = + shm_toc_lookup(pcxt->toc, PARALLEL_KEY_SNAPSHOT_RESTORED, false); + for (i = 0; i < pcxt->nworkers; ++i) + snapshot_restored_space[i] = false; + } } /* @@ -657,6 +687,10 @@ LaunchParallelWorkers(ParallelContext *pcxt) * Wait for all workers to attach to their error queues, and throw an error if * any worker fails to do this. * + * wait_for_snapshot: track whether each parallel worker has successfully restored + * its snapshot. This is needed when using periodic snapshot resets to ensure all + * workers have a valid initial snapshot before proceeding with the scan. + * * Callers can assume that if this function returns successfully, then the * number of workers given by pcxt->nworkers_launched have initialized and * attached to their error queues. Whether or not these workers are guaranteed @@ -686,7 +720,7 @@ LaunchParallelWorkers(ParallelContext *pcxt) * call this function at all. */ void -WaitForParallelWorkersToAttach(ParallelContext *pcxt) +WaitForParallelWorkersToAttach(ParallelContext *pcxt, bool wait_for_snapshot) { int i; @@ -730,9 +764,12 @@ WaitForParallelWorkersToAttach(ParallelContext *pcxt) mq = shm_mq_get_queue(pcxt->worker[i].error_mqh); if (shm_mq_get_sender(mq) != NULL) { - /* Yes, so it is known to be attached. */ - pcxt->known_attached_workers[i] = true; - ++pcxt->nknown_attached_workers; + if (!wait_for_snapshot || *(pcxt->worker[i].snapshot_restored)) + { + /* Yes, so it is known to be attached. */ + pcxt->known_attached_workers[i] = true; + ++pcxt->nknown_attached_workers; + } } } else if (status == BGWH_STOPPED) @@ -1291,6 +1328,7 @@ ParallelWorkerMain(Datum main_arg) shm_toc *toc; FixedParallelState *fps; char *error_queue_space; + bool *snapshot_restored_space; shm_mq *mq; shm_mq_handle *mqh; char *libraryspace; @@ -1489,6 +1527,10 @@ ParallelWorkerMain(Datum main_arg) fps->parallel_leader_pgproc); PushActiveSnapshot(asnapshot); + /* Snapshot is restored, set flag to make leader know about it. */ + snapshot_restored_space = shm_toc_lookup(toc, PARALLEL_KEY_SNAPSHOT_RESTORED, false); + snapshot_restored_space[ParallelWorkerNumber] = true; + /* * We've changed which tuples we can see, and must therefore invalidate * system caches. diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index 7cb12a11c2d..2907b366791 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -262,7 +262,8 @@ ExecSeqScanInitializeDSM(SeqScanState *node, pscan = shm_toc_allocate(pcxt->toc, node->pscan_len); table_parallelscan_initialize(node->ss.ss_currentRelation, pscan, - estate->es_snapshot); + estate->es_snapshot, + false); shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan); node->ss.ss_currentScanDesc = table_beginscan_parallel(node->ss.ss_currentRelation, pscan); diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 3a7357a050d..148e1982cad 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -291,14 +291,6 @@ GetTransactionSnapshot(void) Snapshot GetLatestSnapshot(void) { - /* - * We might be able to relax this, but nothing that could otherwise work - * needs it. - */ - if (IsInParallelMode()) - elog(ERROR, - "cannot update SecondarySnapshot during a parallel operation"); - /* * So far there are no cases requiring support for GetLatestSnapshot() * during logical decoding, but it wouldn't be hard to add if required. diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h index 69ffe5498f9..964a7e945be 100644 --- a/src/include/access/parallel.h +++ b/src/include/access/parallel.h @@ -26,6 +26,7 @@ typedef struct ParallelWorkerInfo { BackgroundWorkerHandle *bgwhandle; shm_mq_handle *error_mqh; + bool *snapshot_restored; } ParallelWorkerInfo; typedef struct ParallelContext @@ -65,7 +66,7 @@ extern void InitializeParallelDSM(ParallelContext *pcxt); extern void ReinitializeParallelDSM(ParallelContext *pcxt); extern void ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch); extern void LaunchParallelWorkers(ParallelContext *pcxt); -extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt); +extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt, bool wait_for_snapshot); extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt); extern void DestroyParallelContext(ParallelContext *pcxt); extern bool ParallelContextActive(void); diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h index e1884acf493..a9603084aeb 100644 --- a/src/include/access/relscan.h +++ b/src/include/access/relscan.h @@ -88,6 +88,7 @@ typedef struct ParallelTableScanDescData RelFileLocator phs_locator; /* physical relation to scan */ bool phs_syncscan; /* report location to syncscan logic? */ bool phs_snapshot_any; /* SnapshotAny, not phs_snapshot_data? */ + bool phs_reset_snapshot; /* use SO_RESET_SNAPSHOT? */ Size phs_snapshot_off; /* data for snapshot */ } ParallelTableScanDescData; typedef struct ParallelTableScanDescData *ParallelTableScanDesc; diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index f4c7d2a92bf..9ee5ea15fd4 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -1184,7 +1184,8 @@ extern Size table_parallelscan_estimate(Relation rel, Snapshot snapshot); */ extern void table_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan, - Snapshot snapshot); + Snapshot snapshot, + bool reset_snapshot); /* * Begin a parallel scan. `pscan` needs to have been initialized with @@ -1802,9 +1803,9 @@ table_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin, * This only really makes sense for heap AM, it might need to be generalized * for other AMs later. * - * In case of non-unique index and non-parallel concurrent build - * SO_RESET_SNAPSHOT is applied for the scan. That leads for changing snapshots - * on the fly to allow xmin horizon propagate. + * In case of non-unique concurrent index build SO_RESET_SNAPSHOT is applied + * for the scan. That leads for changing snapshots on the fly to allow xmin + * horizon propagate. */ static inline double table_index_build_scan(Relation table_rel, diff --git a/src/test/modules/injection_points/expected/cic_reset_snapshots.out b/src/test/modules/injection_points/expected/cic_reset_snapshots.out index 4cfbbb05923..49ef68d9071 100644 --- a/src/test/modules/injection_points/expected/cic_reset_snapshots.out +++ b/src/test/modules/injection_points/expected/cic_reset_snapshots.out @@ -17,6 +17,12 @@ SELECT injection_points_attach('table_beginscan_strat_reset_snapshots', 'notice' (1 row) +SELECT injection_points_attach('table_parallelscan_initialize', 'notice'); + injection_points_attach +------------------------- + +(1 row) + CREATE SCHEMA cic_reset_snap; CREATE TABLE cic_reset_snap.tbl(i int primary key, j int); INSERT INTO cic_reset_snap.tbl SELECT i, i * I FROM generate_series(1, 200) s(i); @@ -72,27 +78,40 @@ NOTICE: notice triggered for injection point heap_reset_scan_snapshot_effective DROP INDEX CONCURRENTLY cic_reset_snap.idx; -- The same in parallel mode ALTER TABLE cic_reset_snap.tbl SET (parallel_workers=2); +-- Detach to keep test stable, since parallel worker may complete scan before leader +SELECT injection_points_detach('heap_reset_scan_snapshot_effective'); + injection_points_detach +------------------------- + +(1 row) + CREATE UNIQUE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i); REINDEX INDEX CONCURRENTLY cic_reset_snap.idx; DROP INDEX CONCURRENTLY cic_reset_snap.idx; CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i); +NOTICE: notice triggered for injection point table_parallelscan_initialize REINDEX INDEX CONCURRENTLY cic_reset_snap.idx; +NOTICE: notice triggered for injection point table_parallelscan_initialize DROP INDEX CONCURRENTLY cic_reset_snap.idx; CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(MOD(i, 2), j) WHERE MOD(i, 2) = 0; +NOTICE: notice triggered for injection point table_parallelscan_initialize REINDEX INDEX CONCURRENTLY cic_reset_snap.idx; +NOTICE: notice triggered for injection point table_parallelscan_initialize DROP INDEX CONCURRENTLY cic_reset_snap.idx; CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i, j) WHERE cic_reset_snap.predicate_stable(i); NOTICE: notice triggered for injection point table_beginscan_strat_reset_snapshots -NOTICE: notice triggered for injection point heap_reset_scan_snapshot_effective REINDEX INDEX CONCURRENTLY cic_reset_snap.idx; NOTICE: notice triggered for injection point table_beginscan_strat_reset_snapshots -NOTICE: notice triggered for injection point heap_reset_scan_snapshot_effective DROP INDEX CONCURRENTLY cic_reset_snap.idx; CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i, j) WHERE cic_reset_snap.predicate_stable_no_param(); +NOTICE: notice triggered for injection point table_parallelscan_initialize REINDEX INDEX CONCURRENTLY cic_reset_snap.idx; +NOTICE: notice triggered for injection point table_parallelscan_initialize DROP INDEX CONCURRENTLY cic_reset_snap.idx; CREATE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl USING BRIN(i); +NOTICE: notice triggered for injection point table_parallelscan_initialize REINDEX INDEX CONCURRENTLY cic_reset_snap.idx; +NOTICE: notice triggered for injection point table_parallelscan_initialize DROP INDEX CONCURRENTLY cic_reset_snap.idx; DROP SCHEMA cic_reset_snap CASCADE; NOTICE: drop cascades to 3 other objects diff --git a/src/test/modules/injection_points/sql/cic_reset_snapshots.sql b/src/test/modules/injection_points/sql/cic_reset_snapshots.sql index 4fef5a47431..5d1c31493f0 100644 --- a/src/test/modules/injection_points/sql/cic_reset_snapshots.sql +++ b/src/test/modules/injection_points/sql/cic_reset_snapshots.sql @@ -3,7 +3,7 @@ CREATE EXTENSION injection_points; SELECT injection_points_set_local(); SELECT injection_points_attach('heap_reset_scan_snapshot_effective', 'notice'); SELECT injection_points_attach('table_beginscan_strat_reset_snapshots', 'notice'); - +SELECT injection_points_attach('table_parallelscan_initialize', 'notice'); CREATE SCHEMA cic_reset_snap; CREATE TABLE cic_reset_snap.tbl(i int primary key, j int); @@ -53,6 +53,9 @@ DROP INDEX CONCURRENTLY cic_reset_snap.idx; -- The same in parallel mode ALTER TABLE cic_reset_snap.tbl SET (parallel_workers=2); +-- Detach to keep test stable, since parallel worker may complete scan before leader +SELECT injection_points_detach('heap_reset_scan_snapshot_effective'); + CREATE UNIQUE INDEX CONCURRENTLY idx ON cic_reset_snap.tbl(i); REINDEX INDEX CONCURRENTLY cic_reset_snap.idx; DROP INDEX CONCURRENTLY cic_reset_snap.idx; @@ -79,4 +82,4 @@ DROP INDEX CONCURRENTLY cic_reset_snap.idx; DROP SCHEMA cic_reset_snap CASCADE; -DROP EXTENSION injection_points; +DROP EXTENSION injection_points; \ No newline at end of file -- 2.43.0