From fa17810dee089ccfb8c058e25b7804a47f01f67f Mon Sep 17 00:00:00 2001
From: bdrouvotAWS
Date: Tue, 7 Feb 2023 09:00:29 +0000
Subject: [PATCH v48 4/6] Fixing Walsender corner case with logical decoding on standby.

The problem is that WalSndWaitForWal() waits for the *replay* LSN to
increase, but gets woken up by walreceiver when new WAL has been
flushed. Which means that typically walsenders will get woken up at the
same time that the startup process will be - which means that by the
time the logical walsender checks GetXLogReplayRecPtr() it's unlikely
that the startup process already replayed the record and updated
XLogRecoveryCtl->lastReplayedEndRecPtr.

Introducing a new condition variable to fix this corner case.
---
Review changes applied on top of v48 as posted:
 - wait event name for WAIT_EVENT_WAL_SENDER_WAIT_REPLAY is now
   "WalSenderWaitReplay" (was "WalReceiverWaitReplay");
 - WalSndWaitForWal() now calls ConditionVariableCancelSleep() after its
   wait loop, pairing the ConditionVariablePrepareToSleep() done before it;
 - new #include in xlogrecovery.h placed in alphabetical order.
(The per-directory percentages below are those of the original posting.)

 src/backend/access/transam/xlogrecovery.c | 28 ++++++++++++++++
 src/backend/replication/walsender.c       | 40 ++++++++++++++++++-----
 src/backend/utils/activity/wait_event.c   |  3 ++
 src/include/access/xlogrecovery.h         |  3 ++
 src/include/replication/walsender.h       |  1 +
 src/include/utils/wait_event.h            |  1 +
 6 files changed, 68 insertions(+), 8 deletions(-)
  43.2% src/backend/access/transam/
  46.1% src/backend/replication/
   3.8% src/backend/utils/activity/
   3.7% src/include/access/
   3.1% src/include/

diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index dbe9394762..8a9505a52d 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -358,6 +358,9 @@ typedef struct XLogRecoveryCtlData
 	RecoveryPauseState recoveryPauseState;
 	ConditionVariable recoveryNotPausedCV;
 
+	/* Replay state (see check_for_replay() for more explanation) */
+	ConditionVariable replayedCV;
+
 	slock_t		info_lck;		/* locks shared variables shown above */
 } XLogRecoveryCtlData;
 
@@ -468,6 +471,7 @@ XLogRecoveryShmemInit(void)
 	SpinLockInit(&XLogRecoveryCtl->info_lck);
 	InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
 	ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV);
+	ConditionVariableInit(&XLogRecoveryCtl->replayedCV);
 }
 
 /*
@@ -1935,6 +1939,11 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 	XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
 	SpinLockRelease(&XLogRecoveryCtl->info_lck);
 
+	/*
+	 * Wake up walsender(s) used by logical decoding on standby.
+	 */
+	ConditionVariableBroadcast(&XLogRecoveryCtl->replayedCV);
+
 	/*
 	 * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
 	 * receiver so that it notices the updated lastReplayedEndRecPtr and sends
@@ -4942,3 +4951,22 @@ assign_recovery_target_xid(const char *newval, void *extra)
 	else
 		recoveryTarget = RECOVERY_TARGET_UNSET;
 }
+
+/*
+ * Return the ConditionVariable indicating that a replay has been done.
+ *
+ * This is needed for logical decoding on standby. Indeed the "problem" is that
+ * WalSndWaitForWal() waits for the *replay* LSN to increase, but gets woken up
+ * by walreceiver when new WAL has been flushed. Which means that typically
+ * walsenders will get woken up at the same time that the startup process
+ * will be - which means that by the time the logical walsender checks
+ * GetXLogReplayRecPtr() it's unlikely that the startup process already replayed
+ * the record and updated XLogRecoveryCtl->lastReplayedEndRecPtr.
+ *
+ * The ConditionVariable XLogRecoveryCtl->replayedCV solves this corner case.
+ */
+ConditionVariable *
+check_for_replay(void)
+{
+	return &XLogRecoveryCtl->replayedCV;
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1e91cbc564..3fc7b42d15 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1552,6 +1552,7 @@ WalSndWaitForWal(XLogRecPtr loc)
 {
 	int			wakeEvents;
 	static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+	ConditionVariable *replayedCV = check_for_replay();
 
 	/*
 	 * Fast path to avoid acquiring the spinlock in case we already know we
@@ -1566,10 +1567,15 @@ WalSndWaitForWal(XLogRecPtr loc)
 	if (!RecoveryInProgress())
 		RecentFlushPtr = GetFlushRecPtr(NULL);
 	else
+	{
 		RecentFlushPtr = GetXLogReplayRecPtr(NULL);
+		/* Prepare the replayedCV to sleep */
+		ConditionVariablePrepareToSleep(replayedCV);
+	}
 
 	for (;;)
 	{
+
 		long		sleeptime;
 
 		/* Clear any already-pending wakeups */
@@ -1653,21 +1659,39 @@ WalSndWaitForWal(XLogRecPtr loc)
 		/* Send keepalive if the time has come */
 		WalSndKeepaliveIfNecessary();
 
+		sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
 		/*
-		 * Sleep until something happens or we time out.  Also wait for the
-		 * socket becoming writable, if there's still pending output.
+		 * When not in recovery, sleep until something happens or we time out.
+		 * Also wait for the socket becoming writable, if there's still pending output.
 		 * Otherwise we might sit on sendable output data while waiting for
 		 * new WAL to be generated.  (But if we have nothing to send, we don't
 		 * want to wake on socket-writable.)
 		 */
-		sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
-
-		wakeEvents = WL_SOCKET_READABLE;
+		if (!RecoveryInProgress())
+		{
+			wakeEvents = WL_SOCKET_READABLE;
 
-		if (pq_is_send_pending())
-			wakeEvents |= WL_SOCKET_WRITEABLE;
+			if (pq_is_send_pending())
+				wakeEvents |= WL_SOCKET_WRITEABLE;
 
-		WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
+			WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
+		}
+		else
+		{
+			/*
+			 * We are in the logical decoding on standby case.
+			 * We are waiting for the startup process to replay WAL record(s) using
+			 * a timeout in case we are requested to stop.
+			 */
+			ConditionVariableTimedSleep(replayedCV, sleeptime,
+										WAIT_EVENT_WAL_SENDER_WAIT_REPLAY);
+		}
 	}
 
+	/*
+	 * We may still be registered on replayedCV's wait list (prepared above
+	 * while in recovery); remove ourselves before returning.
+	 */
+	ConditionVariableCancelSleep();
+
 	/* reactivate latch so WalSndLoop knows to continue */
diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c
index 6e4599278c..38c747b786 100644
--- a/src/backend/utils/activity/wait_event.c
+++ b/src/backend/utils/activity/wait_event.c
@@ -463,6 +463,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 		case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
 			event_name = "WalReceiverWaitStart";
 			break;
+		case WAIT_EVENT_WAL_SENDER_WAIT_REPLAY:
+			event_name = "WalSenderWaitReplay";
+			break;
 		case WAIT_EVENT_XACT_GROUP_UPDATE:
 			event_name = "XactGroupUpdate";
 			break;
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index 47c29350f5..2bfeaaa00f 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -15,6 +15,7 @@
 #include "catalog/pg_control.h"
 #include "lib/stringinfo.h"
+#include "storage/condition_variable.h"
 #include "utils/timestamp.h"
 
 /*
  * Recovery target type.
@@ -155,4 +156,6 @@ extern void RecoveryRequiresIntParameter(const char *param_name, int currValue,
 
 extern void xlog_outdesc(StringInfo buf, XLogReaderState *record);
 
+extern ConditionVariable *check_for_replay(void);
+
 #endif							/* XLOGRECOVERY_H */
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 52bb3e2aae..2fd745fe72 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -13,6 +13,7 @@
 #define _WALSENDER_H
 
 #include <signal.h>
+#include "storage/condition_variable.h"
 
 /*
  * What to do with a snapshot in create replication slot command.
diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h
index 6cacd6edaf..04a37feee4 100644
--- a/src/include/utils/wait_event.h
+++ b/src/include/utils/wait_event.h
@@ -130,6 +130,7 @@ typedef enum
 	WAIT_EVENT_SYNC_REP,
 	WAIT_EVENT_WAL_RECEIVER_EXIT,
 	WAIT_EVENT_WAL_RECEIVER_WAIT_START,
+	WAIT_EVENT_WAL_SENDER_WAIT_REPLAY,
 	WAIT_EVENT_XACT_GROUP_UPDATE
 } WaitEventIPC;
 
-- 
2.34.1