From 05a2391a5eaf0ce98612900915933690ad99e5b8 Mon Sep 17 00:00:00 2001 From: bdrouvotAWS Date: Tue, 3 Jan 2023 06:52:31 +0000 Subject: [PATCH v36 6/6] Fixing Walsender corner case with logical decoding on standby. The problem is that WalSndWaitForWal() waits for the *replay* LSN to increase, but gets woken up by walreceiver when new WAL has been flushed. Which means that typically walsenders will get woken up at the same time that the startup process will be - which means that by the time the logical walsender checks GetXLogReplayRecPtr() it's unlikely that the startup process already replayed the record and updated XLogCtl->lastReplayedEndRecPtr. Introducing a new condition variable to fix this corner case. --- src/backend/access/transam/xlogrecovery.c | 28 ++++++++++++++++++++ src/backend/replication/walsender.c | 31 +++++++++++++++++------ src/backend/utils/activity/wait_event.c | 3 +++ src/include/access/xlogrecovery.h | 3 +++ src/include/replication/walsender.h | 1 + src/include/utils/wait_event.h | 1 + 6 files changed, 59 insertions(+), 8 deletions(-) 41.2% src/backend/access/transam/ 48.5% src/backend/replication/ 3.6% src/backend/utils/activity/ 3.4% src/include/access/ diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index bc3c3eb3e7..98c96eb864 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -358,6 +358,9 @@ typedef struct XLogRecoveryCtlData RecoveryPauseState recoveryPauseState; ConditionVariable recoveryNotPausedCV; + /* Replay state (see getReplayedCV() for more explanation) */ + ConditionVariable replayedCV; + slock_t info_lck; /* locks shared variables shown above */ } XLogRecoveryCtlData; @@ -467,6 +470,7 @@ XLogRecoveryShmemInit(void) SpinLockInit(&XLogRecoveryCtl->info_lck); InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch); ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV); + ConditionVariableInit(&XLogRecoveryCtl->replayedCV); } /* @@ -1916,6 +1920,11 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl XLogRecoveryCtl->lastReplayedTLI = *replayTLI; SpinLockRelease(&XLogRecoveryCtl->info_lck); + /* + * wake up walsender(s) used by logical decoding on standby. + */ + ConditionVariableBroadcast(&XLogRecoveryCtl->replayedCV); + /* * If rm_redo called XLogRequestWalReceiverReply, then we wake up the * receiver so that it notices the updated lastReplayedEndRecPtr and sends @@ -4916,3 +4925,22 @@ assign_recovery_target_xid(const char *newval, void *extra) else recoveryTarget = RECOVERY_TARGET_UNSET; } + +/* + * Return the ConditionVariable indicating that a replay has been done. + * + * This is needed for logical decoding on standby. Indeed the "problem" is that + * WalSndWaitForWal() waits for the *replay* LSN to increase, but gets woken up + * by walreceiver when new WAL has been flushed. Which means that typically + * walsenders will get woken up at the same time that the startup process + * will be - which means that by the time the logical walsender checks + * GetXLogReplayRecPtr() it's unlikely that the startup process already replayed + * the record and updated XLogCtl->lastReplayedEndRecPtr. + * + * The ConditionVariable XLogRecoveryCtl->replayedCV solves this corner case. + */ +ConditionVariable * +getReplayedCV(void) +{ + return &XLogRecoveryCtl->replayedCV; +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index e89c210a8e..b0b6d6ffc7 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1548,6 +1548,7 @@ WalSndWaitForWal(XLogRecPtr loc) { int wakeEvents; static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr; + ConditionVariable *replayedCV = getReplayedCV(); /* * Fast path to avoid acquiring the spinlock in case we already know we @@ -1566,7 +1567,6 @@ WalSndWaitForWal(XLogRecPtr loc) for (;;) { - long sleeptime; /* Clear any already-pending wakeups */ ResetLatch(MyLatch); @@ -1650,20 +1650,35 @@ WalSndWaitForWal(XLogRecPtr loc) WalSndKeepaliveIfNecessary(); /* - * Sleep until something happens or we time out. Also wait for the - * socket becoming writable, if there's still pending output. + * When not in recovery, sleep until something happens or we time out. + * Also wait for the socket becoming writable, if there's still pending output. * Otherwise we might sit on sendable output data while waiting for * new WAL to be generated. (But if we have nothing to send, we don't * want to wake on socket-writable.) */ - sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); + if (!RecoveryInProgress()) + { + long sleeptime; + sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); - wakeEvents = WL_SOCKET_READABLE; + wakeEvents = WL_SOCKET_READABLE; - if (pq_is_send_pending()) - wakeEvents |= WL_SOCKET_WRITEABLE; + if (pq_is_send_pending()) + wakeEvents |= WL_SOCKET_WRITEABLE; - WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL); + WalSndWait(wakeEvents, sleeptime * 10, WAIT_EVENT_WAL_SENDER_WAIT_WAL); + } + else + /* + * We are in the logical decoding on standby case. + * We are waiting for the startup process to replay wal record(s) using + * a timeout in case we are requested to stop. + */ + { + ConditionVariablePrepareToSleep(replayedCV); + ConditionVariableTimedSleep(replayedCV, 1000, + WAIT_EVENT_WAL_SENDER_WAIT_REPLAY); + } } /* reactivate latch so WalSndLoop knows to continue */ diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c index f9574e800f..8ac9a614e2 100644 --- a/src/backend/utils/activity/wait_event.c +++ b/src/backend/utils/activity/wait_event.c @@ -457,6 +457,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_WAL_RECEIVER_WAIT_START: event_name = "WalReceiverWaitStart"; break; + case WAIT_EVENT_WAL_SENDER_WAIT_REPLAY: + event_name = "WalReceiverWaitReplay"; + break; case WAIT_EVENT_XACT_GROUP_UPDATE: event_name = "XactGroupUpdate"; break; diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h index 47c29350f5..b65c2cf1f0 100644 --- a/src/include/access/xlogrecovery.h +++ b/src/include/access/xlogrecovery.h @@ -15,6 +15,7 @@ #include "catalog/pg_control.h" #include "lib/stringinfo.h" #include "utils/timestamp.h" +#include "storage/condition_variable.h" /* * Recovery target type. @@ -155,4 +156,6 @@ extern void RecoveryRequiresIntParameter(const char *param_name, int currValue, extern void xlog_outdesc(StringInfo buf, XLogReaderState *record); +extern ConditionVariable *getReplayedCV(void); + #endif /* XLOGRECOVERY_H */ diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 52bb3e2aae..2fd745fe72 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -13,6 +13,7 @@ #define _WALSENDER_H #include +#include "storage/condition_variable.h" /* * What to do with a snapshot in create replication slot command. diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h index f53254ad1f..1eb441dffe 100644 --- a/src/include/utils/wait_event.h +++ b/src/include/utils/wait_event.h @@ -128,6 +128,7 @@ typedef enum WAIT_EVENT_SYNC_REP, WAIT_EVENT_WAL_RECEIVER_EXIT, WAIT_EVENT_WAL_RECEIVER_WAIT_START, + WAIT_EVENT_WAL_SENDER_WAIT_REPLAY, WAIT_EVENT_XACT_GROUP_UPDATE } WaitEventIPC; -- 2.34.1