From ef7b04c9ddf351ca99736d9ec9fa1954383cd124 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Tue, 24 Feb 2015 17:52:01 +0900
Subject: [PATCH] Make effort to send feedback regulary on heavy load.

pg_basebackup and pg_receivexlog might be forced to omit sending
feedback for long time by continuous replication stream caused by
possible heavy load on receiver side. Keep alives from the server
could be delayed on such a situation. This patch let them make efforts
to send feedback on such a situation. On every boundary between WAL
segments, send feedback if so the time has come just after syncing and
closing the segment just finished.
---
 src/bin/pg_basebackup/receivelog.c | 28 +++++++++++++++++++++++++---
 1 file changed, 25 insertions(+), 3 deletions(-)

diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 8caedff..df51f9d 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -45,7 +45,8 @@ static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
 static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
 							   XLogRecPtr *blockpos, uint32 timeline,
 							   char *basedir, stream_stop_callback stream_stop,
-							   char *partial_suffix, bool mark_done);
+							   char *partial_suffix, bool mark_done,
+							   int standby_message_timeout, int64 *last_status);
 static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
 									   XLogRecPtr blockpos, char *basedir, char *partial_suffix,
 									   XLogRecPtr *stoppos, bool mark_done);
@@ -906,7 +907,8 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 			{
 				if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
 										timeline, basedir, stream_stop,
-										partial_suffix, mark_done))
+										partial_suffix, mark_done,
+										standby_message_timeout, &last_status))
 					goto error;
 
 				/*
@@ -1115,7 +1117,8 @@ static bool
 ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
 				   XLogRecPtr *blockpos, uint32 timeline,
 				   char *basedir, stream_stop_callback stream_stop,
-				   char *partial_suffix, bool mark_done)
+				   char *partial_suffix, bool mark_done,
+				   int standby_message_timeout, int64 *last_status)
 {
 	int			xlogoff;
 	int			bytes_left;
@@ -1223,12 +1226,31 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
 		/* Did we reach the end of a WAL segment? */
 		if (*blockpos % XLOG_SEG_SIZE == 0)
 		{
+			int64 now;
 			if (!close_walfile(basedir, partial_suffix, *blockpos, mark_done))
 				/* Error message written in close_walfile() */
 				return false;
 
 			xlogoff = 0;
 
+			/*
+			 * Continuous input stream might cause long duration after the
+			 * previous feedback. Here is a good point to check if the time to
+			 * feedback has come because the fsync done in close_walfile()
+			 * might have taken long time.
+			 */
+			if (standby_message_timeout > 0)
+			{
+				now = feGetCurrentTimestamp();
+				if(feTimestampDifferenceExceeds(*last_status, now,
+												standby_message_timeout))
+				{
+					if (!sendFeedback(conn, *blockpos, now, false))
+						return false;
+					*last_status = now;
+				}
+			}
+
 			if (still_sending && stream_stop(*blockpos, timeline, true))
 			{
 				if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
-- 
2.1.0.GIT

