diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 505445f2dc..b89b6da768 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -99,6 +99,8 @@ ReplicationSlot *MyReplicationSlot = NULL;
 int			max_replication_slots = 0;	/* the maximum number of replication
 										 * slots */
 
+static bool ReplicationSlotIsActive(ReplicationSlot *slot, int *active_pid);
+static void DeactivateReplicationSlot(ReplicationSlot *slot);
 static void ReplicationSlotDropAcquired(void);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 
@@ -1080,6 +1082,77 @@ ReplicationSlotReserveWal(void)
 	}
 }
 
+/*
+ * Is the specified replication slot currently actively being used?
+ *
+ * Returns true when the slot's active_pid is nonzero.  If active_pid is
+ * not NULL, *active_pid is set to the value read under the slot's
+ * spinlock; that is 0 when the slot is inactive.
+ */
+static bool
+ReplicationSlotIsActive(ReplicationSlot *slot, int *active_pid)
+{
+	int			pid;
+
+	SpinLockAcquire(&slot->mutex);
+	pid = slot->active_pid;
+	SpinLockRelease(&slot->mutex);
+
+	if (active_pid != NULL)
+		*active_pid = pid;
+
+	return (pid != 0);
+}
+
+/*
+ * Deactivate the specified replication slot.
+ *
+ * If a process is using the slot, send it SIGTERM and wait until the slot
+ * is no longer active.  The owner is re-read on every iteration: if the
+ * process we signaled goes away and a different process grabs the slot
+ * before we notice, that new process is signaled as well, rather than
+ * waiting forever on a process that was never told to exit.
+ *
+ * Note that this does not acquire the slot, so nothing here prevents
+ * another process from acquiring it again after we return.
+ */
+static void
+DeactivateReplicationSlot(ReplicationSlot *slot)
+{
+	int			active_pid;
+	int			target_pid = 0;
+	bool		killed = false;
+
+	/* Quick exit if already inactive */
+	if (!ReplicationSlotIsActive(slot, &active_pid))
+		return;
+
+	ConditionVariablePrepareToSleep(&slot->active_cv);
+	do
+	{
+		/* New (or first) owner: log it and start signaling afresh */
+		if (active_pid != target_pid)
+		{
+			target_pid = active_pid;
+			killed = false;
+			ereport(LOG,
+					(errmsg("terminating the process %d using replication slot \"%s\"",
+							target_pid, NameStr(slot->data.name))));
+		}
+
+		/*
+		 * Signal to terminate the process using the replication slot.
+		 *
+		 * Try to signal every 100ms until it succeeds.
+		 */
+		if (!killed && kill(target_pid, SIGTERM) == 0)
+			killed = true;
+		ConditionVariableTimedSleep(&slot->active_cv, 100,
+									WAIT_EVENT_REPLICATION_SLOT_DROP);
+	} while (ReplicationSlotIsActive(slot, &active_pid));
+	ConditionVariableCancelSleep();
+}
+
 /*
  * Mark any slot that points to an LSN older than the given segment
  * as invalid; it requires WAL that's about to be removed.
@@ -1105,37 +1178,18 @@ restart:
 			continue;
 
 		SpinLockAcquire(&s->mutex);
-		if (s->data.restart_lsn == InvalidXLogRecPtr ||
-			s->data.restart_lsn >= oldestLSN)
-		{
-			SpinLockRelease(&s->mutex);
-			continue;
-		}
-
 		slotname = s->data.name;
 		restart_lsn = s->data.restart_lsn;
-
 		SpinLockRelease(&s->mutex);
+
+		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
+			continue;
 		LWLockRelease(ReplicationSlotControlLock);
 
-		for (;;)
-		{
-			int			wspid = ReplicationSlotAcquire(NameStr(slotname),
-													   SAB_Inquire);
-
-			/* no walsender? success! */
-			if (wspid == 0)
-				break;
-
-			ereport(LOG,
-					(errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
-							wspid, NameStr(slotname))));
-			(void) kill(wspid, SIGTERM);
-
-			ConditionVariableTimedSleep(&s->active_cv, 10,
-										WAIT_EVENT_REPLICATION_SLOT_DROP);
-		}
-		ConditionVariableCancelSleep();
+		DeactivateReplicationSlot(s);
 
+		SpinLockAcquire(&s->mutex);
+		s->data.restart_lsn = InvalidXLogRecPtr;
+		SpinLockRelease(&s->mutex);
 		ereport(LOG,
 				(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
@@ -1143,11 +1197,6 @@ restart:
 						(uint32) (restart_lsn >> 32),
 						(uint32) restart_lsn)));
 
-		SpinLockAcquire(&s->mutex);
-		s->data.restart_lsn = InvalidXLogRecPtr;
-		SpinLockRelease(&s->mutex);
-		ReplicationSlotRelease();
-
 		/* if we did anything, start from scratch */
 		CHECK_FOR_INTERRUPTS();
 		goto restart;