From 400cd183ca955e989b1b9a2e2faf5df39d32e6f8 Mon Sep 17 00:00:00 2001 From: Mikhail Nikalayeu Date: Mon, 21 Apr 2025 14:11:53 +0200 Subject: [PATCH v26 8/8] Refresh snapshot periodically during index validation Enhances validation phase of concurrently built indexes by periodically refreshing snapshots rather than using a single reference snapshot. This addresses issues with xmin propagation during long-running validations. The validation now takes a fresh snapshot every few pages, allowing the xmin horizon to advance. This restores the feature of commit d9d076222f5b, which was reverted in commit e28bb8851969. The new STIR-based approach does not depend on a single reference snapshot anymore. --- src/backend/access/heap/README.HOT | 4 +- src/backend/access/heap/heapam_handler.c | 73 +++++++++++++++++++++--- src/backend/access/spgist/spgvacuum.c | 12 +++- src/backend/catalog/index.c | 42 ++++++++++---- src/backend/commands/indexcmds.c | 50 ++-------------- src/include/access/tableam.h | 25 ++++---- src/include/access/transam.h | 15 +++++ src/include/catalog/index.h | 2 +- 8 files changed, 139 insertions(+), 84 deletions(-) diff --git a/src/backend/access/heap/README.HOT b/src/backend/access/heap/README.HOT index 28e2a1604c4..604bdda59ff 100644 --- a/src/backend/access/heap/README.HOT +++ b/src/backend/access/heap/README.HOT @@ -401,12 +401,12 @@ live tuple. We mark the index open for inserts (but still not ready for reads) then we again wait for transactions which have the table open. Then validate the index. This searches for tuples missing from the index in auxiliary -index, and inserts any missing ones if them visible to reference snapshot. +index, and inserts any missing ones if they are visible to a fresh snapshot. Again, the index entries have to have TIDs equal to HOT-chain root TIDs, but the value to be inserted is the one from the live tuple. Then we wait until every transaction that could have a snapshot older than -the second reference snapshot is finished. 
This ensures that nobody is +the latest used snapshot is finished. This ensures that nobody is alive any longer who could need to see any tuples that might be missing from the index, as well as ensuring that no one can see any inconsistent rows in a broken HOT chain (the first condition is stronger than the diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index c85e5332ba2..12baa8728d5 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -1996,23 +1996,26 @@ heapam_index_validate_scan_read_stream_next( return result; } -static void +static TransactionId heapam_index_validate_scan(Relation heapRelation, Relation indexRelation, IndexInfo *indexInfo, - Snapshot snapshot, ValidateIndexState *state, ValidateIndexState *auxState) { + TransactionId limitXmin; + Datum values[INDEX_MAX_KEYS]; bool isnull[INDEX_MAX_KEYS]; + Snapshot snapshot; TupleTableSlot *slot; EState *estate; ExprContext *econtext; BufferAccessStrategy bstrategy = GetAccessStrategy(BAS_BULKREAD); - int num_to_check; + int num_to_check, + page_read_counter = 1; /* set to 1 to skip snapshot reset at start */ Tuplestorestate *tuples_for_check; ValidateIndexScanState callback_private_data; @@ -2023,14 +2026,16 @@ heapam_index_validate_scan(Relation heapRelation, /* Use 10% of memory for tuple store. */ int store_work_mem_part = maintenance_work_mem / 10; - /* - * Encode TIDs as int8 values for the sort, rather than directly sorting - * item pointers. This can be significantly faster, primarily because TID - * is a pass-by-reference type on all platforms, whereas int8 is - * pass-by-value on most platforms. 
- */ + PushActiveSnapshot(GetTransactionSnapshot()); + tuples_for_check = tuplestore_begin_datum(INT8OID, false, false, store_work_mem_part); + PopActiveSnapshot(); + InvalidateCatalogSnapshot(); + + Assert(!HaveRegisteredOrActiveSnapshot()); + Assert(!TransactionIdIsValid(MyProc->xmin)); + /* * sanity checks */ @@ -2046,6 +2051,29 @@ heapam_index_validate_scan(Relation heapRelation, state->tuplesort = auxState->tuplesort = NULL; + /* + * Now take the first snapshot that will be used to filter candidate + * tuples. We are going to replace it with a newer snapshot every so often + * to propagate horizon. + * + * Beware! There might still be snapshots in use that treat some transaction + * as in-progress that our temporary snapshot treats as committed. + * + * If such a recently-committed transaction deleted tuples in the table, + * we will not include them in the index; yet those transactions which + * see the deleting one as still-in-progress will expect such tuples to + * be there once we mark the index as valid. + * + * We solve this by waiting for all endangered transactions to exit before + * we mark the index as valid; that is why limitXmin is maintained. + * + * We also set ActiveSnapshot to this snap, since functions in indexes may + * need a snapshot. 
+ */ + snapshot = RegisterSnapshot(GetLatestSnapshot()); + PushActiveSnapshot(snapshot); + limitXmin = snapshot->xmin; + estate = CreateExecutorState(); econtext = GetPerTupleExprContext(estate); slot = MakeSingleTupleTableSlot(RelationGetDescr(heapRelation), @@ -2079,6 +2107,7 @@ heapam_index_validate_scan(Relation heapRelation, LockBuffer(buf, BUFFER_LOCK_SHARE); block_number = BufferGetBlockNumber(buf); + page_read_counter++; i = 0; while ((off = tuples[i]) != InvalidOffsetNumber) @@ -2134,6 +2163,20 @@ heapam_index_validate_scan(Relation heapRelation, } ReleaseBuffer(buf); +#define VALIDATE_INDEX_RESET_SNAPSHOT_EACH_N_PAGE 4096 + if (page_read_counter % VALIDATE_INDEX_RESET_SNAPSHOT_EACH_N_PAGE == 0) + { + PopActiveSnapshot(); + UnregisterSnapshot(snapshot); + /* to make sure we propagate xmin */ + InvalidateCatalogSnapshot(); + Assert(!TransactionIdIsValid(MyProc->xmin)); + + snapshot = RegisterSnapshot(GetLatestSnapshot()); + PushActiveSnapshot(snapshot); + /* xmin should not go backwards, but just in case */ + limitXmin = TransactionIdNewer(limitXmin, snapshot->xmin); + } } ExecDropSingleTupleTableSlot(slot); @@ -2143,9 +2186,21 @@ heapam_index_validate_scan(Relation heapRelation, read_stream_end(read_stream); tuplestore_end(tuples_for_check); + /* + * Drop the latest snapshot. We must do this before waiting out other + * snapshot holders, else we will deadlock against other processes also + * doing CREATE INDEX CONCURRENTLY, which would see our snapshot as one + * they must wait for. 
+ */ + PopActiveSnapshot(); + UnregisterSnapshot(snapshot); + InvalidateCatalogSnapshot(); + Assert(MyProc->xmin == InvalidTransactionId); /* These may have been pointing to the now-gone estate */ indexInfo->ii_ExpressionsState = NIL; indexInfo->ii_PredicateState = NULL; + + return limitXmin; } /* diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c index 71ef2e5036f..81406d8fc2b 100644 --- a/src/backend/access/spgist/spgvacuum.c +++ b/src/backend/access/spgist/spgvacuum.c @@ -191,14 +191,16 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer, * Add target TID to pending list if the redirection could have * happened since VACUUM started. (If xid is invalid, assume it * must have happened before VACUUM started, since REINDEX - * CONCURRENTLY locks out VACUUM.) + * CONCURRENTLY locks out VACUUM; if myXmin is invalid, this is a + * validation scan.) * * Note: we could make a tighter test by seeing if the xid is * "running" according to the active snapshot; but snapmgr.c * doesn't currently export a suitable API, and it's not entirely * clear that a tighter test is worth the cycles anyway. 
*/ - if (TransactionIdFollowsOrEquals(dt->xid, bds->myXmin)) + if (!TransactionIdIsValid(bds->myXmin) || + TransactionIdFollowsOrEquals(dt->xid, bds->myXmin)) spgAddPendingTID(bds, &dt->pointer); } else @@ -808,7 +810,6 @@ spgvacuumscan(spgBulkDeleteState *bds) /* Finish setting up spgBulkDeleteState */ initSpGistState(&bds->spgstate, index); bds->pendingList = NULL; - bds->myXmin = GetActiveSnapshot()->xmin; bds->lastFilledBlock = SPGIST_LAST_FIXED_BLKNO; /* @@ -959,6 +960,10 @@ spgbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, bds.stats = stats; bds.callback = callback; bds.callback_state = callback_state; + if (info->validate_index) + bds.myXmin = InvalidTransactionId; + else + bds.myXmin = GetActiveSnapshot()->xmin; spgvacuumscan(&bds); @@ -999,6 +1004,7 @@ spgvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats) bds.stats = stats; bds.callback = dummy_callback; bds.callback_state = NULL; + bds.myXmin = GetActiveSnapshot()->xmin; spgvacuumscan(&bds); } diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 2044b724ba5..2415e1f2f39 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -3513,8 +3513,9 @@ IndexCheckExclusion(Relation heapRelation, * insert their new tuples into it. At the same moment we clear "indisready" for * auxiliary index, since it is no more required to be updated. * - * We then take a new reference snapshot, any tuples that are valid according - * to this snap, but are not in the index, must be added to the index. + * We then take a new snapshot; any tuples that are valid according + * to this snap, but are not in the index, must be added to the index. In + * order to propagate xmin we reset that snapshot every so often. * (Any tuples committed live after the snap will be inserted into the * index by their originating transaction. 
Any tuples committed dead before * the snap need not be indexed, because we will wait out all transactions @@ -3527,7 +3528,7 @@ IndexCheckExclusion(Relation heapRelation, * TIDs of both auxiliary and target indexes, and doing a "merge join" against * the TID lists to see which tuples from auxiliary index are missing from the * target index. Thus we will ensure that all tuples valid according to the - * reference snapshot are in the index. Notice we need to do bulkdelete in the + * latest snapshot are in the index. Notice we need to do bulkdelete in the * particular order: auxiliary first, target last. * * Building a unique index this way is tricky: we might try to insert a @@ -3548,13 +3549,14 @@ IndexCheckExclusion(Relation heapRelation, * * Also, some actions to concurrent drop the auxiliary index are performed. */ -void -validate_index(Oid heapId, Oid indexId, Oid auxIndexId, Snapshot snapshot) +TransactionId +validate_index(Oid heapId, Oid indexId, Oid auxIndexId) { Relation heapRelation, indexRelation, auxIndexRelation; IndexInfo *indexInfo; + TransactionId limitXmin; IndexVacuumInfo ivinfo, auxivinfo; ValidateIndexState state, auxState; Oid save_userid; @@ -3604,8 +3606,12 @@ validate_index(Oid heapId, Oid indexId, Oid auxIndexId, Snapshot snapshot) * Fetch info needed for index_insert. (You might think this should be * passed in from DefineIndex, but its copy is long gone due to having * been built in a previous transaction.) + * + * We might need snapshot for index expressions or predicates. 
*/ + PushActiveSnapshot(GetTransactionSnapshot()); indexInfo = BuildIndexInfo(indexRelation); + PopActiveSnapshot(); /* mark build is concurrent just for consistency */ indexInfo->ii_Concurrent = true; @@ -3641,6 +3647,9 @@ validate_index(Oid heapId, Oid indexId, Oid auxIndexId, Snapshot snapshot) NULL, TUPLESORT_NONE); auxState.htups = auxState.itups = auxState.tups_inserted = 0; + /* tuplesort_begin_datum may require catalog snapshot */ + InvalidateCatalogSnapshot(); + (void) index_bulk_delete(&auxivinfo, NULL, validate_index_callback, &auxState); /* If aux index is empty, merge may be skipped */ @@ -3675,6 +3684,9 @@ validate_index(Oid heapId, Oid indexId, Oid auxIndexId, Snapshot snapshot) NULL, TUPLESORT_NONE); state.htups = state.itups = state.tups_inserted = 0; + /* tuplesort_begin_datum may require catalog snapshot */ + InvalidateCatalogSnapshot(); + /* ambulkdelete updates progress metrics */ (void) index_bulk_delete(&ivinfo, NULL, validate_index_callback, &state); @@ -3694,19 +3706,24 @@ validate_index(Oid heapId, Oid indexId, Oid auxIndexId, Snapshot snapshot) pgstat_progress_update_multi_param(3, progress_index, progress_vals); } tuplesort_performsort(state.tuplesort); + /* tuplesort_performsort may require catalog snapshot */ + InvalidateCatalogSnapshot(); + tuplesort_performsort(auxState.tuplesort); + /* tuplesort_performsort may require catalog snapshot */ + InvalidateCatalogSnapshot(); + Assert(!TransactionIdIsValid(MyProc->xmin)); /* * Now merge both indexes */ pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE, PROGRESS_CREATEIDX_PHASE_VALIDATE_IDXMERGE); - table_index_validate_scan(heapRelation, - indexRelation, - indexInfo, - snapshot, - &state, - &auxState); + limitXmin = table_index_validate_scan(heapRelation, + indexRelation, + indexInfo, + &state, + &auxState); /* Tuple sort closed by table_index_validate_scan */ Assert(state.tuplesort == NULL && auxState.tuplesort == NULL); @@ -3729,6 +3746,9 @@ validate_index(Oid heapId, Oid indexId, 
Oid auxIndexId, Snapshot snapshot) index_close(auxIndexRelation, NoLock); index_close(indexRelation, NoLock); table_close(heapRelation, NoLock); + + Assert(!TransactionIdIsValid(MyProc->xmin)); + return limitXmin; } /* diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c index ca4dc003d15..75152d69b86 100644 --- a/src/backend/commands/indexcmds.c +++ b/src/backend/commands/indexcmds.c @@ -591,7 +591,6 @@ DefineIndex(Oid tableId, LockRelId heaprelid; LOCKTAG heaplocktag; LOCKMODE lockmode; - Snapshot snapshot; Oid root_save_userid; int root_save_sec_context; int root_save_nestlevel; @@ -1805,32 +1804,11 @@ DefineIndex(Oid tableId, /* Tell concurrent index builds to ignore us, if index qualifies */ if (safe_index) set_indexsafe_procflags(); - - /* - * Now take the "reference snapshot" that will be used by validate_index() - * to filter candidate tuples. Beware! There might still be snapshots in - * use that treat some transaction as in-progress that our reference - * snapshot treats as committed. If such a recently-committed transaction - * deleted tuples in the table, we will not include them in the index; yet - * those transactions which see the deleting one as still-in-progress will - * expect such tuples to be there once we mark the index as valid. - * - * We solve this by waiting for all endangered transactions to exit before - * we mark the index as valid. - * - * We also set ActiveSnapshot to this snap, since functions in indexes may - * need a snapshot. - */ - snapshot = RegisterSnapshot(GetTransactionSnapshot()); - PushActiveSnapshot(snapshot); /* * Merge content of auxiliary and target indexes - insert any missing index entries. 
*/ - validate_index(tableId, indexRelationId, auxIndexRelationId, snapshot); - limitXmin = snapshot->xmin; + limitXmin = validate_index(tableId, indexRelationId, auxIndexRelationId); - PopActiveSnapshot(); - UnregisterSnapshot(snapshot); /* * The snapshot subsystem could still contain registered snapshots that * are holding back our process's advertised xmin; in particular, if @@ -1852,8 +1830,8 @@ DefineIndex(Oid tableId, /* * The index is now valid in the sense that it contains all currently * interesting tuples. But since it might not contain tuples deleted just - * before the reference snap was taken, we have to wait out any - * transactions that might have older snapshots. + * before the last snapshot during validating was taken, we have to wait + * out any transactions that might have older snapshots. */ INJECTION_POINT("define_index_before_set_valid", NULL); pgstat_progress_update_param(PROGRESS_CREATEIDX_PHASE, @@ -4401,7 +4379,6 @@ ReindexRelationConcurrently(const ReindexStmt *stmt, Oid relationOid, const Rein { ReindexIndexInfo *newidx = lfirst(lc); TransactionId limitXmin; - Snapshot snapshot; StartTransactionCommand(); @@ -4416,13 +4393,6 @@ ReindexRelationConcurrently(const ReindexStmt *stmt, Oid relationOid, const Rein if (newidx->safe) set_indexsafe_procflags(); - /* - * Take the "reference snapshot" that will be used by validate_index() - * to filter candidate tuples. - */ - snapshot = RegisterSnapshot(GetTransactionSnapshot()); - PushActiveSnapshot(snapshot); - /* * Update progress for the index to build, with the correct parent * table involved. 
@@ -4434,16 +4404,8 @@ ReindexRelationConcurrently(const ReindexStmt *stmt, Oid relationOid, const Rein progress_vals[3] = newidx->amId; pgstat_progress_update_multi_param(4, progress_index, progress_vals); - validate_index(newidx->tableId, newidx->indexId, newidx->auxIndexId, snapshot); - - /* - * We can now do away with our active snapshot, we still need to save - * the xmin limit to wait for older snapshots. - */ - limitXmin = snapshot->xmin; - - PopActiveSnapshot(); - UnregisterSnapshot(snapshot); + limitXmin = validate_index(newidx->tableId, newidx->indexId, newidx->auxIndexId); + Assert(!TransactionIdIsValid(MyProc->xmin)); /* * To ensure no deadlocks, we must commit and start yet another @@ -4456,7 +4418,7 @@ ReindexRelationConcurrently(const ReindexStmt *stmt, Oid relationOid, const Rein /* * The index is now valid in the sense that it contains all currently * interesting tuples. But since it might not contain tuples deleted - * just before the reference snap was taken, we have to wait out any + * just before the latest snap was taken, we have to wait out any * transactions that might have older snapshots. 
* * Because we don't take a snapshot or Xid in this transaction, diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 22446b32157..5fa60e8e37b 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -702,12 +702,11 @@ typedef struct TableAmRoutine TableScanDesc scan); /* see table_index_validate_scan for reference about parameters */ - void (*index_validate_scan) (Relation table_rel, - Relation index_rel, - IndexInfo *index_info, - Snapshot snapshot, - ValidateIndexState *state, - ValidateIndexState *aux_state); + TransactionId (*index_validate_scan) (Relation table_rel, + Relation index_rel, + IndexInfo *index_info, + ValidateIndexState *state, + ValidateIndexState *aux_state); /* ------------------------------------------------------------------------ @@ -1808,20 +1807,18 @@ table_index_build_range_scan(Relation table_rel, * Note: it is responsibility of that function to close sortstates in * both `state` and `auxstate`. */ -static inline void +static inline TransactionId table_index_validate_scan(Relation table_rel, Relation index_rel, IndexInfo *index_info, - Snapshot snapshot, ValidateIndexState *state, ValidateIndexState *auxstate) { - table_rel->rd_tableam->index_validate_scan(table_rel, - index_rel, - index_info, - snapshot, - state, - auxstate); + return table_rel->rd_tableam->index_validate_scan(table_rel, + index_rel, + index_info, + state, + auxstate); } diff --git a/src/include/access/transam.h b/src/include/access/transam.h index c9e20418275..b4a444a66e6 100644 --- a/src/include/access/transam.h +++ b/src/include/access/transam.h @@ -417,6 +417,21 @@ NormalTransactionIdOlder(TransactionId a, TransactionId b) return b; } +/* return the newer of the two IDs */ +static inline TransactionId +TransactionIdNewer(TransactionId a, TransactionId b) +{ + if (!TransactionIdIsValid(a)) + return b; + + if (!TransactionIdIsValid(b)) + return a; + + if (TransactionIdFollows(a, b)) + return a; + return b; +} + /* 
return the newer of the two IDs */ static inline FullTransactionId FullTransactionIdNewer(FullTransactionId a, FullTransactionId b) diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h index c29f44f2465..051ac02ff9c 100644 --- a/src/include/catalog/index.h +++ b/src/include/catalog/index.h @@ -152,7 +152,7 @@ extern void index_build(Relation heapRelation, bool isreindex, bool parallel); -extern void validate_index(Oid heapId, Oid indexId, Oid auxIndexId, Snapshot snapshot); +extern TransactionId validate_index(Oid heapId, Oid indexId, Oid auxIndexId); extern void index_set_state_flags(Oid indexId, IndexStateFlagsAction action); -- 2.43.0