From e99ef98faad4a2ff32daa4db36daceddf8225cf7 Mon Sep 17 00:00:00 2001 From: Shveta Malik Date: Tue, 30 Sep 2025 14:54:41 +0530 Subject: [PATCH] Refactor synchronize_one_slot() to reuse the slot creation path When allow_overwrite is set and a slot with the same name already exists on the standby, acquire and drop the existing local slot and then retry the lookup, so that the slot is recreated by the regular creation branch instead of by a duplicated copy of the creation and initialization code inside the overwrite branch. Also reword the LOG message emitted when an existing slot is overwritten. --- src/backend/replication/logical/slotsync.c | 209 ++++++++------------- 1 file changed, 82 insertions(+), 127 deletions(-) diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index dbf9380cb39..a8d44fe6b45 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -649,6 +649,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) return false; } +retry: /* Search for the named slot */ if ((slot = SearchNamedReplicationSlot(remote_slot->name, true))) { @@ -666,146 +667,100 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) /* Check if we need to overwrite an existing logical slot */ if (allow_overwrite) { - /* Recreate the failover slot and synchronize it */ - NameData plugin_name; - TransactionId xmin_horizon = InvalidTransactionId; - ereport(LOG, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("slot \"%s\" already exists" - " on the standby but try to recreate it because " - "flag allow_overwrite is set to true", - remote_slot->name)); + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("slot \"%s\" already exists" + " on the standby but will be overwritten as" + " allow_overwrite is set to true", + remote_slot->name)); /* Get rid of a replication slot that is no longer wanted */ - ReplicationSlotDrop(remote_slot->name,true); - - /* Skip creating the local slot if remote_slot is invalidated already */ - if (remote_slot->invalidated != RS_INVAL_NONE) - return false; - - /* - * We create temporary slots instead of ephemeral slots here because - * we want the slots to survive after releasing them.
This is done to - * avoid dropping and re-creating the slots in each synchronization - * cycle if the restart_lsn or catalog_xmin of the remote slot has not - * caught up. - */ - ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY, - remote_slot->two_phase, - remote_slot->failover, - true, - true); - - /* For shorter lines. */ - slot = MyReplicationSlot; - - /* Avoid expensive operations while holding a spinlock. */ - namestrcpy(&plugin_name, remote_slot->plugin); - - SpinLockAcquire(&slot->mutex); - slot->data.database = remote_dbid; - slot->data.plugin = plugin_name; - SpinLockRelease(&slot->mutex); - - reserve_wal_for_local_slot(remote_slot->restart_lsn); - - LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); - xmin_horizon = GetOldestSafeDecodingTransactionId(true); - SpinLockAcquire(&slot->mutex); - slot->effective_catalog_xmin = xmin_horizon; - slot->data.catalog_xmin = xmin_horizon; - SpinLockRelease(&slot->mutex); - ReplicationSlotsComputeRequiredXmin(true); - LWLockRelease(ProcArrayLock); - - update_and_persist_local_synced_slot(remote_slot, remote_dbid); - - slot_updated = true; + ReplicationSlotAcquire(remote_slot->name, true, false); + ReplicationSlotDropAcquired(); + goto retry; } else + { ereport(ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("exiting from slot synchronization because same" - " name slot \"%s\" already exists on the standby", - remote_slot->name)); + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("exiting from slot synchronization because same" + " name slot \"%s\" already exists on the standby", + remote_slot->name)); + } } - else - { - /* - * The slot has been synchronized before. - * - * It is important to acquire the slot here before checking - * invalidation. 
If we don't acquire the slot first, there could be a - * race condition that the local slot could be invalidated just after - * checking the 'invalidated' flag here and we could end up - * overwriting 'invalidated' flag to remote_slot's value. See - * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly - * if the slot is not acquired by other processes. - * - * XXX: If it ever turns out that slot acquire/release is costly for - * cases when none of the slot properties is changed then we can do a - * pre-check to ensure that at least one of the slot properties is - * changed before acquiring the slot. - */ - ReplicationSlotAcquire(remote_slot->name, true, false); - - Assert(slot == MyReplicationSlot); + /* + * The slot has been synchronized before. + * + * It is important to acquire the slot here before checking + * invalidation. If we don't acquire the slot first, there could be a + * race condition that the local slot could be invalidated just after + * checking the 'invalidated' flag here and we could end up + * overwriting 'invalidated' flag to remote_slot's value. See + * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly + * if the slot is not acquired by other processes. + * + * XXX: If it ever turns out that slot acquire/release is costly for + * cases when none of the slot properties is changed then we can do a + * pre-check to ensure that at least one of the slot properties is + * changed before acquiring the slot. + */ + ReplicationSlotAcquire(remote_slot->name, true, false); - /* - * Copy the invalidation cause from remote only if local slot is not - * invalidated locally, we don't want to overwrite existing one. 
- */ - if (slot->data.invalidated == RS_INVAL_NONE && - remote_slot->invalidated != RS_INVAL_NONE) - { - SpinLockAcquire(&slot->mutex); - slot->data.invalidated = remote_slot->invalidated; - SpinLockRelease(&slot->mutex); + Assert(slot == MyReplicationSlot); - /* Make sure the invalidated state persists across server restart */ - ReplicationSlotMarkDirty(); - ReplicationSlotSave(); + /* + * Copy the invalidation cause from remote only if local slot is not + * invalidated locally, we don't want to overwrite existing one. + */ + if (slot->data.invalidated == RS_INVAL_NONE && + remote_slot->invalidated != RS_INVAL_NONE) + { + SpinLockAcquire(&slot->mutex); + slot->data.invalidated = remote_slot->invalidated; + SpinLockRelease(&slot->mutex); - slot_updated = true; - } + /* Make sure the invalidated state persists across server restart */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); - /* Skip the sync of an invalidated slot */ - if (slot->data.invalidated != RS_INVAL_NONE) - { - ReplicationSlotRelease(); - return slot_updated; - } + slot_updated = true; + } - /* Slot not ready yet, let's attempt to make it sync-ready now. */ - if (slot->data.persistency == RS_TEMPORARY) - { - slot_updated = update_and_persist_local_synced_slot(remote_slot, - remote_dbid); - } + /* Skip the sync of an invalidated slot */ + if (slot->data.invalidated != RS_INVAL_NONE) + { + ReplicationSlotRelease(); + return slot_updated; + } - /* Slot ready for sync, so sync it. */ - else - { - /* - * Sanity check: As long as the invalidations are handled - * appropriately as above, this should never happen. - * - * We don't need to check restart_lsn here. See the comments in - * update_local_synced_slot() for details. 
- */ - if (remote_slot->confirmed_lsn < slot->data.confirmed_flush) - ereport(ERROR, - errmsg_internal("cannot synchronize local slot \"%s\"", - remote_slot->name), - errdetail_internal("Local slot's start streaming location LSN(%X/%08X) is ahead of remote slot's LSN(%X/%08X).", - LSN_FORMAT_ARGS(slot->data.confirmed_flush), - LSN_FORMAT_ARGS(remote_slot->confirmed_lsn))); - - slot_updated = update_local_synced_slot(remote_slot, remote_dbid, - NULL, NULL); - } + /* Slot not ready yet, let's attempt to make it sync-ready now. */ + if (slot->data.persistency == RS_TEMPORARY) + { + slot_updated = update_and_persist_local_synced_slot(remote_slot, + remote_dbid); + } + + /* Slot ready for sync, so sync it. */ + else + { + /* + * Sanity check: As long as the invalidations are handled + * appropriately as above, this should never happen. + * + * We don't need to check restart_lsn here. See the comments in + * update_local_synced_slot() for details. + */ + if (remote_slot->confirmed_lsn < slot->data.confirmed_flush) + ereport(ERROR, + errmsg_internal("cannot synchronize local slot \"%s\"", + remote_slot->name), + errdetail_internal("Local slot's start streaming location LSN(%X/%08X) is ahead of remote slot's LSN(%X/%08X).", + LSN_FORMAT_ARGS(slot->data.confirmed_flush), + LSN_FORMAT_ARGS(remote_slot->confirmed_lsn))); + + slot_updated = update_local_synced_slot(remote_slot, remote_dbid, + NULL, NULL); } } /* Otherwise create the slot first. */ @@ -828,7 +783,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY, remote_slot->two_phase, remote_slot->failover, - true, + false, true); /* For shorter lines. */ -- 2.34.1