From 41b83ef19ceb7ed8c970036e6fc09ebe0e4a9329 Mon Sep 17 00:00:00 2001 From: bdrouvotAWS Date: Tue, 7 Feb 2023 09:00:29 +0000 Subject: [PATCH v56 4/6] Fixing Walsender corner case with logical decoding on standby. The problem is that WalSndWaitForWal() waits for the *replay* LSN to increase, but gets woken up by walreceiver when new WAL has been flushed. Which means that typically walsenders will get woken up at the same time that the startup process will be - which means that by the time the logical walsender checks GetXLogReplayRecPtr() it's unlikely that the startup process already replayed the record and updated XLogCtl->lastReplayedEndRecPtr. Introducing a new condition variable to fix this corner case. --- src/backend/access/transam/xlogrecovery.c | 28 ++++++++++++++++ src/backend/replication/walsender.c | 41 +++++++++++++++++------ src/backend/utils/activity/wait_event.c | 3 ++ src/include/access/xlogrecovery.h | 3 ++ src/include/replication/walsender.h | 1 + src/include/utils/wait_event.h | 1 + 6 files changed, 66 insertions(+), 11 deletions(-) 37.5% src/backend/access/transam/ 53.1% src/backend/replication/ 3.3% src/backend/utils/activity/ 3.2% src/include/access/ diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index dbe9394762..8a9505a52d 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -358,6 +358,9 @@ typedef struct XLogRecoveryCtlData RecoveryPauseState recoveryPauseState; ConditionVariable recoveryNotPausedCV; + /* Replay state (see check_for_replay() for more explanation) */ + ConditionVariable replayedCV; + slock_t info_lck; /* locks shared variables shown above */ } XLogRecoveryCtlData; @@ -468,6 +471,7 @@ XLogRecoveryShmemInit(void) SpinLockInit(&XLogRecoveryCtl->info_lck); InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch); ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV); + ConditionVariableInit(&XLogRecoveryCtl->replayedCV); } /* @@ -1935,6 +1939,11 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl XLogRecoveryCtl->lastReplayedTLI = *replayTLI; SpinLockRelease(&XLogRecoveryCtl->info_lck); + /* + * wake up walsender(s) used by logical decoding on standby. + */ + ConditionVariableBroadcast(&XLogRecoveryCtl->replayedCV); + /* * If rm_redo called XLogRequestWalReceiverReply, then we wake up the * receiver so that it notices the updated lastReplayedEndRecPtr and sends @@ -4942,3 +4951,22 @@ assign_recovery_target_xid(const char *newval, void *extra) else recoveryTarget = RECOVERY_TARGET_UNSET; } + +/* + * Return the ConditionVariable indicating that a replay has been done. + * + * This is needed for logical decoding on standby. Indeed the "problem" is that + * WalSndWaitForWal() waits for the *replay* LSN to increase, but gets woken up + * by walreceiver when new WAL has been flushed. Which means that typically + * walsenders will get woken up at the same time that the startup process + * will be - which means that by the time the logical walsender checks + * GetXLogReplayRecPtr() it's unlikely that the startup process already replayed + * the record and updated XLogCtl->lastReplayedEndRecPtr. + * + * The ConditionVariable XLogRecoveryCtl->replayedCV solves this corner case. + */ +ConditionVariable * +check_for_replay(void) +{ + return &XLogRecoveryCtl->replayedCV; +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index aeb5f93514..d85826d7f5 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1552,6 +1552,7 @@ WalSndWaitForWal(XLogRecPtr loc) { int wakeEvents; static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr; + ConditionVariable *replayedCV = check_for_replay(); /* * Fast path to avoid acquiring the spinlock in case we already know we @@ -1566,10 +1567,15 @@ WalSndWaitForWal(XLogRecPtr loc) if (!RecoveryInProgress()) RecentFlushPtr = GetFlushRecPtr(NULL); else + { RecentFlushPtr = GetXLogReplayRecPtr(NULL); + /* Prepare the replayedCV to sleep */ + ConditionVariablePrepareToSleep(replayedCV); + } for (;;) { + long sleeptime; /* Clear any already-pending wakeups */ @@ -1653,21 +1659,34 @@ WalSndWaitForWal(XLogRecPtr loc) /* Send keepalive if the time has come */ WalSndKeepaliveIfNecessary(); - /* - * Sleep until something happens or we time out. Also wait for the - * socket becoming writable, if there's still pending output. - * Otherwise we might sit on sendable output data while waiting for - * new WAL to be generated. (But if we have nothing to send, we don't - * want to wake on socket-writable.) - */ sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); - wakeEvents = WL_SOCKET_READABLE; + /* + * When not in recovery, sleep until something happens or we time out. + * Also wait for the socket becoming writable, if there's still + * pending output. Otherwise we might sit on sendable output data + * while waiting for new WAL to be generated. (But if we have nothing + * to send, we don't want to wake on socket-writable.) + */ + if (!RecoveryInProgress()) + { + wakeEvents = WL_SOCKET_READABLE; - if (pq_is_send_pending()) - wakeEvents |= WL_SOCKET_WRITEABLE; + if (pq_is_send_pending()) + wakeEvents |= WL_SOCKET_WRITEABLE; - WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL); + WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL); + } + else + { + /* + * We are in the logical decoding on standby case. We are waiting + * for the startup process to replay wal record(s) using a timeout + * in case we are requested to stop. + */ + ConditionVariableTimedSleep(replayedCV, sleeptime, + WAIT_EVENT_WAL_SENDER_WAIT_REPLAY); + } } /* reactivate latch so WalSndLoop knows to continue */ diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c index 7940d64639..27fc8fab76 100644 --- a/src/backend/utils/activity/wait_event.c +++ b/src/backend/utils/activity/wait_event.c @@ -466,6 +466,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_WAL_RECEIVER_WAIT_START: event_name = "WalReceiverWaitStart"; break; + case WAIT_EVENT_WAL_SENDER_WAIT_REPLAY: + event_name = "WalReceiverWaitReplay"; + break; case WAIT_EVENT_XACT_GROUP_UPDATE: event_name = "XactGroupUpdate"; break; diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h index 47c29350f5..c8842d94bb 100644 --- a/src/include/access/xlogrecovery.h +++ b/src/include/access/xlogrecovery.h @@ -14,6 +14,7 @@ #include "access/xlogreader.h" #include "catalog/pg_control.h" #include "lib/stringinfo.h" +#include "storage/condition_variable.h" #include "utils/timestamp.h" /* @@ -155,4 +156,6 @@ extern void RecoveryRequiresIntParameter(const char *param_name, int currValue, extern void xlog_outdesc(StringInfo buf, XLogReaderState *record); +extern ConditionVariable *check_for_replay(void); + #endif /* XLOGRECOVERY_H */ diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 52bb3e2aae..2fd745fe72 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -13,6 +13,7 @@ #define _WALSENDER_H #include +#include "storage/condition_variable.h" /* * What to do with a snapshot in create replication slot command. diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h index 518d3b0a1f..e13abdc365 100644 --- a/src/include/utils/wait_event.h +++ b/src/include/utils/wait_event.h @@ -131,6 +131,7 @@ typedef enum WAIT_EVENT_SYNC_REP, WAIT_EVENT_WAL_RECEIVER_EXIT, WAIT_EVENT_WAL_RECEIVER_WAIT_START, + WAIT_EVENT_WAL_SENDER_WAIT_REPLAY, WAIT_EVENT_XACT_GROUP_UPDATE } WaitEventIPC; -- 2.34.1