From c9017f7c55de864bb3459f6f927803577e94c5eb Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Mon, 2 Feb 2015 12:49:45 +0900
Subject: [PATCH] Make sure to send feedback at desired timing.

When the client side is under heavy load, a continuous stream of
incoming data can keep the receiver in its fast-path receive loop and
prevent feedback from being sent at the expected interval, which
results in a replication timeout on the walsender. Exit the fast-path
loop when feedback is due so that it is sent at the expected interval.
---
 src/backend/replication/walreceiver.c | 54 +++++++++++++++++++++--------------
 src/bin/pg_basebackup/receivelog.c    | 10 +++++--
 2 files changed, 40 insertions(+), 24 deletions(-)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index bfbc02f..d77dc91 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -409,35 +409,45 @@ WalReceiverMain(void)
 				if (len != 0)
 				{
 					/*
+					 * The interval between feedback messages must not exceed
+					 * wal_receiver_status_interval. last_recv_timestamp is
+					 * slightly earlier than the current time at this point,
+					 * so it can be used as the start time for the feedback
+					 * timeout checked in the loop below.
+					 */
+					TimestampTz last_feedback = last_recv_timestamp;
+
+					/*
 					 * Process the received data, and any subsequent data we
 					 * can read without blocking.
 					 */
-					for (;;)
+					while (len > 0)
 					{
-						if (len > 0)
-						{
-							/*
-							 * Something was received from master, so reset
-							 * timeout
-							 */
-							last_recv_timestamp = GetCurrentTimestamp();
-							ping_sent = false;
-							XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
-						}
-						else if (len == 0)
-							break;
-						else if (len < 0)
-						{
-							ereport(LOG,
-									(errmsg("replication terminated by primary server"),
-									 errdetail("End of WAL reached on timeline %u at %X/%X.",
-											   startpointTLI,
-											   (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
-							endofwal = true;
+						/*
+						 * Something was received from master, so reset
+						 * timeout
+						 */
+						last_recv_timestamp = GetCurrentTimestamp();
+						ping_sent = false;
+						XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
+
+						/* Exit this loop if the time to reply has come */
+						if (TimestampDifferenceExceeds(last_feedback, last_recv_timestamp,
+													  wal_receiver_status_interval * 1000))
 							break;
-						}
+
 						len = walrcv_receive(0, &buf);
 					}
+					if (len < 0)
+					{
+						ereport(LOG,
+								(errmsg("replication terminated by primary server"),
+								 errdetail("End of WAL reached on timeline %u at %X/%X.",
+										   startpointTLI,
+										   (uint32) (LogstreamResult.Write >> 32), (uint32) LogstreamResult.Write)));
+						endofwal = true;
+						break;
+					}
 
 					/* Let the master know that we received some data. */
 					XLogWalRcvSendReply(false, false);
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 8caedff..fb6738d 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -925,9 +925,15 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 			}
 
 			/*
-			 * Process the received data, and any subsequent data we
-			 * can read without blocking.
+			 * Process the received data, and any subsequent data we can read
+			 * without blocking, unless it is time to send feedback.
 			 */
+			now = feGetCurrentTimestamp();
+			if (standby_message_timeout > 0 &&
+				feTimestampDifferenceExceeds(last_status, now,
+											 standby_message_timeout))
+				break;
+
 			r = CopyStreamReceive(conn, 0, &copybuf);
 		}
 	}
-- 
2.1.0.GIT

