From b9b73e7f610ec236d04c3f1c769a47d4582fd928 Mon Sep 17 00:00:00 2001
From: Rui Zhao
Date: Wed, 10 Jun 2026 00:59:21 +0800
Subject: [PATCH v1] Separate catalog_xmin from xmin via ephemeral physical slot

When a standby connects without a replication slot and sends hot standby
feedback, lazily create an ephemeral physical slot so xmin and
catalog_xmin can be tracked separately and atomically (under the slot
mutex), instead of conflating both into the walsender's PGPROC xmin.

Add an error_if_full flag to ReplicationSlotCreate so the walsender can
degrade gracefully to the legacy min(xmin, catalog_xmin) behavior when
the slot pool is exhausted, rather than tearing down the connection.
---
 src/backend/commands/repack_worker.c       |  2 +-
 src/backend/replication/logical/launcher.c |  2 +-
 src/backend/replication/logical/slotsync.c |  2 +-
 src/backend/replication/slot.c             | 19 ++++++++++--
 src/backend/replication/slotfuncs.c        |  4 +--
 src/backend/replication/walsender.c        | 35 +++++++++++++++-------
 src/include/replication/slot.h             |  4 +--
 7 files changed, 49 insertions(+), 19 deletions(-)

diff --git a/src/backend/commands/repack_worker.c b/src/backend/commands/repack_worker.c index db9ff057cc..95e5be471d 100644 --- a/src/backend/commands/repack_worker.c +++ b/src/backend/commands/repack_worker.c @@ -220,7 +220,7 @@ repack_setup_logical_decoding(Oid relid) */ snprintf(slotname, NAMEDATALEN, "pg_repack_%d", MyProcPid); ReplicationSlotCreate(slotname, true, RS_TEMPORARY, false, true, - false, false); + false, false, true); EnsureLogicalDecodingEnabled(); /* diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 313e31ff2e..756ad5f12e 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -1576,7 +1576,7 @@ CreateConflictDetectionSlot(void) errmsg("creating replication conflict detection slot")); ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false, - false, 
false, false); + false, false, false, true); init_conflict_slot_xmin(); } diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 96107c9475..fd12bc9766 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -851,7 +851,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid, remote_slot->two_phase, false, remote_slot->failover, - true); + true, true); /* For shorter lines. */ slot = MyReplicationSlot; diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index d7fb9f5a67..ab0d1b3f6e 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -373,11 +373,17 @@ IsSlotForConflictCheck(const char *name) * failover: If enabled, allows the slot to be synced to standbys so * that logical replication can be resumed after failover. * synced: True if the slot is synchronized from the primary server. + * error_if_full: If true, raise an error when no slot is free; if false, + * return false instead so the caller can degrade gracefully. + * + * Returns true if a slot was created, false only when error_if_full is false + * and the slot pool is exhausted. */ -void +bool ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase, bool repack, bool failover, bool synced) + bool two_phase, bool repack, bool failover, bool synced, + bool error_if_full) { ReplicationSlot *slot = NULL; int startpoint, @@ -456,11 +462,18 @@ ReplicationSlotCreate(const char *name, bool db_specific, /* If all slots are in use, we're out of luck. */ if (slot == NULL) + { + if (!error_if_full) + { + LWLockRelease(ReplicationSlotAllocationLock); + return false; + } ereport(ERROR, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), errmsg("all replication slots are in use"), errhint("Free one or increase \"%s\".", repack ? 
"max_repack_replication_slots" : "max_replication_slots"))); + } /* * Since this slot is not in use, nobody should be looking at any part of @@ -537,6 +550,8 @@ ReplicationSlotCreate(const char *name, bool db_specific, /* Let everybody know we've modified this slot */ ConditionVariableBroadcast(&slot->active_cv); + + return true; } /* diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 16fbd38373..7ff4d04317 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -53,7 +53,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve, /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(name, false, temporary ? RS_TEMPORARY : RS_PERSISTENT, false, - false, false, false); + false, false, false, true); if (immediately_reserve) { @@ -146,7 +146,7 @@ create_logical_replication_slot(char *name, char *plugin, */ ReplicationSlotCreate(name, true, temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase, - false, failover, false); + false, failover, false, true); /* * Ensure the logical decoding is enabled before initializing the logical diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 04aa770d98..edc2dd2283 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1248,7 +1248,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) { ReplicationSlotCreate(cmd->slotname, false, cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, - false, false, false, false); + false, false, false, false, true); if (reserve_wal) { @@ -1279,7 +1279,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? 
RS_TEMPORARY : RS_EPHEMERAL, - two_phase, false, failover, false); + two_phase, false, failover, false, true); /* * Do options check early so that we can bail before calling the @@ -2786,18 +2786,33 @@ ProcessStandbyHSFeedbackMessage(void) * obviously safe, and if we're moving it backwards, well, the data is at * risk already since a VACUUM could already have determined the horizon.) * - * If we're using a replication slot we reserve the xmin via that, - * otherwise via the walsender's PGPROC entry. We can only track the - * catalog xmin separately when using a slot, so we store the least of the - * two provided when not using a slot. - * - * XXX: It might make sense to generalize the ephemeral slot concept and - * always use the slot mechanism to handle the feedback xmin. + * If we're using a replication slot we reserve the xmin via that. When the + * standby connected without one, lazily create an ephemeral physical slot + * here, so that we can still track xmin and catalog_xmin separately (and + * atomically, under the slot mutex). The ephemeral slot is dropped + * automatically when this walsender exits. */ - if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */ + if (MyReplicationSlot == NULL) + { + char slotname[NAMEDATALEN]; + + snprintf(slotname, sizeof(slotname), "pg_walsender_%d", MyProcPid); + ReplicationSlotCreate(slotname, false, RS_EPHEMERAL, + false, false, false, false, false); + } + + if (MyReplicationSlot != NULL) PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin); else { + /* + * The slot pool is exhausted, so we cannot track the two horizons + * separately. Degrade gracefully to the pre-existing behavior of + * holding back both via the walsender's PGPROC entry, using the + * older of the two values. This loses the catalog/data separation + * for this standby until a slot frees up, but never breaks the + * connection. 
+ */ if (TransactionIdIsNormal(feedbackCatalogXmin) && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin)) MyProc->xmin = feedbackCatalogXmin; diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 9b29444cbc..ba90d574af 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -329,10 +329,10 @@ extern PGDLLIMPORT char *synchronized_standby_slots; extern PGDLLIMPORT int idle_replication_slot_timeout_secs; /* management of individual slots */ -extern void ReplicationSlotCreate(const char *name, bool db_specific, +extern bool ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool repack, bool failover, - bool synced); + bool synced, bool error_if_full); extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); extern void ReplicationSlotDropAcquired(bool try_disable); -- 2.43.7