From 1dfe8b9f2612e2e917d8c0d20d9631ad62439f36 Mon Sep 17 00:00:00 2001
From: Sokolov Yura <funny.falcon@postgrespro.ru>
Date: Wed, 27 Sep 2017 12:53:56 +0300
Subject: [PATCH 2/2] Fix walsender timeouts when decoding large transaction

The logical slots have fast code path for sending data in order to not
impose too high per message overhead. The fast path skips checks for
interrupts and timeouts. However the fast path failed to consider the
fact that transaction with large number of changes may take very long to
be processed and sent to the client. This causes walsender to ignore
interrupts for potentially long time but more importantly it will cause
walsender being killed due to timeout at the end of such transaction.

This commit changes the fast path to also check for interrupts and only
allows calling the fast path when last keeplaive check happened less
than half of walsender timeout ago, otherwise the slower code path will
be taken.

Discussion:
https://www.postgresql.org/message-id/flat/e082a56a-fd95-a250-3bae-0fff93832510@2ndquadrant.com
---
 src/backend/replication/walsender.c | 80 ++++++++++++++++++++++++++++++-------
 1 file changed, 65 insertions(+), 15 deletions(-)

diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 6ec4e63161..67ddfffea5 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1151,6 +1151,9 @@ static void
 WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 				bool last_write)
 {
+	bool		firsttime = true;
+	TimestampTz now = GetCurrentTimestamp();
+
 	/* output previously gathered data in a CopyData packet */
 	pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
 
@@ -1160,7 +1163,7 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	 * several releases by streaming physical replication.
 	 */
 	resetStringInfo(&tmpbuf);
-	pq_sendint64(&tmpbuf, GetCurrentTimestamp());
+	pq_sendint64(&tmpbuf, now);
 	memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
 		   tmpbuf.data, sizeof(int64));
 
@@ -1169,14 +1172,29 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	if (pq_flush_if_writable() != 0)
 		WalSndShutdown();
 
-	if (!pq_is_send_pending())
+	/*
+	 * If transaction is too big, then we can spend a lot of time sending it.
+	 * We should fall to slow path to check reciever's replies to update
+	 * last_reply_timestamp.
+	 */
+	if (!pq_is_send_pending() &&
+		(last_reply_timestamp <= 0 || wal_sender_timeout <= 0 ||
+		 now <= TimestampTzPlusMilliseconds(last_reply_timestamp,
+											wal_sender_timeout / 2)))
+	{
+		/*
+		 * Always check for interrupts to be able to exit from sending huge
+		 * transaction.
+		 */
+		CHECK_FOR_INTERRUPTS();
+
 		return;
+	}
 
 	for (;;)
 	{
 		int			wakeEvents;
 		long		sleeptime;
-		TimestampTz now;
 
 		/*
 		 * Emergency bailout if postmaster has died.  This is to avoid the
@@ -1205,19 +1223,34 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 		if (pq_flush_if_writable() != 0)
 			WalSndShutdown();
 
-		/* If we finished clearing the buffered data, we're done here. */
-		if (!pq_is_send_pending())
-			break;
-
 		now = GetCurrentTimestamp();
 
-		/* die if timeout was reached */
-		WalSndCheckTimeOut(now);
+		/*
+		 * Always sleep first time we enter this loop. This will give a chance
+		 * to catch wal reciever's response in ProcessRepliesIfAny() above.
+		 * This fixes outstanding timeouts on very fast networks (localhost,
+		 * for example). See discussion at:
+		 * https://www.postgresql.org/message-id/flat/e082a56a-fd95-a250-3bae-0fff93832510@2ndquadrant.com
+		 */
+		if (!firsttime || (last_reply_timestamp <= 0 && wal_sender_timeout <= 0))
+		{
+			/* If we finished clearing the buffered data, we're done here. */
+			if (!pq_is_send_pending())
+				break;
 
-		/* Send keepalive if the time has come */
-		WalSndKeepaliveIfNecessary(now);
+			/* die if timeout was reached */
+			WalSndCheckTimeOut(now);
+
+			/* Send keepalive if the time has come */
+			WalSndKeepaliveIfNecessary(now);
+		}
+
+		firsttime = false;
 
 		sleeptime = WalSndComputeSleeptime(now);
+		/* sleep at least 1 ms even if we already timed out */
+		if (sleeptime == 0)
+			sleeptime = 1;
 
 		wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
 			WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
@@ -2074,6 +2107,8 @@ WalSndCheckTimeOut(TimestampTz now)
 static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
+	bool		firsttime = true;
+
 	/*
 	 * Initialize the last reply timestamp. That enables timeout processing
 	 * from hereon.
@@ -2131,7 +2166,10 @@ WalSndLoop(WalSndSendDataCallback send_data)
 		 * caught up.
 		 */
 		if (!pq_is_send_pending())
+		{
 			send_data();
+			firsttime = true;
+		}
 		else
 			WalSndCaughtUp = false;
 
@@ -2171,11 +2209,20 @@ WalSndLoop(WalSndSendDataCallback send_data)
 
 		now = GetCurrentTimestamp();
 
-		/* Check for replication timeout. */
-		WalSndCheckTimeOut(now);
+		/*
+		 * Skip timeout check first time after sending something. See comment
+		 * in WalSndWriteData.
+		 */
+		if (!firsttime)
+		{
+			/* Check for replication timeout. */
+			WalSndCheckTimeOut(now);
 
-		/* Send keepalive if the time has come */
-		WalSndKeepaliveIfNecessary(now);
+			/* Send keepalive if the time has come */
+			WalSndKeepaliveIfNecessary(now);
+		}
+
+		firsttime = false;
 
 		/*
 		 * We don't block if not caught up, unless there is unsent data
@@ -2194,6 +2241,9 @@ WalSndLoop(WalSndSendDataCallback send_data)
 				WL_SOCKET_READABLE;
 
 			sleeptime = WalSndComputeSleeptime(now);
+			/* sleep at least 1 ms even if we already timed out */
+			if (sleeptime == 0)
+				sleeptime = 1;
 
 			if (pq_is_send_pending())
 				wakeEvents |= WL_SOCKET_WRITEABLE;
-- 
2.11.0

