diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index c83ff3b..3e51cf3 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -161,9 +161,12 @@ static StringInfoData output_message; static StringInfoData reply_message; static StringInfoData tmpbuf; +/* Timestamp of last ProcessRepliesIfAny(). */ +static TimestampTz last_processing = 0; + /* - * Timestamp of the last receipt of the reply from the standby. Set to 0 if - * wal_sender_timeout doesn't need to be active. + * Timestamp of last ProcessRepliesIfAny() that saw a reply from the + * standby. Set to 0 if wal_sender_timeout doesn't need to be active. */ static TimestampTz last_reply_timestamp = 0; @@ -240,8 +243,8 @@ static void ProcessStandbyReplyMessage(void); static void ProcessStandbyHSFeedbackMessage(void); static void ProcessRepliesIfAny(void); static void WalSndKeepalive(bool requestReply); -static void WalSndKeepaliveIfNecessary(TimestampTz now); -static void WalSndCheckTimeOut(TimestampTz now); +static void WalSndKeepaliveIfNecessary(void); +static void WalSndCheckTimeOut(void); static long WalSndComputeSleeptime(TimestampTz now); static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); @@ -1202,18 +1205,16 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, /* Check for input from the client */ ProcessRepliesIfAny(); - now = GetCurrentTimestamp(); - /* die if timeout was reached */ - WalSndCheckTimeOut(now); + WalSndCheckTimeOut(); /* Send keepalive if the time has come */ - WalSndKeepaliveIfNecessary(now); + WalSndKeepaliveIfNecessary(); if (!pq_is_send_pending()) break; - sleeptime = WalSndComputeSleeptime(now); + sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT; @@ -1308,7 +1309,6 @@ WalSndWaitForWal(XLogRecPtr loc) for (;;) { long sleeptime; - TimestampTz now; /* * Emergency bailout if postmaster has died. This is to avoid the @@ -1393,13 +1393,11 @@ WalSndWaitForWal(XLogRecPtr loc) !pq_is_send_pending()) break; - now = GetCurrentTimestamp(); - /* die if timeout was reached */ - WalSndCheckTimeOut(now); + WalSndCheckTimeOut(); /* Send keepalive if the time has come */ - WalSndKeepaliveIfNecessary(now); + WalSndKeepaliveIfNecessary(); /* * Sleep until something happens or we time out. Also wait for the @@ -1408,7 +1406,7 @@ WalSndWaitForWal(XLogRecPtr loc) * new WAL to be generated. (But if we have nothing to send, we don't * want to wake on socket-writable.) */ - sleeptime = WalSndComputeSleeptime(now); + sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_SOCKET_READABLE | WL_TIMEOUT; @@ -1605,6 +1603,8 @@ ProcessRepliesIfAny(void) int r; bool received = false; + last_processing = GetCurrentTimestamp(); + for (;;) { pq_startmsgread(); @@ -1692,7 +1692,7 @@ ProcessRepliesIfAny(void) */ if (received) { - last_reply_timestamp = GetCurrentTimestamp(); + last_reply_timestamp = last_processing; waiting_for_ping_response = false; } } @@ -2071,10 +2071,18 @@ WalSndComputeSleeptime(TimestampTz now) /* * Check whether there have been responses by the client within - * wal_sender_timeout and shutdown if not. + * wal_sender_timeout and shutdown if not. Using last_processing as the + * reference point avoids counting server-side stalls against the client. + * However, a long server-side stall can make WalSndKeepaliveIfNecessary() + * postdate last_processing by more than wal_sender_timeout. If that happens, + * the client must reply almost immediately to avoid a timeout. This rarely + * affects the default configuration, under which clients spontaneously send a + * message every standby_message_timeout = wal_sender_timeout/6 = 10s. We + * could eliminate that problem by recognizing timeout expiration at + * wal_sender_timeout/2 after the keepalive. */ static void -WalSndCheckTimeOut(TimestampTz now) +WalSndCheckTimeOut(void) { TimestampTz timeout; @@ -2085,7 +2093,7 @@ WalSndCheckTimeOut(TimestampTz now) timeout = TimestampTzPlusMilliseconds(last_reply_timestamp, wal_sender_timeout); - if (wal_sender_timeout > 0 && now >= timeout) + if (wal_sender_timeout > 0 && last_processing >= timeout) { /* * Since typically expiration of replication timeout means @@ -2116,8 +2124,6 @@ WalSndLoop(WalSndSendDataCallback send_data) */ for (;;) { - TimestampTz now; - /* * Emergency bailout if postmaster has died. This is to avoid the * necessity for manual cleanup of all postmaster children. @@ -2195,13 +2201,11 @@ WalSndLoop(WalSndSendDataCallback send_data) WalSndDone(send_data); } - now = GetCurrentTimestamp(); - /* Check for replication timeout. */ - WalSndCheckTimeOut(now); + WalSndCheckTimeOut(); /* Send keepalive if the time has come */ - WalSndKeepaliveIfNecessary(now); + WalSndKeepaliveIfNecessary(); /* * We don't block if not caught up, unless there is unsent data @@ -2219,7 +2223,11 @@ WalSndLoop(WalSndSendDataCallback send_data) wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT | WL_SOCKET_READABLE; - sleeptime = WalSndComputeSleeptime(now); + /* + * Use fresh timestamp, not last_processed, to reduce the chance + * of reaching wal_sender_timeout before sending a keepalive. + */ + sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; @@ -3379,7 +3387,7 @@ WalSndKeepalive(bool requestReply) * Send keepalive message if too much time has elapsed. */ static void -WalSndKeepaliveIfNecessary(TimestampTz now) +WalSndKeepaliveIfNecessary(void) { TimestampTz ping_time; @@ -3400,7 +3408,7 @@ WalSndKeepaliveIfNecessary(TimestampTz now) */ ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp, wal_sender_timeout / 2); - if (now >= ping_time) + if (last_processing >= ping_time) { WalSndKeepalive(true); waiting_for_ping_response = true;