From 9685a7bc1ac199de0a0514513888c5b2b1256db2 Mon Sep 17 00:00:00 2001
From: Antonin Houska <ah@cybertec.at>
Date: Tue, 17 Mar 2026 20:22:32 +0100
Subject: [PATCH v44 10/10] Teach snapshot builder to skip transactions running
 REPACK (CONCURRENTLY).

During logical decoding, we need to know if particular transaction performed
catalog changes because catalog information is needed to construct heap
tuples. To be sure that we have enough information of each transaction, the
logical decoding cannot start before all the already running transactions have
completed.

The problem with REPACK (CONCURRENTLY) is that it has XID assigned and writes
WAL records marked with it. Thus if another backend runs REPACK (CONCURRENTLY)
and tries to setup the logical decoding, it has to wait for the completion of
all the other transactions involved in REPACK (CONCURRENTLY).

However, REPACK (CONCURRENTLY) does not perform any catalog changes relevant
to logical decoding, so the other backends executing this command can ignore
it. This patch implements it by adding information about transactions
executing the command to the xl_running_xacts WAL record, and by teaching the
snapshot builder to use the information.
---
 src/backend/access/rmgrdesc/standbydesc.c   | 15 ++++-
 src/backend/access/transam/xlog.c           |  2 +
 src/backend/commands/cluster.c              | 16 +++++
 src/backend/replication/logical/snapbuild.c | 38 ++++++++---
 src/backend/storage/ipc/procarray.c         | 74 +++++++++++++++++++--
 src/backend/storage/ipc/standby.c           |  8 ++-
 src/include/access/xlog_internal.h          |  2 +-
 src/include/storage/proc.h                  |  3 +-
 src/include/storage/standby.h               |  3 +
 src/include/storage/standbydefs.h           |  2 +
 10 files changed, 140 insertions(+), 23 deletions(-)

diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index 0a291354ae2..b96b73d51d3 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -21,10 +21,11 @@ standby_desc_running_xacts(StringInfo buf, xl_running_xacts *xlrec)
 {
 	int			i;
 
-	appendStringInfo(buf, "nextXid %u latestCompletedXid %u oldestRunningXid %u",
+	appendStringInfo(buf, "nextXid %u latestCompletedXid %u oldestRunningXid %u oldestRunningXidLogical %u",
 					 xlrec->nextXid,
 					 xlrec->latestCompletedXid,
-					 xlrec->oldestRunningXid);
+					 xlrec->oldestRunningXid,
+					 xlrec->oldestRunningXidLogical);
 	if (xlrec->xcnt > 0)
 	{
 		appendStringInfo(buf, "; %d xacts:", xlrec->xcnt);
@@ -41,6 +42,16 @@ standby_desc_running_xacts(StringInfo buf, xl_running_xacts *xlrec)
 		for (i = 0; i < xlrec->subxcnt; i++)
 			appendStringInfo(buf, " %u", xlrec->xids[xlrec->xcnt + i]);
 	}
+
+	if (xlrec->xcnt_repack > 0)
+	{
+		TransactionId	*xids_repack;
+
+		appendStringInfo(buf, "; %d xacts_repack:", xlrec->xcnt_repack);
+		xids_repack = xlrec->xids + xlrec->xcnt + xlrec->subxcnt;
+		for (i = 0; i < xlrec->xcnt_repack; i++)
+			appendStringInfo(buf, " %u", xids_repack[i]);
+	}
 }
 
 void
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f5c9a34374d..d050b0b1444 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5912,6 +5912,7 @@ StartupXLOG(void)
 				 * subxids are listed with their parent prepared transactions.
 				 */
 				running.xcnt = nxids;
+				running.xcnt_repack = 0;
 				running.subxcnt = 0;
 				running.subxid_status = SUBXIDS_IN_SUBTRANS;
 				running.nextXid = XidFromFullTransactionId(checkPoint.nextXid);
@@ -8477,6 +8478,7 @@ xlog_redo(XLogReaderState *record)
 			 * with their parent prepared transactions.
 			 */
 			running.xcnt = nxids;
+			running.xcnt_repack = 0;
 			running.subxcnt = 0;
 			running.subxid_status = SUBXIDS_IN_SUBTRANS;
 			running.nextXid = XidFromFullTransactionId(checkPoint.nextXid);
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 018e9f7b013..423cea26b0b 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -1075,6 +1075,22 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose,
 
 	if (concurrent)
 	{
+		/*
+		 * Do not let other backends wait for our completion during their
+		 * setup of logical replication. Unlike logical replication publisher,
+		 * we will have XID assigned, so the other backends - whether
+		 * walsenders involved in logical replication or regular backends
+		 * executing also REPACK (CONCURRENTLY) - would have to wait for our
+		 * completion before they can build their initial snapshot. It is
+		 * o.k. for any decoding backend to ignore us because we do not change
+		 * tuple descriptor of any table, and the data changes we write should
+		 * not be decoded by other backends.
+		 */
+		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+		MyProc->statusFlags |= PROC_IN_REPACK;
+		ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
+		LWLockRelease(ProcArrayLock);
+
 		/*
 		 * The worker needs to be member of the locking group we're the leader
 		 * of. We ought to become the leader before the worker starts. The
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 9cf499ce7c6..fbdd4600a2b 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1172,7 +1172,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
 	 * xmin, which looks odd but is correct and actually more efficient, since
 	 * we hit fast paths in heapam_visibility.c.
 	 */
-	builder->xmin = running->oldestRunningXid;
+	builder->xmin = running->oldestRunningXidLogical;
 
 	/* Remove transactions we don't need to keep track off anymore */
 	SnapBuildPurgeOlderTxn(builder);
@@ -1188,9 +1188,9 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
 	 */
 	xmin = ReorderBufferGetOldestXmin(builder->reorder);
 	if (xmin == InvalidTransactionId)
-		xmin = running->oldestRunningXid;
+		xmin = running->oldestRunningXidLogical;
 	elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
-		 builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
+		 builder->xmin, builder->xmax, running->oldestRunningXidLogical, xmin);
 	LogicalIncreaseXminForSlot(lsn, xmin);
 
 	/*
@@ -1275,14 +1275,14 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 	 * have all necessary catalog rows anymore.
 	 */
 	if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
-		NormalTransactionIdPrecedes(running->oldestRunningXid,
+		NormalTransactionIdPrecedes(running->oldestRunningXidLogical,
 									builder->initial_xmin_horizon))
 	{
 		ereport(DEBUG1,
 				errmsg_internal("skipping snapshot at %X/%08X while building logical decoding snapshot, xmin horizon too low",
 								LSN_FORMAT_ARGS(lsn)),
 				errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
-								   builder->initial_xmin_horizon, running->oldestRunningXid));
+								   builder->initial_xmin_horizon, running->oldestRunningXidLogical));
 
 
 		SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
@@ -1299,7 +1299,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 	 * NB: We might have already started to incrementally assemble a snapshot,
 	 * so we need to be careful to deal with that.
 	 */
-	if (running->oldestRunningXid == running->nextXid)
+	if (running->oldestRunningXidLogical == running->nextXid)
 	{
 		if (!XLogRecPtrIsValid(builder->start_decoding_at) ||
 			builder->start_decoding_at <= lsn)
@@ -1378,14 +1378,14 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 	/*
 	 * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
 	 *
-	 * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
+	 * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXidLogical
 	 * is >= than nextXid from when we switched to BUILDING_SNAPSHOT.  This
 	 * means all transactions starting afterwards have enough information to
 	 * be decoded.  Switch to FULL_SNAPSHOT.
 	 */
 	else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
 			 TransactionIdPrecedesOrEquals(builder->next_phase_at,
-										   running->oldestRunningXid))
+										   running->oldestRunningXidLogical))
 	{
 		builder->state = SNAPBUILD_FULL_SNAPSHOT;
 		builder->next_phase_at = running->nextXid;
@@ -1402,14 +1402,14 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 	/*
 	 * c) transition from FULL_SNAPSHOT to CONSISTENT.
 	 *
-	 * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid is
+	 * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXidLogical is
 	 * >= than nextXid from when we switched to FULL_SNAPSHOT.  This means all
 	 * transactions that are currently in progress have a catalog snapshot,
 	 * and all their changes have been collected.  Switch to CONSISTENT.
 	 */
 	else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
 			 TransactionIdPrecedesOrEquals(builder->next_phase_at,
-										   running->oldestRunningXid))
+										   running->oldestRunningXidLogical))
 	{
 		builder->state = SNAPBUILD_CONSISTENT;
 		builder->next_phase_at = InvalidTransactionId;
@@ -1459,6 +1459,24 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
 		if (TransactionIdFollows(xid, cutoff))
 			continue;
 
+		/* Do not wait for transactions running REPACK (CONCURRENTLY). */
+		if (running->xcnt_repack > 0)
+		{
+			TransactionId	*xids_repack;
+			int		i;
+
+			xids_repack = running->xids + running->xcnt + running->subxcnt;
+
+			for (i = 0; i < running->xcnt_repack; i++)
+			{
+				if (xid == xids_repack[i])
+					break;
+			}
+			/* Found? */
+			if (i < running->xcnt_repack)
+				continue;
+		}
+
 		XactLockTableWait(xid, NULL, NULL, XLTW_None);
 	}
 
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index cc207cb56e3..b02ec4e8c3e 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -2643,15 +2643,24 @@ GetRunningTransactionData(void)
 	RunningTransactions CurrentRunningXacts = &CurrentRunningXactsData;
 	TransactionId latestCompletedXid;
 	TransactionId oldestRunningXid;
+	TransactionId oldestRunningXidLogical;
 	TransactionId oldestDatabaseRunningXid;
 	TransactionId *xids;
 	int			index;
-	int			count;
+	int			count, count_repack;
 	int			subcount;
 	bool		suboverflowed;
+	TransactionId	*xids_repack = NULL;
+	bool		logical_decoding_enabled = IsLogicalDecodingEnabled();
 
 	Assert(!RecoveryInProgress());
 
+	/*
+	 * TODO Consider a GUC to reserve certain amount of replication slots for
+	 * REPACK (CONCURRENTLY) and using it here.
+	 */
+#define		MAX_REPACK_XIDS		16
+
 	/*
 	 * Allocating space for maxProcs xids is usually overkill; numProcs would
 	 * be sufficient.  But it seems better to do the malloc while not holding
@@ -2663,11 +2672,14 @@ GetRunningTransactionData(void)
 	 */
 	if (CurrentRunningXacts->xids == NULL)
 	{
+		/* FIXME probably fails if logical decoding is enable on-the-fly */
+		int		nrepack = logical_decoding_enabled ? MAX_REPACK_XIDS : 0;
+
 		/*
 		 * First call
 		 */
 		CurrentRunningXacts->xids = (TransactionId *)
-			malloc(TOTAL_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
+			malloc((TOTAL_MAX_CACHED_SUBXIDS + nrepack) * sizeof(TransactionId));
 		if (CurrentRunningXacts->xids == NULL)
 			ereport(ERROR,
 					(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -2676,7 +2688,10 @@ GetRunningTransactionData(void)
 
 	xids = CurrentRunningXacts->xids;
 
-	count = subcount = 0;
+	if (logical_decoding_enabled)
+		xids_repack = palloc_array(TransactionId, MAX_REPACK_XIDS);
+
+	count = subcount = count_repack = 0;
 	suboverflowed = false;
 
 	/*
@@ -2688,7 +2703,7 @@ GetRunningTransactionData(void)
 
 	latestCompletedXid =
 		XidFromFullTransactionId(TransamVariables->latestCompletedXid);
-	oldestDatabaseRunningXid = oldestRunningXid =
+	oldestDatabaseRunningXid = oldestRunningXid = oldestRunningXidLogical =
 		XidFromFullTransactionId(TransamVariables->nextXid);
 
 	/*
@@ -2697,6 +2712,8 @@ GetRunningTransactionData(void)
 	for (index = 0; index < arrayP->numProcs; index++)
 	{
 		TransactionId xid;
+		int			pgprocno;
+		PGPROC	   *proc;
 
 		/* Fetch xid just once - see GetNewTransactionId */
 		xid = UINT32_ACCESS_ONCE(other_xids[index]);
@@ -2716,6 +2733,21 @@ GetRunningTransactionData(void)
 		if (TransactionIdPrecedes(xid, oldestRunningXid))
 			oldestRunningXid = xid;
 
+		if (logical_decoding_enabled &&
+			TransactionIdPrecedes(xid, oldestRunningXidLogical))
+		{
+			/*
+			 * Backends running REPACK concurrently need to be excluded from
+			 * oldestRunningXidLogical, otherwise the snapshot builder cannot
+			 * proceed in building the initial snapshot.
+			 */
+			pgprocno = arrayP->pgprocnos[index];
+			proc = &allProcs[pgprocno];
+
+			if ((proc->statusFlags & PROC_IN_REPACK) == 0)
+				oldestRunningXidLogical = xid;
+		}
+
 		/*
 		 * Also, update the oldest running xid within the current database. As
 		 * fetching pgprocno and PGPROC could cause cache misses, we do cheap
@@ -2723,8 +2755,8 @@ GetRunningTransactionData(void)
 		 */
 		if (TransactionIdPrecedes(xid, oldestDatabaseRunningXid))
 		{
-			int			pgprocno = arrayP->pgprocnos[index];
-			PGPROC	   *proc = &allProcs[pgprocno];
+			pgprocno = arrayP->pgprocnos[index];
+			proc = &allProcs[pgprocno];
 
 			if (proc->databaseId == MyDatabaseId)
 				oldestDatabaseRunningXid = xid;
@@ -2742,6 +2774,19 @@ GetRunningTransactionData(void)
 		 */
 
 		xids[count++] = xid;
+
+		/*
+		 * Collect XIDSs of transactions running REPACK (CONCURRENTLY).
+		 */
+		if (logical_decoding_enabled &&
+			count_repack < MAX_REPACK_XIDS)
+		{
+			pgprocno = arrayP->pgprocnos[index];
+			proc = &allProcs[pgprocno];
+
+			if (proc->statusFlags & PROC_IN_REPACK)
+				xids_repack[count_repack++] = xid;
+		}
 	}
 
 	/*
@@ -2782,6 +2827,19 @@ GetRunningTransactionData(void)
 		}
 	}
 
+	/*
+	 * Append the XIDs running REPACK (CONCURRENTLY), if any.
+	 *
+	 * XXX Should we sort the array and use bsearch() when using it?
+	 */
+	if (count_repack > 0)
+	{
+		for (int i = 0; i < count_repack; i++)
+			xids[count++] = xids_repack[i];
+	}
+	if (xids_repack)
+		pfree(xids_repack);
+
 	/*
 	 * It's important *not* to include the limits set by slots here because
 	 * snapbuild.c uses oldestRunningXid to manage its xmin horizon. If those
@@ -2791,11 +2849,13 @@ GetRunningTransactionData(void)
 	 * increases if slots do.
 	 */
 
-	CurrentRunningXacts->xcnt = count - subcount;
+	CurrentRunningXacts->xcnt = count - subcount - count_repack;
 	CurrentRunningXacts->subxcnt = subcount;
+	CurrentRunningXacts->xcnt_repack = count_repack;
 	CurrentRunningXacts->subxid_status = suboverflowed ? SUBXIDS_IN_SUBTRANS : SUBXIDS_IN_ARRAY;
 	CurrentRunningXacts->nextXid = XidFromFullTransactionId(TransamVariables->nextXid);
 	CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
+	CurrentRunningXacts->oldestRunningXidLogical = oldestRunningXidLogical;
 	CurrentRunningXacts->oldestDatabaseRunningXid = oldestDatabaseRunningXid;
 	CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
 
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index f3ad90c7c7a..c785a3db59f 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -1188,6 +1188,7 @@ standby_redo(XLogReaderState *record)
 		RunningTransactionsData running;
 
 		running.xcnt = xlrec->xcnt;
+		running.xcnt_repack = xlrec->xcnt_repack;
 		running.subxcnt = xlrec->subxcnt;
 		running.subxid_status = xlrec->subxid_overflow ? SUBXIDS_MISSING : SUBXIDS_IN_ARRAY;
 		running.nextXid = xlrec->nextXid;
@@ -1358,10 +1359,12 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
 	XLogRecPtr	recptr;
 
 	xlrec.xcnt = CurrRunningXacts->xcnt;
+	xlrec.xcnt_repack = CurrRunningXacts->xcnt_repack;
 	xlrec.subxcnt = CurrRunningXacts->subxcnt;
 	xlrec.subxid_overflow = (CurrRunningXacts->subxid_status != SUBXIDS_IN_ARRAY);
 	xlrec.nextXid = CurrRunningXacts->nextXid;
 	xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid;
+	xlrec.oldestRunningXidLogical = CurrRunningXacts->oldestRunningXidLogical;
 	xlrec.latestCompletedXid = CurrRunningXacts->latestCompletedXid;
 
 	/* Header */
@@ -1370,9 +1373,10 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
 	XLogRegisterData(&xlrec, MinSizeOfXactRunningXacts);
 
 	/* array of TransactionIds */
-	if (xlrec.xcnt > 0)
+	if (xlrec.xcnt + xlrec.xcnt_repack > 0)
 		XLogRegisterData(CurrRunningXacts->xids,
-						 (xlrec.xcnt + xlrec.subxcnt) * sizeof(TransactionId));
+						 (xlrec.xcnt + xlrec.xcnt_repack + xlrec.subxcnt) *
+						 sizeof(TransactionId));
 
 	recptr = XLogInsert(RM_STANDBY_ID, XLOG_RUNNING_XACTS);
 
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 629ac3a7d3e..1730b07810f 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -31,7 +31,7 @@
 /*
  * Each page of XLOG file has a header like this:
  */
-#define XLOG_PAGE_MAGIC 0xD11D	/* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD11E	/* can be used as WAL version indicator */
 
 typedef struct XLogPageHeaderData
 {
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 1dad125706e..c6693b93a65 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -69,10 +69,11 @@ struct XidCache
 #define		PROC_AFFECTS_ALL_HORIZONS	0x20	/* this proc's xmin must be
 												 * included in vacuum horizons
 												 * in all databases */
+#define		PROC_IN_REPACK				0x40	/* currently REPACK (CONCURRENTLY) */
 
 /* flags reset at EOXact */
 #define		PROC_VACUUM_STATE_MASK \
-	(PROC_IN_VACUUM | PROC_IN_SAFE_IC | PROC_VACUUM_FOR_WRAPAROUND)
+	(PROC_IN_VACUUM | PROC_IN_SAFE_IC | PROC_VACUUM_FOR_WRAPAROUND | PROC_IN_REPACK)
 
 /*
  * Xmin-related flags. Make sure any flags that affect how the process' Xmin
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 6a314c693cd..9e6e7b18786 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -127,10 +127,13 @@ typedef enum
 typedef struct RunningTransactionsData
 {
 	int			xcnt;			/* # of xact ids in xids[] */
+	int			xcnt_repack;	/* # of xacts running REPACK
+								 * (CONCURRENTLY). */
 	int			subxcnt;		/* # of subxact ids in xids[] */
 	subxids_array_status subxid_status;
 	TransactionId nextXid;		/* xid from TransamVariables->nextXid */
 	TransactionId oldestRunningXid; /* *not* oldestXmin */
+	TransactionId oldestRunningXidLogical;
 	TransactionId oldestDatabaseRunningXid; /* same as above, but within the
 											 * current database */
 	TransactionId latestCompletedXid;	/* so we can set xmax */
diff --git a/src/include/storage/standbydefs.h b/src/include/storage/standbydefs.h
index 231d251fd51..edad609fa9a 100644
--- a/src/include/storage/standbydefs.h
+++ b/src/include/storage/standbydefs.h
@@ -47,10 +47,12 @@ typedef struct xl_standby_locks
 typedef struct xl_running_xacts
 {
 	int			xcnt;			/* # of xact ids in xids[] */
+	int			xcnt_repack;	/* # of xacts running REPACK (CONCURRENTLY) */
 	int			subxcnt;		/* # of subxact ids in xids[] */
 	bool		subxid_overflow;	/* snapshot overflowed, subxids missing */
 	TransactionId nextXid;		/* xid from TransamVariables->nextXid */
 	TransactionId oldestRunningXid; /* *not* oldestXmin */
+	TransactionId oldestRunningXidLogical;
 	TransactionId latestCompletedXid;	/* so we can set xmax */
 
 	TransactionId xids[FLEXIBLE_ARRAY_MEMBER];
-- 
2.47.3

