From 5ab4bb1f376c0a0e7e270f4290668407abf40de9 Mon Sep 17 00:00:00 2001 From: Bertrand Drouvot Date: Mon, 3 Apr 2023 16:46:09 +0000 Subject: [PATCH v57 4/6] Fixing Walsender corner case with logical decoding on standby. The problem is that WalSndWaitForWal() waits for the *replay* LSN to increase, but gets woken up by walreceiver when new WAL has been flushed. This means that walsenders are typically woken up at the same time as the startup process, so by the time the logical walsender checks GetXLogReplayRecPtr() it is unlikely that the startup process has already replayed the record and updated XLogRecoveryCtl->lastReplayedEndRecPtr. To fix this, introduce a new "kind" field (of type ReplicationKind) in the WalSnd struct, add "physical" and "logical" arguments to WalSndWakeup(), and move the WalSndWakeup() call in ApplyWalRecord() so that it runs right after the shared replay state has been updated. The new field lets each code path choose which kind of walsender (physical, logical, or both) to wake up. --- src/backend/access/transam/xlog.c | 6 +++--- src/backend/access/transam/xlogarchive.c | 2 +- src/backend/access/transam/xlogrecovery.c | 10 ++++------ src/backend/replication/walreceiver.c | 2 +- src/backend/replication/walsender.c | 13 +++++++++++-- src/include/replication/walsender.h | 20 ++++++++++---------- src/include/replication/walsender_private.h | 3 +++ 7 files changed, 33 insertions(+), 23 deletions(-) 32.5% src/backend/access/transam/ 28.5% src/backend/replication/ 38.9% src/include/replication/ diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 779f5c3711..70ac8fc33b 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -2645,7 +2645,7 @@ XLogFlush(XLogRecPtr record) END_CRIT_SECTION(); /* wake up walsenders now that we've released heavily contended locks */ - WalSndWakeupProcessRequests(); + WalSndWakeupProcessRequests(true, !RecoveryInProgress()); /* * If we still haven't flushed to the request point then we have a @@ -2816,7 +2816,7 @@ XLogBackgroundFlush(void) END_CRIT_SECTION(); /* wake up 
walsenders now that we've released heavily contended locks */ - WalSndWakeupProcessRequests(); + WalSndWakeupProcessRequests(true, !RecoveryInProgress()); /* * Great, done. To take some work off the critical path, try to initialize @@ -5773,7 +5773,7 @@ StartupXLOG(void) * If there were cascading standby servers connected to us, nudge any wal * sender processes to notice that we've been promoted. */ - WalSndWakeup(); + WalSndWakeup(true, true); /* * If this was a promotion, request an (online) checkpoint now. This isn't diff --git a/src/backend/access/transam/xlogarchive.c b/src/backend/access/transam/xlogarchive.c index a0f5aa24b5..d06fdc74c0 100644 --- a/src/backend/access/transam/xlogarchive.c +++ b/src/backend/access/transam/xlogarchive.c @@ -421,7 +421,7 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname) * if we restored something other than a WAL segment, but it does no harm * either. */ - WalSndWakeup(); + WalSndWakeup(true, true); } /* diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index dbe9394762..18551cc3b3 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -1935,6 +1935,10 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl XLogRecoveryCtl->lastReplayedTLI = *replayTLI; SpinLockRelease(&XLogRecoveryCtl->info_lck); + /* Wakeup walsender(s) */ + WalSndWakeup(switchedTLI && AllowCascadeReplication(), + switchedTLI || RecoveryInProgress()); + /* * If rm_redo called XLogRequestWalReceiverReply, then we wake up the * receiver so that it notices the updated lastReplayedEndRecPtr and sends @@ -1958,12 +1962,6 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl */ RemoveNonParentXlogFiles(xlogreader->EndRecPtr, *replayTLI); - /* - * Wake up any walsenders to notice that we are on a new timeline. 
- */ - if (AllowCascadeReplication()) - WalSndWakeup(); - /* Reset the prefetcher. */ XLogPrefetchReconfigure(); } diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 685af51d5d..d2aa93734c 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli) /* Signal the startup process and walsender that new WAL has arrived */ WakeupRecovery(); if (AllowCascadeReplication()) - WalSndWakeup(); + WalSndWakeup(true, !RecoveryInProgress()); /* Report XLOG streaming progress in PS display */ if (update_process_title) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index aeb5f93514..d5d1d5600c 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2626,6 +2626,12 @@ InitWalSenderSlot(void) walsnd->sync_standby_priority = 0; walsnd->latch = &MyProc->procLatch; walsnd->replyTime = 0; + + if (MyDatabaseId == InvalidOid) + walsnd->kind = REPLICATION_KIND_PHYSICAL; + else + walsnd->kind = REPLICATION_KIND_LOGICAL; + SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ MyWalSnd = (WalSnd *) walsnd; @@ -3314,13 +3320,14 @@ WalSndShmemInit(void) * advisable. 
*/ void -WalSndWakeup(void) +WalSndWakeup(bool physical, bool logical) { int i; for (i = 0; i < max_wal_senders; i++) { Latch *latch; + ReplicationKind kind; WalSnd *walsnd = &WalSndCtl->walsnds[i]; /* @@ -3329,9 +3336,11 @@ WalSndWakeup(void) */ SpinLockAcquire(&walsnd->mutex); latch = walsnd->latch; + kind = walsnd->kind; SpinLockRelease(&walsnd->mutex); - if (latch != NULL) + if (latch != NULL && ((physical && kind == REPLICATION_KIND_PHYSICAL) || + (logical && kind == REPLICATION_KIND_LOGICAL))) SetLatch(latch); } } diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 52bb3e2aae..c6e4515201 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -42,7 +42,7 @@ extern void WalSndResourceCleanup(bool isCommit); extern void WalSndSignals(void); extern Size WalSndShmemSize(void); extern void WalSndShmemInit(void); -extern void WalSndWakeup(void); +extern void WalSndWakeup(bool physical, bool logical); extern void WalSndInitStopping(void); extern void WalSndWaitStopping(void); extern void HandleWalSndInitStopping(void); @@ -60,15 +60,15 @@ extern void WalSndRqstFileReload(void); /* * wakeup walsenders if there is work to be done */ -#define WalSndWakeupProcessRequests() \ - do \ - { \ - if (wake_wal_senders) \ - { \ - wake_wal_senders = false; \ - if (max_wal_senders > 0) \ - WalSndWakeup(); \ - } \ +#define WalSndWakeupProcessRequests(physical, logical) \ + do \ + { \ + if (wake_wal_senders) \ + { \ + wake_wal_senders = false; \ + if (max_wal_senders > 0) \ + WalSndWakeup(physical, logical); \ + } \ } while (0) #endif /* _WALSENDER_H */ diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 5310e054c4..ff25aa70a8 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -15,6 +15,7 @@ #include "access/xlog.h" #include "lib/ilist.h" #include "nodes/nodes.h" +#include 
"nodes/replnodes.h" #include "replication/syncrep.h" #include "storage/latch.h" #include "storage/shmem.h" @@ -79,6 +80,8 @@ typedef struct WalSnd * Timestamp of the last message received from standby. */ TimestampTz replyTime; + + ReplicationKind kind; } WalSnd; extern PGDLLIMPORT WalSnd *MyWalSnd; -- 2.34.1