From d59f72dc62cf2922800a99ac001fb7c9aeaaab72 Mon Sep 17 00:00:00 2001 From: bdrouvotAWS Date: Thu, 1 Dec 2022 12:34:59 +0000 Subject: [PATCH v27 6/6] Fixing Walsender corner case with logical decoding on standby. The problem is that WalSndWaitForWal() waits for the *replay* LSN to increase, but gets woken up by walreceiver when new WAL has been flushed. Which means that typically walsenders will get woken up at the same time that the startup process will be - which means that by the time the logical walsender checks GetXLogReplayRecPtr() it's unlikely that the startup process already replayed the record and updated XLogCtl->lastReplayedEndRecPtr. Introducing a new condition variable to fix this corner case. --- src/backend/access/transam/xlogrecovery.c | 28 ++++++++++++++++++++ src/backend/replication/walsender.c | 31 +++++++++++++++++------ src/backend/utils/activity/wait_event.c | 3 +++ src/include/access/xlogrecovery.h | 3 +++ src/include/replication/walsender.h | 1 + src/include/utils/wait_event.h | 1 + 6 files changed, 59 insertions(+), 8 deletions(-) 41.2% src/backend/access/transam/ 48.5% src/backend/replication/ 3.6% src/backend/utils/activity/ 3.4% src/include/access/ diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 41ffc57da9..0337fdc77a 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -358,6 +358,9 @@ typedef struct XLogRecoveryCtlData RecoveryPauseState recoveryPauseState; ConditionVariable recoveryNotPausedCV; + /* Replay state (see getReplayedCV() for more explanation) */ + ConditionVariable replayedCV; + slock_t info_lck; /* locks shared variables shown above */ } XLogRecoveryCtlData; @@ -467,6 +470,7 @@ XLogRecoveryShmemInit(void) SpinLockInit(&XLogRecoveryCtl->info_lck); InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch); ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV); + ConditionVariableInit(&XLogRecoveryCtl->replayedCV); } /* @@ -1916,6 +1920,11 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl XLogRecoveryCtl->lastReplayedTLI = *replayTLI; SpinLockRelease(&XLogRecoveryCtl->info_lck); + /* + * wake up walsender(s) used by logical decoding on standby. + */ + ConditionVariableBroadcast(&XLogRecoveryCtl->replayedCV); + /* * If rm_redo called XLogRequestWalReceiverReply, then we wake up the * receiver so that it notices the updated lastReplayedEndRecPtr and sends @@ -4911,3 +4920,22 @@ assign_recovery_target_xid(const char *newval, void *extra) else recoveryTarget = RECOVERY_TARGET_UNSET; } + +/* + * Return the ConditionVariable indicating that a replay has been done. + * + * This is needed for logical decoding on standby. Indeed the "problem" is that + * WalSndWaitForWal() waits for the *replay* LSN to increase, but gets woken up + * by walreceiver when new WAL has been flushed. Which means that typically + * walsenders will get woken up at the same time that the startup process + * will be - which means that by the time the logical walsender checks + * GetXLogReplayRecPtr() it's unlikely that the startup process already replayed + * the record and updated XLogCtl->lastReplayedEndRecPtr. + * + * The ConditionVariable XLogRecoveryCtl->replayedCV solves this corner case. + */ +ConditionVariable * +getReplayedCV(void) +{ + return &XLogRecoveryCtl->replayedCV; +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 9662e316c9..8c8dbe812f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1548,6 +1548,7 @@ WalSndWaitForWal(XLogRecPtr loc) { int wakeEvents; static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr; + ConditionVariable *replayedCV = getReplayedCV(); /* * Fast path to avoid acquiring the spinlock in case we already know we @@ -1566,7 +1567,6 @@ WalSndWaitForWal(XLogRecPtr loc) for (;;) { - long sleeptime; /* Clear any already-pending wakeups */ ResetLatch(MyLatch); @@ -1650,20 +1650,35 @@ WalSndWaitForWal(XLogRecPtr loc) WalSndKeepaliveIfNecessary(); /* - * Sleep until something happens or we time out. Also wait for the - * socket becoming writable, if there's still pending output. + * When not in recovery, sleep until something happens or we time out. + * Also wait for the socket becoming writable, if there's still pending output. * Otherwise we might sit on sendable output data while waiting for * new WAL to be generated. (But if we have nothing to send, we don't * want to wake on socket-writable.) */ - sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); + if (!RecoveryInProgress()) + { + long sleeptime; + sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); - wakeEvents = WL_SOCKET_READABLE; + wakeEvents = WL_SOCKET_READABLE; - if (pq_is_send_pending()) - wakeEvents |= WL_SOCKET_WRITEABLE; + if (pq_is_send_pending()) + wakeEvents |= WL_SOCKET_WRITEABLE; - WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL); + WalSndWait(wakeEvents, sleeptime * 10, WAIT_EVENT_WAL_SENDER_WAIT_WAL); + } + else + /* + * We are in the logical decoding on standby case. + * We are waiting for the startup process to replay wal record(s) using + * a timeout in case we are requested to stop. + */ + { + ConditionVariablePrepareToSleep(replayedCV); + ConditionVariableTimedSleep(replayedCV, 1000, + WAIT_EVENT_WAL_SENDER_WAIT_REPLAY); + } } /* reactivate latch so WalSndLoop knows to continue */ diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c index b2abd75ddb..3f6059805a 100644 --- a/src/backend/utils/activity/wait_event.c +++ b/src/backend/utils/activity/wait_event.c @@ -457,6 +457,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_WAL_RECEIVER_WAIT_START: event_name = "WalReceiverWaitStart"; break; + case WAIT_EVENT_WAL_SENDER_WAIT_REPLAY: + event_name = "WalReceiverWaitReplay"; + break; case WAIT_EVENT_XACT_GROUP_UPDATE: event_name = "XactGroupUpdate"; break; diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h index f3398425d8..0afd57ecac 100644 --- a/src/include/access/xlogrecovery.h +++ b/src/include/access/xlogrecovery.h @@ -15,6 +15,7 @@ #include "catalog/pg_control.h" #include "lib/stringinfo.h" #include "utils/timestamp.h" +#include "storage/condition_variable.h" /* * Recovery target type. @@ -155,4 +156,6 @@ extern void RecoveryRequiresIntParameter(const char *param_name, int currValue, extern void xlog_outdesc(StringInfo buf, XLogReaderState *record); +extern ConditionVariable *getReplayedCV(void); + #endif /* XLOGRECOVERY_H */ diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 8336a6e719..550ef3107f 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -13,6 +13,7 @@ #define _WALSENDER_H #include +#include "storage/condition_variable.h" /* * What to do with a snapshot in create replication slot command. diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h index 0b2100be4a..30c2cf35ae 100644 --- a/src/include/utils/wait_event.h +++ b/src/include/utils/wait_event.h @@ -128,6 +128,7 @@ typedef enum WAIT_EVENT_SYNC_REP, WAIT_EVENT_WAL_RECEIVER_EXIT, WAIT_EVENT_WAL_RECEIVER_WAIT_START, + WAIT_EVENT_WAL_SENDER_WAIT_REPLAY, WAIT_EVENT_XACT_GROUP_UPDATE } WaitEventIPC; -- 2.34.1