From 54dab841e02a8c00f4d14c5955bacc6309082f52 Mon Sep 17 00:00:00 2001 From: Rahila Syed Date: Wed, 9 Jul 2025 15:35:20 +0530 Subject: [PATCH] Send WAL records to standby before they are flushed This patch adds the WAL sender and WAL receiver changes required to stream WAL records to a standby before they have been flushed on the primary. The WALData message now also carries the primary's current flush position. To ensure that transactions not yet flushed on the primary are never applied on the standby, the standby advances its flushedUpto position only up to that flush position. Co-authored-by: Melih Mutlu Co-authored-by: Rahila Syed --- src/backend/access/transam/xlog.c | 16 +++- src/backend/access/transam/xloginsert.c | 4 + src/backend/replication/walreceiver.c | 30 ++++-- src/backend/replication/walsender.c | 122 ++++++++++++++++++------ src/bin/pg_basebackup/receivelog.c | 1 + src/include/access/xlog.h | 2 + 6 files changed, 136 insertions(+), 39 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 0baf0ac6160..f203ac442cb 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -704,7 +704,6 @@ static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr); static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr); -static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto); static char *GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli); static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos); static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos); @@ -923,6 +922,9 @@ XLogInsertRecord(XLogRecData *rdata, class == WALINSERT_SPECIAL_SWITCH, rdata, StartPos, EndPos, insertTLI); + if (StartPos - StartPos % XLOG_BLCKSZ + XLOG_BLCKSZ < EndPos) + WalSndWakeupRequest(); + /* * Unless record is flagged as not important, update LSN of last * important record in the current slot.
When holding all locks, just @@ -1503,7 +1505,7 @@ WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt) * uninitialized page), and the inserter might need to evict an old WAL buffer * to make room for a new one, which in turn requires WALWriteLock. */ -static XLogRecPtr +XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto) { uint64 bytepos; @@ -6522,6 +6524,16 @@ GetInsertRecPtr(void) return recptr; } +XLogRecPtr +GetLogInsertRecPtr(void) +{ + XLogRecPtr recptr; + + recptr = pg_atomic_read_membarrier_u64(&XLogCtl->logInsertResult); + + return recptr; +} + /* * GetFlushRecPtr -- Returns the current flush position, ie, the last WAL * position known to be fsync'd to disk. This should only be used on a diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index c7571429e8e..9c189104946 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -36,6 +36,7 @@ #include "miscadmin.h" #include "pg_trace.h" #include "replication/origin.h" +#include "replication/walsender.h" #include "storage/bufmgr.h" #include "storage/proc.h" #include "utils/memutils.h" @@ -526,6 +527,9 @@ XLogInsert(RmgrId rmid, uint8 info) XLogResetInsertion(); + /* Wake up Walsender and let it know that we inserted new WAL */ + WalSndWakeupProcessRequests(true, !RecoveryInProgress()); + return EndPos; } diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 7361ffc9dcf..335146745a4 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -110,6 +110,7 @@ static struct { XLogRecPtr Write; /* last byte + 1 written out in the standby */ XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ + XLogRecPtr SenderFlush; /* last byte + 1 flushed in the sender */ } LogstreamResult; /* @@ -137,7 +138,7 @@ static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *start static void WalRcvDie(int code, Datum arg); static void 
XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli); -static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, +static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, XLogRecPtr flushedupto, TimeLineID tli); static void XLogWalRcvFlush(bool dying, TimeLineID tli); static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli); @@ -821,6 +822,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli) int hdrlen; XLogRecPtr dataStart; XLogRecPtr walEnd; + XLogRecPtr flushedWal; TimestampTz sendTime; bool replyRequested; @@ -830,7 +832,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli) { StringInfoData incoming_message; - hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64); + hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64) + sizeof(int64); if (len < hdrlen) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -842,12 +844,13 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli) /* read the fields */ dataStart = pq_getmsgint64(&incoming_message); walEnd = pq_getmsgint64(&incoming_message); + flushedWal = pq_getmsgint64(&incoming_message); sendTime = pq_getmsgint64(&incoming_message); ProcessWalSndrMessage(walEnd, sendTime); buf += hdrlen; len -= hdrlen; - XLogWalRcvWrite(buf, len, dataStart, tli); + XLogWalRcvWrite(buf, len, dataStart, flushedWal, tli); break; } case PqReplMsg_Keepalive: @@ -887,7 +890,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli) * Write XLOG data to disk. 
*/ static void -XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli) +XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, XLogRecPtr flushedupto, TimeLineID tli) { int startoff; int byteswritten; @@ -960,6 +963,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli) buf += byteswritten; LogstreamResult.Write = recptr; + LogstreamResult.SenderFlush = flushedupto; } /* Update shared-memory status */ @@ -986,20 +990,32 @@ XLogWalRcvFlush(bool dying, TimeLineID tli) { Assert(tli != 0); - if (LogstreamResult.Flush < LogstreamResult.Write) + /* + * The wal records can be flushed on standby once the flushptr on primary + * is greater than flushptr on standby. At a given point in time it may be + * possible that some WAL records that have not been flushed to disk on + * primary may get flushed on standby but those WAL won't be applied on + * standby until they are flushed on primary. + */ + if ((LogstreamResult.Flush < LogstreamResult.Write) && + (LogstreamResult.Flush < LogstreamResult.SenderFlush)) { WalRcvData *walrcv = WalRcv; + XLogRecPtr flush_ptr; issue_xlog_fsync(recvFile, recvSegNo, tli); LogstreamResult.Flush = LogstreamResult.Write; + flush_ptr = LogstreamResult.Flush > LogstreamResult.SenderFlush ? 
LogstreamResult.SenderFlush : + LogstreamResult.Flush; + /* Update shared-memory status */ SpinLockAcquire(&walrcv->mutex); - if (walrcv->flushedUpto < LogstreamResult.Flush) + if (walrcv->flushedUpto < flush_ptr) { walrcv->latestChunkStart = walrcv->flushedUpto; - walrcv->flushedUpto = LogstreamResult.Flush; + walrcv->flushedUpto = flush_ptr; walrcv->receivedTLI = tli; } SpinLockRelease(&walrcv->mutex); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 59822f22b8d..71800eeb70f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -3160,7 +3160,7 @@ WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, /* * Send out the WAL in its normal physical/stored form. * - * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk, + * Read up to MAX_SEND_SIZE bytes of WAL that's been written to WAL buffers, * but not yet sent to the client, and buffer it in the libpq output * buffer. * @@ -3174,9 +3174,12 @@ XLogSendPhysical(void) XLogRecPtr startptr; XLogRecPtr endptr; Size nbytes; + Size nbytesUntilFlush; + Size nbytesAfterFlush; XLogSegNo segno; WALReadError errinfo; Size rbytes; + XLogRecPtr flushPtr; /* If requested switch the WAL sender to the stopping state. */ if (got_STOPPING) @@ -3188,6 +3191,11 @@ XLogSendPhysical(void) return; } + if (am_cascading_walsender) + flushPtr = GetStandbyFlushRecPtr(NULL); + else + flushPtr = GetFlushRecPtr(NULL); + /* Figure out how far we can safely send the WAL. */ if (sendTimeLineIsHistoric) { @@ -3265,14 +3273,23 @@ XLogSendPhysical(void) /* * Streaming the current timeline on a primary. * - * Attempt to send all data that's already been written out and - * fsync'd to disk. We cannot go further than what's been written out - * given the current implementation of WALRead(). 
And in any case - * it's unsafe to send WAL that is not securely down to disk on the - * primary: if the primary subsequently crashes and restarts, standbys - * must not have applied any WAL that got lost on the primary. + * Try to send all data that has already been sent to the WAL buffers, + * even though it is unsafe to send WAL that hasn't been securely + * written to disk on the primary. If the primary crashes and + * restarts, standbys must not apply any WAL that was lost on the + * primary. To prevent this, even if we send and write WAL records to + * disk on the standby before they are flushed on the primary, we only + * apply them after they have been flushed on the primary. */ - SendRqstPtr = GetFlushRecPtr(NULL); + SendRqstPtr = GetLogInsertRecPtr(); + if (sentPtr >= SendRqstPtr) + { + SendRqstPtr = WaitXLogInsertionsToFinish(sentPtr); + } + else + { + SendRqstPtr = WaitXLogInsertionsToFinish(SendRqstPtr); + } } /* @@ -3375,6 +3392,26 @@ XLogSendPhysical(void) nbytes = endptr - startptr; Assert(nbytes <= MAX_SEND_SIZE); + /* + * Older WALs are more likely to be evicted from buffers and written to + * disk. For any WAL before latest flush position, we first try to read + * from WAL buffers and then from disk. WALs after the flush position + * cannot be found on disk, so we only try to read such WALs from buffers. + */ + nbytesUntilFlush = 0; + nbytesAfterFlush = 0; + if (flushPtr > endptr) + nbytesUntilFlush = endptr - startptr; + else if (flushPtr > startptr) + { + nbytesUntilFlush = flushPtr - startptr; + nbytesAfterFlush = endptr - flushPtr; + } + else + nbytesAfterFlush = endptr - startptr; + + Assert(nbytes == (nbytesAfterFlush + nbytesUntilFlush)); + /* * OK to read and send the slice. 
*/ @@ -3382,7 +3419,8 @@ XLogSendPhysical(void) pq_sendbyte(&output_message, PqReplMsg_WALData); pq_sendint64(&output_message, startptr); /* dataStart */ - pq_sendint64(&output_message, SendRqstPtr); /* walEnd */ + pq_sendint64(&output_message, endptr); /* walEnd */ + pq_sendint64(&output_message, flushPtr); /* wal flushed upto */ pq_sendint64(&output_message, 0); /* sendtime, filled in last */ /* @@ -3392,25 +3430,49 @@ XLogSendPhysical(void) enlargeStringInfo(&output_message, nbytes); retry: - /* attempt to read WAL from WAL buffers first */ - rbytes = WALReadFromBuffers(&output_message.data[output_message.len], - startptr, nbytes, xlogreader->seg.ws_tli); - output_message.len += rbytes; - startptr += rbytes; - nbytes -= rbytes; - - /* now read the remaining WAL from WAL file */ - if (nbytes > 0 && - !WALRead(xlogreader, - &output_message.data[output_message.len], - startptr, - nbytes, - xlogreader->seg.ws_tli, /* Pass the current TLI because - * only WalSndSegmentOpen controls - * whether new TLI is needed. */ - &errinfo)) - WALReadRaiseError(&errinfo); + if (nbytesAfterFlush == 0) + { + /* attempt to read WAL from WAL buffers first */ + rbytes = WALReadFromBuffers(&output_message.data[output_message.len], + startptr, nbytesUntilFlush, xlogreader->seg.ws_tli); + output_message.len += rbytes; + startptr += rbytes; + nbytes -= rbytes; + nbytesUntilFlush -= rbytes; + } + if (nbytesUntilFlush > 0) + { + if (!WALRead(xlogreader, + &output_message.data[output_message.len], + startptr, + nbytesUntilFlush, + xlogreader->seg.ws_tli, /* Pass the current TLI + * because only + * WalSndSegmentOpen controls + * whether new TLI is needed. */ + &errinfo)) + WALReadRaiseError(&errinfo); + output_message.len += nbytesUntilFlush; + startptr += nbytesUntilFlush; + nbytes -= nbytesUntilFlush; + } + /* + * Any WAL further than the latest flush position cannot be found on disk, + * so we try to read such WALs from buffers. 
+ */ + if (nbytesAfterFlush > 0) + { + /* attempt to read WAL from WAL buffers for the rest */ + rbytes = WALReadFromBuffers(&output_message.data[output_message.len], + startptr, nbytesAfterFlush, xlogreader->seg.ws_tli); + output_message.len += rbytes; + startptr += rbytes; + nbytesAfterFlush -= rbytes; + } + endptr -= nbytesAfterFlush; + + output_message.data[output_message.len] = '\0'; /* See logical_read_xlog_page(). */ XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize); CheckXLogRemoved(segno, xlogreader->seg.ws_tli); @@ -3439,15 +3501,13 @@ retry: } } - output_message.len += nbytes; - output_message.data[output_message.len] = '\0'; /* * Fill the send timestamp last, so that it is taken as late as possible. */ resetStringInfo(&tmpbuf); pq_sendint64(&tmpbuf, GetCurrentTimestamp()); - memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)], + memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64) + sizeof(int64)], tmpbuf.data, sizeof(int64)); pq_putmessage_noblock(PqMsg_CopyData, output_message.data, output_message.len); @@ -4194,7 +4254,9 @@ WalSndKeepaliveIfNecessary(void) /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) + { WalSndShutdown(); + } } } diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c index 25b13c7f55c..572b0a3b19f 100644 --- a/src/bin/pg_basebackup/receivelog.c +++ b/src/bin/pg_basebackup/receivelog.c @@ -1068,6 +1068,7 @@ ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, hdr_len = 1; /* msgtype PqReplMsg_WALData */ hdr_len += 8; /* dataStart */ hdr_len += 8; /* walEnd */ + hdr_len += 8; /* flushPtr */ hdr_len += 8; /* sendTime */ if (len < hdr_len) { diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index d12798be3d8..4033e62bb93 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -249,6 +249,7 @@ extern void UpdateFullPageWrites(void); extern void GetFullPageWriteInfo(XLogRecPtr 
*RedoRecPtr_p, bool *doPageWrites_p); extern XLogRecPtr GetRedoRecPtr(void); extern XLogRecPtr GetInsertRecPtr(void); +extern XLogRecPtr GetLogInsertRecPtr(void); extern XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI); extern TimeLineID GetWALInsertionTimeLine(void); extern TimeLineID GetWALInsertionTimeLineIfSet(void); @@ -297,6 +298,7 @@ extern void do_pg_backup_stop(BackupState *state, bool waitforarchive); extern void do_pg_abort_backup(int code, Datum arg); extern void register_persistent_abort_backup_handler(void); extern SessionBackupState get_backup_status(void); +extern XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto); /* File path names (all relative to $PGDATA) */ #define RECOVERY_SIGNAL_FILE "recovery.signal" -- 2.34.1