diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6332ca5d53..ce53a236cc 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -727,6 +727,7 @@ typedef struct XLogCtlData
 } XLogCtlData;
 
 static XLogCtlData *XLogCtl = NULL;
+XLogCtlCvData *XLogCtlCv = NULL;
 
 /* a private copy of XLogCtl->Insert.WALInsertLocks, for convenience */
 static WALInsertLockPadded *WALInsertLocks = NULL;
@@ -5141,7 +5142,8 @@ void
 XLOGShmemInit(void)
 {
 	bool		foundCFile,
-				foundXLog;
+				foundXLog,
+				foundXLogCv;
 	char	   *allocptr;
 	int			i;
 	ControlFileData *localControlFile;
@@ -5166,14 +5168,17 @@ XLOGShmemInit(void)
 	XLogCtl = (XLogCtlData *)
 		ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);
 
+	XLogCtlCv = (XLogCtlCvData *)
+		ShmemInitStruct("XLOG Cv Ctl", sizeof(XLogCtlCvData), &foundXLogCv);
+
 	localControlFile = ControlFile;
 	ControlFile = (ControlFileData *)
 		ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
 
-	if (foundCFile || foundXLog)
+	if (foundCFile || foundXLog || foundXLogCv)
 	{
-		/* both should be present or neither */
-		Assert(foundCFile && foundXLog);
+		/* All should be present or none */
+		Assert(foundCFile && foundXLog && foundXLogCv);
 
 		/* Initialize local copy of WALInsertLocks */
 		WALInsertLocks = XLogCtl->Insert.WALInsertLocks;
@@ -5183,6 +5188,7 @@ XLOGShmemInit(void)
 		return;
 	}
 	memset(XLogCtl, 0, sizeof(XLogCtlData));
+	memset(XLogCtlCv, 0, sizeof(XLogCtlCvData));
 
 	/*
 	 * Already have read control file locally, unless in bootstrap mode. Move
@@ -5244,6 +5250,7 @@ XLOGShmemInit(void)
 	SpinLockInit(&XLogCtl->ulsn_lck);
 	InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
 	ConditionVariableInit(&XLogCtl->recoveryNotPausedCV);
+	ConditionVariableInit(&XLogCtlCv->replayedCV);
 }
 
 /*
@@ -7533,6 +7540,11 @@ StartupXLOG(void)
 				XLogCtl->lastReplayedTLI = ThisTimeLineID;
 				SpinLockRelease(&XLogCtl->info_lck);
 
+				/*
+				 * Wake up walsender(s) used by logical decoding on standby.
+				 */
+				ConditionVariableBroadcast(&XLogCtlCv->replayedCV);
+
 				/*
 				 * If rm_redo called XLogRequestWalReceiverReply, then we wake
 				 * up the receiver so that it notices the updated
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index da2533e1c9..4d07d28a31 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1394,6 +1394,7 @@ WalSndWaitForWal(XLogRecPtr loc)
 {
 	int			wakeEvents;
 	static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+	XLogCtlCvData *xlogctlcv = XLogCtlCv;
 
 	/*
 	 * Fast path to avoid acquiring the spinlock in case we already know we
@@ -1412,7 +1413,6 @@ WalSndWaitForWal(XLogRecPtr loc)
 
 	for (;;)
 	{
-		long		sleeptime;
 
 		/* Clear any already-pending wakeups */
 		ResetLatch(MyLatch);
@@ -1496,20 +1496,35 @@ WalSndWaitForWal(XLogRecPtr loc)
 		WalSndKeepaliveIfNecessary();
 
 		/*
-		 * Sleep until something happens or we time out. Also wait for the
-		 * socket becoming writable, if there's still pending output.
+		 * When not in recovery, sleep until something happens or we time out.
+		 * Also wait for the socket becoming writable, if there's still pending output.
 		 * Otherwise we might sit on sendable output data while waiting for
 		 * new WAL to be generated. (But if we have nothing to send, we don't
 		 * want to wake on socket-writable.)
 		 */
-		sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+		if (!RecoveryInProgress())
+		{
+			long		sleeptime;
+			sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
 
-		wakeEvents = WL_SOCKET_READABLE;
+			wakeEvents = WL_SOCKET_READABLE;
 
-		if (pq_is_send_pending())
-			wakeEvents |= WL_SOCKET_WRITEABLE;
+			if (pq_is_send_pending())
+				wakeEvents |= WL_SOCKET_WRITEABLE;
 
-		WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
+			WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
+		}
+		else
+		{
+			/*
+			 * We are in the logical decoding on standby case: wait for the
+			 * startup process to replay WAL record(s).
+			 */
+			ConditionVariablePrepareToSleep(&xlogctlcv->replayedCV);
+			ConditionVariableSleep(&xlogctlcv->replayedCV, WAIT_EVENT_WAL_SENDER_WAIT_REPLAY);
+			/* Detach from the CV; the enclosing loop re-checks the condition. */
+			ConditionVariableCancelSleep();
+		}
 	}
 
 	/* reactivate latch so WalSndLoop knows to continue */
diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c
index ef7e6bfb77..6e74e60630 100644
--- a/src/backend/utils/activity/wait_event.c
+++ b/src/backend/utils/activity/wait_event.c
@@ -448,6 +448,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 		case WAIT_EVENT_WAL_RECEIVER_WAIT_START:
 			event_name = "WalReceiverWaitStart";
 			break;
+		case WAIT_EVENT_WAL_SENDER_WAIT_REPLAY:
+			event_name = "WalSenderWaitReplay";
+			break;
 		case WAIT_EVENT_XACT_GROUP_UPDATE:
 			event_name = "XactGroupUpdate";
 			break;
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 828106933c..7a2c04c937 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -13,6 +13,7 @@
 #define _WALSENDER_H
 
 #include <signal.h>
+#include "storage/condition_variable.h"
 
 /*
  * What to do with a snapshot in create replication slot command.
@@ -48,6 +49,17 @@ extern void WalSndWaitStopping(void);
 extern void HandleWalSndInitStopping(void);
 extern void WalSndRqstFileReload(void);
 
+/*
+ * Shared-memory state for condition variable(s) used
+ * between the startup process and the walsender.
+ */
+typedef struct XLogCtlCvData
+{
+	ConditionVariable replayedCV;
+} XLogCtlCvData;
+
+extern XLogCtlCvData *XLogCtlCv;
+
 /*
  * Remember that we want to wakeup walsenders later
  *
diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h
index 6007827b44..53d8ce85c5 100644
--- a/src/include/utils/wait_event.h
+++ b/src/include/utils/wait_event.h
@@ -125,6 +125,7 @@ typedef enum
 	WAIT_EVENT_SYNC_REP,
 	WAIT_EVENT_WAL_RECEIVER_EXIT,
 	WAIT_EVENT_WAL_RECEIVER_WAIT_START,
+	WAIT_EVENT_WAL_SENDER_WAIT_REPLAY,
 	WAIT_EVENT_XACT_GROUP_UPDATE
 } WaitEventIPC;
 