From cfe01eadd4d8567f4410bccb8334c52fc897c002 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Wed, 18 Feb 2015 12:30:58 +0900
Subject: [PATCH] Make pg_bascbackup and pg_receivexlog to keep sending WAL
 receive feedback regularly on heavy load v2.

pg_basebackup and pg_receivexlog fail to send receiver reply message
while they are receiving continuous WAL stream caused by heavy load or
something else. This patch makes them to send reply message even on
such a situation.
---
 src/bin/pg_basebackup/receivelog.c | 25 +++++++++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 8caedff..453a047 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -24,6 +24,10 @@
 #include "libpq-fe.h"
 #include "access/xlog_internal.h"
 
+#define WAL_PROCESS_WORST_DURATION 1 /* Anticipated worst duration to process
+									  * wal record in seconds. This is used to
+									  * calculate how often to check the time
+									  * to send reply message */
 
 /* fd and filename for currently open WAL file */
 static int	walfile = -1;
@@ -826,6 +830,9 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 		int			r;
 		int64		now;
 		long		sleeptime;
+		int			reply_timeout_check_interval = 
+			standby_message_timeout / 1000 / WAL_PROCESS_WORST_DURATION;
+		int			loop_count = 0;
 
 		/*
 		 * Check if we should continue streaming, or abort at this point.
@@ -879,6 +886,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 		sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout,
 												 last_status);
 
+		loop_count = 0;
 		r = CopyStreamReceive(conn, sleeptime, &copybuf);
 		while (r != 0)
 		{
@@ -925,6 +933,23 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 			}
 
 			/*
+			 * Keep sending feedbacks regularly. We check it once per several
+			 * iterations for performance reason.
+			 */
+			if (loop_count++ >= reply_timeout_check_interval)
+			{
+				now = feGetCurrentTimestamp();
+				if (feTimestampDifferenceExceeds(last_status, now,
+												 standby_message_timeout))
+				{
+					if (!sendFeedback(conn, blockpos, now, false))
+						goto error;
+					last_status = now;
+				}
+				loop_count = 0;
+			}
+
+			/*
 			 * Process the received data, and any subsequent data we
 			 * can read without blocking.
 			 */
-- 
2.1.0.GIT

