From 55533aa3cdc2fecbf7b6b6c661649342a204e73b Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Wed, 12 Jul 2017 18:38:33 -0400 Subject: [PATCH v3 1/1] Wait for slot to become free when dropping it --- src/backend/replication/logical/logicalfuncs.c | 2 +- src/backend/replication/slot.c | 79 +++++++++++++++++++------- src/backend/replication/slotfuncs.c | 2 +- src/backend/replication/walsender.c | 6 +- src/include/replication/slot.h | 8 ++- 5 files changed, 69 insertions(+), 28 deletions(-) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 363ca82cb0..a3ba2b1266 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -244,7 +244,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin else end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID); - ReplicationSlotAcquire(NameStr(*name)); + ReplicationSlotAcquire(NameStr(*name), true); PG_TRY(); { diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index dc7de20e11..76198a627d 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -157,6 +157,7 @@ ReplicationSlotsShmemInit(void) /* everything else is zeroed by the memset above */ SpinLockInit(&slot->mutex); LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS); + ConditionVariableInit(&slot->active_cv); } } } @@ -313,24 +314,27 @@ ReplicationSlotCreate(const char *name, bool db_specific, LWLockRelease(ReplicationSlotControlLock); /* - * Now that the slot has been marked as in_use and in_active, it's safe to + * Now that the slot has been marked as in_use and active, it's safe to * let somebody else try to allocate a slot. */ LWLockRelease(ReplicationSlotAllocationLock); + + ConditionVariableBroadcast(&slot->active_cv); } /* * Find a previously created slot and mark it as used by this backend. 
*/ void -ReplicationSlotAcquire(const char *name) +ReplicationSlotAcquire(const char *name, bool nowait) { ReplicationSlot *slot = NULL; + int active_pid = 0; int i; - int active_pid = 0; /* Keep compiler quiet */ Assert(MyReplicationSlot == NULL); +retry: /* Search for the named slot and mark it active if we find it. */ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) @@ -339,27 +343,52 @@ ReplicationSlotAcquire(const char *name) if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0) { + /* Found the slot we want -- can we activate it? */ SpinLockAcquire(&s->mutex); + active_pid = s->active_pid; if (active_pid == 0) active_pid = s->active_pid = MyProcPid; + SpinLockRelease(&s->mutex); slot = s; + break; } } LWLockRelease(ReplicationSlotControlLock); - /* If we did not find the slot or it was already active, error out. */ + /* If we did not find the slot, error out. */ if (slot == NULL) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("replication slot \"%s\" does not exist", name))); + + /* + * If we found the slot but it's already active in another backend, we + * either error out or retry after a short wait, as caller specified. + */ if (active_pid != MyProcPid) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_IN_USE), - errmsg("replication slot \"%s\" is active for PID %d", - name, active_pid))); + { + if (nowait) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("replication slot \"%s\" is active for PID %d", + name, active_pid))); + + /* Wait here until we get signaled by whoever is active */ + ConditionVariablePrepareToSleep(&slot->active_cv); + ConditionVariableSleep(&slot->active_cv, PG_WAIT_LOCK); + ConditionVariableCancelSleep(); + + goto retry; + } + + /* + * If another process lost the race to set the slot active, it's now + * sleeping; wake it up so that it can continue and fail properly. 
+ */ + ConditionVariableBroadcast(&slot->active_cv); /* We made this slot active, so it's ours now. */ MyReplicationSlot = slot; @@ -385,17 +414,6 @@ ReplicationSlotRelease(void) */ ReplicationSlotDropAcquired(); } - else if (slot->data.persistency == RS_PERSISTENT) - { - /* - * Mark persistent slot inactive. We're not freeing it, just - * disconnecting. - */ - SpinLockAcquire(&slot->mutex); - slot->active_pid = 0; - SpinLockRelease(&slot->mutex); - } - /* * If slot needed to temporarily restrain both data and catalog xmin to @@ -412,6 +430,18 @@ ReplicationSlotRelease(void) ReplicationSlotsComputeRequiredXmin(false); } + if (slot->data.persistency == RS_PERSISTENT) + { + /* + * Mark persistent slot inactive. We're not freeing it, just + * disconnecting, but wake up others that may be waiting for it. + */ + SpinLockAcquire(&slot->mutex); + slot->active_pid = 0; + SpinLockRelease(&slot->mutex); + ConditionVariableBroadcast(&slot->active_cv); + } + MyReplicationSlot = NULL; /* might not have been set when we've been a plain slot */ @@ -438,6 +468,7 @@ ReplicationSlotCleanup(void) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + /* XXX why is it okay to read unlocked here? */ if (s->active_pid == MyProcPid) { Assert(s->in_use && s->data.persistency == RS_TEMPORARY); @@ -451,11 +482,11 @@ ReplicationSlotCleanup(void) * Permanently drop replication slot identified by the passed in name. */ void -ReplicationSlotDrop(const char *name) +ReplicationSlotDrop(const char *name, bool nowait) { Assert(MyReplicationSlot == NULL); - ReplicationSlotAcquire(name); + ReplicationSlotAcquire(name, nowait); ReplicationSlotDropAcquired(); } @@ -527,6 +558,9 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) slot->active_pid = 0; SpinLockRelease(&slot->mutex); + /* wake up anyone waiting on this slot */ + ConditionVariableBroadcast(&slot->active_cv); + ereport(fail_softly ? 
WARNING : ERROR, (errcode_for_file_access(), errmsg("could not rename file \"%s\" to \"%s\": %m", @@ -539,11 +573,14 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) * grabbing the mutex because nobody else can be scanning the array here, * and nobody can be attached to this slot and thus access it without * scanning the array. + * + * Also wake up processes waiting for it. */ LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); slot->active_pid = 0; slot->in_use = false; LWLockRelease(ReplicationSlotControlLock); + ConditionVariableBroadcast(&slot->active_cv); /* * Slot is dead and doesn't prevent resource removal anymore, recompute diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 6dc808874d..a5ecc85ba5 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -171,7 +171,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) CheckSlotRequirements(); - ReplicationSlotDrop(NameStr(*name)); + ReplicationSlotDrop(NameStr(*name), false); PG_RETURN_VOID(); } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 002143b26a..9a2babef1e 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -541,7 +541,7 @@ StartReplication(StartReplicationCmd *cmd) if (cmd->slotname) { - ReplicationSlotAcquire(cmd->slotname); + ReplicationSlotAcquire(cmd->slotname, true); if (SlotIsLogical(MyReplicationSlot)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -1028,7 +1028,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) static void DropReplicationSlot(DropReplicationSlotCmd *cmd) { - ReplicationSlotDrop(cmd->slotname); + ReplicationSlotDrop(cmd->slotname, false); EndCommand("DROP_REPLICATION_SLOT", DestRemote); } @@ -1046,7 +1046,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) Assert(!MyReplicationSlot); - ReplicationSlotAcquire(cmd->slotname); + ReplicationSlotAcquire(cmd->slotname, true); /* * 
Force a disconnect, so that the decoding code doesn't need to care diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index a283f4e2b8..f52e988a6d 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -12,6 +12,7 @@ #include "fmgr.h" #include "access/xlog.h" #include "access/xlogreader.h" +#include "storage/condition_variable.h" #include "storage/lwlock.h" #include "storage/shmem.h" #include "storage/spin.h" @@ -117,6 +118,9 @@ typedef struct ReplicationSlot /* is somebody performing io on this slot? */ LWLock io_in_progress_lock; + /* Condition variable signalled when active_pid changes */ + ConditionVariable active_cv; + /* all the remaining data is only used for logical slots */ /* @@ -162,9 +166,9 @@ extern void ReplicationSlotsShmemInit(void); extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency p); extern void ReplicationSlotPersist(void); -extern void ReplicationSlotDrop(const char *name); +extern void ReplicationSlotDrop(const char *name, bool nowait); -extern void ReplicationSlotAcquire(const char *name); +extern void ReplicationSlotAcquire(const char *name, bool nowait); extern void ReplicationSlotRelease(void); extern void ReplicationSlotCleanup(void); extern void ReplicationSlotSave(void); -- 2.11.0