From 387e5d15f89cdff505c28d61f4d89b6dd5f4dbcb Mon Sep 17 00:00:00 2001
From: reVInotip <grigoriy.novikov220@gmail.com>
Date: Sat, 18 Oct 2025 10:45:08 +0700
Subject: [PATCH 1/4] Cascade sync rep
 Implemented cascaded synchronous replication support. The new
 SyncRepGetSendingSyncRecPtr function in syncrep.c computes replication status
 values by aggregating positions from synchronous standbys with local server
 positions. Furthermore, walsender processes can now prompt walreceiver
 processes for immediate feedback delivery to upstream replication sources.

---
 src/backend/replication/syncrep.c           | 161 +++++++++++++++++++++++-----
 src/backend/replication/walreceiver.c       |  27 +++--
 src/backend/replication/walsender.c         |   2 +
 src/include/access/xlogdefs.h               |   8 ++
 src/include/replication/syncrep.h           |   4 +
 src/include/replication/walsender_private.h |   3 +-
 6 files changed, 172 insertions(+), 33 deletions(-)

diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index 32cf3a48b89..97351db97bc 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -105,6 +105,11 @@ static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
 								 XLogRecPtr *flushPtr,
 								 XLogRecPtr *applyPtr,
 								 bool *am_sync);
+static bool SyncRepGetSyncRecPtrBySyncRepMethod(XLogRecPtr *writePtr,
+												XLogRecPtr *flushPtr,
+												XLogRecPtr *applyPtr,
+												SyncRepStandbyData *sync_standbys,
+												int num_standbys);
 static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
 									   XLogRecPtr *flushPtr,
 									   XLogRecPtr *applyPtr,
@@ -624,29 +629,11 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
 		return false;
 	}
 
-	/*
-	 * In a priority-based sync replication, the synced positions are the
-	 * oldest ones among sync standbys. In a quorum-based, they are the Nth
-	 * latest ones.
-	 *
-	 * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
-	 * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
-	 * because it's a bit more efficient.
-	 *
-	 * XXX If the numbers of current and requested sync standbys are the same,
-	 * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
-	 * positions even in a quorum-based sync replication.
-	 */
-	if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
+	/* Try to calculate the sync Write, Flush and Apply positions */
+	if (!SyncRepGetSyncRecPtrBySyncRepMethod(writePtr, flushPtr, applyPtr, sync_standbys, num_standbys))
 	{
-		SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
-								   sync_standbys, num_standbys);
-	}
-	else
-	{
-		SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
-									  sync_standbys, num_standbys,
-									  SyncRepConfig->num_sync);
+		pfree(sync_standbys);
+		return false;
 	}
 
 	pfree(sync_standbys);
@@ -867,8 +854,6 @@ SyncRepGetStandbyPriority(void)
 	 * Since synchronous cascade replication is not allowed, we always set the
 	 * priority of cascading walsender to zero.
 	 */
-	if (am_cascading_walsender)
-		return 0;
 
 	if (!SyncStandbysDefined() || SyncRepConfig == NULL)
 		return 0;
@@ -1048,6 +1033,134 @@ SyncRepQueueIsOrderedByLSN(int mode)
 }
 #endif
 
+/*
+ * ===========================================================
+ * Synchronous Replication functions for wal receiver processes
+ * ===========================================================
+ */
+
+/*
+ * Calculate the sync Write, Flush and Apply positions to send upstream; if
+ * synchronous replication is not configured, return the my*Ptr values as is.
+ */
+void
+SyncRepGetSendingSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr,
+							XLogRecPtr myWritePtr, XLogRecPtr myFlushPtr, XLogRecPtr myApplyPtr)
+{
+	SyncRepStandbyData *sync_standbys;
+	int			num_standbys;
+
+	/*
+	 * Initialize default results. We use InvalidXLogRecPtr instead of
+	 * DefaultSendingLSN to ensure the correct operation of the algorithm
+	 * under priority-based sync replication.
+	 */
+	*writePtr = InvalidXLogRecPtr;
+	*flushPtr = InvalidXLogRecPtr;
+	*applyPtr = InvalidXLogRecPtr;
+
+	/*
+	 * Report this server's own Write, Flush and Apply positions when no
+	 * synchronous standbys are configured here (SyncRepConfig is unset or
+	 * requests zero sync standbys).
+	 */
+	if (SyncRepConfig == NULL || SyncRepConfig->num_sync == 0)
+	{
+		*writePtr = myWritePtr;
+		*flushPtr = myFlushPtr;
+		*applyPtr = myApplyPtr;
+		return;
+	}
+
+	/* Get standbys that are considered as synchronous at this moment */
+	num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
+
+	/* Not enough synchronous standbys: report DefaultSendingLSN upstream. */
+	if (num_standbys < SyncRepConfig->num_sync)
+	{
+		*writePtr = DefaultSendingLSN;
+		*flushPtr = DefaultSendingLSN;
+		*applyPtr = DefaultSendingLSN;
+
+		elog(DEBUG3, "waiting for %d standbys, but only %d are connected",
+			 SyncRepConfig->num_sync, num_standbys);
+		pfree(sync_standbys);
+		return;
+	}
+
+	/*
+	 * Try to calculate the sync Write, Flush and Apply positions. On failure
+	 * the helper leaves writePtr et al untouched, so set the fallback here.
+	 */
+	if (!SyncRepGetSyncRecPtrBySyncRepMethod(writePtr, flushPtr, applyPtr, sync_standbys, num_standbys))
+	{
+		*writePtr = DefaultSendingLSN;
+		*flushPtr = DefaultSendingLSN;
+		*applyPtr = DefaultSendingLSN;
+
+		elog(DEBUG3, "could not calculate sync write, flush and apply positions");
+		pfree(sync_standbys);
+		return;
+	}
+
+	/* Never report a position beyond what this server itself has reached. */
+	*writePtr = Min(myWritePtr, *writePtr);
+	*flushPtr = Min(myFlushPtr, *flushPtr);
+	*applyPtr = Min(myApplyPtr, *applyPtr);
+
+	pfree(sync_standbys);
+}
+
+/*
+ * ===========================================================
+ * Synchronous Replication functions for wal receiver and wal sender processes
+ * ===========================================================
+ */
+
+/*
+ * Calculate the synced Write, Flush and Apply positions from the given
+ * standbys using the configured method (priority or quorum). Returns false,
+ * leaving the output pointers untouched, if SyncRepConfig is NULL; else true.
+ */
+static bool
+SyncRepGetSyncRecPtrBySyncRepMethod(XLogRecPtr *writePtr,
+									XLogRecPtr *flushPtr,
+									XLogRecPtr *applyPtr,
+									SyncRepStandbyData *sync_standbys,
+									int num_standbys)
+{
+	/* Quick out if not even configured to be synchronous */
+	if (SyncRepConfig == NULL)
+		return false;
+
+	/*
+	 * In a priority-based sync replication, the synced positions are the
+	 * oldest ones among sync standbys. In a quorum-based, they are the Nth
+	 * latest ones.
+	 *
+	 * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest
+	 * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation
+	 * because it's a bit more efficient.
+	 *
+	 * XXX If the numbers of current and requested sync standbys are the same,
+	 * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced
+	 * positions even in a quorum-based sync replication.
+	 */
+	if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
+	{
+		SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
+								   sync_standbys, num_standbys);
+	}
+	else
+	{
+		SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
+									  sync_standbys, num_standbys,
+									  SyncRepConfig->num_sync);
+	}
+
+	return true;
+}
+
 /*
  * ===========================================================
  * Synchronous Replication functions executed by any process
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 7361ffc9dcf..54452de125c 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -66,6 +66,7 @@
 #include "postmaster/auxprocess.h"
 #include "postmaster/interrupt.h"
 #include "replication/walreceiver.h"
+#include "replication/syncrep.h"
 #include "replication/walsender.h"
 #include "storage/ipc.h"
 #include "storage/proc.h"
@@ -529,10 +530,11 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len)
 					if (walrcv->force_reply)
 					{
 						/*
-						 * The recovery process has asked us to send apply
-						 * feedback now.  Make sure the flag is really set to
-						 * false in shared memory before sending the reply, so
-						 * we don't miss a new request for a reply.
+						 * The recovery process or one of the walsender
+						 * processes has asked us to send apply feedback now.
+						 * Make sure the flag is really set to false in shared
+						 * memory before sending the reply, so we don't miss a
+						 * new request for a reply.
 						 */
 						walrcv->force_reply = false;
 						pg_memory_barrier();
@@ -1077,7 +1079,10 @@ XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
 
 /*
  * Send reply message to primary, indicating our current WAL locations, oldest
- * xmin and the current time.
+ * xmin and the current time. When synchronous replication is enabled, transmit
+ * the oldest write, flush, and apply positions from the current node and its
+ * standbys to the primary. If position calculation fails, fall back to
+ * DefaultSendingLSN.
  *
  * If 'force' is not set, the message is only sent if enough time has
  * passed since last status update to reach wal_receiver_status_interval.
@@ -1125,9 +1130,15 @@ XLogWalRcvSendReply(bool force, bool requestReply)
 	WalRcvComputeNextWakeup(WALRCV_WAKEUP_REPLY, now);
 
 	/* Construct a new message */
-	writePtr = LogstreamResult.Write;
-	flushPtr = LogstreamResult.Flush;
-	applyPtr = GetXLogReplayRecPtr(NULL);
+	if (SyncRepRequested())
+		SyncRepGetSendingSyncRecPtr(&writePtr, &flushPtr, &applyPtr,
+									LogstreamResult.Write, LogstreamResult.Flush, GetXLogReplayRecPtr(NULL));
+	else
+	{
+		writePtr = LogstreamResult.Write;
+		flushPtr = LogstreamResult.Flush;
+		applyPtr = GetXLogReplayRecPtr(NULL);
+	}
 
 	resetStringInfo(&reply_message);
 	pq_sendbyte(&reply_message, PqReplMsg_StandbyStatusUpdate);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 0855bae3535..90b12edf58a 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2500,6 +2500,8 @@ ProcessStandbyReplyMessage(void)
 
 	if (!am_cascading_walsender)
 		SyncRepReleaseWaiters();
+	else if (SyncRepRequested() && MyWalSnd->sync_standby_priority > 0)
+		WalRcvForceReply();
 
 	/*
 	 * Advance our local xmin horizon when the client confirmed a flush.
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index 514f03df0b6..3e0cb1c9a14 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -28,6 +28,14 @@ typedef uint64 XLogRecPtr;
 #define InvalidXLogRecPtr	0
 #define XLogRecPtrIsInvalid(r)	((r) == InvalidXLogRecPtr)
 
+/*
+ * The default value for sending from a cascade synchronous standby to another
+ * cascade synchronous standby or to the primary, used when the sending LSN
+ * cannot be calculated (for example, when the number of synchronous standbys
+ * is less than required).
+ */
+#define DefaultSendingLSN   ((XLogRecPtr) 2)
+
 /*
  * First LSN to use for "fake" LSNs.
  *
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index 675669a79f7..8b6cec6b76d 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -92,6 +92,10 @@ extern int	SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys);
 /* called by checkpointer */
 extern void SyncRepUpdateSyncStandbysDefined(void);
 
+/* called by walreceiver */
+extern void SyncRepGetSendingSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecPtr *applyPtr,
+										XLogRecPtr myWritePtr, XLogRecPtr myFlushPtr, XLogRecPtr myApplyPtr);
+
 /*
  * Internal functions for parsing synchronous_standby_names grammar,
  * in syncrep_gram.y and syncrep_scanner.l
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index e98701038f5..6b89ad80082 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -50,7 +50,8 @@ typedef struct WalSnd
 	/*
 	 * The xlog locations that have been written, flushed, and applied by
 	 * standby-side. These may be invalid if the standby-side has not offered
-	 * values yet.
+	 * values yet. For a synchronous walsender, these locations contain the
+	 * minimum (oldest) positions of the synchronous standbys in its subtree.
 	 */
 	XLogRecPtr	write;
 	XLogRecPtr	flush;
