From 7835af4736e6260aee9b2ba0e5e1c4975fe7d8f2 Mon Sep 17 00:00:00 2001
From: Antonin Houska <ah@cybertec.at>
Date: Mon, 31 Mar 2025 15:47:08 +0200
Subject: [PATCH 9/9] Call logical_rewrite_heap_tuple() when applying
 concurrent data changes.

This was implemented for the sake of completeness, but I think it's currently
not needed. Possible use cases could be:

1. REPACK CONCURRENTLY can process system catalogs.

System catalogs are scanned using a historic snapshot during logical decoding,
and the "combo CIDs" information is needed for that. Since "combo CID" is
associated with the "file locator" and that locator is changed by REPACK, this
command must record the information on individual tuples being moved from the
old file to the new one. This is what logical_rewrite_heap_tuple() does.

However, the logical decoding subsystem currently does not support decoding of
data changes in the system catalog. Therefore, the CONCURRENTLY option cannot
be used for system catalogs.

2. REPACK CONCURRENTLY is processing a relation, but once it has released all
the locks (in order to get the exclusive lock), another backend runs REPACK
CONCURRENTLY on the same table. Since the relation is treated as a system
catalog while these commands are processing it (so it can be scanned using a
historic snapshot during the "initial load"), it is important that the 2nd
backend does not break decoding of the "combo CIDs" performed by the 1st
backend.

However, it's not practical to let multiple backends run REPACK CONCURRENTLY
on the same relation, so we forbid that.
---
 src/backend/access/heap/heapam_handler.c      |   2 +-
 src/backend/access/heap/rewriteheap.c         |  65 +++++-----
 src/backend/commands/cluster.c                | 113 +++++++++++++++---
 src/backend/replication/logical/decode.c      |  42 ++++++-
 .../pgoutput_repack/pgoutput_repack.c         |  21 ++--
 src/include/access/rewriteheap.h              |   5 +-
 src/include/commands/cluster.h                |   3 +
 src/include/replication/reorderbuffer.h       |   7 ++
 8 files changed, 198 insertions(+), 60 deletions(-)

diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 850708c7830..d7b0edc3bf8 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -734,7 +734,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
 
 	/* Initialize the rewrite operation */
 	rwstate = begin_heap_rewrite(OldHeap, NewHeap, OldestXmin, *xid_cutoff,
-								 *multi_cutoff);
+								 *multi_cutoff, true);
 
 
 	/* Set up sorting if wanted */
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 6aa2ed214f2..83076b582d7 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -214,10 +214,8 @@ static void raw_heap_insert(RewriteState state, HeapTuple tup);
 
 /* internal logical remapping prototypes */
 static void logical_begin_heap_rewrite(RewriteState state);
-static void logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid, HeapTuple new_tuple);
 static void logical_end_heap_rewrite(RewriteState state);
 
-
 /*
  * Begin a rewrite of a table
  *
@@ -226,18 +224,19 @@ static void logical_end_heap_rewrite(RewriteState state);
  * oldest_xmin	xid used by the caller to determine which tuples are dead
  * freeze_xid	xid before which tuples will be frozen
  * cutoff_multi	multixact before which multis will be removed
+ * tid_chains	need to maintain TID chains?
  *
  * Returns an opaque RewriteState, allocated in current memory context,
  * to be used in subsequent calls to the other functions.
  */
 RewriteState
 begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xmin,
-				   TransactionId freeze_xid, MultiXactId cutoff_multi)
+				   TransactionId freeze_xid, MultiXactId cutoff_multi,
+				   bool tid_chains)
 {
 	RewriteState state;
 	MemoryContext rw_cxt;
 	MemoryContext old_cxt;
-	HASHCTL		hash_ctl;
 
 	/*
 	 * To ease cleanup, make a separate context that will contain the
@@ -262,29 +261,34 @@ begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xm
 	state->rs_cxt = rw_cxt;
 	state->rs_bulkstate = smgr_bulk_start_rel(new_heap, MAIN_FORKNUM);
 
-	/* Initialize hash tables used to track update chains */
-	hash_ctl.keysize = sizeof(TidHashKey);
-	hash_ctl.entrysize = sizeof(UnresolvedTupData);
-	hash_ctl.hcxt = state->rs_cxt;
-
-	state->rs_unresolved_tups =
-		hash_create("Rewrite / Unresolved ctids",
-					128,		/* arbitrary initial size */
-					&hash_ctl,
-					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
-	hash_ctl.entrysize = sizeof(OldToNewMappingData);
+	if (tid_chains)
+	{
+		HASHCTL		hash_ctl;
+
+		/* Initialize hash tables used to track update chains */
+		hash_ctl.keysize = sizeof(TidHashKey);
+		hash_ctl.entrysize = sizeof(UnresolvedTupData);
+		hash_ctl.hcxt = state->rs_cxt;
+
+		state->rs_unresolved_tups =
+			hash_create("Rewrite / Unresolved ctids",
+						128,	/* arbitrary initial size */
+						&hash_ctl,
+						HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
+		hash_ctl.entrysize = sizeof(OldToNewMappingData);
+
+		state->rs_old_new_tid_map =
+			hash_create("Rewrite / Old to new tid map",
+						128,	/* arbitrary initial size */
+						&hash_ctl,
+						HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+	}
 
-	state->rs_old_new_tid_map =
-		hash_create("Rewrite / Old to new tid map",
-					128,		/* arbitrary initial size */
-					&hash_ctl,
-					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+	logical_begin_heap_rewrite(state);
 
 	MemoryContextSwitchTo(old_cxt);
 
-	logical_begin_heap_rewrite(state);
-
 	return state;
 }
 
@@ -303,12 +307,15 @@ end_heap_rewrite(RewriteState state)
 	 * Write any remaining tuples in the UnresolvedTups table. If we have any
 	 * left, they should in fact be dead, but let's err on the safe side.
 	 */
-	hash_seq_init(&seq_status, state->rs_unresolved_tups);
-
-	while ((unresolved = hash_seq_search(&seq_status)) != NULL)
+	if (state->rs_unresolved_tups)
 	{
-		ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
-		raw_heap_insert(state, unresolved->tuple);
+		hash_seq_init(&seq_status, state->rs_unresolved_tups);
+
+		while ((unresolved = hash_seq_search(&seq_status)) != NULL)
+		{
+			ItemPointerSetInvalid(&unresolved->tuple->t_data->t_ctid);
+			raw_heap_insert(state, unresolved->tuple);
+		}
 	}
 
 	/* Write the last page, if any */
@@ -995,7 +1002,7 @@ logical_rewrite_log_mapping(RewriteState state, TransactionId xid,
  * Perform logical remapping for a tuple that's mapped from old_tid to
  * new_tuple->t_self by rewrite_heap_tuple() if necessary for the tuple.
  */
-static void
+void
 logical_rewrite_heap_tuple(RewriteState state, ItemPointerData old_tid,
 						   HeapTuple new_tuple)
 {
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 734e47eaba3..e4b35b10884 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -23,6 +23,7 @@
 #include "access/heapam.h"
 #include "access/multixact.h"
 #include "access/relscan.h"
+#include "access/rewriteheap.h"
 #include "access/tableam.h"
 #include "access/toast_internals.h"
 #include "access/transam.h"
@@ -161,17 +162,21 @@ static HeapTuple get_changed_tuple(char *change);
 static void apply_concurrent_changes(RepackDecodingState *dstate,
 									 Relation rel, ScanKey key, int nkeys,
 									 IndexInsertState *iistate,
-									 struct timeval *must_complete);
+									 struct timeval *must_complete,
+									 RewriteState rwstate);
 static void apply_concurrent_insert(Relation rel, ConcurrentChange *change,
 									HeapTuple tup, IndexInsertState *iistate,
-									TupleTableSlot *index_slot);
+									TupleTableSlot *index_slot,
+									RewriteState rwstate);
 static void apply_concurrent_update(Relation rel, HeapTuple tup,
 									HeapTuple tup_target,
 									ConcurrentChange *change,
 									IndexInsertState *iistate,
-									TupleTableSlot *index_slot);
+									TupleTableSlot *index_slot,
+									RewriteState rwstate);
 static void apply_concurrent_delete(Relation rel, HeapTuple tup_target,
-									ConcurrentChange *change);
+									ConcurrentChange *change,
+									RewriteState rwstate);
 static HeapTuple find_target_tuple(Relation rel, ScanKey key, int nkeys,
 								   HeapTuple tup_key,
 								   Snapshot snapshot,
@@ -185,7 +190,8 @@ static bool process_concurrent_changes(LogicalDecodingContext *ctx,
 									   ScanKey ident_key,
 									   int ident_key_nentries,
 									   IndexInsertState *iistate,
-									   struct timeval *must_complete);
+									   struct timeval *must_complete,
+									   RewriteState rwstate);
 static bool processing_time_elapsed(struct timeval *must_complete);
 static IndexInsertState *get_index_insert_state(Relation relation,
 												Oid ident_index_id);
@@ -2746,7 +2752,7 @@ repack_decode_concurrent_changes(LogicalDecodingContext *ctx,
 static void
 apply_concurrent_changes(RepackDecodingState *dstate, Relation rel,
 						 ScanKey key, int nkeys, IndexInsertState *iistate,
-						 struct timeval *must_complete)
+						 struct timeval *must_complete, RewriteState rwstate)
 {
 	TupleTableSlot *index_slot,
 			   *ident_slot;
@@ -2821,7 +2827,8 @@ apply_concurrent_changes(RepackDecodingState *dstate, Relation rel,
 		{
 			Assert(tup_old == NULL);
 
-			apply_concurrent_insert(rel, &change, tup, iistate, index_slot);
+			apply_concurrent_insert(rel, &change, tup, iistate, index_slot,
+									rwstate);
 
 			pfree(tup);
 		}
@@ -2829,7 +2836,8 @@ apply_concurrent_changes(RepackDecodingState *dstate, Relation rel,
 				 change.kind == CHANGE_DELETE)
 		{
 			IndexScanDesc ind_scan = NULL;
-			HeapTuple	tup_key;
+			HeapTuple	tup_key,
+						tup_exist_cp;
 
 			if (change.kind == CHANGE_UPDATE_NEW)
 			{
@@ -2871,11 +2879,23 @@ apply_concurrent_changes(RepackDecodingState *dstate, Relation rel,
 			if (tup_exist == NULL)
 				elog(ERROR, "Failed to find target tuple");
 
+			/*
+			 * Update the mapping for xmax of the old version.
+			 *
+			 * Use a copy ('tup_exist' can point to shared buffer) with xmin
+			 * invalid because mapping of that should have been written on
+			 * insertion.
+			 */
+			tup_exist_cp = heap_copytuple(tup_exist);
+			HeapTupleHeaderSetXmin(tup_exist_cp->t_data, InvalidTransactionId);
+			logical_rewrite_heap_tuple(rwstate, change.old_tid, tup_exist_cp);
+			pfree(tup_exist_cp);
+
 			if (change.kind == CHANGE_UPDATE_NEW)
 				apply_concurrent_update(rel, tup, tup_exist, &change, iistate,
-										index_slot);
+										index_slot, rwstate);
 			else
-				apply_concurrent_delete(rel, tup_exist, &change);
+				apply_concurrent_delete(rel, tup_exist, &change, rwstate);
 
 			ResetRepackCurrentXids();
 
@@ -2928,9 +2948,12 @@ apply_concurrent_changes(RepackDecodingState *dstate, Relation rel,
 
 static void
 apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
-						IndexInsertState *iistate, TupleTableSlot *index_slot)
+						IndexInsertState *iistate, TupleTableSlot *index_slot,
+						RewriteState rwstate)
 {
+	HeapTupleHeader tup_hdr = tup->t_data;
 	Snapshot	snapshot = change->snapshot;
+	ItemPointerData old_tid;
 	List	   *recheck;
 
 	/*
@@ -2940,6 +2963,9 @@ apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
 	 */
 	SetRepackCurrentXids(snapshot->subxip, snapshot->subxcnt);
 
+	/* Remember location in the old heap. */
+	ItemPointerCopy(&tup_hdr->t_ctid, &old_tid);
+
 	/*
 	 * Write the tuple into the new heap.
 	 *
@@ -2955,6 +2981,14 @@ apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
 	heap_insert(rel, tup, change->xid, snapshot->curcid - 1,
 				HEAP_INSERT_NO_LOGICAL, NULL);
 
+	/*
+	 * Update the mapping for xmin (xmax should be invalid). This is needed
+	 * because, during the processing, the table is considered a "user
+	 * catalog".
+	 */
+	Assert(!TransactionIdIsValid(HeapTupleHeaderGetRawXmax(tup->t_data)));
+	logical_rewrite_heap_tuple(rwstate, old_tid, tup);
+
 	/*
 	 * Update indexes.
 	 *
@@ -2988,15 +3022,23 @@ apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup,
 static void
 apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target,
 						ConcurrentChange *change, IndexInsertState *iistate,
-						TupleTableSlot *index_slot)
+						TupleTableSlot *index_slot, RewriteState rwstate)
 {
 	List	   *recheck;
 	LockTupleMode lockmode;
 	TU_UpdateIndexes update_indexes;
+	ItemPointerData tid_new_old_heap,
+				tid_old_new_heap;
 	TM_Result	res;
 	Snapshot	snapshot = change->snapshot;
 	TM_FailureData tmfd;
 
+	/* Location of the new tuple in the old heap. */
+	ItemPointerCopy(&tup->t_data->t_ctid, &tid_new_old_heap);
+
+	/* Location of the existing tuple in the new heap. */
+	ItemPointerCopy(&tup_target->t_self, &tid_old_new_heap);
+
 	/*
 	 * Write the new tuple into the new heap. ('tup' gets the TID assigned
 	 * here.)
@@ -3006,7 +3048,7 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target,
 	Assert(snapshot->curcid != InvalidCommandId &&
 		   snapshot->curcid > FirstCommandId);
 
-	res = heap_update(rel, &tup_target->t_self, tup,
+	res = heap_update(rel, &tid_old_new_heap, tup,
 					  change->xid, snapshot->curcid - 1,
 					  InvalidSnapshot,
 					  false,	/* no wait - only we are doing changes */
@@ -3016,6 +3058,10 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target,
 	if (res != TM_Ok)
 		ereport(ERROR, (errmsg("failed to apply concurrent UPDATE")));
 
+	/* Update the mapping for xmin of the new version. */
+	Assert(!TransactionIdIsValid(HeapTupleHeaderGetRawXmax(tup->t_data)));
+	logical_rewrite_heap_tuple(rwstate, tid_new_old_heap, tup);
+
 	ExecStoreHeapTuple(tup, index_slot, false);
 
 	if (update_indexes != TU_None)
@@ -3039,8 +3085,9 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target,
 
 static void
 apply_concurrent_delete(Relation rel, HeapTuple tup_target,
-						ConcurrentChange *change)
+						ConcurrentChange *change, RewriteState rwstate)
 {
+	ItemPointerData tid_old_new_heap;
 	TM_Result	res;
 	TM_FailureData tmfd;
 	Snapshot	snapshot = change->snapshot;
@@ -3049,7 +3096,10 @@ apply_concurrent_delete(Relation rel, HeapTuple tup_target,
 	Assert(snapshot->curcid != InvalidCommandId &&
 		   snapshot->curcid > FirstCommandId);
 
-	res = heap_delete(rel, &tup_target->t_self, change->xid,
+	/* Location of the existing tuple in the new heap. */
+	ItemPointerCopy(&tup_target->t_self, &tid_old_new_heap);
+
+	res = heap_delete(rel, &tid_old_new_heap, change->xid,
 					  snapshot->curcid - 1, InvalidSnapshot, false,
 					  &tmfd, false,
 	/* wal_logical */
@@ -3131,7 +3181,8 @@ static bool
 process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal,
 						   Relation rel_dst, Relation rel_src, ScanKey ident_key,
 						   int ident_key_nentries, IndexInsertState *iistate,
-						   struct timeval *must_complete)
+						   struct timeval *must_complete,
+						   RewriteState rwstate)
 {
 	RepackDecodingState *dstate;
 
@@ -3164,7 +3215,8 @@ process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal,
 			rel_dst->rd_toastoid = rel_src->rd_rel->reltoastrelid;
 
 		apply_concurrent_changes(dstate, rel_dst, ident_key,
-								 ident_key_nentries, iistate, must_complete);
+								 ident_key_nentries, iistate, must_complete,
+								 rwstate);
 	}
 	PG_FINALLY();
 	{
@@ -3349,6 +3401,7 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
 	Oid			ident_idx_old,
 				ident_idx_new;
 	IndexInsertState *iistate;
+	RewriteState rwstate;
 	ScanKey		ident_key;
 	int			ident_key_nentries;
 	XLogRecPtr	wal_insert_ptr,
@@ -3436,11 +3489,27 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
 	 * Apply concurrent changes first time, to minimize the time we need to
 	 * hold AccessExclusiveLock. (Quite some amount of WAL could have been
 	 * written during the data copying and index creation.)
+	 *
+	 * Now we are processing individual tuples, so pass false for
+	 * 'tid_chains'. Since rwstate is now only needed for
+	 * logical_begin_heap_rewrite(), none of the transaction IDs needs to be
+	 * valid.
 	 */
+	rwstate = begin_heap_rewrite(OldHeap, NewHeap,
+								 InvalidTransactionId,
+								 InvalidTransactionId,
+								 InvalidTransactionId,
+								 false);
 	process_concurrent_changes(ctx, end_of_wal, NewHeap,
 							   swap_toast_by_content ? OldHeap : NULL,
 							   ident_key, ident_key_nentries, iistate,
-							   NULL);
+							   NULL, rwstate);
+
+	/*
+	 * OldHeap will be closed, so we need to initialize rwstate again for the
+	 * next call of process_concurrent_changes().
+	 */
+	end_heap_rewrite(rwstate);
 
 	/*
 	 * Acquire AccessExclusiveLock on the table, its TOAST relation (if there
@@ -3528,6 +3597,11 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
 	end_of_wal = GetFlushRecPtr(NULL);
 
 	/* Apply the concurrent changes again. */
+	rwstate = begin_heap_rewrite(OldHeap, NewHeap,
+								 InvalidTransactionId,
+								 InvalidTransactionId,
+								 InvalidTransactionId,
+								 false);
 
 	/*
 	 * This time we have the exclusive lock on the table, so make sure that
@@ -3557,11 +3631,12 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
 	if (!process_concurrent_changes(ctx, end_of_wal, NewHeap,
 									swap_toast_by_content ? OldHeap : NULL,
 									ident_key, ident_key_nentries, iistate,
-									t_end_ptr))
+									t_end_ptr, rwstate))
 		ereport(ERROR,
 				(errmsg("could not process concurrent data changes in time"),
 				 errhint("Please consider adjusting \"repack_max_xlock_time\".")));
 
+	end_heap_rewrite(rwstate);
 
 	/* Remember info about rel before closing OldHeap */
 	relpersistence = OldHeap->rd_rel->relpersistence;
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 25bb92b33f2..6f4a5f5b95b 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -984,11 +984,13 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	xl_heap_insert *xlrec;
 	ReorderBufferChange *change;
 	RelFileLocator target_locator;
+	BlockNumber blknum;
+	HeapTupleHeader tuphdr;
 
 	xlrec = (xl_heap_insert *) XLogRecGetData(r);
 
 	/* only interested in our database */
-	XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
+	XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blknum);
 	if (target_locator.dbOid != ctx->slot->data.database)
 		return;
 
@@ -1013,6 +1015,13 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
 
+	/*
+	 * CTID is needed for logical_rewrite_heap_tuple(), when doing REPACK
+	 * CONCURRENTLY.
+	 */
+	tuphdr = change->data.tp.newtuple->t_data;
+	ItemPointerSet(&tuphdr->t_ctid, blknum, xlrec->offnum);
+
 	change->data.tp.clear_toast_afterwards = true;
 
 	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
@@ -1034,11 +1043,15 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	ReorderBufferChange *change;
 	char	   *data;
 	RelFileLocator target_locator;
+	BlockNumber old_blknum,
+				new_blknum;
 
 	xlrec = (xl_heap_update *) XLogRecGetData(r);
 
+	/* Retrieve blknum, so that we can compose CTID below. */
+	XLogRecGetBlockTag(r, 0, &target_locator, NULL, &new_blknum);
+
 	/* only interested in our database */
-	XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
 	if (target_locator.dbOid != ctx->slot->data.database)
 		return;
 
@@ -1055,6 +1068,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	{
 		Size		datalen;
 		Size		tuplelen;
+		HeapTupleHeader tuphdr;
 
 		data = XLogRecGetBlockData(r, 0, &datalen);
 
@@ -1064,6 +1078,13 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
 
 		DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
+
+		/*
+		 * CTID is needed for logical_rewrite_heap_tuple(), when doing REPACK
+		 * CONCURRENTLY.
+		 */
+		tuphdr = change->data.tp.newtuple->t_data;
+		ItemPointerSet(&tuphdr->t_ctid, new_blknum, xlrec->new_offnum);
 	}
 
 	if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
@@ -1082,6 +1103,14 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
 	}
 
+	/*
+	 * Remember the old tuple CTID, for the sake of
+	 * logical_rewrite_heap_tuple().
+	 */
+	if (!XLogRecGetBlockTagExtended(r, 1, NULL, NULL, &old_blknum, NULL))
+		old_blknum = new_blknum;
+	ItemPointerSet(&change->data.tp.old_tid, old_blknum, xlrec->old_offnum);
+
 	change->data.tp.clear_toast_afterwards = true;
 
 	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
@@ -1100,11 +1129,12 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	xl_heap_delete *xlrec;
 	ReorderBufferChange *change;
 	RelFileLocator target_locator;
+	BlockNumber blknum;
 
 	xlrec = (xl_heap_delete *) XLogRecGetData(r);
 
 	/* only interested in our database */
-	XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
+	XLogRecGetBlockTag(r, 0, &target_locator, NULL, &blknum);
 	if (target_locator.dbOid != ctx->slot->data.database)
 		return;
 
@@ -1136,6 +1166,12 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 		DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
 						datalen, change->data.tp.oldtuple);
+
+		/*
+		 * CTID is needed for logical_rewrite_heap_tuple(), when doing REPACK
+		 * CONCURRENTLY.
+		 */
+		ItemPointerSet(&change->data.tp.old_tid, blknum, xlrec->offnum);
 	}
 
 	change->data.tp.clear_toast_afterwards = true;
diff --git a/src/backend/replication/pgoutput_repack/pgoutput_repack.c b/src/backend/replication/pgoutput_repack/pgoutput_repack.c
index 28bd16f9cc7..24d9c9c4884 100644
--- a/src/backend/replication/pgoutput_repack/pgoutput_repack.c
+++ b/src/backend/replication/pgoutput_repack/pgoutput_repack.c
@@ -33,7 +33,7 @@ static void plugin_truncate(struct LogicalDecodingContext *ctx,
 							ReorderBufferChange *change);
 static void store_change(LogicalDecodingContext *ctx,
 						 ConcurrentChangeKind kind, HeapTuple tuple,
-						 TransactionId xid);
+						 TransactionId xid, ItemPointer old_tid);
 
 void
 _PG_output_plugin_init(OutputPluginCallbacks *cb)
@@ -168,7 +168,8 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				if (newtuple == NULL)
 					elog(ERROR, "Incomplete insert info.");
 
-				store_change(ctx, CHANGE_INSERT, newtuple, change->txn->xid);
+				store_change(ctx, CHANGE_INSERT, newtuple, change->txn->xid,
+							 NULL);
 			}
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
@@ -186,10 +187,10 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 				if (oldtuple != NULL)
 					store_change(ctx, CHANGE_UPDATE_OLD, oldtuple,
-								 change->txn->xid);
+								 change->txn->xid, NULL);
 
 				store_change(ctx, CHANGE_UPDATE_NEW, newtuple,
-							 change->txn->xid);
+							 change->txn->xid, &change->data.tp.old_tid);
 			}
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
@@ -202,7 +203,8 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 				if (oldtuple == NULL)
 					elog(ERROR, "Incomplete delete info.");
 
-				store_change(ctx, CHANGE_DELETE, oldtuple, change->txn->xid);
+				store_change(ctx, CHANGE_DELETE, oldtuple, change->txn->xid,
+							 &change->data.tp.old_tid);
 			}
 			break;
 		default:
@@ -236,13 +238,13 @@ plugin_truncate(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	if (i == nrelations)
 		return;
 
-	store_change(ctx, CHANGE_TRUNCATE, NULL, InvalidTransactionId);
+	store_change(ctx, CHANGE_TRUNCATE, NULL, InvalidTransactionId, NULL);
 }
 
 /* Store concurrent data change. */
 static void
 store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind,
-			 HeapTuple tuple, TransactionId xid)
+			 HeapTuple tuple, TransactionId xid, ItemPointer old_tid)
 {
 	RepackDecodingState *dstate;
 	char	   *change_raw;
@@ -317,6 +319,11 @@ store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind,
 	change.snapshot = dstate->snapshot;
 	dstate->snapshot->active_count++;
 
+	if (old_tid)
+		ItemPointerCopy(old_tid, &change.old_tid);
+	else
+		ItemPointerSetInvalid(&change.old_tid);
+
 	/* The data has been copied. */
 	if (flattened)
 		pfree(tuple);
diff --git a/src/include/access/rewriteheap.h b/src/include/access/rewriteheap.h
index 99c3f362adc..eebda35c7cb 100644
--- a/src/include/access/rewriteheap.h
+++ b/src/include/access/rewriteheap.h
@@ -23,11 +23,14 @@ typedef struct RewriteStateData *RewriteState;
 
 extern RewriteState begin_heap_rewrite(Relation old_heap, Relation new_heap,
 									   TransactionId oldest_xmin, TransactionId freeze_xid,
-									   MultiXactId cutoff_multi);
+									   MultiXactId cutoff_multi, bool tid_chains);
 extern void end_heap_rewrite(RewriteState state);
 extern void rewrite_heap_tuple(RewriteState state, HeapTuple old_tuple,
 							   HeapTuple new_tuple);
 extern bool rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple);
+extern void logical_rewrite_heap_tuple(RewriteState state,
+									   ItemPointerData old_tid,
+									   HeapTuple new_tuple);
 
 /*
  * On-Disk data format for an individual logical rewrite mapping.
diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h
index 0267357a261..45cd3fe4276 100644
--- a/src/include/commands/cluster.h
+++ b/src/include/commands/cluster.h
@@ -78,6 +78,9 @@ typedef struct ConcurrentChange
 	/* Transaction that changes the data. */
 	TransactionId xid;
 
+	/* For UPDATE / DELETE, the location of the old tuple version. */
+	ItemPointerData old_tid;
+
 	/*
 	 * Historic catalog snapshot that was used to decode this change.
 	 */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 3be0cbd7ebe..c2731947b22 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -104,6 +104,13 @@ typedef struct ReorderBufferChange
 			HeapTuple	oldtuple;
 			/* valid for INSERT || UPDATE */
 			HeapTuple	newtuple;
+
+			/*
+			 * REPACK CONCURRENTLY needs the old TID, even if the old tuple
+			 * itself is not WAL-logged (i.e. when the identity key does not
+			 * change).
+			 */
+			ItemPointerData old_tid;
 		}			tp;
 
 		/*
-- 
2.43.5

