diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index dbe9394762..8a9505a52d 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -358,6 +358,9 @@ typedef struct XLogRecoveryCtlData
 	RecoveryPauseState recoveryPauseState;
 	ConditionVariable recoveryNotPausedCV;
 
+	/* Broadcast after each replayed record (see check_for_replay()) */
+	ConditionVariable replayedCV;
+
 	slock_t		info_lck;		/* locks shared variables shown above */
 } XLogRecoveryCtlData;
 
@@ -468,6 +471,7 @@ XLogRecoveryShmemInit(void)
 	SpinLockInit(&XLogRecoveryCtl->info_lck);
 	InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
 	ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV);
+	ConditionVariableInit(&XLogRecoveryCtl->replayedCV);
 }
 
 /*
@@ -1935,6 +1939,14 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 	XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
 	SpinLockRelease(&XLogRecoveryCtl->info_lck);
 
+	/*
+	 * Wake up any walsender doing logical decoding on this standby, so that
+	 * it rechecks the replay position (see check_for_replay()).  This is
+	 * done only after releasing info_lck, i.e. once the shared replay state
+	 * updated above can be seen by the woken-up processes.
+	 */
+	ConditionVariableBroadcast(&XLogRecoveryCtl->replayedCV);
+
 	/*
 	 * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
 	 * receiver so that it notices the updated lastReplayedEndRecPtr and sends
@@ -4942,3 +4954,25 @@ assign_recovery_target_xid(const char *newval, void *extra)
 	else
 		recoveryTarget = RECOVERY_TARGET_UNSET;
 }
+
+/*
+ * Return the condition variable that the startup process broadcasts after
+ * replaying each WAL record (see ApplyWalRecord()).
+ *
+ * This is needed for logical decoding on a standby.  WalSndWaitForWal() waits
+ * for the *replay* LSN to advance, but it is woken up by the walreceiver when
+ * new WAL has been flushed.  This typically wakes the walsenders at the same
+ * time as the startup process, so by the time a logical walsender checks
+ * GetXLogReplayRecPtr(), the startup process is unlikely to have replayed the
+ * record yet, and the walsender has to wait for some later wakeup.
+ *
+ * Also waiting on this condition variable lets the walsender be woken up as
+ * soon as the record has actually been replayed.  Despite its name, this
+ * function does not check anything: it only returns a pointer to the
+ * condition variable, which lives in XLogRecoveryCtl.
+ */
+ConditionVariable *
+check_for_replay(void)
+{
+	return &XLogRecoveryCtl->replayedCV;
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3042e5bd64..05350bb535 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1552,6 +1552,7 @@ WalSndWaitForWal(XLogRecPtr loc)
 {
 	int			wakeEvents;
 	static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+	ConditionVariable *replayedCV = check_for_replay();
 
 	/*
 	 * Fast path to avoid acquiring the spinlock in case we already know we
@@ -1667,7 +1668,40 @@ WalSndWaitForWal(XLogRecPtr loc)
 		if (pq_is_send_pending())
 			wakeEvents |= WL_SOCKET_WRITEABLE;
 
+		/*
+		 * On a standby we need the replay position to reach loc, but the
+		 * walreceiver wakes us up when WAL is flushed, which is usually
+		 * before the startup process has replayed it.  So also get on the
+		 * wait queue of the replay condition variable (see
+		 * check_for_replay()): the startup process's broadcast then sets our
+		 * latch, which ends WalSndWait().
+		 *
+		 * A broadcast removes us from the wait queue, so re-arm on every
+		 * iteration, and cancel right after waking up so that we never leave
+		 * this function still queued.  Arming only here, rather than at
+		 * function entry, also keeps the fast path above free of the
+		 * condition variable's spinlock.
+		 */
+		if (RecoveryInProgress())
+		{
+			ConditionVariablePrepareToSleep(replayedCV);
+
+			/*
+			 * A record replayed after RecentFlushPtr was last computed but
+			 * before we got on the wait queue would not wake us up, so
+			 * recheck.  If we are now far enough, go back to the top of the
+			 * loop, whose loc <= RecentFlushPtr test will end the wait.
+			 */
+			if (loc <= GetXLogReplayRecPtr(NULL))
+			{
+				ConditionVariableCancelSleep();
+				continue;
+			}
+		}
+
 		WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
+
+		ConditionVariableCancelSleep();
 	}
 
 	/* reactivate latch so WalSndLoop knows to continue */
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index 47c29350f5..2bfeaaa00f 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -15,6 +15,7 @@
 #include "catalog/pg_control.h"
 #include "lib/stringinfo.h"
+#include "storage/condition_variable.h"
 #include "utils/timestamp.h"
 
 /*
  * Recovery target type.
@@ -155,4 +156,6 @@ extern void RecoveryRequiresIntParameter(const char *param_name, int currValue,
 
 extern void xlog_outdesc(StringInfo buf, XLogReaderState *record);
 
+extern ConditionVariable *check_for_replay(void);
+
 #endif							/* XLOGRECOVERY_H */