From c32b3bd82b53e88cd5cbcca7a2e367aa446a68ae Mon Sep 17 00:00:00 2001
From: Rustam Khamidullin <r.khamidullin@postgrespro.ru>
Date: Fri, 14 Mar 2025 18:18:34 +0700
Subject: [PATCH] Implement batching for WAL records notification during
 cascade replication

Currently standby server notifies walsenders after applying of each WAL
record during cascade replication. This creates a bottleneck in case of large
number of sender processes during WalSndWakeup invocation. This change
introduces batching for such notifications, which are now sent either after
certain number of applied records or specified time interval (whichever comes
first).

Co-authored-by: Alexey Makhmutov <a.makhmutov@postgrespro.ru>
---
 src/backend/access/transam/xlogrecovery.c | 99 ++++++++++++++++++++++-
 src/backend/postmaster/startup.c          |  5 ++
 src/backend/utils/misc/guc_tables.c       | 22 +++++
 src/include/access/xlogrecovery.h         |  8 ++
 src/include/utils/timeout.h               |  1 +
 5 files changed, 133 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index f23ec8969c2..4edeac46100 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -64,6 +64,7 @@
 #include "utils/pg_lsn.h"
 #include "utils/ps_status.h"
 #include "utils/pg_rusage.h"
+#include "utils/timeout.h"
 
 /* Unsupported old recovery command file names (relative to $PGDATA) */
 #define RECOVERY_COMMAND_FILE	"recovery.conf"
@@ -147,6 +148,13 @@ bool		InArchiveRecovery = false;
 static bool StandbyModeRequested = false;
 bool		StandbyMode = false;
 
+/*
+ * Whether we are currently in process of processing recovery records while
+ * allowing downstream replication instances
+ */
+#define StandbyWithCascadeReplication() \
+		(AmStartupProcess() && StandbyMode && AllowCascadeReplication())
+
 /* was a signal file present at startup? */
 static bool standby_signal_file_found = false;
 static bool recovery_signal_file_found = false;
@@ -303,6 +311,25 @@ bool		reachedConsistency = false;
 static char *replay_image_masked = NULL;
 static char *primary_image_masked = NULL;
 
+/*
+ * Maximum number of applied records in batch before notifying walsender during
+ * cascade replication
+ */
+int			cascadeReplicationMaxBatchSize;
+
+/*
+ * Maximum batching delay before notifying walsender during cascade replication
+ */
+int			cascadeReplicationMaxBatchDelay;
+
+/* Counter for applied records which are not yet signaled to walsenders */
+static int	appliedRecords = 0;
+
+/*
+ * True if downstream walsenders need to be notified about pending WAL records,
+ * set by timeout handler.
+ */
+volatile sig_atomic_t replicationNotificationPending = false;
 
 /*
  * Shared-memory state for WAL recovery.
@@ -1845,6 +1872,17 @@ PerformWalRecovery(void)
 		 * end of main redo apply loop
 		 */
 
+		/* Send notification for batched messages once loop is ended */
+		if (StandbyWithCascadeReplication() &&
+			cascadeReplicationMaxBatchSize > 1 &&
+			appliedRecords > 0)
+		{
+			if (cascadeReplicationMaxBatchDelay > 0)
+				disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false);
+			appliedRecords = 0;
+			WalSndWakeup(false, true);
+		}
+
 		if (reachedRecoveryTarget)
 		{
 			if (!reachedConsistency)
@@ -2043,8 +2081,40 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 	 *    be created otherwise)
 	 * ------
 	 */
-	if (AllowCascadeReplication())
-		WalSndWakeup(switchedTLI, true);
+
+	if (StandbyWithCascadeReplication())
+	{
+		if (cascadeReplicationMaxBatchSize <= 1)
+			WalSndWakeup(switchedTLI, true);
+		else
+		{
+			/*
+			 * If time line has switched, then we do not want to delay the
+			 * notification, otherwise we will wait until we apply specified
+			 * number of records before notifying downstream logical
+			 * walsenders.
+			 */
+			bool		batchFlushRequired =
+				++appliedRecords >= cascadeReplicationMaxBatchSize ||
+				replicationNotificationPending ||
+				switchedTLI;
+
+			if (batchFlushRequired)
+			{
+				if (appliedRecords > 0 && cascadeReplicationMaxBatchDelay > 0)
+					disable_timeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, false);
+				appliedRecords = 0;
+				replicationNotificationPending = false;
+			}
+
+			WalSndWakeup(switchedTLI, batchFlushRequired);
+
+			/* Setup timeout to limit maximum delay for notifications */
+			if (appliedRecords == 1 && cascadeReplicationMaxBatchDelay > 0)
+				enable_timeout_after(STANDBY_CASCADE_WAL_SEND_TIMEOUT,
+									 cascadeReplicationMaxBatchDelay);
+		}
+	}
 
 	/*
 	 * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
@@ -5083,3 +5153,28 @@ assign_recovery_target_xid(const char *newval, void *extra)
 	else
 		recoveryTarget = RECOVERY_TARGET_UNSET;
 }
+
+/*
+ * Send notifications to downstream walsenders
+ */
+void
+StandbyWalCheckSendNotify(void)
+{
+	if (appliedRecords > 0)
+	{
+		WalSndWakeup(false, true);
+		appliedRecords = 0;
+	}
+	replicationNotificationPending = false;
+}
+
+/*
+ * Timer handler for batch notifications in cascade replication
+ */
+void
+StandbyWalSendTimeoutHandler(void)
+{
+	replicationNotificationPending = true;
+	/* Most likely process is waiting for arrival of WAL records */
+	SetLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+}
diff --git a/src/backend/postmaster/startup.c b/src/backend/postmaster/startup.c
index 27e86cf393f..555090c82a0 100644
--- a/src/backend/postmaster/startup.c
+++ b/src/backend/postmaster/startup.c
@@ -189,6 +189,10 @@ ProcessStartupProcInterrupts(void)
 	if (ProcSignalBarrierPending)
 		ProcessProcSignalBarrier();
 
+	/* Send a notification to downstream walsenders if required */
+	if (replicationNotificationPending)
+		StandbyWalCheckSendNotify();
+
 	/* Perform logging of memory contexts of this process */
 	if (LogMemoryContextPending)
 		ProcessLogMemoryContextInterrupt();
@@ -246,6 +250,7 @@ StartupProcessMain(const void *startup_data, size_t startup_data_len)
 	RegisterTimeout(STANDBY_DEADLOCK_TIMEOUT, StandbyDeadLockHandler);
 	RegisterTimeout(STANDBY_TIMEOUT, StandbyTimeoutHandler);
 	RegisterTimeout(STANDBY_LOCK_TIMEOUT, StandbyLockTimeoutHandler);
+	RegisterTimeout(STANDBY_CASCADE_WAL_SEND_TIMEOUT, StandbyWalSendTimeoutHandler);
 
 	/*
 	 * Unblock signals (they were blocked when the postmaster forked us)
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index f137129209f..cc547ba283b 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2406,6 +2406,28 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"cascade_replication_batch_size", PGC_POSTMASTER, REPLICATION_STANDBY,
+			gettext_noop("Set the maximum number of applied WAL records before cascade walsenders are notified on standby."),
+			gettext_noop("0 disabled records batching in cascade replication"),
+			GUC_NOT_IN_SAMPLE
+		},
+		&cascadeReplicationMaxBatchSize,
+		500, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"cascade_replication_batch_delay", PGC_POSTMASTER, REPLICATION_STANDBY,
+			gettext_noop("Sets the maximum time before cascade walsenders are notified on standby about applied records."),
+			gettext_noop("0 disables timed notifications"),
+			GUC_NOT_IN_SAMPLE | GUC_UNIT_MS
+		},
+		&cascadeReplicationMaxBatchDelay,
+		500, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS,
 			gettext_noop("Sets the maximum number of concurrent connections."),
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index 8e475e266d1..75a09d4b9e3 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -11,6 +11,8 @@
 #ifndef XLOGRECOVERY_H
 #define XLOGRECOVERY_H
 
+#include <signal.h>
+
 #include "access/xlogreader.h"
 #include "catalog/pg_control.h"
 #include "lib/stringinfo.h"
@@ -67,6 +69,8 @@ extern PGDLLIMPORT char *PrimarySlotName;
 extern PGDLLIMPORT char *recoveryRestoreCommand;
 extern PGDLLIMPORT char *recoveryEndCommand;
 extern PGDLLIMPORT char *archiveCleanupCommand;
+extern PGDLLIMPORT int cascadeReplicationMaxBatchSize;
+extern PGDLLIMPORT int cascadeReplicationMaxBatchDelay;
 
 /* indirectly set via GUC system */
 extern PGDLLIMPORT TransactionId recoveryTargetXid;
@@ -165,4 +169,8 @@ extern void RecoveryRequiresIntParameter(const char *param_name, int currValue,
 
 extern void xlog_outdesc(StringInfo buf, XLogReaderState *record);
 
+extern PGDLLIMPORT volatile sig_atomic_t replicationNotificationPending;
+extern void StandbyWalCheckSendNotify(void);
+extern void StandbyWalSendTimeoutHandler(void);
+
 #endif							/* XLOGRECOVERY_H */
diff --git a/src/include/utils/timeout.h b/src/include/utils/timeout.h
index 7b19beafdc9..0062cb562b9 100644
--- a/src/include/utils/timeout.h
+++ b/src/include/utils/timeout.h
@@ -36,6 +36,7 @@ typedef enum TimeoutId
 	IDLE_STATS_UPDATE_TIMEOUT,
 	CLIENT_CONNECTION_CHECK_TIMEOUT,
 	STARTUP_PROGRESS_TIMEOUT,
+	STANDBY_CASCADE_WAL_SEND_TIMEOUT,
 	/* First user-definable timeout reason */
 	USER_TIMEOUT,
 	/* Maximum number of timeout reasons */
-- 
2.51.0

