From b37a7a44f4e41f4a4322edf7648fbb651218c159 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C3=81lvaro=20Herrera?= <alvherre@kurilemu.de>
Date: Sat, 30 Aug 2025 19:40:04 +0200
Subject: [PATCH v20 6/6] Preserve visibility information of the concurrent
 data changes.

As explained in the commit message of the preceding patch of the series, the
data changes done by applications while REPACK CONCURRENTLY is copying the
table contents to a new file are decoded from WAL and eventually also applied
to the new file. To reduce the complexity a little bit, the preceding patch
uses the current transaction (i.e. transaction opened by the REPACK command)
to execute those INSERT, UPDATE and DELETE commands.

However, REPACK is not expected to change visibility of tuples. Therefore,
this patch fixes the handling of the "concurrent data changes". It ensures
that tuples written into the new table have the same XID and command ID (CID)
as they had in the old table.

To "replay" an UPDATE or DELETE command on the new table, we need the
appropriate snapshot to find the previous tuple version in the new table. The
(historic) snapshot we used to decode the UPDATE / DELETE should (by
definition) see the state of the catalog prior to that UPDATE / DELETE. Thus
we can use the same snapshot to find the "old tuple" for UPDATE / DELETE in
the new table if:

1) REPACK CONCURRENTLY preserves visibility information of all tuples - that's
the purpose of this part of the patch series.

2) The table being REPACKed is treated as a system catalog by all transactions
that modify its data. This ensures that reorderbuffer.c generates a new
snapshot for each data change in the table.

We ensure 2) by maintaining a shared hashtable of tables being REPACKed
CONCURRENTLY and by adjusting the RelationIsAccessibleInLogicalDecoding()
macro so it checks this hashtable. (The corresponding flag is also added to
the relation cache, so that the shared hashtable does not have to be accessed
too often.) It's essential that after adding an entry to the hashtable we wait
for completion of all the transactions that might have started to modify our
table before our entry was added. We achieve that by upgrading our lock on
the table to ShareLock temporarily: as soon as we acquire it, no DML command
should be running on the table. (This lock upgrade shouldn't cause any
deadlock because we care to not hold a lock on other objects at the same
time.)

As long as we preserve the tuple visibility information (which includes XID),
it's important to avoid logical decoding of the WAL generated by DMLs on the
new table: the logical decoding subsystem probably does not expect that the
incoming WAL records contain XIDs of already decoded transactions. (And of
course, repeated decoding would be wasted effort.)

Author: Antonin Houska <ah@cybertec.at>
Author: Mikhail Nikalayeu <mihailnikalayeu@gmail.com> (small changes)
---
 src/backend/access/common/toast_internals.c   |   3 +-
 src/backend/access/heap/heapam.c              |  51 ++-
 src/backend/access/heap/heapam_handler.c      |  23 +-
 src/backend/access/transam/xact.c             |  52 +++
 src/backend/commands/cluster.c                | 400 ++++++++++++++++--
 src/backend/replication/logical/decode.c      |  28 +-
 src/backend/replication/logical/snapbuild.c   |  22 +-
 .../pgoutput_repack/pgoutput_repack.c         |  68 ++-
 src/backend/storage/ipc/ipci.c                |   2 +
 .../utils/activity/wait_event_names.txt       |   1 +
 src/backend/utils/cache/inval.c               |  21 +
 src/backend/utils/cache/relcache.c            |   4 +
 src/include/access/heapam.h                   |  12 +-
 src/include/access/xact.h                     |   2 +
 src/include/commands/cluster.h                |  22 +
 src/include/storage/lwlocklist.h              |   1 +
 src/include/utils/inval.h                     |   2 +
 src/include/utils/rel.h                       |   7 +-
 src/include/utils/snapshot.h                  |   3 +
 .../injection_points/specs/repack.spec        |   4 -
 src/tools/pgindent/typedefs.list              |   1 +
 21 files changed, 635 insertions(+), 94 deletions(-)

diff --git a/src/backend/access/common/toast_internals.c b/src/backend/access/common/toast_internals.c
index a1d0eed8953..586eb42a137 100644
--- a/src/backend/access/common/toast_internals.c
+++ b/src/backend/access/common/toast_internals.c
@@ -320,7 +320,8 @@ toast_save_datum(Relation rel, Datum value,
 		memcpy(VARDATA(&chunk_data), data_p, chunk_size);
 		toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);
 
-		heap_insert(toastrel, toasttup, mycid, options, NULL);
+		heap_insert(toastrel, toasttup, GetCurrentTransactionId(), mycid,
+					options, NULL);
 
 		/*
 		 * Create the index entry.  We cheat a little here by not using
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index f9a4fe3faed..fd17286cabe 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2070,7 +2070,7 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate)
 /*
  *	heap_insert		- insert tuple into a heap
  *
- * The new tuple is stamped with current transaction ID and the specified
+ * The new tuple is stamped with the specified transaction ID and the specified
  * command ID.
  *
  * See table_tuple_insert for comments about most of the input flags, except
@@ -2086,15 +2086,16 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate)
  * reflected into *tup.
  */
 void
-heap_insert(Relation relation, HeapTuple tup, CommandId cid,
-			int options, BulkInsertState bistate)
+heap_insert(Relation relation, HeapTuple tup, TransactionId xid,
+			CommandId cid, int options, BulkInsertState bistate)
 {
-	TransactionId xid = GetCurrentTransactionId();
 	HeapTuple	heaptup;
 	Buffer		buffer;
 	Buffer		vmbuffer = InvalidBuffer;
 	bool		all_visible_cleared = false;
 
+	Assert(TransactionIdIsValid(xid));
+
 	/* Cheap, simplistic check that the tuple matches the rel's rowtype. */
 	Assert(HeapTupleHeaderGetNatts(tup->t_data) <=
 		   RelationGetNumberOfAttributes(relation));
@@ -2176,8 +2177,15 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
 		/*
 		 * If this is a catalog, we need to transmit combo CIDs to properly
 		 * decode, so log that as well.
+		 *
+		 * HEAP_INSERT_NO_LOGICAL should be set when applying data changes
+		 * done by other transactions during REPACK CONCURRENTLY. In such a
+		 * case, the insertion should not be decoded at all - see
+		 * heap_decode(). (It's also set by raw_heap_insert() for TOAST, but
+		 * TOAST does not pass this test anyway.)
 		 */
-		if (RelationIsAccessibleInLogicalDecoding(relation))
+		if ((options & HEAP_INSERT_NO_LOGICAL) == 0 &&
+			RelationIsAccessibleInLogicalDecoding(relation))
 			log_heap_new_cid(relation, heaptup);
 
 		/*
@@ -2723,7 +2731,8 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
 void
 simple_heap_insert(Relation relation, HeapTuple tup)
 {
-	heap_insert(relation, tup, GetCurrentCommandId(true), 0, NULL);
+	heap_insert(relation, tup, GetCurrentTransactionId(),
+				GetCurrentCommandId(true), 0, NULL);
 }
 
 /*
@@ -2780,11 +2789,11 @@ xmax_infomask_changed(uint16 new_infomask, uint16 old_infomask)
  */
 TM_Result
 heap_delete(Relation relation, ItemPointer tid,
-			CommandId cid, Snapshot crosscheck, bool wait,
-			TM_FailureData *tmfd, bool changingPart, bool wal_logical)
+			TransactionId xid, CommandId cid, Snapshot crosscheck, bool wait,
+			TM_FailureData *tmfd, bool changingPart,
+			bool wal_logical)
 {
 	TM_Result	result;
-	TransactionId xid = GetCurrentTransactionId();
 	ItemId		lp;
 	HeapTupleData tp;
 	Page		page;
@@ -2801,6 +2810,7 @@ heap_delete(Relation relation, ItemPointer tid,
 	bool		old_key_copied = false;
 
 	Assert(ItemPointerIsValid(tid));
+	Assert(TransactionIdIsValid(xid));
 
 	AssertHasSnapshotForToast(relation);
 
@@ -3097,8 +3107,12 @@ l1:
 		/*
 		 * For logical decode we need combo CIDs to properly decode the
 		 * catalog
+		 *
+		 * Like in heap_insert(), visibility is unchanged when called from
+		 * VACUUM FULL / CLUSTER.
 		 */
-		if (RelationIsAccessibleInLogicalDecoding(relation))
+		if (wal_logical &&
+			RelationIsAccessibleInLogicalDecoding(relation))
 			log_heap_new_cid(relation, &tp);
 
 		xlrec.flags = 0;
@@ -3217,11 +3231,12 @@ simple_heap_delete(Relation relation, ItemPointer tid)
 	TM_Result	result;
 	TM_FailureData tmfd;
 
-	result = heap_delete(relation, tid,
+	result = heap_delete(relation, tid, GetCurrentTransactionId(),
 						 GetCurrentCommandId(true), InvalidSnapshot,
 						 true /* wait for commit */ ,
 						 &tmfd, false,	/* changingPart */
 						 true /* wal_logical */ );
+
 	switch (result)
 	{
 		case TM_SelfModified:
@@ -3260,12 +3275,11 @@ simple_heap_delete(Relation relation, ItemPointer tid)
  */
 TM_Result
 heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
-			CommandId cid, Snapshot crosscheck, bool wait,
-			TM_FailureData *tmfd, LockTupleMode *lockmode,
+			TransactionId xid, CommandId cid, Snapshot crosscheck,
+			bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode,
 			TU_UpdateIndexes *update_indexes, bool wal_logical)
 {
 	TM_Result	result;
-	TransactionId xid = GetCurrentTransactionId();
 	Bitmapset  *hot_attrs;
 	Bitmapset  *sum_attrs;
 	Bitmapset  *key_attrs;
@@ -3305,6 +3319,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
 				infomask2_new_tuple;
 
 	Assert(ItemPointerIsValid(otid));
+	Assert(TransactionIdIsValid(xid));
 
 	/* Cheap, simplistic check that the tuple matches the rel's rowtype. */
 	Assert(HeapTupleHeaderGetNatts(newtup->t_data) <=
@@ -4144,8 +4159,12 @@ l2:
 		/*
 		 * For logical decoding we need combo CIDs to properly decode the
 		 * catalog.
+		 *
+		 * Like in heap_insert(), visibility is unchanged when called from
+		 * VACUUM FULL / CLUSTER.
 		 */
-		if (RelationIsAccessibleInLogicalDecoding(relation))
+		if (wal_logical &&
+			RelationIsAccessibleInLogicalDecoding(relation))
 		{
 			log_heap_new_cid(relation, &oldtup);
 			log_heap_new_cid(relation, heaptup);
@@ -4511,7 +4530,7 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup,
 	TM_FailureData tmfd;
 	LockTupleMode lockmode;
 
-	result = heap_update(relation, otid, tup,
+	result = heap_update(relation, otid, tup, GetCurrentTransactionId(),
 						 GetCurrentCommandId(true), InvalidSnapshot,
 						 true /* wait for commit */ ,
 						 &tmfd, &lockmode, update_indexes,
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index d03084768e0..b50f7dc9b9c 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -253,7 +253,8 @@ heapam_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
 	tuple->t_tableOid = slot->tts_tableOid;
 
 	/* Perform the insertion, and copy the resulting ItemPointer */
-	heap_insert(relation, tuple, cid, options, bistate);
+	heap_insert(relation, tuple, GetCurrentTransactionId(), cid, options,
+				bistate);
 	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
 
 	if (shouldFree)
@@ -276,7 +277,8 @@ heapam_tuple_insert_speculative(Relation relation, TupleTableSlot *slot,
 	options |= HEAP_INSERT_SPECULATIVE;
 
 	/* Perform the insertion, and copy the resulting ItemPointer */
-	heap_insert(relation, tuple, cid, options, bistate);
+	heap_insert(relation, tuple, GetCurrentTransactionId(), cid, options,
+				bistate);
 	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
 
 	if (shouldFree)
@@ -310,8 +312,8 @@ heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
 	 * the storage itself is cleaning the dead tuples by itself, it is the
 	 * time to call the index tuple deletion also.
 	 */
-	return heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart,
-					   true);
+	return heap_delete(relation, tid, GetCurrentTransactionId(), cid,
+					   crosscheck, wait, tmfd, changingPart, true);
 }
 
 
@@ -329,7 +331,8 @@ heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
 	slot->tts_tableOid = RelationGetRelid(relation);
 	tuple->t_tableOid = slot->tts_tableOid;
 
-	result = heap_update(relation, otid, tuple, cid, crosscheck, wait,
+	result = heap_update(relation, otid, tuple, GetCurrentTransactionId(),
+						 cid, crosscheck, wait,
 						 tmfd, lockmode, update_indexes, true);
 	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
 
@@ -2477,9 +2480,15 @@ reform_and_rewrite_tuple(HeapTuple tuple,
 		 * flag to skip logical decoding: as soon as REPACK CONCURRENTLY swaps
 		 * the relation files, it drops this relation, so no logical
 		 * replication subscription should need the data.
+		 *
+		 * It is also crucial to stamp the new record with the exact same xid
+		 * and cid, because the tuple must be visible to the snapshot of the
+		 * applied concurrent change later.
 		 */
-		heap_insert(NewHeap, copiedTuple, GetCurrentCommandId(true),
-					HEAP_INSERT_NO_LOGICAL, NULL);
+		CommandId	cid = HeapTupleHeaderGetRawCommandId(tuple->t_data);
+		TransactionId xid = HeapTupleHeaderGetXmin(tuple->t_data);
+
+		heap_insert(NewHeap, copiedTuple, xid, cid, HEAP_INSERT_NO_LOGICAL, NULL);
 	}
 
 	heap_freetuple(copiedTuple);
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 5670f2bfbde..e913594fc07 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -126,6 +126,18 @@ static FullTransactionId XactTopFullTransactionId = {InvalidTransactionId};
 static int	nParallelCurrentXids = 0;
 static TransactionId *ParallelCurrentXids;
 
+/*
+ * Another case that requires TransactionIdIsCurrentTransactionId() to behave
+ * specially is when REPACK CONCURRENTLY is processing data changes made in
+ * the old storage of a table by other transactions. When applying the changes
+ * to the new storage, the backend executing the CLUSTER command needs to act
+ * on behalf on those other transactions. The transactions responsible for the
+ * changes in the old storage are stored in this array, sorted by
+ * xidComparator.
+ */
+static int	nRepackCurrentXids = 0;
+static TransactionId *RepackCurrentXids = NULL;
+
 /*
  * Miscellaneous flag bits to record events which occur on the top level
  * transaction. These flags are only persisted in MyXactFlags and are intended
@@ -973,6 +985,8 @@ TransactionIdIsCurrentTransactionId(TransactionId xid)
 		int			low,
 					high;
 
+		Assert(nRepackCurrentXids == 0);
+
 		low = 0;
 		high = nParallelCurrentXids - 1;
 		while (low <= high)
@@ -992,6 +1006,21 @@ TransactionIdIsCurrentTransactionId(TransactionId xid)
 		return false;
 	}
 
+	/*
+	 * When executing REPACK CONCURRENTLY, the array of current transactions
+	 * is given.
+	 */
+	if (nRepackCurrentXids > 0)
+	{
+		Assert(nParallelCurrentXids == 0);
+
+		return bsearch(&xid,
+					   RepackCurrentXids,
+					   nRepackCurrentXids,
+					   sizeof(TransactionId),
+					   xidComparator) != NULL;
+	}
+
 	/*
 	 * We will return true for the Xid of the current subtransaction, any of
 	 * its subcommitted children, any of its parents, or any of their
@@ -5661,6 +5690,29 @@ EndParallelWorkerTransaction(void)
 	CurrentTransactionState->blockState = TBLOCK_DEFAULT;
 }
 
+/*
+ * SetRepackCurrentXids
+ *		Set the XID array that TransactionIdIsCurrentTransactionId() should
+ *		use.
+ */
+void
+SetRepackCurrentXids(TransactionId *xip, int xcnt)
+{
+	RepackCurrentXids = xip;
+	nRepackCurrentXids = xcnt;
+}
+
+/*
+ * ResetRepackCurrentXids
+ *		Undo the effect of SetRepackCurrentXids().
+ */
+void
+ResetRepackCurrentXids(void)
+{
+	RepackCurrentXids = NULL;
+	nRepackCurrentXids = 0;
+}
+
 /*
  * ShowTransactionState
  *		Debug support
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 511b2bb6c43..a44724f3757 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -82,6 +82,11 @@ typedef struct
  * The following definitions are used for concurrent processing.
  */
 
+/*
+ * OID of the table being repacked by this backend.
+ */
+static Oid	repacked_rel = InvalidOid;
+
 /*
  * The locators are used to avoid logical decoding of data that we do not need
  * for our table.
@@ -125,8 +130,10 @@ static List *get_tables_to_repack_partitioned(RepackCommand cmd,
 static bool cluster_is_permitted_for_relation(RepackCommand cmd,
 											  Oid relid, Oid userid);
 
-static void begin_concurrent_repack(Relation rel);
-static void end_concurrent_repack(void);
+static void begin_concurrent_repack(Relation rel, Relation *index_p,
+									bool *entered_p);
+static void end_concurrent_repack(bool error);
+static void cluster_before_shmem_exit_callback(int code, Datum arg);
 static LogicalDecodingContext *setup_logical_decoding(Oid relid,
 													  const char *slotname,
 													  TupleDesc tupdesc);
@@ -146,6 +153,7 @@ static void apply_concurrent_delete(Relation rel, HeapTuple tup_target,
 									ConcurrentChange *change);
 static HeapTuple find_target_tuple(Relation rel, ScanKey key, int nkeys,
 								   HeapTuple tup_key,
+								   Snapshot snapshot,
 								   IndexInsertState *iistate,
 								   TupleTableSlot *ident_slot,
 								   IndexScanDesc *scan_p);
@@ -450,6 +458,8 @@ cluster_rel(RepackCommand cmd, bool usingindex,
 	bool		verbose = ((params->options & CLUOPT_VERBOSE) != 0);
 	bool		recheck = ((params->options & CLUOPT_RECHECK) != 0);
 	bool		concurrent = ((params->options & CLUOPT_CONCURRENT) != 0);
+	bool		entered,
+				success;
 
 	/*
 	 * Check that the correct lock is held. The lock mode is
@@ -620,23 +630,30 @@ cluster_rel(RepackCommand cmd, bool usingindex,
 		TransferPredicateLocksToHeapRelation(OldHeap);
 
 	/* rebuild_relation does all the dirty work */
+	entered = false;
+	success = false;
 	PG_TRY();
 	{
 		/*
-		 * For concurrent processing, make sure that our logical decoding
-		 * ignores data changes of other tables than the one we are
-		 * processing.
+		 * For concurrent processing, make sure that
+		 *
+		 * 1) our logical decoding ignores data changes of other tables than
+		 * the one we are processing.
+		 *
+		 * 2) other transactions treat this table as if it was a system / user
+		 * catalog, and WAL the relevant additional information.
 		 */
 		if (concurrent)
-			begin_concurrent_repack(OldHeap);
+			begin_concurrent_repack(OldHeap, &index, &entered);
 
 		rebuild_relation(cmd, usingindex, OldHeap, index, save_userid,
 						 verbose, concurrent);
+		success = true;
 	}
 	PG_FINALLY();
 	{
-		if (concurrent)
-			end_concurrent_repack();
+		if (concurrent && entered)
+			end_concurrent_repack(!success);
 	}
 	PG_END_TRY();
 
@@ -2396,6 +2413,47 @@ determine_clustered_index(Relation rel, bool usingindex, const char *indexname)
 }
 
 
+/*
+ * Each relation being processed by REPACK CONCURRENTLY must be in the
+ * repackedRels hashtable.
+ */
+typedef struct RepackedRel
+{
+	Oid			relid;
+	Oid			dbid;
+} RepackedRel;
+
+static HTAB *RepackedRelsHash = NULL;
+
+/*
+ * Maximum number of entries in the hashtable.
+ *
+ * A replication slot is needed for the processing, so use this GUC to
+ * allocate memory for the hashtable.
+ */
+#define	MAX_REPACKED_RELS	(max_replication_slots)
+
+Size
+RepackShmemSize(void)
+{
+	return hash_estimate_size(MAX_REPACKED_RELS, sizeof(RepackedRel));
+}
+
+void
+RepackShmemInit(void)
+{
+	HASHCTL		info;
+
+	info.keysize = sizeof(RepackedRel);
+	info.entrysize = info.keysize;
+
+	RepackedRelsHash = ShmemInitHash("Repacked Relations",
+									 MAX_REPACKED_RELS,
+									 MAX_REPACKED_RELS,
+									 &info,
+									 HASH_ELEM | HASH_BLOBS);
+}
+
 /*
  * Call this function before REPACK CONCURRENTLY starts to setup logical
  * decoding. It makes sure that other users of the table put enough
@@ -2410,11 +2468,119 @@ determine_clustered_index(Relation rel, bool usingindex, const char *indexname)
  *
  * Note that TOAST table needs no attention here as it's not scanned using
  * historic snapshot.
+ *
+ * 'index_p' is in/out argument because the function unlocks the index
+ * temporarily.
+ *
+ * 'entered_p' receives a bool value telling whether relation OID was entered
+ * into RepackedRelsHash or not.
  */
 static void
-begin_concurrent_repack(Relation rel)
+begin_concurrent_repack(Relation rel, Relation *index_p, bool *entered_p)
 {
-	Oid			toastrelid;
+	Oid			relid,
+				toastrelid;
+	Relation	index = NULL;
+	Oid			indexid = InvalidOid;
+	RepackedRel key,
+			   *entry;
+	bool		found;
+	static bool before_shmem_exit_callback_setup = false;
+
+	relid = RelationGetRelid(rel);
+	index = index_p ? *index_p : NULL;
+
+	/*
+	 * Make sure that we do not leave an entry in RepackedRelsHash if exiting
+	 * due to FATAL.
+	 */
+	if (!before_shmem_exit_callback_setup)
+	{
+		before_shmem_exit(cluster_before_shmem_exit_callback, 0);
+		before_shmem_exit_callback_setup = true;
+	}
+
+	memset(&key, 0, sizeof(key));
+	key.relid = relid;
+	key.dbid = MyDatabaseId;
+
+	*entered_p = false;
+	LWLockAcquire(RepackedRelsLock, LW_EXCLUSIVE);
+	entry = (RepackedRel *)
+		hash_search(RepackedRelsHash, &key, HASH_ENTER_NULL, &found);
+	if (found)
+	{
+		/*
+		 * Since REPACK CONCURRENTLY takes ShareRowExclusiveLock, a conflict
+		 * should occur much earlier. However that lock may be released
+		 * temporarily, see below.  Anyway, we should complain whatever the
+		 * reason of the conflict might be.
+		 */
+		ereport(ERROR,
+				(errmsg("relation \"%s\" is already being processed by REPACK CONCURRENTLY",
+						RelationGetRelationName(rel))));
+	}
+	if (entry == NULL)
+		ereport(ERROR,
+				(errmsg("too many requests for REPACK CONCURRENTLY at a time")),
+				(errhint("Please consider increasing the \"max_replication_slots\" configuration parameter.")));
+
+	/*
+	 * Even if anything fails below, the caller has to do cleanup in the
+	 * shared memory.
+	 */
+	*entered_p = true;
+
+	/*
+	 * Enable the callback to remove the entry in case of exit. We should not
+	 * do this earlier, otherwise an attempt to insert already existing entry
+	 * could make us remove that entry (inserted by another backend) during
+	 * ERROR handling.
+	 */
+	Assert(!OidIsValid(repacked_rel));
+	repacked_rel = relid;
+
+	LWLockRelease(RepackedRelsLock);
+
+	/*
+	 * Make sure that other backends are aware of the new hash entry as soon
+	 * as they open our table.
+	 */
+	CacheInvalidateRelcacheImmediate(relid);
+
+	/*
+	 * Also make sure that the existing users of the table update their
+	 * relcache entry as soon as they try to run DML commands on it.
+	 *
+	 * ShareLock is the weakest lock that conflicts with DMLs. If any backend
+	 * has a lower lock, we assume it'll accept our invalidation message when
+	 * it changes the lock mode.
+	 *
+	 * Before upgrading the lock on the relation, close the index temporarily
+	 * to avoid a deadlock if another backend running DML already has its lock
+	 * (ShareLock) on the table and waits for the lock on the index.
+	 */
+	if (index)
+	{
+		indexid = RelationGetRelid(index);
+		index_close(index, ShareUpdateExclusiveLock);
+	}
+	LockRelationOid(relid, ShareLock);
+	UnlockRelationOid(relid, ShareLock);
+	if (OidIsValid(indexid))
+	{
+		/*
+		 * Re-open the index and check that it hasn't changed while unlocked.
+		 */
+		check_index_is_clusterable(rel, indexid, ShareUpdateExclusiveLock);
+
+		/*
+		 * Return the new relcache entry to the caller. (It's been locked by
+		 * the call above.)
+		 */
+		index = index_open(indexid, NoLock);
+		*index_p = index;
+	}
 
 	/* Avoid logical decoding of other relations by this backend. */
 	repacked_rel_locator = rel->rd_locator;
@@ -2432,15 +2598,122 @@ begin_concurrent_repack(Relation rel)
 
 /*
  * Call this when done with REPACK CONCURRENTLY.
+ *
+ * 'error' tells whether the function is being called in order to handle
+ * error.
  */
 static void
-end_concurrent_repack(void)
+end_concurrent_repack(bool error)
 {
+	RepackedRel key;
+	RepackedRel *entry = NULL;
+	Oid			relid = repacked_rel;
+
+	/* Remove the relation from the hash if we managed to insert one. */
+	if (OidIsValid(repacked_rel))
+	{
+		memset(&key, 0, sizeof(key));
+		key.relid = repacked_rel;
+		key.dbid = MyDatabaseId;
+		LWLockAcquire(RepackedRelsLock, LW_EXCLUSIVE);
+		entry = hash_search(RepackedRelsHash, &key, HASH_REMOVE, NULL);
+		LWLockRelease(RepackedRelsLock);
+
+		/*
+		 * Make others refresh their information whether they should still
+		 * treat the table as catalog from the perspective of writing WAL.
+		 *
+		 * XXX Unlike entering the entry into the hashtable, we do not bother
+		 * with locking and unlocking the table here:
+		 *
+		 * 1) On normal completion (and sometimes even on ERROR), the caller
+		 * is already holding AccessExclusiveLock on the table, so there
+		 * should be no relcache reference unaware of this change.
+		 *
+		 * 2) In the other cases, the worst scenario is that the other
+		 * backends will write unnecessary information to WAL until they close
+		 * the relation.
+		 *
+		 * Should we use ShareLock mode to fix 2) at least for the non-FATAL
+		 * errors? (Our before_shmem_exit callback is in charge of FATAL, and
+		 * that probably should not try to acquire any lock.)
+		 */
+		CacheInvalidateRelcacheImmediate(repacked_rel);
+
+		/*
+		 * By clearing this variable we also disable
+		 * cluster_before_shmem_exit_callback().
+		 */
+		repacked_rel = InvalidOid;
+	}
+
 	/*
 	 * Restore normal function of (future) logical decoding for this backend.
 	 */
 	repacked_rel_locator.relNumber = InvalidOid;
 	repacked_rel_toast_locator.relNumber = InvalidOid;
+
+	/*
+	 * On normal completion (!error), we should not really fail to remove the
+	 * entry. But if it wasn't there for any reason, raise ERROR to make sure
+	 * the transaction is aborted: if other transactions, while changing the
+	 * contents of the relation, didn't know that REPACK CONCURRENTLY was in
+	 * progress, they could have missed to WAL enough information, and thus we
+	 * could have produced an inconsistent table contents.
+	 *
+	 * On the other hand, if we are already handling an error, there's no
+	 * reason to worry about inconsistent contents of the new storage because
+	 * the transaction is going to be rolled back anyway. Furthermore, by
+	 * raising ERROR here we'd shadow the original error.
+	 */
+	if (!error)
+	{
+		char	   *relname;
+
+		if (OidIsValid(relid) && entry == NULL)
+		{
+			relname = get_rel_name(relid);
+			if (!relname)
+				ereport(ERROR,
+						(errmsg("cache lookup failed for relation %u",
+								relid)));
+
+			ereport(ERROR,
+					(errmsg("relation \"%s\" not found among repacked relations",
+							relname)));
+		}
+	}
+}
+
+/*
+ * A wrapper to call end_concurrent_repack() as a before_shmem_exit callback.
+ */
+static void
+cluster_before_shmem_exit_callback(int code, Datum arg)
+{
+	if (OidIsValid(repacked_rel))
+		end_concurrent_repack(true);
+}
+
+/*
+ * Check if relation is currently being processed by REPACK CONCURRENTLY.
+ */
+bool
+is_concurrent_repack_in_progress(Oid relid)
+{
+	RepackedRel key,
+			   *entry;
+
+	memset(&key, 0, sizeof(key));
+	key.relid = relid;
+	key.dbid = MyDatabaseId;
+
+	LWLockAcquire(RepackedRelsLock, LW_SHARED);
+	entry = (RepackedRel *)
+		hash_search(RepackedRelsHash, &key, HASH_FIND, NULL);
+	LWLockRelease(RepackedRelsLock);
+
+	return entry != NULL;
 }
 
 /*
@@ -2502,6 +2775,9 @@ setup_logical_decoding(Oid relid, const char *slotname, TupleDesc tupdesc)
 	dstate->relid = relid;
 	dstate->tstore = tuplestore_begin_heap(false, false,
 										   maintenance_work_mem);
+#ifdef USE_ASSERT_CHECKING
+	dstate->last_change_xid = InvalidTransactionId;
+#endif
 
 	dstate->tupdesc = tupdesc;
 
@@ -2649,6 +2925,7 @@ apply_concurrent_changes(RepackDecodingState *dstate, Relation rel,
 		char	   *change_raw,
 				   *src;
 		ConcurrentChange change;
+		Snapshot	snapshot;
 		bool		isnull[1];
 		Datum		values[1];
 
@@ -2717,8 +2994,30 @@ apply_concurrent_changes(RepackDecodingState *dstate, Relation rel,
 
 			/*
 			 * Find the tuple to be updated or deleted.
+			 *
+			 * As the table being REPACKed concurrently is treated like a
+			 * catalog, new CID is WAL-logged and decoded. And since we use
+			 * the same XID that the original DMLs did, the snapshot used for
+			 * the logical decoding (by now converted to a non-historic MVCC
+			 * snapshot) should see the tuples inserted previously into the
+			 * new heap and/or updated there.
 			 */
-			tup_exist = find_target_tuple(rel, key, nkeys, tup_key,
+			snapshot = change.snapshot;
+
+			/*
+			 * Set what should be considered current transaction (and
+			 * subtransactions) during visibility check.
+			 *
+			 * Note that this snapshot was created from a historic snapshot
+			 * using SnapBuildMVCCFromHistoric(), which does not touch
+			 * 'subxip'. Thus, unlike in a regular MVCC snapshot, the array
+			 * only contains the transactions whose data changes we are
+			 * applying, and its subtransactions. That's exactly what we need
+			 * to check if a particular xact is a "current transaction".
+			 */
+			SetRepackCurrentXids(snapshot->subxip, snapshot->subxcnt);
+
+			tup_exist = find_target_tuple(rel, key, nkeys, tup_key, snapshot,
 										  iistate, ident_slot, &ind_scan);
 			if (tup_exist == NULL)
 				elog(ERROR, "Failed to find target tuple");
@@ -2729,6 +3028,8 @@ apply_concurrent_changes(RepackDecodingState *dstate, Relation rel,
 			else
 				apply_concurrent_delete(rel, tup_exist, &change);
 
+			ResetRepackCurrentXids();
+
 			if (tup_old != NULL)
 			{
 				pfree(tup_old);
@@ -2741,14 +3042,14 @@ apply_concurrent_changes(RepackDecodingState *dstate, Relation rel,
 		else
 			elog(ERROR, "Unrecognized kind of change: %d", change.kind);
 
-		/*
-		 * If a change was applied now, increment CID for next writes and
-		 * update the snapshot so it sees the changes we've applied so far.
-		 */
-		if (change.kind != CHANGE_UPDATE_OLD)
+		/* Free the snapshot if this is the last change that needed it. */
+		Assert(change.snapshot->active_count > 0);
+		change.snapshot->active_count--;
+		if (change.snapshot->active_count == 0)
 		{
-			CommandCounterIncrement();
-			UpdateActiveSnapshotCommandId();
+			if (change.snapshot == dstate->snapshot)
+				dstate->snapshot = NULL;
+			FreeSnapshot(change.snapshot);
 		}
 
 		/* TTSOpsMinimalTuple has .get_heap_tuple==NULL. */
@@ -2768,16 +3069,35 @@ static void
 apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
 						IndexInsertState *iistate, TupleTableSlot *index_slot)
 {
+	Snapshot	snapshot = change->snapshot;
 	List	   *recheck;
 
+	/*
+	 * For INSERT, the visibility information is not important, but we use the
+	 * snapshot to get CID. Index functions might need the whole snapshot
+	 * anyway.
+	 */
+	SetRepackCurrentXids(snapshot->subxip, snapshot->subxcnt);
+
+	/*
+	 * Write the tuple into the new heap.
+	 *
+	 * The snapshot is the one we used to decode the insert (though converted
+	 * to "non-historic" MVCC snapshot), i.e. the snapshot's curcid is the
+	 * tuple CID incremented by one (due to the "new CID" WAL record that got
+	 * written along with the INSERT record). Thus if we want to use the
+	 * original CID, we need to subtract 1 from curcid.
+	 */
+	Assert(snapshot->curcid != InvalidCommandId &&
+		   snapshot->curcid > FirstCommandId);
 
 	/*
 	 * Like simple_heap_insert(), but make sure that the INSERT is not
 	 * logically decoded - see reform_and_rewrite_tuple() for more
 	 * information.
 	 */
-	heap_insert(rel, tup, GetCurrentCommandId(true), HEAP_INSERT_NO_LOGICAL,
-				NULL);
+	heap_insert(rel, tup, change->xid, snapshot->curcid - 1,
+				HEAP_INSERT_NO_LOGICAL, NULL);
 
 	/*
 	 * Update indexes.
@@ -2785,6 +3105,7 @@ apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
 	 * In case functions in the index need the active snapshot and caller
 	 * hasn't set one.
 	 */
+	PushActiveSnapshot(snapshot);
 	ExecStoreHeapTuple(tup, index_slot, false);
 	recheck = ExecInsertIndexTuples(iistate->rri,
 									index_slot,
@@ -2795,6 +3116,8 @@ apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
 									NIL,	/* arbiterIndexes */
 									false	/* onlySummarizing */
 		);
+	PopActiveSnapshot();
+	ResetRepackCurrentXids();
 
 	/*
 	 * If recheck is required, it must have been preformed on the source
@@ -2816,6 +3139,7 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target,
 	TU_UpdateIndexes update_indexes;
 	TM_Result	res;
 	List	   *recheck;
+	Snapshot	snapshot = change->snapshot;
 
 	/*
 	 * Write the new tuple into the new heap. ('tup' gets the TID assigned
@@ -2823,13 +3147,19 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target,
 	 *
 	 * Do it like in simple_heap_update(), except for 'wal_logical' (and
 	 * except for 'wait').
+	 *
+	 * Regarding CID, see the comment in apply_concurrent_insert().
 	 */
+	Assert(snapshot->curcid != InvalidCommandId &&
+		   snapshot->curcid > FirstCommandId);
+
 	res = heap_update(rel, &tup_target->t_self, tup,
-					  GetCurrentCommandId(true),
+					  change->xid, snapshot->curcid - 1,
 					  InvalidSnapshot,
 					  false,	/* no wait - only we are doing changes */
 					  &tmfd, &lockmode, &update_indexes,
-					  false /* wal_logical */ );
+	/* wal_logical */
+					  false);
 	if (res != TM_Ok)
 		ereport(ERROR, (errmsg("failed to apply concurrent UPDATE")));
 
@@ -2837,6 +3167,7 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target,
 
 	if (update_indexes != TU_None)
 	{
+		PushActiveSnapshot(snapshot);
 		recheck = ExecInsertIndexTuples(iistate->rri,
 										index_slot,
 										iistate->estate,
@@ -2846,6 +3177,7 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target,
 										NIL,	/* arbiterIndexes */
 		/* onlySummarizing */
 										update_indexes == TU_Summarizing);
+		PopActiveSnapshot();
 		list_free(recheck);
 	}
 
@@ -2858,6 +3190,12 @@ apply_concurrent_delete(Relation rel, HeapTuple tup_target,
 {
 	TM_Result	res;
 	TM_FailureData tmfd;
+	Snapshot	snapshot = change->snapshot;
+
+
+	/* Regarding CID, see the comment in apply_concurrent_insert(). */
+	Assert(snapshot->curcid != InvalidCommandId &&
+		   snapshot->curcid > FirstCommandId);
 
 	/*
 	 * Delete tuple from the new heap.
@@ -2865,11 +3203,11 @@ apply_concurrent_delete(Relation rel, HeapTuple tup_target,
 	 * Do it like in simple_heap_delete(), except for 'wal_logical' (and
 	 * except for 'wait').
 	 */
-	res = heap_delete(rel, &tup_target->t_self, GetCurrentCommandId(true),
-					  InvalidSnapshot, false,
-					  &tmfd,
-					  false,	/* no wait - only we are doing changes */
-					  false /* wal_logical */ );
+	res = heap_delete(rel, &tup_target->t_self, change->xid,
+					  snapshot->curcid - 1, InvalidSnapshot, false,
+					  &tmfd, false,
+	/* wal_logical */
+					  false);
 
 	if (res != TM_Ok)
 		ereport(ERROR, (errmsg("failed to apply concurrent DELETE")));
@@ -2890,7 +3228,7 @@ apply_concurrent_delete(Relation rel, HeapTuple tup_target,
  */
 static HeapTuple
 find_target_tuple(Relation rel, ScanKey key, int nkeys, HeapTuple tup_key,
-				  IndexInsertState *iistate,
+				  Snapshot snapshot, IndexInsertState *iistate,
 				  TupleTableSlot *ident_slot, IndexScanDesc *scan_p)
 {
 	IndexScanDesc scan;
@@ -2899,7 +3237,7 @@ find_target_tuple(Relation rel, ScanKey key, int nkeys, HeapTuple tup_key,
 	HeapTuple	result = NULL;
 
 	/* XXX no instrumentation for now */
-	scan = index_beginscan(rel, iistate->ident_index, GetActiveSnapshot(),
+	scan = index_beginscan(rel, iistate->ident_index, snapshot,
 						   NULL, nkeys, 0);
 	*scan_p = scan;
 	index_rescan(scan, key, nkeys, NULL, 0);
@@ -2971,6 +3309,8 @@ process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal,
 	}
 	PG_FINALLY();
 	{
+		ResetRepackCurrentXids();
+
 		if (rel_src)
 			rel_dst->rd_toastoid = InvalidOid;
 	}
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5dc4ae58ffe..9fefcffd8b3 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -475,9 +475,14 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	/*
 	 * If the change is not intended for logical decoding, do not even
-	 * establish transaction for it - REPACK CONCURRENTLY is the typical use
-	 * case.
-	 *
+	 * establish transaction for it. This is particularly important if the
+	 * record was generated by REPACK CONCURRENTLY because this command uses
+	 * the original XID when doing changes in the new storage. The decoding
+	 * system probably does not expect to see the same transaction multiple
+	 * times.
+	 */
+
+	/*
 	 * First, check if REPACK CONCURRENTLY is being performed by this backend.
 	 * If so, only decode data changes of the table that it is processing, and
 	 * the changes of its TOAST relation.
@@ -504,11 +509,11 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	 * Second, skip records which do not contain sufficient information for
 	 * the decoding.
 	 *
-	 * The problem we solve here is that REPACK CONCURRENTLY generates WAL
-	 * when doing changes in the new table. Those changes should not be useful
-	 * for any other user (such as logical replication subscription) because
-	 * the new table will eventually be dropped (after REPACK CONCURRENTLY has
-	 * assigned its file to the "old table").
+	 * One particular problem we solve here is that REPACK CONCURRENTLY
+	 * generates WAL when doing changes in the new table. Those changes should
+	 * not be decoded because reorderbuffer.c considers their XID already
+	 * committed. (REPACK CONCURRENTLY deliberately generates WAL records in
+	 * such a way that they are skipped here.)
 	 */
 	switch (info)
 	{
@@ -995,13 +1000,6 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	xlrec = (xl_heap_insert *) XLogRecGetData(r);
 
-	/*
-	 * Ignore insert records without new tuples (this does happen when
-	 * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
-	 */
-	if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
-		return;
-
 	/* only interested in our database */
 	XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
 	if (target_locator.dbOid != ctx->slot->data.database)
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 8e5116a9cab..72a38074a7b 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -155,7 +155,7 @@ static bool ExportInProgress = false;
 static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
 
 /* snapshot building/manipulation/distribution functions */
-static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
+static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder, XLogRecPtr lsn);
 
 static void SnapBuildFreeSnapshot(Snapshot snap);
 
@@ -352,12 +352,17 @@ SnapBuildSnapDecRefcount(Snapshot snap)
  * Build a new snapshot, based on currently committed catalog-modifying
  * transactions.
  *
+ * 'lsn' is the location of the commit record (of a catalog-changing
+ * transaction) that triggered creation of the snapshot. Pass
+ * InvalidXLogRecPtr for the transaction base snapshot or if the user of
+ * the snapshot does not need the LSN.
+ *
  * In-progress transactions with catalog access are *not* allowed to modify
  * these snapshots; they have to copy them and fill in appropriate ->curcid
  * and ->subxip/subxcnt values.
  */
 static Snapshot
-SnapBuildBuildSnapshot(SnapBuild *builder)
+SnapBuildBuildSnapshot(SnapBuild *builder, XLogRecPtr lsn)
 {
 	Snapshot	snapshot;
 	Size		ssize;
@@ -425,6 +430,7 @@ SnapBuildBuildSnapshot(SnapBuild *builder)
 	snapshot->active_count = 0;
 	snapshot->regd_count = 0;
 	snapshot->snapXactCompletionCount = 0;
+	snapshot->lsn = lsn;
 
 	return snapshot;
 }
@@ -461,7 +467,7 @@ SnapBuildInitialSnapshot(SnapBuild *builder)
 	if (TransactionIdIsValid(MyProc->xmin))
 		elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");
 
-	snap = SnapBuildBuildSnapshot(builder);
+	snap = SnapBuildBuildSnapshot(builder, InvalidXLogRecPtr);
 
 	/*
 	 * We know that snap->xmin is alive, enforced by the logical xmin
@@ -502,7 +508,7 @@ SnapBuildInitialSnapshotForRepack(SnapBuild *builder)
 
 	Assert(builder->state == SNAPBUILD_CONSISTENT);
 
-	snap = SnapBuildBuildSnapshot(builder);
+	snap = SnapBuildBuildSnapshot(builder, InvalidXLogRecPtr);
 	return SnapBuildMVCCFromHistoric(snap, false);
 }
 
@@ -636,7 +642,7 @@ SnapBuildGetOrBuildSnapshot(SnapBuild *builder)
 	/* only build a new snapshot if we don't have a prebuilt one */
 	if (builder->snapshot == NULL)
 	{
-		builder->snapshot = SnapBuildBuildSnapshot(builder);
+		builder->snapshot = SnapBuildBuildSnapshot(builder, InvalidXLogRecPtr);
 		/* increase refcount for the snapshot builder */
 		SnapBuildSnapIncRefcount(builder->snapshot);
 	}
@@ -716,7 +722,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
 		/* only build a new snapshot if we don't have a prebuilt one */
 		if (builder->snapshot == NULL)
 		{
-			builder->snapshot = SnapBuildBuildSnapshot(builder);
+			builder->snapshot = SnapBuildBuildSnapshot(builder, lsn);
 			/* increase refcount for the snapshot builder */
 			SnapBuildSnapIncRefcount(builder->snapshot);
 		}
@@ -1130,7 +1136,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 		if (builder->snapshot)
 			SnapBuildSnapDecRefcount(builder->snapshot);
 
-		builder->snapshot = SnapBuildBuildSnapshot(builder);
+		builder->snapshot = SnapBuildBuildSnapshot(builder, lsn);
 
 		/* we might need to execute invalidations, add snapshot */
 		if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
@@ -1958,7 +1964,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 	{
 		SnapBuildSnapDecRefcount(builder->snapshot);
 	}
-	builder->snapshot = SnapBuildBuildSnapshot(builder);
+	builder->snapshot = SnapBuildBuildSnapshot(builder, InvalidXLogRecPtr);
 	SnapBuildSnapIncRefcount(builder->snapshot);
 
 	ReorderBufferSetRestartPoint(builder->reorder, lsn);
diff --git a/src/backend/replication/pgoutput_repack/pgoutput_repack.c b/src/backend/replication/pgoutput_repack/pgoutput_repack.c
index 687fbbc59bb..28bd16f9cc7 100644
--- a/src/backend/replication/pgoutput_repack/pgoutput_repack.c
+++ b/src/backend/replication/pgoutput_repack/pgoutput_repack.c
@@ -32,7 +32,8 @@ static void plugin_truncate(struct LogicalDecodingContext *ctx,
 							Relation relations[],
 							ReorderBufferChange *change);
 static void store_change(LogicalDecodingContext *ctx,
-						 ConcurrentChangeKind kind, HeapTuple tuple);
+						 ConcurrentChangeKind kind, HeapTuple tuple,
+						 TransactionId xid);
 
 void
 _PG_output_plugin_init(OutputPluginCallbacks *cb)
@@ -100,6 +101,7 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			  Relation relation, ReorderBufferChange *change)
 {
 	RepackDecodingState *dstate;
+	Snapshot	snapshot;
 
 	dstate = (RepackDecodingState *) ctx->output_writer_private;
 
@@ -107,6 +109,48 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	if (relation->rd_id != dstate->relid)
 		return;
 
+	/*
+	 * Catalog snapshot is fine because the table we are processing is
+	 * temporarily considered a user catalog table.
+	 */
+	snapshot = GetCatalogSnapshot(InvalidOid);
+	Assert(snapshot->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
+	Assert(!snapshot->suboverflowed);
+
+	/*
+	 * This should not happen, but if we don't have enough information to
+	 * apply a new snapshot, the consequences would be bad. Thus prefer ERROR
+	 * to Assert().
+	 */
+	if (XLogRecPtrIsInvalid(snapshot->lsn))
+		ereport(ERROR, (errmsg("snapshot has invalid LSN")));
+
+	/*
+	 * reorderbuffer.c changes the catalog snapshot as soon as it sees a new
+	 * CID or a commit record of a catalog-changing transaction.
+	 */
+	if (dstate->snapshot == NULL || snapshot->lsn != dstate->snapshot_lsn ||
+		snapshot->curcid != dstate->snapshot->curcid)
+	{
+		/* CID should not go backwards. */
+		Assert(dstate->snapshot == NULL ||
+			   snapshot->curcid >= dstate->snapshot->curcid ||
+			   change->txn->xid != dstate->last_change_xid);
+
+		/*
+		 * XXX Is it a problem that the copy is created in
+		 * TopTransactionContext?
+		 *
+		 * XXX Wouldn't it be o.k. for SnapBuildMVCCFromHistoric() to set xcnt
+		 * to 0 instead of converting xip in this case? The point is that
+		 * transactions which are still in progress from the perspective of
+		 * reorderbuffer.c could not be replayed yet, so we do not need to
+		 * examine their XIDs.
+		 */
+		dstate->snapshot = SnapBuildMVCCFromHistoric(snapshot, false);
+		dstate->snapshot_lsn = snapshot->lsn;
+	}
+
 	/* Decode entry depending on its type */
 	switch (change->action)
 	{
@@ -124,7 +168,7 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				if (newtuple == NULL)
 					elog(ERROR, "Incomplete insert info.");
 
-				store_change(ctx, CHANGE_INSERT, newtuple);
+				store_change(ctx, CHANGE_INSERT, newtuple, change->txn->xid);
 			}
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
@@ -141,9 +185,11 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 					elog(ERROR, "Incomplete update info.");
 
 				if (oldtuple != NULL)
-					store_change(ctx, CHANGE_UPDATE_OLD, oldtuple);
+					store_change(ctx, CHANGE_UPDATE_OLD, oldtuple,
+								 change->txn->xid);
 
-				store_change(ctx, CHANGE_UPDATE_NEW, newtuple);
+				store_change(ctx, CHANGE_UPDATE_NEW, newtuple,
+							 change->txn->xid);
 			}
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
@@ -156,7 +202,7 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				if (oldtuple == NULL)
 					elog(ERROR, "Incomplete delete info.");
 
-				store_change(ctx, CHANGE_DELETE, oldtuple);
+				store_change(ctx, CHANGE_DELETE, oldtuple, change->txn->xid);
 			}
 			break;
 		default:
@@ -190,13 +236,13 @@ plugin_truncate(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	if (i == nrelations)
 		return;
 
-	store_change(ctx, CHANGE_TRUNCATE, NULL);
+	store_change(ctx, CHANGE_TRUNCATE, NULL, InvalidTransactionId);
 }
 
 /* Store concurrent data change. */
 static void
 store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind,
-			 HeapTuple tuple)
+			 HeapTuple tuple, TransactionId xid)
 {
 	RepackDecodingState *dstate;
 	char	   *change_raw;
@@ -266,6 +312,11 @@ store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind,
 	dst = dst_start + SizeOfConcurrentChange;
 	memcpy(dst, tuple->t_data, tuple->t_len);
 
+	/* Initialize the other fields. */
+	change.xid = xid;
+	change.snapshot = dstate->snapshot;
+	dstate->snapshot->active_count++;
+
 	/* The data has been copied. */
 	if (flattened)
 		pfree(tuple);
@@ -279,6 +330,9 @@ store:
 	isnull[0] = false;
 	tuplestore_putvalues(dstate->tstore, dstate->tupdesc_change,
 						 values, isnull);
+#ifdef USE_ASSERT_CHECKING
+	dstate->last_change_xid = xid;
+#endif
 
 	/* Accounting. */
 	dstate->nchanges++;
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index e9ddf39500c..e24e1795aa9 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -151,6 +151,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, InjectionPointShmemSize());
 	size = add_size(size, SlotSyncShmemSize());
 	size = add_size(size, AioShmemSize());
+	size = add_size(size, RepackShmemSize());
 
 	/* include additional requested shmem from preload libraries */
 	size = add_size(size, total_addin_request);
@@ -344,6 +345,7 @@ CreateOrAttachShmemStructs(void)
 	WaitEventCustomShmemInit();
 	InjectionPointShmemInit();
 	AioShmemInit();
+	RepackShmemInit();
 }
 
 /*
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 5427da5bc1b..e94c83726d6 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -352,6 +352,7 @@ DSMRegistry	"Waiting to read or update the dynamic shared memory registry."
 InjectionPoint	"Waiting to read or update information related to injection points."
 SerialControl	"Waiting to read or update shared <filename>pg_serial</filename> state."
 AioWorkerSubmissionQueue	"Waiting to access AIO worker submission queue."
+RepackedRels	"Waiting to access the hash table of relations being repacked."
 
 #
 # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 02505c88b8e..ecaa2283c2a 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -1643,6 +1643,27 @@ CacheInvalidateRelcache(Relation relation)
 								 databaseId, relationId);
 }
 
+/*
+ * CacheInvalidateRelcacheImmediate
+ *		Send invalidation message for the specified relation's relcache entry.
+ *
+ * Currently this is used in REPACK CONCURRENTLY, to make sure that other
+ * backends are aware that the command is being executed for the relation.
+ */
+void
+CacheInvalidateRelcacheImmediate(Oid relid)
+{
+	SharedInvalidationMessage msg;
+
+	msg.rc.id = SHAREDINVALRELCACHE_ID;
+	msg.rc.dbId = MyDatabaseId;
+	msg.rc.relId = relid;
+	/* check AddCatcacheInvalidationMessage() for an explanation */
+	VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
+
+	SendSharedInvalidMessages(&msg, 1);
+}
+
 /*
  * CacheInvalidateRelcacheAll
  *		Register invalidation of the whole relcache at the end of command.
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index d27a4c30548..ea565b5b053 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -1279,6 +1279,10 @@ retry:
 	/* make sure relation is marked as having no open file yet */
 	relation->rd_smgr = NULL;
 
+	/* Is REPACK CONCURRENTLY in progress? */
+	relation->rd_repack_concurrent =
+		is_concurrent_repack_in_progress(targetRelId);
+
 	/*
 	 * now we can free the memory allocated for pg_class_tuple
 	 */
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index b82dd17a966..981425f23b6 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -316,22 +316,24 @@ extern BulkInsertState GetBulkInsertState(void);
 extern void FreeBulkInsertState(BulkInsertState);
 extern void ReleaseBulkInsertStatePin(BulkInsertState bistate);
 
-extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid,
-						int options, BulkInsertState bistate);
+extern void heap_insert(Relation relation, HeapTuple tup, TransactionId xid,
+						CommandId cid, int options, BulkInsertState bistate);
 extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots,
 							  int ntuples, CommandId cid, int options,
 							  BulkInsertState bistate);
 extern TM_Result heap_delete(Relation relation, ItemPointer tid,
-							 CommandId cid, Snapshot crosscheck, bool wait,
+							 TransactionId xid, CommandId cid,
+							 Snapshot crosscheck, bool wait,
 							 struct TM_FailureData *tmfd, bool changingPart,
 							 bool wal_logical);
 extern void heap_finish_speculative(Relation relation, ItemPointer tid);
 extern void heap_abort_speculative(Relation relation, ItemPointer tid);
 extern TM_Result heap_update(Relation relation, ItemPointer otid,
-							 HeapTuple newtup,
+							 HeapTuple newtup, TransactionId xid,
 							 CommandId cid, Snapshot crosscheck, bool wait,
 							 struct TM_FailureData *tmfd, LockTupleMode *lockmode,
-							 TU_UpdateIndexes *update_indexes, bool wal_logical);
+							 TU_UpdateIndexes *update_indexes,
+							 bool wal_logical);
 extern TM_Result heap_lock_tuple(Relation relation, HeapTuple tuple,
 								 CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy,
 								 bool follow_updates,
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index b2bc10ee041..fbb66d559b6 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -482,6 +482,8 @@ extern Size EstimateTransactionStateSpace(void);
 extern void SerializeTransactionState(Size maxsize, char *start_address);
 extern void StartParallelWorkerTransaction(char *tstatespace);
 extern void EndParallelWorkerTransaction(void);
+extern void SetRepackCurrentXids(TransactionId *xip, int xcnt);
+extern void ResetRepackCurrentXids(void);
 extern bool IsTransactionBlock(void);
 extern bool IsTransactionOrTransactionBlock(void);
 extern char TransactionBlockStatusCode(void);
diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h
index 4a508c57a50..5dba3d427f5 100644
--- a/src/include/commands/cluster.h
+++ b/src/include/commands/cluster.h
@@ -61,6 +61,14 @@ typedef struct ConcurrentChange
 	/* See the enum above. */
 	ConcurrentChangeKind kind;
 
+	/* Transaction that changes the data. */
+	TransactionId xid;
+
+	/*
+	 * Historic catalog snapshot that was used to decode this change.
+	 */
+	Snapshot	snapshot;
+
 	/*
 	 * The actual tuple.
 	 *
@@ -92,6 +100,8 @@ typedef struct RepackDecodingState
 	 * tuplestore does this transparently.
 	 */
 	Tuplestorestate *tstore;
+	/* XID of the last change added to tstore. */
+	TransactionId last_change_xid PG_USED_FOR_ASSERTS_ONLY;
 
 	/* The current number of changes in tstore. */
 	double		nchanges;
@@ -112,6 +122,14 @@ typedef struct RepackDecodingState
 	/* Slot to retrieve data from tstore. */
 	TupleTableSlot *tsslot;
 
+	/*
+	 * Historic catalog snapshot that was used to decode the most recent
+	 * change.
+	 */
+	Snapshot	snapshot;
+	/* LSN of the WAL record for which this snapshot was built. */
+	XLogRecPtr	snapshot_lsn;
+
 	ResourceOwner resowner;
 } RepackDecodingState;
 
@@ -141,4 +159,8 @@ extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
 							 MultiXactId cutoffMulti,
 							 char newrelpersistence);
 
+extern Size RepackShmemSize(void);
+extern void RepackShmemInit(void);
+extern bool is_concurrent_repack_in_progress(Oid relid);
+
 #endif							/* CLUSTER_H */
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 06a1ffd4b08..9a9880b3073 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -85,6 +85,7 @@ PG_LWLOCK(50, DSMRegistry)
 PG_LWLOCK(51, InjectionPoint)
 PG_LWLOCK(52, SerialControl)
 PG_LWLOCK(53, AioWorkerSubmissionQueue)
+PG_LWLOCK(54, RepackedRels)
 
 /*
  * There also exist several built-in LWLock tranches.  As with the predefined
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index 9b871caef62..ae9dee394dc 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -50,6 +50,8 @@ extern void CacheInvalidateCatalog(Oid catalogId);
 
 extern void CacheInvalidateRelcache(Relation relation);
 
+extern void CacheInvalidateRelcacheImmediate(Oid relid);
+
 extern void CacheInvalidateRelcacheAll(void);
 
 extern void CacheInvalidateRelcacheByTuple(HeapTuple classTuple);
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index b552359915f..66de3bc0c29 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -253,6 +253,9 @@ typedef struct RelationData
 	bool		pgstat_enabled; /* should relation stats be counted */
 	/* use "struct" here to avoid needing to include pgstat.h: */
 	struct PgStat_TableStatus *pgstat_info; /* statistics collection area */
+
+	/* Is REPACK CONCURRENTLY being performed on this relation? */
+	bool		rd_repack_concurrent;
 } RelationData;
 
 
@@ -695,7 +698,9 @@ RelationCloseSmgr(Relation relation)
 #define RelationIsAccessibleInLogicalDecoding(relation) \
 	(XLogLogicalInfoActive() && \
 	 RelationNeedsWAL(relation) && \
-	 (IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation)))
+	 (IsCatalogRelation(relation) || \
+	  RelationIsUsedAsCatalogTable(relation) || \
+	  (relation)->rd_repack_concurrent))
 
 /*
  * RelationIsLogicallyLogged
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 0e546ec1497..014f27db7d7 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -13,6 +13,7 @@
 #ifndef SNAPSHOT_H
 #define SNAPSHOT_H
 
+#include "access/xlogdefs.h"
 #include "lib/pairingheap.h"
 
 
@@ -201,6 +202,8 @@ typedef struct SnapshotData
 	uint32		regd_count;		/* refcount on RegisteredSnapshots */
 	pairingheap_node ph_node;	/* link in the RegisteredSnapshots heap */
 
+	XLogRecPtr	lsn;			/* position in the WAL stream when taken */
+
 	/*
 	 * The transaction completion count at the time GetSnapshotData() built
 	 * this snapshot. Allows to avoid re-computing static snapshots when no
diff --git a/src/test/modules/injection_points/specs/repack.spec b/src/test/modules/injection_points/specs/repack.spec
index 75850334986..3711a7c92b9 100644
--- a/src/test/modules/injection_points/specs/repack.spec
+++ b/src/test/modules/injection_points/specs/repack.spec
@@ -86,9 +86,6 @@ step change_new
 # When applying concurrent data changes, we should see the effects of an
 # in-progress subtransaction.
 #
-# XXX Not sure this test is useful now - it was designed for the patch that
-# preserves tuple visibility and which therefore modifies
-# TransactionIdIsCurrentTransactionId().
 step change_subxact1
 {
 	BEGIN;
@@ -103,7 +100,6 @@ step change_subxact1
 # When applying concurrent data changes, we should not see the effects of a
 # rolled back subtransaction.
 #
-# XXX Is this test useful? See above.
 step change_subxact2
 {
 	BEGIN;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index b64ab8dfab4..9f5f331cad6 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2540,6 +2540,7 @@ ReorderBufferTupleCidKey
 ReorderBufferUpdateProgressTxnCB
 ReorderTuple
 RepOriginId
 RepackCommand
 RepackDecodingState
 RepackStmt
+RepackedRel
-- 
2.39.5

