From d4423857bd73c4d87b17a0dac74388f664421e18 Mon Sep 17 00:00:00 2001 From: Bertrand Drouvot Date: Mon, 6 Mar 2023 08:17:52 +0000 Subject: [PATCH v99 4/6] Fixing Walsender corner case with logical decoding on standby. The problem is that WalSndWaitForWal() waits for the *replay* LSN to increase, but gets woken up by walreceiver when new WAL has been flushed. Which means that typically walsenders will get woken up at the same time that the startup process will be - which means that by the time the logical walsender checks GetXLogReplayRecPtr() it's unlikely that the startup process already replayed the record and updated XLogCtl->lastReplayedEndRecPtr. Introducing a new condition variable and a new API ConditionVariableEventSleep() to fix this corner case. --- doc/src/sgml/monitoring.sgml | 4 + src/backend/access/transam/xlogrecovery.c | 28 ++++ src/backend/libpq/pqcomm.c | 14 +- src/backend/replication/walsender.c | 17 ++- src/backend/storage/lmgr/condition_variable.c | 124 +++++++++++++++--- src/backend/storage/lmgr/proc.c | 6 + src/backend/utils/activity/wait_event.c | 3 + src/backend/utils/init/miscinit.c | 1 + src/include/access/xlogrecovery.h | 3 + src/include/libpq/libpq.h | 6 +- src/include/replication/walsender.h | 1 + src/include/storage/condition_variable.h | 10 ++ src/include/utils/wait_event.h | 1 + 13 files changed, 189 insertions(+), 29 deletions(-) 13.5% src/backend/access/transam/ 8.5% src/backend/libpq/ 6.5% src/backend/replication/ 58.6% src/backend/storage/lmgr/ 4.4% src/include/storage/ 4.4% src/include/ diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index cdf7c09b4b..9af8d58da2 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1857,6 +1857,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser Waiting for startup process to send initial data for streaming replication. + + WalSenderWaitReplay + Waiting for startup process to replay write-ahead log. 
+ XactGroupUpdate Waiting for the group leader to update transaction status at diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index dbe9394762..8a9505a52d 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -358,6 +358,9 @@ typedef struct XLogRecoveryCtlData RecoveryPauseState recoveryPauseState; ConditionVariable recoveryNotPausedCV; + /* Replay state (see check_for_replay() for more explanation) */ + ConditionVariable replayedCV; + slock_t info_lck; /* locks shared variables shown above */ } XLogRecoveryCtlData; @@ -468,6 +471,7 @@ XLogRecoveryShmemInit(void) SpinLockInit(&XLogRecoveryCtl->info_lck); InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch); ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV); + ConditionVariableInit(&XLogRecoveryCtl->replayedCV); } /* @@ -1935,6 +1939,11 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl XLogRecoveryCtl->lastReplayedTLI = *replayTLI; SpinLockRelease(&XLogRecoveryCtl->info_lck); + /* + * wake up walsender(s) used by logical decoding on standby. + */ + ConditionVariableBroadcast(&XLogRecoveryCtl->replayedCV); + /* * If rm_redo called XLogRequestWalReceiverReply, then we wake up the * receiver so that it notices the updated lastReplayedEndRecPtr and sends @@ -4942,3 +4951,22 @@ assign_recovery_target_xid(const char *newval, void *extra) else recoveryTarget = RECOVERY_TARGET_UNSET; } + +/* + * Return the ConditionVariable indicating that a replay has been done. + * + * This is needed for logical decoding on standby. Indeed the "problem" is that + * WalSndWaitForWal() waits for the *replay* LSN to increase, but gets woken up + * by walreceiver when new WAL has been flushed. 
Which means that typically + * walsenders will get woken up at the same time that the startup process + * will be - which means that by the time the logical walsender checks + * GetXLogReplayRecPtr() it's unlikely that the startup process already replayed + * the record and updated XLogCtl->lastReplayedEndRecPtr. + * + * The ConditionVariable XLogRecoveryCtl->replayedCV solves this corner case. + */ +ConditionVariable * +check_for_replay(void) +{ + return &XLogRecoveryCtl->replayedCV; +} diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index da5bb5fc5d..babd0b6c4e 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -80,6 +80,7 @@ #include "storage/ipc.h" #include "utils/guc_hooks.h" #include "utils/memutils.h" +#include "storage/condition_variable.h" /* * Cope with the various platform-specific ways to spell TCP keepalive socket @@ -172,7 +173,6 @@ void pq_init(void) { int socket_pos PG_USED_FOR_ASSERTS_ONLY; - int latch_pos PG_USED_FOR_ASSERTS_ONLY; /* initialize state variables */ PqSendBufferSize = PQ_SEND_BUFFER_SIZE; @@ -207,20 +207,14 @@ pq_init(void) elog(FATAL, "fcntl(F_SETFD) failed on socket: %m"); #endif - FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, FeBeWaitSetNEvents); + FeBeWaitSet = ConditionVariableWaitSetCreate(TopMemoryContext, FeBeWaitSetNEvents); socket_pos = AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock, NULL, NULL); - latch_pos = AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, PGINVALID_SOCKET, - MyLatch, NULL); - AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, - NULL, NULL); - /* - * The event positions match the order we added them, but let's sanity - * check them to be sure. + * The socket_pos matches the order we added it, but let's sanity + * check it to be sure. 
*/ Assert(socket_pos == FeBeWaitSetSocketPos); - Assert(latch_pos == FeBeWaitSetLatchPos); } /* -------------------------------- diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 3042e5bd64..89d1a36e6a 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1551,7 +1551,9 @@ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc) { int wakeEvents; + uint32 wait_event; static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr; + ConditionVariable *cv = NULL; /* * Fast path to avoid acquiring the spinlock in case we already know we @@ -1564,9 +1566,20 @@ WalSndWaitForWal(XLogRecPtr loc) /* Get a more recent flush pointer. */ if (!RecoveryInProgress()) + { RecentFlushPtr = GetFlushRecPtr(NULL); + wait_event = WAIT_EVENT_WAL_SENDER_WAIT_WAL; + } else + { RecentFlushPtr = GetXLogReplayRecPtr(NULL); + wait_event = WAIT_EVENT_WAL_SENDER_WAIT_REPLAY; + cv = check_for_replay(); + } + + /* Prepare the cv to sleep */ + if (cv) + ConditionVariablePrepareToSleep(cv); for (;;) { @@ -1667,9 +1680,11 @@ WalSndWaitForWal(XLogRecPtr loc) if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; - WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL); + ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, wakeEvents, NULL); + ConditionVariableEventSleep(cv, FeBeWaitSet, sleeptime, wait_event); } + ConditionVariableCancelSleep(); /* reactivate latch so WalSndLoop knows to continue */ SetLatch(MyLatch); return RecentFlushPtr; diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c index 7e2bbf46d9..766f1bd7b2 100644 --- a/src/backend/storage/lmgr/condition_variable.c +++ b/src/backend/storage/lmgr/condition_variable.c @@ -27,9 +27,29 @@ #include "storage/spin.h" #include "utils/memutils.h" +#define ConditionVariableWaitSetLatchPos 0 + /* Initially, we are not prepared to sleep on any condition variable. 
*/ static ConditionVariable *cv_sleep_target = NULL; +/* Used by ConditionVariableSleep() and ConditionVariableTimedSleep(). */ +static WaitEventSet *ConditionVariableWaitSet = NULL; + +/* + * Initialize the process-local condition variable WaitEventSet. + * + * This must be called once during startup of any process that can wait on + * condition variables, before it issues any ConditionVariableInit() calls. + */ +void +InitializeConditionVariableWaitSet(void) +{ + Assert(ConditionVariableWaitSet == NULL); + + ConditionVariableWaitSet = ConditionVariableWaitSetCreate( + TopMemoryContext, 0); +} + /* * Initialize a condition variable. */ @@ -40,6 +60,51 @@ ConditionVariableInit(ConditionVariable *cv) proclist_init(&cv->wakeup); } +/* + * Create a WaitEventSet for ConditionVariableEventSleep(). This should be + * used when the caller of ConditionVariableEventSleep() would like to wake up + * on either the condition variable signal or a socket event. For example: + * + * ConditionVariableInit(&cv); + * waitset = ConditionVariableWaitSetCreate(mcxt, 1); + * event_pos = AddWaitEventToSet(waitset, 0, sock, NULL, NULL); + * ... + * ConditionVariablePrepareToSleep(&cv); + * while (...condition not met...) + * { + * socket_wait_events = ... + * ModifyWaitEvent(waitset, event_pos, socket_wait_events, NULL); + * ConditionVariableEventSleep(&cv, waitset, ...); + * } + * ConditionVariableCancelSleep(); + * + * The waitset is created with the standard events for a condition variable, + * and room for adding n_socket_events additional socket events. The + * initially-filled event positions should not be modified, but added socket + * events can be modified. The same waitset can be used for multiple condition + * variables as long as the callers of ConditionVariableEventSleep() are + * interested in the same sockets. 
+ */ +WaitEventSet * +ConditionVariableWaitSetCreate(MemoryContext mcxt, int n_socket_events) +{ + int latch_pos PG_USED_FOR_ASSERTS_ONLY; + int n_cv_events = IsUnderPostmaster ? 2 : 1; + int nevents = n_cv_events + n_socket_events; + WaitEventSet *waitset = CreateWaitEventSet(mcxt, nevents); + + latch_pos = AddWaitEventToSet(waitset, WL_LATCH_SET, PGINVALID_SOCKET, + MyLatch, NULL); + + if (IsUnderPostmaster) + AddWaitEventToSet(waitset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, + NULL, NULL); + + Assert(latch_pos == ConditionVariableWaitSetLatchPos); + + return waitset; +} + /* * Prepare to wait on a given condition variable. * @@ -97,7 +162,8 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv) void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info) { - (void) ConditionVariableTimedSleep(cv, -1 /* no timeout */ , + (void) ConditionVariableEventSleep(cv, ConditionVariableWaitSet, + -1 /* no timeout */ , wait_event_info); } @@ -111,11 +177,27 @@ ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info) bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info) +{ + return ConditionVariableEventSleep(cv, ConditionVariableWaitSet, timeout, + wait_event_info); +} + +/* + * Wait for a condition variable to be signaled, a timeout to be reached, or a + * socket event in the given waitset. The waitset must have been created by + * ConditionVariableWaitSetCreate(). + * + * Returns true on timeout or on a socket event, otherwise returns false. + * + * See ConditionVariableSleep() for general usage. 
+ */ +bool +ConditionVariableEventSleep(ConditionVariable *cv, WaitEventSet *waitset, + long timeout, uint32 wait_event_info) { long cur_timeout = -1; instr_time start_time; instr_time cur_time; - int wait_events; /* * If the caller didn't prepare to sleep explicitly, then do so now and @@ -132,7 +214,7 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, * If we are currently prepared to sleep on some other CV, we just cancel * that and prepare this one; see ConditionVariablePrepareToSleep. */ - if (cv_sleep_target != cv) + if (cv && cv_sleep_target != cv) { ConditionVariablePrepareToSleep(cv); return false; @@ -147,24 +229,28 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, INSTR_TIME_SET_CURRENT(start_time); Assert(timeout >= 0 && timeout <= INT_MAX); cur_timeout = timeout; - wait_events = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH; } - else - wait_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH; while (true) { bool done = false; + WaitEvent cvEvent; + int nevents; /* - * Wait for latch to be set. (If we're awakened for some other - * reason, the code below will cope anyway.) + * Wait for latch to be set, or other events which will be handled + * below. */ - (void) WaitLatch(MyLatch, wait_events, cur_timeout, wait_event_info); + nevents = WaitEventSetWait(waitset, cur_timeout, &cvEvent, + 1, wait_event_info); /* Reset latch before examining the state of the wait list. */ ResetLatch(MyLatch); + /* If a socket event occurred, no need to check wait list. */ + if (nevents == 1 && (cvEvent.events & WL_SOCKET_MASK) != 0) + return true; + /* * If this process has been taken out of the wait list, then we know * that it has been signaled by ConditionVariableSignal (or @@ -180,13 +266,21 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, * by something other than ConditionVariableSignal; though we don't * guarantee not to return spuriously, we'll avoid this obvious case. 
*/ - SpinLockAcquire(&cv->mutex); - if (!proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink)) + + if (cv) { - done = true; - proclist_push_tail(&cv->wakeup, MyProc->pgprocno, cvWaitLink); + SpinLockAcquire(&cv->mutex); + if (!proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink)) + { + done = true; + proclist_push_tail(&cv->wakeup, MyProc->pgprocno, cvWaitLink); + } + SpinLockRelease(&cv->mutex); } - SpinLockRelease(&cv->mutex); + + /* Note for the POC: If we are not waiting on a CV or have just been promoted. */ + if (!cv || (cv && !RecoveryInProgress())) + done = true; /* * Check for interrupts, and return spuriously if that caused the @@ -194,7 +288,7 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, * waited for a different condition variable). */ CHECK_FOR_INTERRUPTS(); - if (cv != cv_sleep_target) + if (cv && cv != cv_sleep_target) done = true; /* We were signaled, so return */ diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 22b4278610..ae4a7aecd4 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -440,6 +440,9 @@ InitProcess(void) OwnLatch(&MyProc->procLatch); SwitchToSharedLatch(); + /* Initialize process-local condition variable support */ + InitializeConditionVariableWaitSet(); + /* now that we have a proc, report wait events to shared memory */ pgstat_set_wait_event_storage(&MyProc->wait_event_info); @@ -596,6 +599,9 @@ InitAuxiliaryProcess(void) OwnLatch(&MyProc->procLatch); SwitchToSharedLatch(); + /* Initialize process-local condition variable support */ + InitializeConditionVariableWaitSet(); + /* now that we have a proc, report wait events to shared memory */ pgstat_set_wait_event_storage(&MyProc->wait_event_info); diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c index cb99cc6339..a10dcd4e61 100644 --- a/src/backend/utils/activity/wait_event.c +++ b/src/backend/utils/activity/wait_event.c @@ -466,6 
+466,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_WAL_RECEIVER_WAIT_START: event_name = "WalReceiverWaitStart"; break; + case WAIT_EVENT_WAL_SENDER_WAIT_REPLAY: + event_name = "WalSenderWaitReplay"; + break; case WAIT_EVENT_XACT_GROUP_UPDATE: event_name = "XactGroupUpdate"; break; diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c index 7eb7fe87f6..d07d24bc45 100644 --- a/src/backend/utils/init/miscinit.c +++ b/src/backend/utils/init/miscinit.c @@ -40,6 +40,7 @@ #include "postmaster/interrupt.h" #include "postmaster/pgarch.h" #include "postmaster/postmaster.h" +#include "storage/condition_variable.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/latch.h" diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h index 47c29350f5..2bfeaaa00f 100644 --- a/src/include/access/xlogrecovery.h +++ b/src/include/access/xlogrecovery.h @@ -15,6 +15,7 @@ #include "catalog/pg_control.h" #include "lib/stringinfo.h" #include "utils/timestamp.h" +#include "storage/condition_variable.h" /* * Recovery target type. 
@@ -155,4 +156,6 @@ extern void RecoveryRequiresIntParameter(const char *param_name, int currValue, extern void xlog_outdesc(StringInfo buf, XLogReaderState *record); +extern ConditionVariable *check_for_replay(void); + #endif /* XLOGRECOVERY_H */ diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h index 50fc781f47..33eddc7d40 100644 --- a/src/include/libpq/libpq.h +++ b/src/include/libpq/libpq.h @@ -60,9 +60,9 @@ extern const PGDLLIMPORT PQcommMethods *PqCommMethods; */ extern PGDLLIMPORT WaitEventSet *FeBeWaitSet; -#define FeBeWaitSetSocketPos 0 -#define FeBeWaitSetLatchPos 1 -#define FeBeWaitSetNEvents 3 +#define FeBeWaitSetLatchPos 0 +#define FeBeWaitSetSocketPos 2 +#define FeBeWaitSetNEvents 1 extern int StreamServerPort(int family, const char *hostName, unsigned short portNumber, const char *unixSocketDir, diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 52bb3e2aae..2fd745fe72 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -13,6 +13,7 @@ #define _WALSENDER_H #include <signal.h> +#include "storage/condition_variable.h" /* * What to do with a snapshot in create replication slot command. diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h index 589bdd323c..94adb54b91 100644 --- a/src/include/storage/condition_variable.h +++ b/src/include/storage/condition_variable.h @@ -22,6 +22,7 @@ #ifndef CONDITION_VARIABLE_H #define CONDITION_VARIABLE_H +#include "storage/latch.h" #include "storage/proclist_types.h" #include "storage/spin.h" @@ -42,9 +43,14 @@ typedef union ConditionVariableMinimallyPadded char pad[CV_MINIMAL_SIZE]; } ConditionVariableMinimallyPadded; +extern void InitializeConditionVariableWaitSet(void); + /* Initialize a condition variable. 
*/ extern void ConditionVariableInit(ConditionVariable *cv); +extern WaitEventSet *ConditionVariableWaitSetCreate(MemoryContext mcxt, + int n_socket_events); + /* * To sleep on a condition variable, a process should use a loop which first * checks the condition, exiting the loop if it is met, and then calls @@ -56,6 +62,10 @@ extern void ConditionVariableInit(ConditionVariable *cv); extern void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info); extern bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout, uint32 wait_event_info); +extern bool ConditionVariableEventSleep(ConditionVariable *cv, + WaitEventSet *cvEventSet, + long timeout, + uint32 wait_event_info); extern void ConditionVariableCancelSleep(void); /* diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h index 9ab23e1c4a..548ef41dca 100644 --- a/src/include/utils/wait_event.h +++ b/src/include/utils/wait_event.h @@ -131,6 +131,7 @@ typedef enum WAIT_EVENT_SYNC_REP, WAIT_EVENT_WAL_RECEIVER_EXIT, WAIT_EVENT_WAL_RECEIVER_WAIT_START, + WAIT_EVENT_WAL_SENDER_WAIT_REPLAY, WAIT_EVENT_XACT_GROUP_UPDATE } WaitEventIPC; -- 2.34.1