From 5bf277b71fca5ec3c016babd5ac483928ecab96f Mon Sep 17 00:00:00 2001 From: Antonin Houska Date: Tue, 16 Jun 2026 13:54:16 +0200 Subject: [PATCH 4/8] Use multiple snapshots to copy the data. REPACK (CONCURRENTLY) does not prevent applications from using the table that is being processed; however, it can prevent the xmin horizon from advancing and thus restrict VACUUM for the whole database. This patch adds the ability to use a particular snapshot only for a certain range of pages. Each time that range is processed, a new snapshot is built, which supposedly has its xmin higher than the previous snapshot. Note that we still use the same XID throughout the REPACK execution, and that also prevents the xmin horizon from advancing. The following part of this series will fix the problem. To use multiple snapshots, the data copying works as follows: 1. Have the logical decoding system build a snapshot S0 for range R0 at LSN0. This snapshot sees all the data changes whose commit records have LSN < LSN0. 2. Copy the pages in that range to the new relation. The changes not visible to the snapshot (because their transactions are still running from the POV of the snapshot) will appear in the output of the logical decoding system as soon as their commit records are decoded. 3. Perform logical decoding of all changes we find in WAL for the table we're repacking, but only apply those that affect the range R0 in the old relation. (Naturally, we cannot apply ones that belong to other pages because it's impossible to UPDATE / DELETE a row in the new relation if it hasn't been copied yet.) Then consider LSN1 to be the position of the end of the last WAL record decoded. 4. Build a new snapshot S1 at position LSN1, i.e. one that sees all the data whose commit records are at WAL positions < LSN1. Use this snapshot to copy the range of pages R1. 5. Perform logical decoding like in step 3, but out of this next set, only apply changes belonging to ranges R0 *and* R1 in the old table. 6. 
etc Special attention needs to be paid to UPDATEs that span page ranges. For example, if the old tuple is in range R0, but the new tuple is in R1, and R1 hasn't been copied yet, we only DELETE the old version from the new relation. The new version will be handled during processing of range R1. The snapshot S1 will be based on the WAL position following that UPDATE, so it'll see the new tuple if its transaction's commit record is at a WAL position lower than the position where we built the snapshot. On the other hand, if the commit record appears at a higher position than that of the snapshot, the insertion of the new tuple will be decoded and replayed later (after the copying of range R1 has completed). Likewise, if the old tuple is in range R1 (not yet copied) but the new tuple is in R0, we only perform INSERT on the new relation. The deletion of the old version will either be visible to the snapshot S1 (i.e. the snapshot won't see the old version), or replayed later. Due to these cross-range UPDATEs, we must apply the changes pertaining to a given range before processing of the next range starts. Specifically, if an UPDATE becomes a DELETE for a specific range, that DELETE must be replayed soon enough so that we don't see both the old and the new tuple when building the identity index. The problem is that if the UPDATE does not change the identity key, we'd end up with duplicate key values. Even if the USING INDEX clause is specified, a sequential scan is used to retrieve the tuples from the old relation: the approach described above requires that the tuples are in CTID order. For sorting we use a regular table ("auxiliary table"), on which we create the clustering index and scan it. The scan output is inserted into the new relation. Tuplesort is not appropriate here because it has no identity index, so it's not possible to apply the decoded changes to it "eagerly", as explained above. A new GUC repack_snapshot_after can be used to set the number of pages per snapshot. 
It's currently classified as DEVELOPER_OPTIONS and may be replaced by a constant after enough evaluation is done. --- src/backend/access/heap/heapam_handler.c | 238 ++++- src/backend/commands/repack.c | 861 +++++++++++++----- src/backend/commands/repack_worker.c | 97 +- src/backend/replication/logical/decode.c | 83 +- src/backend/replication/logical/logical.c | 30 +- .../replication/logical/reorderbuffer.c | 40 + src/backend/replication/logical/snapbuild.c | 30 +- src/backend/replication/pgrepack/pgrepack.c | 24 +- src/backend/utils/misc/guc_parameters.dat | 10 + src/backend/utils/misc/guc_tables.c | 1 + src/include/access/tableam.h | 14 +- src/include/commands/repack.h | 64 +- src/include/commands/repack_internal.h | 13 +- src/include/replication/logical.h | 2 +- src/include/replication/reorderbuffer.h | 7 + src/test/modules/injection_points/Makefile | 1 + .../expected/repack_snapshots.out | 393 ++++++++ src/test/modules/injection_points/meson.build | 1 + .../specs/repack_snapshots.spec | 235 +++++ 19 files changed, 1850 insertions(+), 294 deletions(-) create mode 100644 src/test/modules/injection_points/expected/repack_snapshots.out create mode 100644 src/test/modules/injection_points/specs/repack_snapshots.spec diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index 2866a5e696e..d402e51e6a1 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -43,12 +43,16 @@ #include "storage/lmgr.h" #include "storage/lock.h" #include "storage/predicate.h" +#include "storage/proc.h" #include "storage/procarray.h" #include "storage/smgr.h" #include "utils/builtins.h" +#include "utils/injection_point.h" #include "utils/rel.h" #include "utils/tuplesort.h" +static Snapshot finalize_block_range(ChangeContext *chgcxt, BlockNumber cur, + BlockNumber start, BlockNumber *end_p); static void reform_and_rewrite_tuple(TupleTableSlot *src, TupleTableSlot *reform, RewriteState rwstate); @@ 
-589,15 +593,14 @@ static void heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, Relation OldIndex, bool use_sort, TransactionId OldestXmin, - Snapshot snapshot, TransactionId *xid_cutoff, MultiXactId *multi_cutoff, double *num_tuples, double *tups_vacuumed, - double *tups_recently_dead) + double *tups_recently_dead, + void *tableam_data) { RewriteState rwstate; - BulkInsertState bistate; IndexScanDesc indexScan; TableScanDesc tableScan; HeapScanDesc heapScan; @@ -608,7 +611,11 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, TupleTableSlot *reform_slot; BufferHeapTupleTableSlot *hslot; BlockNumber prev_cblock = InvalidBlockNumber; - bool concurrent = snapshot != NULL; + ChangeContext *chgcxt = (ChangeContext *) tableam_data; + bool concurrent = chgcxt != NULL; + Snapshot snapshot = NULL; + BlockNumber range_start = InvalidBlockNumber; + BlockNumber range_end = InvalidBlockNumber; /* Remember if it's a system catalog */ is_system_catalog = IsSystemRelation(OldHeap); @@ -629,14 +636,11 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, else rwstate = NULL; - /* In concurrent mode, prepare for bulk-insert operation. */ - if (concurrent) - bistate = GetBulkInsertState(); - else - bistate = NULL; - - /* Set up sorting if wanted */ - if (use_sort) + /* + * Set up sorting if wanted. CONCURRENTLY sorts the tuple w/o tuplesort, + * see below. + */ + if (use_sort && !concurrent) tuplesort = tuplesort_begin_cluster(oldTupDesc, OldIndex, maintenance_work_mem, NULL, TUPLESORT_NONE); @@ -648,8 +652,11 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, * that still need to be copied, we scan with SnapshotAny and use * HeapTupleSatisfiesVacuum for the visibility test. * - * In the CONCURRENTLY case, we do regular MVCC visibility tests, using - * the snapshot passed by the caller. + * In the CONCURRENTLY case, we do regular MVCC visibility tests. 
The + * snapshot changes several times during the scan so that we do not block + * the progress of the xmin horizon for VACUUM too much. Index scan + * should not be used because it returns tuples in random order, which + * makes it impossible to split the scan into block ranges. */ if (OldIndex != NULL && !use_sort) { @@ -659,6 +666,8 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, }; int64 ci_val[2]; + Assert(!concurrent); + /* Set phase and OIDOldIndex to columns */ ci_val[0] = PROGRESS_REPACK_PHASE_INDEX_SCAN_HEAP; ci_val[1] = RelationGetRelid(OldIndex); @@ -666,10 +675,8 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, tableScan = NULL; heapScan = NULL; - indexScan = index_beginscan(OldHeap, OldIndex, - snapshot ? snapshot : SnapshotAny, - NULL, 0, 0, - SO_NONE); + indexScan = index_beginscan(OldHeap, OldIndex, SnapshotAny, NULL, 0, + 0, SO_NONE); index_rescan(indexScan, NULL, 0, NULL, 0); } else @@ -678,16 +685,28 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, pgstat_progress_update_param(PROGRESS_REPACK_PHASE, PROGRESS_REPACK_PHASE_SEQ_SCAN_HEAP); - tableScan = table_beginscan(OldHeap, - snapshot ? snapshot : SnapshotAny, - 0, (ScanKey) NULL, + tableScan = table_beginscan(OldHeap, SnapshotAny, 0, (ScanKey) NULL, SO_NONE); heapScan = (HeapScanDesc) tableScan; + + /* + * In CONCURRENTLY mode we scan the table by ranges of blocks and the + * algorithm below expects forward direction. (No other direction + * should be set here regardless concurrently anyway.) + */ + Assert(heapScan->rs_dir == ForwardScanDirection || !concurrent); indexScan = NULL; /* Set total heap blocks */ pgstat_progress_update_param(PROGRESS_REPACK_TOTAL_HEAP_BLKS, heapScan->rs_nblocks); + + /* Setup the first range. 
*/ + if (concurrent) + { + range_start = heapScan->rs_startblock; + range_end = range_start + repack_pages_per_snapshot; + } } slot = table_slot_create(OldHeap, NULL); @@ -695,6 +714,30 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, reform_slot = MakeSingleTupleTableSlot(RelationGetDescr(OldHeap), &TTSOpsVirtual); + if (concurrent) + { + /* + * Do not block the progress of xmin horizons. + */ + PopActiveSnapshot(); + InvalidateCatalogSnapshot(); + + /* + * As there is no snapshot, our xmin should be invalid now. + * + * XXX xid can still be valid. The next patches in the series fix + * that. + */ + Assert(!TransactionIdIsValid(MyProc->xmin)); + + /* + * Wait until the worker has the initial snapshot and retrieve it. + */ + snapshot = repack_get_snapshot(chgcxt); + + PushActiveSnapshot(snapshot); + } + /* * Scan through the OldHeap, either in OldIndex order or sequentially; * copy each tuple into the NewHeap, or transiently to the tuplesort @@ -711,6 +754,9 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, if (indexScan != NULL) { + /* See above. */ + Assert(!concurrent); + if (!index_getnext_slot(indexScan, ForwardScanDirection, slot)) break; @@ -732,6 +778,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, */ pgstat_progress_update_param(PROGRESS_REPACK_HEAP_BLKS_SCANNED, heapScan->rs_nblocks); + break; } @@ -848,10 +895,39 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, continue; } } + else + { + BlockNumber blkno; + bool visible; + + /* + * With CONCURRENTLY, we use each snapshot only for certain range + * of pages, so that VACUUM does not get blocked for too long. So + * first check if the tuple falls into the current range. + */ + blkno = BufferGetBlockNumber(buf); + + Assert(BlockNumberIsValid(range_end)); + + /* End of the current range or wraparound? 
*/ + if (blkno >= range_end || blkno < range_start) + snapshot = finalize_block_range(chgcxt, blkno, range_start, + &range_end); + + /* Finally check the tuple visibility. */ + LockBuffer(buf, BUFFER_LOCK_SHARE); + visible = HeapTupleSatisfiesVisibility(tuple, snapshot, buf); + LockBuffer(buf, BUFFER_LOCK_UNLOCK); + + if (!visible) + continue; + } *num_tuples += 1; if (tuplesort != NULL) { + Assert(!concurrent); + tuplesort_putheaptuple(tuplesort, tuple); /* @@ -872,7 +948,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, if (!concurrent) reform_and_rewrite_tuple(slot, reform_slot, rwstate); else - heap_insert_for_repack(NewHeap, slot, reform_slot, bistate); + heap_insert_for_repack(chgcxt, slot, reform_slot); /* * In indexscan mode and also VACUUM FULL, report increase in @@ -884,6 +960,28 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, } } + if (concurrent) + { + XLogRecPtr end_of_wal; + + /* + * Process the changes belonging to the last range. + */ + end_of_wal = GetFlushRecPtr(NULL); + repack_process_concurrent_changes(chgcxt, end_of_wal, + InvalidBlockNumber, + InvalidBlockNumber, + false, false); + + /* + * There was an active transaction snapshot on entry, so push one + * before return. + */ + PopActiveSnapshot(); + PushActiveSnapshot(GetTransactionSnapshot()); + + } + if (indexScan != NULL) index_endscan(indexScan); if (tableScan != NULL) @@ -929,10 +1027,13 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, ExecStoreHeapTuple(tuple, slot, false); n_tuples += 1; - if (!concurrent) - reform_and_rewrite_tuple(slot, reform_slot, rwstate); - else - heap_insert_for_repack(NewHeap, slot, reform_slot, bistate); + + /* + * The CONCURRENTLY mode uses auxiliary tables rather than + * tuplesort. 
+ */ + Assert(!concurrent); + reform_and_rewrite_tuple(slot, reform_slot, rwstate); /* Report n_tuples */ pgstat_progress_update_param(PROGRESS_REPACK_HEAP_TUPLES_INSERTED, @@ -948,8 +1049,89 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, /* Write out any remaining tuples, and fsync if needed */ if (rwstate) end_heap_rewrite(rwstate); - if (bistate) - FreeBulkInsertState(bistate); +} + +/* + * Finalize processing of the current block range. + * + * 'cur' is the current block, 'start' is the first block of the current + * range. + * + * '*end_p': on entry, the first block beyond the current range, on exit, the + * first block beyond the new range. + * + * Return the snapshot for the scan of the new range. + */ +static Snapshot +finalize_block_range(ChangeContext *chgcxt, BlockNumber cur, + BlockNumber start, BlockNumber *end_p) +{ + BlockNumber end = *end_p; + XLogRecPtr end_of_wal; + Snapshot snapshot; + + /* + * Wait here when testing how snapshot is changed at page boundary. + */ + INJECTION_POINT("repack-concurrently-new-range", NULL); + + /* + * Decode all the concurrent data changes committed so far before + * requesting the next snapshot - these changes are applicable on top of + * the current snapshot. Since we only copied part of the table so far, + * only changes applicable to that part can be applied. + * + * It's important to apply the changes before we start copying the next + * range of blocks. Without that, in case of concurrent UPDATE, we could + * end up with both old and new tuple present in the new table: the old + * still visible in the current range and the new already visible in the + * following range (for which we'll use more recent snapshot). Thus it'd + * be non-trivial to apply the UPDATE later. By replaying it now, we get + * rid of the old tuple in the current range. 
+ */ + end_of_wal = GetFlushRecPtr(NULL); + repack_process_concurrent_changes(chgcxt, end_of_wal, start, end, true, + false); + + /* + * A new snapshot will be pushed below. Note that it's important to not do + * this earlier, because - while processing the concurrent data changes - + * we might have needed to fetch TOASTed values from the old relation - + * see the UPDATE-to-INSERT conversion in apply_concurrent_changes(). As + * this snapshot protects the data copied from VACUUM, it should also + * protect the TOAST values referenced by the consequent UPDATE + * statements. + */ + PopActiveSnapshot(); + InvalidateCatalogSnapshot(); + + /* See above. */ + Assert(!TransactionIdIsValid(MyProc->xmin)); + + /* + * XXX It might be worth Assert(CatalogSnapshot == NULL) here, however + * that symbol is not external. + */ + + /* + * Compute the end of the new range by aligning 'cur' to a multiple of + * range boundary. This accounts for the possibility that some block + * numbers could have been skipped (due to pages being empty) or that the + * block number could have wrapped around. + */ + end = cur + repack_pages_per_snapshot - (cur % repack_pages_per_snapshot); + *end_p = end; + + /* + * Get the snapshot for the next range - it should have been built at the + * position right after the last change decoded. Data present in the next + * range of blocks will either be visible to the snapshot or appear in the + * next batch of decoded changes. 
+ */ + snapshot = repack_get_snapshot(chgcxt); + PushActiveSnapshot(snapshot); + + return snapshot; } /* diff --git a/src/backend/commands/repack.c b/src/backend/commands/repack.c index 04d39ad8c86..2debadd86ed 100644 --- a/src/backend/commands/repack.c +++ b/src/backend/commands/repack.c @@ -33,6 +33,7 @@ #include "postgres.h" #include "access/amapi.h" +#include "access/detoast.h" #include "access/heapam.h" #include "access/multixact.h" #include "access/relscan.h" @@ -93,6 +94,12 @@ typedef struct Oid indexOid; } RelToCluster; +/* + * When REPACK (CONCURRENTLY) copies data to the new heap, a new snapshot is + * built after processing this many pages. XXX Tune the value. + */ +int repack_pages_per_snapshot = 1024; + /* * Backend-local information to control the decoding worker. */ @@ -126,11 +133,11 @@ static void check_concurrent_repack_requirements(Relation rel, static void rebuild_relation(Relation OldHeap, Relation index, bool verbose, Oid ident_idx); static void copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, - Snapshot snapshot, bool verbose, bool *pSwapToastByContent, TransactionId *pFreezeXid, - MultiXactId *pCutoffMulti); + MultiXactId *pCutoffMulti, + ChangeContext *chgcxt); static List *get_tables_to_repack(RepackCommand cmd, bool usingindex, MemoryContext permcxt); static List *get_tables_to_repack_partitioned(RepackCommand cmd, @@ -139,23 +146,26 @@ static List *get_tables_to_repack_partitioned(RepackCommand cmd, static bool repack_is_permitted_for_relation(RepackCommand cmd, Oid relid, Oid userid); -static void apply_concurrent_changes(ChangeContext *chgcxt); +static void apply_concurrent_changes(ChangeContext *chgcxt, + BlockNumber range_start, + BlockNumber range_end); static void apply_concurrent_insert(RepackDest *dest, TupleTableSlot *slot); static void apply_concurrent_update(RepackDest *dest, TupleTableSlot *spilled_tuple, TupleTableSlot *ondisk_tuple); static void apply_concurrent_delete(Relation rel, TupleTableSlot 
*slot); static void restore_tuple(BufFile *file, Relation relation, - TupleTableSlot *slot); + TupleTableSlot *slot, BlockNumber *block_nr_p, + BlockNumber *old_block_nr_p); static void adjust_toast_pointers(Relation relation, TupleTableSlot *dest, TupleTableSlot *src); +static bool is_block_in_range(BlockNumber blknum, BlockNumber start, + BlockNumber end); static bool find_target_tuple(RepackDest *dest, TupleTableSlot *locator, TupleTableSlot *retrieved); -static bool identity_key_equal(RepackDest *dest, TupleTableSlot *locator, +static bool identity_key_equal(RepackDest *dest, + TupleTableSlot *locator, TupleTableSlot *candidate); -static void process_concurrent_changes(XLogRecPtr end_of_wal, - ChangeContext *chgcxt, - bool done); static void initialize_change_context(ChangeContext *chgcxt, Relation relation, Oid ident_index_id); @@ -166,8 +176,12 @@ static void release_change_dest(RepackDest *dest); static void rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, Oid identIdx, TransactionId frozenXid, - MultiXactId cutoffMulti); + MultiXactId cutoffMulti, + ChangeContext *chgcxt); +static void process_auxiliary_table(ChangeContext *chgcxt, Relation OldHeap, + Oid identIdx); static List *build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes); +static Oid build_new_index(Relation NewHeap, Relation OldHeap, Oid oldindex); static void copy_index_constraints(Relation old_index, Oid new_index_id, Oid new_heap_id); static Relation process_single_relation(RepackStmt *stmt, @@ -180,7 +194,6 @@ static Oid determine_clustered_index(Relation rel, bool usingindex, static void start_repack_decoding_worker(Oid relid); static void stop_repack_decoding_worker(void); static void stop_repack_decoding_worker_cb(int code, Datum arg); -static Snapshot get_initial_snapshot(DecodingWorker *worker); static void ProcessRepackMessage(StringInfo msg); static const char *RepackCommandAsString(RepackCommand cmd); @@ -947,6 +960,14 @@ 
check_concurrent_repack_requirements(Relation rel, Oid *ident_idx_p) RelationGetRelationName(rel))); } + /* + * In the CONCURRENTLY mode we don't want to use the same snapshot + * throughout the whole processing, as it could block the progress of xmin + * horizon. Assert should be ok as we already disallow transaction block + * in the CONCURRENTLY case. + */ + Assert(!IsolationUsesXactSnapshot()); + *ident_idx_p = ident_idx; } @@ -983,7 +1004,7 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, TransactionId frozenXid; MultiXactId cutoffMulti; bool concurrent = OidIsValid(ident_idx); - Snapshot snapshot = NULL; + ChangeContext *chgcxt = NULL; #if USE_ASSERT_CHECKING LOCKMODE lmode; @@ -1020,13 +1041,6 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, * REPACK CONCURRENTLY. */ start_repack_decoding_worker(tableOid); - - /* - * Wait until the worker has the initial snapshot and retrieve it. - */ - snapshot = get_initial_snapshot(decoding_worker); - - PushActiveSnapshot(snapshot); } /* for CLUSTER or REPACK USING INDEX, mark the index as the one to use */ @@ -1049,19 +1063,101 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, Assert(CheckRelationOidLockedByMe(OIDNewHeap, AccessExclusiveLock, false)); NewHeap = table_open(OIDNewHeap, NoLock); + if (concurrent) + { + bool need_aux_rel; + + /* + * Auxiliary table is needed for clustering in the CONCURRENTLY mode, + * see comments in ChangeContext. FIXME Non-btree indexes are allowed + * historically, but in general, these can hardly define any useful + * order. We ignore them here. + */ + need_aux_rel = index != NULL && index->rd_rel->relam == BTREE_AM_OID; + + /* Gather information to apply concurrent changes. */ + chgcxt = palloc0_object(ChangeContext); + + if (!need_aux_rel) + { + Oid ident_idx_new; + + /* + * Create the identity index. 
We will need it during data copying + * so that we can apply the data changes at the appropriate time - + * see comments around the call of + * repack_process_concurrent_changes() with block range specified. + * + * XXX NewHeap is empty - should we pass INDEX_CREATE_SKIP_BUILD? + */ + ident_idx_new = build_new_index(NewHeap, OldHeap, ident_idx); + + initialize_change_context(chgcxt, NewHeap, ident_idx_new); + } + else + { + Oid aux_oid; + Relation aux_rel; + Oid aux_ident_idx; + + /* + * As the concurrent data changes will be applied to the auxiliary + * heap, the new heap does not need the identity index yet. We'll + * build it after having copied the data from the auxiliary heap. + * (Bulk insert should be more efficient.) + */ + initialize_change_context(chgcxt, NewHeap, InvalidOid); + + /* + * Like above, but only temporary - no other backend should need + * it. + */ + aux_oid = make_new_heap(tableOid, tableSpace, accessMethod, + RELPERSISTENCE_TEMP, NoLock); + Assert(CheckRelationOidLockedByMe(aux_oid, AccessExclusiveLock, + false)); + aux_rel = table_open(aux_oid, NoLock); + + /* + * The same for identity index. (The additional + * ShareUpdateExclusiveLock on ident_idx is not a problem, it'll + * be released at the end of transaction.) + */ + aux_ident_idx = build_new_index(aux_rel, OldHeap, ident_idx); + + /* + * Make the relation ready for use. + */ + chgcxt->cc_dest_aux = palloc0_object(RepackDest); + initialize_change_dest(chgcxt->cc_dest_aux, aux_rel, + aux_ident_idx); + + /* + * Set OID of the old relation's clustering index if it's + * different from the identity index. Otherwise set InvalidOid to + * indicate that the identity index should be used for clustering. 
+ */ + if (RelationGetRelid(index) != ident_idx) + chgcxt->cc_clustering_index = RelationGetRelid(index); + else + chgcxt->cc_clustering_index = InvalidOid; + } + } + /* Copy the heap data into the new table in the desired order */ - copy_table_data(NewHeap, OldHeap, index, snapshot, verbose, - &swap_toast_by_content, &frozenXid, &cutoffMulti); + copy_table_data(NewHeap, OldHeap, index, verbose, + &swap_toast_by_content, &frozenXid, &cutoffMulti, + chgcxt); /* The historic snapshot won't be needed anymore. */ - if (snapshot) + if (concurrent) { - PopActiveSnapshot(); + /* + * Make sure the active snapshot can see the data copied, so the rows + * can be updated / deleted. + */ UpdateActiveSnapshotCommandId(); - } - if (concurrent) - { Assert(!swap_toast_by_content); /* @@ -1072,10 +1168,16 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, index_close(index, NoLock); rebuild_relation_finish_concurrent(NewHeap, OldHeap, ident_idx, - frozenXid, cutoffMulti); + frozenXid, cutoffMulti, chgcxt); pgstat_progress_update_param(PROGRESS_REPACK_PHASE, PROGRESS_REPACK_PHASE_FINAL_CLEANUP); + + /* + * REPACK (CONCURRENTLY) launches separate transaction(s) so it + * shouldn't rely on the current portal to pop the active snapshot. + */ + PopActiveSnapshot(); } else { @@ -1238,10 +1340,9 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, * Insert tuple when processing REPACK CONCURRENTLY. * * rewriteheap.c is not used in the CONCURRENTLY case because it'd be - * difficult to do the same in the catch-up phase (as the logical - * decoding does not provide us with sufficient visibility - * information). Thus we must use heap_insert() both during the - * catch-up and here. + * difficult to do the same in the catch-up phase (as the logical decoding + * does not provide us with sufficient visibility information). Thus we must + * use heap_insert() both during the catch-up and here. 
* * 'reform' is a slot to use for tuple "reforming", typically to get set * values of dropped columns to NULL. @@ -1249,20 +1350,27 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, * We pass the NO_LOGICAL flag to heap_insert() in order to skip logical * decoding: as soon as REPACK CONCURRENTLY swaps the relation files, it drops * this relation, so no logical replication subscription should need the data. - * - * BulkInsertState is used because many tuples are inserted in the typical - * case. */ void -heap_insert_for_repack(Relation rel, TupleTableSlot *src, - TupleTableSlot *reform, BulkInsertStateData *bistate) +heap_insert_for_repack(ChangeContext *chgcxt, TupleTableSlot *src, + TupleTableSlot *reform) { HeapTuple tuple; bool shouldFree; TupleTableSlot *slot; + RepackDest *dest; + + /* + * Use the current auxiliary table as output if one is active, otherwise + * insert the tuple into the actual destination table. + */ + if (chgcxt->cc_dest_aux) + dest = chgcxt->cc_dest_aux; + else + dest = &chgcxt->cc_dest; tuple = ExecFetchSlotHeapTuple(src, false, &shouldFree); - if (tuple_needs_reform(tuple, src->tts_tupleDescriptor)) + if (reform != NULL && tuple_needs_reform(tuple, src->tts_tupleDescriptor)) { clear_dropped_attributes(tuple, reform); slot = reform; @@ -1277,8 +1385,16 @@ heap_insert_for_repack(Relation rel, TupleTableSlot *src, if (shouldFree) heap_freetuple(tuple); - table_tuple_insert(rel, slot, GetCurrentCommandId(true), - TABLE_INSERT_NO_LOGICAL, bistate); + table_tuple_insert(dest->rel, slot, GetCurrentCommandId(true), + TABLE_INSERT_NO_LOGICAL, dest->bistate); + + /* + * Insert the tuple into the identity index. initialize_change_context() + * may skip opening of indexes if the identity index is not needed + * immediately. 
+ */ + if (dest->rri) + ExecInsertIndexTuples(dest->rri, dest->estate, 0, slot, NIL, NULL); } bool @@ -1328,9 +1444,6 @@ clear_dropped_attributes(HeapTuple tuple, TupleTableSlot *reform) /* * Do the physical copying of table data. * - * 'snapshot' and 'decoding_ctx': see table_relation_copy_for_cluster(). Pass - * iff concurrent processing is required. - * * There are three output parameters: * *pSwapToastByContent is set true if toast tables must be swapped by content. * *pFreezeXid receives the TransactionId used as freeze cutoff point. @@ -1338,8 +1451,9 @@ clear_dropped_attributes(HeapTuple tuple, TupleTableSlot *reform) */ static void copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, - Snapshot snapshot, bool verbose, bool *pSwapToastByContent, - TransactionId *pFreezeXid, MultiXactId *pCutoffMulti) + bool verbose, bool *pSwapToastByContent, + TransactionId *pFreezeXid, MultiXactId *pCutoffMulti, + ChangeContext *chgcxt) { Relation relRelation; HeapTuple reltup; @@ -1356,7 +1470,7 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, int elevel = verbose ? INFO : DEBUG2; PGRUsage ru0; char *nspname; - bool concurrent = snapshot != NULL; + bool concurrent = chgcxt != NULL; LOCKMODE lmode; lmode = RepackLockLevel(concurrent); @@ -1460,18 +1574,28 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, cutoffs.MultiXactCutoff = relminmxid; } - /* - * Decide whether to use an indexscan or seqscan-and-optional-sort to scan - * the OldHeap. We know how to use a sort to duplicate the ordering of a - * btree index, and will use seqscan-and-sort for that case if the planner - * tells us it's cheaper. Otherwise, always indexscan if an index is - * provided, else plain seqscan. 
- */ - if (OldIndex != NULL && OldIndex->rd_rel->relam == BTREE_AM_OID) - use_sort = plan_cluster_use_sort(RelationGetRelid(OldHeap), - RelationGetRelid(OldIndex)); + if (!concurrent) + { + /* + * Decide whether to use an indexscan or seqscan-and-optional-sort to + * scan the OldHeap. We know how to use a sort to duplicate the + * ordering of a btree index, and will use seqscan-and-sort for that + * case if the planner tells us it's cheaper. Otherwise, always + * indexscan if an index is provided, else plain seqscan. + */ + if (OldIndex != NULL && OldIndex->rd_rel->relam == BTREE_AM_OID) + use_sort = plan_cluster_use_sort(RelationGetRelid(OldHeap), + RelationGetRelid(OldIndex)); + else + use_sort = false; + } else - use_sort = false; + { + /* + * To use multiple snapshots, we need to read the table sequentially. + */ + use_sort = true; + } /* Log what we're doing */ if (OldIndex != NULL && !use_sort) @@ -1498,11 +1622,11 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, * values (e.g. because the AM doesn't use freezing). */ table_relation_copy_for_cluster(OldHeap, NewHeap, OldIndex, use_sort, - cutoffs.OldestXmin, snapshot, + cutoffs.OldestXmin, &cutoffs.FreezeLimit, &cutoffs.MultiXactCutoff, &num_tuples, &tups_vacuumed, - &tups_recently_dead); + &tups_recently_dead, chgcxt); /* return selected values to caller, get set as relfrozenxid/minmxid */ *pFreezeXid = cutoffs.FreezeLimit; @@ -2428,6 +2552,8 @@ repack_is_permitted_for_relation(RepackCommand cmd, Oid relid, Oid userid) * instead return the opened and locked relcache entry, so that caller can * process the partitions using the multiple-table handling code. In this * case, if an index name is given, it's up to the caller to resolve it. + * + * A new transaction is started in either case. 
*/ static Relation process_single_relation(RepackStmt *stmt, LOCKMODE lockmode, bool isTopLevel, @@ -2440,6 +2566,31 @@ process_single_relation(RepackStmt *stmt, LOCKMODE lockmode, bool isTopLevel, Assert(stmt->command == REPACK_COMMAND_CLUSTER || stmt->command == REPACK_COMMAND_REPACK); + if (params->options & CLUOPT_CONCURRENT) + { + /* + * Since REPACK (CONCURRENTLY) pops the active snapshot during the + * processing (it creates and pushes snapshots on its own), and since + * that snapshot can be referenced by the current portal, we need to + * make sure that the portal has no dangling pointer to the snapshot. + * Starting a new transaction seems to be the simplest way. + * + * XXX The following patches in the series make this unnecessary, as + * they start new transactions for other reasons elsewhere. + */ + PopActiveSnapshot(); + CommitTransactionCommand(); + + /* Start a new transaction. */ + StartTransactionCommand(); + + /* + * Functions in indexes may want a snapshot set. Note that the portal + * is not aware of this one, so the caller needs to pop it explicitly. + */ + PushActiveSnapshot(GetTransactionSnapshot()); + } + /* * Make sure ANALYZE is specified if a column list is present. */ @@ -2579,10 +2730,11 @@ RepackCommandAsString(RepackCommand cmd) } /* - * Apply all the changes provided by decoding worker. + * Apply data changes that affect pages in given range. 
*/ static void -apply_concurrent_changes(ChangeContext *chgcxt) +apply_concurrent_changes(ChangeContext *chgcxt, BlockNumber range_start, + BlockNumber range_end) { ConcurrentChangeKind kind = '\0'; RepackDest *dest; @@ -2591,18 +2743,31 @@ apply_concurrent_changes(ChangeContext *chgcxt) TupleTableSlot *old_update_tuple; TupleTableSlot *ondisk_tuple; bool have_old_tuple = false; + bool check_range; MemoryContext oldcxt; DecodingWorkerShared *shared; char fname[MAXPGPATH]; BufFile *file; - dest = &chgcxt->cc_dest; + /* + * Use the auxiliary table if one exists, otherwise the "final" + * destination table. + */ + dest = chgcxt->cc_dest_aux ? chgcxt->cc_dest_aux : &chgcxt->cc_dest; rel = dest->rel; + /* + * Range needs to be checked if the bounds are specified. Expect either + * both or none. + */ + Assert(BlockNumberIsValid(range_start) == BlockNumberIsValid(range_end)); + check_range = BlockNumberIsValid(range_start); + shared = (DecodingWorkerShared *) dsm_segment_address(decoding_worker->seg); /* Open the file containing the changes. */ - DecodingWorkerFileName(fname, shared->relid, chgcxt->cc_file_seq); + DecodingWorkerFileName(fname, shared->relid, chgcxt->cc_file_seq_changes, + false); file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false); spilled_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel), @@ -2618,6 +2783,9 @@ apply_concurrent_changes(ChangeContext *chgcxt) { size_t nread; ConcurrentChangeKind prevkind = kind; + BlockNumber block, + old_block; + BlockNumber *old_block_p; CHECK_FOR_INTERRUPTS(); @@ -2632,7 +2800,7 @@ apply_concurrent_changes(ChangeContext *chgcxt) */ if (kind == CHANGE_UPDATE_OLD) { - restore_tuple(file, rel, old_update_tuple); + restore_tuple(file, rel, old_update_tuple, NULL, NULL); have_old_tuple = true; continue; } @@ -2656,22 +2824,39 @@ apply_concurrent_changes(ChangeContext *chgcxt) /* * Now restore the tuple into the slot and execute the change. + * + * old_block is only stored with UPDATE_NEW. 
*/ - restore_tuple(file, rel, spilled_tuple); + old_block_p = kind == CHANGE_UPDATE_NEW ? &old_block : NULL; + restore_tuple(file, rel, spilled_tuple, &block, old_block_p); if (kind == CHANGE_INSERT) { - apply_concurrent_insert(dest, spilled_tuple); + /* + * Only insert the tuple if it fits into the current range (or if + * range does not matter). + */ + if (!check_range || + is_block_in_range(block, range_start, range_end)) + apply_concurrent_insert(dest, spilled_tuple); } else if (kind == CHANGE_DELETE) { - bool found; + /* + * Only delete the tuple if it fits into the current range (or if + * range does not matter). + */ + if (!check_range || + is_block_in_range(block, range_start, range_end)) + { + bool found; - /* Find the tuple to be deleted */ - found = find_target_tuple(dest, spilled_tuple, ondisk_tuple); - if (!found) - elog(ERROR, "could not find target tuple"); - apply_concurrent_delete(rel, ondisk_tuple); + /* Find the tuple to be deleted */ + found = find_target_tuple(dest, spilled_tuple, ondisk_tuple); + if (!found) + elog(ERROR, "could not find target tuple"); + apply_concurrent_delete(rel, ondisk_tuple); + } } else if (kind == CHANGE_UPDATE_NEW) { @@ -2683,21 +2868,71 @@ apply_concurrent_changes(ChangeContext *chgcxt) else key = spilled_tuple; - /* Find the tuple to be updated or deleted. */ - found = find_target_tuple(dest, key, ondisk_tuple); - if (!found) - elog(ERROR, "could not find target tuple"); - /* - * If 'tup' contains TOAST pointers, they point to the old - * relation's toast. Copy the corresponding TOAST pointers for the - * new relation from the existing tuple. (The fact that we - * received a TOAST pointer here implies that the attribute hasn't - * changed.) + * Perform normal update if both old and new version are in the + * current range. 
*/ - adjust_toast_pointers(rel, spilled_tuple, ondisk_tuple); + if (!check_range || + (is_block_in_range(old_block, range_start, range_end) && + is_block_in_range(block, range_start, range_end))) + { + /* Find the tuple to be updated or deleted. */ + found = find_target_tuple(dest, key, ondisk_tuple); + if (!found) + elog(ERROR, "could not find target tuple"); + + /* + * If 'spilled_tuple' contains TOAST pointers, they point to + * the old relation's toast. Copy the corresponding TOAST + * pointers for the new relation from the existing tuple. (The + * fact that we received a TOAST pointer here implies that the + * attribute hasn't changed.) + */ + adjust_toast_pointers(rel, spilled_tuple, ondisk_tuple); + + apply_concurrent_update(dest, spilled_tuple, ondisk_tuple); + } + else + { + Assert(check_range); + + if (is_block_in_range(block, range_start, range_end)) + { + /* + * The old key is in another range, so only insert the new + * one into the current range. The old version should not + * be visible to the snapshot that we'll use to copy the + * other range. + * + * Unlike UPDATE, there's no old tuple to copy the TOAST + * pointers from. Therefore pass NULL for the source + * tuple, to enforce detoasting of the TOAST pointers in + * 'spilled_tuple'. + */ + adjust_toast_pointers(rel, spilled_tuple, NULL); + + apply_concurrent_insert(dest, spilled_tuple); + } + else if (is_block_in_range(old_block, range_start, range_end)) + { + found = find_target_tuple(dest, key, ondisk_tuple); + if (!found) + elog(ERROR, "could not find target tuple"); + + /* + * The new key is in another range, so only delete the old + * one from the current range. The new version should be + * visible to the snapshot that we'll use to copy the + * other range. + */ + apply_concurrent_delete(rel, ondisk_tuple); + } - apply_concurrent_update(dest, spilled_tuple, ondisk_tuple); + /* + * Otherwise, both tuple versions belong to another range, so + * there's nothing to do here. 
+ */ + } ExecClearTuple(old_update_tuple); have_old_tuple = false; @@ -2819,7 +3054,8 @@ apply_concurrent_delete(Relation rel, TupleTableSlot *slot) * smaller than MaxAllocSize but the whole tuple is bigger. */ static void -restore_tuple(BufFile *file, Relation relation, TupleTableSlot *slot) +restore_tuple(BufFile *file, Relation relation, TupleTableSlot *slot, + BlockNumber *block_nr_p, BlockNumber *old_block_nr_p) { uint32 t_len; HeapTuple tup; @@ -2831,7 +3067,6 @@ restore_tuple(BufFile *file, Relation relation, TupleTableSlot *slot) tup->t_data = (HeapTupleHeader) ((char *) tup + HEAPTUPLESIZE); BufFileReadExact(file, tup->t_data, t_len); tup->t_len = t_len; - ItemPointerSetInvalid(&tup->t_self); tup->t_tableOid = RelationGetRelid(relation); /* @@ -2840,6 +3075,12 @@ restore_tuple(BufFile *file, Relation relation, TupleTableSlot *slot) */ ExecForceStoreHeapTuple(tup, slot, false); + /* Handle the TID separately because not all tuple slots care about it. */ + if (block_nr_p) + *block_nr_p = ItemPointerGetBlockNumber(&tup->t_data->t_ctid); + if (old_block_nr_p) + BufFileReadExact(file, old_block_nr_p, sizeof(BlockNumber)); + /* * Next, read any attributes we stored separately into the tts_values * array elements expecting them, if any. This matches @@ -2886,10 +3127,12 @@ restore_tuple(BufFile *file, Relation relation, TupleTableSlot *slot) /* * Adjust 'dest' replacing any EXTERNAL_ONDISK toast pointers with the - * corresponding ones from 'src'. + * corresponding ones from 'src'. If 'src' is NULL, replace the toast pointer + * with the detoasted value.
*/ static void -adjust_toast_pointers(Relation relation, TupleTableSlot *dest, TupleTableSlot *src) +adjust_toast_pointers(Relation relation, TupleTableSlot *dest, + TupleTableSlot *src) { TupleDesc desc = dest->tts_tupleDescriptor; @@ -2910,9 +3153,44 @@ adjust_toast_pointers(Relation relation, TupleTableSlot *dest, TupleTableSlot *s varlena_dst = (varlena *) DatumGetPointer(dest->tts_values[i]); if (!VARATT_IS_EXTERNAL_ONDISK(varlena_dst)) continue; - slot_getsomeattrs(src, i + 1); - dest->tts_values[i] = src->tts_values[i]; + /* + * Ideally we just copy the value, but if there is no source tuple, we + * need to detoast the value. + */ + if (src) + { + slot_getsomeattrs(src, i + 1); + dest->tts_values[i] = src->tts_values[i]; + } + else + { + varlena *detoasted; + + detoasted = detoast_external_attr(varlena_dst); + dest->tts_values[i] = PointerGetDatum(detoasted); + } + } +} + +/* + * Check if tuple originates from given range of blocks that have already been + * copied. + */ +static bool +is_block_in_range(BlockNumber blknum, BlockNumber start, BlockNumber end) +{ + Assert(BlockNumberIsValid(start) && BlockNumberIsValid(end)); + Assert(BlockNumberIsValid(blknum)); + + if (start < end) + return blknum >= start && blknum < end; + else + { + /* Has the scan position wrapped around? */ + Assert(start > end); + + return blknum >= start || blknum < end; } } @@ -3008,75 +3286,19 @@ identity_key_equal(RepackDest *dest, TupleTableSlot *locator, } /* - * Decode and apply concurrent changes, up to (and including) the record whose - * LSN is 'end_of_wal'. - * - * XXX the names "process_concurrent_changes" and "apply_concurrent_changes" - * are far too similar to each other. + * Initialize the ChangeContext struct for the given relation. 
*/ static void -process_concurrent_changes(XLogRecPtr end_of_wal, ChangeContext *chgcxt, bool done) -{ - DecodingWorkerShared *shared; - char fname[MAXPGPATH]; - BufFile *file; - - pgstat_progress_update_param(PROGRESS_REPACK_PHASE, - PROGRESS_REPACK_PHASE_CATCH_UP); - - /* Ask the worker for the file. */ - shared = (DecodingWorkerShared *) dsm_segment_address(decoding_worker->seg); - SpinLockAcquire(&shared->mutex); - shared->lsn_upto = end_of_wal; - shared->done = done; - SpinLockRelease(&shared->mutex); - - /* - * The worker needs to finish processing of the current WAL record. Even - * if it's idle, it'll need to close the output file. Thus we're likely to - * wait, so prepare for sleep. - */ - ConditionVariablePrepareToSleep(&shared->cv); - for (;;) - { - int last_exported; - - SpinLockAcquire(&shared->mutex); - last_exported = shared->last_exported; - SpinLockRelease(&shared->mutex); - - /* - * Has the worker exported the file we are waiting for? - */ - if (last_exported == chgcxt->cc_file_seq) - break; - - ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT); - } - ConditionVariableCancelSleep(); - - /* Open the file. */ - DecodingWorkerFileName(fname, shared->relid, chgcxt->cc_file_seq); - file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false); - apply_concurrent_changes(chgcxt); - - BufFileClose(file); - - /* Get ready for the next file. */ - chgcxt->cc_file_seq++; -} - -/* - * Initialize the ChangeContext struct for the given relation, with - * the given index as identity index. 
- */ -static void -initialize_change_context(ChangeContext *chgcxt, - Relation relation, Oid ident_index_id) +initialize_change_context(ChangeContext *chgcxt, Relation relation, + Oid ident_index_id) { initialize_change_dest(&chgcxt->cc_dest, relation, ident_index_id); - chgcxt->cc_file_seq = WORKER_FILE_SNAPSHOT + 1; + chgcxt->cc_file_seq_snapshot = 0; + chgcxt->cc_file_seq_changes = 0; + + chgcxt->cc_dest_aux = NULL; + chgcxt->cc_clustering_index = InvalidOid; } /* @@ -3086,6 +3308,8 @@ static void release_change_context(ChangeContext *chgcxt) { release_change_dest(&chgcxt->cc_dest); + if (chgcxt->cc_dest_aux) + release_change_dest(chgcxt->cc_dest_aux); } /* @@ -3100,6 +3324,10 @@ initialize_change_dest(RepackDest *dest, Relation relation, dest->rel = relation; dest->bistate = GetBulkInsertState(); + /* If there's no identity index yet, there should be no indexes at all. */ + if (!OidIsValid(ident_index_id)) + return; + /* Only initialize fields needed by ExecInsertIndexTuples(). */ dest->estate = CreateExecutorState(); @@ -3185,6 +3413,11 @@ static void release_change_dest(RepackDest *dest) { FreeBulkInsertState(dest->bistate); + + /* It's possible that no indexes were opened during initialization. */ + if (dest->rri == NULL) + return; + ExecCloseIndices(dest->rri); FreeExecutorState(dest->estate); /* XXX are these pfrees necessary? 
*/ @@ -3203,7 +3436,8 @@ release_change_dest(RepackDest *dest) static void rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, Oid identIdx, TransactionId frozenXid, - MultiXactId cutoffMulti) + MultiXactId cutoffMulti, + ChangeContext *chgcxt) { List *ind_oids_new; Oid old_table_oid = RelationGetRelid(OldHeap); @@ -3213,14 +3447,17 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, *lc2; char relpersistence; bool is_system_catalog; - Oid ident_idx_new; XLogRecPtr end_of_wal; List *indexrels; - ChangeContext chgcxt; + List *inds_tmp = NIL; Assert(CheckRelationLockedByMe(OldHeap, ShareUpdateExclusiveLock, false)); Assert(CheckRelationLockedByMe(NewHeap, AccessExclusiveLock, false)); + /* If we have the auxiliary table, this is the moment we should use it. */ + if (chgcxt->cc_dest_aux) + process_auxiliary_table(chgcxt, OldHeap, identIdx); + /* * Unlike the exclusive case, we build new indexes for the new relation * rather than swapping the storage and reindexing the old relation. The @@ -3236,32 +3473,26 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, * might not be enough for commands like ALTER INDEX ... SET ... (Those * are not necessarily dangerous, but can make user confused if the * changes they do get lost due to REPACK.) + * + * As the identity index already had to be built, skip it here. XXX + * Consider if the retail inserts during data copying (in the case w/o + * auxiliary table) can be a problem in terms of index layout. Shouldn't + * we drop the identity index and build it using bulk insert too? */ + foreach_oid(ind_oid, ind_oids_old) + { + if (ind_oid != identIdx) + inds_tmp = lappend_oid(inds_tmp, ind_oid); + } + ind_oids_old = inds_tmp; ind_oids_new = build_new_indexes(NewHeap, OldHeap, ind_oids_old); /* - * The identity index in the new relation appears in the same relative - * position as the corresponding index in the old relation. Find it. 
+ * The identity index will be involved in the following processing. */ - ident_idx_new = InvalidOid; - foreach_oid(ind_old, ind_oids_old) - { - if (identIdx == ind_old) - { - int pos = foreach_current_index(ind_old); - - if (list_length(ind_oids_new) <= pos) - elog(ERROR, "list of new indexes too short"); - ident_idx_new = list_nth_oid(ind_oids_new, pos); - break; - } - } - if (!OidIsValid(ident_idx_new)) - elog(ERROR, "could not find index matching \"%s\" at the new relation", - get_rel_name(identIdx)); - - /* Gather information to apply concurrent changes. */ - initialize_change_context(&chgcxt, NewHeap, ident_idx_new); + ind_oids_old = lappend_oid(ind_oids_old, identIdx); + ind_oids_new = lappend_oid(ind_oids_new, + RelationGetRelid(chgcxt->cc_dest.ident_index)); /* * During testing, wait for another backend to perform concurrent data @@ -3278,11 +3509,13 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, end_of_wal = GetFlushRecPtr(NULL); /* - * Apply concurrent changes first time, to minimize the time we need to - * hold AccessExclusiveLock. (Quite some amount of WAL could have been + * Decode and apply concurrent changes again, to minimize the time we need + * to hold AccessExclusiveLock. (Quite some amount of WAL could have been * written during the data copying and index creation.) */ - process_concurrent_changes(end_of_wal, &chgcxt, false); + repack_process_concurrent_changes(chgcxt, end_of_wal, + InvalidBlockNumber, InvalidBlockNumber, + false, false); /* * Acquire AccessExclusiveLock on the table, its TOAST relation (if there @@ -3336,10 +3569,12 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, end_of_wal = GetFlushRecPtr(NULL); /* - * Apply the concurrent changes again. Indicate that the decoding worker - * won't be needed anymore. + * Decode and apply the concurrent changes again. Indicate that the + * decoding worker won't be needed anymore. 
*/ - process_concurrent_changes(end_of_wal, &chgcxt, true); + repack_process_concurrent_changes(chgcxt, end_of_wal, + InvalidBlockNumber, InvalidBlockNumber, + false, true); /* Remember info about rel before closing OldHeap */ relpersistence = OldHeap->rd_rel->relpersistence; @@ -3387,7 +3622,7 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, table_close(NewHeap, NoLock); /* Cleanup what we don't need anymore. (And close the identity index.) */ - release_change_context(&chgcxt); + release_change_context(chgcxt); /* * Swap the relations and their TOAST relations and TOAST indexes. This @@ -3406,6 +3641,103 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, relpersistence); } +/* + * Copy the contents of the auxiliary table to the new table in the desired + * order, then drop the auxiliary table. + */ +static void +process_auxiliary_table(ChangeContext *chgcxt, Relation OldHeap, Oid identIdx) +{ + RepackDest *dest = chgcxt->cc_dest_aux; + Oid ident_idx_new; + Relation clustering_index; + IndexScanDesc scan; + TupleTableSlot *slot; + Oid aux_oid; + ObjectAddress object; + Relation rel; + + /* + * First, make sure the clustering index exists. + */ + if (OidIsValid(chgcxt->cc_clustering_index)) + { + Oid cl_ind_oid; + + /* + * Create it according to the clustering index on the old relation. + */ + cl_ind_oid = build_new_index(dest->rel, OldHeap, + chgcxt->cc_clustering_index); + clustering_index = index_open(cl_ind_oid, NoLock); + } + else + { + /* The identity index is also the clustering index. */ + clustering_index = dest->ident_index; + } + + /* + * Now do the copying. Before starting, clear ->cc_dest_aux so that + * insertions go to the final table, rather than the auxiliary one. + */ + chgcxt->cc_dest_aux = NULL; + slot = table_slot_create(dest->rel, NULL); + + /* + * Note: the current active snapshot blocks the progress of xmin + * horizon(s). 
The next patches in the series should fix this by using a + * new kind of snapshot (which we can use here because there are no + * transaction aborts in the auxiliary table). + */ + scan = index_beginscan(dest->rel, clustering_index, GetActiveSnapshot(), + NULL, 0, 0, SO_NONE); + index_rescan(scan, NULL, 0, NULL, 0); + for (;;) + { + CHECK_FOR_INTERRUPTS(); + + if (!index_getnext_slot(scan, ForwardScanDirection, slot)) + break; + + /* + * Reforming should have been performed during insertions into the + * auxiliary table. + */ + heap_insert_for_repack(chgcxt, slot, NULL); + } + index_endscan(scan); + ExecDropSingleTupleTableSlot(slot); + + /* + * Close the relation, its identity index and clustering index if we had + * to open it above. Lock will be released on commit. + */ + aux_oid = RelationGetRelid(dest->rel); + table_close(dest->rel, NoLock); + if (OidIsValid(chgcxt->cc_clustering_index)) + index_close(clustering_index, NoLock); + /* Here we close the other indexes. */ + release_change_dest(dest); + + /* Drop the auxiliary table. */ + object.classId = RelationRelationId; + object.objectId = aux_oid; + object.objectSubId = 0; + performDeletion(&object, DROP_RESTRICT, PERFORM_DELETION_INTERNAL); + + /* Build the identity index on the new relation. */ + ident_idx_new = build_new_index(chgcxt->cc_dest.rel, OldHeap, identIdx); + + /* + * Make the new heap ready to use the index for future replaying of + * concurrent changes. + */ + rel = chgcxt->cc_dest.rel; + release_change_dest(&chgcxt->cc_dest); + initialize_change_dest(&chgcxt->cc_dest, rel, ident_idx_new); +} + /* * Build indexes on NewHeap according to those on OldHeap. 
* @@ -3421,34 +3753,48 @@ build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes) { List *result = NIL; - pgstat_progress_update_param(PROGRESS_REPACK_PHASE, - PROGRESS_REPACK_PHASE_REBUILD_INDEX); - foreach_oid(oldindex, OldIndexes) { Oid newindex; - char *newName; - Relation ind; - - ind = index_open(oldindex, ShareUpdateExclusiveLock); - - newName = ChooseRelationName(get_rel_name(oldindex), - NULL, - "repacknew", - get_rel_namespace(ind->rd_index->indrelid), - false); - newindex = index_create_copy(NewHeap, INDEX_CREATE_SUPPRESS_PROGRESS, - oldindex, ind->rd_rel->reltablespace, - newName); - copy_index_constraints(ind, newindex, RelationGetRelid(NewHeap)); - result = lappend_oid(result, newindex); - index_close(ind, NoLock); + newindex = build_new_index(NewHeap, OldHeap, oldindex); + result = lappend_oid(result, newindex); } return result; } +/* + * Subroutine of build_new_indexes(). + */ +static Oid +build_new_index(Relation NewHeap, Relation OldHeap, Oid oldindex) +{ + Oid newindex; + char *newName; + Relation ind; + + pgstat_progress_update_param(PROGRESS_REPACK_PHASE, + PROGRESS_REPACK_PHASE_REBUILD_INDEX); + + ind = index_open(oldindex, ShareUpdateExclusiveLock); + + newName = ChooseRelationName(get_rel_name(oldindex), + NULL, + "repacknew", + get_rel_namespace(ind->rd_index->indrelid), + false); + /* Functions in indexes may want a snapshot set. */ + Assert(ActiveSnapshotSet()); + newindex = index_create_copy(NewHeap, INDEX_CREATE_SUPPRESS_PROGRESS, + oldindex, ind->rd_rel->reltablespace, + newName); + copy_index_constraints(ind, newindex, RelationGetRelid(NewHeap)); + index_close(ind, NoLock); + + return newindex; +} + /* * Create a transient copy of a constraint -- supported by a transient * copy of the index that supports the original constraint. 
@@ -3545,10 +3891,13 @@ start_repack_decoding_worker(Oid relid) shared = (DecodingWorkerShared *) dsm_segment_address(decoding_worker->seg); shared->initialized = false; + /* Snapshot is the first thing we need from the worker. */ + shared->snapshot_requested = true; shared->lsn_upto = InvalidXLogRecPtr; shared->done = false; SharedFileSetInit(&shared->sfs, decoding_worker->seg); - shared->last_exported = -1; + shared->last_exported_snapshot = -1; + shared->last_exported_changes = -1; SpinLockInit(&shared->mutex); shared->dbid = MyDatabaseId; @@ -3670,10 +4019,10 @@ stop_repack_decoding_worker_cb(int code, Datum arg) } /* - * Get the initial snapshot from the decoding worker. + * Get snapshot from the decoding worker. */ -static Snapshot -get_initial_snapshot(DecodingWorker *worker) +Snapshot +repack_get_snapshot(ChangeContext *chgcxt) { DecodingWorkerShared *shared; char fname[MAXPGPATH]; @@ -3682,12 +4031,13 @@ get_initial_snapshot(DecodingWorker *worker) char *snap_space; Snapshot snapshot; - shared = (DecodingWorkerShared *) dsm_segment_address(worker->seg); + shared = (DecodingWorkerShared *) dsm_segment_address(decoding_worker->seg); /* - * The worker needs to initialize the logical decoding, which usually - * takes some time. Therefore it makes sense to prepare for the sleep - * first. + * For the first snapshot request, the worker needs to initialize the + * logical decoding, which usually takes some time. Therefore it makes + * sense to prepare for the sleep first. Does it make sense to skip the + * preparation on the next requests? */ ConditionVariablePrepareToSleep(&shared->cv); for (;;) @@ -3695,13 +4045,13 @@ get_initial_snapshot(DecodingWorker *worker) int last_exported; SpinLockAcquire(&shared->mutex); - last_exported = shared->last_exported; + last_exported = shared->last_exported_snapshot; SpinLockRelease(&shared->mutex); /* * Has the worker exported the file we are waiting for? 
*/ - if (last_exported == WORKER_FILE_SNAPSHOT) + if (last_exported == chgcxt->cc_file_seq_snapshot) break; ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT); @@ -3709,20 +4059,98 @@ get_initial_snapshot(DecodingWorker *worker) ConditionVariableCancelSleep(); /* Read the snapshot from a file. */ - DecodingWorkerFileName(fname, shared->relid, WORKER_FILE_SNAPSHOT); + DecodingWorkerFileName(fname, shared->relid, chgcxt->cc_file_seq_snapshot, + true); file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false); BufFileReadExact(file, &snap_size, sizeof(snap_size)); snap_space = (char *) palloc(snap_size); BufFileReadExact(file, snap_space, snap_size); BufFileClose(file); +#ifdef USE_ASSERT_CHECKING + SpinLockAcquire(&shared->mutex); + Assert(!shared->snapshot_requested); + shared->snapshot_requested = false; + SpinLockRelease(&shared->mutex); +#endif + /* Restore it. */ snapshot = RestoreSnapshot(snap_space); pfree(snap_space); + /* Get ready for the next snapshot. */ + chgcxt->cc_file_seq_snapshot++; + return snapshot; } +/* + * Get concurrent changes, up to (and including) the record whose LSN is + * 'end_of_wal', from the decoding worker, and apply them to the new table. If + * block range is specified, only apply changes related to that range. + * + * If 'request_snapshot' is true, the snapshot built at LSN following the last + * data change needs to be exported too. + */ +void +repack_process_concurrent_changes(ChangeContext *chgcxt, + XLogRecPtr end_of_wal, + BlockNumber range_start, + BlockNumber range_end, + bool request_snapshot, bool done) +{ + DecodingWorkerShared *shared; + + pgstat_progress_update_param(PROGRESS_REPACK_PHASE, + PROGRESS_REPACK_PHASE_CATCH_UP); + + /* Ask the worker for the file. 
*/ + shared = (DecodingWorkerShared *) dsm_segment_address(decoding_worker->seg); + SpinLockAcquire(&shared->mutex); + shared->lsn_upto = end_of_wal; + Assert(!shared->snapshot_requested); + shared->snapshot_requested = request_snapshot; + shared->done = done; + SpinLockRelease(&shared->mutex); + + /* + * The worker needs to finish processing of the current WAL record. Even + * if it's idle, it'll need to close the output file. Thus we're likely to + * wait, so prepare for sleep. + */ + ConditionVariablePrepareToSleep(&shared->cv); + for (;;) + { + int last_exported; + + SpinLockAcquire(&shared->mutex); + last_exported = shared->last_exported_changes; + SpinLockRelease(&shared->mutex); + + /* + * Has the worker exported the file we are waiting for? + */ + if (last_exported == chgcxt->cc_file_seq_changes) + break; + + ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT); + } + ConditionVariableCancelSleep(); + +#ifdef USE_ASSERT_CHECKING + /* The worker resets lsn_upto once it has published the file. */ + SpinLockAcquire(&shared->mutex); + Assert(XLogRecPtrIsInvalid(shared->lsn_upto)); + SpinLockRelease(&shared->mutex); +#endif + + /* Apply the changes to the new table. */ + apply_concurrent_changes(chgcxt, range_start, range_end); + + /* Get ready for the next set of changes. */ + chgcxt->cc_file_seq_changes++; +} + /* * Generate worker's file name into 'fname', which must be of size MAXPGPATH. * If relations of the same 'relid' happen to be processed at the same time, @@ -3730,10 +4158,13 @@ get_initial_snapshot(DecodingWorker *worker) * be involved.
*/ void -DecodingWorkerFileName(char *fname, Oid relid, uint32 seq) +DecodingWorkerFileName(char *fname, Oid relid, uint32 seq, bool snapshot) { /* The PID is already present in the fileset name, so we needn't add it */ - snprintf(fname, MAXPGPATH, "%u-%u", relid, seq); + if (!snapshot) + snprintf(fname, MAXPGPATH, "%u-%u", relid, seq); + else + snprintf(fname, MAXPGPATH, "%u-%u-snapshot", relid, seq); } /* diff --git a/src/backend/commands/repack_worker.c b/src/backend/commands/repack_worker.c index db9ff057cc6..461d60ec0ca 100644 --- a/src/backend/commands/repack_worker.c +++ b/src/backend/commands/repack_worker.c @@ -33,8 +33,7 @@ static void RepackWorkerShutdown(int code, Datum arg); static LogicalDecodingContext *repack_setup_logical_decoding(Oid relid); static void repack_cleanup_logical_decoding(LogicalDecodingContext *ctx); -static void export_initial_snapshot(Snapshot snapshot, - DecodingWorkerShared *shared); +static void export_snapshot(Snapshot snapshot, DecodingWorkerShared *shared); static bool decode_concurrent_changes(LogicalDecodingContext *ctx, DecodingWorkerShared *shared); @@ -65,6 +64,8 @@ RepackWorkerMain(Datum main_arg) shm_mq_handle *mqh; LogicalDecodingContext *decoding_ctx; SharedFileSet *sfs; + RepackDecodingState *dstate; + MemoryContext oldcxt; Snapshot snapshot; am_repack_worker = true; @@ -118,7 +119,9 @@ RepackWorkerMain(Datum main_arg) * anything in the shared memory until we have serialized the snapshot. */ SpinLockAcquire(&shared->mutex); - Assert(!XLogRecPtrIsValid(shared->lsn_upto)); + /* Initially we're expected to provide a snapshot and only that. */ + Assert(shared->snapshot_requested && + XLogRecPtrIsInvalid(shared->lsn_upto)); sfs = &shared->sfs; SpinLockRelease(&shared->mutex); @@ -139,9 +142,25 @@ RepackWorkerMain(Datum main_arg) XactIsoLevel = XACT_REPEATABLE_READ; XactReadOnly = true; - /* Build the initial snapshot and export it. */ + /* + * Build the initial snapshot and export it. 
+ * + * Since there is no API to free the "external snapshot", and since such + * snapshot is not guaranteed to be flat (i.e. pfree() is not appropriate) + * the easiest way to clean it up is to use a separate memory context for + * it. + */ + dstate = (RepackDecodingState *) decoding_ctx->output_writer_private; + MemoryContextReset(dstate->snapshot_cxt); + oldcxt = MemoryContextSwitchTo(dstate->snapshot_cxt); snapshot = SnapBuildInitialSnapshot(decoding_ctx->snapshot_builder); - export_initial_snapshot(snapshot, shared); + MemoryContextSwitchTo(oldcxt); + export_snapshot(snapshot, shared); + + /* + * Adjust the replication slot's xmin so that VACUUM can do more work. + */ + LogicalIncreaseXminForSlot(InvalidXLogRecPtr, snapshot->xmin, false); /* * Only historic snapshots should be used now. Do not let us restrict the @@ -307,7 +326,7 @@ repack_cleanup_logical_decoding(LogicalDecodingContext *ctx) * Make snapshot available to the backend that launched the decoding worker. */ static void -export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared) +export_snapshot(Snapshot snapshot, DecodingWorkerShared *shared) { char fname[MAXPGPATH]; BufFile *file; @@ -318,7 +337,9 @@ export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared) snap_space = (char *) palloc(snap_size); SerializeSnapshot(snapshot, snap_space); - DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1); + DecodingWorkerFileName(fname, shared->relid, + shared->last_exported_snapshot + 1, + true); file = BufFileCreateFileSet(&shared->sfs.fs, fname); /* To make restoration easier, write the snapshot size first. */ BufFileWrite(file, &snap_size, sizeof(snap_size)); @@ -328,7 +349,8 @@ export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared) /* Increase the counter to tell the backend that the file is available. 
*/ SpinLockAcquire(&shared->mutex); - shared->last_exported++; + shared->last_exported_snapshot++; + shared->snapshot_requested = false; SpinLockRelease(&shared->mutex); ConditionVariableSignal(&shared->cv); } @@ -343,6 +365,7 @@ decode_concurrent_changes(LogicalDecodingContext *ctx, DecodingWorkerShared *shared) { RepackDecodingState *dstate; + bool snapshot_requested; XLogRecPtr lsn_upto; bool done; char fname[MAXPGPATH]; @@ -350,11 +373,14 @@ decode_concurrent_changes(LogicalDecodingContext *ctx, dstate = (RepackDecodingState *) ctx->output_writer_private; /* Open the output file. */ - DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1); + DecodingWorkerFileName(fname, shared->relid, + shared->last_exported_changes + 1, + false); dstate->file = BufFileCreateFileSet(&shared->sfs.fs, fname); SpinLockAcquire(&shared->mutex); lsn_upto = shared->lsn_upto; + snapshot_requested = shared->snapshot_requested; done = shared->done; SpinLockRelease(&shared->mutex); @@ -437,6 +463,7 @@ decode_concurrent_changes(LogicalDecodingContext *ctx, { SpinLockAcquire(&shared->mutex); lsn_upto = shared->lsn_upto; + snapshot_requested = shared->snapshot_requested; /* 'done' should be set at the same time as 'lsn_upto' */ done = shared->done; SpinLockRelease(&shared->mutex); @@ -483,9 +510,59 @@ decode_concurrent_changes(LogicalDecodingContext *ctx, */ BufFileClose(dstate->file); dstate->file = NULL; + + /* + * Before publishing the data changes, export the snapshot too if + * requested. Publishing both at once makes sense because both are needed + * at the same time, and it's simpler. + */ + if (snapshot_requested) + { + Snapshot snapshot; + MemoryContext oldcxt; + + /* See comments about memory context in RepackWorkerMain(). */ + MemoryContextReset(dstate->snapshot_cxt); + oldcxt = MemoryContextSwitchTo(dstate->snapshot_cxt); + + /* + * SnapBuildInitialSnapshot() assumes that MyProc->xmin is invalid, so + * reset it. We do not use the snapshot, so it's ok.
+ */ + MyProc->xmin = InvalidTransactionId; + snapshot = SnapBuildInitialSnapshot(ctx->snapshot_builder); + MemoryContextSwitchTo(oldcxt); + export_snapshot(snapshot, shared); + + /* + * Adjust the replication slot's xmin so that VACUUM can do more work. + */ + LogicalIncreaseXminForSlot(InvalidXLogRecPtr, snapshot->xmin, false); + } + else + { + /* + * If data changes were requested but no following snapshot, we don't + * care about xmin horizon because the heap copying should be done by + * now. + */ + LogicalIncreaseXminForSlot(InvalidXLogRecPtr, InvalidTransactionId, + false); + + } + + /* + * Make sure the xmin of our slot is taken into account when computing new + * VACUUM horizons. + */ + ReplicationSlotsComputeRequiredXmin(false); + + /* + * Now increase the counter(s) to announce that the output is available. + */ SpinLockAcquire(&shared->mutex); + shared->last_exported_changes++; shared->lsn_upto = InvalidXLogRecPtr; - shared->last_exported++; SpinLockRelease(&shared->mutex); ConditionVariableSignal(&shared->cv); diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index c944be4ac83..c3722b5c623 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -921,6 +921,7 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_heap_insert *xlrec; ReorderBufferChange *change; RelFileLocator target_locator; + BlockNumber blknum; xlrec = (xl_heap_insert *) XLogRecGetData(r); @@ -932,7 +933,7 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) return; /* only interested in our database */ - XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL); + XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blknum); if (target_locator.dbOid != ctx->slot->data.database) return; @@ -947,7 +948,8 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT; change->origin_id = 
XLogRecGetOrigin(r); - memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator)); + memcpy(&change->data.tp.rlocator, &target_locator, + sizeof(RelFileLocator)); tupledata = XLogRecGetBlockData(r, 0, &datalen); tuplelen = datalen - SizeOfHeapHeader; @@ -957,6 +959,20 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple); + /* + * REPACK (CONCURRENTLY) needs block number to check if the corresponding + * part of the table was already copied. XXX Should we only do this if + * AmRepackWorker()? It might save a few cycles, but not sure it's good to + * leave the fields unset in other cases. + */ + { + HeapTupleHeader header; + + header = change->data.tp.newtuple->t_data; + /* offnum is not really needed, but let's set valid pointer. */ + ItemPointerSet(&header->t_ctid, blknum, xlrec->offnum); + } + change->data.tp.clear_toast_afterwards = true; ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, @@ -978,11 +994,13 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferChange *change; char *data; RelFileLocator target_locator; + BlockNumber new_blknum, + old_blknum; xlrec = (xl_heap_update *) XLogRecGetData(r); /* only interested in our database */ - XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL); + XLogRecGetBlockTag(r, 0, &target_locator, NULL, &new_blknum); if (target_locator.dbOid != ctx->slot->data.database) return; @@ -990,6 +1008,11 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) return; + if (XLogRecHasBlockRef(r, 1)) + XLogRecGetBlockTag(r, 1, NULL, NULL, &old_blknum); + else + old_blknum = new_blknum; + change = ReorderBufferAllocChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_UPDATE; change->origin_id = XLogRecGetOrigin(r); @@ -1008,6 +1031,20 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) 
ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen); DecodeXLogTuple(data, datalen, change->data.tp.newtuple); + + /* + * REPACK (CONCURRENTLY) needs block numbers to check if the + * corresponding part of the table was already copied. XXX Do this + * only if AmRepackWorker()? + */ + { + HeapTupleHeader header; + + header = change->data.tp.newtuple->t_data; + /* offnum is not really needed, but let's set valid pointer. */ + ItemPointerSet(&header->t_ctid, new_blknum, xlrec->new_offnum); + change->data.tp.old_blknum = old_blknum; + } } if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD) @@ -1044,6 +1081,7 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_heap_delete *xlrec; ReorderBufferChange *change; RelFileLocator target_locator; + BlockNumber blknum; xlrec = (xl_heap_delete *) XLogRecGetData(r); @@ -1057,7 +1095,7 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) return; /* only interested in our database */ - XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL); + XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blknum); if (target_locator.dbOid != ctx->slot->data.database) return; @@ -1089,6 +1127,19 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete, datalen, change->data.tp.oldtuple); + + /* + * REPACK (CONCURRENTLY) needs block number to check if the + * corresponding part of the table was already copied. XXX Do this + * only if AmRepackWorker()? + */ + { + HeapTupleHeader header; + + header = change->data.tp.oldtuple->t_data; + /* offnum is not really needed, but let's set valid pointer. 
*/ + ItemPointerSet(&header->t_ctid, blknum, xlrec->offnum); + } } change->data.tp.clear_toast_afterwards = true; @@ -1148,8 +1199,11 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) char *tupledata; Size tuplelen; RelFileLocator rlocator; + BlockNumber blknum; + bool isinit; xlrec = (xl_heap_multi_insert *) XLogRecGetData(r); + isinit = (XLogRecGetInfo(r) & XLOG_HEAP_INIT_PAGE) != 0; /* * Ignore insert records without new tuples. This happens when a @@ -1159,7 +1213,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) return; /* only interested in our database */ - XLogRecGetBlockTag(r, 0, &rlocator, NULL, NULL); + XLogRecGetBlockTag(r, 0, &rlocator, NULL, &blknum); if (rlocator.dbOid != ctx->slot->data.database) return; @@ -1227,6 +1281,25 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) else change->data.tp.clear_toast_afterwards = false; + /* + * REPACK (CONCURRENTLY) needs block number to check if the + * corresponding part of the table was already copied. + */ + if (AmRepackWorker()) + { + OffsetNumber offnum; + + /* + * offnum is not really needed, but let's set valid pointer. (It + * will be invalid anyway if the page was initially empty.) + */ + if (isinit) + offnum = FirstOffsetNumber + i; + else + offnum = xlrec->offsets[i]; + ItemPointerSet(&header->t_ctid, blknum, offnum); + } + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change, false); diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 3541fc793e4..fdcaa5036b4 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1659,14 +1659,17 @@ update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, /* * Set the required catalog xmin horizon for historic snapshots in the current - * replication slot. + * replication slot if catalog is TRUE, or xmin if catalog is FALSE. 
* * Note that in the most cases, we won't be able to immediately use the xmin * to increase the xmin horizon: we need to wait till the client has confirmed - * receiving current_lsn with LogicalConfirmReceivedLocation(). + * receiving current_lsn with LogicalConfirmReceivedLocation(). However, + * catalog=FALSE is only allowed for temporary replication slots, so the + * horizon is applied immediately. */ void -LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin) +LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin, + bool catalog) { bool updated_xmin = false; ReplicationSlot *slot; @@ -1677,6 +1680,27 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin) Assert(slot != NULL); SpinLockAcquire(&slot->mutex); + if (!catalog) + { + /* + * The non-catalog horizon can only advance in temporary slots, so + * update it in the shared memory immediately (w/o requiring prior + * saving to disk). + */ + Assert(slot->data.persistency == RS_TEMPORARY); + + /* + * The horizon must not go backwards, however it's o.k. to become + * invalid. + */ + Assert(!TransactionIdIsValid(slot->effective_xmin) || + !TransactionIdIsValid(xmin) || + TransactionIdFollowsOrEquals(xmin, slot->effective_xmin)); + + slot->effective_xmin = xmin; + SpinLockRelease(&slot->mutex); + return; + } /* * don't overwrite if we already have a newer xmin. This can happen if we diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 682d13c9f22..06f31e02b9a 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -3732,6 +3732,40 @@ ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid) return rbtxn_has_catalog_changes(txn); } +/* + * Check if a transaction (or its subtransaction) contains a heap change. 
+ */ +bool +ReorderBufferXidHasHeapChanges(ReorderBuffer *rb, TransactionId xid) +{ + ReorderBufferTXN *txn; + dlist_iter iter; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + if (txn == NULL) + return false; + + dlist_foreach(iter, &txn->changes) + { + ReorderBufferChange *change; + + change = dlist_container(ReorderBufferChange, node, iter.cur); + + switch (change->action) + { + case REORDER_BUFFER_CHANGE_INSERT: + case REORDER_BUFFER_CHANGE_UPDATE: + case REORDER_BUFFER_CHANGE_DELETE: + return true; + default: + break; + } + } + + return false; +} + /* * ReorderBufferXidHasBaseSnapshot * Have we already set the base snapshot for the given txn/subtxn? @@ -5229,6 +5263,12 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, Assert(newtup->t_len <= MaxHeapTupleSize); Assert(newtup->t_data == (HeapTupleHeader) ((char *) newtup + HEAPTUPLESIZE)); + /* + * Preserve TID - REPACK relies on it when dealing with block ranges. XXX + * Shouldn't we add a new field to ReorderBufferChange instead? + */ + tmphtup->t_data->t_ctid = newtup->t_data->t_ctid; + memcpy(newtup->t_data, tmphtup->t_data, tmphtup->t_len); newtup->t_len = tmphtup->t_len; diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index b8992234924..254f0e1aa9d 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -128,6 +128,7 @@ #include "access/heapam_xlog.h" #include "access/transam.h" #include "access/xact.h" +#include "commands/repack.h" #include "common/file_utils.h" #include "miscadmin.h" #include "pgstat.h" @@ -982,6 +983,13 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, } } + /* + * REPACK decoding worker may need timetravel anytime. It takes + * responsibility for tracking transaction commits, see below. 
+ */ + else if (AmRepackWorker()) + needs_timetravel = true; + for (nxact = 0; nxact < nsubxacts; nxact++) { TransactionId subxid = subxacts[nxact]; @@ -989,8 +997,12 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, /* * Add subtransaction to base snapshot if catalog modifying, we don't * distinguish to toplevel transactions there. + * + * See comments on REPACK worker below. */ - if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo)) + if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo) || + (AmRepackWorker() && + ReorderBufferXidHasHeapChanges(builder->reorder, subxid))) { sub_needs_timetravel = true; needs_snapshot = true; @@ -1018,8 +1030,18 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, } } - /* if top-level modified catalog, it'll need a snapshot */ - if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo)) + /* + * If top-level modified catalog, it'll need a snapshot. + * + * If we're decoding changes on behalf of REPACK (CONCURRENTLY), only + * changes of the relation being processed are decoded - see + * heap_decode(). Thus any heap change we find here must belong to that + * relation. Add the transaction so that we can keep building snapshots to + * scan that relation. + */ + if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo) || + (AmRepackWorker() && + ReorderBufferXidHasHeapChanges(builder->reorder, xid))) { elog(DEBUG2, "found top level transaction %u, with catalog changes", xid); @@ -1187,7 +1209,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact xmin = running->oldestRunningXid; elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u", builder->xmin, builder->xmax, running->oldestRunningXid, xmin); - LogicalIncreaseXminForSlot(lsn, xmin); + LogicalIncreaseXminForSlot(lsn, xmin, true); /* * Also tell the slot where we can restart decoding from.
We don't want to diff --git a/src/backend/replication/pgrepack/pgrepack.c b/src/backend/replication/pgrepack/pgrepack.c index 959551f5724..4b205bba241 100644 --- a/src/backend/replication/pgrepack/pgrepack.c +++ b/src/backend/replication/pgrepack/pgrepack.c @@ -30,7 +30,8 @@ static void repack_commit_txn(LogicalDecodingContext *ctx, static void repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change); static void repack_store_change(LogicalDecodingContext *ctx, Relation relation, - ConcurrentChangeKind kind, HeapTuple tuple); + ConcurrentChangeKind kind, HeapTuple tuple, + BlockNumber old_blknum); void _PG_output_plugin_init(OutputPluginCallbacks *cb) @@ -64,6 +65,9 @@ repack_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, dstate->change_cxt = AllocSetContextCreate(ctx->context, "REPACK - change", ALLOCSET_DEFAULT_SIZES); + dstate->snapshot_cxt = AllocSetContextCreate(ctx->context, + "REPACK - snapshot", + ALLOCSET_DEFAULT_SIZES); /* repack_setup_logical_decoding fills in the rest */ ctx->output_writer_private = dstate; @@ -133,7 +137,8 @@ repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (newtuple == NULL) elog(ERROR, "incomplete insert info"); - repack_store_change(ctx, relation, CHANGE_INSERT, newtuple); + repack_store_change(ctx, relation, CHANGE_INSERT, newtuple, + InvalidBlockNumber); } break; case REORDER_BUFFER_CHANGE_UPDATE: @@ -148,9 +153,11 @@ repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, elog(ERROR, "incomplete update info"); if (oldtuple != NULL) - repack_store_change(ctx, relation, CHANGE_UPDATE_OLD, oldtuple); + repack_store_change(ctx, relation, CHANGE_UPDATE_OLD, oldtuple, + InvalidBlockNumber); - repack_store_change(ctx, relation, CHANGE_UPDATE_NEW, newtuple); + repack_store_change(ctx, relation, CHANGE_UPDATE_NEW, newtuple, + change->data.tp.old_blknum); } break; case REORDER_BUFFER_CHANGE_DELETE: @@ -162,7 +169,8 
@@ repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (oldtuple == NULL) elog(ERROR, "incomplete delete info"); - repack_store_change(ctx, relation, CHANGE_DELETE, oldtuple); + repack_store_change(ctx, relation, CHANGE_DELETE, oldtuple, + InvalidBlockNumber); } break; default: @@ -189,7 +197,8 @@ repack_process_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, */ static void repack_store_change(LogicalDecodingContext *ctx, Relation relation, - ConcurrentChangeKind kind, HeapTuple tuple) + ConcurrentChangeKind kind, HeapTuple tuple, + BlockNumber old_blknum) { RepackDecodingState *dstate; MemoryContext oldcxt; @@ -285,6 +294,9 @@ repack_store_change(LogicalDecodingContext *ctx, Relation relation, */ BufFileWrite(file, &tuple->t_len, sizeof(tuple->t_len)); BufFileWrite(file, tuple->t_data, tuple->t_len); + /* If old_blknum is specified, write it too. */ + if (old_blknum != InvalidBlockNumber) + BufFileWrite(file, &old_blknum, sizeof(old_blknum)); /* Then, write the number of external attributes we found. */ natt_ext = list_length(attrs_ext); diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index afaa058b046..59dec018de9 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -2528,6 +2528,16 @@ boot_val => 'true', }, +# TODO Tune boot_val, 1024 is probably too low. 
+{ name => 'repack_snapshot_after', type => 'int', context => 'PGC_USERSET', group => 'DEVELOPER_OPTIONS', + short_desc => 'Number of pages REPACK (CONCURRENTLY) can read using a single snapshot.', + flags => 'GUC_UNIT_BLOCKS | GUC_NOT_IN_SAMPLE', + variable => 'repack_pages_per_snapshot', + boot_val => '1024', + min => '1', + max => 'INT_MAX', +}, + { name => 'reserved_connections', type => 'int', context => 'PGC_POSTMASTER', group => 'CONN_AUTH_SETTINGS', short_desc => 'Sets the number of connection slots reserved for roles with privileges of pg_use_reserved_connections.', variable => 'ReservedConnections', diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 290ccbc543e..8a48172ea57 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -44,6 +44,7 @@ #include "commands/async.h" #include "commands/extension.h" #include "commands/event_trigger.h" +#include "commands/repack.h" #include "commands/tablespace.h" #include "commands/trigger.h" #include "commands/user.h" diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index f2c36696bca..132248c5d43 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -666,12 +666,12 @@ typedef struct TableAmRoutine Relation OldIndex, bool use_sort, TransactionId OldestXmin, - Snapshot snapshot, TransactionId *xid_cutoff, MultiXactId *multi_cutoff, double *num_tuples, double *tups_vacuumed, - double *tups_recently_dead); + double *tups_recently_dead, + void *tableam_data); /* * React to VACUUM command on the relation. The VACUUM can be triggered by @@ -1733,8 +1733,6 @@ table_relation_copy_data(Relation rel, const RelFileLocator *newrlocator) * not needed for the relation's AM * - *xid_cutoff - ditto * - *multi_cutoff - ditto - * - snapshot - if != NULL, ignore data changes done by transactions that this - * (MVCC) snapshot considers still in-progress or in the future.
* * Output parameters: * - *xid_cutoff - rel's new relfrozenxid value, may be invalid @@ -1747,19 +1745,19 @@ table_relation_copy_for_cluster(Relation OldTable, Relation NewTable, Relation OldIndex, bool use_sort, TransactionId OldestXmin, - Snapshot snapshot, TransactionId *xid_cutoff, MultiXactId *multi_cutoff, double *num_tuples, double *tups_vacuumed, - double *tups_recently_dead) + double *tups_recently_dead, + void *tableam_data) { OldTable->rd_tableam->relation_copy_for_cluster(OldTable, NewTable, OldIndex, use_sort, OldestXmin, - snapshot, xid_cutoff, multi_cutoff, num_tuples, tups_vacuumed, - tups_recently_dead); + tups_recently_dead, + tableam_data); } /* diff --git a/src/include/commands/repack.h b/src/include/commands/repack.h index 07f887e99f6..8af73f8c81f 100644 --- a/src/include/commands/repack.h +++ b/src/include/commands/repack.h @@ -17,11 +17,14 @@ #include "access/hio.h" #include "access/skey.h" +#include "access/xlogdefs.h" #include "nodes/execnodes.h" #include "nodes/parsenodes.h" #include "parser/parse_node.h" +#include "storage/block.h" #include "storage/lockdefs.h" #include "utils/relcache.h" +#include "utils/snapshot.h" /* flag bits for ClusterParams->options */ @@ -66,13 +69,14 @@ typedef struct RepackDest /* The latest column we need to deform to have the tuple identity */ AttrNumber last_key_attno; -} RepackDest; -/* - * The first file exported by the decoding worker must contain a snapshot, the - * following ones contain the data changes. - */ -#define WORKER_FILE_SNAPSHOT 0 + /* + * Range of blocks in the old table the contents of this table comes from. + * Note that range_end is the first block of the next range. + */ + BlockNumber range_start; + BlockNumber range_end; +} RepackDest; /* * Information needed to apply concurrent data changes. @@ -84,10 +88,42 @@ typedef struct ChangeContext /* The destination table. */ RepackDest cc_dest; - /* Sequential number of the file containing the changes. 
*/ - int cc_file_seq; + /* Sequential number of the file containing snapshot. */ + int cc_file_seq_snapshot; + /* Sequential number of the file containing data changes. */ + int cc_file_seq_changes; + + /* + * Auxiliary table to store ordered tuples temporarily. + * + * When the new relation needs to be clustered, we use this table instead + * of tuplesort. The problem with a tuplesort is that data changes need to + * be applied at range boundary (see heapam_relation_copy_for_cluster() + * for more information), however it's not possible to look-up and change + * tuples in a tuplesort. + * + * Once the contents of the REPACKed table have been copied into the + * auxiliary table, we build the clustering index (unless it's the same as + * the identity index) and scan it to get the tuples in the desired order. + * XXX Is it worth putting the contents into a tuplestore and sorting it? + * Not sure, it'd require disk space for one more copy and the copying + * itself is not free. + * + * TODO 1) make the tables unlogged, 2) if REPACK locks the TOAST relation + * too (not sure it does) try to preserve TOAST pointers, instead of + * storing them to TOAST relations of these tables, 3) Check that the + * tables are dropped on transaction abort. + */ + RepackDest *cc_dest_aux; + + /* + * The index that defines ordering of the old table.
+ */ + Oid cc_clustering_index; } ChangeContext; +extern PGDLLIMPORT int repack_pages_per_snapshot; + extern void ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel); extern void cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid, @@ -98,9 +134,8 @@ extern void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal); extern Oid make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, char relpersistence, LOCKMODE lockmode); -extern void heap_insert_for_repack(Relation rel, TupleTableSlot *src, - TupleTableSlot *reform, - BulkInsertStateData *bistate); +extern void heap_insert_for_repack(ChangeContext *chgcxt, TupleTableSlot *src, + TupleTableSlot *reform); extern bool tuple_needs_reform(HeapTuple tuple, TupleDesc tupDesc); extern void clear_dropped_attributes(HeapTuple tuple, TupleTableSlot *reform); extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, @@ -112,7 +147,12 @@ extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, TransactionId frozenXid, MultiXactId cutoffMulti, char newrelpersistence); - +extern Snapshot repack_get_snapshot(ChangeContext *chgcxt); +extern void repack_process_concurrent_changes(ChangeContext *chgcxt, + XLogRecPtr end_of_wal, + BlockNumber range_start, + BlockNumber range_end, + bool request_snapshot, bool done); extern void HandleRepackMessageInterrupt(void); extern void ProcessRepackMessages(void); diff --git a/src/include/commands/repack_internal.h b/src/include/commands/repack_internal.h index 42111aa4ae3..8003e999864 100644 --- a/src/include/commands/repack_internal.h +++ b/src/include/commands/repack_internal.h @@ -44,6 +44,8 @@ typedef struct RepackDecodingState /* Per-change memory context. */ MemoryContext change_cxt; + /* Per-snapshot memory context. */ + MemoryContext snapshot_cxt; /* A tuple slot used to pass tuples back and forth */ TupleTableSlot *slot; @@ -67,6 +69,9 @@ typedef struct DecodingWorkerShared /* Is the decoding initialized? 
*/ bool initialized; + /* Set to request a snapshot. */ + bool snapshot_requested; + /* * Once the worker has reached this LSN, it should close the current * output file and either create a new one or exit, according to the field @@ -74,6 +79,8 @@ typedef struct DecodingWorkerShared * the WAL available and keep checking this field. It is ok if the worker * had already decoded records whose LSN is >= lsn_upto before this field * has been set. + * + * Set a valid LSN to request data changes. */ XLogRecPtr lsn_upto; @@ -84,7 +91,8 @@ typedef struct DecodingWorkerShared SharedFileSet sfs; /* Number of the last file exported by the worker. */ - int last_exported; + int last_exported_snapshot; + int last_exported_changes; /* Synchronize access to the fields above. */ slock_t mutex; @@ -116,7 +124,8 @@ typedef struct DecodingWorkerShared char error_queue[FLEXIBLE_ARRAY_MEMBER]; } DecodingWorkerShared; -extern void DecodingWorkerFileName(char *fname, Oid relid, uint32 seq); +extern void DecodingWorkerFileName(char *fname, Oid relid, uint32 seq, + bool snapshot); #endif /* REPACK_INTERNAL_H */ diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 6e0b7628001..37315a424e0 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -138,7 +138,7 @@ extern bool DecodingContextReady(LogicalDecodingContext *ctx); extern void FreeDecodingContext(LogicalDecodingContext *ctx); extern void LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, - TransactionId xmin); + TransactionId xmin, bool catalog); extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn); extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index ff825e4b7b2..cdefc4808df 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -104,6 +104,12 @@ typedef struct 
ReorderBufferChange HeapTuple oldtuple; /* valid for INSERT || UPDATE */ HeapTuple newtuple; + + /* + * valid for UPDATE - this is the physical location of the old + * tuple version, valid even if 'oldtuple' is NULL. + */ + BlockNumber old_blknum; } tp; /* @@ -763,6 +769,7 @@ extern void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRe extern void ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn); extern bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid); +extern bool ReorderBufferXidHasHeapChanges(ReorderBuffer *rb, TransactionId xid); extern bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid); extern bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, diff --git a/src/test/modules/injection_points/Makefile b/src/test/modules/injection_points/Makefile index c01d2fb095c..9c942599c49 100644 --- a/src/test/modules/injection_points/Makefile +++ b/src/test/modules/injection_points/Makefile @@ -15,6 +15,7 @@ REGRESS_OPTS = --dlpath=$(top_builddir)/src/test/regress ISOLATION = basic \ inplace \ repack \ + repack_snapshots \ repack_temporal \ repack_temporal_multirange \ repack_toast \ diff --git a/src/test/modules/injection_points/expected/repack_snapshots.out b/src/test/modules/injection_points/expected/repack_snapshots.out new file mode 100644 index 00000000000..e247b1e1946 --- /dev/null +++ b/src/test/modules/injection_points/expected/repack_snapshots.out @@ -0,0 +1,393 @@ +Parsed test spec with 2 sessions + +starting permutation: load repack change_new_beyond change_old_beyond check2 wakeup_new_range check1 +injection_points_attach +----------------------- + +(1 row) + +step load: + SELECT load(1); + +load +---- + +(1 row) + +step repack: + REPACK (CONCURRENTLY) repack_test; + +step change_new_beyond: + INSERT INTO repack_test + SELECT max(i) + 1, gen_external() + FROM repack_test + RETURNING tid_block(ctid); + + UPDATE repack_test SET j 
= gen_external() + WHERE ctid=(SELECT min(ctid) FROM repack_test) + RETURNING tid_block(OLD.ctid), tid_block(NEW.ctid); + +tid_block +--------- + 1 +(1 row) + +tid_block|tid_block +---------+--------- + 0| 1 +(1 row) + +step change_old_beyond: + DELETE FROM repack_test + WHERE ctid=(SELECT max(ctid) FROM repack_test) + RETURNING tid_block(ctid); + + UPDATE repack_test SET j = gen_external() + WHERE ctid=(SELECT max(ctid) FROM repack_test) + RETURNING tid_block(OLD.ctid), tid_block(NEW.ctid); + +tid_block +--------- + 1 +(1 row) + +tid_block|tid_block +---------+--------- + 1| 1 +(1 row) + +step check2: + INSERT INTO data_s2(i, j) + SELECT i, j FROM repack_test; + +step wakeup_new_range: + SELECT injection_points_wakeup('repack-concurrently-new-range'); + +injection_points_wakeup +----------------------- + +(1 row) + +step repack: <... completed> +step check1: + INSERT INTO data_s1(i, j) + SELECT i, j FROM repack_test; + + SELECT count(*) > 0 FROM repack_test; + + SELECT count(*) + FROM data_s1 d1 FULL JOIN data_s2 d2 USING (i, j) + WHERE d1.i ISNULL OR d2.i ISNULL; + +?column? 
+-------- +t +(1 row) + +count +----- + 0 +(1 row) + +injection_points_detach +----------------------- + +(1 row) + + +starting permutation: load2 load2_vacuum repack change_old_beyond2 check2 wakeup_new_range wakeup_new_range check1 +injection_points_attach +----------------------- + +(1 row) + +step load2: + SELECT load(2); + + DELETE FROM repack_test WHERE i < 100; + +load +---- + +(1 row) + +step load2_vacuum: + VACUUM repack_test; + +step repack: + REPACK (CONCURRENTLY) repack_test; + +step change_old_beyond2: + UPDATE repack_test SET j = gen_external() + WHERE ctid=(SELECT max(ctid) FROM repack_test WHERE tid_block(ctid) = 1) + RETURNING tid_block(OLD.ctid), tid_block(NEW.ctid); + +tid_block|tid_block +---------+--------- + 1| 0 +(1 row) + +step check2: + INSERT INTO data_s2(i, j) + SELECT i, j FROM repack_test; + +step wakeup_new_range: + SELECT injection_points_wakeup('repack-concurrently-new-range'); + +injection_points_wakeup +----------------------- + +(1 row) + +step wakeup_new_range: + SELECT injection_points_wakeup('repack-concurrently-new-range'); + +injection_points_wakeup +----------------------- + +(1 row) + +step repack: <... completed> +step check1: + INSERT INTO data_s1(i, j) + SELECT i, j FROM repack_test; + + SELECT count(*) > 0 FROM repack_test; + + SELECT count(*) + FROM data_s1 d1 FULL JOIN data_s2 d2 USING (i, j) + WHERE d1.i ISNULL OR d2.i ISNULL; + +?column? 
+-------- +t +(1 row) + +count +----- + 0 +(1 row) + +injection_points_detach +----------------------- + +(1 row) + + +starting permutation: load repack_pkey change_new_beyond change_old_beyond check2 wakeup_new_range check1 check1_order_asc +injection_points_attach +----------------------- + +(1 row) + +step load: + SELECT load(1); + +load +---- + +(1 row) + +step repack_pkey: + REPACK (CONCURRENTLY) repack_test USING INDEX repack_test_pkey; + +step change_new_beyond: + INSERT INTO repack_test + SELECT max(i) + 1, gen_external() + FROM repack_test + RETURNING tid_block(ctid); + + UPDATE repack_test SET j = gen_external() + WHERE ctid=(SELECT min(ctid) FROM repack_test) + RETURNING tid_block(OLD.ctid), tid_block(NEW.ctid); + +tid_block +--------- + 1 +(1 row) + +tid_block|tid_block +---------+--------- + 0| 1 +(1 row) + +step change_old_beyond: + DELETE FROM repack_test + WHERE ctid=(SELECT max(ctid) FROM repack_test) + RETURNING tid_block(ctid); + + UPDATE repack_test SET j = gen_external() + WHERE ctid=(SELECT max(ctid) FROM repack_test) + RETURNING tid_block(OLD.ctid), tid_block(NEW.ctid); + +tid_block +--------- + 1 +(1 row) + +tid_block|tid_block +---------+--------- + 1| 1 +(1 row) + +step check2: + INSERT INTO data_s2(i, j) + SELECT i, j FROM repack_test; + +step wakeup_new_range: + SELECT injection_points_wakeup('repack-concurrently-new-range'); + +injection_points_wakeup +----------------------- + +(1 row) + +step repack_pkey: <... completed> +step check1: + INSERT INTO data_s1(i, j) + SELECT i, j FROM repack_test; + + SELECT count(*) > 0 FROM repack_test; + + SELECT count(*) + FROM data_s1 d1 FULL JOIN data_s2 d2 USING (i, j) + WHERE d1.i ISNULL OR d2.i ISNULL; + +?column? 
+-------- +t +(1 row) + +count +----- + 0 +(1 row) + +step check1_order_asc: + SELECT i FROM repack_test LIMIT 10; + + i +-- + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 +10 +11 +(10 rows) + +injection_points_detach +----------------------- + +(1 row) + + +starting permutation: load repack_other_index change_new_beyond change_old_beyond check2 wakeup_new_range check1 check1_order_desc +injection_points_attach +----------------------- + +(1 row) + +step load: + SELECT load(1); + +load +---- + +(1 row) + +step repack_other_index: + REPACK (CONCURRENTLY) repack_test USING INDEX repack_test_i_idx; + +step change_new_beyond: + INSERT INTO repack_test + SELECT max(i) + 1, gen_external() + FROM repack_test + RETURNING tid_block(ctid); + + UPDATE repack_test SET j = gen_external() + WHERE ctid=(SELECT min(ctid) FROM repack_test) + RETURNING tid_block(OLD.ctid), tid_block(NEW.ctid); + +tid_block +--------- + 1 +(1 row) + +tid_block|tid_block +---------+--------- + 0| 1 +(1 row) + +step change_old_beyond: + DELETE FROM repack_test + WHERE ctid=(SELECT max(ctid) FROM repack_test) + RETURNING tid_block(ctid); + + UPDATE repack_test SET j = gen_external() + WHERE ctid=(SELECT max(ctid) FROM repack_test) + RETURNING tid_block(OLD.ctid), tid_block(NEW.ctid); + +tid_block +--------- + 1 +(1 row) + +tid_block|tid_block +---------+--------- + 1| 1 +(1 row) + +step check2: + INSERT INTO data_s2(i, j) + SELECT i, j FROM repack_test; + +step wakeup_new_range: + SELECT injection_points_wakeup('repack-concurrently-new-range'); + +injection_points_wakeup +----------------------- + +(1 row) + +step repack_other_index: <... completed> +step check1: + INSERT INTO data_s1(i, j) + SELECT i, j FROM repack_test; + + SELECT count(*) > 0 FROM repack_test; + + SELECT count(*) + FROM data_s1 d1 FULL JOIN data_s2 d2 USING (i, j) + WHERE d1.i ISNULL OR d2.i ISNULL; + +?column? 
+-------- +t +(1 row) + +count +----- + 0 +(1 row) + +step check1_order_desc: + WITH tmp(diff) as ( + SELECT i - lag(i, 1, 10000) OVER (ORDER BY ctid) + FROM repack_test + LIMIT 10) + SELECT * FROM tmp WHERE diff > -1; + +diff +---- +(0 rows) + +injection_points_detach +----------------------- + +(1 row) + diff --git a/src/test/modules/injection_points/meson.build b/src/test/modules/injection_points/meson.build index 59dba1cb023..d432a6b8f76 100644 --- a/src/test/modules/injection_points/meson.build +++ b/src/test/modules/injection_points/meson.build @@ -46,6 +46,7 @@ tests += { 'basic', 'inplace', 'repack', + 'repack_snapshots', 'repack_temporal', 'repack_temporal_multirange', 'repack_toast', diff --git a/src/test/modules/injection_points/specs/repack_snapshots.spec b/src/test/modules/injection_points/specs/repack_snapshots.spec new file mode 100644 index 00000000000..0706e25a6e7 --- /dev/null +++ b/src/test/modules/injection_points/specs/repack_snapshots.spec @@ -0,0 +1,235 @@ +# REPACK (CONCURRENTLY) - use one snapshot per block range. +setup +{ + CREATE EXTENSION injection_points; + + CREATE TABLE repack_test(i int PRIMARY KEY, j text); + CREATE INDEX ON repack_test(i DESC); + CREATE TABLE relfilenodes(node oid); + + CREATE TABLE data_s1(i int, j text); + CREATE TABLE data_s2(i int, j text); + + -- Keep inserting tuples into repack_test until several tuples need to + -- be inserted into block number last_block. 
+ CREATE FUNCTION load(last_block int) + RETURNS void + LANGUAGE 'plpgsql' + AS $$ + DECLARE + cnt int; + BEGIN + INSERT INTO repack_test VALUES (1, gen_external()); + + LOOP + WITH max(m) AS (SELECT max(i) FROM repack_test) + INSERT INTO repack_test(i, j) + SELECT m + x, gen_external() + FROM generate_series(1, 100) s(x), max; + + SELECT count(*) + FROM repack_test WHERE tid_block(ctid) = last_block + INTO cnt; + + IF cnt >= 10 THEN + EXIT; + END IF; + END LOOP; + END; + $$; + + -- Generate a string of random characters that is not likely to be + -- compressed, but is big enough to be stored externally. + CREATE FUNCTION gen_external() + RETURNS text + LANGUAGE sql as $$ + SELECT string_agg(chr(65 + trunc(25 * random())::int), '') + FROM generate_series(1, 2048) s(x); + $$; +} + +teardown +{ + DROP TABLE repack_test; + DROP EXTENSION injection_points; + + DROP TABLE relfilenodes; + DROP TABLE data_s1; + DROP TABLE data_s2; + + DROP FUNCTION load(int); + DROP FUNCTION gen_external(); +} + +session s1 +setup +{ + SET repack_snapshot_after = 1; + + SELECT injection_points_set_local(); + SELECT injection_points_attach('repack-concurrently-new-range', 'wait'); +} +# The most practical way to test the corner cases is to set range size to 1 +# block. To initialize, insert new tuples until we have several tuples in the +# 2nd block. +step load +{ + SELECT load(1); +} +# Start the initial load and wait when the first range has been completed. +step repack +{ + REPACK (CONCURRENTLY) repack_test; +} +# The same, but with clustering. +step repack_pkey +{ + REPACK (CONCURRENTLY) repack_test USING INDEX repack_test_pkey; +} +# Clustering by other than the identity index. +step repack_other_index +{ + REPACK (CONCURRENTLY) repack_test USING INDEX repack_test_i_idx; +} +# Check the table from the perspective of s1. 
+step check1 +{ + INSERT INTO data_s1(i, j) + SELECT i, j FROM repack_test; + + SELECT count(*) > 0 FROM repack_test; + + SELECT count(*) + FROM data_s1 d1 FULL JOIN data_s2 d2 USING (i, j) + WHERE d1.i ISNULL OR d2.i ISNULL; +} +# Check the ordering where appropriate. We don't know the exact number of +# rows, so just check a sample. (The replayed concurrent changes are not +# ordered, but those shouldn't fit into the first 10 rows.) +step check1_order_asc +{ + SELECT i FROM repack_test LIMIT 10; +} +# Due to the special way of loading the data (see the load() function above) +# we don't know the maximum value. To make the test output deterministic, +# check for cases where the current row is not lower than the previous row. +step check1_order_desc +{ + WITH tmp(diff) as ( + SELECT i - lag(i, 1, 10000) OVER (ORDER BY ctid) + FROM repack_test + LIMIT 10) + SELECT * FROM tmp WHERE diff > -1; +} +teardown +{ + SELECT injection_points_detach('repack-concurrently-new-range'); +} + +session s2 +# Test processing of changes such that the new tuple is beyond the current +# range. Specifically for UPDATE, the old tuple should be in the current +# range. So when applying it, we have to convert it to DELETE. +step change_new_beyond +{ + INSERT INTO repack_test + SELECT max(i) + 1, gen_external() + FROM repack_test + RETURNING tid_block(ctid); + + UPDATE repack_test SET j = gen_external() + WHERE ctid=(SELECT min(ctid) FROM repack_test) + RETURNING tid_block(OLD.ctid), tid_block(NEW.ctid); +} +# Test processing of changes such that the old tuple is beyond the current +# range. The UPDATE also puts the new tuple beyond the current range. +step change_old_beyond +{ + DELETE FROM repack_test + WHERE ctid=(SELECT max(ctid) FROM repack_test) + RETURNING tid_block(ctid); + + UPDATE repack_test SET j = gen_external() + WHERE ctid=(SELECT max(ctid) FROM repack_test) + RETURNING tid_block(OLD.ctid), tid_block(NEW.ctid); +} +# Arrange for UPDATE to put the new tuples into block 0.
In particular, fill +# the block 1 and delete some tuples from block 0. +step load2 +{ + SELECT load(2); + + DELETE FROM repack_test WHERE i < 100; +} +# Logically this belongs to the previous step, however VACUUM cannot run +# inside a transaction block. +step load2_vacuum +{ + VACUUM repack_test; +} +# This UPDATE should put the new tuple into block 0 (the current range). So +# when replaying it, we have to convert it to INSERT. That includes fetching +# the old tuple's TOAST from the TOAST table because the old tuple is not +# available during the replay. +step change_old_beyond2 +{ + UPDATE repack_test SET j = gen_external() + WHERE ctid=(SELECT max(ctid) FROM repack_test WHERE tid_block(ctid) = 1) + RETURNING tid_block(OLD.ctid), tid_block(NEW.ctid); +} +# Check the table from the perspective of s2. +step check2 +{ + INSERT INTO data_s2(i, j) + SELECT i, j FROM repack_test; +} +step wakeup_new_range +{ + SELECT injection_points_wakeup('repack-concurrently-new-range'); +} + +# Test if snapshots are used correctly to scan block ranges. +permutation + load + repack + change_new_beyond + change_old_beyond + check2 + wakeup_new_range + check1 + +# Special attention is needed to update tuple in block 1 so that the new tuple +# appears in block 0. The preparation includes VACUUM, which in turn cannot +# proceed while REPACK is in progress. That's why we need a separate +# permutation. Note that two wake-ups are needed as we have two range +# boundaries now. +permutation + load2 + load2_vacuum + repack + change_old_beyond2 + check2 + wakeup_new_range + wakeup_new_range + check1 + +# The first permutation with identity index as the clustering index. +permutation + load + repack_pkey + change_new_beyond + change_old_beyond + check2 + wakeup_new_range + check1 + check1_order_asc +# The first permutation with another clustering index.
+permutation + load + repack_other_index + change_new_beyond + change_old_beyond + check2 + wakeup_new_range + check1 + check1_order_desc -- 2.52.0