From 8a0ee29c10e07598c52e7d3b3ae1f4075db89748 Mon Sep 17 00:00:00 2001 From: Antonin Houska Date: Tue, 16 Jun 2026 13:54:16 +0200 Subject: [PATCH 8/8] Make REPACK (CONCURRENTLY) MVCC-safe. In the initial implementation of REPACK (CONCURRENTLY), tuples in the new table are marked with the XID of the transaction the command executes in. This is simpler to implement, but can cause surprising behavior: under specific conditions, the table contents may become invisible to transactions that should see them. (Both ALTER TABLE and TRUNCATE commands have this problem.) Besides that, as REPACK (CONCURRENTLY) needs to have an XID assigned (to mark the new tuples) while it copies the data (which can take a long time), it can restrict the progress of the xmin horizons for VACUUM quite a bit. This patch teaches REPACK (CONCURRENTLY) to transfer the visibility information from the old table to the new one. Thus there's no need to have an XID assigned during the copying, and therefore VACUUM (of other tables) is not restricted anymore. A new type of snapshot is used to look for the existing tuples in the new table, in order to apply the concurrent data changes (i.e. changes performed by other transactions during the initial copying). This snapshot assumes that the new table does not contain any tuples created by aborted transactions. This is simpler and more efficient than building historic MVCC snapshots for the look-ups. One problem that needs special attention is tuple freezing. Besides the page pruning code, freezing is currently implemented only in rewriteheap.c. Thus it seems simpler to teach REPACK (CONCURRENTLY) to copy the data using rewriteheap.c rather than implementing freezing of individual tuples independently of page pruning. (The latter would include WAL logging of the individual frozen tuples, whereas rewriteheap.c logs the whole page at once.) 
A disadvantage of this approach is that we only freeze tuples during the initial copying, while the data changes decoded from WAL are replayed without freezing. This patch also uses rewriteheap.c (in particular raw_heap_insert()) to insert TOAST tuples. That seems to be the easiest way to avoid getting an XID assigned just to insert TOAST tuples. However, due to not using a new XID, even TOAST tuples need to be frozen now - again, we use the existing code in rewriteheap.c. --- doc/src/sgml/mvcc.sgml | 12 +- doc/src/sgml/ref/repack.sgml | 9 - src/backend/access/common/toast_internals.c | 45 +- src/backend/access/heap/heapam.c | 84 +++- src/backend/access/heap/heapam_handler.c | 265 +++++++++-- src/backend/access/heap/heapam_visibility.c | 62 +++ src/backend/access/heap/heaptoast.c | 23 +- src/backend/access/heap/rewriteheap.c | 263 ++++++++--- src/backend/access/table/tableam.c | 1 + src/backend/access/table/toast_helper.c | 11 +- src/backend/access/transam/xloginsert.c | 31 +- src/backend/access/transam/xlogrecovery.c | 7 +- src/backend/catalog/indexing.c | 2 +- src/backend/commands/matview.c | 3 +- src/backend/commands/repack.c | 433 ++++++++++++------ src/backend/commands/tablecmds.c | 1 + src/backend/executor/nodeModifyTable.c | 1 + src/backend/replication/logical/decode.c | 110 ++++- .../replication/logical/reorderbuffer.c | 3 + .../utils/activity/wait_event_names.txt | 1 + src/include/access/heapam.h | 4 +- src/include/access/heaptoast.h | 7 +- src/include/access/rewriteheap.h | 13 +- src/include/access/tableam.h | 28 +- src/include/access/toast_helper.h | 14 +- src/include/access/toast_internals.h | 9 +- src/include/access/xlog_internal.h | 2 +- src/include/access/xloginsert.h | 1 + src/include/access/xlogrecord.h | 8 + src/include/commands/repack.h | 17 +- src/include/utils/snapmgr.h | 12 + src/include/utils/snapshot.h | 27 +- .../injection_points/expected/repack.out | 10 +- .../injection_points/specs/repack.spec | 23 +- 34 files changed, 1200 insertions(+), 342 deletions(-) 
diff --git a/doc/src/sgml/mvcc.sgml b/doc/src/sgml/mvcc.sgml index 241caeb3593..e775260936a 100644 --- a/doc/src/sgml/mvcc.sgml +++ b/doc/src/sgml/mvcc.sgml @@ -1845,17 +1845,15 @@ SELECT pg_advisory_lock(q.id) FROM Caveats - Some commands, currently only TRUNCATE, the - table-rewriting forms of ALTER - TABLE and REPACK with - the CONCURRENTLY option, are not + Some DDL commands, currently only TRUNCATE and the + table-rewriting forms of ALTER TABLE, are not MVCC-safe. This means that after the truncation or rewrite commits, the table will appear empty to concurrent transactions, if they are using a - snapshot taken before the command committed. This will only be an + snapshot taken before the DDL command committed. This will only be an issue for a transaction that did not access the table in question - before the command started — any transaction that has done so + before the DDL command started — any transaction that has done so would hold at least an ACCESS SHARE table lock, - which would block the truncating or rewriting command until that transaction completes. + which would block the DDL command until that transaction completes. So these commands will not cause any apparent inconsistency in the table contents for successive queries on the target table, but they could cause visible inconsistency between the contents of the target diff --git a/doc/src/sgml/ref/repack.sgml b/doc/src/sgml/ref/repack.sgml index 0cb72b6b289..acbd92ef79d 100644 --- a/doc/src/sgml/ref/repack.sgml +++ b/doc/src/sgml/ref/repack.sgml @@ -300,15 +300,6 @@ REPACK [ ( option [, ...] ) ] USING - - - - REPACK with the CONCURRENTLY - option is not MVCC-safe, see for - details. 
- - - diff --git a/src/backend/access/common/toast_internals.c b/src/backend/access/common/toast_internals.c index 77d42e7ed65..ba80fed0ca9 100644 --- a/src/backend/access/common/toast_internals.c +++ b/src/backend/access/common/toast_internals.c @@ -112,12 +112,15 @@ toast_compress_datum(Datum value, char cmethod) * rel: the main relation we're working with (not the toast rel!) * value: datum to be pushed to toast storage * oldexternal: if not NULL, toast pointer previously representing the datum + * rwstate: state needed for "raw insert". + * tup_main: the tuple whose attribute we're saving * options: options to be passed to heap_insert() for toast rows * ---------- */ Datum toast_save_datum(Relation rel, Datum value, - varlena *oldexternal, uint32 options) + varlena *oldexternal, RewriteState rwstate, + HeapTuple tup_main, uint32 options) { Relation toastrel; Relation *toastidxs; @@ -311,7 +314,40 @@ toast_save_datum(Relation rel, Datum value, toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull); - heap_insert(toastrel, toasttup, mycid, options, NULL); + if (rwstate == NULL) + { + /* + * With REUSE_XID we'd need regular freezing, however that is + * currently implemented only as a part of VACUUM or in + * rewriteheap.c. On the other hand, tuples containing a new XID + * can be marked as frozen in special cases - see the current uses + * of TABLE_INSERT_FROZEN. + */ + Assert((options & HEAP_INSERT_FROZEN) == 0 || + (options & TABLE_REUSE_XID) == 0); + + /* + * If an existing XID should be used, the entire visibility info + * of the TOAST tuple should be equal to that of corresponding + * tuple in the main table. + */ + if (options & TABLE_REUSE_XID) + rewrite_copy_visibility_info(toasttup, tup_main); + + heap_insert(toastrel, toasttup, mycid, options, NULL); + } + else + { + /* + * During heap rewrite, XID is always reused and the tuple is + * always frozen - we do not expect the user to tell us what to + * do. 
+ */ + Assert((options & HEAP_INSERT_FROZEN) == 0 && + (options & TABLE_REUSE_XID) == 0); + + rewrite_heap_tuple_no_chains(rwstate, tup_main, toasttup, true); + } /* * Create the index entry. We cheat a little here by not using @@ -373,7 +409,8 @@ toast_save_datum(Relation rel, Datum value, * ---------- */ void -toast_delete_datum(Relation rel, Datum value, bool is_speculative) +toast_delete_datum(Relation rel, Datum value, bool is_speculative, + TransactionId xid) { varlena *attr = (varlena *) DatumGetPointer(value); varatt_external toast_pointer; @@ -425,7 +462,7 @@ toast_delete_datum(Relation rel, Datum value, bool is_speculative) if (is_speculative) heap_abort_speculative(toastrel, &toasttup->t_self); else - simple_heap_delete(toastrel, &toasttup->t_self); + simple_heap_delete(toastrel, &toasttup->t_self, xid); } /* diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index abfd8e8970a..35fa50d82b0 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -63,7 +63,7 @@ static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, Buffer newbuf, HeapTuple oldtup, HeapTuple newtup, HeapTuple old_key_tuple, bool all_visible_cleared, bool new_all_visible_cleared, - bool walLogical); + bool walLogical, TransactionId xid); #ifdef USE_ASSERT_CHECKING static void check_lock_if_inplace_updateable_rel(Relation relation, const ItemPointerData *otid, @@ -2004,7 +2004,7 @@ void heap_insert(Relation relation, HeapTuple tup, CommandId cid, uint32 options, BulkInsertState bistate) { - TransactionId xid = GetCurrentTransactionId(); + TransactionId xid; HeapTuple heaptup; Buffer buffer; Page page; @@ -2017,6 +2017,13 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, AssertHasSnapshotForToast(relation); + /* The caller might need to preserve the existing xmin. 
*/ + if ((options & TABLE_REUSE_XID) == 0) + xid = GetCurrentTransactionId(); + else + xid = HeapTupleHeaderGetXmin(tup->t_data); + Assert(TransactionIdIsValid(xid)); + /* * Fill in tuple header fields and toast the tuple if necessary. * @@ -2159,6 +2166,13 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, /* filtering by origin on a row level is much more efficient */ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + /* + * Even if we don't have XID assigned, valid XID is necessary for + * recovery and streaming replication to work. + */ + if (options & TABLE_REUSE_XID) + XLogSetRecordXid(xid); + recptr = XLogInsert(RM_HEAP_ID, info); PageSetLSN(page, recptr); @@ -2236,7 +2250,7 @@ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, return tup; } else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD) - return heap_toast_insert_or_update(relation, tup, NULL, options); + return heap_toast_insert_or_update(relation, tup, NULL, NULL, options); else return tup; } @@ -2299,6 +2313,8 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, /* currently not needed (thus unsupported) for heap_multi_insert() */ Assert(!(options & HEAP_INSERT_NO_LOGICAL)); + /* Likewise. 
*/ + Assert(!(options & TABLE_REUSE_XID)); AssertHasSnapshotForToast(relation); @@ -2715,11 +2731,11 @@ xmax_infomask_changed(uint16 new_infomask, uint16 old_infomask) */ TM_Result heap_delete(Relation relation, const ItemPointerData *tid, - CommandId cid, uint32 options, Snapshot crosscheck, + TransactionId xid, CommandId cid, uint32 options, + Snapshot crosscheck, bool wait, TM_FailureData *tmfd) { TM_Result result; - TransactionId xid = GetCurrentTransactionId(); ItemId lp; HeapTupleData tp; Page page; @@ -2736,11 +2752,14 @@ heap_delete(Relation relation, const ItemPointerData *tid, bool all_visible_cleared = false; HeapTuple old_key_tuple = NULL; /* replica identity of the tuple */ bool old_key_copied = false; + bool override_xid = false; Assert(ItemPointerIsValid(tid)); AssertHasSnapshotForToast(relation); + Assert((options & TABLE_REUSE_XID) == 0); + /* * Forbid this during a parallel operation, lest it allocate a combo CID. * Other workers might need that combo CID for visibility checks, and we @@ -2751,6 +2770,12 @@ heap_delete(Relation relation, const ItemPointerData *tid, (errcode(ERRCODE_INVALID_TRANSACTION_STATE), errmsg("cannot delete tuples during a parallel operation"))); + /* Caller can override the xid. */ + if (!TransactionIdIsValid(xid)) + xid = GetCurrentTransactionId(); + else + override_xid = true; + block = ItemPointerGetBlockNumber(tid); buffer = ReadBuffer(relation, block); page = BufferGetPage(buffer); @@ -3089,6 +3114,9 @@ l1: /* filtering by origin on a row level is much more efficient */ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + /* See heap_insert() for details. 
*/ + if (override_xid) + XLogSetRecordXid(xid); recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE); @@ -3115,7 +3143,7 @@ l1: Assert(!HeapTupleHasExternal(&tp)); } else if (HeapTupleHasExternal(&tp)) - heap_toast_delete(relation, &tp, false); + heap_toast_delete(relation, &tp, false, xid); /* * Mark tuple for invalidation from system caches at next command @@ -3148,14 +3176,18 @@ l1: * the target tuple are not expected (for example, because we have a lock * on the relation associated with the tuple). Any failure is reported * via ereport(). + * + * XXX Add simple_heap_delete_xid() so that the signature of this function can + * stay intact? */ void -simple_heap_delete(Relation relation, const ItemPointerData *tid) +simple_heap_delete(Relation relation, const ItemPointerData *tid, + TransactionId xid) { TM_Result result; TM_FailureData tmfd; - result = heap_delete(relation, tid, + result = heap_delete(relation, tid, xid, GetCurrentCommandId(true), 0, InvalidSnapshot, @@ -3204,7 +3236,7 @@ heap_update(Relation relation, const ItemPointerData *otid, HeapTuple newtup, TU_UpdateIndexes *update_indexes) { TM_Result result; - TransactionId xid = GetCurrentTransactionId(); + TransactionId xid; Bitmapset *hot_attrs; Bitmapset *sum_attrs; Bitmapset *key_attrs; @@ -3267,6 +3299,13 @@ heap_update(Relation relation, const ItemPointerData *otid, HeapTuple newtup, check_lock_if_inplace_updateable_rel(relation, otid, newtup); #endif + /* The caller might need to preserve the existing xmin. */ + if ((options & TABLE_REUSE_XID) == 0) + xid = GetCurrentTransactionId(); + else + xid = HeapTupleHeaderGetXmin(newtup->t_data); + Assert(TransactionIdIsValid(xid)); + /* * Fetch the list of attributes to be checked for various operations. 
* @@ -3871,7 +3910,8 @@ l2: if (need_toast) { /* Note we always use WAL and FSM during updates */ - heaptup = heap_toast_insert_or_update(relation, newtup, &oldtup, 0); + heaptup = heap_toast_insert_or_update(relation, newtup, &oldtup, + NULL, options); newtupsize = MAXALIGN(heaptup->t_len); } else @@ -4099,7 +4139,9 @@ l2: old_key_tuple, all_visible_cleared, all_visible_cleared_new, - walLogical); + walLogical, + options & TABLE_REUSE_XID ? xid : InvalidTransactionId); + if (newbuf != buffer) { PageSetLSN(newpage, recptr); @@ -5297,7 +5339,9 @@ compute_new_xmax_infomask(TransactionId xmax, uint16 old_infomask, uint16 new_infomask, new_infomask2; - Assert(TransactionIdIsCurrentTransactionId(add_to_xmax)); + /* REPACK (CONCURRENTLY) might not have XID assigned. */ + Assert(TransactionIdIsCurrentTransactionId(add_to_xmax) || + !TransactionIdIsValid(GetTopTransactionIdIfAny())); l5: new_infomask = 0; @@ -6269,7 +6313,9 @@ heap_abort_speculative(Relation relation, const ItemPointerData *tid) if (HeapTupleHasExternal(&tp)) { Assert(!IsToastRelation(relation)); - heap_toast_delete(relation, &tp, true); + /* XID overriding is not needed for speculative abort. 
*/ + Assert(TransactionIdIsValid(GetCurrentTransactionIdIfAny())); + heap_toast_delete(relation, &tp, true, GetCurrentTransactionId()); } /* @@ -6695,7 +6741,8 @@ FreezeMultiXactId(MultiXactId multi, uint16 t_infomask, pagefrz->freeze_required = true; return InvalidTransactionId; } - else if (MultiXactIdPrecedes(multi, cutoffs->relminmxid)) + else if (MultiXactIdIsValid(cutoffs->relminmxid) && + MultiXactIdPrecedes(multi, cutoffs->relminmxid)) ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg_internal("found multixact %u from before relminmxid %u", @@ -7199,7 +7246,8 @@ heap_prepare_freeze_tuple(HeapTupleHeader tuple, else if (TransactionIdIsNormal(xid)) { /* Raw xmax is normal XID */ - if (TransactionIdPrecedes(xid, cutoffs->relfrozenxid)) + if (TransactionIdIsValid(cutoffs->relfrozenxid) && + TransactionIdPrecedes(xid, cutoffs->relfrozenxid)) ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg_internal("found xmax %u from before relfrozenxid %u", @@ -8776,7 +8824,7 @@ log_heap_update(Relation reln, Buffer oldbuf, Buffer newbuf, HeapTuple oldtup, HeapTuple newtup, HeapTuple old_key_tuple, bool all_visible_cleared, bool new_all_visible_cleared, - bool walLogical) + bool walLogical, TransactionId xid) { xl_heap_update xlrec; xl_heap_header xlhdr; @@ -8983,6 +9031,10 @@ log_heap_update(Relation reln, Buffer oldbuf, /* filtering by origin on a row level is much more efficient */ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + /* See heap_insert() for details. 
*/ + if (TransactionIdIsValid(xid)) + XLogSetRecordXid(xid); + recptr = XLogInsert(RM_HEAP_ID, info); return recptr; diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index d402e51e6a1..7a7531ecb32 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -50,11 +50,24 @@ #include "utils/injection_point.h" #include "utils/rel.h" #include "utils/tuplesort.h" - -static Snapshot finalize_block_range(ChangeContext *chgcxt, BlockNumber cur, - BlockNumber start, BlockNumber *end_p); +#include "utils/wait_event_types.h" + +static void update_identity_index(ChangeContext *chgcxt, + BlockNumber range_start, + BlockNumber range_end); +static Snapshot finalize_block_range(Relation rel_old, Relation rel_dst, + TransactionId oldest_xmin, + TransactionId xid_cutoff, + MultiXactId multi_cutoff, + ChangeContext *chgcxt, BlockNumber cur, + BlockNumber start, BlockNumber *end_p, + RewriteState *rwstate_p, + BlockNumber *range_start_dst_p); static void reform_and_rewrite_tuple(TupleTableSlot *src, TupleTableSlot *reform, RewriteState rwstate); +static void heap_insert_for_repack(ChangeContext *chgcxt, TupleTableSlot *src, + TupleTableSlot *reform, + RewriteState rwstate); static bool SampleHeapTupleVisible(TableScanDesc scan, Buffer buffer, HeapTuple tuple, @@ -205,7 +218,8 @@ heapam_tuple_complete_speculative(Relation relation, TupleTableSlot *slot, } static TM_Result -heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid, +heapam_tuple_delete(Relation relation, ItemPointer tid, TransactionId xid, + CommandId cid, uint32 options, Snapshot snapshot, Snapshot crosscheck, bool wait, TM_FailureData *tmfd) { @@ -214,7 +228,7 @@ heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid, * the storage itself is cleaning the dead tuples by itself, it is the * time to call the index tuple deletion also. 
*/ - return heap_delete(relation, tid, cid, options, crosscheck, wait, + return heap_delete(relation, tid, xid, cid, options, crosscheck, wait, tmfd); } @@ -616,25 +630,30 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, Snapshot snapshot = NULL; BlockNumber range_start = InvalidBlockNumber; BlockNumber range_end = InvalidBlockNumber; + BlockNumber range_start_new = InvalidBlockNumber; + BlockNumber range_end_new = InvalidBlockNumber; + Relation rel_dst; /* Remember if it's a system catalog */ is_system_catalog = IsSystemRelation(OldHeap); + /* Determine the destination for the tuples. */ + if (!concurrent || chgcxt->cc_dest_aux == NULL) + rel_dst = NewHeap; + else + rel_dst = chgcxt->cc_dest_aux->rel; + /* * Valid smgr_targblock implies something already wrote to the relation. * This may be harmless, but this function hasn't planned for it. */ - Assert(RelationGetTargetBlock(NewHeap) == InvalidBlockNumber); + Assert(RelationGetTargetBlock(rel_dst) == InvalidBlockNumber); /* - * In non-concurrent mode, initialize the rewrite operation. This is not - * needed in concurrent mode. + * Initialize the rewrite operation. */ - if (!concurrent) - rwstate = begin_heap_rewrite(OldHeap, NewHeap, OldestXmin, - *xid_cutoff, *multi_cutoff); - else - rwstate = NULL; + rwstate = begin_heap_rewrite(OldHeap, rel_dst, OldestXmin, *xid_cutoff, + *multi_cutoff, concurrent); /* * Set up sorting if wanted. CONCURRENTLY sorts the tuple w/o tuplesort, @@ -706,6 +725,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, { range_start = heapScan->rs_startblock; range_end = range_start + repack_pages_per_snapshot; + range_start_new = RelationGetNumberOfBlocks(rel_dst); } } @@ -723,24 +743,40 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, InvalidateCatalogSnapshot(); /* - * As there is no snapshot, our xmin should be invalid now. - * - * XXX xid can still be valid. The next patches in the series fix - * that. 
+ * As there is no snapshot, our xmin should be invalid now. xid should + * be invalid too because our transactions didn't have to do any + * writes yet. */ Assert(!TransactionIdIsValid(MyProc->xmin)); + Assert(!TransactionIdIsValid(MyProc->xid)); /* * Wait until the worker has the initial snapshot and retrieve it. */ snapshot = repack_get_snapshot(chgcxt); + chgcxt->cc_last_snapshot_xmin = snapshot->xmin; + + /* + * Since we currently do not freeze tuples when replaying data + * changes, the replayed transactions should not precede the new value + * of relfrozenxid. (Only transactions having XID >= snapshot->xmin + * will be replayed.) + * + * Regarding multixacts, the corresponding cutoff is probably not + * trivial to determine, however there should not be any multixacts in + * the new relation at all: we do not replay tuple locking records and + * that's ok because the tuple locks should no longer exist at the + * moment we acquire AccessExclusiveLock on the old relation. + */ + if (TransactionIdFollows(*xid_cutoff, snapshot->xmin)) + *xid_cutoff = snapshot->xmin; PushActiveSnapshot(snapshot); } /* * Scan through the OldHeap, either in OldIndex order or sequentially; - * copy each tuple into the NewHeap, or transiently to the tuplesort + * copy each tuple into the rel_dst, or transiently to the tuplesort * module. Note that we don't bother sorting dead tuples (they won't get * to the new table anyway). */ @@ -911,8 +947,12 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, /* End of the current range or wraparound? */ if (blkno >= range_end || blkno < range_start) - snapshot = finalize_block_range(chgcxt, blkno, range_start, - &range_end); + snapshot = finalize_block_range(OldHeap, rel_dst, + OldestXmin, *xid_cutoff, + *multi_cutoff, + chgcxt, blkno, range_start, + &range_end, &rwstate, + &range_start_new); /* Finally check the tuple visibility. 
*/ LockBuffer(buf, BUFFER_LOCK_SHARE); @@ -948,7 +988,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, if (!concurrent) reform_and_rewrite_tuple(slot, reform_slot, rwstate); else - heap_insert_for_repack(chgcxt, slot, reform_slot); + heap_insert_for_repack(chgcxt, slot, reform_slot, rwstate); /* * In indexscan mode and also VACUUM FULL, report increase in @@ -964,6 +1004,13 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, { XLogRecPtr end_of_wal; + /* Write out any remaining tuples, and fsync if needed */ + end_heap_rewrite(rwstate); + + /* Finalize the last range in the new relation. */ + range_end_new = RelationGetNumberOfBlocks(rel_dst); + update_identity_index(chgcxt, range_start_new, range_end_new); + /* * Process the changes belonging to the last range. */ @@ -975,11 +1022,16 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, /* * There was an active transaction snapshot on entry, so push one - * before return. + * before return. While there is no active snapshot, also invalidate + * the catalog snapshot so that the xmin horizons for VACUUM can + * advance. */ PopActiveSnapshot(); + InvalidateCatalogSnapshot(); + Assert(!TransactionIdIsValid(MyProc->xmin)); + Assert(!TransactionIdIsValid(MyProc->xid)); + Assert(!HaveRegisteredOrActiveSnapshot()); PushActiveSnapshot(GetTransactionSnapshot()); - } if (indexScan != NULL) @@ -1046,9 +1098,70 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, ExecDropSingleTupleTableSlot(reform_slot); - /* Write out any remaining tuples, and fsync if needed */ - if (rwstate) + if (!concurrent) + { + /* + * In the CONCURRENTLY case, we had to do away with the rewrite state + * earlier. + */ end_heap_rewrite(rwstate); + } +} + +/* + * Scan range of the new / auxiliary relation and insert all the tuples into + * the identity index. range_end is the first block of the next range. 
+ */ +static void +update_identity_index(ChangeContext *chgcxt, BlockNumber range_start, + BlockNumber range_end) +{ + RepackDest *dest; + BlockNumber first_block, + last_block; + ItemPointerData mintid, + maxtid; + TableScanDesc scan; + TupleTableSlot *slot; + + /* + * If an auxiliary table exists, it's the one we're copying the data into. + */ + if (chgcxt->cc_dest_aux) + dest = chgcxt->cc_dest_aux; + else + dest = &chgcxt->cc_dest; + + first_block = range_start; + + if (range_end > range_start) + last_block = range_end - 1; + else + { + Assert(range_start == range_end); + + last_block = range_end; + } + + ItemPointerSet(&mintid, first_block, FirstOffsetNumber); + ItemPointerSet(&maxtid, last_block, MaxOffsetNumber); + + /* XXX flags? */ + scan = table_beginscan_tidrange(dest->rel, SnapshotAny, &mintid, &maxtid, + 0); + slot = table_slot_create(dest->rel, NULL); + while (table_scan_getnextslot(scan, ForwardScanDirection, slot)) + ExecInsertIndexTuples(dest->rri, dest->estate, 0, slot, NIL, NULL); + ExecDropSingleTupleTableSlot(slot); + table_endscan(scan); + + /* + * Index insertion could have accessed catalog when checking constraints, + * so make sure that we no longer block the progress of xmin horizons for + * VACUUM. (It's ok to have a snapshot throughout range scan, so there's + * no point in doing this invalidation more often.) + */ + InvalidateCatalogSnapshot(); } /* @@ -1061,20 +1174,47 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, * first block beyond the new range. * * Return the snapshot for the scan of the new range. + * + * TODO Consider a structure to accommodate (most of) the arguments. 
*/ static Snapshot -finalize_block_range(ChangeContext *chgcxt, BlockNumber cur, - BlockNumber start, BlockNumber *end_p) +finalize_block_range(Relation rel_old, Relation rel_dst, + TransactionId oldest_xmin, + TransactionId xid_cutoff, + MultiXactId multi_cutoff, + ChangeContext *chgcxt, BlockNumber cur, + BlockNumber start, BlockNumber *end_p, + RewriteState *rwstate_p, + BlockNumber *range_start_dst_p) { BlockNumber end = *end_p; + RewriteState rwstate = *rwstate_p; XLogRecPtr end_of_wal; Snapshot snapshot; + BlockNumber range_end_dst; /* * Wait here when testing how snapshot is changed at page boundary. */ INJECTION_POINT("repack-concurrently-new-range", NULL); + /* + * Make sure the data is flushed to file before changes can be applied to + * it. (Nothing of it should be in shared buffers so far.) + */ + end_heap_rewrite(rwstate); + + /* + * Now that the data can be accessed via shared buffers, scan it and add + * each tuple to the identity index - this is necessary to update the + * concurrent data changes below. We could not do that earlier because + * even insertion into index might need to fetch heap tuples, in order to + * check unique or exclusion constraints. + */ + range_end_dst = RelationGetNumberOfBlocks(rel_dst); + update_identity_index(chgcxt, *range_start_dst_p, range_end_dst); + *range_start_dst_p = range_end_dst; + /* * Decode all the concurrent data changes committed so far before * requesting the next snapshot - these changes are applicable on top of @@ -1107,6 +1247,7 @@ finalize_block_range(ChangeContext *chgcxt, BlockNumber cur, /* See above. */ Assert(!TransactionIdIsValid(MyProc->xmin)); + Assert(!TransactionIdIsValid(MyProc->xid)); /* * XXX It might be worth Assert(CatalogSnapshot == NULL) here, however @@ -1129,8 +1270,20 @@ finalize_block_range(ChangeContext *chgcxt, BlockNumber cur, * next batch of decoded changes. 
*/ snapshot = repack_get_snapshot(chgcxt); + + Assert(TransactionIdFollowsOrEquals(snapshot->xmin, + chgcxt->cc_last_snapshot_xmin)); + chgcxt->cc_last_snapshot_xmin = snapshot->xmin; + PushActiveSnapshot(snapshot); + /* + * Prepare for bulk insert of the next set of tuples. We rely on it to + * start on a new page, even if the last existing page is not full. + */ + *rwstate_p = begin_heap_rewrite(rel_old, rel_dst, oldest_xmin, xid_cutoff, + multi_cutoff, true); + return snapshot; } @@ -2560,6 +2713,64 @@ reform_and_rewrite_tuple(TupleTableSlot *src, TupleTableSlot *reform, heap_freetuple(newtuple); } +/* + * Insert tuple when processing REPACK CONCURRENTLY. + * + * 'reform' is a slot to use for tuple "reforming", typically to get set + * values of dropped columns to NULL. + * + * We pass the NO_LOGICAL flag to heap_insert() in order to skip logical + * decoding: as soon as REPACK CONCURRENTLY swaps the relation files, it drops + * this relation, so no logical replication subscription should need the data. + */ +static void +heap_insert_for_repack(ChangeContext *chgcxt, TupleTableSlot *src, + TupleTableSlot *reform, RewriteState rwstate) +{ + HeapTuple tuple, + new_tuple; + TransactionId xid; + bool shouldFree, + shouldFreeNew; + TupleTableSlot *slot; + bool freeze; + + if (chgcxt->cc_dest_aux) + { + /* Will freeze when copying data to the new table. */ + freeze = false; + } + else + freeze = true; + + Assert(TTS_IS_BUFFERTUPLE(src)); + tuple = ExecFetchSlotHeapTuple(src, false, &shouldFree); + xid = HeapTupleHeaderGetXmin(tuple->t_data); + if (reform != NULL && tuple_needs_reform(tuple, src->tts_tupleDescriptor)) + { + clear_dropped_attributes(tuple, reform); + slot = reform; + } + else + slot = src; + + /* Make sure we have a copy of the tuple, and set its XID. */ + new_tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFreeNew); + HeapTupleHeaderSetXmin(new_tuple->t_data, xid); + + /* Perform the insertion. 
*/ + rewrite_heap_tuple_no_chains(rwstate, tuple, new_tuple, freeze); + + /* The insertion shouldn't have caused XID assignment. */ + Assert(!TransactionIdIsValid(GetCurrentTransactionIdIfAny())); + + /* Cleanup. */ + if (shouldFree) + heap_freetuple(tuple); + if (shouldFreeNew) + heap_freetuple(new_tuple); +} + /* * Check visibility of the tuple. */ diff --git a/src/backend/access/heap/heapam_visibility.c b/src/backend/access/heap/heapam_visibility.c index 361b76e5065..f37e4848667 100644 --- a/src/backend/access/heap/heapam_visibility.c +++ b/src/backend/access/heap/heapam_visibility.c @@ -1363,6 +1363,65 @@ HeapTupleSatisfiesNonVacuumable(HeapTuple htup, Snapshot snapshot, return res != HEAPTUPLE_DEAD; } +/* + * HeapTupleSatisfiesNewHeap + * Consider all transactions committed or current. + * + * See SNAPSHOT_NEW_HEAP's definition for the intended behaviour. + * + */ +static bool +HeapTupleSatisfiesNewHeap(HeapTuple htup, Snapshot snapshot, Buffer buffer) +{ + HeapTupleHeader tuple = htup->t_data; + + Assert(ItemPointerIsValid(&htup->t_self)); + Assert(htup->t_tableOid != InvalidOid); + + /* xmin should always be there. */ + Assert(TransactionIdIsValid(HeapTupleHeaderGetXmin(tuple))); + + /* + * No one should have the chance to set XMIN_INVALID until REPACK has + * finished, and we do not need it. + */ + Assert(!HeapTupleHeaderXminInvalid(tuple)); + + /* + * Unlike that, XMIN_COMMITTED might have been set earlier by REPACK + * itself, but we don't need it here. + */ + + /* + * Set XMIN_COMMITTED to make the next checks (by any snapshot) faster. + * + * TODO Set the flag in the initial load and in + * apply_concurrent_changes(). + */ + SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED, + HeapTupleHeaderGetRawXmax(tuple)); + + /* Inserted tuples have XMAX_INVALID set. */ + if (tuple->t_infomask & HEAP_XMAX_INVALID) + return true; + + /* + * REPACK could have set XMAX_COMMITTED during UPDATE or DELETE, or below. 
+ */ + if (tuple->t_infomask & HEAP_XMAX_COMMITTED) + return false; + + if (!TransactionIdIsValid(HeapTupleHeaderGetRawXmax(tuple))) + return true; + + /* + * Set XMAX_COMMITTED to make the next checks faster. + */ + SetHintBits(tuple, buffer, HEAP_XMAX_COMMITTED, + HeapTupleHeaderGetRawXmax(tuple)); + + return false; +} /* * HeapTupleIsSurelyDead @@ -1747,6 +1806,9 @@ HeapTupleSatisfiesVisibility(HeapTuple htup, Snapshot snapshot, Buffer buffer) return HeapTupleSatisfiesHistoricMVCC(htup, snapshot, buffer); case SNAPSHOT_NON_VACUUMABLE: return HeapTupleSatisfiesNonVacuumable(htup, snapshot, buffer); + case SNAPSHOT_NEW_HEAP: + return HeapTupleSatisfiesNewHeap(htup, snapshot, buffer); + } return false; /* keep compiler quiet */ diff --git a/src/backend/access/heap/heaptoast.c b/src/backend/access/heap/heaptoast.c index 03f885a25b0..4a6a674b80e 100644 --- a/src/backend/access/heap/heaptoast.c +++ b/src/backend/access/heap/heaptoast.c @@ -40,7 +40,8 @@ * ---------- */ void -heap_toast_delete(Relation rel, HeapTuple oldtup, bool is_speculative) +heap_toast_delete(Relation rel, HeapTuple oldtup, bool is_speculative, + TransactionId xid) { TupleDesc tupleDesc; Datum toast_values[MaxHeapAttributeNumber]; @@ -70,7 +71,8 @@ heap_toast_delete(Relation rel, HeapTuple oldtup, bool is_speculative) heap_deform_tuple(oldtup, tupleDesc, toast_values, toast_isnull); /* Do the real work. 
*/ - toast_delete_external(rel, toast_values, toast_isnull, is_speculative); + toast_delete_external(rel, toast_values, toast_isnull, is_speculative, + xid); } @@ -84,6 +86,7 @@ heap_toast_delete(Relation rel, HeapTuple oldtup, bool is_speculative) * newtup: the candidate new tuple to be inserted * oldtup: the old row version for UPDATE, or NULL for INSERT * options: options to be passed to heap_insert() for toast rows + * rwstate: if valid, use raw_heap_insert() * Result: * either newtup if no toasting is needed, or a palloc'd modified tuple * that is what should actually get stored @@ -94,7 +97,7 @@ heap_toast_delete(Relation rel, HeapTuple oldtup, bool is_speculative) */ HeapTuple heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, - uint32 options) + RewriteState rwstate, uint32 options) { HeapTuple result_tuple; TupleDesc tupleDesc; @@ -109,6 +112,7 @@ heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, Datum toast_oldvalues[MaxHeapAttributeNumber]; ToastAttrInfo toast_attr[MaxHeapAttributeNumber]; ToastTupleContext ttc; + TransactionId xid; /* * Ignore the INSERT_SPECULATIVE option. Speculative insertions/super @@ -156,6 +160,13 @@ heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, ttc.ttc_attr = toast_attr; toast_tuple_init(&ttc); + /* + * raw_heap_insert() may be needed for insertion into the TOAST table. In + * that case, visibility information will be retrieved from 'newtup'. + */ + ttc.ttc_rwstate = rwstate; + ttc.ttc_tup_main = newtup; + /* ---------- * Compress and/or save external until data fits into target length * @@ -330,7 +341,11 @@ heap_toast_insert_or_update(Relation rel, HeapTuple newtup, HeapTuple oldtup, else result_tuple = newtup; - toast_tuple_cleanup(&ttc); + if (options & TABLE_REUSE_XID) + xid = HeapTupleHeaderGetXmin(newtup->t_data); + else + xid = InvalidTransactionId; /* The current transaction. 
*/ + toast_tuple_cleanup(&ttc, xid); return result_tuple; } diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c index 5a5398a76ae..6d7d0f48d86 100644 --- a/src/backend/access/heap/rewriteheap.c +++ b/src/backend/access/heap/rewriteheap.c @@ -107,6 +107,7 @@ #include "access/heapam.h" #include "access/heapam_xlog.h" #include "access/heaptoast.h" +#include "access/multixact.h" #include "access/rewriteheap.h" #include "access/transam.h" #include "access/xact.h" @@ -119,6 +120,7 @@ #include "storage/bufmgr.h" #include "storage/bulk_write.h" #include "storage/fd.h" +#include "storage/lmgr.h" #include "storage/procarray.h" #include "utils/memutils.h" #include "utils/rel.h" @@ -151,6 +153,12 @@ typedef struct RewriteStateData HTAB *rs_old_new_tid_map; /* unmatched B tuples */ HTAB *rs_logical_mappings; /* logical remapping files */ uint32 rs_num_rewrite_mappings; /* # in memory mappings */ + + /* + * If this is initialized, raw_heap_insert() is also used for TOAST + * relation. + */ + struct RewriteStateData *toast; } RewriteStateData; /* @@ -211,6 +219,11 @@ typedef struct RewriteMappingDataEntry /* prototypes for internal functions */ +static RewriteState begin_heap_rewrite_common(Relation old_heap, + Relation new_heap, + TransactionId oldest_xmin, + TransactionId freeze_xid, + MultiXactId cutoff_multi); static void raw_heap_insert(RewriteState state, HeapTuple tup); /* internal logical remapping prototypes */ @@ -227,18 +240,19 @@ static void logical_end_heap_rewrite(RewriteState state); * oldest_xmin xid used by the caller to determine which tuples are dead * freeze_xid xid before which tuples will be frozen * cutoff_multi multixact before which multis will be removed + * no_chains only raw insert (and freezing), do not care about HOT chains * * Returns an opaque RewriteState, allocated in current memory context, * to be used in subsequent calls to the other functions.
*/ RewriteState begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xmin, - TransactionId freeze_xid, MultiXactId cutoff_multi) + TransactionId freeze_xid, MultiXactId cutoff_multi, + bool no_chains) { - RewriteState state; MemoryContext rw_cxt; MemoryContext old_cxt; - HASHCTL hash_ctl; + RewriteState state; /* * To ease cleanup, make a separate context that will contain the @@ -249,9 +263,99 @@ begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xm ALLOCSET_DEFAULT_SIZES); old_cxt = MemoryContextSwitchTo(rw_cxt); + state = begin_heap_rewrite_common(old_heap, new_heap, oldest_xmin, + freeze_xid, cutoff_multi); + state->rs_cxt = rw_cxt; + + if (!no_chains) + { + HASHCTL hash_ctl; + + /* Initialize hash tables used to track update chains */ + hash_ctl.keysize = sizeof(TidHashKey); + hash_ctl.entrysize = sizeof(UnresolvedTupData); + hash_ctl.hcxt = state->rs_cxt; + + state->rs_unresolved_tups = + hash_create("Rewrite / Unresolved ctids", + 128, /* arbitrary initial size */ + &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + + hash_ctl.entrysize = sizeof(OldToNewMappingData); + + state->rs_old_new_tid_map = + hash_create("Rewrite / Old to new tid map", + 128, /* arbitrary initial size */ + &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + + logical_begin_heap_rewrite(state); + } + else + { + Oid toastid; + + /* + * The current user of this mode, REPACK (CONCURRENTLY), does not want + * XID assigned at all, so "raw insert" is also used for TOAST. + * + * XXX "raw insert" would be helpful for REPACK w/o CONCURRENTLY too, + * as it writes the whole pages to WAL. The problem here is that it + * might check the existence of chunk OIDs in the new TOAST relation + * (see the hacks with rd_toastoid in toast_save_datum()), and that + * doesn't work while the rewrite is still in progress: the relation + * pages are not guaranteed to be flushed to disk (and read into + * shared buffers) before the end of the rewrite. 
+ */ + toastid = new_heap->rd_rel->reltoastrelid; + if (OidIsValid(toastid)) + { + Oid toastid_old; + Relation toast_rel; + Relation toast_rel_old = NULL; + + /* New relation's TOAST should already be locked. */ + Assert(CheckRelationOidLockedByMe(toastid, AccessExclusiveLock, + false)); + toast_rel = table_open(toastid, NoLock); + + toastid_old = old_heap ? old_heap->rd_rel->reltoastrelid : + InvalidOid; + if (OidIsValid(toastid_old)) + { + /* + * Currently we do not lock the old relation's TOAST. Use the + * same lock mode we use for the parent relation. + */ + toast_rel_old = table_open(toastid_old, + ShareUpdateExclusiveLock); + } + + /* Create the state for TOAST insertions. */ + state->toast = begin_heap_rewrite_common(toast_rel_old, + toast_rel, + oldest_xmin, + freeze_xid, + cutoff_multi); + } + } + + MemoryContextSwitchTo(old_cxt); + + return state; +} + +static RewriteState +begin_heap_rewrite_common(Relation old_heap, Relation new_heap, + TransactionId oldest_xmin, + TransactionId freeze_xid, MultiXactId cutoff_multi) + +{ + RewriteState state; + /* Create and fill in the state struct */ state = palloc0_object(RewriteStateData); - state->rs_old_rel = old_heap; state->rs_new_rel = new_heap; state->rs_buffer = NULL; @@ -260,32 +364,8 @@ begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xm state->rs_oldest_xmin = oldest_xmin; state->rs_freeze_xid = freeze_xid; state->rs_cutoff_multi = cutoff_multi; - state->rs_cxt = rw_cxt; state->rs_bulkstate = smgr_bulk_start_rel(new_heap, MAIN_FORKNUM); - /* Initialize hash tables used to track update chains */ - hash_ctl.keysize = sizeof(TidHashKey); - hash_ctl.entrysize = sizeof(UnresolvedTupData); - hash_ctl.hcxt = state->rs_cxt; - - state->rs_unresolved_tups = - hash_create("Rewrite / Unresolved ctids", - 128, /* arbitrary initial size */ - &hash_ctl, - HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); - - hash_ctl.entrysize = sizeof(OldToNewMappingData); - - state->rs_old_new_tid_map = -
hash_create("Rewrite / Old to new tid map", - 128, /* arbitrary initial size */ - &hash_ctl, - HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); - - MemoryContextSwitchTo(old_cxt); - - logical_begin_heap_rewrite(state); - return state; } @@ -300,16 +380,20 @@ end_heap_rewrite(RewriteState state) HASH_SEQ_STATUS seq_status; UnresolvedTup unresolved; - /* - * Write any remaining tuples in the UnresolvedTups table. If we have any - * left, they should in fact be dead, but let's err on the safe side. - */ - hash_seq_init(&seq_status, state->rs_unresolved_tups); - - while ((unresolved = hash_seq_search(&seq_status)) != NULL) + if (state->rs_unresolved_tups) { - ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid); - raw_heap_insert(state, unresolved->tuple); + /* + * Write any remaining tuples in the UnresolvedTups table. If we have + * any left, they should in fact be dead, but let's err on the safe + * side. + */ + hash_seq_init(&seq_status, state->rs_unresolved_tups); + + while ((unresolved = hash_seq_search(&seq_status)) != NULL) + { + ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid); + raw_heap_insert(state, unresolved->tuple); + } } /* Write the last page, if any */ @@ -318,10 +402,29 @@ end_heap_rewrite(RewriteState state) smgr_bulk_write(state->rs_bulkstate, state->rs_blockno, state->rs_buffer, true); state->rs_buffer = NULL; } - smgr_bulk_finish(state->rs_bulkstate); - logical_end_heap_rewrite(state); + /* The same for TOAST */ + if (state->toast) + { + RewriteState toast = state->toast; + + if (toast->rs_buffer) + { + smgr_bulk_write(toast->rs_bulkstate, toast->rs_blockno, + toast->rs_buffer, true); + toast->rs_buffer = NULL; + } + smgr_bulk_finish(toast->rs_bulkstate); + + /* Close relation(s) opened by begin_heap_rewrite(). 
*/ + table_close(toast->rs_new_rel, NoLock); + if (toast->rs_old_rel) + table_close(toast->rs_old_rel, ShareUpdateExclusiveLock); + } + + if (state->rs_logical_rewrite) + logical_end_heap_rewrite(state); /* Deleting the context frees everything */ MemoryContextDelete(state->rs_cxt); @@ -350,30 +453,13 @@ rewrite_heap_tuple(RewriteState state, old_cxt = MemoryContextSwitchTo(state->rs_cxt); - /* - * Copy the original tuple's visibility information into new_tuple. - * - * XXX we might later need to copy some t_infomask2 bits, too? Right now, - * we intentionally clear the HOT status bits. - */ - memcpy(&new_tuple->t_data->t_choice.t_heap, - &old_tuple->t_data->t_choice.t_heap, - sizeof(HeapTupleFields)); - - new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK; - new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK; - new_tuple->t_data->t_infomask |= - old_tuple->t_data->t_infomask & HEAP_XACT_MASK; + rewrite_copy_visibility_info(new_tuple, old_tuple); /* * While we have our hands on the tuple, we may as well freeze any * eligible xmin or xmax, so that future VACUUM effort can be saved. */ - heap_freeze_tuple(new_tuple->t_data, - state->rs_old_rel->rd_rel->relfrozenxid, - state->rs_old_rel->rd_rel->relminmxid, - state->rs_freeze_xid, - state->rs_cutoff_multi); + rewrite_freeze_tuple(state, new_tuple); /* * Invalid ctid means that ctid should point to the tuple itself. We'll @@ -534,6 +620,24 @@ rewrite_heap_tuple(RewriteState state, MemoryContextSwitchTo(old_cxt); } +/* + * Like rewrite_heap_tuple(), but do not care about hot chains. The user + * should have used the appropriate snapshot to pick at most one tuple of the + * chain - this is typical for REPACK (CONCURRENTLY). 
+ */ +void +rewrite_heap_tuple_no_chains(RewriteState state, HeapTuple old_tuple, + HeapTuple new_tuple, bool freeze) +{ + if (new_tuple != old_tuple) + rewrite_copy_visibility_info(new_tuple, old_tuple); + + if (freeze) + rewrite_freeze_tuple(state, new_tuple); + + raw_heap_insert(state, new_tuple); +} + /* * Register a dead tuple with an ongoing rewrite. Dead tuples are not * copied to the new table, but we still make note of them so that we @@ -628,7 +732,7 @@ raw_heap_insert(RewriteState state, HeapTuple tup) options |= HEAP_INSERT_NO_LOGICAL; heaptup = heap_toast_insert_or_update(state->rs_new_rel, tup, NULL, - options); + state->toast, options); } else heaptup = tup; @@ -704,6 +808,47 @@ raw_heap_insert(RewriteState state, HeapTuple tup) heap_freetuple(heaptup); } +/* + * Freeze any eligible xmin or xmax of 'tuple', using the cutoffs in 'state'. + */ +void +rewrite_freeze_tuple(RewriteState state, HeapTuple tuple) +{ + TransactionId relfrozenxid = InvalidTransactionId; + MultiXactId relminmxid = InvalidMultiXactId; + + /* The old relation may be missing if dealing with TOAST. */ + if (state->rs_old_rel) + { + relfrozenxid = state->rs_old_rel->rd_rel->relfrozenxid; + relminmxid = state->rs_old_rel->rd_rel->relminmxid; + } + + heap_freeze_tuple(tuple->t_data, relfrozenxid, relminmxid, + state->rs_freeze_xid, + state->rs_cutoff_multi); +} + +/* + * Copy the old tuple's visibility information into the new tuple. + */ +void +rewrite_copy_visibility_info(HeapTuple new_tuple, HeapTuple old_tuple) +{ + /* + * XXX we might later need to copy some t_infomask2 bits, too? Right now, + * we intentionally clear the HOT status bits.
+ */ + memcpy(&new_tuple->t_data->t_choice.t_heap, + &old_tuple->t_data->t_choice.t_heap, + sizeof(HeapTupleFields)); + + new_tuple->t_data->t_infomask &= ~HEAP_XACT_MASK; + new_tuple->t_data->t_infomask2 &= ~HEAP2_XACT_MASK; + new_tuple->t_data->t_infomask |= + old_tuple->t_data->t_infomask & HEAP_XACT_MASK; +} + /* ------------------------------------------------------------------------ * Logical rewrite support * diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index 68ff0966f1c..43b853d949e 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -319,6 +319,7 @@ simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot) TM_FailureData tmfd; result = table_tuple_delete(rel, tid, + InvalidTransactionId, GetCurrentCommandId(true), 0, snapshot, InvalidSnapshot, true /* wait for commit */ , diff --git a/src/backend/access/table/toast_helper.c b/src/backend/access/table/toast_helper.c index 2f2022d9951..e0c3af410d4 100644 --- a/src/backend/access/table/toast_helper.c +++ b/src/backend/access/table/toast_helper.c @@ -261,7 +261,7 @@ toast_tuple_externalize(ToastTupleContext *ttc, int attribute, uint32 options) attr->tai_colflags |= TOASTCOL_IGNORE; *value = toast_save_datum(ttc->ttc_rel, old_value, attr->tai_oldexternal, - options); + ttc->ttc_rwstate, ttc->ttc_tup_main, options); if ((attr->tai_colflags & TOASTCOL_NEEDS_FREE) != 0) pfree(DatumGetPointer(old_value)); attr->tai_colflags |= TOASTCOL_NEEDS_FREE; @@ -272,7 +272,7 @@ toast_tuple_externalize(ToastTupleContext *ttc, int attribute, uint32 options) * Perform appropriate cleanup after one tuple has been subjected to TOAST. 
*/ void -toast_tuple_cleanup(ToastTupleContext *ttc) +toast_tuple_cleanup(ToastTupleContext *ttc, TransactionId xid) { TupleDesc tupleDesc = ttc->ttc_rel->rd_att; int numAttrs = tupleDesc->natts; @@ -305,7 +305,8 @@ toast_tuple_cleanup(ToastTupleContext *ttc) ToastAttrInfo *attr = &ttc->ttc_attr[i]; if ((attr->tai_colflags & TOASTCOL_NEEDS_DELETE_OLD) != 0) - toast_delete_datum(ttc->ttc_rel, ttc->ttc_oldvalues[i], false); + toast_delete_datum(ttc->ttc_rel, ttc->ttc_oldvalues[i], false, + xid); } } } @@ -316,7 +317,7 @@ toast_tuple_cleanup(ToastTupleContext *ttc) */ void toast_delete_external(Relation rel, const Datum *values, const bool *isnull, - bool is_speculative) + bool is_speculative, TransactionId xid) { TupleDesc tupleDesc = rel->rd_att; int numAttrs = tupleDesc->natts; @@ -331,7 +332,7 @@ toast_delete_external(Relation rel, const Datum *values, const bool *isnull, if (isnull[i]) continue; else if (VARATT_IS_EXTERNAL_ONDISK(DatumGetPointer(value))) - toast_delete_datum(rel, value, is_speculative); + toast_delete_datum(rel, value, is_speculative, xid); } } } diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index f2e10b82b7d..ed20a35f7d6 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -105,6 +105,9 @@ static uint64 mainrdata_len; /* total # of bytes in chain */ /* flags for the in-progress insertion */ static uint8 curinsert_flags = 0; +/* XID to override the XID of the current transaction. */ +static TransactionId curinsert_xid = InvalidTransactionId; + /* * These are used to hold the record header while constructing a record. 
* 'hdr_scratch' is not a plain variable, but is palloc'd at initialization, @@ -235,6 +238,7 @@ XLogResetInsertion(void) mainrdata_len = 0; mainrdata_last = (XLogRecData *) &mainrdata_head; curinsert_flags = 0; + curinsert_xid = InvalidTransactionId; begininsert_called = false; } @@ -467,6 +471,18 @@ XLogSetRecordFlags(uint8 flags) curinsert_flags |= flags; } +/* + * Set the XID to be stored in the upcoming WAL record. + * + * Useful when creating WAL records on behalf of another transaction. + */ +void +XLogSetRecordXid(TransactionId xid) +{ + Assert(begininsert_called); + curinsert_xid = xid; +} + /* * Insert an XLOG record having the specified RMID and info bytes, with the * body of the record being the data and buffer references registered earlier @@ -928,6 +944,12 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, { TransactionId xid = GetTopTransactionIdIfAny(); + /* + * On curinsert_xid: if it's set, it's only for recovery and streaming + * replication to work. On the other hand, the record shouldn't be + * logically decoded, so we don't care if the toplevel XID is invalid. + */ + /* Set the flag that the top xid is included in the WAL */ *topxid_included = true; @@ -1000,7 +1022,14 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, * once we know where in the WAL the record will be inserted. The CRC does * not include the record header yet. */ - rechdr->xl_xid = GetCurrentTransactionIdIfAny(); + if (!TransactionIdIsValid(curinsert_xid)) + rechdr->xl_xid = GetCurrentTransactionIdIfAny(); + else + { + /* The overriding XID should be handled specially.
*/ + rechdr->xl_xid = curinsert_xid; + info |= XLR_XID_REPLAYED; + } rechdr->xl_tot_len = (uint32) total_len; rechdr->xl_info = info; rechdr->xl_rmid = rmid; diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 73b78a83fa7..72940448818 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -1949,10 +1949,13 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl SpinLockRelease(&XLogRecoveryCtl->info_lck); /* - * If we are attempting to enter Hot Standby mode, process XIDs we see + * If we are attempting to enter Hot Standby mode, process XIDs we see. + * + * "replayed" changes should not get into the array again. */ if (standbyState >= STANDBY_INITIALIZED && - TransactionIdIsValid(record->xl_xid)) + TransactionIdIsValid(record->xl_xid) && + (record->xl_info & XLR_XID_REPLAYED) == 0) RecordKnownAssignedTransactionIds(record->xl_xid); /* diff --git a/src/backend/catalog/indexing.c b/src/backend/catalog/indexing.c index fd7d2ec0e3a..e09e9fcd8ac 100644 --- a/src/backend/catalog/indexing.c +++ b/src/backend/catalog/indexing.c @@ -364,5 +364,5 @@ CatalogTupleUpdateWithInfo(Relation heapRel, const ItemPointerData *otid, HeapTu void CatalogTupleDelete(Relation heapRel, const ItemPointerData *tid) { - simple_heap_delete(heapRel, tid); + simple_heap_delete(heapRel, tid, InvalidTransactionId); } diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index 9d490da5f81..372cf57e9f4 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -894,7 +894,8 @@ refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence) { finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true, true, /* reindex */ - RecentXmin, ReadNextMultiXactId(), relpersistence); + RecentXmin, ReadNextMultiXactId(), false, + relpersistence); } /* diff --git a/src/backend/commands/repack.c 
b/src/backend/commands/repack.c index 92c1b678e6f..e1a9812161f 100644 --- a/src/backend/commands/repack.c +++ b/src/backend/commands/repack.c @@ -71,6 +71,7 @@ #include "storage/lmgr.h" #include "storage/predicate.h" #include "storage/proc.h" +#include "storage/procarray.h" #include "utils/acl.h" #include "utils/fmgroids.h" #include "utils/guc.h" @@ -125,6 +126,7 @@ typedef struct ChangeContexBackup int file_seq_snapshot; int file_seq_changes; Oid clustering_index; + TransactionId last_snapshot_xmin; } ChangeContextBackup; /* @@ -185,7 +187,11 @@ static void copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldInde bool *pSwapToastByContent, TransactionId *pFreezeXid, MultiXactId *pCutoffMulti, + double *p_num_tuples, ChangeContext *chgcxt); +static void copy_table_data_update_stats(Relation OldHeap, Relation NewHeap, + BlockNumber num_pages, + double num_tuples); static void update_relation_cutoffs(Oid relid, TransactionId frozenXid, MultiXactId cutoffMulti); static List *get_tables_to_repack(RepackCommand cmd, bool usingindex, @@ -199,14 +205,21 @@ static bool repack_is_permitted_for_relation(RepackCommand cmd, static void apply_concurrent_changes(ChangeContext *chgcxt, BlockNumber range_start, BlockNumber range_end); -static void apply_concurrent_insert(RepackDest *dest, TupleTableSlot *slot); +static void apply_concurrent_insert(RepackDest *dest, + TupleTableSlot *spill_tuple, + TupleTableSlot *new_tuple, + TransactionId xid); static void apply_concurrent_update(RepackDest *dest, TupleTableSlot *spilled_tuple, - TupleTableSlot *ondisk_tuple); -static void apply_concurrent_delete(Relation rel, TupleTableSlot *slot); + TupleTableSlot *ondisk_tuple, + TupleTableSlot *new_tuple, + TransactionId xid); +static void apply_concurrent_delete(Relation rel, TupleTableSlot *slot, + TransactionId xid); static void restore_tuple(BufFile *file, Relation relation, TupleTableSlot *slot, BlockNumber *block_nr_p, - BlockNumber *old_block_nr_p); + BlockNumber 
*old_block_nr_p, + TransactionId *xid_p); static void adjust_toast_pointers(Relation relation, TupleTableSlot *dest, TupleTableSlot *src); static bool is_block_in_range(BlockNumber blknum, BlockNumber start, @@ -236,11 +249,14 @@ static void rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHea Oid identIdx, TransactionId frozenXid, MultiXactId cutoffMulti, + double num_tuples, ChangeContext *chgcxt); static ChangeContext *process_auxiliary_table(ChangeContext *chgcxt, Relation *pOldHeap, Relation *pNewHeap, - Oid identIdx); + Oid identIdx, + TransactionId freeze_xid, + MultiXactId cutoff_multi); static List *build_new_indexes(List *OldIndexes, Relation *p_old, Relation *p_new, ChangeContext **p_chgcxt); @@ -1060,6 +1076,7 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, TransactionId frozenXid; MultiXactId cutoffMulti; bool concurrent = OidIsValid(ident_idx); + double num_tuples = 0; IndexBuildSecurity ibsec; ChangeContext *chgcxt = NULL; #if USE_ASSERT_CHECKING @@ -1149,17 +1166,10 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, /* Copy the heap data into the new table in the desired order */ copy_table_data(NewHeap, OldHeap, index, verbose, &swap_toast_by_content, &frozenXid, &cutoffMulti, - chgcxt); + &num_tuples, chgcxt); - /* The historic snapshot won't be needed anymore. */ if (concurrent) { - /* - * Make sure the active snapshot can see the data copied, so the rows - * can be updated / deleted. 
- */ - UpdateActiveSnapshotCommandId(); - Assert(!swap_toast_by_content); /* @@ -1170,7 +1180,8 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, index_close(index, NoLock); rebuild_relation_finish_concurrent(NewHeap, OldHeap, ident_idx, - frozenXid, cutoffMulti, chgcxt); + frozenXid, cutoffMulti, num_tuples, + chgcxt); pgstat_progress_update_param(PROGRESS_REPACK_PHASE, PROGRESS_REPACK_PHASE_FINAL_CLEANUP); @@ -1200,11 +1211,15 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, /* * Swap the physical files of the target and transient tables, then * rebuild the target's indexes and throw away the transient table. + * + * If swap_toast_by_content is false, we don't need to update the + * cutoffs because the TOAST relation is new. */ finish_heap_swap(tableOid, OIDNewHeap, is_system_catalog, swap_toast_by_content, false, true, true, /* reindex */ frozenXid, cutoffMulti, + swap_toast_by_content, /* update_toast_cutoffs */ relpersistence); } @@ -1657,67 +1672,6 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, return OIDNewHeap; } -/* - * Insert tuple when processing REPACK CONCURRENTLY. - * - * rewriteheap.c is not used in the CONCURRENTLY case because it'd be - * difficult to do the same in the catch-up phase (as the logical decoding - * does not provide us with sufficient visibility information). Thus we must - * use heap_insert() both during the catch-up and here. - * - * 'reform' is a slot to use for tuple "reforming", typically to get set - * values of dropped columns to NULL. - * - * We pass the NO_LOGICAL flag to heap_insert() in order to skip logical - * decoding: as soon as REPACK CONCURRENTLY swaps the relation files, it drops - * this relation, so no logical replication subscription should need the data. 
- */ -void -heap_insert_for_repack(ChangeContext *chgcxt, TupleTableSlot *src, - TupleTableSlot *reform) -{ - HeapTuple tuple; - bool shouldFree; - TupleTableSlot *slot; - RepackDest *dest; - - /* - * Use the current auxiliary table as output if one is active, otherwise - * insert the tuple into the actual destination table. - */ - if (chgcxt->cc_dest_aux) - dest = chgcxt->cc_dest_aux; - else - dest = &chgcxt->cc_dest; - - tuple = ExecFetchSlotHeapTuple(src, false, &shouldFree); - if (reform != NULL && tuple_needs_reform(tuple, src->tts_tupleDescriptor)) - { - clear_dropped_attributes(tuple, reform); - slot = reform; - } - else - slot = src; - - /* - * clear_dropped_attributes() should have deformed the tuple, so nothing - * should depend on it now. - */ - if (shouldFree) - heap_freetuple(tuple); - - table_tuple_insert(dest->rel, slot, GetCurrentCommandId(true), - TABLE_INSERT_NO_LOGICAL, dest->bistate); - - /* - * Insert the tuple into the identity index. initialize_change_context() - * may skip opening of indexes if the identity index is not needed - * immediately. - */ - if (dest->rri) - ExecInsertIndexTuples(dest->rri, dest->estate, 0, slot, NIL, NULL); -} - bool tuple_needs_reform(HeapTuple tuple, TupleDesc tupDesc) { @@ -1751,14 +1705,22 @@ clear_dropped_attributes(HeapTuple tuple, TupleTableSlot *reform) { TupleDesc tupDesc = reform->tts_tupleDescriptor; - /* Assuming 'reform' is virtual, this deforms the tuple. */ - Assert(TTS_IS_VIRTUAL(reform)); + Assert(TTS_IS_VIRTUAL(reform) || TTS_IS_HEAPTUPLE(reform)); ExecForceStoreHeapTuple(tuple, reform, false); for (int i = 0; i < tupDesc->natts; i++) { if (TupleDescCompactAttr(tupDesc, i)->attisdropped) + { + /* + * If 'reform' is virtual, all the attributes are already + * deformed. XXX Should we use the virtual slot at all? . 
+ */ + if (TTS_IS_HEAPTUPLE(reform)) + slot_getsomeattrs(reform, i + 1); + reform->tts_isnull[i] = true; + } } } @@ -1769,16 +1731,14 @@ clear_dropped_attributes(HeapTuple tuple, TupleTableSlot *reform) * *pSwapToastByContent is set true if toast tables must be swapped by content. * *pFreezeXid receives the TransactionId used as freeze cutoff point. * *pCutoffMulti receives the MultiXactId used as a cutoff point. + * *p_num_tuples receives the number of tuples copied. */ static void copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verbose, bool *pSwapToastByContent, TransactionId *pFreezeXid, MultiXactId *pCutoffMulti, - ChangeContext *chgcxt) + double *p_num_tuples, ChangeContext *chgcxt) { - Relation relRelation; - HeapTuple reltup; - Form_pg_class relform; TupleDesc oldTupDesc PG_USED_FOR_ASSERTS_ONLY; TupleDesc newTupDesc PG_USED_FOR_ASSERTS_ONLY; VacuumParams params; @@ -1787,7 +1747,6 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, double num_tuples = 0, tups_vacuumed = 0, tups_recently_dead = 0; - BlockNumber num_pages; int elevel = verbose ? INFO : DEBUG2; PGRUsage ru0; char *nspname; @@ -1960,8 +1919,6 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, */ NewHeap->rd_toastoid = InvalidOid; - num_pages = RelationGetNumberOfBlocks(NewHeap); - /* Log what we did */ ereport(elevel, (errmsg("\"%s.%s\": found %.0f removable, %.0f nonremovable row versions in %u pages", @@ -1974,6 +1931,35 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, tups_recently_dead, pg_rusage_show(&ru0)))); + /* + * Update pg_class fields. In the CONCURRENTLY case we do it later because + * 1) the catalog update triggers XID assignment, 2) the work is split + * into several transactions, so the catalog update should take place in + * the last one. 
+ */ + if (!concurrent) + { + BlockNumber num_pages; + + num_pages = RelationGetNumberOfBlocks(NewHeap); + + copy_table_data_update_stats(OldHeap, NewHeap, num_pages, num_tuples); + } + + *p_num_tuples = num_tuples; +} + +/* + * Sub-routine of copy_table_data(), to update pg_class. + */ +static void +copy_table_data_update_stats(Relation OldHeap, Relation NewHeap, + BlockNumber num_pages, double num_tuples) +{ + Relation relRelation; + HeapTuple reltup; + Form_pg_class relform; + /* Update pg_class to reflect the correct values of pages and tuples. */ relRelation = table_open(RelationRelationId, RowExclusiveLock); @@ -2421,6 +2407,9 @@ update_relation_cutoffs(Oid relid, TransactionId frozenXid, /* * Remove the transient table that was built by make_new_heap, and finish * cleaning up (including rebuilding all indexes on the old heap). + * + * 'update_toast_cutoffs' tells whether relfrozenxid and relminmxid of the + * TOAST relation should be updated too. */ void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, @@ -2431,6 +2420,7 @@ finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, bool reindex, TransactionId frozenXid, MultiXactId cutoffMulti, + bool update_toast_cutoffs, char newrelpersistence) { ObjectAddress object; @@ -2438,6 +2428,17 @@ finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, Oid oid_old_toastid; int i; + /* + * In the swap-toast-by-content case, we always need to update the + * cutoffs. In the swap-toast-links case, we usually assume we don't need + * to change the toast table's relfrozenxid: the new version of the toast + * table should already have relfrozenxid set to RecentXmin, which is good + * enough. However, there's a special case - REPACK (CONCURRENTLY) - which + * still needs to update the cutoffs - see the related call for more info. 
+ */ + Assert((swap_toast_by_content && update_toast_cutoffs) || + !swap_toast_by_content); + /* Report that we are now swapping relation files */ pgstat_progress_update_param(PROGRESS_REPACK_PHASE, PROGRESS_REPACK_PHASE_SWAP_REL_FILES); @@ -2515,12 +2516,8 @@ finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, */ CommandCounterIncrement(); update_relation_cutoffs(OIDOldHeap, frozenXid, cutoffMulti); - - /* - * The same for TOAST, if needed. In the swap-toast-links case, the new - * the toast table should already have relfrozenxid set to RecentXmin. - */ - if (OidIsValid(oid_old_toastid) && swap_toast_by_content) + /* The same for TOAST, if requested. */ + if (OidIsValid(oid_old_toastid) && update_toast_cutoffs) update_relation_cutoffs(oid_old_toastid, frozenXid, cutoffMulti); /* Destroy new heap with old filenumber */ @@ -3035,12 +3032,14 @@ apply_concurrent_changes(ChangeContext *chgcxt, BlockNumber range_start, TupleTableSlot *spilled_tuple; TupleTableSlot *old_update_tuple; TupleTableSlot *ondisk_tuple; + TupleTableSlot *new_tuple; bool have_old_tuple = false; bool check_range; MemoryContext oldcxt; DecodingWorkerShared *shared; char fname[MAXPGPATH]; BufFile *file; + SnapshotData SnapshotNewHeap; /* * Use the auxiliary table if one exists, otherwise the "final" @@ -3069,16 +3068,25 @@ apply_concurrent_changes(ChangeContext *chgcxt, BlockNumber range_start, table_slot_callbacks(rel)); old_update_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel), &TTSOpsVirtual); + new_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel), + &TTSOpsHeapTuple); oldcxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(dest->estate)); + /* + * Finding tuples to UPDATE / DELETE is exactly the purpose of + * SNAPSHOT_NEW_HEAP. 
+ */ + InitNewHeapSnapshot(SnapshotNewHeap); + PushActiveSnapshot(&SnapshotNewHeap); + while (true) { size_t nread; - ConcurrentChangeKind prevkind = kind; BlockNumber block, old_block; BlockNumber *old_block_p; + TransactionId xid; CHECK_FOR_INTERRUPTS(); @@ -3093,35 +3101,18 @@ apply_concurrent_changes(ChangeContext *chgcxt, BlockNumber range_start, */ if (kind == CHANGE_UPDATE_OLD) { - restore_tuple(file, rel, old_update_tuple, NULL, NULL); + restore_tuple(file, rel, old_update_tuple, NULL, NULL, NULL); have_old_tuple = true; continue; } - /* - * Just before an UPDATE or DELETE, we must update the command - * counter, because the change could refer to a tuple that we have - * just inserted; and before an INSERT, we have to do this also if the - * previous command was either update or delete. - * - * With this approach we don't spend so many CCIs for long strings of - * only INSERTs, which can't affect one another. - */ - if (kind == CHANGE_UPDATE_NEW || kind == CHANGE_DELETE || - (kind == CHANGE_INSERT && (prevkind == CHANGE_UPDATE_NEW || - prevkind == CHANGE_DELETE))) - { - CommandCounterIncrement(); - UpdateActiveSnapshotCommandId(); - } - /* * Now restore the tuple into the slot and execute the change. * * old_block is only stored with UPDATE_NEW. */ old_block_p = kind == CHANGE_UPDATE_NEW ? 
&old_block : NULL; - restore_tuple(file, rel, spilled_tuple, &block, old_block_p); + restore_tuple(file, rel, spilled_tuple, &block, old_block_p, &xid); if (kind == CHANGE_INSERT) { @@ -3131,7 +3122,7 @@ apply_concurrent_changes(ChangeContext *chgcxt, BlockNumber range_start, */ if (!check_range || is_block_in_range(block, range_start, range_end)) - apply_concurrent_insert(dest, spilled_tuple); + apply_concurrent_insert(dest, spilled_tuple, new_tuple, xid); } else if (kind == CHANGE_DELETE) { @@ -3148,7 +3139,7 @@ apply_concurrent_changes(ChangeContext *chgcxt, BlockNumber range_start, found = find_target_tuple(dest, spilled_tuple, ondisk_tuple); if (!found) elog(ERROR, "could not find target tuple"); - apply_concurrent_delete(rel, ondisk_tuple); + apply_concurrent_delete(rel, ondisk_tuple, xid); } } else if (kind == CHANGE_UPDATE_NEW) @@ -3183,7 +3174,8 @@ apply_concurrent_changes(ChangeContext *chgcxt, BlockNumber range_start, */ adjust_toast_pointers(rel, spilled_tuple, ondisk_tuple); - apply_concurrent_update(dest, spilled_tuple, ondisk_tuple); + apply_concurrent_update(dest, spilled_tuple, ondisk_tuple, + new_tuple, xid); } else { @@ -3204,7 +3196,8 @@ apply_concurrent_changes(ChangeContext *chgcxt, BlockNumber range_start, */ adjust_toast_pointers(rel, spilled_tuple, NULL); - apply_concurrent_insert(dest, spilled_tuple); + apply_concurrent_insert(dest, spilled_tuple, new_tuple, + xid); } else if (is_block_in_range(old_block, range_start, range_end)) { @@ -3218,7 +3211,7 @@ apply_concurrent_changes(ChangeContext *chgcxt, BlockNumber range_start, * visible to the snapshot that we'll use to copy the * other range. */ - apply_concurrent_delete(rel, ondisk_tuple); + apply_concurrent_delete(rel, ondisk_tuple, xid); } /* @@ -3235,11 +3228,13 @@ apply_concurrent_changes(ChangeContext *chgcxt, BlockNumber range_start, ResetPerTupleExprContext(dest->estate); } + PopActiveSnapshot(); /* Cleanup. 
*/ ExecDropSingleTupleTableSlot(spilled_tuple); ExecDropSingleTupleTableSlot(ondisk_tuple); ExecDropSingleTupleTableSlot(old_update_tuple); + ExecDropSingleTupleTableSlot(new_tuple); MemoryContextSwitchTo(oldcxt); @@ -3248,44 +3243,85 @@ apply_concurrent_changes(ChangeContext *chgcxt, BlockNumber range_start, /* * Apply an insert from the spill of concurrent changes to the new copy of the - * table. + * table. 'new_tuple' is the source for table AM. */ static void -apply_concurrent_insert(RepackDest *dest, TupleTableSlot *slot) +apply_concurrent_insert(RepackDest *dest, TupleTableSlot *spill_tuple, + TupleTableSlot *new_tuple, TransactionId xid) { - /* Put the tuple in the table, but make sure it won't be decoded */ - table_tuple_insert(dest->rel, slot, GetCurrentCommandId(true), - TABLE_INSERT_NO_LOGICAL, NULL); + HeapTuple tup; + bool shouldFree; + + /* Copy the contents to a slot that preserves the header fields. */ + Assert(TTS_IS_HEAPTUPLE(new_tuple)); + ExecCopySlot(new_tuple, spill_tuple); + + /* Get pointer to the contained tuple (not a copy). */ + tup = ExecFetchSlotHeapTuple(new_tuple, false, &shouldFree); + Assert(!shouldFree); + + /* Set the XID. */ + HeapTupleHeaderSetXmin(tup->t_data, xid); + + /* + * Put the tuple in the table, but make sure it won't be decoded. At the + * same time, request that the XID we set above is used, instead of + * generating a new one. + * + * FirstCommandId is ok in the new table because the transaction that + * inserted the tuple has already committed, and no other transaction + * should ever need the CID. + */ + table_tuple_insert(dest->rel, new_tuple, FirstCommandId, + TABLE_INSERT_NO_LOGICAL | TABLE_REUSE_XID, + NULL); /* Update indexes with this new tuple. */ ExecInsertIndexTuples(dest->rri, dest->estate, 0, - slot, + new_tuple, NIL, NULL); pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_INSERTED, 1); } /* * Apply an update from the spill of concurrent changes to the new copy of the - * table. + * table. 
'new_tuple' is the source for table AM. */ static void apply_concurrent_update(RepackDest *dest, TupleTableSlot *spilled_tuple, - TupleTableSlot *ondisk_tuple) + TupleTableSlot *ondisk_tuple, + TupleTableSlot *new_tuple, TransactionId xid) { + HeapTuple tup; + bool shouldFree; Relation rel = dest->rel; LockTupleMode lockmode; TM_FailureData tmfd; TU_UpdateIndexes update_indexes; TM_Result res; + /* Copy the contents to a slot that preserves the header fields. */ + Assert(TTS_IS_HEAPTUPLE(new_tuple)); + ExecCopySlot(new_tuple, spilled_tuple); + + /* Get pointer to the contained tuple (not a copy). */ + tup = ExecFetchSlotHeapTuple(new_tuple, false, &shouldFree); + Assert(!shouldFree); + + /* Set the XID. */ + HeapTupleHeaderSetXmin(tup->t_data, xid); + /* * Carry out the update, skipping logical decoding for it. + * + * See comments in apply_concurrent_insert() to understand why + * FirstCommandId is ok in the new table. */ - res = table_tuple_update(rel, &(ondisk_tuple->tts_tid), spilled_tuple, - GetCurrentCommandId(true), - TABLE_UPDATE_NO_LOGICAL, + res = table_tuple_update(rel, &(ondisk_tuple->tts_tid), new_tuple, + FirstCommandId, + TABLE_UPDATE_NO_LOGICAL | TABLE_REUSE_XID, InvalidSnapshot, InvalidSnapshot, false, @@ -3305,7 +3341,7 @@ apply_concurrent_update(RepackDest *dest, TupleTableSlot *spilled_tuple, ExecInsertIndexTuples(dest->rri, dest->estate, flags, - spilled_tuple, + new_tuple, NIL, NULL); } @@ -3313,16 +3349,22 @@ apply_concurrent_update(RepackDest *dest, TupleTableSlot *spilled_tuple, } static void -apply_concurrent_delete(Relation rel, TupleTableSlot *slot) +apply_concurrent_delete(Relation rel, TupleTableSlot *slot, TransactionId xid) { TM_Result res; TM_FailureData tmfd; /* * Delete tuple from the new heap, skipping logical decoding for it. + * + * See comments in heap_insert_for_repack() to understand why + * FirstCommandId is ok in the new table. 
+ * + * See comments in apply_concurrent_insert() to understand why + * FirstCommandId is ok in the new table. */ res = table_tuple_delete(rel, &(slot->tts_tid), - GetCurrentCommandId(true), + xid, FirstCommandId, TABLE_DELETE_NO_LOGICAL, InvalidSnapshot, InvalidSnapshot, false, @@ -3348,7 +3390,8 @@ apply_concurrent_delete(Relation rel, TupleTableSlot *slot) */ static void restore_tuple(BufFile *file, Relation relation, TupleTableSlot *slot, - BlockNumber *block_nr_p, BlockNumber *old_block_nr_p) + BlockNumber *block_nr_p, BlockNumber *old_block_nr_p, + TransactionId *xid_p) { uint32 t_len; HeapTuple tup; @@ -3371,6 +3414,8 @@ restore_tuple(BufFile *file, Relation relation, TupleTableSlot *slot, /* Handle TID separate because not all tuple slots care about it. */ if (block_nr_p) *block_nr_p = ItemPointerGetBlockNumber(&tup->t_data->t_ctid); + if (xid_p) + *xid_p = HeapTupleHeaderGetXmin(tup->t_data); if (old_block_nr_p) BufFileReadExact(file, old_block_nr_p, sizeof(BlockNumber)); @@ -3592,6 +3637,7 @@ initialize_change_context(ChangeContext *chgcxt, Relation relation, chgcxt->cc_dest_aux = NULL; chgcxt->cc_clustering_index = InvalidOid; + chgcxt->cc_last_snapshot_xmin = InvalidTransactionId; } /* @@ -3755,6 +3801,7 @@ backup_change_context(ChangeContext *chgcxt, ChangeContextBackup *backup) backup->file_seq_snapshot = chgcxt->cc_file_seq_snapshot; backup->file_seq_changes = chgcxt->cc_file_seq_changes; backup->clustering_index = chgcxt->cc_clustering_index; + backup->last_snapshot_xmin = chgcxt->cc_last_snapshot_xmin; } /* @@ -3788,6 +3835,7 @@ reinitialize_change_context(ChangeContextBackup *backup) chgcxt->cc_file_seq_snapshot = backup->file_seq_snapshot; chgcxt->cc_file_seq_changes = backup->file_seq_changes; chgcxt->cc_clustering_index = backup->clustering_index; + chgcxt->cc_last_snapshot_xmin = backup->last_snapshot_xmin; return chgcxt; } @@ -3900,6 +3948,7 @@ static void rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, Oid identIdx, 
TransactionId frozenXid, MultiXactId cutoffMulti, + double num_tuples, ChangeContext *chgcxt) { List *ind_oids_new; @@ -3914,6 +3963,7 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, MemoryContext oldcxt; List *indexrels; List *inds_tmp = NIL; + BlockNumber num_pages; Assert(CheckRelationLockedByMe(OldHeap, ShareUpdateExclusiveLock, false)); Assert(CheckRelationLockedByMe(NewHeap, AccessExclusiveLock, false)); @@ -3925,7 +3975,8 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, * cache entries updated. */ if (chgcxt->cc_dest_aux) - chgcxt = process_auxiliary_table(chgcxt, &OldHeap, &NewHeap, identIdx); + chgcxt = process_auxiliary_table(chgcxt, &OldHeap, &NewHeap, identIdx, + frozenXid, cutoffMulti); /* * Unlike the exclusive case, we build new indexes for the new relation @@ -3967,6 +4018,46 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, ind_oids_new = lappend_oid(ind_oids_new, RelationGetRelid(chgcxt->cc_dest.ident_index)); + /* + * Since we haven't copied "recently dead" tuples into the new heap, we + * must not finish the processing until they are considered dead by all + * backends. + * + * In particular, the VACUUM xmin horizon for the table must be at least + * xmin of the last snapshot that we used to copy the data. That means + * even the least recently deleted tuples we omitted from the copying + * (because we considered them dead) must be considered dead by anyone. + * + * Note: Although some time should have elapsed since the data copying + * stage (at least the time to build the indexes), we might get stuck here + * due to another backend running REPACK because its snapshot does not + * allow the xmin horizon to advance for some time. + * + * TODO Consider this when determining the value of + * repack_pages_per_snapshot (currently GUC, in the future preferably a + * constant). Is this worth an additional phase in progress reporting?
+ */ + Assert(TransactionIdIsValid(chgcxt->cc_last_snapshot_xmin)); + while (true) + { + TransactionId oldest_xmin; + + /* The wait below can be long, so keep it cancellable. */ + CHECK_FOR_INTERRUPTS(); + + oldest_xmin = GetOldestNonRemovableTransactionId(OldHeap); + if (TransactionIdFollowsOrEquals(oldest_xmin, + chgcxt->cc_last_snapshot_xmin)) + break; + + /* Wait before the next check. */ + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 1000L, + WAIT_EVENT_REPACK_MVCC_SAFETY); + ResetLatch(MyLatch); + } + /* * During testing, wait for another backend to perform concurrent data * changes which we will process below. @@ -4088,6 +4176,10 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, /* The new indexes must be visible for deletion. */ CommandCounterIncrement(); + /* Update the numbers of pages and tuples in pg_class. */ + num_pages = RelationGetNumberOfBlocks(NewHeap); + copy_table_data_update_stats(OldHeap, NewHeap, num_pages, num_tuples); + /* Close the old heap but keep lock until transaction commit. */ table_close(OldHeap, NoLock); /* Close the new heap. (We didn't have to open its indexes). */ @@ -4100,6 +4192,12 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, * Swap the relations and their TOAST relations and TOAST indexes. This * also drops the new relation and its indexes. * + * update_toast_cutoffs is true because REPACK (CONCURRENTLY) does not + * freeze tuples decoded from WAL, and because RecentXmin is not affected + * by logical decoding. Thus if we accepted relfrozenxid of the new TOAST + * relation (derived from RecentXmin), it could incorrectly tell that we + * froze more recent XID's than we actually did. + * * (System catalogs are currently not supported.)
*/ Assert(!is_system_catalog); @@ -4110,6 +4208,7 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, true, false, /* reindex */ frozenXid, cutoffMulti, + true, /* update_toast_cutoffs */ relpersistence); } @@ -4124,7 +4223,8 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, */ static ChangeContext * process_auxiliary_table(ChangeContext *chgcxt, Relation *pOldHeap, - Relation *pNewHeap, Oid identIdx) + Relation *pNewHeap, Oid identIdx, + TransactionId freeze_xid, MultiXactId cutoff_multi) { RepackDest *dest = chgcxt->cc_dest_aux; Oid ident_idx_new; @@ -4134,6 +4234,8 @@ process_auxiliary_table(ChangeContext *chgcxt, Relation *pOldHeap, Oid aux_oid; ObjectAddress object; Relation rel; + SnapshotData SnapshotNewHeap; + RewriteState rwstate; /* * First, make sure the clustering index exists. @@ -4163,37 +4265,81 @@ process_auxiliary_table(ChangeContext *chgcxt, Relation *pOldHeap, clustering_index = dest->ident_index; } + /* Now do the copying. */ + slot = table_slot_create(dest->rel, NULL); + /* - * Now do the copying. Before starting, clear ->cc_dest_aux so that - * insertions go to the final table, rather than the auxiliary one. + * No point in specifying the auxiliary relation as the old one: we + * haven't frozen tuples when inserting them (one freezing is enough, see + * below), so the tuples do not satisfy the relfrozenxid / relminmxid + * limits, and thus the following freezing would fail. */ - chgcxt->cc_dest_aux = NULL; - slot = table_slot_create(dest->rel, NULL); + rwstate = begin_heap_rewrite(NULL, chgcxt->cc_dest.rel, + /* oldest_xmin only needed for rewriting */ + InvalidTransactionId, + freeze_xid, cutoff_multi, + true); /* - * Note: the current active snapshot blocks the progress of xmin - * horizon(s). The next patches in the series should fix this by using a - * new kind of snapshot (which we can use here because there are no - * transaction aborts in the auxiliary table). 
+ * Scan of the auxiliary table can take a long time, but the SnapshotNewHeap + * snapshot can be used here (because there should be no aborted + * insertions in the table), so the scan should not affect the xmin + * horizons. */ + PopActiveSnapshot(); + InitNewHeapSnapshot(SnapshotNewHeap); + PushActiveSnapshot(&SnapshotNewHeap); scan = index_beginscan(dest->rel, clustering_index, GetActiveSnapshot(), NULL, 0, 0, SO_NONE); index_rescan(scan, NULL, 0, NULL, 0); for (;;) { + HeapTuple tuple; + bool shouldFree; + CHECK_FOR_INTERRUPTS(); if (!index_getnext_slot(scan, ForwardScanDirection, slot)) break; + /* Make sure we have a writable copy of the tuple. */ + tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree); + /* - * Reforming should have been performed during insertions into the - * auxiliary table. + * This kind of slot maintains the tuple header. We don't need to copy + * the contents into a slot of another kind because reforming was + * performed when populating the auxiliary table. + */ + Assert(TTS_IS_BUFFERTUPLE(slot)); + Assert(TransactionIdIsValid(HeapTupleHeaderGetXmin(tuple->t_data))); + + /* + * Insert the tuple into the new relation, and freeze it while doing + * so. + * + * Since our copy is already writable, the tuple can be passed for + * both old and new tuple. */ - heap_insert_for_repack(chgcxt, slot, NULL); + rewrite_heap_tuple_no_chains(rwstate, tuple, tuple, true); + + if (shouldFree) + pfree(tuple); } index_endscan(scan); + PopActiveSnapshot(); + InvalidateCatalogSnapshot(); + + /* + * We should not be restricting the progress of xmin horizons at the + * moment.
+ */ + Assert(!TransactionIdIsValid(MyProc->xmin)); + Assert(!TransactionIdIsValid(MyProc->xid)); + Assert(!HaveRegisteredOrActiveSnapshot()); + + PushActiveSnapshot(GetTransactionSnapshot()); ExecDropSingleTupleTableSlot(slot); + end_heap_rewrite(rwstate); /* * Close the relation, its identity index and clustering index if we had @@ -4205,6 +4351,7 @@ process_auxiliary_table(ChangeContext *chgcxt, Relation *pOldHeap, index_close(clustering_index, NoLock); /* Here we close the other indexes. */ release_change_dest(dest); + chgcxt->cc_dest_aux = NULL; /* Drop the auxiliary table. */ object.classId = RelationRelationId; diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index ca60e4123b1..214ea22fffc 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -6087,6 +6087,7 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode, true, /* reindex */ RecentXmin, ReadNextMultiXactId(), + false, /* update_toast_cutoffs */ persistence); InvokeObjectPostAlterHook(RelationRelationId, tab->relid, 0); diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 33a6735f08d..9a7888724ad 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -1765,6 +1765,7 @@ ExecDeleteAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo, options |= TABLE_DELETE_CHANGING_PARTITION; return table_tuple_delete(resultRelInfo->ri_RelationDesc, tupleid, + InvalidTransactionId, estate->es_output_cid, options, estate->es_snapshot, diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index c3722b5c623..e3c94f83875 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -425,6 +425,18 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId xid = XLogRecGetXid(buf->record); SnapBuild *builder = ctx->snapshot_builder; 
+ /* + * XLOG_HEAP2_MULTI_INSERT is not replayed. + */ + Assert((XLogRecGetInfo(buf->record) & XLR_XID_REPLAYED) == 0); + + /* See heap_decode(). */ + if (change_useless_for_repack(buf)) + { + Assert(!ctx->fast_forward); + return; + } + ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); /* @@ -442,8 +454,7 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { case XLOG_HEAP2_MULTI_INSERT: if (SnapBuildProcessChange(builder, xid, buf->origptr) && - !ctx->fast_forward && - !change_useless_for_repack(buf)) + !ctx->fast_forward) DecodeMultiInsert(ctx, buf); break; case XLOG_HEAP2_NEW_CID: @@ -488,6 +499,50 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId xid = XLogRecGetXid(buf->record); SnapBuild *builder = ctx->snapshot_builder; + /* + * REPACK decoding should only decode changes of the relation being + * processed. Decoding changes of other tables would not only introduce + * performance overhead, it would also make us process already committed + * (and replayed) XIDs again - that's probably not expected by the logical + * decoding system. + * + * The problem is that during the replay, REPACK can (and does) avoid WAL + * logging the extra information needed for logical decoding, however this + * is checked in DecodeInsert(), DecodeUpdate(), etc., which is too late. + * Moreover, these functions do not distinguish whether the transaction is + * being decoded the first time, or if the WAL records originate from the + * replay phase of REPACK. + * + * Unlike the fast-forward case (see comments below), REPACK does not need + * the base snapshot for the transaction until it receives a change that + * really needs to be decoded. Thus it's ok to skip + * SnapBuildProcessChange(). + * + * (With fast-forward, we must not omit the setup of the transaction base + * snapshot because the changes skipped by fast-forward initially may need + * to be decoded after restart.
Thus the base snapshot may be needed after + * the restart too. If we didn't create the snapshot in the fast-forward + * mode, the snapshot builder's xmin would advance too eagerly, so the + * same snapshot wouldn't work after restart.) + * + * First, filter out WAL records generated by REPACK (CONCURRENTLY) + * replaying the data changes of other transactions - these transactions + * have already been decoded, so no backend / worker should decode them + * again. + */ + if (XLogRecGetInfo(buf->record) & XLR_XID_REPLAYED) + return; + + /* + * Now let REPACK decoding worker filter out changes of tables other than + * the one whose REPACKing it's involved in. + */ + if (change_useless_for_repack(buf)) + { + Assert(!ctx->fast_forward); + return; + } + ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); /* @@ -505,8 +560,7 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { case XLOG_HEAP_INSERT: if (SnapBuildProcessChange(builder, xid, buf->origptr) && - !ctx->fast_forward && - !change_useless_for_repack(buf)) + !ctx->fast_forward) DecodeInsert(ctx, buf); break; @@ -518,22 +572,19 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) case XLOG_HEAP_HOT_UPDATE: case XLOG_HEAP_UPDATE: if (SnapBuildProcessChange(builder, xid, buf->origptr) && - !ctx->fast_forward && - !change_useless_for_repack(buf)) + !ctx->fast_forward) DecodeUpdate(ctx, buf); break; case XLOG_HEAP_DELETE: if (SnapBuildProcessChange(builder, xid, buf->origptr) && - !ctx->fast_forward && - !change_useless_for_repack(buf)) + !ctx->fast_forward) DecodeDelete(ctx, buf); break; case XLOG_HEAP_TRUNCATE: if (SnapBuildProcessChange(builder, xid, buf->origptr) && - !ctx->fast_forward && - !change_useless_for_repack(buf)) + !ctx->fast_forward) DecodeTruncate(ctx, buf); break; @@ -549,8 +600,7 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) case XLOG_HEAP_CONFIRM: if (SnapBuildProcessChange(builder, xid, buf->origptr) && - !ctx->fast_forward && - 
!change_useless_for_repack(buf)) + !ctx->fast_forward) DecodeSpecConfirm(ctx, buf); break; @@ -960,15 +1010,17 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple); /* - * REPACK (CONCURRENTLY) needs block number to check if the corresponding - * part of the table was already copied. XXX Should we only do this if - * AmRepackWorker()? It might save a few cycles, but not sure it's good to - * leave the fields unset in other cases. + * REPACK (CONCURRENTLY) needs xmin to preserve visibility information and + * block number to check if the corresponding part of the table was + * already copied. XXX Should we only do this if AmRepackWorker()? It + * might save a few cycles, but not sure it's good to leave the fields + * unset in other cases. */ { HeapTupleHeader header; header = change->data.tp.newtuple->t_data; + HeapTupleHeaderSetXmin(header, XLogRecGetXid(r)); /* offnum is not really needed, but let's set valid pointer. */ ItemPointerSet(&header->t_ctid, blknum, xlrec->offnum); } @@ -1033,14 +1085,15 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) DecodeXLogTuple(data, datalen, change->data.tp.newtuple); /* - * REPACK (CONCURRENTLY) needs block numbers to check if the - * corresponding part of the table was already copied. XXX Do this - * only if AmRepackWorker()? + * REPACK (CONCURRENTLY) needs xmin to preserve visibility information + * and block numbers to check if the corresponding part of the table + * was already copied. XXX Do this only if AmRepackWorker()? */ { HeapTupleHeader header; header = change->data.tp.newtuple->t_data; + HeapTupleHeaderSetXmin(header, XLogRecGetXid(r)); /* offnum is not really needed, but let's set valid pointer. 
*/ ItemPointerSet(&header->t_ctid, new_blknum, xlrec->new_offnum); change->data.tp.old_blknum = old_blknum; @@ -1129,14 +1182,20 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) datalen, change->data.tp.oldtuple); /* - * REPACK (CONCURRENTLY) needs block number to check if the - * corresponding part of the table was already copied. XXX Do this - * only if AmRepackWorker()? + * REPACK (CONCURRENTLY) needs xmax to preserve visibility information + * and block number to check if the corresponding part of the table + * was already copied. XXX Do this only if AmRepackWorker()? */ { HeapTupleHeader header; header = change->data.tp.oldtuple->t_data; + + /* + * xmax makes more sense here, but we don't want restore_tuple() + * to pay attention to the change kind, so use xmin here as well. + */ + HeapTupleHeaderSetXmin(header, XLogRecGetXid(r)); /* offnum is not really needed, but let's set valid pointer. */ ItemPointerSet(&header->t_ctid, blknum, xlrec->offnum); } @@ -1282,13 +1341,16 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->data.tp.clear_toast_afterwards = false; /* - * REPACK (CONCURRENTLY) needs block number to check if the - * corresponding part of the table was already copied. + * REPACK (CONCURRENTLY) needs xmin to preserve visibility information + * and block number to check if the corresponding part of the table + * was already copied. */ if (AmRepackWorker()) { OffsetNumber offnum; + HeapTupleHeaderSetXmin(header, XLogRecGetXid(r)); + /* * offnum is not really needed, but let's set valid pointer. (It * will be invalid anyway if the page was initially empty.) 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 06f31e02b9a..5a475cd5ad8 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -5268,6 +5268,9 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, * Shouldn't we add a new field to ReorderBufferChange instead? */ tmphtup->t_data->t_ctid = newtup->t_data->t_ctid; + /* Likewise, preserve XID - REPACK needs it to be MVCC-safe. */ + HeapTupleHeaderSetXmin(tmphtup->t_data, + HeapTupleHeaderGetXmin(newtup->t_data)); memcpy(newtup->t_data, tmphtup->t_data, tmphtup->t_len); newtup->t_len = tmphtup->t_len; diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 560659f9568..c145dbc88cf 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -184,6 +184,7 @@ PG_SLEEP "Waiting due to a call to pg_sleep or a sibling fu RECOVERY_APPLY_DELAY "Waiting to apply WAL during recovery because of a delay setting." RECOVERY_RETRIEVE_RETRY_INTERVAL "Waiting during recovery when WAL data is not available from any source (pg_wal, archive or stream)." REGISTER_SYNC_REQUEST "Waiting while sending synchronization requests to the checkpointer, because the request queue is full." +REPACK_MVCC_SAFETY "Waiting until not copied tuples are considered dead." SPIN_DELAY "Waiting while acquiring a contended spinlock." VACUUM_DELAY "Waiting in a cost-based vacuum delay point." VACUUM_TRUNCATE "Waiting to acquire an exclusive lock to truncate off any empty pages at the end of a table vacuumed." 
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 5176478c295..bc6eadcb978 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -380,6 +380,7 @@ extern void heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, CommandId cid, uint32 options, BulkInsertState bistate); extern TM_Result heap_delete(Relation relation, const ItemPointerData *tid, + TransactionId xid, CommandId cid, uint32 options, Snapshot crosscheck, bool wait, TM_FailureData *tmfd); extern void heap_finish_speculative(Relation relation, const ItemPointerData *tid); @@ -422,7 +423,8 @@ extern bool heap_tuple_should_freeze(HeapTupleHeader tuple, extern bool heap_tuple_needs_eventual_freeze(HeapTupleHeader tuple); extern void simple_heap_insert(Relation relation, HeapTuple tup); -extern void simple_heap_delete(Relation relation, const ItemPointerData *tid); +extern void simple_heap_delete(Relation relation, const ItemPointerData *tid, + TransactionId xid); extern void simple_heap_update(Relation relation, const ItemPointerData *otid, HeapTuple tup, TU_UpdateIndexes *update_indexes); diff --git a/src/include/access/heaptoast.h b/src/include/access/heaptoast.h index 631cb1836b9..36d1d65c131 100644 --- a/src/include/access/heaptoast.h +++ b/src/include/access/heaptoast.h @@ -14,6 +14,7 @@ #define HEAPTOAST_H #include "access/htup_details.h" +#include "access/rewriteheap.h" #include "storage/lockdefs.h" #include "utils/relcache.h" @@ -95,7 +96,9 @@ * ---------- */ extern HeapTuple heap_toast_insert_or_update(Relation rel, HeapTuple newtup, - HeapTuple oldtup, uint32 options); + HeapTuple oldtup, + RewriteState rwstate, + uint32 options); /* ---------- * heap_toast_delete - @@ -104,7 +107,7 @@ extern HeapTuple heap_toast_insert_or_update(Relation rel, HeapTuple newtup, * ---------- */ extern void heap_toast_delete(Relation rel, HeapTuple oldtup, - bool is_speculative); + bool is_speculative, TransactionId xid); /* ---------- * 
toast_flatten_tuple - diff --git a/src/include/access/rewriteheap.h b/src/include/access/rewriteheap.h index 6ccf7b45c04..80d928e0e26 100644 --- a/src/include/access/rewriteheap.h +++ b/src/include/access/rewriteheap.h @@ -22,12 +22,21 @@ typedef struct RewriteStateData *RewriteState; extern RewriteState begin_heap_rewrite(Relation old_heap, Relation new_heap, - TransactionId oldest_xmin, TransactionId freeze_xid, - MultiXactId cutoff_multi); + TransactionId oldest_xmin, + TransactionId freeze_xid, + MultiXactId cutoff_multi, + bool no_chains); extern void end_heap_rewrite(RewriteState state); extern void rewrite_heap_tuple(RewriteState state, HeapTuple old_tuple, HeapTuple new_tuple); +extern void rewrite_heap_tuple_no_chains(RewriteState state, + HeapTuple old_tuple, + HeapTuple new_tuple, + bool freeze); extern bool rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple); +extern void rewrite_freeze_tuple(RewriteState state, HeapTuple tuple); +extern void rewrite_copy_visibility_info(HeapTuple new_tuple, + HeapTuple old_tuple); /* * On-Disk data format for an individual logical rewrite mapping. diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 132248c5d43..1af01235c8b 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -292,6 +292,12 @@ typedef struct TM_IndexDeleteOp /* "options" flag bits for table_tuple_update */ #define TABLE_UPDATE_NO_LOGICAL (1 << 0) +/* + * For INSERT or UPDATE, use XID (xmin) contained in the new tuple rather than + * the XID of the current transaction.
+ */ +#define TABLE_REUSE_XID (1U << 31) + /* flag bits for table_tuple_lock */ /* Follow tuples whose update is in progress if lock modes don't conflict */ #define TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS (1 << 0) @@ -568,6 +574,7 @@ typedef struct TableAmRoutine /* see table_tuple_delete() for reference about parameters */ TM_Result (*tuple_delete) (Relation rel, ItemPointer tid, + TransactionId xid, CommandId cid, uint32 options, Snapshot snapshot, @@ -1458,6 +1465,14 @@ static inline void table_tuple_insert(Relation rel, TupleTableSlot *slot, CommandId cid, uint32 options, BulkInsertStateData *bistate) { + /* + * TABLE_REUSE_XID restricts the slot type because not all slots preserve + * the visibility information. XXX Isn't this a reason to pass the xid as + * an argument? + */ + Assert((options & TABLE_REUSE_XID) == 0 || TTS_IS_HEAPTUPLE(slot) || + TTS_IS_BUFFERTUPLE(slot)); + rel->rd_tableam->tuple_insert(rel, slot, cid, options, bistate); } @@ -1479,6 +1494,9 @@ table_tuple_insert_speculative(Relation rel, TupleTableSlot *slot, BulkInsertStateData *bistate, uint32 specToken) { + /* TABLE_REUSE_XID is currently not needed here. */ + Assert((options & TABLE_REUSE_XID) == 0); + rel->rd_tableam->tuple_insert_speculative(rel, slot, cid, options, bistate, specToken); } @@ -1526,6 +1544,7 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, * Input parameters: * rel - table to be modified (caller must hold suitable lock) * tid - TID of tuple to be deleted + * xid - XID to use or InvalidTransactionId for the current transaction * cid - delete command ID (used for visibility test, and stored into * cmax if successful) * options - bitmask of options. Supported values: @@ -1546,11 +1565,12 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, * TM_FailureData for additional info.
*/ static inline TM_Result -table_tuple_delete(Relation rel, ItemPointer tid, CommandId cid, +table_tuple_delete(Relation rel, ItemPointer tid, TransactionId xid, + CommandId cid, uint32 options, Snapshot snapshot, Snapshot crosscheck, bool wait, TM_FailureData *tmfd) { - return rel->rd_tableam->tuple_delete(rel, tid, cid, options, + return rel->rd_tableam->tuple_delete(rel, tid, xid, cid, options, snapshot, crosscheck, wait, tmfd); } @@ -1601,6 +1621,10 @@ table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot, bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode, TU_UpdateIndexes *update_indexes) { + /* See table_tuple_insert(). */ + Assert((options & TABLE_REUSE_XID) == 0 || TTS_IS_HEAPTUPLE(slot) || + TTS_IS_BUFFERTUPLE(slot)); + return rel->rd_tableam->tuple_update(rel, otid, slot, cid, options, snapshot, crosscheck, wait, tmfd, diff --git a/src/include/access/toast_helper.h b/src/include/access/toast_helper.h index 2ec92397f26..f86f88c774e 100644 --- a/src/include/access/toast_helper.h +++ b/src/include/access/toast_helper.h @@ -14,6 +14,7 @@ #ifndef TOAST_HELPER_H #define TOAST_HELPER_H +#include "access/rewriteheap.h" #include "utils/rel.h" /* @@ -60,6 +61,15 @@ typedef struct */ uint8 ttc_flags; ToastAttrInfo *ttc_attr; + + /* + * These fields are needed when the option HEAP_INSERT_REUSE_XID_FREEZE + * was passed to heap_toast_insert_or_update(). We could actually use + * normal insert, but that would require WAL support of + * heap_freeze_tuple(). 
+ */ + RewriteState ttc_rwstate; + HeapTuple ttc_tup_main; } ToastTupleContext; /* @@ -108,9 +118,9 @@ extern int toast_tuple_find_biggest_attribute(ToastTupleContext *ttc, extern void toast_tuple_try_compression(ToastTupleContext *ttc, int attribute); extern void toast_tuple_externalize(ToastTupleContext *ttc, int attribute, uint32 options); -extern void toast_tuple_cleanup(ToastTupleContext *ttc); +extern void toast_tuple_cleanup(ToastTupleContext *ttc, TransactionId xid); extern void toast_delete_external(Relation rel, const Datum *values, const bool *isnull, - bool is_speculative); + bool is_speculative, TransactionId xid); #endif diff --git a/src/include/access/toast_internals.h b/src/include/access/toast_internals.h index bf45889a642..62da4a18d63 100644 --- a/src/include/access/toast_internals.h +++ b/src/include/access/toast_internals.h @@ -12,6 +12,7 @@ #ifndef TOAST_INTERNALS_H #define TOAST_INTERNALS_H +#include "access/rewriteheap.h" #include "access/toast_compression.h" #include "storage/lockdefs.h" #include "utils/relcache.h" @@ -48,9 +49,13 @@ typedef struct toast_compress_header extern Datum toast_compress_datum(Datum value, char cmethod); extern Oid toast_get_valid_index(Oid toastoid, LOCKMODE lock); -extern void toast_delete_datum(Relation rel, Datum value, bool is_speculative); +extern void toast_delete_datum(Relation rel, Datum value, bool is_speculative, + TransactionId xid); extern Datum toast_save_datum(Relation rel, Datum value, - varlena *oldexternal, uint32 options); + varlena *oldexternal, + RewriteState rwstate, + HeapTuple tup_main, + uint32 options); extern int toast_open_indexes(Relation toastrel, LOCKMODE lock, diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 55663e6f4af..be718993401 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -32,7 +32,7 @@ /* * Each page of XLOG file has a header like this: */ -#define XLOG_PAGE_MAGIC 0xD120 /* can be used as 
WAL version indicator */ +#define XLOG_PAGE_MAGIC 0xD121 /* can be used as WAL version indicator */ typedef struct XLogPageHeaderData { diff --git a/src/include/access/xloginsert.h b/src/include/access/xloginsert.h index 91dfbd5627f..a0cea5fde01 100644 --- a/src/include/access/xloginsert.h +++ b/src/include/access/xloginsert.h @@ -43,6 +43,7 @@ /* prototypes for public functions in xloginsert.c: */ extern void XLogBeginInsert(void); extern void XLogSetRecordFlags(uint8 flags); +extern void XLogSetRecordXid(TransactionId xid); extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info); extern XLogRecPtr XLogSimpleInsertInt64(RmgrId rmid, uint8 info, int64 value); extern void XLogEnsureRecordSpace(int max_block_id, int ndatas); diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h index e8999d3fe91..bf3c1965e05 100644 --- a/src/include/access/xlogrecord.h +++ b/src/include/access/xlogrecord.h @@ -90,6 +90,14 @@ typedef struct XLogRecord */ #define XLR_CHECK_CONSISTENCY 0x02 +/* + * The record contains a data change that was already committed and now is + * being applied to a new relation due to rewriting. The original XID is + * needed to keep the rewriting MVCC-safe, however the transaction should be + * ignored by logical decoding and it should not get into KnownAssignedXids. + */ +#define XLR_XID_REPLAYED 0x04 + /* * Header info for block data appended to an XLOG record. * diff --git a/src/include/commands/repack.h b/src/include/commands/repack.h index ad8125790b0..9e2f3e491c6 100644 --- a/src/include/commands/repack.h +++ b/src/include/commands/repack.h @@ -16,6 +16,7 @@ #include #include "access/hio.h" +#include "access/rewriteheap.h" #include "access/skey.h" #include "access/xlogdefs.h" #include "catalog/index.h" @@ -110,10 +111,10 @@ typedef struct ChangeContext * Not sure, it'd require disk space for one more copy and the copying * itself is not free. 
* - * TODO 1) make the tables unlogged, 2) if REPACK locks the TOAST relation - * too (not sure it does) try to preserve TOAST pointers, instead of - * storing them to TOAST relations of these tables, 3) Check that the - * tables are dropped on transaction abort. + * XXX REPACK currently does not lock the old TOAST relation. If it did, + * we could perhaps copy TOAST pointers from the old relation to the + * auxiliary relation, so that the auxiliary relation would not need its + * own TOAST relation. */ RepackDest *cc_dest_aux; @@ -127,6 +128,11 @@ typedef struct ChangeContext * functions. This is needed when starting a new transaction. */ IndexBuildSecurity cc_ind_build_sec; + + /* + * xmin of the last snapshot used to copy data. + */ + TransactionId cc_last_snapshot_xmin; } ChangeContext; extern PGDLLIMPORT int repack_pages_per_snapshot; @@ -142,8 +148,6 @@ extern void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal); extern Oid make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, char relpersistence, LOCKMODE lockmode, bool auxiliary); -extern void heap_insert_for_repack(ChangeContext *chgcxt, TupleTableSlot *src, - TupleTableSlot *reform); extern bool tuple_needs_reform(HeapTuple tuple, TupleDesc tupDesc); extern void clear_dropped_attributes(HeapTuple tuple, TupleTableSlot *reform); extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, @@ -154,6 +158,7 @@ extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, bool reindex, TransactionId frozenXid, MultiXactId cutoffMulti, + bool update_toast_cutoffs, char newrelpersistence); extern Snapshot repack_get_snapshot(ChangeContext *chgcxt); extern void repack_process_concurrent_changes(ChangeContext *chgcxt, diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h index 1c550096393..b0c40df7934 100644 --- a/src/include/utils/snapmgr.h +++ b/src/include/utils/snapmgr.h @@ -51,6 +51,18 @@ extern PGDLLIMPORT SnapshotData SnapshotToastData; 
((snapshotdata).snapshot_type = SNAPSHOT_NON_VACUUMABLE, \ (snapshotdata).vistest = (vistestp)) +/* + * NewHeap snapshot needs to be used as the active snapshot at some point, so + * initialize the fields related to PushActiveSnapshot(). + */ +#define InitNewHeapSnapshot(snapshotdata) \ + ((snapshotdata).snapshot_type = SNAPSHOT_NEW_HEAP, \ + (snapshotdata).regd_count = 0, \ + (snapshotdata).active_count = 0, \ + (snapshotdata).copied = false, \ + (snapshotdata).xcnt = 0, \ + (snapshotdata).subxcnt = 0) + /* * Is the snapshot implemented as an MVCC snapshot (i.e. it uses * SNAPSHOT_MVCC)? If so, there will be at most one visible tuple in a chain diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h index 9766aabcad4..f91cb43a5a2 100644 --- a/src/include/utils/snapshot.h +++ b/src/include/utils/snapshot.h @@ -112,6 +112,29 @@ typedef enum SnapshotType * horizon to use. */ SNAPSHOT_NON_VACUUMABLE, + + /* + * The effects of all transactions are visible. Unlike SNAPSHOT_DIRTY, + * aborted (sub)transactions are not expected. + * + * This is specific to applying data changes to the new heap by the REPACK + * command - that replays changes done in the old heap by + * transactions that have already committed. No other transactions can + * access the new heap while this snapshot is in use. + * + * Therefore, whenever a transaction being applied looks for a tuple to + * update or delete, it can assume that the insertion of any candidate + * tuple was already committed - otherwise the inserting transaction + * wouldn't have been applied. + * + * By considering effects of all transactions visible we also ensure that + * a transaction can update / delete tuples that it inserted itself. + * + * TODO Consider better name. Would SNAPSHOT_BOOTSTRAP be confusing? + * During cluster bootstrap we also consider all changes committed + * immediately. 
+ */ + SNAPSHOT_NEW_HEAP, } SnapshotType; typedef struct SnapshotData *Snapshot; @@ -127,8 +150,8 @@ typedef struct SnapshotData *Snapshot; * * Historic MVCC snapshots used during logical decoding * * snapshots passed to HeapTupleSatisfiesDirty() * * snapshots passed to HeapTupleSatisfiesNonVacuumable() - * * snapshots used for SatisfiesAny, Toast, Self where no members are - * accessed. + * * snapshots used for SatisfiesAny, Toast, Self, NewHeap where no members + * are accessed. * * TODO: It's probably a good idea to split this struct using a NodeTag * similar to how parser and executor nodes are handled, with one type for diff --git a/src/test/modules/injection_points/expected/repack.out b/src/test/modules/injection_points/expected/repack.out index b575e9052ee..9919c93fe7f 100644 --- a/src/test/modules/injection_points/expected/repack.out +++ b/src/test/modules/injection_points/expected/repack.out @@ -45,8 +45,8 @@ step check2: SELECT i, j FROM repack_test ORDER BY i, j; - INSERT INTO data_s2(i, j) - SELECT i, j FROM repack_test; + INSERT INTO data_s2(i, j, _xmin) + SELECT i, j, xmin FROM repack_test; i| j ---+--- @@ -77,11 +77,11 @@ step check1: SELECT i, j FROM repack_test ORDER BY i, j; - INSERT INTO data_s1(i, j) - SELECT i, j FROM repack_test; + INSERT INTO data_s1(i, j, _xmin) + SELECT i, j, xmin FROM repack_test; SELECT count(*) - FROM data_s1 d1 FULL JOIN data_s2 d2 USING (i, j) + FROM data_s1 d1 FULL JOIN data_s2 d2 USING (i, j, _xmin) WHERE d1.i ISNULL OR d2.i ISNULL; count diff --git a/src/test/modules/injection_points/specs/repack.spec b/src/test/modules/injection_points/specs/repack.spec index d727a9b056b..41dc753fccb 100644 --- a/src/test/modules/injection_points/specs/repack.spec +++ b/src/test/modules/injection_points/specs/repack.spec @@ -8,8 +8,8 @@ setup CREATE TABLE relfilenodes(node oid); - CREATE TABLE data_s1(i int, j int); - CREATE TABLE data_s2(i int, j int); + CREATE TABLE data_s1(i int, j int, _xmin xid); + CREATE TABLE data_s2(i int, 
j int, _xmin xid); } teardown @@ -38,7 +38,8 @@ step wait_before_lock # Besides the contents, we also check that relfilenode has changed. # Have each session write the contents into a table and use FULL JOIN to check -# if the outputs are identical. +# if the outputs are identical. xmin is included in order to check the MVCC +# safety. step check1 { INSERT INTO relfilenodes(node) @@ -48,11 +49,11 @@ step check1 SELECT i, j FROM repack_test ORDER BY i, j; - INSERT INTO data_s1(i, j) - SELECT i, j FROM repack_test; + INSERT INTO data_s1(i, j, _xmin) + SELECT i, j, xmin FROM repack_test; SELECT count(*) - FROM data_s1 d1 FULL JOIN data_s2 d2 USING (i, j) + FROM data_s1 d1 FULL JOIN data_s2 d2 USING (i, j, _xmin) WHERE d1.i ISNULL OR d2.i ISNULL; } teardown @@ -84,10 +85,6 @@ step change_new # When applying concurrent data changes, we should see the effects of an # in-progress subtransaction. -# -# XXX Not sure this test is useful now - it was designed for the patch that -# preserves tuple visibility and which therefore modifies -# TransactionIdIsCurrentTransactionId(). step change_subxact1 { BEGIN; @@ -101,8 +98,6 @@ step change_subxact1 # When applying concurrent data changes, we should not see the effects of a # rolled back subtransaction. -# -# XXX Is this test useful? See above. step change_subxact2 { BEGIN; @@ -121,8 +116,8 @@ step check2 SELECT i, j FROM repack_test ORDER BY i, j; - INSERT INTO data_s2(i, j) - SELECT i, j FROM repack_test; + INSERT INTO data_s2(i, j, _xmin) + SELECT i, j, xmin FROM repack_test; } step wakeup_before_lock { -- 2.52.0