From 53b6179a28b672070a285abbfc2bfe702e263a63 Mon Sep 17 00:00:00 2001
From: Antonin Houska <ah@cybertec.at>
Date: Fri, 3 Apr 2026 12:34:04 +0200
Subject: [PATCH] Introduce an option to make logical replication database
 specific.

By default, the logical decoding assumes access to shared catalogs, so the
snapshot builder needs to consider cluster-wide XIDs during startup. That in
turn means that, if any transaction is already running (and has XID assigned),
the snapshot builder needs to wait for its completion, as it does not know if
that transaction performed catalog changes earlier.

Possible problem with this concept is that if REPACK (CONCURRENTLY) is running
in some database, backends running the same command in other databases get
stuck until the first one has committed. Thus only as single backend in the
cluster can run REPACK (CONCURRENTLY) at any time. Likewise, REPACK
(CONCURRENTLY) can block walsenders starting on behalf of subscriptions
throughout the cluster.

This patch introduces the possibility for a backend to declare that its output
plugin does not use shared catalogs (i.e. catalogs that can be changed by
transactions running in other databases in the cluster). In that case, no
snapshot the backend will use during the decoding needs to contain information
about transactions running in other databases. Thus the snapshot builder only
needs to wait for completion of transactions in the current database.

Currently we only use this option in the REPACK background worker. It could
possibly be used in walsender involved in logical replication too, however
that would need thorough analysis of its output plugin.

The patch bumps WAL version number, due to a new field in xl_running_xacts.
---
 contrib/pg_visibility/pg_visibility.c       |  4 ++--
 src/backend/access/index/genam.c            | 18 ++++++++++++++++
 src/backend/access/rmgrdesc/standbydesc.c   |  2 ++
 src/backend/access/transam/xlog.c           |  2 +-
 src/backend/access/transam/xlogfuncs.c      |  2 +-
 src/backend/commands/repack_worker.c        |  8 +++++++
 src/backend/postmaster/bgwriter.c           |  2 +-
 src/backend/replication/logical/snapbuild.c | 10 ++++++++-
 src/backend/replication/slot.c              | 12 +++++++++--
 src/backend/storage/ipc/procarray.c         | 23 ++++++++++++++++++++-
 src/backend/storage/ipc/standby.c           | 14 +++++++++++--
 src/include/access/genam.h                  |  8 +++++++
 src/include/access/xlog_internal.h          |  2 +-
 src/include/storage/procarray.h             |  2 +-
 src/include/storage/standby.h               |  3 ++-
 src/include/storage/standbydefs.h           |  1 +
 16 files changed, 99 insertions(+), 14 deletions(-)

diff --git a/contrib/pg_visibility/pg_visibility.c b/contrib/pg_visibility/pg_visibility.c
index dfab0b64cf5..d564bd2a00c 100644
--- a/contrib/pg_visibility/pg_visibility.c
+++ b/contrib/pg_visibility/pg_visibility.c
@@ -621,7 +621,7 @@ GetStrictOldestNonRemovableTransactionId(Relation rel)
 	else if (rel == NULL || rel->rd_rel->relisshared)
 	{
 		/* Shared relation: take into account all running xids */
-		runningTransactions = GetRunningTransactionData();
+		runningTransactions = GetRunningTransactionData(InvalidOid);
 		LWLockRelease(ProcArrayLock);
 		LWLockRelease(XidGenLock);
 		return runningTransactions->oldestRunningXid;
@@ -632,7 +632,7 @@ GetStrictOldestNonRemovableTransactionId(Relation rel)
 		 * Normal relation: take into account xids running within the current
 		 * database
 		 */
-		runningTransactions = GetRunningTransactionData();
+		runningTransactions = GetRunningTransactionData(InvalidOid);
 		LWLockRelease(ProcArrayLock);
 		LWLockRelease(XidGenLock);
 		return runningTransactions->oldestDatabaseRunningXid;
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index 1408989c568..97ea882cf71 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -37,6 +37,14 @@
 #include "utils/ruleutils.h"
 #include "utils/snapmgr.h"
 
+/*
+ * If a backend is going to do logical decoding and if the output plugin does
+ * not need to access shared catalogs, setting this variable to false can make
+ * the decoding startup faster. In particular, the backend will not need to
+ * wait for completion of already running transactions in other databases.
+ */
+bool accessSharedCatalogsInDecoding = true;
+
 
 /* ----------------------------------------------------------------
  *		general access method routines
@@ -394,6 +402,16 @@ systable_beginscan(Relation heapRelation,
 	SysScanDesc sysscan;
 	Relation	irel;
 
+	/*
+	 * If this backend promised that it won't access shared catalogs during
+	 * logical decoding, this seems to be the right place to check.
+	 *
+	 * XXX Should this be ereport(ERROR) ?
+	 */
+	Assert(!HistoricSnapshotActive() ||
+		   accessSharedCatalogsInDecoding ||
+		   !heapRelation->rd_rel->relisshared);
+
 	if (indexOK &&
 		!IgnoreSystemIndexes &&
 		!ReindexIsProcessingIndex(indexId))
diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index 0a291354ae2..685d1bdb024 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -41,6 +41,8 @@ standby_desc_running_xacts(StringInfo buf, xl_running_xacts *xlrec)
 		for (i = 0; i < xlrec->subxcnt; i++)
 			appendStringInfo(buf, " %u", xlrec->xids[xlrec->xcnt + i]);
 	}
+
+	appendStringInfo(buf, "; dbid: %u", xlrec->dbid);
 }
 
 void
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 2c1c6f88b74..8da8eff93e4 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7336,7 +7336,7 @@ CreateCheckPoint(int flags)
 	 * recovery we don't need to write running xact data.
 	 */
 	if (!shutdown && XLogStandbyInfoActive())
-		LogStandbySnapshot();
+		LogStandbySnapshot(InvalidOid);
 
 	START_CRIT_SECTION();
 
diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index 65bbaeda59c..0f5979691e6 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -245,7 +245,7 @@ pg_log_standby_snapshot(PG_FUNCTION_ARGS)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("pg_log_standby_snapshot() can only be used if \"wal_level\" >= \"replica\"")));
 
-	recptr = LogStandbySnapshot();
+	recptr = LogStandbySnapshot(InvalidOid);
 
 	/*
 	 * As a convenience, return the WAL location of the last inserted record
diff --git a/src/backend/commands/repack_worker.c b/src/backend/commands/repack_worker.c
index 00b21ede481..c25dbeadff3 100644
--- a/src/backend/commands/repack_worker.c
+++ b/src/backend/commands/repack_worker.c
@@ -16,6 +16,7 @@
  */
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/table.h"
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
@@ -233,6 +234,13 @@ repack_setup_logical_decoding(Oid relid)
 
 	EnsureLogicalDecodingEnabled();
 
+	/*
+	 * By declaring that our output plugin does not need shared catalogs, we
+	 * avoid waiting for completion of transactions running in other databases
+	 * than the one we're connected to.
+	 */
+	accessSharedCatalogsInDecoding = false;
+
 	/*
 	 * Neither prepare_write nor do_write callback nor update_progress is
 	 * useful for us.
diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index 1d8947774a9..a30de4262eb 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -289,7 +289,7 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len)
 			if (now >= timeout &&
 				last_snapshot_lsn <= GetLastImportantRecPtr())
 			{
-				last_snapshot_lsn = LogStandbySnapshot();
+				last_snapshot_lsn = LogStandbySnapshot(InvalidOid);
 				last_snapshot_ts = now;
 			}
 		}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 0f6b5df322c..16009fa75a4 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -125,6 +125,7 @@
 #include <sys/stat.h>
 #include <unistd.h>
 
+#include "access/genam.h"
 #include "access/heapam_xlog.h"
 #include "access/transam.h"
 #include "access/xact.h"
@@ -1470,7 +1471,14 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
 	 */
 	if (!RecoveryInProgress())
 	{
-		LogStandbySnapshot();
+		Oid		dbid;
+
+		/*
+		 * Only consider transactions of the current database if our plugin is
+		 * not supposed to access shared catalogs.
+		 */
+		dbid = accessSharedCatalogsInDecoding ? InvalidOid : MyDatabaseId;
+		LogStandbySnapshot(dbid);
 	}
 }
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index fe6bfeba25e..0d415f5f8f2 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -39,6 +39,7 @@
 #include <unistd.h>
 #include <sys/stat.h>
 
+#include "access/genam.h"
 #include "access/transam.h"
 #include "access/xlog_internal.h"
 #include "access/xlogrecovery.h"
@@ -1776,9 +1777,16 @@ ReplicationSlotReserveWal(void)
 	if (!RecoveryInProgress() && SlotIsLogical(slot))
 	{
 		XLogRecPtr	flushptr;
+		Oid		dbid;
 
-		/* make sure we have enough information to start */
-		flushptr = LogStandbySnapshot();
+		/*
+		 * Make sure we have enough information to start.
+		 *
+		 * Only consider transactions of the current database if our plugin
+		 * is not supposed to access shared catalogs.
+		 */
+		dbid = accessSharedCatalogsInDecoding ? InvalidOid : MyDatabaseId;
+		flushptr = LogStandbySnapshot(dbid);
 
 		/* and make sure it's fsynced to disk */
 		XLogFlush(flushptr);
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index cc207cb56e3..49d1bf4bfac 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -2631,9 +2631,11 @@ ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc)
  *
  * Note that if any transaction has overflowed its cached subtransactions
  * then there is no real need include any subtransactions.
+ *
+ * If 'dbid' is valid, only gather transactions running in that database.
  */
 RunningTransactions
-GetRunningTransactionData(void)
+GetRunningTransactionData(Oid dbid)
 {
 	/* result workspace */
 	static RunningTransactionsData CurrentRunningXactsData;
@@ -2708,6 +2710,18 @@ GetRunningTransactionData(void)
 		if (!TransactionIdIsValid(xid))
 			continue;
 
+		/*
+		 * Filter by database OID if requested.
+		 */
+		if (OidIsValid(dbid))
+		{
+			int			pgprocno = arrayP->pgprocnos[index];
+			PGPROC	   *proc = &allProcs[pgprocno];
+
+			if (proc->databaseId != dbid)
+				continue;
+		}
+
 		/*
 		 * Be careful not to exclude any xids before calculating the values of
 		 * oldestRunningXid and suboverflowed, since these are used to clean
@@ -2758,6 +2772,12 @@ GetRunningTransactionData(void)
 			PGPROC	   *proc = &allProcs[pgprocno];
 			int			nsubxids;
 
+			/*
+			 * Filter by database OID if requested.
+			 */
+			if (OidIsValid(dbid) && proc->databaseId != dbid)
+					continue;
+
 			/*
 			 * Save subtransaction XIDs. Other backends can't add or remove
 			 * entries while we're holding XidGenLock.
@@ -2791,6 +2811,7 @@ GetRunningTransactionData(void)
 	 * increases if slots do.
 	 */
 
+	CurrentRunningXacts->dbid = dbid;
 	CurrentRunningXacts->xcnt = count - subcount;
 	CurrentRunningXacts->subxcnt = subcount;
 	CurrentRunningXacts->subxid_status = suboverflowed ? SUBXIDS_IN_SUBTRANS : SUBXIDS_IN_ARRAY;
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index de9092fdf5b..c653ea742bc 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -1188,6 +1188,14 @@ standby_redo(XLogReaderState *record)
 		xl_running_xacts *xlrec = (xl_running_xacts *) XLogRecGetData(record);
 		RunningTransactionsData running;
 
+		/*
+		 * Records issued for specific database are not suitable for physical
+		 * replication because that affects the whole cluster. In particular,
+		 * the list of XID is probably incomplete here.
+		 */
+		if (OidIsValid(xlrec->dbid))
+			return;
+
 		running.xcnt = xlrec->xcnt;
 		running.subxcnt = xlrec->subxcnt;
 		running.subxid_status = xlrec->subxid_overflow ? SUBXIDS_MISSING : SUBXIDS_IN_ARRAY;
@@ -1277,11 +1285,12 @@ standby_redo(XLogReaderState *record)
  * as there's no independent knob to just enable logical decoding. For
  * details of how this is used, check snapbuild.c's introductory comment.
  *
+ * If 'dbid' is valid, only gather transactions running in that database.
  *
  * Returns the RecPtr of the last inserted record.
  */
 XLogRecPtr
-LogStandbySnapshot(void)
+LogStandbySnapshot(Oid dbid)
 {
 	XLogRecPtr	recptr;
 	RunningTransactions running;
@@ -1314,7 +1323,7 @@ LogStandbySnapshot(void)
 	 * Log details of all in-progress transactions. This should be the last
 	 * record we write, because standby will open up when it sees this.
 	 */
-	running = GetRunningTransactionData();
+	running = GetRunningTransactionData(dbid);
 
 	/*
 	 * GetRunningTransactionData() acquired ProcArrayLock, we must release it.
@@ -1358,6 +1367,7 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
 	xl_running_xacts xlrec;
 	XLogRecPtr	recptr;
 
+	xlrec.dbid = CurrRunningXacts->dbid;
 	xlrec.xcnt = CurrRunningXacts->xcnt;
 	xlrec.subxcnt = CurrRunningXacts->subxcnt;
 	xlrec.subxid_overflow = (CurrRunningXacts->subxid_status != SUBXIDS_IN_ARRAY);
diff --git a/src/include/access/genam.h b/src/include/access/genam.h
index b69320a7fc8..56c573399ab 100644
--- a/src/include/access/genam.h
+++ b/src/include/access/genam.h
@@ -136,6 +136,14 @@ typedef struct IndexOrderByDistance
 	bool		isnull;
 } IndexOrderByDistance;
 
+/*
+ * Is the backend interested in shared catalogs when performing logical
+ * decoding?
+ *
+ * XXX Is there a better place for this declaration?
+ */
+extern bool accessSharedCatalogsInDecoding;
+
 /*
  * generalized index_ interface routines (in indexam.c)
  */
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 755835d63bf..ae19982d88d 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -31,7 +31,7 @@
 /*
  * Each page of XLOG file has a header like this:
  */
-#define XLOG_PAGE_MAGIC 0xD11E	/* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD11F	/* can be used as WAL version indicator */
 
 typedef struct XLogPageHeaderData
 {
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index abdf021e66e..377b3060b9f 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -49,7 +49,7 @@ extern bool ProcArrayInstallImportedXmin(TransactionId xmin,
 										 VirtualTransactionId *sourcevxid);
 extern bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc);
 
-extern RunningTransactions GetRunningTransactionData(void);
+extern RunningTransactions GetRunningTransactionData(Oid dbid);
 
 extern bool TransactionIdIsInProgress(TransactionId xid);
 extern TransactionId GetOldestNonRemovableTransactionId(Relation rel);
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index 6a314c693cd..8715c08e94f 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -126,6 +126,7 @@ typedef enum
 
 typedef struct RunningTransactionsData
 {
+	Oid			dbid;			/* only track xacts in this database */
 	int			xcnt;			/* # of xact ids in xids[] */
 	int			subxcnt;		/* # of subxact ids in xids[] */
 	subxids_array_status subxid_status;
@@ -143,7 +144,7 @@ typedef RunningTransactionsData *RunningTransactions;
 extern void LogAccessExclusiveLock(Oid dbOid, Oid relOid);
 extern void LogAccessExclusiveLockPrepare(void);
 
-extern XLogRecPtr LogStandbySnapshot(void);
+extern XLogRecPtr LogStandbySnapshot(Oid dbid);
 extern void LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
 									bool relcacheInitFileInval);
 
diff --git a/src/include/storage/standbydefs.h b/src/include/storage/standbydefs.h
index 231d251fd51..e75b7078766 100644
--- a/src/include/storage/standbydefs.h
+++ b/src/include/storage/standbydefs.h
@@ -46,6 +46,7 @@ typedef struct xl_standby_locks
  */
 typedef struct xl_running_xacts
 {
+	Oid			dbid;			/* only track xacts in this database */
 	int			xcnt;			/* # of xact ids in xids[] */
 	int			subxcnt;		/* # of subxact ids in xids[] */
 	bool		subxid_overflow;	/* snapshot overflowed, subxids missing */
-- 
2.47.3

