From 33f08678bf20eed3a4cb3d10960bb06543a1b3db Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Wed, 12 Jul 2017 18:38:33 -0400 Subject: [PATCH v4] Wait for slot to become free in when dropping it --- src/backend/replication/logical/logicalfuncs.c | 2 +- src/backend/replication/slot.c | 115 ++++++++++++++++++------- src/backend/replication/slotfuncs.c | 32 ++++--- src/backend/replication/walsender.c | 6 +- src/include/replication/slot.h | 10 ++- 5 files changed, 110 insertions(+), 55 deletions(-) diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 363ca82cb0..a3ba2b1266 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -244,7 +244,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin else end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID); - ReplicationSlotAcquire(NameStr(*name)); + ReplicationSlotAcquire(NameStr(*name), true); PG_TRY(); { diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index dc7de20e11..ea9cd1f22b 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -157,6 +157,7 @@ ReplicationSlotsShmemInit(void) /* everything else is zeroed by the memset above */ SpinLockInit(&slot->mutex); LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS); + ConditionVariableInit(&slot->active_cv); } } } @@ -313,25 +314,35 @@ ReplicationSlotCreate(const char *name, bool db_specific, LWLockRelease(ReplicationSlotControlLock); /* - * Now that the slot has been marked as in_use and in_active, it's safe to + * Now that the slot has been marked as in_use and active, it's safe to * let somebody else try to allocate a slot. */ LWLockRelease(ReplicationSlotAllocationLock); + + /* Let everybody know we've modified this slot */ + ConditionVariableBroadcast(&slot->active_cv); } /* * Find a previously created slot and mark it as used by this backend. */ void -ReplicationSlotAcquire(const char *name) +ReplicationSlotAcquire(const char *name, bool nowait) { - ReplicationSlot *slot = NULL; + ReplicationSlot *slot; + int active_pid; int i; - int active_pid = 0; /* Keep compiler quiet */ +retry: Assert(MyReplicationSlot == NULL); - /* Search for the named slot and mark it active if we find it. */ + /* + * Search for the named slot and mark it active if we find it. If the + * slot is already active, we exit the loop with active_pid set to the PID + * of the backend that owns it. + */ + active_pid = 0; + slot = NULL; LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) { @@ -339,35 +350,59 @@ ReplicationSlotAcquire(const char *name) if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0) { + /* Found the slot we want -- can we activate it? */ SpinLockAcquire(&s->mutex); + active_pid = s->active_pid; if (active_pid == 0) active_pid = s->active_pid = MyProcPid; + SpinLockRelease(&s->mutex); slot = s; + break; } } LWLockRelease(ReplicationSlotControlLock); - /* If we did not find the slot or it was already active, error out. */ + /* If we did not find the slot, error out. */ if (slot == NULL) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("replication slot \"%s\" does not exist", name))); + + /* + * If we found the slot but it's already active in another backend, we + * either error out or retry after a short wait, as caller specified. + */ if (active_pid != MyProcPid) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_IN_USE), - errmsg("replication slot \"%s\" is active for PID %d", - name, active_pid))); + { + if (nowait) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("replication slot \"%s\" is active for PID %d", + name, active_pid))); + + /* Wait here until we get signaled by whoever is active */ + ConditionVariablePrepareToSleep(&slot->active_cv); + ConditionVariableSleep(&slot->active_cv, PG_WAIT_LOCK); + ConditionVariableCancelSleep(); + + goto retry; + } + + /* Let everybody know we've modified this slot */ + ConditionVariableBroadcast(&slot->active_cv); /* We made this slot active, so it's ours now. */ MyReplicationSlot = slot; } /* - * Release a replication slot, this or another backend can ReAcquire it - * later. Resources this slot requires will be preserved. + * Release the replication slot that this backend considers to own. + * + * This or another backend can re-acquire the slot later. + * Resources this slot requires will be preserved. */ void ReplicationSlotRelease(void) @@ -385,17 +420,6 @@ ReplicationSlotRelease(void) */ ReplicationSlotDropAcquired(); } - else if (slot->data.persistency == RS_PERSISTENT) - { - /* - * Mark persistent slot inactive. We're not freeing it, just - * disconnecting. - */ - SpinLockAcquire(&slot->mutex); - slot->active_pid = 0; - SpinLockRelease(&slot->mutex); - } - /* * If slot needed to temporarily restrain both data and catalog xmin to @@ -412,6 +436,18 @@ ReplicationSlotRelease(void) ReplicationSlotsComputeRequiredXmin(false); } + if (slot->data.persistency == RS_PERSISTENT) + { + /* + * Mark persistent slot inactive. We're not freeing it, just + * disconnecting, but wake up others that may be waiting for it. + */ + SpinLockAcquire(&slot->mutex); + slot->active_pid = 0; + SpinLockRelease(&slot->mutex); + ConditionVariableBroadcast(&slot->active_cv); + } + MyReplicationSlot = NULL; /* might not have been set when we've been a plain slot */ @@ -430,32 +466,43 @@ ReplicationSlotCleanup(void) Assert(MyReplicationSlot == NULL); - /* - * No need for locking as we are only interested in slots active in - * current process and those are not touched by other processes. - */ +restart: + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + if (!s->in_use) + continue; + + SpinLockAcquire(&s->mutex); if (s->active_pid == MyProcPid) { - Assert(s->in_use && s->data.persistency == RS_TEMPORARY); + Assert(s->data.persistency == RS_TEMPORARY); + SpinLockRelease(&s->mutex); + LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */ ReplicationSlotDropPtr(s); + + ConditionVariableBroadcast(&s->active_cv); + goto restart; } + else + SpinLockRelease(&s->mutex); } + + LWLockRelease(ReplicationSlotControlLock); } /* * Permanently drop replication slot identified by the passed in name. */ void -ReplicationSlotDrop(const char *name) +ReplicationSlotDrop(const char *name, bool nowait) { Assert(MyReplicationSlot == NULL); - ReplicationSlotAcquire(name); + ReplicationSlotAcquire(name, nowait); ReplicationSlotDropAcquired(); } @@ -527,6 +574,9 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) slot->active_pid = 0; SpinLockRelease(&slot->mutex); + /* wake up anyone waiting on this slot */ + ConditionVariableBroadcast(&slot->active_cv); + ereport(fail_softly ? WARNING : ERROR, (errcode_for_file_access(), errmsg("could not rename file \"%s\" to \"%s\": %m", @@ -535,15 +585,18 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) /* * The slot is definitely gone. Lock out concurrent scans of the array - * long enough to kill it. It's OK to clear the active flag here without + * long enough to kill it. It's OK to clear the active PID here without * grabbing the mutex because nobody else can be scanning the array here, * and nobody can be attached to this slot and thus access it without * scanning the array. + * + * Also wake up processes waiting for it. */ LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); slot->active_pid = 0; slot->in_use = false; LWLockRelease(ReplicationSlotControlLock); + ConditionVariableBroadcast(&slot->active_cv); /* * Slot is dead and doesn't prevent resource removal anymore, recompute diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 6dc808874d..d4cbd83bde 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -171,7 +171,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) CheckSlotRequirements(); - ReplicationSlotDrop(NameStr(*name)); + ReplicationSlotDrop(NameStr(*name), false); PG_RETURN_VOID(); } @@ -221,6 +221,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) MemoryContextSwitchTo(oldcontext); + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (slotno = 0; slotno < max_replication_slots; slotno++) { ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno]; @@ -238,25 +239,21 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) NameData plugin; int i; - SpinLockAcquire(&slot->mutex); if (!slot->in_use) - { - SpinLockRelease(&slot->mutex); continue; - } - else - { - xmin = slot->data.xmin; - catalog_xmin = slot->data.catalog_xmin; - database = slot->data.database; - restart_lsn = slot->data.restart_lsn; - confirmed_flush_lsn = slot->data.confirmed_flush; - namecpy(&slot_name, &slot->data.name); - namecpy(&plugin, &slot->data.plugin); - active_pid = slot->active_pid; - persistency = slot->data.persistency; - } + SpinLockAcquire(&slot->mutex); + + xmin = slot->data.xmin; + catalog_xmin = slot->data.catalog_xmin; + database = slot->data.database; + restart_lsn = slot->data.restart_lsn; + confirmed_flush_lsn = slot->data.confirmed_flush; + namecpy(&slot_name, &slot->data.name); + namecpy(&plugin, &slot->data.plugin); + active_pid = slot->active_pid; + persistency = slot->data.persistency; + SpinLockRelease(&slot->mutex); memset(nulls, 0, sizeof(nulls)); @@ -309,6 +306,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) tuplestore_putvalues(tupstore, tupdesc, values, nulls); } + LWLockRelease(ReplicationSlotControlLock); tuplestore_donestoring(tupstore); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 002143b26a..9a2babef1e 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -541,7 +541,7 @@ StartReplication(StartReplicationCmd *cmd) if (cmd->slotname) { - ReplicationSlotAcquire(cmd->slotname); + ReplicationSlotAcquire(cmd->slotname, true); if (SlotIsLogical(MyReplicationSlot)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -1028,7 +1028,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) static void DropReplicationSlot(DropReplicationSlotCmd *cmd) { - ReplicationSlotDrop(cmd->slotname); + ReplicationSlotDrop(cmd->slotname, false); EndCommand("DROP_REPLICATION_SLOT", DestRemote); } @@ -1046,7 +1046,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) Assert(!MyReplicationSlot); - ReplicationSlotAcquire(cmd->slotname); + ReplicationSlotAcquire(cmd->slotname, true); /* * Force a disconnect, so that the decoding code doesn't need to care diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index a283f4e2b8..0bf2611fe9 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -12,6 +12,7 @@ #include "fmgr.h" #include "access/xlog.h" #include "access/xlogreader.h" +#include "storage/condition_variable.h" #include "storage/lwlock.h" #include "storage/shmem.h" #include "storage/spin.h" @@ -19,7 +20,7 @@ /* * Behaviour of replication slots, upon release or crash. * - * Slots marked as PERSISTENT are crashsafe and will not be dropped when + * Slots marked as PERSISTENT are crash-safe and will not be dropped when * released. Slots marked as EPHEMERAL will be dropped when released or after * restarts. * @@ -117,6 +118,9 @@ typedef struct ReplicationSlot /* is somebody performing io on this slot? */ LWLock io_in_progress_lock; + /* Condition variable signalled when active_pid changes */ + ConditionVariable active_cv; + /* all the remaining data is only used for logical slots */ /* @@ -162,9 +166,9 @@ extern void ReplicationSlotsShmemInit(void); extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency p); extern void ReplicationSlotPersist(void); -extern void ReplicationSlotDrop(const char *name); +extern void ReplicationSlotDrop(const char *name, bool nowait); -extern void ReplicationSlotAcquire(const char *name); +extern void ReplicationSlotAcquire(const char *name, bool nowait); extern void ReplicationSlotRelease(void); extern void ReplicationSlotCleanup(void); extern void ReplicationSlotSave(void); -- 2.11.0