From 22517556da34075ca1a3daadebc9a45ae58c6fa4 Mon Sep 17 00:00:00 2001 From: bdrouvotAWS Date: Wed, 15 Sep 2021 11:07:03 +0000 Subject: [PATCH v24 6/6] Fixing Walsender corner cases with logical decoding on standby. The problem is that WalSndWaitForWal() waits for the *replay* LSN to increase, but gets woken up by walreceiver when new WAL has been flushed. This means that walsenders will typically get woken up at the same time as the startup process, so by the time the logical walsender checks GetXLogReplayRecPtr() it's unlikely that the startup process has already replayed the record and updated XLogCtl->lastReplayedEndRecPtr. Fixed by making use of a condition variable on the replay position. --- src/backend/access/transam/xlog.c | 20 +++++++++++++---- src/backend/replication/walsender.c | 29 ++++++++++++++++++------- src/backend/utils/activity/wait_event.c | 3 +++ src/include/replication/walsender.h | 12 ++++++++++ src/include/utils/wait_event.h | 1 + 5 files changed, 53 insertions(+), 12 deletions(-) 29.6% src/backend/access/transam/ 52.7% src/backend/replication/ 4.1% src/backend/utils/activity/ 11.7% src/include/replication/ diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index c25a22a0c9..9843d2d6bf 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -727,6 +727,7 @@ typedef struct XLogCtlData } XLogCtlData; static XLogCtlData *XLogCtl = NULL; +XLogCtlCvData *XLogCtlCv = NULL; /* a private copy of XLogCtl->Insert.WALInsertLocks, for convenience */ static WALInsertLockPadded *WALInsertLocks = NULL; @@ -5141,7 +5142,8 @@ void XLOGShmemInit(void) { bool foundCFile, - foundXLog; + foundXLog, + foundXLogCv; char *allocptr; int i; ControlFileData *localControlFile; @@ -5166,14 +5168,17 @@ XLOGShmemInit(void) XLogCtl = (XLogCtlData *) ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog); + XLogCtlCv = (XLogCtlCvData *) + ShmemInitStruct("XLOG Cv Ctl", 
sizeof(XLogCtlCvData), &foundXLogCv); + localControlFile = ControlFile; ControlFile = (ControlFileData *) ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile); - if (foundCFile || foundXLog) + if (foundCFile || foundXLog || foundXLogCv) { - /* both should be present or neither */ - Assert(foundCFile && foundXLog); + /* all should be present or none */ + Assert(foundCFile && foundXLog && foundXLogCv); /* Initialize local copy of WALInsertLocks */ WALInsertLocks = XLogCtl->Insert.WALInsertLocks; @@ -5183,6 +5188,7 @@ XLOGShmemInit(void) return; } memset(XLogCtl, 0, sizeof(XLogCtlData)); + memset(XLogCtlCv, 0, sizeof(XLogCtlCvData)); /* * Already have read control file locally, unless in bootstrap mode. Move @@ -5244,6 +5250,7 @@ XLOGShmemInit(void) SpinLockInit(&XLogCtl->ulsn_lck); InitSharedLatch(&XLogCtl->recoveryWakeupLatch); ConditionVariableInit(&XLogCtl->recoveryNotPausedCV); + ConditionVariableInit(&XLogCtlCv->replayedCV); } /* @@ -7542,6 +7549,11 @@ StartupXLOG(void) XLogCtl->lastReplayedTLI = ThisTimeLineID; SpinLockRelease(&XLogCtl->info_lck); + /* + * wake up walsender(s) used by logical decoding on standby. 
+ */ + ConditionVariableBroadcast(&XLogCtlCv->replayedCV); + /* * If rm_redo called XLogRequestWalReceiverReply, then we wake * up the receiver so that it notices the updated diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index da2533e1c9..4d07d28a31 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1394,6 +1394,7 @@ WalSndWaitForWal(XLogRecPtr loc) { int wakeEvents; static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr; + XLogCtlCvData *xlogctlcv = XLogCtlCv; /* * Fast path to avoid acquiring the spinlock in case we already know we @@ -1412,7 +1413,6 @@ WalSndWaitForWal(XLogRecPtr loc) for (;;) { - long sleeptime; /* Clear any already-pending wakeups */ ResetLatch(MyLatch); @@ -1496,20 +1496,33 @@ WalSndWaitForWal(XLogRecPtr loc) WalSndKeepaliveIfNecessary(); /* - * Sleep until something happens or we time out. Also wait for the - * socket becoming writable, if there's still pending output. + * When not in recovery, sleep until something happens or we time out. + * Also wait for the socket becoming writable, if there's still pending output. * Otherwise we might sit on sendable output data while waiting for * new WAL to be generated. (But if we have nothing to send, we don't * want to wake on socket-writable.) */ - sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); + if (!RecoveryInProgress()) + { + long sleeptime; + sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); - wakeEvents = WL_SOCKET_READABLE; + wakeEvents = WL_SOCKET_READABLE; - if (pq_is_send_pending()) - wakeEvents |= WL_SOCKET_WRITEABLE; + if (pq_is_send_pending()) + wakeEvents |= WL_SOCKET_WRITEABLE; - WalSndWait(wakeEvents, sleeptime * 10, WAIT_EVENT_WAL_SENDER_WAIT_WAL); + WalSndWait(wakeEvents, sleeptime * 10, WAIT_EVENT_WAL_SENDER_WAIT_WAL); + } + else + /* + * We are in the logical decoding on standby case. + * We are waiting for the startup process to replay WAL record(s). 
+ */ + { + ConditionVariablePrepareToSleep(&xlogctlcv->replayedCV); + ConditionVariableSleep(&xlogctlcv->replayedCV, WAIT_EVENT_WAL_SENDER_WAIT_REPLAY); + } } /* reactivate latch so WalSndLoop knows to continue */ diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c index ef7e6bfb77..6e74e60630 100644 --- a/src/backend/utils/activity/wait_event.c +++ b/src/backend/utils/activity/wait_event.c @@ -448,6 +448,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_WAL_RECEIVER_WAIT_START: event_name = "WalReceiverWaitStart"; break; + case WAIT_EVENT_WAL_SENDER_WAIT_REPLAY: + event_name = "WalReceiverWaitReplay"; + break; case WAIT_EVENT_XACT_GROUP_UPDATE: event_name = "XactGroupUpdate"; break; diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 828106933c..7a2c04c937 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -13,6 +13,7 @@ #define _WALSENDER_H #include <signal.h> +#include "storage/condition_variable.h" /* * What to do with a snapshot in create replication slot command. @@ -48,6 +49,17 @@ extern void WalSndWaitStopping(void); extern void HandleWalSndInitStopping(void); extern void WalSndRqstFileReload(void); +/* + * shared-memory state for Condition Variable(s) + * between the startup process and the walsender. + */ +typedef struct XLogCtlCvData +{ + ConditionVariable replayedCV; +} XLogCtlCvData; + +extern XLogCtlCvData *XLogCtlCv; + /* * Remember that we want to wakeup walsenders later * diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h index 6007827b44..53d8ce85c5 100644 --- a/src/include/utils/wait_event.h +++ b/src/include/utils/wait_event.h @@ -125,6 +125,7 @@ typedef enum WAIT_EVENT_SYNC_REP, WAIT_EVENT_WAL_RECEIVER_EXIT, WAIT_EVENT_WAL_RECEIVER_WAIT_START, + WAIT_EVENT_WAL_SENDER_WAIT_REPLAY, WAIT_EVENT_XACT_GROUP_UPDATE } WaitEventIPC; -- 2.18.4