From 05b6194a321f4168dcc17b1fd5589dfcfb6e49d4 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <rupiredd@amazon.com>
Date: Fri, 3 Apr 2026 18:27:15 +0000
Subject: [PATCH v10] Introduce max_slot_xid_age to invalidate old replication
 slots.

This commit introduces a new GUC parameter, max_slot_xid_age. It
invalidates replication slots whose xmin or catalog_xmin age exceeds
the configured limit. While time-based slot invalidation is useful for
cleaning up inactive slots, an XID-age-based limit acts as a critical
backstop to directly prevent transaction ID wraparound and severe
bloat caused by orphaned slots.

The invalidation check occurs during both VACUUM (manual and autovacuum)
and checkpoints. During vacuum, the check is performed on a per-relation
basis. Crucially, the XID-age based slot invalidation is considered
only when slots' XIDs are actively holding back the OldestXmin for the
current relation. In other words, we only invalidate slots when doing
so has the potential to advance the vacuum cutoff and allow dead tuple
reclamation.

Checking slot XIDs per relation could introduce significant performance
overhead if it required acquiring ProcArrayLock repeatedly to get
replication slot xmin values (xmin and catalog_xmin in procArray). To
avoid this, the vacuum cutoff calculation has been optimized. A new
function GetOldestNonRemovableTransactionIdWithSlotXids() is
introduced to return the global OldestXmin along with the oldest slot
xmin and catalog_xmin values. This allows the per-relation invalidation
check to be performed with zero additional lock acquisitions.

In addition to vacuum, slots are also checked and invalidated during
checkpoints. This is particularly important for standby servers where
vacuum does not run. Note that on standbys, slots that are currently
being synced from the primary (i.e., synced = true) are exempt from
this invalidation mechanism.

Author: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Co-authored-by: John Hsu <johnhyvr@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Bertrand Drouvot <bertranddrouvot.pg@gmail.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: SATYANARAYANA NARLAPURAM <satyanarlapuram@gmail.com>
Discussion: https://postgr.es/m/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
Discussion: https://postgr.es/m/CA+-JvFsMHckBMzsu5Ov9HCG3AFbMh056hHy1FiXazBRtZ9pFBg@mail.gmail.com
---
 doc/src/sgml/config.sgml                      |  54 ++++++
 doc/src/sgml/logical-replication.sgml         |   4 +-
 doc/src/sgml/system-views.sgml                |   8 +
 src/backend/access/heap/vacuumlazy.c          |  18 ++
 src/backend/access/transam/xlog.c             |  34 +++-
 src/backend/commands/vacuum.c                 |  80 +++++++-
 src/backend/replication/slot.c                |  82 +++++++-
 src/backend/storage/ipc/procarray.c           |  61 ++++--
 src/backend/storage/ipc/standby.c             |   3 +-
 src/backend/utils/misc/guc_parameters.dat     |   8 +
 src/backend/utils/misc/postgresql.conf.sample |   2 +
 src/bin/pg_basebackup/pg_createsubscriber.c   |   2 +-
 src/include/commands/vacuum.h                 |  10 +
 src/include/replication/slot.h                |   8 +-
 src/include/storage/procarray.h               |   3 +
 src/test/recovery/t/019_replslot_limit.pl     | 175 ++++++++++++++++++
 16 files changed, 524 insertions(+), 28 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index d3fea738ca3..16e45748118 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4764,6 +4764,60 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-max-slot-xid-age" xreflabel="max_slot_xid_age">
+      <term><varname>max_slot_xid_age</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>max_slot_xid_age</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Invalidate replication slots whose <structfield>xmin</structfield> age
+        or <structfield>catalog_xmin</structfield> age in the
+        <link linkend="view-pg-replication-slots">pg_replication_slots</link>
+        view has exceeded the age specified by this setting. Slot invalidation
+        due to this limit occurs during vacuum (both <command>VACUUM</command>
+        command and autovacuum) and during checkpoint.
+        A value of zero (the default) disables this feature. Users can set
+        this value anywhere from zero to two billion transactions. This parameter
+        can only be set in the <filename>postgresql.conf</filename> file or on
+        the server command line.
+       </para>
+
+       <para>
+        The current age of a slot's <literal>xmin</literal> and
+        <literal>catalog_xmin</literal> can be monitored by applying the
+        <function>age</function> function to the corresponding columns in the
+        <link linkend="view-pg-replication-slots">pg_replication_slots</link>
+        view.
+       </para>
+
+       <para>
+        Idle or forgotten replication slots can hold back vacuum, leading to
+        bloat and eventually transaction ID wraparound. This setting avoids
+        that by invalidating slots that have fallen too far behind.
+        See <xref linkend="routine-vacuuming"/> for more details.
+       </para>
+
+       <para>
+        It is recommended to set <varname>max_slot_xid_age</varname>
+        to a value equal to or slightly less than
+        <xref linkend="guc-vacuum-failsafe-age"/>, so that the slot holding the
+        vacuum back is invalidated before vacuum enters failsafe mode.
+       </para>
+
+       <para>
+        Note that this invalidation mechanism is not applicable for slots
+        on the standby server that are being synced from the primary server
+        (i.e., standby slots having
+        <link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>synced</structfield>
+        value <literal>true</literal>). Synced slots are always considered to
+        be inactive because they don't perform logical decoding to produce
+        changes.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-wal-sender-timeout" xreflabel="wal_sender_timeout">
       <term><varname>wal_sender_timeout</varname> (<type>integer</type>)
       <indexterm>
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 23b268273b9..a865a5e6c28 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -2649,7 +2649,9 @@ CONTEXT:  processing remote data for replication origin "pg_16395" during "INSER
 
    <para>
     Logical replication slots are also affected by
-    <link linkend="guc-idle-replication-slot-timeout"><varname>idle_replication_slot_timeout</varname></link>.
+    <link linkend="guc-idle-replication-slot-timeout"><varname>idle_replication_slot_timeout</varname></link>
+    and
+    <link linkend="guc-max-slot-xid-age"><varname>max_slot_xid_age</varname></link>.
    </para>
 
    <para>
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index 9ee1a2bfc6a..1a507b430f9 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -3102,6 +3102,14 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
           <xref linkend="guc-idle-replication-slot-timeout"/> duration.
          </para>
         </listitem>
+        <listitem>
+         <para>
+          <literal>xid_aged</literal> means that the slot's
+          <literal>xmin</literal> or <literal>catalog_xmin</literal>
+          has reached the age specified by
+          <xref linkend="guc-max-slot-xid-age"/> parameter.
+         </para>
+        </listitem>
        </itemizedlist>
       </para></entry>
      </row>
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 88c71cd85b6..56b231f2350 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -799,6 +799,24 @@ heap_vacuum_rel(Relation rel, const VacuumParams *params,
 	 * to increase the number of dead tuples it can prune away.)
 	 */
 	vacrel->aggressive = vacuum_get_cutoffs(rel, params, &vacrel->cutoffs);
+
+	/*
+	 * If the current vacuum cutoff (OldestXmin) is being held back by a
+	 * replication slot that has exceeded max_slot_xid_age, attempt to
+	 * invalidate such slots.
+	 */
+	if (maybe_invalidate_xid_aged_slots(vacrel->cutoffs.OldestXmin,
+										vacrel->cutoffs.OldestSlotXmin,
+										vacrel->cutoffs.OldestSlotCatalogXmin))
+	{
+		/*
+		 * Some slots have been invalidated; re-compute the vacuum cutoffs and
+		 * aggressiveness.
+		 */
+		vacrel->aggressive = vacuum_get_cutoffs(rel, params,
+												&vacrel->cutoffs);
+	}
+
 	vacrel->rel_pages = orig_rel_pages = RelationGetNumberOfBlocks(rel);
 	vacrel->vistest = GlobalVisTestFor(rel);
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 9e8999bbb61..42285c21cb3 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7401,6 +7401,8 @@ CreateCheckPoint(int flags)
 	VirtualTransactionId *vxids;
 	int			nvxids;
 	int			oldXLogAllowed = 0;
+	uint32		slotInvalidationCauses;
+	TransactionId slotXidLimit;
 
 	/*
 	 * An end-of-recovery checkpoint is really a shutdown checkpoint, just
@@ -7845,9 +7847,20 @@ CreateCheckPoint(int flags)
 	 */
 	XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 	KeepLogSeg(recptr, &_logSegNo);
-	if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED | RS_INVAL_IDLE_TIMEOUT,
+
+	slotInvalidationCauses = RS_INVAL_WAL_REMOVED | RS_INVAL_IDLE_TIMEOUT;
+	slotXidLimit = InvalidTransactionId;
+	if (max_slot_xid_age > 0)
+	{
+		slotInvalidationCauses |= RS_INVAL_XID_AGE;
+		slotXidLimit = TransactionIdRetreatedBy(ReadNextTransactionId(),
+												max_slot_xid_age);
+	}
+
+	if (InvalidateObsoleteReplicationSlots(slotInvalidationCauses,
 										   _logSegNo, InvalidOid,
-										   InvalidTransactionId))
+										   InvalidTransactionId,
+										   slotXidLimit))
 	{
 		/*
 		 * Some slots have been invalidated; recalculate the old-segment
@@ -8134,6 +8147,8 @@ CreateRestartPoint(int flags)
 	XLogRecPtr	endptr;
 	XLogSegNo	_logSegNo;
 	TimestampTz xtime;
+	uint32		slotInvalidationCauses;
+	TransactionId slotXidLimit;
 
 	/* Concurrent checkpoint/restartpoint cannot happen */
 	Assert(!IsUnderPostmaster || MyBackendType == B_CHECKPOINTER);
@@ -8312,9 +8327,19 @@ CreateRestartPoint(int flags)
 
 	INJECTION_POINT("restartpoint-before-slot-invalidation", NULL);
 
-	if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED | RS_INVAL_IDLE_TIMEOUT,
+	slotInvalidationCauses = RS_INVAL_WAL_REMOVED | RS_INVAL_IDLE_TIMEOUT;
+	slotXidLimit = InvalidTransactionId;
+	if (max_slot_xid_age > 0)
+	{
+		slotInvalidationCauses |= RS_INVAL_XID_AGE;
+		slotXidLimit = TransactionIdRetreatedBy(ReadNextTransactionId(),
+												max_slot_xid_age);
+	}
+
+	if (InvalidateObsoleteReplicationSlots(slotInvalidationCauses,
 										   _logSegNo, InvalidOid,
-										   InvalidTransactionId))
+										   InvalidTransactionId,
+										   slotXidLimit))
 	{
 		/*
 		 * Some slots have been invalidated; recalculate the old-segment
@@ -9230,6 +9255,7 @@ xlog_redo(XLogReaderState *record)
 				 */
 				InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_LEVEL,
 												   0, InvalidOid,
+												   InvalidTransactionId,
 												   InvalidTransactionId);
 			}
 			else if (sync_replication_slots)
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 0ed363d1c85..ed647e77615 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -48,6 +48,7 @@
 #include "postmaster/autovacuum.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/interrupt.h"
+#include "replication/slot.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "storage/pmsignal.h"
@@ -1133,7 +1134,10 @@ vacuum_get_cutoffs(Relation rel, const VacuumParams *params,
 	 * that only one vacuum process can be working on a particular table at
 	 * any time, and that each vacuum is always an independent transaction.
 	 */
-	cutoffs->OldestXmin = GetOldestNonRemovableTransactionId(rel);
+	cutoffs->OldestXmin =
+		GetOldestNonRemovableTransactionIdWithSlotXids(rel,
+													   &cutoffs->OldestSlotXmin,
+													   &cutoffs->OldestSlotCatalogXmin);
 
 	Assert(TransactionIdIsNormal(cutoffs->OldestXmin));
 
@@ -2688,3 +2692,77 @@ vac_tid_reaped(ItemPointer itemptr, void *state)
 
 	return TidStoreIsMember(dead_items, itemptr);
 }
+
+/*
+ * Invalidate replication slots whose XID age exceeds max_slot_xid_age.
+ *
+ * The caller provides the overall oldest xmin along with the oldest
+ * slot and catalog_xmin, typically all obtained from a single consistent
+ * snapshot via ComputeXidHorizons(). These values are used to avoid
+ * unnecessary work: if the global oldest_xmin is held back by something
+ * other than a replication slot (e.g., a long-running transaction),
+ * invalidating slots would not advance the horizon and is therefore
+ * skipped. Similarly, no action is taken if the current horizons have
+ * not yet exceeded the threshold.
+ *
+ * Returns true if at least one slot was invalidated.
+ */
+bool
+maybe_invalidate_xid_aged_slots(TransactionId oldest_xmin,
+								TransactionId oldest_slot_xmin,
+								TransactionId oldest_slot_catalog_xmin)
+{
+	TransactionId xid_limit;
+	bool		slot_holds_oldest_xmin;
+
+	if (max_slot_xid_age == 0)
+		return false;
+
+	Assert(TransactionIdIsNormal(oldest_xmin));
+
+	/*
+	 * Check if a replication slot's xmin or catalog_xmin is what's holding
+	 * back oldest_xmin. If not, skip the unnecessary work.
+	 */
+	slot_holds_oldest_xmin =
+		(TransactionIdIsValid(oldest_slot_xmin) &&
+		 TransactionIdEquals(oldest_xmin, oldest_slot_xmin)) ||
+		(TransactionIdIsValid(oldest_slot_catalog_xmin) &&
+		 TransactionIdEquals(oldest_xmin, oldest_slot_catalog_xmin));
+
+	if (!slot_holds_oldest_xmin)
+		return false;
+
+	xid_limit = TransactionIdRetreatedBy(ReadNextTransactionId(),
+										 max_slot_xid_age);
+
+	/*
+	 * A replication slot is holding back oldest_xmin. We invalidate slots
+	 * that have exceeded the XID age limit.
+	 *
+	 * Note that while a non-catalog vacuum is technically only blocked by
+	 * physical slots' xmin values, we invalidate logical slots too that
+	 * exceed the XID age limit if we trigger the XID-age based slot
+	 * invalidation. One might think that this is unnecessary for non-catalog
+	 * tables as invalidating logical slots while vacuuming a non-catalog
+	 * table doesn't help advance vacuum cutoffs. But performing invalidation
+	 * trials for physical and logical slots would add complexity.
+	 *
+	 * In practice, XID-age-based invalidation is lightweight (e.g., it does
+	 * not require process termination). This unified approach keeps the API
+	 * simple by avoiding the need to distinguish between catalog and
+	 * non-catalog tables here.
+	 *
+	 * Note: Invalidating a slot does not guarantee that the oldest xmin will
+	 * advance. Due to a race condition, a long-running transaction might be
+	 * holding the same xmin as the slot. In such cases, the slot is
+	 * invalidated, but the global horizon remains unchanged.
+	 */
+	if (TransactionIdPrecedes(oldest_xmin, xid_limit))
+		return InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE,
+												  0, InvalidOid,
+												  InvalidTransactionId,
+												  xid_limit);
+
+	return false;
+}
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index a9092fc2382..833ed196128 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -117,6 +117,7 @@ static const SlotInvalidationCauseMap SlotInvalidationCauses[] = {
 	{RS_INVAL_HORIZON, "rows_removed"},
 	{RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
 	{RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
+	{RS_INVAL_XID_AGE, "xid_aged"},
 };
 
 /*
@@ -158,6 +159,12 @@ int			max_replication_slots = 10; /* the maximum number of replication
  */
 int			idle_replication_slot_timeout_secs = 0;
 
+/*
+ * Invalidate replication slots that have xmin or catalog_xmin older
+ * than the specified age; '0' disables it.
+ */
+int			max_slot_xid_age = 0;
+
 /*
  * This GUC lists streaming replication standby server slot names that
  * logical WAL sender processes will wait for.
@@ -1780,7 +1787,10 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 					   XLogRecPtr restart_lsn,
 					   XLogRecPtr oldestLSN,
 					   TransactionId snapshotConflictHorizon,
-					   long slot_idle_seconds)
+					   long slot_idle_seconds,
+					   TransactionId xmin,
+					   TransactionId catalog_xmin,
+					   TransactionId xidLimit)
 {
 	StringInfoData err_detail;
 	StringInfoData err_hint;
@@ -1825,6 +1835,29 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
 								 "idle_replication_slot_timeout");
 				break;
 			}
+
+		case RS_INVAL_XID_AGE:
+			{
+				TransactionId slot_xid = TransactionIdIsValid(xmin) ? xmin : catalog_xmin;
+				int32		exceeded_by = (int32) (xidLimit - slot_xid);
+				int32		slot_age = (int32) max_slot_xid_age + exceeded_by;
+
+				/* Either the slot's xmin or catalog_xmin must be valid */
+				Assert(TransactionIdIsValid(slot_xid));
+
+				/* translator: %s is a GUC variable name */
+				appendStringInfo(&err_detail,
+								 TransactionIdIsValid(xmin)
+								 ? _("The slot's xmin age of %d exceeds the configured \"%s\" of %d by %d transactions")
+								 : _("The slot's catalog xmin age of %d exceeds the configured \"%s\" of %d by %d transactions"),
+								 slot_age, "max_slot_xid_age", max_slot_xid_age, exceeded_by);
+
+				/* translator: %s is a GUC variable name */
+				appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
+								 "max_slot_xid_age");
+				break;
+			}
+
 		case RS_INVAL_NONE:
 			pg_unreachable();
 	}
@@ -1863,6 +1896,25 @@ CanInvalidateIdleSlot(ReplicationSlot *s)
 			!(RecoveryInProgress() && s->data.synced));
 }
 
+/*
+ * Can we invalidate an XID-aged replication slot?
+ *
+ * XID-aged based invalidation is allowed to the given slot when:
+ *
+ * 1. Max XID-age is set
+ * 2. Slot has valid xmin or catalog_xmin
+ * 3. The slot is not being synced from the primary while the server is in
+ *	  recovery.
+ */
+static inline bool
+CanInvalidateXidAgedSlot(ReplicationSlot *s)
+{
+	return (max_slot_xid_age != 0 &&
+			(TransactionIdIsValid(s->data.xmin) ||
+			 TransactionIdIsValid(s->data.catalog_xmin)) &&
+			!(RecoveryInProgress() && s->data.synced));
+}
+
 /*
  * DetermineSlotInvalidationCause - Determine the cause for which a slot
  * becomes invalid among the given possible causes.
@@ -1874,6 +1926,7 @@ static ReplicationSlotInvalidationCause
 DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
 							   XLogRecPtr oldestLSN, Oid dboid,
 							   TransactionId snapshotConflictHorizon,
+							   TransactionId xidLimit,
 							   TimestampTz *inactive_since, TimestampTz now)
 {
 	Assert(possible_causes != RS_INVAL_NONE);
@@ -1945,6 +1998,18 @@ DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
 		}
 	}
 
+	/* Check if the slot needs to be invalidated due to max_slot_xid_age GUC */
+	if ((possible_causes & RS_INVAL_XID_AGE) && CanInvalidateXidAgedSlot(s))
+	{
+		Assert(TransactionIdIsValid(xidLimit));
+
+		if ((TransactionIdIsValid(s->data.xmin) &&
+			 TransactionIdPrecedes(s->data.xmin, xidLimit)) ||
+			(TransactionIdIsValid(s->data.catalog_xmin) &&
+			 TransactionIdPrecedes(s->data.catalog_xmin, xidLimit)))
+			return RS_INVAL_XID_AGE;
+	}
+
 	return RS_INVAL_NONE;
 }
 
@@ -1967,6 +2032,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
 							   ReplicationSlot *s,
 							   XLogRecPtr oldestLSN,
 							   Oid dboid, TransactionId snapshotConflictHorizon,
+							   TransactionId xidLimit,
 							   bool *released_lock_out)
 {
 	int			last_signaled_pid = 0;
@@ -2019,6 +2085,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
 																s, oldestLSN,
 																dboid,
 																snapshotConflictHorizon,
+																xidLimit,
 																&inactive_since,
 																now);
 
@@ -2112,7 +2179,8 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
 				ReportSlotInvalidation(invalidation_cause, true, active_pid,
 									   slotname, restart_lsn,
 									   oldestLSN, snapshotConflictHorizon,
-									   slot_idle_secs);
+									   slot_idle_secs, s->data.xmin,
+									   s->data.catalog_xmin, xidLimit);
 
 				if (MyBackendType == B_STARTUP)
 					(void) SignalRecoveryConflict(GetPGProcByNumber(active_proc),
@@ -2165,7 +2233,8 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
 			ReportSlotInvalidation(invalidation_cause, false, active_pid,
 								   slotname, restart_lsn,
 								   oldestLSN, snapshotConflictHorizon,
-								   slot_idle_secs);
+								   slot_idle_secs, s->data.xmin,
+								   s->data.catalog_xmin, xidLimit);
 
 			/* done with this slot for now */
 			break;
@@ -2192,6 +2261,8 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
  *   logical.
  * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
  *   "idle_replication_slot_timeout" duration.
+ * - RS_INVAL_XID_AGE: slot xid age is older than the configured
+ *   "max_slot_xid_age" age.
  *
  * Note: This function attempts to invalidate the slot for multiple possible
  * causes in a single pass, minimizing redundant iterations. The "cause"
@@ -2205,7 +2276,8 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
 bool
 InvalidateObsoleteReplicationSlots(uint32 possible_causes,
 								   XLogSegNo oldestSegno, Oid dboid,
-								   TransactionId snapshotConflictHorizon)
+								   TransactionId snapshotConflictHorizon,
+								   TransactionId xidLimit)
 {
 	XLogRecPtr	oldestLSN;
 	bool		invalidated = false;
@@ -2244,7 +2316,7 @@ restart:
 
 		if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN,
 										   dboid, snapshotConflictHorizon,
-										   &released_lock))
+										   xidLimit, &released_lock))
 		{
 			Assert(released_lock);
 
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index cc207cb56e3..898ef4a0833 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1937,6 +1937,31 @@ GlobalVisHorizonKindForRel(Relation rel)
 		return VISHORIZON_TEMP;
 }
 
+/*
+ * A helper function to return the appropriate oldest non-removable
+ * TransactionId from the pre-computed horizons, based on the relation
+ * type.
+ */
+static pg_attribute_always_inline TransactionId
+GetOldestNonRemovableTransactionIdFromHorizons(ComputeXidHorizonsResult *horizons,
+											   Relation rel)
+{
+	switch (GlobalVisHorizonKindForRel(rel))
+	{
+		case VISHORIZON_SHARED:
+			return horizons->shared_oldest_nonremovable;
+		case VISHORIZON_CATALOG:
+			return horizons->catalog_oldest_nonremovable;
+		case VISHORIZON_DATA:
+			return horizons->data_oldest_nonremovable;
+		case VISHORIZON_TEMP:
+			return horizons->temp_oldest_nonremovable;
+	}
+
+	/* just to prevent compiler warnings */
+	return InvalidTransactionId;
+}
+
 /*
  * Return the oldest XID for which deleted tuples must be preserved in the
  * passed table.
@@ -1955,20 +1980,30 @@ GetOldestNonRemovableTransactionId(Relation rel)
 
 	ComputeXidHorizons(&horizons);
 
-	switch (GlobalVisHorizonKindForRel(rel))
-	{
-		case VISHORIZON_SHARED:
-			return horizons.shared_oldest_nonremovable;
-		case VISHORIZON_CATALOG:
-			return horizons.catalog_oldest_nonremovable;
-		case VISHORIZON_DATA:
-			return horizons.data_oldest_nonremovable;
-		case VISHORIZON_TEMP:
-			return horizons.temp_oldest_nonremovable;
-	}
+	return GetOldestNonRemovableTransactionIdFromHorizons(&horizons, rel);
+}
 
-	/* just to prevent compiler warnings */
-	return InvalidTransactionId;
+/*
+ * Same as GetOldestNonRemovableTransactionId(), but also returns the
+ * replication slot xmin and catalog_xmin from the same ComputeXidHorizons()
+ * call.  This avoids a separate ProcArrayLock acquisition when the caller
+ * needs both values.
+ */
+TransactionId
+GetOldestNonRemovableTransactionIdWithSlotXids(Relation rel,
+											   TransactionId *slot_xmin,
+											   TransactionId *slot_catalog_xmin)
+{
+	ComputeXidHorizonsResult horizons;
+
+	ComputeXidHorizons(&horizons);
+
+	if (slot_xmin)
+		*slot_xmin = horizons.slot_xmin;
+	if (slot_catalog_xmin)
+		*slot_catalog_xmin = horizons.slot_catalog_xmin;
+
+	return GetOldestNonRemovableTransactionIdFromHorizons(&horizons, rel);
 }
 
 /*
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index de9092fdf5b..d60f39ec08e 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -504,7 +504,8 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
 	 */
 	if (IsLogicalDecodingEnabled() && isCatalogRel)
 		InvalidateObsoleteReplicationSlots(RS_INVAL_HORIZON, 0, locator.dbOid,
-										   snapshotConflictHorizon);
+										   snapshotConflictHorizon,
+										   InvalidTransactionId);
 }
 
 /*
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index a315c4ab8ab..5a438df93d0 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -2090,6 +2090,14 @@
   max => 'MAX_KILOBYTES',
 },
 
+{ name => 'max_slot_xid_age', type => 'int', context => 'PGC_SIGHUP', group => 'REPLICATION_SENDING',
+  short_desc => 'Age of the transaction ID at which a replication slot gets invalidated.',
+  variable => 'max_slot_xid_age',
+  boot_val => '0',
+  min => '0',
+  max => '2100000000',
+},
+
 # We use the hopefully-safely-small value of 100kB as the compiled-in
 # default for max_stack_depth.  InitializeGUCOptions will increase it
 # if possible, depending on the actual platform-specific stack limit.
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 6d0337853e0..1817330484d 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -351,6 +351,8 @@
 #wal_keep_size = 0              # in megabytes; 0 disables
 #max_slot_wal_keep_size = -1    # in megabytes; -1 disables
 #idle_replication_slot_timeout = 0      # in seconds; 0 disables
+#max_slot_xid_age = 0           # maximum XID age before a replication slot
+                                # gets invalidated; 0 disables
 #wal_sender_timeout = 60s       # in milliseconds; 0 disables
 #track_commit_timestamp = off   # collect timestamp of transaction commit
                                 # (change requires restart)
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 37631f700af..9ebc71fcac2 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -1901,7 +1901,7 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_
 	appendPQExpBufferStr(pg_ctl_cmd, " -s -o \"-c sync_replication_slots=off\"");
 
 	/* Prevent unintended slot invalidation */
-	appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0\"");
+	appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c idle_replication_slot_timeout=0 -c max_slot_xid_age=0\"");
 
 	if (restricted_access)
 	{
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 5b8023616c0..c47558fc6b3 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -287,6 +287,13 @@ struct VacuumCutoffs
 	 */
 	TransactionId FreezeLimit;
 	MultiXactId MultiXactCutoff;
+
+	/*
+	 * Oldest xmin and catalog xmin of any replication slot obtained from the
+	 * same ComputeXidHorizons() call that computed OldestXmin.
+	 */
+	TransactionId OldestSlotXmin;
+	TransactionId OldestSlotCatalogXmin;
 };
 
 /*
@@ -399,6 +406,9 @@ extern IndexBulkDeleteResult *vac_bulkdel_one_index(IndexVacuumInfo *ivinfo,
 													VacDeadItemsInfo *dead_items_info);
 extern IndexBulkDeleteResult *vac_cleanup_one_index(IndexVacuumInfo *ivinfo,
 													IndexBulkDeleteResult *istat);
+extern bool maybe_invalidate_xid_aged_slots(TransactionId oldest_xmin,
+											TransactionId oldest_slot_xmin,
+											TransactionId oldest_slot_catalog_xmin);
 
 /* In postmaster/autovacuum.c */
 extern void AutoVacuumUpdateCostLimit(void);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 4b4709f6e2c..5040d53072b 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -66,10 +66,12 @@ typedef enum ReplicationSlotInvalidationCause
 	RS_INVAL_WAL_LEVEL = (1 << 2),
 	/* idle slot timeout has occurred */
 	RS_INVAL_IDLE_TIMEOUT = (1 << 3),
+	/* slot's xmin or catalog_xmin has reached max xid age */
+	RS_INVAL_XID_AGE = (1 << 4),
 } ReplicationSlotInvalidationCause;
 
 /* Maximum number of invalidation causes */
-#define	RS_INVAL_MAX_CAUSES 4
+#define	RS_INVAL_MAX_CAUSES 5
 
 /*
  * When the slot synchronization worker is running, or when
@@ -326,6 +328,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 extern PGDLLIMPORT int max_replication_slots;
 extern PGDLLIMPORT char *synchronized_standby_slots;
 extern PGDLLIMPORT int idle_replication_slot_timeout_secs;
+extern PGDLLIMPORT int max_slot_xid_age;
 
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
@@ -367,7 +370,8 @@ extern void ReplicationSlotsDropDBSlots(Oid dboid);
 extern bool InvalidateObsoleteReplicationSlots(uint32 possible_causes,
 											   XLogSegNo oldestSegno,
 											   Oid dboid,
-											   TransactionId snapshotConflictHorizon);
+											   TransactionId snapshotConflictHorizon,
+											   TransactionId xidLimit);
 extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
 extern int	ReplicationSlotIndex(ReplicationSlot *slot);
 extern bool ReplicationSlotName(int index, Name name);
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index abdf021e66e..a94091ce7fd 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -53,6 +53,9 @@ extern RunningTransactions GetRunningTransactionData(void);
 
 extern bool TransactionIdIsInProgress(TransactionId xid);
 extern TransactionId GetOldestNonRemovableTransactionId(Relation rel);
+extern TransactionId GetOldestNonRemovableTransactionIdWithSlotXids(Relation rel,
+																	TransactionId *slot_xmin,
+																	TransactionId *slot_catalog_xmin);
 extern TransactionId GetOldestTransactionIdConsideredRunning(void);
 extern TransactionId GetOldestActiveTransactionId(bool inCommitOnly,
 												  bool allDbs);
diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index 7b253e64d9c..6b7e4818bf4 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -540,4 +540,179 @@ is( $publisher4->safe_psql(
 $publisher4->stop;
 $subscriber4->stop;
 
+# Wait for the given slot to be invalidated with reason 'xid_aged'
+sub wait_for_xid_aged_invalidation
+{
+	my ($node, $slot_name) = @_;
+	$node->poll_query_until(
+		'postgres', qq[
+		SELECT COUNT(slot_name) = 1 FROM pg_replication_slots
+			WHERE slot_name = '$slot_name' AND
+			active = false AND
+			invalidation_reason = 'xid_aged';
+	]) or die "Timed out waiting for slot $slot_name to be invalidated";
+}
+
+# =====================================================================
+# Testcase start: Invalidate physical slot due to max_slot_xid_age GUC
+
+# Initialize primary node for XID age tests
+my $primary5 = PostgreSQL::Test::Cluster->new('primary5');
+$primary5->init(allows_streaming => 'logical');
+
+# Disable autovacuum so checkpointer triggers the invalidation
+my $max_slot_xid_age = 100;
+$primary5->append_conf(
+	'postgresql.conf', qq{
+max_slot_xid_age = $max_slot_xid_age
+autovacuum = off
+});
+
+$primary5->start;
+
+# Create a procedure to consume XIDs
+$primary5->safe_psql(
+	'postgres', qq{
+	CREATE PROCEDURE consume_xid(cnt int)
+	AS \$\$
+	DECLARE
+	    i int;
+	BEGIN
+	    FOR i IN 1..cnt LOOP
+	        EXECUTE 'SELECT pg_current_xact_id()';
+	        COMMIT;
+	    END LOOP;
+	END;
+	\$\$ LANGUAGE plpgsql;
+});
+
+# Take a backup for creating standby
+$backup_name = 'backup5';
+$primary5->backup($backup_name);
+
+# Create standby with HS feedback so the slot gains an xmin
+my $standby5 = PostgreSQL::Test::Cluster->new('standby5');
+$standby5->init_from_backup($primary5, $backup_name, has_streaming => 1);
+$standby5->append_conf(
+	'postgresql.conf', q{
+primary_slot_name = 'sb5_slot'
+hot_standby_feedback = on
+wal_receiver_status_interval = 1
+});
+$primary5->safe_psql(
+	'postgres', qq[
+    SELECT pg_create_physical_replication_slot(slot_name := 'sb5_slot', immediately_reserve := true);
+]);
+$standby5->start;
+
+# Create some content on primary to move xmin
+$primary5->safe_psql('postgres',
+	"CREATE TABLE tab_int5 AS SELECT generate_series(1,10) AS a");
+$primary5->wait_for_catchup($standby5);
+
+# Wait for the physical slot to get xmin via hot_standby_feedback
+$primary5->poll_query_until(
+	'postgres', qq[
+	SELECT xmin IS NOT NULL
+		FROM pg_catalog.pg_replication_slots
+		WHERE slot_name = 'sb5_slot';
+]) or die "Timed out waiting for slot sb5_slot xmin from HS feedback";
+
+# Stop standby so the slot becomes inactive with its xmin frozen
+$standby5->stop;
+
+# Advance XIDs past 2x max_slot_xid_age so the slot's xmin is stale enough
+$primary5->safe_psql('postgres', qq{CALL consume_xid(2 * $max_slot_xid_age)});
+$primary5->safe_psql('postgres', "CHECKPOINT");
+wait_for_xid_aged_invalidation($primary5, 'sb5_slot');
+ok(1, "physical slot invalidated due to XID age (via checkpoint)");
+
+# Testcase end: Invalidate physical slot due to max_slot_xid_age GUC
+# ===================================================================
+
+# ====================================================================
+# Testcase start: Invalidate logical slot due to max_slot_xid_age GUC
+
+# Create a logical slot directly on the primary (no subscriber needed).
+# The slot gets a catalog_xmin immediately upon creation.
+$primary5->safe_psql('postgres',
+	"SELECT pg_create_logical_replication_slot('lsub5_slot', 'pgoutput')");
+
+$primary5->poll_query_until(
+	'postgres', qq[
+	SELECT catalog_xmin IS NOT NULL
+	FROM pg_catalog.pg_replication_slots
+	WHERE slot_name = 'lsub5_slot';
+]) or die "Timed out waiting for slot lsub5_slot catalog_xmin";
+
+# Advance XIDs past 2x max_slot_xid_age so the slot's catalog_xmin is stale enough
+$primary5->safe_psql('postgres', qq{CALL consume_xid(2 * $max_slot_xid_age)});
+
+# Vacuum a user table so OldestXmin does not include the slot's catalog_xmin,
+# skipping the invalidation of the slot.
+$primary5->safe_psql('postgres', "VACUUM tab_int5");
+is( $primary5->safe_psql(
+		'postgres',
+		qq[SELECT invalidation_reason IS NULL FROM pg_replication_slots WHERE slot_name = 'lsub5_slot';]
+	),
+	't',
+	'logical slot not invalidated after vacuuming a data table');
+
+# Vacuum a catalog table so OldestXmin includes the slot's catalog_xmin,
+# triggering invalidation of the slot.
+$primary5->safe_psql('postgres', "VACUUM pg_class");
+wait_for_xid_aged_invalidation($primary5, 'lsub5_slot');
+ok(1, "logical slot invalidated due to XID age (via vacuum)");
+
+# Testcase end: Invalidate logical slot due to max_slot_xid_age GUC
+# ==================================================================
+
+# ===============================================================================
+# Testcase start: Invalidate logical slot on standby due to max_slot_xid_age GUC
+
+# Disable max_slot_xid_age on primary and recreate the streaming slot
+$primary5->safe_psql(
+	'postgres',
+	q{
+ALTER SYSTEM SET max_slot_xid_age = 0;
+SELECT pg_reload_conf();
+});
+$primary5->safe_psql('postgres',
+	"SELECT pg_drop_replication_slot('sb5_slot')");
+$primary5->safe_psql('postgres',
+	"SELECT pg_create_physical_replication_slot('sb5_slot', true)");
+$standby5->append_conf(
+	'postgresql.conf', qq{
+max_slot_xid_age = $max_slot_xid_age
+autovacuum = off
+});
+$standby5->start;
+
+$primary5->wait_for_catchup($standby5);
+
+$standby5->create_logical_slot_on_standby($primary5, 'sb5_logical_slot',
+	'postgres');
+
+$standby5->poll_query_until(
+	'postgres', qq[
+	SELECT catalog_xmin IS NOT NULL
+	FROM pg_catalog.pg_replication_slots
+	WHERE slot_name = 'sb5_logical_slot';
+]) or die "Timed out waiting for sb5_logical_slot catalog_xmin";
+
+# Advance XIDs on primary, replay on standby, then restartpoint to invalidate
+$primary5->safe_psql('postgres', qq{CALL consume_xid(2 * $max_slot_xid_age)});
+$primary5->safe_psql('postgres', "CHECKPOINT");
+$primary5->wait_for_replay_catchup($standby5);
+$standby5->safe_psql('postgres', "CHECKPOINT");
+
+wait_for_xid_aged_invalidation($standby5, 'sb5_logical_slot');
+ok(1, "logical (standby) slot invalidated due to XID age (via restartpoint)");
+
+$standby5->stop;
+$primary5->stop;
+
+# Testcase end: Invalidate logical slot on standby due to max_slot_xid_age GUC
+# =============================================================================
+
 done_testing();
-- 
2.53.0

