From 9a820140b7356ab94479499a80fc4742403f3ca5 Mon Sep 17 00:00:00 2001 From: Bertrand Drouvot Date: Fri, 10 Mar 2023 10:58:23 +0000 Subject: [PATCH v99 5/7] Fixing Walsender corner case with logical decoding on standby. The problem is that WalSndWaitForWal() waits for the *replay* LSN to increase, but gets woken up by walreceiver when new WAL has been flushed. This means that walsenders are typically woken up at the same time as the startup process, so by the time the logical walsender checks GetXLogReplayRecPtr() it is unlikely that the startup process has already replayed the record and updated XLogRecoveryCtl->lastReplayedEndRecPtr. Introducing a new condition variable and a new API ConditionVariableEventSleep() to fix this corner case. --- doc/src/sgml/monitoring.sgml | 4 ++++ src/backend/access/transam/xlogrecovery.c | 28 +++++++++++++++++++++++ src/backend/replication/walsender.c | 18 ++++++++++++++- src/backend/utils/activity/wait_event.c | 3 +++ src/include/access/xlogrecovery.h | 3 +++ src/include/replication/walsender.h | 1 + src/include/utils/wait_event.h | 1 + 7 files changed, 57 insertions(+), 1 deletion(-) 7.8% doc/src/sgml/ 52.1% src/backend/access/transam/ 27.1% src/backend/replication/ 4.5% src/backend/utils/activity/ 4.5% src/include/access/ 3.7% src/include/ diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index cdf7c09b4b..9af8d58da2 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1857,6 +1857,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser Waiting for startup process to send initial data for streaming replication. + + WalSenderWaitReplay + Waiting for startup process to replay write-ahead log. 
+ XactGroupUpdate Waiting for the group leader to update transaction status at diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index dbe9394762..8a9505a52d 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -358,6 +358,9 @@ typedef struct XLogRecoveryCtlData RecoveryPauseState recoveryPauseState; ConditionVariable recoveryNotPausedCV; + /* Replay state (see check_for_replay() for more explanation) */ + ConditionVariable replayedCV; + slock_t info_lck; /* locks shared variables shown above */ } XLogRecoveryCtlData; @@ -468,6 +471,7 @@ XLogRecoveryShmemInit(void) SpinLockInit(&XLogRecoveryCtl->info_lck); InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch); ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV); + ConditionVariableInit(&XLogRecoveryCtl->replayedCV); } /* @@ -1935,6 +1939,11 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl XLogRecoveryCtl->lastReplayedTLI = *replayTLI; SpinLockRelease(&XLogRecoveryCtl->info_lck); + /* + * Wake up walsender(s) used by logical decoding on standby. + */ + ConditionVariableBroadcast(&XLogRecoveryCtl->replayedCV); + /* * If rm_redo called XLogRequestWalReceiverReply, then we wake up the * receiver so that it notices the updated lastReplayedEndRecPtr and sends @@ -4942,3 +4951,22 @@ assign_recovery_target_xid(const char *newval, void *extra) else recoveryTarget = RECOVERY_TARGET_UNSET; } + +/* + * Return the ConditionVariable broadcast after each WAL record is replayed. + * + * This is needed for logical decoding on standby. The problem is that + * WalSndWaitForWal() waits for the *replay* LSN to increase, but gets woken up + * by walreceiver when new WAL has been flushed. 
This means that walsenders + * are typically woken up at the same time as the startup process, so by the + * time the logical walsender checks GetXLogReplayRecPtr() it is unlikely + * that the startup process has already replayed the record and updated + * XLogRecoveryCtl->lastReplayedEndRecPtr. + * + * The ConditionVariable XLogRecoveryCtl->replayedCV solves this corner case. + */ +ConditionVariable * +check_for_replay(void) +{ + return &XLogRecoveryCtl->replayedCV; +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 3042e5bd64..8ef22616bb 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1551,7 +1551,9 @@ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc) { int wakeEvents; + uint32 wait_event; static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr; + ConditionVariable *cv = NULL; /* * Fast path to avoid acquiring the spinlock in case we already know we @@ -1564,9 +1566,20 @@ WalSndWaitForWal(XLogRecPtr loc) /* Get a more recent flush pointer. 
*/ if (!RecoveryInProgress()) + { RecentFlushPtr = GetFlushRecPtr(NULL); + wait_event = WAIT_EVENT_WAL_SENDER_WAIT_WAL; + } else + { RecentFlushPtr = GetXLogReplayRecPtr(NULL); + wait_event = WAIT_EVENT_WAL_SENDER_WAIT_REPLAY; + cv = check_for_replay(); + } + + /* Prepare the cv to sleep */ + if (cv) + ConditionVariablePrepareToSleep(cv); for (;;) { @@ -1667,9 +1680,12 @@ WalSndWaitForWal(XLogRecPtr loc) if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; - WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL); + ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, wakeEvents, NULL); + ConditionVariableEventSleep(cv, RecoveryInProgress, FeBeWaitSet, NULL, + sleeptime, wait_event); } + ConditionVariableCancelSleep(); /* reactivate latch so WalSndLoop knows to continue */ SetLatch(MyLatch); return RecentFlushPtr; diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c index cb99cc6339..a10dcd4e61 100644 --- a/src/backend/utils/activity/wait_event.c +++ b/src/backend/utils/activity/wait_event.c @@ -466,6 +466,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_WAL_RECEIVER_WAIT_START: event_name = "WalReceiverWaitStart"; break; + case WAIT_EVENT_WAL_SENDER_WAIT_REPLAY: + event_name = "WalSenderWaitReplay"; + break; case WAIT_EVENT_XACT_GROUP_UPDATE: event_name = "XactGroupUpdate"; break; diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h index 47c29350f5..2bfeaaa00f 100644 --- a/src/include/access/xlogrecovery.h +++ b/src/include/access/xlogrecovery.h @@ -15,6 +15,7 @@ #include "catalog/pg_control.h" #include "lib/stringinfo.h" #include "utils/timestamp.h" +#include "storage/condition_variable.h" /* * Recovery target type. 
@@ -155,4 +156,6 @@ extern void RecoveryRequiresIntParameter(const char *param_name, int currValue, extern void xlog_outdesc(StringInfo buf, XLogReaderState *record); +extern ConditionVariable *check_for_replay(void); + #endif /* XLOGRECOVERY_H */ diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 52bb3e2aae..2fd745fe72 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -13,6 +13,7 @@ #define _WALSENDER_H #include +#include "storage/condition_variable.h" /* * What to do with a snapshot in create replication slot command. diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h index 9ab23e1c4a..548ef41dca 100644 --- a/src/include/utils/wait_event.h +++ b/src/include/utils/wait_event.h @@ -131,6 +131,7 @@ typedef enum WAIT_EVENT_SYNC_REP, WAIT_EVENT_WAL_RECEIVER_EXIT, WAIT_EVENT_WAL_RECEIVER_WAIT_START, + WAIT_EVENT_WAL_SENDER_WAIT_REPLAY, WAIT_EVENT_XACT_GROUP_UPDATE } WaitEventIPC; -- 2.34.1