From e317848bac4a20ac331239672315d78143c80605 Mon Sep 17 00:00:00 2001 From: Antonin Houska Date: Thu, 8 Jan 2026 17:47:50 +0100 Subject: [PATCH 6/6] Use multiple snapshots to copy the data. REPACK (CONCURRENTLY) does not prevent applications from using the table that is being processed, but it can prevent the xmin horizon from advancing and thus restrict VACUUM for the whole database. This patch adds the ability to use a particular snapshot only for a certain range of pages. Each time that number of pages has been processed, a new snapshot is built, whose xmin is supposedly higher than that of the previous snapshot. The data copying works as follows: 1. Have the logical decoding system build a snapshot S0 for range R0 at LSN0. This snapshot sees all the data changes whose commit records have LSN < LSN0. 2. Copy the pages in that range to the new relation. The changes not visible to the snapshot (because their transactions are still running) will appear in the output of the logical decoding system as soon as their commit records appear in WAL. 3. Perform logical decoding of all changes we find in WAL for the table we're repacking, put them aside and remember that out of these we can only apply those that affect the range R0 in the old relation. (Naturally, we cannot apply the ones that belong to other pages because it's impossible to UPDATE / DELETE a row in the new relation if it hasn't been copied yet.) Once the decoding is done, consider LSN1 to be the position of the end of the last WAL record decoded. 4. Build a new snapshot S1 at position LSN1, i.e. one that sees all the data whose commit records are at WAL positions < LSN1. Use this snapshot to copy the range of pages R1. 5. Perform logical decoding like in step 3, but remember that, out of this next set, only changes belonging to ranges R0 *and* R1 in the old table can be applied. 6. And so on. Note that the changes decoded above should not be applied to the new relation until the whole relation has been copied. The point is that we need the identity index to apply UPDATE and DELETE statements, and bulk creation of indexes on the already copied heap is probably better than retail insertions during the copying. Special attention needs to be paid to UPDATEs that span page ranges. For example, if the old tuple is in range R0, but the new tuple is in R1, and R1 hasn't been copied yet, we only DELETE the old version from the new relation. The new version will be handled during processing of range R1. The snapshot S1 will be based on a WAL position following that UPDATE, so it'll see the new tuple if its transaction's commit record is at a WAL position lower than the position where we built the snapshot. On the other hand, if the commit record appears at a higher position than that of the snapshot, the corresponding INSERT will be decoded and replayed sometime later: once the scan of R1 has started, changes of tuples belonging to it are no longer filtered out. Likewise, if the old tuple is in range R1 (not yet copied) but the new tuple is in R0, we only perform INSERT on the new relation. The deletion of the old version will either be visible to the snapshot S1 (i.e. the snapshot won't see the old version), or replayed later. This approach introduces one limitation though: if the USING INDEX clause is specified, an explicit sort is always used. An index scan wouldn't work because it does not return the tuples sorted by CTID, so we would not be able to split the copying into ranges of pages. I'm not sure this limitation is serious.
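For illustration, a rough pseudo-C sketch of the copy loop described in steps 1 through 6 above. Every function name in it (build_historic_snapshot, stash_decoded_changes, and so on) is a hypothetical placeholder, not part of the API actually added by this patch:

static void
copy_by_ranges(Relation old_heap, Relation new_heap,
               BlockNumber pages_per_snapshot)
{
    BlockNumber nblocks = get_block_count(old_heap);    /* hypothetical */
    BlockNumber range_end = pages_per_snapshot;
    BlockNumber blkno;
    Snapshot    snapshot = build_historic_snapshot();   /* S0, built at LSN0 */

    for (blkno = 0; blkno < nblocks; blkno++)
    {
        if (blkno >= range_end)
        {
            /*
             * Range finished: decode the WAL written so far and put the
             * changes aside, remembering that only those whose tuples fall
             * below 'range_end' (i.e. into the ranges copied already) may
             * be replayed later.
             */
            stash_decoded_changes(current_flush_lsn(), range_end);

            /* S1, S2, ... - each built right after the last record decoded. */
            snapshot = build_historic_snapshot();
            range_end = blkno + pages_per_snapshot;
        }

        /* Copy the tuples of this block that 'snapshot' considers visible. */
        copy_visible_tuples(old_heap, new_heap, blkno, snapshot);
    }

    /* The changes for the last range carry no block-number restriction. */
    stash_decoded_changes(current_flush_lsn(), InvalidBlockNumber);

    /*
     * Replay is postponed until the indexes exist on the new heap, since
     * the identity index is needed to apply the UPDATEs and DELETEs.
     */
    build_indexes(new_heap);
    replay_stashed_changes(new_heap);
}

Stashing the decoded changes instead of applying them immediately follows from the note above: bulk index creation after the copy is expected to beat retail index insertions during it.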
If REPACK runs concurrently and does not restrict VACUUM, the execution time should not be critical. A new GUC repack_snapshot_after can be used to set the number of pages per snapshot. It's currently classified as DEVELOPER_OPTIONS and may be replaced by a constant after enough evaluation is done. --- src/backend/access/heap/heapam_handler.c | 144 ++++- src/backend/commands/cluster.c | 589 +++++++++++++----- src/backend/replication/logical/decode.c | 47 +- src/backend/replication/logical/logical.c | 30 +- .../replication/logical/reorderbuffer.c | 50 ++ src/backend/replication/logical/snapbuild.c | 27 +- .../pgoutput_repack/pgoutput_repack.c | 2 + src/backend/utils/misc/guc_parameters.dat | 10 + src/backend/utils/misc/guc_tables.c | 1 + src/include/access/tableam.h | 14 +- src/include/commands/cluster.h | 72 +++ src/include/replication/logical.h | 2 +- src/include/replication/reorderbuffer.h | 1 + src/include/replication/snapbuild.h | 2 +- src/tools/pgindent/typedefs.list | 3 +- 15 files changed, 803 insertions(+), 191 deletions(-) diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index 475c536ce43..9c02d91d327 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -33,6 +33,7 @@ #include "catalog/index.h" #include "catalog/storage.h" #include "catalog/storage_xlog.h" +#include "commands/cluster.h" #include "commands/progress.h" #include "executor/executor.h" #include "miscadmin.h" @@ -686,12 +687,12 @@ static void heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, Relation OldIndex, bool use_sort, TransactionId OldestXmin, - Snapshot snapshot, TransactionId *xid_cutoff, MultiXactId *multi_cutoff, double *num_tuples, double *tups_vacuumed, - double *tups_recently_dead) + double *tups_recently_dead, + void *tableam_data) { RewriteState rwstate = NULL; IndexScanDesc indexScan; @@ -707,7 +708,10 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, bool *isnull; BufferHeapTupleTableSlot *hslot; BlockNumber prev_cblock = InvalidBlockNumber; - bool concurrent = snapshot != NULL; + ConcurrentChangeContext *ctx = (ConcurrentChangeContext *) tableam_data; + bool concurrent = ctx != NULL; + Snapshot snapshot = NULL; + BlockNumber range_end = InvalidBlockNumber; /* Remember if it's a system catalog */ is_system_catalog = IsSystemRelation(OldHeap); @@ -744,8 +748,9 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, * that still need to be copied, we scan with SnapshotAny and use * HeapTupleSatisfiesVacuum for the visibility test. * - * In the CONCURRENTLY case, we do regular MVCC visibility tests, using - * the snapshot passed by the caller. + * In the CONCURRENTLY case, we do regular MVCC visibility tests. The + * snapshot changes several times during the scan so that we do not block + * the progress of the xmin horizon for VACUUM too much. */ if (OldIndex != NULL && !use_sort) { @@ -773,10 +778,15 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, pgstat_progress_update_param(PROGRESS_REPACK_PHASE, PROGRESS_REPACK_PHASE_SEQ_SCAN_HEAP); - tableScan = table_beginscan(OldHeap, - snapshot ? snapshot : SnapshotAny, - 0, (ScanKey) NULL); + tableScan = table_beginscan(OldHeap, SnapshotAny, 0, (ScanKey) NULL); heapScan = (HeapScanDesc) tableScan; + + /* + * In CONCURRENTLY mode we scan the table by ranges of blocks and the + * algorithm below expects forward direction. 
(No other direction + should be set here anyway, regardless of the CONCURRENTLY option.) + */ + Assert(heapScan->rs_dir == ForwardScanDirection || !concurrent); indexScan = NULL; /* Set total heap blocks */ @@ -787,6 +797,24 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, slot = table_slot_create(OldHeap, NULL); hslot = (BufferHeapTupleTableSlot *) slot; + if (concurrent) + { + /* + * Do not block the progress of xmin horizons. + * + * TODO Analyze thoroughly if this might have bad consequences. + */ + PopActiveSnapshot(); + InvalidateCatalogSnapshot(); + + /* + * Wait until the worker has the initial snapshot and retrieve it. + */ + snapshot = repack_get_snapshot(ctx); + + PushActiveSnapshot(snapshot); + } + /* * Scan through the OldHeap, either in OldIndex order or sequentially; * copy each tuple into the NewHeap, or transiently to the tuplesort @@ -803,6 +831,13 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, if (indexScan != NULL) { + /* + * Index scan should not be used in the CONCURRENTLY case because + * it returns tuples in random order, so we could not split the + * scan into a series of page ranges. + */ + Assert(!concurrent); + if (!index_getnext_slot(indexScan, ForwardScanDirection, slot)) break; @@ -824,6 +859,18 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, */ pgstat_progress_update_param(PROGRESS_REPACK_HEAP_BLKS_SCANNED, heapScan->rs_nblocks); + + if (concurrent) + { + PopActiveSnapshot(); + + /* + * For the last range, there is no restriction on block + * numbers, so the concurrent data changes pertaining to + * this range can be decoded (and applied) anytime after this + * loop. + */ + } break; } @@ -922,6 +969,75 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, continue; } } + else + { + BlockNumber blkno; + bool visible; + + /* + * With CONCURRENTLY, we use each snapshot only for a certain + * range of pages, so that VACUUM does not get blocked for too + * long. So first check if the tuple falls into the current range. + */ + blkno = BufferGetBlockNumber(buf); + + /* The first block of the scan? */ + if (!BlockNumberIsValid(ctx->first_block)) + { + Assert(!BlockNumberIsValid(range_end)); + + ctx->first_block = blkno; + range_end = repack_blocks_per_snapshot; + } + else + { + Assert(BlockNumberIsValid(range_end)); + + /* End of the current range? */ + if (blkno >= range_end) + { + XLogRecPtr end_of_wal; + + PopActiveSnapshot(); + + /* + * XXX It might be worth Assert(CatalogSnapshot == NULL) + * here, however that symbol is not external. + */ + + /* + * Decode all the concurrent data changes committed so far + * - these will be applicable to the current range. + */ + end_of_wal = GetFlushRecPtr(NULL); + repack_get_concurrent_changes(ctx, end_of_wal, range_end, + true, false); + + /* + * Define the next range. + */ + range_end = blkno + repack_blocks_per_snapshot; + + /* + * Get the snapshot for the next range - it should have + * been built at the position right after the last change + * decoded. Data present in the next range of blocks will + * either be visible to the snapshot or appear in the next + * batch of decoded changes. + */ + snapshot = repack_get_snapshot(ctx); + PushActiveSnapshot(snapshot); + } + } + + /* Finally check the tuple visibility.
*/ + LockBuffer(buf, BUFFER_LOCK_SHARE); + visible = HeapTupleSatisfiesVisibility(tuple, snapshot, buf); + LockBuffer(buf, BUFFER_LOCK_UNLOCK); + + if (!visible) + continue; + } *num_tuples += 1; if (tuplesort != NULL) @@ -956,6 +1072,18 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, } } + if (concurrent) + { + XLogRecPtr end_of_wal; + + /* Decode the changes belonging to the last range. */ + end_of_wal = GetFlushRecPtr(NULL); + repack_get_concurrent_changes(ctx, end_of_wal, InvalidBlockNumber, + false, false); + + PushActiveSnapshot(GetTransactionSnapshot()); + } + if (indexScan != NULL) index_endscan(indexScan); if (tableScan != NULL) diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c index 5232fbfb57d..8affa859abc 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -111,52 +111,27 @@ typedef struct static RelFileLocator repacked_rel_locator = {.relNumber = InvalidOid}; static RelFileLocator repacked_rel_toast_locator = {.relNumber = InvalidOid}; -/* - * Everything we need to call ExecInsertIndexTuples(). - */ -typedef struct IndexInsertState -{ - ResultRelInfo *rri; - EState *estate; -} IndexInsertState; - /* The WAL segment being decoded. */ static XLogSegNo repack_current_segment = 0; /* - * The first file exported by the decoding worker must contain a snapshot, the - * following ones contain the data changes. + * When REPACK (CONCURRENTLY) copies data to the new heap, a new snapshot is + * built after processing this many pages. */ -#define WORKER_FILE_SNAPSHOT 0 +int repack_blocks_per_snapshot = 1024; /* - * Information needed to apply concurrent data changes. + * Remember here the range of pages to which the changes recorded in a given + * file should be applied. */ -typedef struct ChangeDest +typedef struct RepackApplyRange { - /* The relation the changes are applied to. */ - Relation rel; + /* The first block of the next range. */ + BlockNumber end; - /* - * The following is needed to find the existing tuple if the change is - * UPDATE or DELETE. 'ident_key' should have all the fields except for - * 'sk_argument' initialized. - */ - Relation ident_index; - ScanKey ident_key; - int ident_key_nentries; - - /* Needed to update indexes of rel_dst. */ - IndexInsertState *iistate; - - /* - * Sequential number of the file containing the changes. - * - * TODO This field makes the structure name less descriptive. Should we - * rename it, e.g. to ChangeApplyInfo? - */ - int file_seq; -} ChangeDest; + /* File containing the changes to be applied to blocks in this range. */ + char *fname; +} RepackApplyRange; /* * Layout of shared memory used for communication between backend and the @@ -167,6 +142,9 @@ typedef struct DecodingWorkerShared /* Is the decoding initialized? */ bool initialized; + /* Set to request a snapshot. */ + bool snapshot_requested; + /* * Once the worker has reached this LSN, it should close the current * output file and either create a new one or exit, according to the field * 'done'. Until a valid LSN is set, the worker should wait for more of * the WAL available and keep checking this field. It is ok if the worker * had already decoded records whose LSN is >= lsn_upto before this field * has been set. + * + * Set a valid LSN to request data changes. */ XLogRecPtr lsn_upto; @@ -184,7 +164,8 @@ SharedFileSet sfs; /* Number of the last file exported by the worker. */ - int last_exported; + int last_exported_snapshot; + int last_exported_changes; /* Synchronize access to the fields above.
*/ slock_t mutex; @@ -226,26 +207,14 @@ typedef struct DecodingWorkerShared * the fileset name.) */ static inline void -DecodingWorkerFileName(char *fname, Oid relid, uint32 seq) +DecodingWorkerFileName(char *fname, Oid relid, uint32 seq, bool snapshot) { - snprintf(fname, MAXPGPATH, "%u-%u", relid, seq); + if (!snapshot) + snprintf(fname, MAXPGPATH, "%u-%u", relid, seq); + else + snprintf(fname, MAXPGPATH, "%u-%u-snapshot", relid, seq); } -/* - * Backend-local information to control the decoding worker. - */ -typedef struct DecodingWorker -{ - /* The worker. */ - BackgroundWorkerHandle *handle; - - /* DecodingWorkerShared is in this segment. */ - dsm_segment *seg; - - /* Handle of the error queue. */ - shm_mq_handle *error_mqh; -} DecodingWorker; - /* Pointer to currently running decoding worker. */ static DecodingWorker *decoding_worker = NULL; @@ -262,11 +231,11 @@ static void check_repack_concurrently_requirements(Relation rel); static void rebuild_relation(Relation OldHeap, Relation index, bool verbose, bool concurrent); static void copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, - Snapshot snapshot, bool verbose, bool *pSwapToastByContent, TransactionId *pFreezeXid, - MultiXactId *pCutoffMulti); + MultiXactId *pCutoffMulti, + ConcurrentChangeContext *ctx); static List *get_tables_to_repack(RepackCommand cmd, bool usingindex, MemoryContext permcxt); static List *get_tables_to_repack_partitioned(RepackCommand cmd, @@ -276,9 +245,12 @@ static bool cluster_is_permitted_for_relation(RepackCommand cmd, Oid relid, Oid userid); static LogicalDecodingContext *setup_logical_decoding(Oid relid); -static bool decode_concurrent_changes(LogicalDecodingContext *ctx, +static bool decode_concurrent_changes(LogicalDecodingContext *decoding_ctx, DecodingWorkerShared *shared); -static void apply_concurrent_changes(BufFile *file, ChangeDest *dest); +static void apply_concurrent_changes(ConcurrentChangeContext *ctx); +static void apply_concurrent_changes_file(ConcurrentChangeContext *ctx, + BufFile *file, + BlockNumber range_end); static void apply_concurrent_insert(Relation rel, HeapTuple tup, IndexInsertState *iistate, TupleTableSlot *index_slot); @@ -287,12 +259,14 @@ static void apply_concurrent_update(Relation rel, HeapTuple tup, IndexInsertState *iistate, TupleTableSlot *index_slot); static void apply_concurrent_delete(Relation rel, HeapTuple tup_target); -static HeapTuple find_target_tuple(Relation rel, ChangeDest *dest, +static bool is_tuple_in_block_range(HeapTuple tup, BlockNumber start, + BlockNumber end); +static HeapTuple find_target_tuple(Relation rel, + ConcurrentChangeContext *ctx, HeapTuple tup_key, TupleTableSlot *ident_slot); -static void process_concurrent_changes(XLogRecPtr end_of_wal, - ChangeDest *dest, - bool done); +static void repack_add_block_range(ConcurrentChangeContext *ctx, + BlockNumber end, char *fname); static IndexInsertState *get_index_insert_state(Relation relation, Oid ident_index_id, Relation *ident_index_p); @@ -303,7 +277,8 @@ static void cleanup_logical_decoding(LogicalDecodingContext *ctx); static void rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, Relation cl_index, TransactionId frozenXid, - MultiXactId cutoffMulti); + MultiXactId cutoffMulti, + ConcurrentChangeContext *ctx); static List *build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes); static Relation process_single_relation(RepackStmt *stmt, LOCKMODE lockmode, @@ -314,9 +289,8 @@ static Oid determine_clustered_index(Relation rel, bool 
usingindex, static void start_decoding_worker(Oid relid); static void stop_decoding_worker(void); static void repack_worker_internal(dsm_segment *seg); -static void export_initial_snapshot(Snapshot snapshot, - DecodingWorkerShared *shared); -static Snapshot get_initial_snapshot(DecodingWorker *worker); +static void export_snapshot(Snapshot snapshot, + DecodingWorkerShared *shared); static void ProcessRepackMessage(StringInfo msg); static const char *RepackCommandAsString(RepackCommand cmd); @@ -402,7 +376,15 @@ ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel) { rel = process_single_relation(stmt, lockmode, isTopLevel, &params); if (rel == NULL) + { + /* + * The original transaction was committed, so the current + * portal will not pop the active snapshot. + */ + PopActiveSnapshot(); + return; /* all done */ + } } /* @@ -1020,6 +1002,15 @@ check_repack_concurrently_requirements(Relation rel) RelationGetRelationName(rel)), (errhint("Relation \"%s\" has no identity index.", RelationGetRelationName(rel))))); + + /* + * In the CONCURRENTLY mode we don't want to use the same snapshot + * throughout the whole processing, as it could block the progress of the + * xmin horizon. + */ + if (IsolationUsesXactSnapshot()) + ereport(ERROR, + (errmsg("REPACK (CONCURRENTLY) does not support transaction isolation higher than READ COMMITTED"))); } @@ -1050,7 +1041,7 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, bool concurrent bool swap_toast_by_content; TransactionId frozenXid; MultiXactId cutoffMulti; - Snapshot snapshot = NULL; + ConcurrentChangeContext *ctx = NULL; #if USE_ASSERT_CHECKING LOCKMODE lmode; @@ -1062,6 +1053,13 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, bool concurrent if (concurrent) { + /* + * This is only needed here to gather the data changes and range + * information during the copying. The fields needed to apply the + * changes will be filled later. + */ + ctx = palloc0_object(ConcurrentChangeContext); + /* * The worker needs to be member of the locking group we're the leader * of. We ought to become the leader before the worker starts. The @@ -1087,13 +1085,7 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, bool concurrent * REPACK CONCURRENTLY. */ start_decoding_worker(tableOid); - - /* - * Wait until the worker has the initial snapshot and retrieve it. - */ - snapshot = get_initial_snapshot(decoding_worker); - - PushActiveSnapshot(snapshot); + ctx->worker = decoding_worker; } /* for CLUSTER or REPACK USING INDEX, mark the index as the one to use */ @@ -1117,21 +1109,25 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, bool concurrent NewHeap = table_open(OIDNewHeap, NoLock); /* Copy the heap data into the new table in the desired order */ - copy_table_data(NewHeap, OldHeap, index, snapshot, verbose, - &swap_toast_by_content, &frozenXid, &cutoffMulti); - - /* The historic snapshot won't be needed anymore. */ - if (snapshot) + if (concurrent) { - PopActiveSnapshot(); - UpdateActiveSnapshotCommandId(); + ctx->first_block = InvalidBlockNumber; + ctx->block_ranges = NIL; } + copy_table_data(NewHeap, OldHeap, index, verbose, &swap_toast_by_content, + &frozenXid, &cutoffMulti, ctx); if (concurrent) { + /* + * Make sure the active snapshot can see the data copied, so the rows + * can be updated / deleted.
+ */ + UpdateActiveSnapshotCommandId(); + Assert(!swap_toast_by_content); rebuild_relation_finish_concurrent(NewHeap, OldHeap, index, - frozenXid, cutoffMulti); + frozenXid, cutoffMulti, ctx); pgstat_progress_update_param(PROGRESS_REPACK_PHASE, PROGRESS_REPACK_PHASE_FINAL_CLEANUP); @@ -1295,9 +1291,6 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, /* * Do the physical copying of table data. * - * 'snapshot' and 'decoding_ctx': see table_relation_copy_for_cluster(). Pass - * iff concurrent processing is required. - * * There are three output parameters: * *pSwapToastByContent is set true if toast tables must be swapped by content. * *pFreezeXid receives the TransactionId used as freeze cutoff point. @@ -1305,8 +1298,9 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, */ static void copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, - Snapshot snapshot, bool verbose, bool *pSwapToastByContent, - TransactionId *pFreezeXid, MultiXactId *pCutoffMulti) + bool verbose, bool *pSwapToastByContent, + TransactionId *pFreezeXid, MultiXactId *pCutoffMulti, + ConcurrentChangeContext *ctx) { Relation relRelation; HeapTuple reltup; @@ -1323,7 +1317,7 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, int elevel = verbose ? INFO : DEBUG2; PGRUsage ru0; char *nspname; - bool concurrent = snapshot != NULL; + bool concurrent = ctx != NULL; LOCKMODE lmode; lmode = concurrent ? ShareUpdateExclusiveLock : AccessExclusiveLock; @@ -1435,8 +1429,18 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, * provided, else plain seqscan. */ if (OldIndex != NULL && OldIndex->rd_rel->relam == BTREE_AM_OID) - use_sort = plan_cluster_use_sort(RelationGetRelid(OldHeap), - RelationGetRelid(OldIndex)); + { + if (!concurrent) + use_sort = plan_cluster_use_sort(RelationGetRelid(OldHeap), + RelationGetRelid(OldIndex)); + else + + /* + * To use multiple snapshots, we need to process the table + * sequentially. + */ + use_sort = true; + } else use_sort = false; @@ -1465,11 +1469,11 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, * values (e.g. because the AM doesn't use freezing). */ table_relation_copy_for_cluster(OldHeap, NewHeap, OldIndex, use_sort, - cutoffs.OldestXmin, snapshot, + cutoffs.OldestXmin, &cutoffs.FreezeLimit, &cutoffs.MultiXactCutoff, &num_tuples, &tups_vacuumed, - &tups_recently_dead); + &tups_recently_dead, ctx); /* return selected values to caller, get set as relfrozenxid/minmxid */ *pFreezeXid = cutoffs.FreezeLimit; @@ -2361,6 +2365,8 @@ cluster_is_permitted_for_relation(RepackCommand cmd, Oid relid, Oid userid) * instead return the opened and locked relcache entry, so that caller can * process the partitions using the multiple-table handling code. In this * case, if an index name is given, it's up to the caller to resolve it. + * + * A new transaction is started in either case. */ static Relation process_single_relation(RepackStmt *stmt, LOCKMODE lockmode, bool isTopLevel, @@ -2373,6 +2379,25 @@ process_single_relation(RepackStmt *stmt, LOCKMODE lockmode, bool isTopLevel, Assert(stmt->command == REPACK_COMMAND_CLUSTER || stmt->command == REPACK_COMMAND_REPACK); + /* + * Since REPACK (CONCURRENTLY) pops the active snapshot during the + * processing (it creates and pushes snapshots on its own), and since that + * snapshot can be referenced by the current portal, we need to make sure + * that the portal has no dangling pointer to the snapshot. 
Starting a new + transaction seems to be the simplest way. + */ + PopActiveSnapshot(); + CommitTransactionCommand(); + + /* Start a new transaction. */ + StartTransactionCommand(); + + /* + * Functions in indexes may want a snapshot set. Note that the portal is + * not aware of this one, so the caller needs to pop it explicitly. + */ + PushActiveSnapshot(GetTransactionSnapshot()); + /* Find, lock, and check permissions on the table. */ tableOid = RangeVarGetRelidExtended(stmt->relation->relation, lockmode, @@ -2675,6 +2700,7 @@ decode_concurrent_changes(LogicalDecodingContext *ctx, DecodingWorkerShared *shared) { RepackDecodingState *dstate; + bool snapshot_requested; XLogRecPtr lsn_upto; bool done; char fname[MAXPGPATH]; @@ -2682,11 +2708,14 @@ decode_concurrent_changes(LogicalDecodingContext *ctx, dstate = (RepackDecodingState *) ctx->output_writer_private; /* Open the output file. */ - DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1); + DecodingWorkerFileName(fname, shared->relid, + shared->last_exported_changes + 1, + false); dstate->file = BufFileCreateFileSet(&shared->sfs.fs, fname); SpinLockAcquire(&shared->mutex); lsn_upto = shared->lsn_upto; + snapshot_requested = shared->snapshot_requested; done = shared->done; SpinLockRelease(&shared->mutex); @@ -2752,6 +2781,7 @@ { SpinLockAcquire(&shared->mutex); lsn_upto = shared->lsn_upto; + snapshot_requested = shared->snapshot_requested; /* 'done' should be set at the same time as 'lsn_upto' */ done = shared->done; SpinLockRelease(&shared->mutex); @@ -2796,27 +2826,105 @@ */ BufFileClose(dstate->file); dstate->file = NULL; + + /* + * Before publishing the data changes, export the snapshot too if + * requested. Publishing both at once makes sense because both are needed + * at the same time, and it's simpler. + */ + if (snapshot_requested) + { + Snapshot snapshot; + + snapshot = SnapBuildSnapshotForRepack(ctx->snapshot_builder); + export_snapshot(snapshot, shared); + + /* + * Adjust the replication slot's xmin so that VACUUM can do more work. + */ + LogicalIncreaseXminForSlot(InvalidXLogRecPtr, snapshot->xmin, false); + FreeSnapshot(snapshot); + } + else + { + /* + * If data changes were requested but no snapshot to follow them, we + * don't care about the xmin horizon because the heap copying should + * be done by now. + */ + LogicalIncreaseXminForSlot(InvalidXLogRecPtr, InvalidTransactionId, + false); + + } + + /* + * Now increase the counter(s) to announce that the output is + * available. + */ SpinLockAcquire(&shared->mutex); + shared->last_exported_changes++; shared->lsn_upto = InvalidXLogRecPtr; - shared->last_exported++; + if (snapshot_requested) + { + shared->last_exported_snapshot++; + shared->snapshot_requested = false; + } SpinLockRelease(&shared->mutex); + ConditionVariableSignal(&shared->cv); return done; } /* - * Apply changes stored in 'file'. + * Apply all concurrent changes. */ static void -apply_concurrent_changes(BufFile *file, ChangeDest *dest) +apply_concurrent_changes(ConcurrentChangeContext *ctx) +{ + DecodingWorkerShared *shared; + + shared = (DecodingWorkerShared *) dsm_segment_address(decoding_worker->seg); + + foreach_ptr(RepackApplyRange, range, ctx->block_ranges) + { + BufFile *file; + + file = BufFileOpenFileSet(&shared->sfs.fs, range->fname, O_RDONLY, + false); + + /* + * If range end is valid, the start should be as well.
+ */ + Assert(!BlockNumberIsValid(range->end) || + BlockNumberIsValid(ctx->first_block)); + + apply_concurrent_changes_file(ctx, file, range->end); + BufFileClose(file); + + pfree(range->fname); + pfree(range); + } + + /* Get ready for the next decoding. */ + ctx->block_ranges = NIL; + ctx->first_block = InvalidBlockNumber; +} + +/* + * Apply concurrent changes stored in 'file'. + */ +static void +apply_concurrent_changes_file(ConcurrentChangeContext *ctx, BufFile *file, + BlockNumber range_end) { char kind; uint32 t_len; - Relation rel = dest->rel; + Relation rel = ctx->rel; TupleTableSlot *index_slot, *ident_slot; HeapTuple tup_old = NULL; + bool check_range = BlockNumberIsValid(range_end); /* TupleTableSlot is needed to pass the tuple to ExecInsertIndexTuples(). */ index_slot = MakeSingleTupleTableSlot(RelationGetDescr(rel), @@ -2844,8 +2952,8 @@ tup->t_data = (HeapTupleHeader) ((char *) tup + HEAPTUPLESIZE); BufFileReadExact(file, tup->t_data, t_len); tup->t_len = t_len; - ItemPointerSetInvalid(&tup->t_self); - tup->t_tableOid = RelationGetRelid(dest->rel); + tup->t_tableOid = RelationGetRelid(ctx->rel); + BufFileReadExact(file, &tup->t_self, sizeof(tup->t_self)); if (kind == CHANGE_UPDATE_OLD) { @@ -2856,7 +2964,10 @@ { Assert(tup_old == NULL); - apply_concurrent_insert(rel, tup, dest->iistate, index_slot); + if (!check_range || + is_tuple_in_block_range(tup, ctx->first_block, range_end)) + apply_concurrent_insert(rel, tup, ctx->iistate, + index_slot); pfree(tup); } @@ -2877,16 +2988,52 @@ /* * Find the tuple to be updated or deleted. */ - tup_exist = find_target_tuple(rel, dest, tup_key, ident_slot); - if (tup_exist == NULL) - elog(ERROR, "failed to find target tuple"); + if (!check_range || + (is_tuple_in_block_range(tup_key, ctx->first_block, + range_end))) + { + /* The change needs to be applied to this tuple. */ + tup_exist = find_target_tuple(rel, ctx, tup_key, ident_slot); + if (tup_exist == NULL) + elog(ERROR, "failed to find target tuple"); - if (kind == CHANGE_UPDATE_NEW) - apply_concurrent_update(rel, tup, tup_exist, dest->iistate, - index_slot); + if (kind == CHANGE_DELETE) + apply_concurrent_delete(rel, tup_exist); + else + { + /* UPDATE */ + if (!check_range || tup == tup_key || + is_tuple_in_block_range(tup, ctx->first_block, + range_end)) + /* The new tuple is in the same range. */ + apply_concurrent_update(rel, tup, tup_exist, + ctx->iistate, index_slot); + else + + /* + * The new tuple is in another range, so only delete + * the old version from the current one. The new + * version should be visible to the snapshot that + * we'll use to copy the other range. + */ + apply_concurrent_delete(rel, tup_exist); + } + } else - apply_concurrent_delete(rel, tup_exist); - + { + /* + * The change belongs to another range, so we don't need to + * bother with the old tuple: the snapshot used for the other + * range won't see it, so it won't be copied. However, the new + * tuple may still need to go to the range we are checking. In + * that case, simply insert it there.
+ */ + if (kind == CHANGE_UPDATE_NEW && tup != tup_key && + is_tuple_in_block_range(tup, ctx->first_block, + range_end)) + apply_concurrent_insert(rel, tup, ctx->iistate, + index_slot); + } if (tup_old != NULL) { pfree(tup_old); @@ -3025,6 +3172,33 @@ apply_concurrent_delete(Relation rel, HeapTuple tup_target) pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_DELETED, 1); } +/* + * Check if tuple originates from given range of blocks that have already been + * copied. + */ +static bool +is_tuple_in_block_range(HeapTuple tup, BlockNumber start, BlockNumber end) +{ + BlockNumber blknum; + + Assert(BlockNumberIsValid(start) && BlockNumberIsValid(end)); + + blknum = ItemPointerGetBlockNumber(&tup->t_self); + Assert(BlockNumberIsValid(blknum)); + + if (start < end) + { + return blknum >= start && blknum < end; + } + else + { + /* Has the scan position wrapped around? */ + Assert(start > end); + + return blknum >= start || blknum < end; + } +} + /* * Find the tuple to be updated or deleted. * @@ -3034,10 +3208,10 @@ apply_concurrent_delete(Relation rel, HeapTuple tup_target) * it when he no longer needs the tuple returned. */ static HeapTuple -find_target_tuple(Relation rel, ChangeDest *dest, HeapTuple tup_key, - TupleTableSlot *ident_slot) +find_target_tuple(Relation rel, ConcurrentChangeContext *ctx, + HeapTuple tup_key, TupleTableSlot *ident_slot) { - Relation ident_index = dest->ident_index; + Relation ident_index = ctx->ident_index; IndexScanDesc scan; Form_pg_index ident_form; int2vector *ident_indkey; @@ -3045,14 +3219,14 @@ find_target_tuple(Relation rel, ChangeDest *dest, HeapTuple tup_key, /* XXX no instrumentation for now */ scan = index_beginscan(rel, ident_index, GetActiveSnapshot(), - NULL, dest->ident_key_nentries, 0); + NULL, ctx->ident_key_nentries, 0); /* * Scan key is passed by caller, so it does not have to be constructed * multiple times. Key entries have all fields initialized, except for * sk_argument. */ - index_rescan(scan, dest->ident_key, dest->ident_key_nentries, NULL, 0); + index_rescan(scan, ctx->ident_key, ctx->ident_key_nentries, NULL, 0); /* Info needed to retrieve key values from heap tuple. */ ident_form = ident_index->rd_index; @@ -3087,15 +3261,22 @@ find_target_tuple(Relation rel, ChangeDest *dest, HeapTuple tup_key, } /* - * Decode and apply concurrent changes, up to (and including) the record whose - * LSN is 'end_of_wal'. + * Get concurrent changes, up to (and including) the record whose LSN is + * 'end_of_wal', from the decoding worker. If 'range_end' is a valid block + * number, the changes should only be applied to blocks greater than or equal + * to ctx->first_block and lower than range_end. + * + * If 'request_snapshot' is true, the snapshot built at LSN following the last + * data change needs to be exported too. 
*/ -static void -process_concurrent_changes(XLogRecPtr end_of_wal, ChangeDest *dest, bool done) +extern void +repack_get_concurrent_changes(ConcurrentChangeContext *ctx, + XLogRecPtr end_of_wal, + BlockNumber range_end, + bool request_snapshot, bool done) { DecodingWorkerShared *shared; char fname[MAXPGPATH]; - BufFile *file; pgstat_progress_update_param(PROGRESS_REPACK_PHASE, PROGRESS_REPACK_PHASE_CATCH_UP); @@ -3104,6 +3285,8 @@ process_concurrent_changes(XLogRecPtr end_of_wal, ChangeDest *dest, bool done) shared = (DecodingWorkerShared *) dsm_segment_address(decoding_worker->seg); SpinLockAcquire(&shared->mutex); shared->lsn_upto = end_of_wal; + Assert(!shared->snapshot_requested); + shared->snapshot_requested = request_snapshot; shared->done = done; SpinLockRelease(&shared->mutex); @@ -3118,30 +3301,52 @@ process_concurrent_changes(XLogRecPtr end_of_wal, ChangeDest *dest, bool done) int last_exported; SpinLockAcquire(&shared->mutex); - last_exported = shared->last_exported; + last_exported = shared->last_exported_changes; SpinLockRelease(&shared->mutex); /* * Has the worker exported the file we are waiting for? */ - if (last_exported == dest->file_seq) + if (last_exported == ctx->file_seq_changes) break; ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT); } ConditionVariableCancelSleep(); - /* Open the file. */ - DecodingWorkerFileName(fname, shared->relid, dest->file_seq); - file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false); - apply_concurrent_changes(file, dest); + /* + * Remember the file name so we can apply the changes when appropriate. + * One particular reason to postpone the replay is that indexes haven't + * been built yet on the new heap. + */ + DecodingWorkerFileName(fname, shared->relid, ctx->file_seq_changes, + false); + repack_add_block_range(ctx, range_end, fname); - BufFileClose(file); +#ifdef USE_ASSERT_CHECKING + /* No file is exported until the worker exports the next one. */ + SpinLockAcquire(&shared->mutex); + Assert(XLogRecPtrIsInvalid(shared->lsn_upto)); + SpinLockRelease(&shared->mutex); +#endif + + /* Get ready for the next set of changes. */ + ctx->file_seq_changes++; +} + +static void +repack_add_block_range(ConcurrentChangeContext *ctx, BlockNumber end, + char *fname) +{ + RepackApplyRange *range; - /* Get ready for the next file. */ - dest->file_seq++; + range = palloc_object(RepackApplyRange); + range->end = end; + range->fname = pstrdup(fname); + ctx->block_ranges = lappend(ctx->block_ranges, range); } + /* * Initialize IndexInsertState for index specified by ident_index_id. * @@ -3284,7 +3489,8 @@ static void rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, Relation cl_index, TransactionId frozenXid, - MultiXactId cutoffMulti) + MultiXactId cutoffMulti, + ConcurrentChangeContext *ctx) { LOCKMODE lockmode_old PG_USED_FOR_ASSERTS_ONLY; List *ind_oids_new; @@ -3303,7 +3509,6 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, Relation *ind_refs, *ind_refs_p; int nind; - ChangeDest chgdst; /* Like in cluster_rel(). */ lockmode_old = ShareUpdateExclusiveLock; @@ -3360,12 +3565,18 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, (errmsg("identity index missing on the new relation"))); /* Gather information to apply concurrent changes. 
*/ - chgdst.rel = NewHeap; - chgdst.iistate = get_index_insert_state(NewHeap, ident_idx_new, - &chgdst.ident_index); - chgdst.ident_key = build_identity_key(ident_idx_new, OldHeap, - &chgdst.ident_key_nentries); - chgdst.file_seq = WORKER_FILE_SNAPSHOT + 1; + ctx->rel = NewHeap; + ctx->iistate = get_index_insert_state(NewHeap, ident_idx_new, + &ctx->ident_index); + ctx->ident_key = build_identity_key(ident_idx_new, OldHeap, + &ctx->ident_key_nentries); + + /* + * Replay the concurrent data changes gathered during heap copying. This + * had to wait until after the index build because the identity index is + * needed to apply UPDATE and DELETE changes. + */ + apply_concurrent_changes(ctx); /* * During testing, wait for another backend to perform concurrent data @@ -3383,11 +3594,13 @@ end_of_wal = GetFlushRecPtr(NULL); /* - * Apply concurrent changes first time, to minimize the time we need to - * hold AccessExclusiveLock. (Quite some amount of WAL could have been + * Decode and apply concurrent changes again, to minimize the time we need + * to hold AccessExclusiveLock. (Quite some amount of WAL could have been * written during the data copying and index creation.) */ - process_concurrent_changes(end_of_wal, &chgdst, false); + repack_get_concurrent_changes(ctx, end_of_wal, InvalidBlockNumber, false, + false); + apply_concurrent_changes(ctx); /* * Acquire AccessExclusiveLock on the table, its TOAST relation (if there @@ -3482,10 +3695,13 @@ end_of_wal = GetFlushRecPtr(NULL); /* - * Apply the concurrent changes again. Indicate that the decoding worker - * won't be needed anymore. + * Decode and apply the concurrent changes again. Indicate that the + * decoding worker won't be needed anymore. */ - process_concurrent_changes(end_of_wal, &chgdst, true); + repack_get_concurrent_changes(ctx, end_of_wal, InvalidBlockNumber, false, + true); + apply_concurrent_changes(ctx); + /* Remember info about rel before closing OldHeap */ relpersistence = OldHeap->rd_rel->relpersistence; @@ -3536,8 +3752,8 @@ table_close(NewHeap, NoLock); /* Cleanup what we don't need anymore. (And close the identity index.) */ - pfree(chgdst.ident_key); - free_index_insert_state(chgdst.iistate); + pfree(ctx->ident_key); + free_index_insert_state(ctx->iistate); /* * Swap the relations and their TOAST relations and TOAST indexes. This @@ -3578,6 +3794,23 @@ build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes) char *newName; Relation ind; + /* + * Try to reduce the impact on VACUUM. + * + * The individual builds might still be a problem, but that's a + * separate issue. + * + * TODO Can we somehow use the fact that the new heap is not yet + * visible to other transactions, and thus cannot be vacuumed? Perhaps + * by preventing snapshots from setting MyProc->xmin temporarily. (All + * the snapshots that might have participated in the build, including + * the catalog snapshots, must not be used for other tables of + * course.)
+ */ + PopActiveSnapshot(); + InvalidateCatalogSnapshot(); + PushActiveSnapshot(GetTransactionSnapshot()); + ind = index_open(ind_oid, ShareUpdateExclusiveLock); newName = ChooseRelationName(get_rel_name(ind_oid), @@ -3616,10 +3849,14 @@ start_decoding_worker(Oid relid) BUFFERALIGN(REPACK_ERROR_QUEUE_SIZE); seg = dsm_create(size, 0); shared = (DecodingWorkerShared *) dsm_segment_address(seg); + shared->initialized = false; shared->lsn_upto = InvalidXLogRecPtr; shared->done = false; + /* Snapshot is the first thing we need from the worker. */ + shared->snapshot_requested = true; SharedFileSetInit(&shared->sfs, seg); - shared->last_exported = -1; + shared->last_exported_changes = -1; + shared->last_exported_snapshot = -1; SpinLockInit(&shared->mutex); shared->dbid = MyDatabaseId; @@ -3828,6 +4065,9 @@ repack_worker_internal(dsm_segment *seg) */ SpinLockAcquire(&shared->mutex); Assert(XLogRecPtrIsInvalid(shared->lsn_upto)); + /* Initially we're expected to provide a snapshot and only that. */ + Assert(shared->snapshot_requested && + XLogRecPtrIsInvalid(shared->lsn_upto)); sfs = &shared->sfs; SpinLockRelease(&shared->mutex); @@ -3845,8 +4085,22 @@ repack_worker_internal(dsm_segment *seg) ConditionVariableSignal(&shared->cv); /* Build the initial snapshot and export it. */ - snapshot = SnapBuildInitialSnapshotForRepack(decoding_ctx->snapshot_builder); - export_initial_snapshot(snapshot, shared); + snapshot = SnapBuildSnapshotForRepack(decoding_ctx->snapshot_builder); + export_snapshot(snapshot, shared); + + /* + * Adjust the replication slot's xmin so that VACUUM can do more work. + */ + LogicalIncreaseXminForSlot(InvalidXLogRecPtr, snapshot->xmin, false); + FreeSnapshot(snapshot); + + /* Increase the counter to tell the backend that the file is available. */ + SpinLockAcquire(&shared->mutex); + Assert(shared->snapshot_requested); + shared->last_exported_snapshot++; + shared->snapshot_requested = false; + SpinLockRelease(&shared->mutex); + ConditionVariableSignal(&shared->cv); /* * Only historic snapshots should be used now. Do not let us restrict the @@ -3866,7 +4120,7 @@ repack_worker_internal(dsm_segment *seg) * Make snapshot available to the backend that launched the decoding worker. */ static void -export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared) +export_snapshot(Snapshot snapshot, DecodingWorkerShared *shared) { char fname[MAXPGPATH]; BufFile *file; @@ -3876,28 +4130,23 @@ export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared) snap_size = EstimateSnapshotSpace(snapshot); snap_space = (char *) palloc(snap_size); SerializeSnapshot(snapshot, snap_space); - FreeSnapshot(snapshot); - DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1); + DecodingWorkerFileName(fname, shared->relid, + shared->last_exported_snapshot + 1, + true); file = BufFileCreateFileSet(&shared->sfs.fs, fname); /* To make restoration easier, write the snapshot size first. */ BufFileWrite(file, &snap_size, sizeof(snap_size)); BufFileWrite(file, snap_space, snap_size); pfree(snap_space); BufFileClose(file); - - /* Increase the counter to tell the backend that the file is available. */ - SpinLockAcquire(&shared->mutex); - shared->last_exported++; - SpinLockRelease(&shared->mutex); - ConditionVariableSignal(&shared->cv); } /* - * Get the initial snapshot from the decoding worker. + * Get snapshot from the decoding worker. 
*/ -static Snapshot -get_initial_snapshot(DecodingWorker *worker) +extern Snapshot +repack_get_snapshot(ConcurrentChangeContext *ctx) { DecodingWorkerShared *shared; char fname[MAXPGPATH]; @@ -3905,13 +4154,15 @@ BufFile *file; Size snap_size; char *snap_space; Snapshot snapshot; + DecodingWorker *worker = ctx->worker; shared = (DecodingWorkerShared *) dsm_segment_address(worker->seg); /* - * The worker needs to initialize the logical decoding, which usually - * takes some time. Therefore it makes sense to prepare for the sleep - * first. + * For the first snapshot request, the worker needs to initialize the + * logical decoding, which usually takes some time. Therefore it makes + * sense to prepare for the sleep first. Does it make sense to skip the + * preparation on the next requests? */ ConditionVariablePrepareToSleep(&shared->cv); for (;;) @@ -3919,13 +4170,13 @@ int last_exported; SpinLockAcquire(&shared->mutex); - last_exported = shared->last_exported; + last_exported = shared->last_exported_snapshot; SpinLockRelease(&shared->mutex); /* * Has the worker exported the file we are waiting for? */ - if (last_exported == WORKER_FILE_SNAPSHOT) + if (last_exported == ctx->file_seq_snapshot) break; ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT); @@ -3933,17 +4184,27 @@ ConditionVariableCancelSleep(); /* Read the snapshot from a file. */ - DecodingWorkerFileName(fname, shared->relid, WORKER_FILE_SNAPSHOT); + DecodingWorkerFileName(fname, shared->relid, ctx->file_seq_snapshot, + true); file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false); BufFileReadExact(file, &snap_size, sizeof(snap_size)); snap_space = (char *) palloc(snap_size); BufFileReadExact(file, snap_space, snap_size); BufFileClose(file); +#ifdef USE_ASSERT_CHECKING + SpinLockAcquire(&shared->mutex); + Assert(!shared->snapshot_requested); + SpinLockRelease(&shared->mutex); +#endif + /* Restore it. */ snapshot = RestoreSnapshot(snap_space); pfree(snap_space); + /* Get ready for the next snapshot. */ + ctx->file_seq_snapshot++; + return snapshot; } diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index dc8c7be2aca..8f42238ab21 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -920,6 +920,7 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_heap_insert *xlrec; ReorderBufferChange *change; RelFileLocator target_locator; + BlockNumber blknum; xlrec = (xl_heap_insert *) XLogRecGetData(r); @@ -931,7 +932,7 @@ return; /* only interested in our database */ - XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL); + XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blknum); if (target_locator.dbOid != ctx->slot->data.database) return; @@ -956,6 +957,15 @@ DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple); + /* + * REPACK (CONCURRENTLY) needs the block number to check if the + * corresponding part of the table was already copied. + */ + if (am_decoding_for_repack()) + /* offnum is not really needed, but let's set a valid pointer.
*/ + ItemPointerSet(&change->data.tp.newtuple->t_self, blknum, + xlrec->offnum); + change->data.tp.clear_toast_afterwards = true; ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, @@ -977,11 +987,12 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferChange *change; char *data; RelFileLocator target_locator; + BlockNumber new_blknum; xlrec = (xl_heap_update *) XLogRecGetData(r); /* only interested in our database */ - XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL); + XLogRecGetBlockTag(r, 0, &target_locator, NULL, &new_blknum); if (target_locator.dbOid != ctx->slot->data.database) return; @@ -1007,12 +1018,27 @@ ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen); DecodeXLogTuple(data, datalen, change->data.tp.newtuple); + + /* + * REPACK (CONCURRENTLY) needs the block number to check if the + * corresponding part of the table was already copied. + */ + if (am_decoding_for_repack()) + /* offnum is not really needed, but let's set a valid pointer. */ + ItemPointerSet(&change->data.tp.newtuple->t_self, + new_blknum, xlrec->new_offnum); } if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD) { Size datalen; Size tuplelen; + BlockNumber old_blknum; + + if (XLogRecHasBlockRef(r, 1)) + XLogRecGetBlockTag(r, 1, NULL, NULL, &old_blknum); + else + XLogRecGetBlockTag(r, 0, NULL, NULL, &old_blknum); /* caution, remaining data in record is not aligned */ data = XLogRecGetData(r) + SizeOfHeapUpdate; @@ -1023,6 +1049,11 @@ ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen); DecodeXLogTuple(data, datalen, change->data.tp.oldtuple); + /* See above. */ + if (am_decoding_for_repack()) + ItemPointerSet(&change->data.tp.oldtuple->t_self, + old_blknum, xlrec->old_offnum); + } change->data.tp.clear_toast_afterwards = true; @@ -1043,6 +1074,7 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_heap_delete *xlrec; ReorderBufferChange *change; RelFileLocator target_locator; + BlockNumber blknum; xlrec = (xl_heap_delete *) XLogRecGetData(r); @@ -1056,7 +1088,7 @@ return; /* only interested in our database */ - XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL); + XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blknum); if (target_locator.dbOid != ctx->slot->data.database) return; @@ -1088,6 +1120,15 @@ DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete, datalen, change->data.tp.oldtuple); + + /* + * REPACK (CONCURRENTLY) needs the block number to check if the + * corresponding part of the table was already copied. + */ + if (am_decoding_for_repack()) + /* offnum is not really needed, but let's set a valid pointer. */ + ItemPointerSet(&change->data.tp.oldtuple->t_self, blknum, + xlrec->offnum); } change->data.tp.clear_toast_afterwards = true; diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 35a46988285..76119c5ecaa 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1661,14 +1661,17 @@ update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* * Set the required catalog xmin horizon for historic snapshots in the current - * replication slot. + * replication slot if catalog is TRUE, or the slot's data xmin if catalog is + * FALSE.
* * Note that in the most cases, we won't be able to immediately use the xmin * to increase the xmin horizon: we need to wait till the client has confirmed - * receiving current_lsn with LogicalConfirmReceivedLocation(). + * receiving current_lsn with LogicalConfirmReceivedLocation(). However, + * catalog=FALSE is only allowed for temporary replication slots, so the + * horizon is applied immediately. */ void -LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin) +LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin, + bool catalog) { bool updated_xmin = false; ReplicationSlot *slot; @@ -1679,6 +1682,27 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin) Assert(slot != NULL); SpinLockAcquire(&slot->mutex); + if (!catalog) + { + /* + * The non-catalog horizon can only advance in temporary slots, so + * update it in the shared memory immediately (w/o requiring prior + * saving to disk). + */ + Assert(slot->data.persistency == RS_TEMPORARY); + + /* + * The horizon must not go backwards, however it's o.k. to become + * invalid. + */ + Assert(!TransactionIdIsValid(slot->effective_xmin) || + !TransactionIdIsValid(xmin) || + TransactionIdFollowsOrEquals(xmin, slot->effective_xmin)); + + slot->effective_xmin = xmin; + SpinLockRelease(&slot->mutex); + return; + } /* * don't overwrite if we already have a newer xmin. This can happen if we diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index a0293f6ec7c..3003cadd76e 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -3734,6 +3734,56 @@ ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid) return rbtxn_has_catalog_changes(txn); } +/* + * Check if a transaction (or its subtransaction) contains a heap change. + */ +bool +ReorderBufferXidHasHeapChanges(ReorderBuffer *rb, TransactionId xid) +{ + ReorderBufferTXN *txn; + dlist_iter iter; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + if (txn == NULL) + return false; + + dlist_foreach(iter, &txn->changes) + { + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, node, iter.cur); + + switch (change->action) + { + case REORDER_BUFFER_CHANGE_INSERT: + case REORDER_BUFFER_CHANGE_UPDATE: + case REORDER_BUFFER_CHANGE_DELETE: + return true; + default: + break; + } + } + + /* Check subtransactions. */ + + /* + * TODO Verify that subtransactions must be assigned to the top-level + * transactions by now. + */ + dlist_foreach(iter, &txn->subtxns) + { + ReorderBufferTXN *subtxn; + + subtxn = dlist_container(ReorderBufferTXN, node, iter.cur); + + if (ReorderBufferXidHasHeapChanges(rb, subtxn->xid)) + return true; + } + + return false; +} + /* * ReorderBufferXidHasBaseSnapshot * Have we already set the base snapshot for the given txn/subtxn? diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index e238bcd73cd..fbc24de6e24 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -128,6 +128,7 @@ #include "access/heapam_xlog.h" #include "access/transam.h" #include "access/xact.h" +#include "commands/cluster.h" #include "common/file_utils.h" #include "miscadmin.h" #include "pgstat.h" @@ -496,7 +497,7 @@ SnapBuildInitialSnapshot(SnapBuild *builder) * we do not set MyProc->xmin). XXX Do we yet need to add some restrictions? 
*/ Snapshot -SnapBuildInitialSnapshotForRepack(SnapBuild *builder) +SnapBuildSnapshotForRepack(SnapBuild *builder) { Snapshot snap; @@ -1035,6 +1036,28 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, } } + /* + * Is REPACK (CONCURRENTLY) being run by this backend? + */ + else if (am_decoding_for_repack()) + { + Assert(builder->building_full_snapshot); + + /* + * In this special mode, heap changes of other relations should not be + * decoded at all - see heap_decode(). Thus if we find a single heap + * change in this transaction (or its subtransaction), we know that + * this transaction changes the relation being repacked. + */ + if (ReorderBufferXidHasHeapChanges(builder->reorder, xid)) + + /* + * Record the commit so we can build snapshots for the relation + * being repacked. + */ + needs_timetravel = true; + } + for (nxact = 0; nxact < nsubxacts; nxact++) { TransactionId subxid = subxacts[nxact]; @@ -1240,7 +1263,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact xmin = running->oldestRunningXid; elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u", builder->xmin, builder->xmax, running->oldestRunningXid, xmin); - LogicalIncreaseXminForSlot(lsn, xmin); + LogicalIncreaseXminForSlot(lsn, xmin, true); /* * Also tell the slot where we can restart decoding from. We don't want to diff --git a/src/backend/replication/pgoutput_repack/pgoutput_repack.c b/src/backend/replication/pgoutput_repack/pgoutput_repack.c index fb9956d392d..be1c3ec9626 100644 --- a/src/backend/replication/pgoutput_repack/pgoutput_repack.c +++ b/src/backend/replication/pgoutput_repack/pgoutput_repack.c @@ -195,6 +195,8 @@ store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind, BufFileWrite(dstate->file, &tuple->t_len, sizeof(tuple->t_len)); /* ... and the tuple itself. */ BufFileWrite(dstate->file, tuple->t_data, tuple->t_len); + /* CTID is needed as well, to check block ranges. */ + BufFileWrite(dstate->file, &tuple->t_self, sizeof(tuple->t_self)); /* Free the flat copy if created above. */ if (flattened) diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index 7c60b125564..24f29f0016e 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -2424,6 +2424,16 @@ boot_val => 'true', }, +# TODO Tune boot_val, 1024 is probably too low.
+{ name => 'repack_snapshot_after', type => 'int', context => 'PGC_USERSET', group => 'DEVELOPER_OPTIONS', + short_desc => 'Number of pages after which REPACK (CONCURRENTLY) builds a new snapshot.', + flags => 'GUC_UNIT_BLOCKS | GUC_NOT_IN_SAMPLE', + variable => 'repack_blocks_per_snapshot', + boot_val => '1024', + min => '1', + max => 'INT_MAX', +}, + { name => 'reserved_connections', type => 'int', context => 'PGC_POSTMASTER', group => 'CONN_AUTH_SETTINGS', short_desc => 'Sets the number of connection slots reserved for roles with privileges of pg_use_reserved_connections.', variable => 'ReservedConnections', diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 73ff6ad0a32..55c761de759 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -42,6 +42,7 @@ #include "catalog/namespace.h" #include "catalog/storage.h" #include "commands/async.h" +#include "commands/cluster.h" #include "commands/extension.h" #include "commands/event_trigger.h" #include "commands/tablespace.h" diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 15760363a1a..03ba89e6989 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -629,12 +629,12 @@ typedef struct TableAmRoutine Relation OldIndex, bool use_sort, TransactionId OldestXmin, - Snapshot snapshot, TransactionId *xid_cutoff, MultiXactId *multi_cutoff, double *num_tuples, double *tups_vacuumed, - double *tups_recently_dead); + double *tups_recently_dead, + void *tableam_data); /* * React to VACUUM command on the relation. The VACUUM can be triggered by @@ -1647,8 +1647,6 @@ table_relation_copy_data(Relation rel, const RelFileLocator *newrlocator) * not needed for the relation's AM * - *xid_cutoff - ditto * - *multi_cutoff - ditto - * - snapshot - if != NULL, ignore data changes done by transactions that this - * (MVCC) snapshot considers still in-progress or in the future. * * Output parameters: * - *xid_cutoff - rel's new relfrozenxid value, may be invalid @@ -1661,19 +1659,19 @@ table_relation_copy_for_cluster(Relation OldTable, Relation NewTable, Relation OldIndex, bool use_sort, TransactionId OldestXmin, - Snapshot snapshot, TransactionId *xid_cutoff, MultiXactId *multi_cutoff, double *num_tuples, double *tups_vacuumed, - double *tups_recently_dead) + double *tups_recently_dead, + void *tableam_data) { OldTable->rd_tableam->relation_copy_for_cluster(OldTable, NewTable, OldIndex, use_sort, OldestXmin, - snapshot, xid_cutoff, multi_cutoff, num_tuples, tups_vacuumed, - tups_recently_dead); + tups_recently_dead, + tableam_data); } /* diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h index 1b05d5d418b..438ee0d751e 100644 --- a/src/include/commands/cluster.h +++ b/src/include/commands/cluster.h @@ -46,6 +46,72 @@ typedef struct ClusterParams * The following definitions are used by REPACK CONCURRENTLY. */ +extern PGDLLIMPORT int repack_blocks_per_snapshot; + +/* + * Everything we need to call ExecInsertIndexTuples(). + */ +typedef struct IndexInsertState +{ + ResultRelInfo *rri; + EState *estate; +} IndexInsertState; + +/* + * Backend-local information to control the decoding worker. + */ +typedef struct DecodingWorker +{ + /* The worker. */ + BackgroundWorkerHandle *handle; + + /* DecodingWorkerShared is in this segment. */ + dsm_segment *seg; + + /* Handle of the error queue. */ + shm_mq_handle *error_mqh; +} DecodingWorker; + +/* + * Information needed to handle concurrent data changes.
diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h
index 1b05d5d418b..438ee0d751e 100644
--- a/src/include/commands/cluster.h
+++ b/src/include/commands/cluster.h
@@ -46,6 +46,72 @@ typedef struct ClusterParams
  * The following definitions are used by REPACK CONCURRENTLY.
  */
 
+extern PGDLLIMPORT int repack_blocks_per_snapshot;
+
+/*
+ * Everything we need to call ExecInsertIndexTuples().
+ */
+typedef struct IndexInsertState
+{
+	ResultRelInfo *rri;
+	EState	   *estate;
+} IndexInsertState;
+
+/*
+ * Backend-local information to control the decoding worker.
+ */
+typedef struct DecodingWorker
+{
+	/* The worker. */
+	BackgroundWorkerHandle *handle;
+
+	/* DecodingWorkerShared is in this segment. */
+	dsm_segment *seg;
+
+	/* Handle of the error queue. */
+	shm_mq_handle *error_mqh;
+} DecodingWorker;
+
+/*
+ * Information needed to handle concurrent data changes.
+ */
+typedef struct ConcurrentChangeContext
+{
+	/* The relation the changes are applied to. */
+	Relation	rel;
+
+	/*
+	 * Background worker performing logical decoding of concurrent data
+	 * changes.
+	 */
+	DecodingWorker *worker;
+
+	/*
+	 * Sequential numbers of the most recent files containing snapshots and
+	 * data changes respectively. These files are created by the decoding
+	 * worker.
+	 */
+	int			file_seq_snapshot;
+	int			file_seq_changes;
+
+	/*
+	 * The following is needed to find the existing tuple if the change is
+	 * UPDATE or DELETE. 'ident_key' should have all the fields except for
+	 * 'sk_argument' initialized.
+	 */
+	Relation	ident_index;
+	ScanKey		ident_key;
+	int			ident_key_nentries;
+
+	/* Needed to update the indexes of 'rel'. */
+	IndexInsertState *iistate;
+
+	/* The first block of the scan used to copy the heap. */
+	BlockNumber first_block;
+	/* List of RepackApplyRange objects. */
+	List	   *block_ranges;
+} ConcurrentChangeContext;
+
 /*
  * Stored as a single byte in the output file.
  */
@@ -103,6 +169,12 @@ extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
 
 extern bool am_decoding_for_repack(void);
 extern bool change_useless_for_repack(XLogRecordBuffer *buf);
+extern void repack_get_concurrent_changes(struct ConcurrentChangeContext *ctx,
+										  XLogRecPtr end_of_wal,
+										  BlockNumber range_end,
+										  bool request_snapshot,
+										  bool done);
+extern Snapshot repack_get_snapshot(struct ConcurrentChangeContext *ctx);
 extern void RepackWorkerMain(Datum main_arg);
 
 #endif							/* CLUSTER_H */
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 5b43e181135..432dca928e3 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -137,7 +137,7 @@ extern bool DecodingContextReady(LogicalDecodingContext *ctx);
 extern void FreeDecodingContext(LogicalDecodingContext *ctx);
 
 extern void LogicalIncreaseXminForSlot(XLogRecPtr current_lsn,
-									   TransactionId xmin);
+									   TransactionId xmin, bool catalog);
 extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
 												  XLogRecPtr restart_lsn);
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 314e35592c0..19df5f4a9ee 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -763,6 +763,7 @@ extern void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRe
 extern void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
 											  XLogRecPtr lsn);
 extern bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid);
+extern bool ReorderBufferXidHasHeapChanges(ReorderBuffer *rb, TransactionId xid);
 extern bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid);
 
 extern bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index 5ee267d1c90..b20a4d1a93d 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -73,7 +73,7 @@ extern void FreeSnapshotBuilder(SnapBuild *builder);
 extern void SnapBuildSnapDecRefcount(Snapshot snap);
 
 extern Snapshot SnapBuildInitialSnapshot(SnapBuild *builder);
-extern Snapshot SnapBuildInitialSnapshotForRepack(SnapBuild *builder);
+extern Snapshot SnapBuildSnapshotForRepack(SnapBuild *builder);
 extern Snapshot SnapBuildMVCCFromHistoric(Snapshot snapshot, bool in_place);
 extern const char *SnapBuildExportSnapshot(SnapBuild *builder);
 extern void SnapBuildClearExportedSnapshot(void);
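
cluster.h only refers to RepackApplyRange; its definition is not visible in
this part of the patch. Its role, though, follows from the design: a decoded
UPDATE or DELETE may be applied only once the block its old CTID points into
has been copied to the new relation. The stand-in below illustrates that
check; the struct layout (a closed-open block interval) and the function
change_is_applicable() are assumptions made for the example, not the patch's
actual definitions.

#include "postgres.h"

#include "commands/cluster.h"
#include "nodes/pg_list.h"
#include "storage/itemptr.h"

/*
 * Hypothetical stand-in for RepackApplyRange: a closed-open interval of
 * block numbers that has already been copied to the new relation.
 */
typedef struct RepackApplyRange
{
	BlockNumber start;			/* first block of the copied range */
	BlockNumber end;			/* first block beyond the range */
} RepackApplyRange;

/*
 * Sketch: may a decoded change be applied now?  Only if the block its CTID
 * points into belongs to an already-copied range; otherwise the change is
 * kept aside until its range is processed.
 */
static bool
change_is_applicable(ConcurrentChangeContext *ctx, ItemPointer ctid)
{
	BlockNumber blk = ItemPointerGetBlockNumber(ctid);
	ListCell   *lc;

	foreach(lc, ctx->block_ranges)
	{
		RepackApplyRange *range = (RepackApplyRange *) lfirst(lc);

		if (blk >= range->start && blk < range->end)
			return true;
	}

	return false;
}

This filtering is also why store_change() persists the CTID with every tuple
written to the change file.
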
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index d1a694f9008..220a2b43aa1 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -419,7 +419,6 @@ CatCacheHeader
 CatalogId
 CatalogIdMapEntry
 CatalogIndexState
-ChangeDest
 ChangeVarNodes_callback
 ChangeVarNodes_context
 CheckPoint
@@ -496,6 +495,7 @@ CompressFileHandle
 CompressionLocation
 CompressorState
 ComputeXidHorizonsResult
+ConcurrentChangeContext
 ConcurrentChangeKind
 ConditionVariable
 ConditionVariableMinimallyPadded
@@ -2575,6 +2575,7 @@ ReorderBufferTupleCidKey
 ReorderBufferUpdateProgressTxnCB
 ReorderTuple
 RepOriginId
+RepackApplyRange
 RepackCommand
 RepackDecodingState
 RepackStmt
-- 
2.47.3