From 7e4f738b224b758e3ed4e14e2483a7e4a7dfc187 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C3=81lvaro=20Herrera?= <alvherre@kurilemu.de>
Date: Wed, 1 Apr 2026 19:54:14 +0200
Subject: [PATCH v49 7/7] WIP add max_repack_replication_slots

---
 src/backend/commands/repack_worker.c          |  4 +-
 src/backend/replication/logical/launcher.c    |  2 +-
 src/backend/replication/logical/slotsync.c    |  5 +-
 src/backend/replication/slot.c                | 77 +++++++++++--------
 src/backend/replication/slotfuncs.c           |  8 +-
 src/backend/replication/walsender.c           |  4 +-
 src/backend/utils/misc/guc_parameters.dat     |  8 ++
 src/backend/utils/misc/postgresql.conf.sample |  2 +
 src/include/replication/slot.h                |  6 +-
 9 files changed, 74 insertions(+), 42 deletions(-)

diff --git a/src/backend/commands/repack_worker.c b/src/backend/commands/repack_worker.c
index 106b2b60441..00b21ede481 100644
--- a/src/backend/commands/repack_worker.c
+++ b/src/backend/commands/repack_worker.c
@@ -228,8 +228,8 @@ repack_setup_logical_decoding(Oid relid)
 	 * RS_TEMPORARY so that the slot gets cleaned up on ERROR.
 	 */
 	snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid);
-	ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, false,
-						  false);
+	ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, true,
+						  false, false);
 
 	EnsureLogicalDecodingEnabled();
 
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 09964198550..d83125afd0d 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1575,7 +1575,7 @@ CreateConflictDetectionSlot(void)
 			errmsg("creating replication conflict detection slot"));
 
 	ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
-						  false, false);
+						  false, false, false);
 
 	init_conflict_slot_xmin();
 }
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index e75db69e3f6..1bc7f3f600d 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -425,7 +425,7 @@ get_local_synced_slots(void)
 
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
-	for (int i = 0; i < max_replication_slots; i++)
+	for (int i = 0; i < TotalMaxReplicationSlots; i++)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 
@@ -814,6 +814,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid,
 		 */
 		ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
 							  remote_slot->two_phase,
+							  false,
 							  remote_slot->failover,
 							  true);
 
@@ -1691,7 +1692,7 @@ update_synced_slots_inactive_since(void)
 
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
-	for (int i = 0; i < max_replication_slots; i++)
+	for (int i = 0; i < TotalMaxReplicationSlots; i++)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index a9092fc2382..fe6bfeba25e 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -151,6 +151,9 @@ ReplicationSlot *MyReplicationSlot = NULL;
 /* GUC variables */
 int			max_replication_slots = 10; /* the maximum number of replication
 										 * slots */
+int			max_repack_replication_slots = 1;	/* the maximum number of slots
+												 * for REPACK */
+int			TotalMaxReplicationSlots = 0;	/* sum of both */
 
 /*
  * Invalidate replication slots that have remained idle longer than this
@@ -190,12 +193,14 @@ ReplicationSlotsShmemSize(void)
 {
 	Size		size = 0;
 
-	if (max_replication_slots == 0)
+	TotalMaxReplicationSlots = max_replication_slots + max_repack_replication_slots;
+
+	if (TotalMaxReplicationSlots == 0)
 		return size;
 
 	size = offsetof(ReplicationSlotCtlData, replication_slots);
 	size = add_size(size,
-					mul_size(max_replication_slots, sizeof(ReplicationSlot)));
+					mul_size(TotalMaxReplicationSlots, sizeof(ReplicationSlot)));
 
 	return size;
 }
@@ -208,7 +213,7 @@ ReplicationSlotsShmemInit(void)
 {
 	bool		found;
 
-	if (max_replication_slots == 0)
+	if (TotalMaxReplicationSlots == 0)
 		return;
 
 	ReplicationSlotCtl = (ReplicationSlotCtlData *)
@@ -222,7 +227,7 @@ ReplicationSlotsShmemInit(void)
 		/* First time through, so initialize */
 		MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
 
-		for (i = 0; i < max_replication_slots; i++)
+		for (i = 0; i < TotalMaxReplicationSlots; i++)
 		{
 			ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
 
@@ -372,6 +377,7 @@ IsSlotForConflictCheck(const char *name)
  * db_specific: logical decoding is db specific; if the slot is going to
  *	   be used for that pass true, otherwise false.
  * two_phase: If enabled, allows decoding of prepared transactions.
+ * repack: If true, use a slot from the pool for REPACK.
  * failover: If enabled, allows the slot to be synced to standbys so
  *     that logical replication can be resumed after failover.
  * synced: True if the slot is synchronized from the primary server.
@@ -379,10 +385,11 @@ IsSlotForConflictCheck(const char *name)
 void
 ReplicationSlotCreate(const char *name, bool db_specific,
 					  ReplicationSlotPersistency persistency,
-					  bool two_phase, bool failover, bool synced)
+					  bool two_phase, bool repack, bool failover, bool synced)
 {
 	ReplicationSlot *slot = NULL;
-	int			i;
+	int			startpoint,
+				endpoint;
 
 	Assert(MyReplicationSlot == NULL);
 
@@ -431,12 +438,16 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
 
 	/*
-	 * Check for name collision, and identify an allocatable slot.  We need to
-	 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
-	 * else can change the in_use flags while we're looking at them.
+	 * Check for name collision (across the whole array), and identify an
+	 * allocatable slot (in the array slice specific to our current use case:
+	 * either general, or REPACK only).  We need to hold
+	 * ReplicationSlotControlLock in shared mode for this, so that nobody else
+	 * can change the in_use flags while we're looking at them.
 	 */
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-	for (i = 0; i < max_replication_slots; i++)
+	startpoint = repack ? max_replication_slots : 0;
+	endpoint = repack ? TotalMaxReplicationSlots : max_replication_slots;
+	for (int i = 0; i < TotalMaxReplicationSlots; i++)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 
@@ -444,7 +455,9 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 			ereport(ERROR,
 					(errcode(ERRCODE_DUPLICATE_OBJECT),
 					 errmsg("replication slot \"%s\" already exists", name)));
-		if (!s->in_use && slot == NULL)
+
+		if (i >= startpoint && i < endpoint &&
+			!s->in_use && slot == NULL)
 			slot = s;
 	}
 	LWLockRelease(ReplicationSlotControlLock);
@@ -547,7 +560,7 @@ SearchNamedReplicationSlot(const char *name, bool need_lock)
 	if (need_lock)
 		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
-	for (i = 0; i < max_replication_slots; i++)
+	for (i = 0; i < TotalMaxReplicationSlots; i++)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 
@@ -575,7 +588,7 @@ int
 ReplicationSlotIndex(ReplicationSlot *slot)
 {
 	Assert(slot >= ReplicationSlotCtl->replication_slots &&
-		   slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
+		   slot < ReplicationSlotCtl->replication_slots + TotalMaxReplicationSlots);
 
 	return slot - ReplicationSlotCtl->replication_slots;
 }
@@ -869,7 +882,7 @@ ReplicationSlotCleanup(bool synced_only)
 restart:
 	found_valid_logicalslot = false;
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-	for (i = 0; i < max_replication_slots; i++)
+	for (i = 0; i < TotalMaxReplicationSlots; i++)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 
@@ -1251,7 +1264,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 	if (!already_locked)
 		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
-	for (i = 0; i < max_replication_slots; i++)
+	for (i = 0; i < TotalMaxReplicationSlots; i++)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 		TransactionId effective_xmin;
@@ -1306,7 +1319,7 @@ ReplicationSlotsComputeRequiredLSN(void)
 	Assert(ReplicationSlotCtl != NULL);
 
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-	for (i = 0; i < max_replication_slots; i++)
+	for (i = 0; i < TotalMaxReplicationSlots; i++)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 		XLogRecPtr	restart_lsn;
@@ -1373,12 +1386,12 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
 	XLogRecPtr	result = InvalidXLogRecPtr;
 	int			i;
 
-	if (max_replication_slots <= 0)
+	if (TotalMaxReplicationSlots <= 0)
 		return InvalidXLogRecPtr;
 
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
-	for (i = 0; i < max_replication_slots; i++)
+	for (i = 0; i < TotalMaxReplicationSlots; i++)
 	{
 		ReplicationSlot *s;
 		XLogRecPtr	restart_lsn;
@@ -1453,11 +1466,11 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
 
 	*nslots = *nactive = 0;
 
-	if (max_replication_slots <= 0)
+	if (TotalMaxReplicationSlots <= 0)
 		return false;
 
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-	for (i = 0; i < max_replication_slots; i++)
+	for (i = 0; i < TotalMaxReplicationSlots; i++)
 	{
 		ReplicationSlot *s;
 
@@ -1514,13 +1527,13 @@ ReplicationSlotsDropDBSlots(Oid dboid)
 	bool		found_valid_logicalslot;
 	bool		dropped = false;
 
-	if (max_replication_slots <= 0)
+	if (TotalMaxReplicationSlots <= 0)
 		return;
 
 restart:
 	found_valid_logicalslot = false;
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-	for (i = 0; i < max_replication_slots; i++)
+	for (i = 0; i < TotalMaxReplicationSlots; i++)
 	{
 		ReplicationSlot *s;
 		char	   *slotname;
@@ -1617,11 +1630,11 @@ CheckLogicalSlotExists(void)
 {
 	bool		found = false;
 
-	if (max_replication_slots <= 0)
+	if (TotalMaxReplicationSlots <= 0)
 		return false;
 
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-	for (int i = 0; i < max_replication_slots; i++)
+	for (int i = 0; i < TotalMaxReplicationSlots; i++)
 	{
 		ReplicationSlot *s;
 		bool		invalidated;
@@ -1662,7 +1675,10 @@ CheckSlotRequirements(void)
 	 * needs the same check.
 	 */
 
-	if (max_replication_slots == 0)
+	/* FIXME how to relax this for repack in a way that doesn't mess everything
+	 * up?
+	 */
+	if (TotalMaxReplicationSlots == 0)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
@@ -2216,7 +2232,7 @@ InvalidateObsoleteReplicationSlots(uint32 possible_causes,
 	Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
 	Assert(possible_causes != RS_INVAL_NONE);
 
-	if (max_replication_slots == 0)
+	if (TotalMaxReplicationSlots == 0)
 		return invalidated;
 
 	XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
@@ -2224,7 +2240,7 @@ InvalidateObsoleteReplicationSlots(uint32 possible_causes,
 restart:
 	found_valid_logicalslot = false;
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-	for (int i = 0; i < max_replication_slots; i++)
+	for (int i = 0; i < TotalMaxReplicationSlots; i++)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 		bool		released_lock = false;
@@ -2329,7 +2345,7 @@ CheckPointReplicationSlots(bool is_shutdown)
 	 */
 	LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
 
-	for (i = 0; i < max_replication_slots; i++)
+	for (i = 0; i < TotalMaxReplicationSlots; i++)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 		char		path[MAXPGPATH];
@@ -2430,7 +2446,7 @@ StartupReplicationSlots(void)
 	FreeDir(replication_dir);
 
 	/* currently no slots exist, we're done. */
-	if (max_replication_slots <= 0)
+	if (TotalMaxReplicationSlots <= 0)
 		return;
 
 	/* Now that we have recovered all the data, compute replication xmin */
@@ -2860,7 +2876,7 @@ RestoreSlotFromDisk(const char *name)
 				 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
 
 	/* nothing can be active yet, don't lock anything */
-	for (i = 0; i < max_replication_slots; i++)
+	for (i = 0; i < TotalMaxReplicationSlots; i++)
 	{
 		ReplicationSlot *slot;
 
@@ -2902,6 +2918,7 @@ RestoreSlotFromDisk(const char *name)
 		break;
 	}
 
+	/* XXX might be misleading if the slots previously in use were REPACK. */
 	if (!restored)
 		ereport(FATAL,
 				(errmsg("too many replication slots active before shutdown"),
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 9f5e4f998fe..cdb4dbd87c9 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -53,7 +53,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve,
 	/* acquire replication slot, this will check for conflicting names */
 	ReplicationSlotCreate(name, false,
 						  temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
-						  false, false);
+						  false, false, false);
 
 	if (immediately_reserve)
 	{
@@ -146,7 +146,7 @@ create_logical_replication_slot(char *name, char *plugin,
 	 */
 	ReplicationSlotCreate(name, true,
 						  temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
-						  failover, false);
+						  false, failover, false);
 
 	/*
 	 * Ensure the logical decoding is enabled before initializing the logical
@@ -270,7 +270,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 	currlsn = GetXLogWriteRecPtr();
 
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
-	for (slotno = 0; slotno < max_replication_slots; slotno++)
+	for (slotno = 0; slotno < TotalMaxReplicationSlots; slotno++)
 	{
 		ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
 		ReplicationSlot slot_contents;
@@ -665,7 +665,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
 	 * managed to create the new slot, we advance the new slot's restart_lsn
 	 * to the source slot's updated restart_lsn the second time we lock it.
 	 */
-	for (int i = 0; i < max_replication_slots; i++)
+	for (int i = 0; i < TotalMaxReplicationSlots; i++)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index fbbe09135bf..caea3ed1257 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1220,7 +1220,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 	{
 		ReplicationSlotCreate(cmd->slotname, false,
 							  cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
-							  false, false, false);
+							  false, false, false, false);
 
 		if (reserve_wal)
 		{
@@ -1251,7 +1251,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
 		 */
 		ReplicationSlotCreate(cmd->slotname, true,
 							  cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
-							  two_phase, failover, false);
+							  two_phase, false, failover, false);
 
 		/*
 		 * Do options check early so that we can bail before calling the
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index 0a862693fcd..d12a3bd7bdf 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -2070,6 +2070,14 @@
   max => 'MAX_BACKENDS',
 },
 
+{ name => 'max_repack_replication_slots', type => 'int', context => 'PGC_POSTMASTER', group => 'REPLICATION_SENDING',
+  short_desc => 'Sets the maximum number of replication slots for use by REPACK.',
+  variable => 'max_repack_replication_slots',
+  boot_val => '1',
+  min => '0',
+  max => 'MAX_BACKENDS /* XXX? */',
+},
+
 /* see max_wal_senders */
 { name => 'max_replication_slots', type => 'int', context => 'PGC_POSTMASTER', group => 'REPLICATION_SENDING',
   short_desc => 'Sets the maximum number of simultaneously defined replication slots.',
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index cf15597385b..15014f37753 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -348,6 +348,8 @@
                                 # (change requires restart)
 #max_replication_slots = 10     # max number of replication slots
                                 # (change requires restart)
+#max_repack_replication_slots = 1 # max number of replication slots for REPACK
+                                # (change requires restart)
 #wal_keep_size = 0              # in megabytes; 0 disables
 #max_slot_wal_keep_size = -1    # in megabytes; -1 disables
 #idle_replication_slot_timeout = 0      # in seconds; 0 disables
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 4b4709f6e2c..88953576894 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -324,9 +324,13 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 
 /* GUCs */
 extern PGDLLIMPORT int max_replication_slots;
+extern PGDLLIMPORT int max_repack_replication_slots;
 extern PGDLLIMPORT char *synchronized_standby_slots;
 extern PGDLLIMPORT int idle_replication_slot_timeout_secs;
 
+/* only for slotfuncs.c, slotsync.c etc */
+extern int TotalMaxReplicationSlots;
+
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
 extern void ReplicationSlotsShmemInit(void);
@@ -334,7 +338,7 @@ extern void ReplicationSlotsShmemInit(void);
 /* management of individual slots */
 extern void ReplicationSlotCreate(const char *name, bool db_specific,
 								  ReplicationSlotPersistency persistency,
-								  bool two_phase, bool failover,
+								  bool two_phase, bool repack, bool failover,
 								  bool synced);
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
-- 
2.47.3

