From c741e5c1205efedcc6841c85ab0539832c658578 Mon Sep 17 00:00:00 2001 From: alterego655 <824662526@qq.com> Date: Fri, 12 Jun 2026 10:27:04 +0800 Subject: [PATCH v1] Avoid stale slot pointers in slotsync cleanup drop_local_obsolete_slots() kept raw ReplicationSlot * values after scanning the shared slot array. Once ReplicationSlotControlLock was released, those array entries could be dropped and reused before the later retention check or database-lock/drop path. Copy the local synced slot identity while scanning the array, and carry those copied values instead of raw slot pointers. Revalidate the copied identity before acquiring and dropping the slot, and use the copied name and database OID for acquire, unlock, and logging. This makes the cleanup path avoid depending on slot array entries remaining stable across unlocked windows. --- src/backend/replication/logical/slotsync.c | 205 +++++++++++++++++---- src/tools/pgindent/typedefs.list | 1 + 2 files changed, 167 insertions(+), 39 deletions(-) diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index ea73f0aa262..a323df452fd 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -178,6 +178,20 @@ typedef struct RemoteSlot ReplicationSlotInvalidationCause invalidated; } RemoteSlot; +/* + * Copied identity of a local synchronized slot. + * + * The slot number is a cached array position used to avoid a later name scan. + * It is not sufficient as identity by itself because slot array entries can be + * reused after a slot is dropped. + */ +typedef struct LocalSyncedSlot +{ + NameData name; + Oid database; + int slotno; +} LocalSyncedSlot; + static void slotsync_failure_callback(int code, Datum arg); static void update_synced_slots_inactive_since(void); @@ -444,7 +458,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) } /* - * Get the list of local logical slots that are synchronized from the + * Get copied identities of local logical slots that are synchronized from the * primary server. */ static List * @@ -459,10 +473,29 @@ get_local_synced_slots(void) ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; /* Check if it is a synchronized slot */ - if (s->in_use && s->data.synced) + if (s->in_use) { - Assert(SlotIsLogical(s)); - local_slots = lappend(local_slots, s); + LocalSyncedSlot local_slot; + bool synced; + + SpinLockAcquire(&s->mutex); + synced = s->data.synced; + if (synced) + { + local_slot.name = s->data.name; + local_slot.database = s->data.database; + local_slot.slotno = i; + } + SpinLockRelease(&s->mutex); + + if (synced) + { + LocalSyncedSlot *slot = palloc(sizeof(LocalSyncedSlot)); + + Assert(local_slot.database != InvalidOid); + *slot = local_slot; + local_slots = lappend(local_slots, slot); + } } } @@ -471,34 +504,125 @@ get_local_synced_slots(void) return local_slots; } +/* + * Check whether the previously observed slot array cell still contains a + * synchronized logical slot matching the copied identity. If requested, copy + * the current invalidation cause. + */ +static bool +local_synced_slot_matches(const LocalSyncedSlot *local_slot, + ReplicationSlotInvalidationCause *invalidated) +{ + ReplicationSlot *slot; + bool matches = false; + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + Assert(local_slot->slotno >= 0); + Assert(local_slot->slotno < + max_replication_slots + max_repack_replication_slots); + + slot = &ReplicationSlotCtl->replication_slots[local_slot->slotno]; + if (slot->in_use) + { + NameData slot_name; + Oid slot_database; + bool slot_synced; + ReplicationSlotInvalidationCause slot_invalidated; + + SpinLockAcquire(&slot->mutex); + slot_name = slot->data.name; + slot_database = slot->data.database; + slot_synced = slot->data.synced; + slot_invalidated = slot->data.invalidated; + SpinLockRelease(&slot->mutex); + + /* + * A synced slot's database can be updated by slot synchronization, so + * database is not globally immutable. It is still part of the copied + * identity here because slot synchronization is serialized by + * SlotSyncCtx->syncing. + */ + matches = slot_database != InvalidOid && + slot_synced && + strcmp(NameStr(slot_name), NameStr(local_slot->name)) == 0 && + slot_database == local_slot->database; + + if (matches && invalidated) + *invalidated = slot_invalidated; + } + + LWLockRelease(ReplicationSlotControlLock); + + return matches; +} + +/* + * Check whether the acquired slot still matches a previously copied local slot + * identity. + */ +static bool +acquired_slot_matches(const LocalSyncedSlot *local_slot) +{ + ReplicationSlot *slot = MyReplicationSlot; + NameData slot_name; + Oid slot_database; + bool slot_synced; + int slotno; + + Assert(slot != NULL); + + /* + * We own MyReplicationSlot, so slot.h allows reading its fields without + * taking the slot mutex. + */ + slot_name = slot->data.name; + slot_database = slot->data.database; + slot_synced = slot->data.synced; + slotno = ReplicationSlotIndex(slot); + + return slotno == local_slot->slotno && + slot_database != InvalidOid && + slot_synced && + strcmp(NameStr(slot_name), NameStr(local_slot->name)) == 0 && + slot_database == local_slot->database; +} + /* * Helper function to check if local_slot is required to be retained. * * Return false either if local_slot does not exist in the remote_slots list - * or is invalidated while the corresponding remote slot is still valid, - * otherwise true. + * or is invalidated while the corresponding remote slot is still valid, or if + * the copied local slot no longer matches the current slot array entry. + * Otherwise, return true. */ static bool -local_sync_slot_required(ReplicationSlot *local_slot, List *remote_slots) +local_sync_slot_required(const LocalSyncedSlot *local_slot, List *remote_slots) { bool remote_exists = false; bool locally_invalidated = false; foreach_ptr(RemoteSlot, remote_slot, remote_slots) { - if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0) + if (strcmp(remote_slot->name, NameStr(local_slot->name)) == 0) { remote_exists = true; /* * If remote slot is not invalidated but local slot is marked as - * invalidated, then set locally_invalidated flag. + * invalidated, then set locally_invalidated flag. When the remote + * slot is already invalidated, keep the local slot as before and + * avoid an unnecessary revalidation. */ - SpinLockAcquire(&local_slot->mutex); - locally_invalidated = - (remote_slot->invalidated == RS_INVAL_NONE) && - (local_slot->data.invalidated != RS_INVAL_NONE); - SpinLockRelease(&local_slot->mutex); + if (remote_slot->invalidated == RS_INVAL_NONE) + { + ReplicationSlotInvalidationCause invalidated; + + if (!local_synced_slot_matches(local_slot, &invalidated)) + return false; + + locally_invalidated = invalidated != RS_INVAL_NONE; + } break; } @@ -536,40 +660,31 @@ drop_local_obsolete_slots(List *remote_slot_list) { List *local_slots = get_local_synced_slots(); - foreach_ptr(ReplicationSlot, local_slot, local_slots) + foreach_ptr(LocalSyncedSlot, local_slot, local_slots) { /* Drop the local slot if it is not required to be retained. */ if (!local_sync_slot_required(local_slot, remote_slot_list)) { bool dropped = false; - NameData slot_name = {0}; - Oid slot_database = local_slot->data.database; - bool synced_slot; /* * Use shared lock to prevent a conflict with * ReplicationSlotsDropDBSlots(), trying to drop the same slot * during a drop-database operation. */ - LockSharedObject(DatabaseRelationId, slot_database, 0, + LockSharedObject(DatabaseRelationId, local_slot->database, 0, AccessShareLock); /* - * In the small window between getting the slot to drop and - * locking the database, there is a possibility of a parallel - * database drop by the startup process and the creation of a new - * slot by the user. This new user-created slot may end up using - * the same shared memory as that of 'local_slot'. Thus check if - * local_slot is still the synced one before performing the actual - * drop. + * In the window between copying the slot identity and locking the + * database, there is a possibility of a parallel database drop by + * the startup process and the creation of a new slot by the user. + * This new user-created slot may end up using the same shared + * memory cell as the copied slot. Thus check whether the cached + * slot position still contains the same synced slot before + * performing the actual drop. */ - SpinLockAcquire(&local_slot->mutex); - synced_slot = local_slot->in_use && local_slot->data.synced; - if (synced_slot) - slot_name = local_slot->data.name; - SpinLockRelease(&local_slot->mutex); - - if (synced_slot) + if (local_synced_slot_matches(local_slot, NULL)) { /* * Now acquire and drop the slot. Note we purposely don't @@ -577,21 +692,33 @@ drop_local_obsolete_slots(List *remote_slot_list) * a standby, which derives its logical decoding state from * the primary, it would be wrong to do so. */ - ReplicationSlotAcquire(NameStr(slot_name), true, false); - ReplicationSlotDropAcquired(false); - dropped = true; + ReplicationSlotAcquire(NameStr(local_slot->name), true, false); + + /* + * Recheck the acquired slot defensively in case the slot + * changed between revalidation and acquisition. + */ + if (acquired_slot_matches(local_slot)) + { + ReplicationSlotDropAcquired(false); + dropped = true; + } + else + ReplicationSlotRelease(); } - UnlockSharedObject(DatabaseRelationId, slot_database, 0, + UnlockSharedObject(DatabaseRelationId, local_slot->database, 0, AccessShareLock); if (dropped) ereport(LOG, errmsg("dropped replication slot \"%s\" of database with OID %u", - NameStr(slot_name), - slot_database)); + NameStr(local_slot->name), + local_slot->database)); } } + + list_free_deep(local_slots); } /* diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 8cf40c87043..663383b0531 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1646,6 +1646,7 @@ LoInfo LoadStmt LocalBufferLookupEnt LocalPgBackendStatus +LocalSyncedSlot LocalTransactionId Location LocationIndex -- 2.51.0