From 32accb96480ddf42847ae23f30f23636d64eeb50 Mon Sep 17 00:00:00 2001
From: Antonin Houska
Date: Sat, 13 Dec 2025 19:27:18 +0100
Subject: [PATCH 6/6] Use multiple snapshots to copy the data.

REPACK (CONCURRENTLY) does not prevent applications from using the table
that is being processed, but it can prevent the xmin horizon from advancing
and thus restrict VACUUM for the whole database.

This patch adds the ability to use a particular snapshot only for a certain
range of pages. Each time that number of pages has been processed, a new
snapshot is built, which should have its xmin higher than that of the
previous snapshot.

The data copying works as follows:

1. Have the logical decoding system build a snapshot S0 for range R0 at
   LSN0. This snapshot sees all the data changes whose commit records have
   LSN < LSN0.

2. Copy the pages in that range to the new relation. The changes not
   visible to the snapshot (because their transactions are still running)
   will appear in the output of the logical decoding system as soon as
   their commit records appear in WAL.

3. Perform logical decoding of all changes we find in WAL for the table
   we're repacking, put them aside, and remember that out of these we can
   only apply those that affect the range R0 in the old relation.
   (Naturally, we cannot apply the ones that belong to other pages because
   it's impossible to UPDATE / DELETE a row in the new relation if it
   hasn't been copied yet.) Once the decoding is done, consider LSN1 to be
   the position of the end of the last WAL record decoded.

4. Build a new snapshot S1 at position LSN1, i.e. one that sees all the
   data whose commit records are at WAL positions < LSN1. Use this snapshot
   to copy the range of pages R1.

5. Perform logical decoding like in step 3, but remember that out of this
   next set, only changes belonging to ranges R0 *and* R1 in the old table
   can be applied.

6. And so on.

Note that the changes decoded above should not be applied to the new
relation until the whole relation has been copied. The point is that we
need the identity index to apply UPDATE and DELETE statements, and bulk
creation of indexes on the already copied heap is probably better than
retail insertions during the copying.

Special attention needs to be paid to UPDATEs that span page ranges. For
example, if the old tuple is in range R0 but the new tuple is in R1, and R1
hasn't been copied yet, we only DELETE the old version from the new
relation. The new version will be handled during processing of range R1.
The snapshot S1 will be based on a WAL position following that UPDATE, so
it will see the new tuple if its transaction's commit record is at a WAL
position lower than the position where we built the snapshot. On the other
hand, if the commit record appears at a higher position than that of the
snapshot, the corresponding INSERT will be decoded and replayed sometime
later: once the scan of R1 has started, changes of tuples belonging to it
are no longer filtered out.

Likewise, if the old tuple is in range R1 (not yet copied) but the new
tuple is in R0, we only perform an INSERT on the new relation. The deletion
of the old version will either be visible to the snapshot S1 (i.e. the
snapshot won't see the old version), or replayed later.

This approach introduces one limitation though: if the USING INDEX clause
is specified, an explicit sort is always used. An index scan wouldn't work
because it does not return the tuples sorted by CTID, so we wouldn't be
able to split the copying into ranges of pages. I'm not sure it's serious.
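To illustrate steps 1-6, here is a rough, self-contained sketch of the
per-range loop. It is not the patch code (the real logic lives in
heapam_handler.c and cluster.c below); BLOCKS_PER_SNAPSHOT,
build_snapshot() and decode_changes() are stand-ins invented for this
sketch only.

```c
/*
 * Toy model of the multi-snapshot copy loop described above.
 * All names are invented for illustration purposes.
 */
#include <stdio.h>

#define BLOCKS_PER_SNAPSHOT 1024    /* cf. the repack_snapshot_after GUC */

typedef unsigned int BlockNumber;

/* Stand-in for "build a snapshot at the current end of decoded WAL". */
static void
build_snapshot(int range_no)
{
    printf("snapshot S%d\n", range_no);
}

/* Stand-in for "decode WAL up to the flush position and stash the changes";
 * only changes whose (old) tuple lies below 'applicable_upto' may later be
 * applied, the rest will be visible to the next snapshot instead. */
static void
decode_changes(BlockNumber applicable_upto)
{
    printf("decoded changes applicable below block %u\n", applicable_upto);
}

int
main(void)
{
    BlockNumber nblocks = 3000;     /* pretend table size */
    BlockNumber blkno = 0;
    int         range_no = 0;

    build_snapshot(range_no);       /* S0, used for range R0 */

    while (blkno < nblocks)
    {
        BlockNumber range_end = blkno + BLOCKS_PER_SNAPSHOT;

        /* Copy blocks [blkno, range_end) using the current snapshot. */
        for (; blkno < range_end && blkno < nblocks; blkno++)
            ;                       /* copy_one_block(blkno) would go here */

        /* Decode the WAL written so far; remember which blocks it covers. */
        decode_changes(blkno);

        /* Build the next snapshot right after the last decoded record. */
        if (blkno < nblocks)
            build_snapshot(++range_no);
    }

    return 0;
}
```

In the patch itself the decoding happens in a background worker, and the
stashed changes are only applied once the new heap's indexes have been
built, because the identity index is needed to apply UPDATE and DELETE.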
If REPACK runs concurrently and does not restrict VACUUM, the execution time should not be critical. A new GUC repack_snapshot_after can be used to set the number of pages per snapshot. It's currently classified as DEVELOPER_OPTIONS and may be replaced by a constant after enough evaluation is done. --- src/backend/access/heap/heapam_handler.c | 144 ++++- src/backend/commands/cluster.c | 561 +++++++++++++----- src/backend/replication/logical/decode.c | 47 +- src/backend/replication/logical/logical.c | 30 +- .../replication/logical/reorderbuffer.c | 50 ++ src/backend/replication/logical/snapbuild.c | 27 +- .../pgoutput_repack/pgoutput_repack.c | 2 + src/backend/utils/misc/guc_parameters.dat | 10 + src/backend/utils/misc/guc_tables.c | 1 + src/include/access/tableam.h | 14 +- src/include/commands/cluster.h | 65 ++ src/include/replication/logical.h | 2 +- src/include/replication/reorderbuffer.h | 1 + src/include/replication/snapbuild.h | 2 +- src/tools/pgindent/typedefs.list | 3 +- 15 files changed, 771 insertions(+), 188 deletions(-) diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index cb09e6fd1dc..b87fad605e4 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -33,6 +33,7 @@ #include "catalog/index.h" #include "catalog/storage.h" #include "catalog/storage_xlog.h" +#include "commands/cluster.h" #include "commands/progress.h" #include "executor/executor.h" #include "miscadmin.h" @@ -686,12 +687,12 @@ static void heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, Relation OldIndex, bool use_sort, TransactionId OldestXmin, - Snapshot snapshot, TransactionId *xid_cutoff, MultiXactId *multi_cutoff, double *num_tuples, double *tups_vacuumed, - double *tups_recently_dead) + double *tups_recently_dead, + void *tableam_data) { RewriteState rwstate = NULL; IndexScanDesc indexScan; @@ -707,7 +708,10 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, bool *isnull; BufferHeapTupleTableSlot *hslot; BlockNumber prev_cblock = InvalidBlockNumber; - bool concurrent = snapshot != NULL; + ConcurrentChangeContext *ctx = (ConcurrentChangeContext *) tableam_data; + bool concurrent = ctx != NULL; + Snapshot snapshot = NULL; + BlockNumber range_end = InvalidBlockNumber; /* Remember if it's a system catalog */ is_system_catalog = IsSystemRelation(OldHeap); @@ -744,8 +748,9 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, * that still need to be copied, we scan with SnapshotAny and use * HeapTupleSatisfiesVacuum for the visibility test. * - * In the CONCURRENTLY case, we do regular MVCC visibility tests, using - * the snapshot passed by the caller. + * In the CONCURRENTLY case, we do regular MVCC visibility tests. The + * snapshot changes several times during the scan so that we do not block + * the progress of the xmin horizon for VACUUM too much. */ if (OldIndex != NULL && !use_sort) { @@ -773,10 +778,15 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, pgstat_progress_update_param(PROGRESS_REPACK_PHASE, PROGRESS_REPACK_PHASE_SEQ_SCAN_HEAP); - tableScan = table_beginscan(OldHeap, - snapshot ? snapshot : SnapshotAny, - 0, (ScanKey) NULL); + tableScan = table_beginscan(OldHeap, SnapshotAny, 0, (ScanKey) NULL); heapScan = (HeapScanDesc) tableScan; + + /* + * In CONCURRENTLY mode we scan the table by ranges of blocks and the + * algorithm below expects forward direction. 
(No other direction + * should be set here regardless concurrently anyway.) + */ + Assert(heapScan->rs_dir == ForwardScanDirection || !concurrent); indexScan = NULL; /* Set total heap blocks */ @@ -787,6 +797,24 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, slot = table_slot_create(OldHeap, NULL); hslot = (BufferHeapTupleTableSlot *) slot; + if (concurrent) + { + /* + * Do not block the progress of xmin horizons. + * + * TODO Analyze thoroughly if this might have bad consequences. + */ + PopActiveSnapshot(); + InvalidateCatalogSnapshot(); + + /* + * Wait until the worker has the initial snapshot and retrieve it. + */ + snapshot = repack_get_snapshot(ctx); + + PushActiveSnapshot(snapshot); + } + /* * Scan through the OldHeap, either in OldIndex order or sequentially; * copy each tuple into the NewHeap, or transiently to the tuplesort @@ -803,6 +831,13 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, if (indexScan != NULL) { + /* + * Index scan should not be used in the CONCURRENTLY case because + * it returns tuples in random order, so we could not split the + * scan into a series of page ranges. + */ + Assert(!concurrent); + if (!index_getnext_slot(indexScan, ForwardScanDirection, slot)) break; @@ -824,6 +859,18 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, */ pgstat_progress_update_param(PROGRESS_REPACK_HEAP_BLKS_SCANNED, heapScan->rs_nblocks); + + if (concurrent) + { + PopActiveSnapshot(); + + /* + * For the last range, there are no restriction on block + * numbers, so the concurrent data changes pertaining to + * this range can decoded (and applied) anytime after this + * loop. + */ + } break; } @@ -922,6 +969,75 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, continue; } } + else + { + BlockNumber blkno; + bool visible; + + /* + * With CONCURRENTLY, we use each snapshot only for certain range + * of pages, so that VACUUM does not get block for too long. So + * first check if the tuple falls into the current range. + */ + blkno = BufferGetBlockNumber(buf); + + /* The first block of the scan? */ + if (!BlockNumberIsValid(ctx->first_block)) + { + Assert(!BlockNumberIsValid(range_end)); + + ctx->first_block = blkno; + range_end = repack_blocks_per_snapshot; + } + else + { + Assert(BlockNumberIsValid(range_end)); + + /* End of the current range? */ + if (blkno >= range_end) + { + XLogRecPtr end_of_wal; + + PopActiveSnapshot(); + + /* + * XXX It might be worth Assert(CatalogSnapshot == NULL) + * here, however that symbol is not external. + */ + + /* + * Decode all the concurrent data changes committed so far + * - these will be applicable to the current range. + */ + end_of_wal = GetFlushRecPtr(NULL); + repack_get_concurrent_changes(ctx, end_of_wal, range_end, + true, false); + + /* + * Define the next range. + */ + range_end = blkno + repack_blocks_per_snapshot; + + /* + * Get the snapshot for the next range - it should have + * been built at the position right after the last change + * decoded. Data present in the next range of blocks will + * either be visible to the snapshot or appear in the next + * batch of decoded changes. + */ + snapshot = repack_get_snapshot(ctx); + PushActiveSnapshot(snapshot); + } + } + + /* Finally check the tuple visibility. 
*/ + LockBuffer(buf, BUFFER_LOCK_SHARE); + visible = HeapTupleSatisfiesVisibility(tuple, snapshot, buf); + LockBuffer(buf, BUFFER_LOCK_UNLOCK); + + if (!visible) + continue; + } *num_tuples += 1; if (tuplesort != NULL) @@ -956,6 +1072,18 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, } } + if (concurrent) + { + XLogRecPtr end_of_wal; + + /* Decode the changes belonging to the last range. */ + end_of_wal = GetFlushRecPtr(NULL); + repack_get_concurrent_changes(ctx, end_of_wal, InvalidBlockNumber, + false, false); + + PushActiveSnapshot(GetTransactionSnapshot()); + } + if (indexScan != NULL) index_endscan(indexScan); if (tableScan != NULL) diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c index b0383c1375f..3eb642b996d 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -110,38 +110,27 @@ typedef struct RelFileLocator repacked_rel_locator = {.relNumber = InvalidOid}; RelFileLocator repacked_rel_toast_locator = {.relNumber = InvalidOid}; -/* - * Everything we need to call ExecInsertIndexTuples(). - */ -typedef struct IndexInsertState -{ - ResultRelInfo *rri; - EState *estate; -} IndexInsertState; - /* The WAL segment being decoded. */ static XLogSegNo repack_current_segment = 0; /* - * Information needed to apply concurrent data changes. + * When REPACK (CONCURRENTLY) copies data to the new heap, a new snapshot is + * built after processing this many pages. */ -typedef struct ChangeDest -{ - /* The relation the changes are applied to. */ - Relation rel; +int repack_blocks_per_snapshot = 1024; - /* - * The following is needed to find the existing tuple if the change is - * UPDATE or DELETE. 'ident_key' should have all the fields except for - * 'sk_argument' initialized. - */ - Relation ident_index; - ScanKey ident_key; - int ident_key_nentries; +/* + * Remember here to which pages should applied to changes recorded in given + * file. + */ +typedef struct RepackApplyRange +{ + /* The first block of the next range. */ + BlockNumber end; - /* Needed to update indexes of rel_dst. */ - IndexInsertState *iistate; -} ChangeDest; + /* File containing the changes to be applied to blocks in this range. */ + char *fname; +} RepackApplyRange; /* * Layout of shared memory used for communication between backend and the @@ -152,6 +141,9 @@ typedef struct DecodingWorkerShared /* Is the decoding initialized? */ bool initialized; + /* Set to request a snapshot. */ + bool snapshot_requested; + /* * Once the worker has reached this LSN, it should close the current * output file and either create a new one or exit, according to the field @@ -159,20 +151,25 @@ typedef struct DecodingWorkerShared * the WAL available and keep checking this field. It is ok if the worker * had already decoded records whose LSN is >= lsn_upto before this field * has been set. + * + * Set a valid LSN to request data changes. */ XLogRecPtr lsn_upto; +#define WORKER_RESPONSE_SNAPSHOT 0x1 +#define WORKER_RESPONSE_CHANGES 0x2 + /* Which kind of data is ready? */ + int response;; + /* Exit after closing the current file? */ bool done; /* The output is stored here. */ SharedFileSet sfs; - /* Can backend read the file contents? */ - bool sfs_valid; - /* Number of the last file exported by the worker. */ - int last_exported; + int last_exported_changes; + int last_exported_snapshot; /* Synchronize access to the fields above. */ slock_t mutex; @@ -214,26 +211,14 @@ typedef struct DecodingWorkerShared * the fileset name.) 
*/ static inline void -DecodingWorkerFileName(char *fname, Oid relid, uint32 seq) +DecodingWorkerFileName(char *fname, Oid relid, uint32 seq, bool snapshot) { - snprintf(fname, MAXPGPATH, "%u-%u", relid, seq); + if (!snapshot) + snprintf(fname, MAXPGPATH, "%u-%u", relid, seq); + else + snprintf(fname, MAXPGPATH, "%u-%u-snapshot", relid, seq); } -/* - * Backend-local information to control the decoding worker. - */ -typedef struct DecodingWorker -{ - /* The worker. */ - BackgroundWorkerHandle *handle; - - /* DecodingWorkerShared is in this segment. */ - dsm_segment *seg; - - /* Handle of the error queue. */ - shm_mq_handle *error_mqh; -} DecodingWorker; - /* Pointer to currently running decoding worker. */ static DecodingWorker *decoding_worker = NULL; @@ -250,11 +235,11 @@ static void check_repack_concurrently_requirements(Relation rel); static void rebuild_relation(Relation OldHeap, Relation index, bool verbose, bool concurrent); static void copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, - Snapshot snapshot, bool verbose, bool *pSwapToastByContent, TransactionId *pFreezeXid, - MultiXactId *pCutoffMulti); + MultiXactId *pCutoffMulti, + ConcurrentChangeContext *ctx); static List *get_tables_to_repack(RepackCommand cmd, bool usingindex, MemoryContext permcxt); static List *get_tables_to_repack_partitioned(RepackCommand cmd, @@ -264,9 +249,12 @@ static bool cluster_is_permitted_for_relation(RepackCommand cmd, Oid relid, Oid userid); static LogicalDecodingContext *setup_logical_decoding(Oid relid); -static bool decode_concurrent_changes(LogicalDecodingContext *ctx, +static bool decode_concurrent_changes(LogicalDecodingContext *decoding_ctx, DecodingWorkerShared *shared); -static void apply_concurrent_changes(BufFile *file, ChangeDest *dest); +static void apply_concurrent_changes(ConcurrentChangeContext *ctx); +static void apply_concurrent_changes_file(ConcurrentChangeContext *ctx, + BufFile *file, + BlockNumber range_end); static void apply_concurrent_insert(Relation rel, HeapTuple tup, IndexInsertState *iistate, TupleTableSlot *index_slot); @@ -275,12 +263,14 @@ static void apply_concurrent_update(Relation rel, HeapTuple tup, IndexInsertState *iistate, TupleTableSlot *index_slot); static void apply_concurrent_delete(Relation rel, HeapTuple tup_target); -static HeapTuple find_target_tuple(Relation rel, ChangeDest *dest, +static bool is_tuple_in_block_range(HeapTuple tup, BlockNumber start, + BlockNumber end); +static HeapTuple find_target_tuple(Relation rel, + ConcurrentChangeContext *ctx, HeapTuple tup_key, TupleTableSlot *ident_slot); -static void process_concurrent_changes(XLogRecPtr end_of_wal, - ChangeDest *dest, - bool done); +static void repack_add_block_range(ConcurrentChangeContext *ctx, + BlockNumber end, char *fname); static IndexInsertState *get_index_insert_state(Relation relation, Oid ident_index_id, Relation *ident_index_p); @@ -291,7 +281,8 @@ static void cleanup_logical_decoding(LogicalDecodingContext *ctx); static void rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, Relation cl_index, TransactionId frozenXid, - MultiXactId cutoffMulti); + MultiXactId cutoffMulti, + ConcurrentChangeContext *ctx); static List *build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes); static Relation process_single_relation(RepackStmt *stmt, LOCKMODE lockmode, @@ -302,9 +293,8 @@ static Oid determine_clustered_index(Relation rel, bool usingindex, static void start_decoding_worker(Oid relid); static void stop_decoding_worker(void); 
static void repack_worker_internal(dsm_segment *seg); -static void export_initial_snapshot(Snapshot snapshot, - DecodingWorkerShared *shared); -static Snapshot get_initial_snapshot(DecodingWorker *worker); +static void export_snapshot(Snapshot snapshot, + DecodingWorkerShared *shared); static void ProcessRepackMessage(StringInfo msg); static const char *RepackCommandAsString(RepackCommand cmd); @@ -1008,6 +998,15 @@ check_repack_concurrently_requirements(Relation rel) RelationGetRelationName(rel)), (errhint("Relation \"%s\" has no identity index.", RelationGetRelationName(rel))))); + + /* + * In the CONCURRENTLY mode we don't want to use the same snapshot + * throughout the whole processing, as it could block the progress of xmin + * horizon. + */ + if (IsolationUsesXactSnapshot()) + ereport(ERROR, + (errmsg("REPACK (CONCURRENTLY) does not support transaction isolation higher than READ COMMITTED"))); } @@ -1038,7 +1037,7 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, bool concurrent bool swap_toast_by_content; TransactionId frozenXid; MultiXactId cutoffMulti; - Snapshot snapshot = NULL; + ConcurrentChangeContext *ctx = NULL; #if USE_ASSERT_CHECKING LOCKMODE lmode; @@ -1050,6 +1049,13 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, bool concurrent if (concurrent) { + /* + * This is only needed here to gather the data changes and range + * information during the copying. The fields needed to apply the + * changes be filled later. + */ + ctx = palloc0_object(ConcurrentChangeContext); + /* * The worker needs to be member of the locking group we're the leader * of. We ought to become the leader before the worker starts. The @@ -1075,13 +1081,7 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, bool concurrent * REPACK CONCURRENTLY. */ start_decoding_worker(tableOid); - - /* - * Wait until the worker has the initial snapshot and retrieve it. - */ - snapshot = get_initial_snapshot(decoding_worker); - - PushActiveSnapshot(snapshot); + ctx->worker = decoding_worker; } /* for CLUSTER or REPACK USING INDEX, mark the index as the one to use */ @@ -1105,21 +1105,25 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, bool concurrent NewHeap = table_open(OIDNewHeap, NoLock); /* Copy the heap data into the new table in the desired order */ - copy_table_data(NewHeap, OldHeap, index, snapshot, verbose, - &swap_toast_by_content, &frozenXid, &cutoffMulti); - - /* The historic snapshot won't be needed anymore. */ - if (snapshot) + if (concurrent) { - PopActiveSnapshot(); - UpdateActiveSnapshotCommandId(); + ctx->first_block = InvalidBlockNumber; + ctx->block_ranges = NIL; } + copy_table_data(NewHeap, OldHeap, index, verbose, &swap_toast_by_content, + &frozenXid, &cutoffMulti, ctx); if (concurrent) { + /* + * Make sure the active snapshot can see the data copied, so the rows + * can be updated / deleted. + */ + UpdateActiveSnapshotCommandId(); + Assert(!swap_toast_by_content); rebuild_relation_finish_concurrent(NewHeap, OldHeap, index, - frozenXid, cutoffMulti); + frozenXid, cutoffMulti, ctx); pgstat_progress_update_param(PROGRESS_REPACK_PHASE, PROGRESS_REPACK_PHASE_FINAL_CLEANUP); @@ -1283,9 +1287,6 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, /* * Do the physical copying of table data. * - * 'snapshot' and 'decoding_ctx': see table_relation_copy_for_cluster(). Pass - * iff concurrent processing is required. 
- * * There are three output parameters: * *pSwapToastByContent is set true if toast tables must be swapped by content. * *pFreezeXid receives the TransactionId used as freeze cutoff point. @@ -1293,8 +1294,9 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, */ static void copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, - Snapshot snapshot, bool verbose, bool *pSwapToastByContent, - TransactionId *pFreezeXid, MultiXactId *pCutoffMulti) + bool verbose, bool *pSwapToastByContent, + TransactionId *pFreezeXid, MultiXactId *pCutoffMulti, + ConcurrentChangeContext *ctx) { Relation relRelation; HeapTuple reltup; @@ -1311,7 +1313,7 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, int elevel = verbose ? INFO : DEBUG2; PGRUsage ru0; char *nspname; - bool concurrent = snapshot != NULL; + bool concurrent = ctx != NULL; LOCKMODE lmode; lmode = concurrent ? ShareUpdateExclusiveLock : AccessExclusiveLock; @@ -1423,8 +1425,18 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, * provided, else plain seqscan. */ if (OldIndex != NULL && OldIndex->rd_rel->relam == BTREE_AM_OID) - use_sort = plan_cluster_use_sort(RelationGetRelid(OldHeap), - RelationGetRelid(OldIndex)); + { + if (!concurrent) + use_sort = plan_cluster_use_sort(RelationGetRelid(OldHeap), + RelationGetRelid(OldIndex)); + else + + /* + * To use multiple snapshots, we need to process the table + * sequentially. + */ + use_sort = true; + } else use_sort = false; @@ -1453,11 +1465,11 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, * values (e.g. because the AM doesn't use freezing). */ table_relation_copy_for_cluster(OldHeap, NewHeap, OldIndex, use_sort, - cutoffs.OldestXmin, snapshot, + cutoffs.OldestXmin, &cutoffs.FreezeLimit, &cutoffs.MultiXactCutoff, &num_tuples, &tups_vacuumed, - &tups_recently_dead); + &tups_recently_dead, ctx); /* return selected values to caller, get set as relfrozenxid/minmxid */ *pFreezeXid = cutoffs.FreezeLimit; @@ -2610,6 +2622,7 @@ decode_concurrent_changes(LogicalDecodingContext *ctx, DecodingWorkerShared *shared) { RepackDecodingState *dstate; + bool snapshot_requested; XLogRecPtr lsn_upto; bool done; char fname[MAXPGPATH]; @@ -2617,11 +2630,14 @@ decode_concurrent_changes(LogicalDecodingContext *ctx, dstate = (RepackDecodingState *) ctx->output_writer_private; /* Open the output file. */ - DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1); + DecodingWorkerFileName(fname, shared->relid, + shared->last_exported_changes + 1, + false); dstate->file = BufFileCreateFileSet(&shared->sfs.fs, fname); SpinLockAcquire(&shared->mutex); lsn_upto = shared->lsn_upto; + snapshot_requested = shared->snapshot_requested; done = shared->done; SpinLockRelease(&shared->mutex); @@ -2689,6 +2705,7 @@ decode_concurrent_changes(LogicalDecodingContext *ctx, { SpinLockAcquire(&shared->mutex); lsn_upto = shared->lsn_upto; + snapshot_requested = shared->snapshot_requested; /* 'done' should be set at the same time as 'lsn_upto' */ done = shared->done; SpinLockRelease(&shared->mutex); @@ -2710,28 +2727,108 @@ decode_concurrent_changes(LogicalDecodingContext *ctx, */ BufFileClose(dstate->file); dstate->file = NULL; + + /* + * Before publishing the data changes, export the snapshot too if + * requested. Publishing both at once makes sense because both are needed + * at the same time, and it's simpler. 
+ */ + if (snapshot_requested) + { + Snapshot snapshot; + + snapshot = SnapBuildSnapshotForRepack(ctx->snapshot_builder); + export_snapshot(snapshot, shared); + + /* + * Adjust the replication slot's xmin so that VACUUM can do more work. + */ + LogicalIncreaseXminForSlot(InvalidXLogRecPtr, snapshot->xmin, false); + FreeSnapshot(snapshot); + } + else + { + /* + * If data changes were requested but no following snapshot, we don't + * care about xmin horizon because the heap copying should be done by + * now. + */ + LogicalIncreaseXminForSlot(InvalidXLogRecPtr, InvalidTransactionId, + false); + + } + + /* Now announce that the output is available. */ SpinLockAcquire(&shared->mutex); shared->lsn_upto = InvalidXLogRecPtr; - shared->sfs_valid = true; - shared->last_exported++; + shared->response |= WORKER_RESPONSE_CHANGES; + shared->last_exported_changes++; + if (snapshot_requested) + { + shared->snapshot_requested = false; + shared->response |= WORKER_RESPONSE_SNAPSHOT; + shared->last_exported_snapshot++; + } SpinLockRelease(&shared->mutex); + ConditionVariableSignal(&shared->cv); return done; } /* - * Apply changes stored in 'file'. + * Apply all concurrent changes. */ static void -apply_concurrent_changes(BufFile *file, ChangeDest *dest) +apply_concurrent_changes(ConcurrentChangeContext *ctx) +{ + DecodingWorkerShared *shared; + ListCell *lc; + + shared = (DecodingWorkerShared *) dsm_segment_address(decoding_worker->seg); + + foreach(lc, ctx->block_ranges) + { + RepackApplyRange *range; + BufFile *file; + + range = (RepackApplyRange *) lfirst(lc); + + file = BufFileOpenFileSet(&shared->sfs.fs, range->fname, O_RDONLY, + false); + + /* + * If range end is valid, the start should be as well. + */ + Assert(!BlockNumberIsValid(range->end) || + BlockNumberIsValid(ctx->first_block)); + + apply_concurrent_changes_file(ctx, file, range->end); + BufFileClose(file); + + pfree(range->fname); + pfree(range); + } + + /* Get ready for the next decoding. */ + ctx->block_ranges = NIL; + ctx->first_block = InvalidBlockNumber; +} + +/* + * Apply concurrent changes stored in 'file'. + */ +static void +apply_concurrent_changes_file(ConcurrentChangeContext *ctx, BufFile *file, + BlockNumber range_end) { char kind; uint32 t_len; - Relation rel = dest->rel; + Relation rel = ctx->rel; TupleTableSlot *index_slot, *ident_slot; HeapTuple tup_old = NULL; + bool check_range = BlockNumberIsValid(range_end); /* TupleTableSlot is needed to pass the tuple to ExecInsertIndexTuples(). */ index_slot = MakeSingleTupleTableSlot(RelationGetDescr(rel), @@ -2759,8 +2856,8 @@ apply_concurrent_changes(BufFile *file, ChangeDest *dest) tup->t_data = (HeapTupleHeader) ((char *) tup + HEAPTUPLESIZE); BufFileReadExact(file, tup->t_data, t_len); tup->t_len = t_len; - ItemPointerSetInvalid(&tup->t_self); - tup->t_tableOid = RelationGetRelid(dest->rel); + tup->t_tableOid = RelationGetRelid(ctx->rel); + BufFileReadExact(file, &tup->t_self, sizeof(tup->t_self)); if (kind == CHANGE_UPDATE_OLD) { @@ -2771,7 +2868,10 @@ apply_concurrent_changes(BufFile *file, ChangeDest *dest) { Assert(tup_old == NULL); - apply_concurrent_insert(rel, tup, dest->iistate, index_slot); + if (!check_range || + is_tuple_in_block_range(tup, ctx->first_block, range_end)) + apply_concurrent_insert(rel, tup, ctx->iistate, + index_slot); pfree(tup); } @@ -2792,16 +2892,52 @@ apply_concurrent_changes(BufFile *file, ChangeDest *dest) /* * Find the tuple to be updated or deleted. 
*/ - tup_exist = find_target_tuple(rel, dest, tup_key, ident_slot); - if (tup_exist == NULL) - elog(ERROR, "failed to find target tuple"); + if (!check_range || + (is_tuple_in_block_range(tup_key, ctx->first_block, + range_end))) + { + /* The change needs to be applied to this tuple. */ + tup_exist = find_target_tuple(rel, ctx, tup_key, ident_slot); + if (tup_exist == NULL) + elog(ERROR, "failed to find target tuple"); - if (kind == CHANGE_UPDATE_NEW) - apply_concurrent_update(rel, tup, tup_exist, dest->iistate, - index_slot); + if (kind == CHANGE_DELETE) + apply_concurrent_delete(rel, tup_exist); + else + { + /* UPDATE */ + if (!check_range || tup == tup_key || + is_tuple_in_block_range(tup, ctx->first_block, + range_end)) + /* The new tuple is in the same range. */ + apply_concurrent_update(rel, tup, tup_exist, + ctx->iistate, index_slot); + else + + /* + * The new key is in the other range, so only delete + * it from the current one. The new version should be + * visible to the snapshot that we'll use to copy the + * other block. + */ + apply_concurrent_delete(rel, tup_exist); + } + } else - apply_concurrent_delete(rel, tup_exist); - + { + /* + * The change belongs to another range, so we don't need to + * bother with the old tuple: the snapshot used for the other + * range won't see it, so it won't be copied. However, the new + * tuple still may need to go to the range we are checking. In + * that case, simply insert it there. + */ + if (kind == CHANGE_UPDATE_NEW && tup != tup_key && + is_tuple_in_block_range(tup, ctx->first_block, + range_end)) + apply_concurrent_insert(rel, tup, ctx->iistate, + index_slot); + } if (tup_old != NULL) { pfree(tup_old); @@ -2940,6 +3076,33 @@ apply_concurrent_delete(Relation rel, HeapTuple tup_target) pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_DELETED, 1); } +/* + * Check if tuple originates from given range of blocks that have already been + * copied. + */ +static bool +is_tuple_in_block_range(HeapTuple tup, BlockNumber start, BlockNumber end) +{ + BlockNumber blknum; + + Assert(BlockNumberIsValid(start) && BlockNumberIsValid(end)); + + blknum = ItemPointerGetBlockNumber(&tup->t_self); + Assert(BlockNumberIsValid(blknum)); + + if (start < end) + { + return blknum >= start && blknum < end; + } + else + { + /* Has the scan position wrapped around? */ + Assert(start > end); + + return blknum >= start || blknum < end; + } +} + /* * Find the tuple to be updated or deleted. * @@ -2949,10 +3112,10 @@ apply_concurrent_delete(Relation rel, HeapTuple tup_target) * it when he no longer needs the tuple returned. */ static HeapTuple -find_target_tuple(Relation rel, ChangeDest *dest, HeapTuple tup_key, - TupleTableSlot *ident_slot) +find_target_tuple(Relation rel, ConcurrentChangeContext *ctx, + HeapTuple tup_key, TupleTableSlot *ident_slot) { - Relation ident_index = dest->ident_index; + Relation ident_index = ctx->ident_index; IndexScanDesc scan; Form_pg_index ident_form; int2vector *ident_indkey; @@ -2960,14 +3123,14 @@ find_target_tuple(Relation rel, ChangeDest *dest, HeapTuple tup_key, /* XXX no instrumentation for now */ scan = index_beginscan(rel, ident_index, GetActiveSnapshot(), - NULL, dest->ident_key_nentries, 0); + NULL, ctx->ident_key_nentries, 0); /* * Scan key is passed by caller, so it does not have to be constructed * multiple times. Key entries have all fields initialized, except for * sk_argument. 
*/ - index_rescan(scan, dest->ident_key, dest->ident_key_nentries, NULL, 0); + index_rescan(scan, ctx->ident_key, ctx->ident_key_nentries, NULL, 0); /* Info needed to retrieve key values from heap tuple. */ ident_form = ident_index->rd_index; @@ -3002,15 +3165,22 @@ find_target_tuple(Relation rel, ChangeDest *dest, HeapTuple tup_key, } /* - * Decode and apply concurrent changes, up to (and including) the record whose - * LSN is 'end_of_wal'. + * Get concurrent changes, up to (and including) the record whose LSN is + * 'end_of_wal', from the decoding worker. If 'range_end' is a valid block + * number, the changes should only be applied to blocks greater than or equal + * to ctx->first_block and lower than range_end. + * + * If 'request_snapshot' is true, the snapshot built at LSN following the last + * data change needs to be exported too. */ -static void -process_concurrent_changes(XLogRecPtr end_of_wal, ChangeDest *dest, bool done) +extern void +repack_get_concurrent_changes(ConcurrentChangeContext *ctx, + XLogRecPtr end_of_wal, + BlockNumber range_end, + bool request_snapshot, bool done) { DecodingWorkerShared *shared; char fname[MAXPGPATH]; - BufFile *file; pgstat_progress_update_param(PROGRESS_REPACK_PHASE, PROGRESS_REPACK_PHASE_CATCH_UP); @@ -3019,6 +3189,8 @@ process_concurrent_changes(XLogRecPtr end_of_wal, ChangeDest *dest, bool done) shared = (DecodingWorkerShared *) dsm_segment_address(decoding_worker->seg); SpinLockAcquire(&shared->mutex); shared->lsn_upto = end_of_wal; + Assert(!shared->snapshot_requested); + shared->snapshot_requested = request_snapshot; shared->done = done; SpinLockRelease(&shared->mutex); @@ -3030,32 +3202,49 @@ process_concurrent_changes(XLogRecPtr end_of_wal, ChangeDest *dest, bool done) ConditionVariablePrepareToSleep(&shared->cv); for (;;) { - bool valid; + int response; SpinLockAcquire(&shared->mutex); - valid = shared->sfs_valid; + response = shared->response; SpinLockRelease(&shared->mutex); - if (valid) + if (response & WORKER_RESPONSE_CHANGES) break; ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT); } ConditionVariableCancelSleep(); - /* Open the file. */ - DecodingWorkerFileName(fname, shared->relid, shared->last_exported); - file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false); - apply_concurrent_changes(file, dest); + /* + * Remember the file name so we can apply the changes when appropriate. + * One particular reason to postpone the replay is that indexes haven't + * been built yet on the new heap. + */ + DecodingWorkerFileName(fname, shared->relid, + shared->last_exported_changes, + false); + repack_add_block_range(ctx, range_end, fname); /* No file is exported until the worker exports the next one. */ SpinLockAcquire(&shared->mutex); - shared->sfs_valid = false; + shared->response &= ~WORKER_RESPONSE_CHANGES; + Assert(XLogRecPtrIsInvalid(shared->lsn_upto)); SpinLockRelease(&shared->mutex); +} - BufFileClose(file); +static void +repack_add_block_range(ConcurrentChangeContext *ctx, BlockNumber end, + char *fname) +{ + RepackApplyRange *range; + + range = palloc_object(RepackApplyRange); + range->end = end; + range->fname = pstrdup(fname); + ctx->block_ranges = lappend(ctx->block_ranges, range); } + /* * Initialize IndexInsertState for index specified by ident_index_id. 
* @@ -3198,7 +3387,8 @@ static void rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, Relation cl_index, TransactionId frozenXid, - MultiXactId cutoffMulti) + MultiXactId cutoffMulti, + ConcurrentChangeContext *ctx) { LOCKMODE lockmode_old PG_USED_FOR_ASSERTS_ONLY; List *ind_oids_new; @@ -3217,7 +3407,6 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, Relation *ind_refs, *ind_refs_p; int nind; - ChangeDest chgdst; /* Like in cluster_rel(). */ lockmode_old = ShareUpdateExclusiveLock; @@ -3274,11 +3463,18 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, (errmsg("identity index missing on the new relation"))); /* Gather information to apply concurrent changes. */ - chgdst.rel = NewHeap; - chgdst.iistate = get_index_insert_state(NewHeap, ident_idx_new, - &chgdst.ident_index); - chgdst.ident_key = build_identity_key(ident_idx_new, OldHeap, - &chgdst.ident_key_nentries); + ctx->rel = NewHeap; + ctx->iistate = get_index_insert_state(NewHeap, ident_idx_new, + &ctx->ident_index); + ctx->ident_key = build_identity_key(ident_idx_new, OldHeap, + &ctx->ident_key_nentries); + + /* + * Replay the concurrent data changes gathered during heap copying. This + * had to wait until after the index build because the identity index is + * needed to apply UPDATE and DELETE changes. + */ + apply_concurrent_changes(ctx); /* * During testing, wait for another backend to perform concurrent data @@ -3296,11 +3492,13 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, end_of_wal = GetFlushRecPtr(NULL); /* - * Apply concurrent changes first time, to minimize the time we need to - * hold AccessExclusiveLock. (Quite some amount of WAL could have been + * Decode and apply concurrent changes again, to minimize the time we need + * to hold AccessExclusiveLock. (Quite some amount of WAL could have been * written during the data copying and index creation.) */ - process_concurrent_changes(end_of_wal, &chgdst, false); + repack_get_concurrent_changes(ctx, end_of_wal, InvalidBlockNumber, false, + false); + apply_concurrent_changes(ctx); /* * Acquire AccessExclusiveLock on the table, its TOAST relation (if there @@ -3397,10 +3595,13 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, end_of_wal = GetFlushRecPtr(NULL); /* - * Apply the concurrent changes again. Indicate that the decoding worker - * won't be needed anymore. + * Decode and apply the concurrent changes again. Indicate that the + * decoding worker won't be needed anymore. */ - process_concurrent_changes(end_of_wal, &chgdst, true); + repack_get_concurrent_changes(ctx, end_of_wal, InvalidBlockNumber, false, + true); + apply_concurrent_changes(ctx); + /* Remember info about rel before closing OldHeap */ relpersistence = OldHeap->rd_rel->relpersistence; @@ -3451,8 +3652,8 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, table_close(NewHeap, NoLock); /* Cleanup what we don't need anymore. (And close the identity index.) */ - pfree(chgdst.ident_key); - free_index_insert_state(chgdst.iistate); + pfree(ctx->ident_key); + free_index_insert_state(ctx->iistate); /* * Swap the relations and their TOAST relations and TOAST indexes. This @@ -3495,6 +3696,23 @@ build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes) char *newName; Relation ind; + /* + * Try to reduce the impact on VACUUM. + * + * The individual builds might still be a problem, but that's a + * separate issue. 
+ * + * TODO Can we somehow use the fact that the new heap is not yet + * visible to other transaction, and thus cannot be vacuumed? Perhaps + * by preventing snapshots from setting MyProc->xmin temporarily. (All + * the snapshots that might have participated in the build, including + * the catalog snapshots, must not be used for other tables of + * course.) + */ + PopActiveSnapshot(); + InvalidateCatalogSnapshot(); + PushActiveSnapshot(GetTransactionSnapshot()); + ind_oid = lfirst_oid(lc); ind = index_open(ind_oid, ShareUpdateExclusiveLock); @@ -3534,11 +3752,15 @@ start_decoding_worker(Oid relid) BUFFERALIGN(REPACK_ERROR_QUEUE_SIZE); seg = dsm_create(size, 0); shared = (DecodingWorkerShared *) dsm_segment_address(seg); + shared->initialized = false; shared->lsn_upto = InvalidXLogRecPtr; shared->done = false; + /* Snapshot is the first thing we need from the worker. */ + shared->snapshot_requested = true; + shared->response = 0; SharedFileSetInit(&shared->sfs, seg); - shared->sfs_valid = false; - shared->last_exported = -1; + shared->last_exported_changes = -1; + shared->last_exported_snapshot = -1; SpinLockInit(&shared->mutex); shared->dbid = MyDatabaseId; @@ -3747,7 +3969,10 @@ repack_worker_internal(dsm_segment *seg) */ SpinLockAcquire(&shared->mutex); Assert(XLogRecPtrIsInvalid(shared->lsn_upto)); - Assert(!shared->sfs_valid); + Assert(shared->response == 0); + /* Initially we're expected to provide a snapshot and only that. */ + Assert(shared->snapshot_requested && + XLogRecPtrIsInvalid(shared->lsn_upto)); sfs = &shared->sfs; SpinLockRelease(&shared->mutex); @@ -3765,8 +3990,23 @@ repack_worker_internal(dsm_segment *seg) ConditionVariableSignal(&shared->cv); /* Build the initial snapshot and export it. */ - snapshot = SnapBuildInitialSnapshotForRepack(decoding_ctx->snapshot_builder); - export_initial_snapshot(snapshot, shared); + snapshot = SnapBuildSnapshotForRepack(decoding_ctx->snapshot_builder); + export_snapshot(snapshot, shared); + + /* + * Adjust the replication slot's xmin so that VACUUM can do more work. + */ + LogicalIncreaseXminForSlot(InvalidXLogRecPtr, snapshot->xmin, false); + FreeSnapshot(snapshot); + + /* Tell the backend that the file is available. */ + SpinLockAcquire(&shared->mutex); + Assert(shared->snapshot_requested); + shared->snapshot_requested = false; + shared->response |= WORKER_RESPONSE_SNAPSHOT; + shared->last_exported_snapshot++; + SpinLockRelease(&shared->mutex); + ConditionVariableSignal(&shared->cv); /* * Only historic snapshots should be used now. Do not let us restrict the @@ -3786,7 +4026,7 @@ repack_worker_internal(dsm_segment *seg) * Make snapshot available to the backend that launched the decoding worker. */ static void -export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared) +export_snapshot(Snapshot snapshot, DecodingWorkerShared *shared) { char fname[MAXPGPATH]; BufFile *file; @@ -3796,29 +4036,23 @@ export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared) snap_size = EstimateSnapshotSpace(snapshot); snap_space = (char *) palloc(snap_size); SerializeSnapshot(snapshot, snap_space); - FreeSnapshot(snapshot); - DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1); + DecodingWorkerFileName(fname, shared->relid, + shared->last_exported_snapshot + 1, + true); file = BufFileCreateFileSet(&shared->sfs.fs, fname); /* To make restoration easier, write the snapshot size first. 
*/ BufFileWrite(file, &snap_size, sizeof(snap_size)); BufFileWrite(file, snap_space, snap_size); pfree(snap_space); BufFileClose(file); - - /* Tell the backend that the file is available. */ - SpinLockAcquire(&shared->mutex); - shared->sfs_valid = true; - shared->last_exported++; - SpinLockRelease(&shared->mutex); - ConditionVariableSignal(&shared->cv); } /* - * Get the initial snapshot from the decoding worker. + * Get snapshot from the decoding worker. */ -static Snapshot -get_initial_snapshot(DecodingWorker *worker) +extern Snapshot +repack_get_snapshot(ConcurrentChangeContext *ctx) { DecodingWorkerShared *shared; char fname[MAXPGPATH]; @@ -3826,24 +4060,26 @@ get_initial_snapshot(DecodingWorker *worker) Size snap_size; char *snap_space; Snapshot snapshot; + DecodingWorker *worker = ctx->worker; shared = (DecodingWorkerShared *) dsm_segment_address(worker->seg); /* - * The worker needs to initialize the logical decoding, which usually - * takes some time. Therefore it makes sense to prepare for the sleep - * first. + * For the first snapshot request, the worker needs to initialize the + * logical decoding, which usually takes some time. Therefore it makes + * sense to prepare for the sleep first. Does it make sense to skip the + * preparation on the next requests? */ ConditionVariablePrepareToSleep(&shared->cv); for (;;) { - bool valid; + int response; SpinLockAcquire(&shared->mutex); - valid = shared->sfs_valid; + response = shared->response; SpinLockRelease(&shared->mutex); - if (valid) + if (response & WORKER_RESPONSE_SNAPSHOT) break; ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT); @@ -3851,7 +4087,9 @@ get_initial_snapshot(DecodingWorker *worker) ConditionVariableCancelSleep(); /* Read the snapshot from a file. */ - DecodingWorkerFileName(fname, shared->relid, shared->last_exported); + DecodingWorkerFileName(fname, shared->relid, + shared->last_exported_snapshot, + true); file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false); BufFileReadExact(file, &snap_size, sizeof(snap_size)); snap_space = (char *) palloc(snap_size); @@ -3859,7 +4097,8 @@ get_initial_snapshot(DecodingWorker *worker) BufFileClose(file); SpinLockAcquire(&shared->mutex); - shared->sfs_valid = false; + shared->response &= ~WORKER_RESPONSE_SNAPSHOT; + Assert(!shared->snapshot_requested); SpinLockRelease(&shared->mutex); /* Restore it. */ diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index a956892f42f..c8bc85d8bcc 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -1003,6 +1003,7 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_heap_insert *xlrec; ReorderBufferChange *change; RelFileLocator target_locator; + BlockNumber blknum; xlrec = (xl_heap_insert *) XLogRecGetData(r); @@ -1014,7 +1015,7 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) return; /* only interested in our database */ - XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL); + XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blknum); if (target_locator.dbOid != ctx->slot->data.database) return; @@ -1039,6 +1040,15 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple); + /* + * REPACK (CONCURRENTLY) needs block number to check if the corresponding + * part of the table was already copied. 
+ */ + if (OidIsValid(repacked_rel_locator.relNumber)) + /* offnum is not really needed, but let's set valid pointer. */ + ItemPointerSet(&change->data.tp.newtuple->t_self, blknum, + xlrec->offnum); + change->data.tp.clear_toast_afterwards = true; ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, @@ -1060,11 +1070,12 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferChange *change; char *data; RelFileLocator target_locator; + BlockNumber new_blknum; xlrec = (xl_heap_update *) XLogRecGetData(r); /* only interested in our database */ - XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL); + XLogRecGetBlockTag(r, 0, &target_locator, NULL, &new_blknum); if (target_locator.dbOid != ctx->slot->data.database) return; @@ -1090,12 +1101,27 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen); DecodeXLogTuple(data, datalen, change->data.tp.newtuple); + + /* + * REPACK (CONCURRENTLY) needs block number to check if the + * corresponding part of the table was already copied. + */ + if (OidIsValid(repacked_rel_locator.relNumber)) + /* offnum is not really needed, but let's set valid pointer. */ + ItemPointerSet(&change->data.tp.newtuple->t_self, + new_blknum, xlrec->new_offnum); } if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD) { Size datalen; Size tuplelen; + BlockNumber old_blknum; + + if (XLogRecHasBlockRef(r, 1)) + XLogRecGetBlockTag(r, 1, NULL, NULL, &old_blknum); + else + XLogRecGetBlockTag(r, 0, NULL, NULL, &old_blknum); /* caution, remaining data in record is not aligned */ data = XLogRecGetData(r) + SizeOfHeapUpdate; @@ -1106,6 +1132,11 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen); DecodeXLogTuple(data, datalen, change->data.tp.oldtuple); + /* See above. */ + if (OidIsValid(repacked_rel_locator.relNumber)) + ItemPointerSet(&change->data.tp.oldtuple->t_self, + old_blknum, xlrec->old_offnum); + } change->data.tp.clear_toast_afterwards = true; @@ -1126,11 +1157,12 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_heap_delete *xlrec; ReorderBufferChange *change; RelFileLocator target_locator; + BlockNumber blknum; xlrec = (xl_heap_delete *) XLogRecGetData(r); /* only interested in our database */ - XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL); + XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blknum); if (target_locator.dbOid != ctx->slot->data.database) return; @@ -1162,6 +1194,15 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete, datalen, change->data.tp.oldtuple); + + /* + * REPACK (CONCURRENTLY) needs block number to check if the + * corresponding part of the table was already copied. + */ + if (OidIsValid(repacked_rel_locator.relNumber)) + /* offnum is not really needed, but let's set valid pointer. */ + ItemPointerSet(&change->data.tp.oldtuple->t_self, blknum, + xlrec->offnum); } change->data.tp.clear_toast_afterwards = true; diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index b3fd7fec392..1e445704a1b 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1670,14 +1670,17 @@ update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* * Set the required catalog xmin horizon for historic snapshots in the current - * replication slot. 
+ * replication slot if catalog is TRUE, or xmin if catalog is FALSE. * * Note that in the most cases, we won't be able to immediately use the xmin * to increase the xmin horizon: we need to wait till the client has confirmed - * receiving current_lsn with LogicalConfirmReceivedLocation(). + * receiving current_lsn with LogicalConfirmReceivedLocation(). However, + * catalog=FALSE is only allowed for temporary replication slots, so the + * horizon is applied immediately. */ void -LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin) +LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin, + bool catalog) { bool updated_xmin = false; ReplicationSlot *slot; @@ -1688,6 +1691,27 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin) Assert(slot != NULL); SpinLockAcquire(&slot->mutex); + if (!catalog) + { + /* + * The non-catalog horizon can only advance in temporary slots, so + * update it in the shared memory immediately (w/o requiring prior + * saving to disk). + */ + Assert(slot->data.persistency == RS_TEMPORARY); + + /* + * The horizon must not go backwards, however it's o.k. to become + * invalid. + */ + Assert(!TransactionIdIsValid(slot->effective_xmin) || + !TransactionIdIsValid(xmin) || + TransactionIdFollowsOrEquals(xmin, slot->effective_xmin)); + + slot->effective_xmin = xmin; + SpinLockRelease(&slot->mutex); + return; + } /* * don't overwrite if we already have a newer xmin. This can happen if we diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index f18c6fb52b5..273f65d6cc8 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -3734,6 +3734,56 @@ ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid) return rbtxn_has_catalog_changes(txn); } +/* + * Check if a transaction (or its subtransaction) contains a heap change. + */ +bool +ReorderBufferXidHasHeapChanges(ReorderBuffer *rb, TransactionId xid) +{ + ReorderBufferTXN *txn; + dlist_iter iter; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + if (txn == NULL) + return false; + + dlist_foreach(iter, &txn->changes) + { + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, node, iter.cur); + + switch (change->action) + { + case REORDER_BUFFER_CHANGE_INSERT: + case REORDER_BUFFER_CHANGE_UPDATE: + case REORDER_BUFFER_CHANGE_DELETE: + return true; + default: + break; + } + } + + /* Check subtransactions. */ + + /* + * TODO Verify that subtransactions must be assigned to the top-level + * transactions by now. + */ + dlist_foreach(iter, &txn->subtxns) + { + ReorderBufferTXN *subtxn; + + subtxn = dlist_container(ReorderBufferTXN, node, iter.cur); + + if (ReorderBufferXidHasHeapChanges(rb, subtxn->xid)) + return true; + } + + return false; +} + /* * ReorderBufferXidHasBaseSnapshot * Have we already set the base snapshot for the given txn/subtxn? diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 7643dfe31bb..4bc6cd22496 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -128,6 +128,7 @@ #include "access/heapam_xlog.h" #include "access/transam.h" #include "access/xact.h" +#include "commands/cluster.h" #include "common/file_utils.h" #include "miscadmin.h" #include "pgstat.h" @@ -496,7 +497,7 @@ SnapBuildInitialSnapshot(SnapBuild *builder) * we do not set MyProc->xmin). 
XXX Do we yet need to add some restrictions? */ Snapshot -SnapBuildInitialSnapshotForRepack(SnapBuild *builder) +SnapBuildSnapshotForRepack(SnapBuild *builder) { Snapshot snap; @@ -1035,6 +1036,28 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, } } + /* + * Is REPACKED (CONCURRENTLY) is being run by this backend? + */ + else if (OidIsValid(repacked_rel_locator.relNumber)) + { + Assert(builder->building_full_snapshot); + + /* + * In this special mode, heap changes of other relations should not be + * decoded at all - see heap_decode(). Thus if we find a single heap + * change in this transaction (or its subtransaction), we know that + * this transaction changes the relation being repacked. + */ + if (ReorderBufferXidHasHeapChanges(builder->reorder, xid)) + + /* + * Record the commit so we can build snapshots for the relation + * being repacked. + */ + needs_timetravel = true; + } + for (nxact = 0; nxact < nsubxacts; nxact++) { TransactionId subxid = subxacts[nxact]; @@ -1240,7 +1263,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact xmin = running->oldestRunningXid; elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u", builder->xmin, builder->xmax, running->oldestRunningXid, xmin); - LogicalIncreaseXminForSlot(lsn, xmin); + LogicalIncreaseXminForSlot(lsn, xmin, true); /* * Also tell the slot where we can restart decoding from. We don't want to diff --git a/src/backend/replication/pgoutput_repack/pgoutput_repack.c b/src/backend/replication/pgoutput_repack/pgoutput_repack.c index fb9956d392d..be1c3ec9626 100644 --- a/src/backend/replication/pgoutput_repack/pgoutput_repack.c +++ b/src/backend/replication/pgoutput_repack/pgoutput_repack.c @@ -195,6 +195,8 @@ store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind, BufFileWrite(dstate->file, &tuple->t_len, sizeof(tuple->t_len)); /* ... and the tuple itself. */ BufFileWrite(dstate->file, tuple->t_data, tuple->t_len); + /* CTID is needed as well, to check block ranges. */ + BufFileWrite(dstate->file, &tuple->t_self, sizeof(tuple->t_self)); /* Free the flat copy if created above. */ if (flattened) diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index 3b9d8349078..a82d284c44c 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -2415,6 +2415,16 @@ boot_val => 'true', }, +# TODO Tune boot_val, 1024 is probably too low. 
+{ name => 'repack_snapshot_after', type => 'int', context => 'PGC_USERSET', group => 'DEVELOPER_OPTIONS', + short_desc => 'Number of pages after which REPACK (CONCURRENTLY) builds a new snapshot.', + flags => 'GUC_UNIT_BLOCKS | GUC_NOT_IN_SAMPLE', + variable => 'repack_blocks_per_snapshot', + boot_val => '1024', + min => '1', + max => 'INT_MAX', +} + { name => 'reserved_connections', type => 'int', context => 'PGC_POSTMASTER', group => 'CONN_AUTH_SETTINGS', short_desc => 'Sets the number of connection slots reserved for roles with privileges of pg_use_reserved_connections.', variable => 'ReservedConnections', diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index f87b558c2c6..07f7cfdd6cc 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -42,6 +42,7 @@ #include "catalog/namespace.h" #include "catalog/storage.h" #include "commands/async.h" +#include "commands/cluster.h" #include "commands/extension.h" #include "commands/event_trigger.h" #include "commands/tablespace.h" diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 321c00682ec..6fe3fc760bc 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -629,12 +629,12 @@ typedef struct TableAmRoutine Relation OldIndex, bool use_sort, TransactionId OldestXmin, - Snapshot snapshot, TransactionId *xid_cutoff, MultiXactId *multi_cutoff, double *num_tuples, double *tups_vacuumed, - double *tups_recently_dead); + double *tups_recently_dead, + void *tableam_data); /* * React to VACUUM command on the relation. The VACUUM can be triggered by @@ -1647,8 +1647,6 @@ table_relation_copy_data(Relation rel, const RelFileLocator *newrlocator) * not needed for the relation's AM * - *xid_cutoff - ditto * - *multi_cutoff - ditto - * - snapshot - if != NULL, ignore data changes done by transactions that this - * (MVCC) snapshot considers still in-progress or in the future. * * Output parameters: * - *xid_cutoff - rel's new relfrozenxid value, may be invalid @@ -1661,19 +1659,19 @@ table_relation_copy_for_cluster(Relation OldTable, Relation NewTable, Relation OldIndex, bool use_sort, TransactionId OldestXmin, - Snapshot snapshot, TransactionId *xid_cutoff, MultiXactId *multi_cutoff, double *num_tuples, double *tups_vacuumed, - double *tups_recently_dead) + double *tups_recently_dead, + void *tableam_data) { OldTable->rd_tableam->relation_copy_for_cluster(OldTable, NewTable, OldIndex, use_sort, OldestXmin, - snapshot, xid_cutoff, multi_cutoff, num_tuples, tups_vacuumed, - tups_recently_dead); + tups_recently_dead, + tableam_data); } /* diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h index 0ac70ec30d7..2b61dce92dc 100644 --- a/src/include/commands/cluster.h +++ b/src/include/commands/cluster.h @@ -16,10 +16,12 @@ #include "nodes/execnodes.h" #include "nodes/parsenodes.h" #include "parser/parse_node.h" +#include "postmaster/bgworker.h" #include "replication/logical.h" #include "storage/buffile.h" #include "storage/lock.h" #include "storage/relfilelocator.h" +#include "storage/shm_mq.h" #include "utils/relcache.h" #include "utils/resowner.h" #include "utils/tuplestore.h" @@ -47,6 +49,63 @@ typedef struct ClusterParams extern RelFileLocator repacked_rel_locator; extern RelFileLocator repacked_rel_toast_locator; +extern PGDLLIMPORT int repack_blocks_per_snapshot; + +/* + * Everything we need to call ExecInsertIndexTuples(). 
+ */ +typedef struct IndexInsertState +{ + ResultRelInfo *rri; + EState *estate; +} IndexInsertState; + +/* + * Backend-local information to control the decoding worker. + */ +typedef struct DecodingWorker +{ + /* The worker. */ + BackgroundWorkerHandle *handle; + + /* DecodingWorkerShared is in this segment. */ + dsm_segment *seg; + + /* Handle of the error queue. */ + shm_mq_handle *error_mqh; +} DecodingWorker; + +/* + * Information needed to handle concurrent data changes. + */ +typedef struct ConcurrentChangeContext +{ + /* The relation the changes are applied to. */ + Relation rel; + + /* + * Background worker performing logical decoding of concurrent data + * changes. + */ + DecodingWorker *worker; + + /* + * The following is needed to find the existing tuple if the change is + * UPDATE or DELETE. 'ident_key' should have all the fields except for + * 'sk_argument' initialized. + */ + Relation ident_index; + ScanKey ident_key; + int ident_key_nentries; + + /* Needed to update indexes of rel_dst. */ + IndexInsertState *iistate; + + /* The first block of the scan used to copy the heap. */ + BlockNumber first_block; + /* List of RepackApplyRange objects. */ + List *block_ranges; +} ConcurrentChangeContext; /* * Stored as a single byte in the output file. @@ -102,6 +161,12 @@ extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, TransactionId frozenXid, MultiXactId cutoffMulti, char newrelpersistence); +extern void repack_get_concurrent_changes(struct ConcurrentChangeContext *ctx, + XLogRecPtr end_of_wal, + BlockNumber range_end, + bool request_snapshot, + bool done); +extern Snapshot repack_get_snapshot(struct ConcurrentChangeContext *ctx); extern void RepackWorkerMain(Datum main_arg); #endif /* CLUSTER_H */ diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 2e562bee5a9..9924f706f20 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -137,7 +137,7 @@ extern bool DecodingContextReady(LogicalDecodingContext *ctx); extern void FreeDecodingContext(LogicalDecodingContext *ctx); extern void LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, - TransactionId xmin); + TransactionId xmin, bool catalog); extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn); extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 3cbe106a3c7..b6b739f29f4 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -763,6 +763,7 @@ extern void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRe extern void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn); extern bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid); +extern bool ReorderBufferXidHasHeapChanges(ReorderBuffer *rb, TransactionId xid); extern bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid); extern bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index 802fc4b0823..d1f9037fcfb 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -73,7 +73,7 @@ extern void FreeSnapshotBuilder(SnapBuild *builder); extern void SnapBuildSnapDecRefcount(Snapshot snap); extern Snapshot SnapBuildInitialSnapshot(SnapBuild *builder); -extern Snapshot 
SnapBuildInitialSnapshotForRepack(SnapBuild *builder); +extern Snapshot SnapBuildSnapshotForRepack(SnapBuild *builder); extern Snapshot SnapBuildMVCCFromHistoric(Snapshot snapshot, bool in_place); extern const char *SnapBuildExportSnapshot(SnapBuild *builder); extern void SnapBuildClearExportedSnapshot(void); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 35344910f65..54115135732 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -411,7 +411,6 @@ CatCacheHeader CatalogId CatalogIdMapEntry CatalogIndexState -ChangeDest ChangeVarNodes_callback ChangeVarNodes_context CheckPoint @@ -488,6 +487,7 @@ CompressFileHandle CompressionLocation CompressorState ComputeXidHorizonsResult +ConcurrentChangeContext ConcurrentChangeKind ConditionVariable ConditionVariableMinimallyPadded @@ -2563,6 +2563,7 @@ ReorderBufferTupleCidKey ReorderBufferUpdateProgressTxnCB ReorderTuple RepOriginId +RepackApplyRange RepackCommand RepackDecodingState RepackStmt -- 2.47.3