From c8cc725b967a705f4626089f083d56190d5c5d44 Mon Sep 17 00:00:00 2001 From: Bertrand Drouvot Date: Mon, 3 Apr 2023 16:46:09 +0000 Subject: [PATCH v63 4/6] For cascading replication, wake up physical walsenders separately from logical walsenders. Physical walsenders can't send data until it's been flushed; logical walsenders can't decode and send data until it's been applied. On the standby, the WAL is flushed first, which will only wake up physical walsenders; and then applied, which will only wake up logical walsenders. Previously, all walsenders were awakened when the WAL was flushed. That was fine for logical walsenders on the primary; but on the standby the flushed WAL would not have been applied yet, so logical walsenders were awakened too early. Author: Bertrand Drouvot per idea from Jeff Davis and Amit Kapila. Reviewed-By: Sawada Masahiko, Robert Haas. --- src/backend/access/transam/xlog.c | 6 +-- src/backend/access/transam/xlogarchive.c | 2 +- src/backend/access/transam/xlogrecovery.c | 30 +++++++++++--- src/backend/replication/walreceiver.c | 2 +- src/backend/replication/walsender.c | 43 +++++++++++++++++---- src/include/replication/walsender.h | 22 +++++------ src/include/replication/walsender_private.h | 3 ++ 7 files changed, 79 insertions(+), 29 deletions(-) 35.6% src/backend/access/transam/ 47.8% src/backend/replication/ 16.5% src/include/replication/ diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 68dfb0344c..caeffc5860 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -2645,7 +2645,7 @@ XLogFlush(XLogRecPtr record) END_CRIT_SECTION(); /* wake up walsenders now that we've released heavily contended locks */ - WalSndWakeupProcessRequests(); + WalSndWakeupProcessRequests(true, !RecoveryInProgress()); /* * If we still haven't flushed to the request point then we have a @@ -2816,7 +2816,7 @@ XLogBackgroundFlush(void) END_CRIT_SECTION(); /* wake up walsenders now that we've 
released heavily contended locks */ - WalSndWakeupProcessRequests(); + WalSndWakeupProcessRequests(true, !RecoveryInProgress()); /* * Great, done. To take some work off the critical path, try to initialize @@ -5773,7 +5773,7 @@ StartupXLOG(void) * If there were cascading standby servers connected to us, nudge any wal * sender processes to notice that we've been promoted. */ - WalSndWakeup(); + WalSndWakeup(true, true); /* * If this was a promotion, request an (online) checkpoint now. This isn't diff --git a/src/backend/access/transam/xlogarchive.c b/src/backend/access/transam/xlogarchive.c index a0f5aa24b5..f3fb92c8f9 100644 --- a/src/backend/access/transam/xlogarchive.c +++ b/src/backend/access/transam/xlogarchive.c @@ -421,7 +421,7 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname) * if we restored something other than a WAL segment, but it does no harm * either. */ - WalSndWakeup(); + WalSndWakeup(true, false); } /* diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index dbe9394762..e6427c54c5 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -1935,6 +1935,30 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl XLogRecoveryCtl->lastReplayedTLI = *replayTLI; SpinLockRelease(&XLogRecoveryCtl->info_lck); + /* + * Wakeup walsenders: + * + * On the standby, the WAL is flushed first (which will only wake up + * physical walsenders) and then applied, which will only wake up logical + * walsenders. + * + * Indeed, logical walsenders on standby can't decode and send data until + * it's been applied. + * + * Physical walsenders don't need to be woken up during replay unless + * cascading replication is allowed and time line change occurred (so that + * they can notice that they are on a new time line). 
+ * + * That's why the wake up conditions are for: + * + * - physical walsenders in case of new time line and cascade + * replication is allowed. + * - logical walsenders in case cascade replication is allowed (could not + * be created otherwise). + */ + if (AllowCascadeReplication()) + WalSndWakeup(switchedTLI, true); + /* * If rm_redo called XLogRequestWalReceiverReply, then we wake up the * receiver so that it notices the updated lastReplayedEndRecPtr and sends @@ -1958,12 +1982,6 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl */ RemoveNonParentXlogFiles(xlogreader->EndRecPtr, *replayTLI); - /* - * Wake up any walsenders to notice that we are on a new timeline. - */ - if (AllowCascadeReplication()) - WalSndWakeup(); - /* Reset the prefetcher. */ XLogPrefetchReconfigure(); } diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 685af51d5d..feff709435 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli) /* Signal the startup process and walsender that new WAL has arrived */ WakeupRecovery(); if (AllowCascadeReplication()) - WalSndWakeup(); + WalSndWakeup(true, false); /* Report XLOG streaming progress in PS display */ if (update_process_title) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 2d908d1de2..97990e1827 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2628,6 +2628,23 @@ InitWalSenderSlot(void) walsnd->sync_standby_priority = 0; walsnd->latch = &MyProc->procLatch; walsnd->replyTime = 0; + + /* + * The kind assignment is done here and not in StartReplication() + * and StartLogicalReplication(). Indeed, the logical walsender + * needs to read WAL records (like snapshot of running + * transactions) during the slot creation. So it needs to be woken + * up based on its kind. 
+ * + * The kind assignment could also be done in StartReplication(), + * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it + * seems better to set it in one place. + */ + if (MyDatabaseId == InvalidOid) + walsnd->kind = REPLICATION_KIND_PHYSICAL; + else + walsnd->kind = REPLICATION_KIND_LOGICAL; + SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ MyWalSnd = (WalSnd *) walsnd; @@ -3310,30 +3327,42 @@ WalSndShmemInit(void) } /* - * Wake up all walsenders + * Wake up physical, logical or both kinds of walsenders + * + * The distinction between physical and logical walsenders is done, because: + * - physical walsenders can't send data until it's been flushed + * - logical walsenders on standby can't decode and send data until it's been + * applied + * + * For cascading replication we need to wake up physical + * walsenders separately from logical walsenders (see the comment before calling + * WalSndWakeup() in ApplyWalRecord() for more details). * * This will be called inside critical sections, so throwing an error is not * advisable. */ void -WalSndWakeup(void) +WalSndWakeup(bool physical, bool logical) { int i; for (i = 0; i < max_wal_senders; i++) { Latch *latch; + ReplicationKind kind; WalSnd *walsnd = &WalSndCtl->walsnds[i]; - /* - * Get latch pointer with spinlock held, for the unlikely case that - * pointer reads aren't atomic (as they're 8 bytes). 
- */ + /* get latch pointer and kind with spinlock held */ SpinLockAcquire(&walsnd->mutex); latch = walsnd->latch; + kind = walsnd->kind; SpinLockRelease(&walsnd->mutex); - if (latch != NULL) + if (latch == NULL) + continue; + + if ((physical && kind == REPLICATION_KIND_PHYSICAL) || + (logical && kind == REPLICATION_KIND_LOGICAL)) SetLatch(latch); } } diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 52bb3e2aae..9df7e50f94 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -42,7 +42,7 @@ extern void WalSndResourceCleanup(bool isCommit); extern void WalSndSignals(void); extern Size WalSndShmemSize(void); extern void WalSndShmemInit(void); -extern void WalSndWakeup(void); +extern void WalSndWakeup(bool physical, bool logical); extern void WalSndInitStopping(void); extern void WalSndWaitStopping(void); extern void HandleWalSndInitStopping(void); @@ -60,15 +60,15 @@ extern void WalSndRqstFileReload(void); /* * wakeup walsenders if there is work to be done */ -#define WalSndWakeupProcessRequests() \ - do \ - { \ - if (wake_wal_senders) \ - { \ - wake_wal_senders = false; \ - if (max_wal_senders > 0) \ - WalSndWakeup(); \ - } \ - } while (0) +static inline void +WalSndWakeupProcessRequests(bool physical, bool logical) +{ + if (wake_wal_senders) + { + wake_wal_senders = false; + if (max_wal_senders > 0) + WalSndWakeup(physical, logical); + } +} #endif /* _WALSENDER_H */ diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 5310e054c4..ff25aa70a8 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -15,6 +15,7 @@ #include "access/xlog.h" #include "lib/ilist.h" #include "nodes/nodes.h" +#include "nodes/replnodes.h" #include "replication/syncrep.h" #include "storage/latch.h" #include "storage/shmem.h" @@ -79,6 +80,8 @@ typedef struct WalSnd * Timestamp of the last message 
received from standby. */ TimestampTz replyTime; + + ReplicationKind kind; } WalSnd; extern PGDLLIMPORT WalSnd *MyWalSnd; -- 2.34.1