diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 75ec985953..2818bf5e25 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -118,6 +118,7 @@ static void WalRcvDie(int code, Datum arg); static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(bool dying); +static void XLogWalRcvClose(XLogRecPtr recptr); static void XLogWalRcvSendReply(bool force, bool requestReply); static void XLogWalRcvSendHSFeedback(bool immed); static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); @@ -920,46 +921,16 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) { int segbytes; - if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo)) - { - bool use_existent; - - /* - * fsync() and close current file before we switch to next one. We - * would otherwise have to reopen this file to fsync it later - */ - if (recvFile >= 0) - { - char xlogfname[MAXFNAMELEN]; + /* Close the current segment if it's completed */ + if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo)) + XLogWalRcvClose(recptr); - XLogWalRcvFlush(false); - - /* - * XLOG segment files will be re-read by recovery in startup - * process soon, so we don't advise the OS to release cache - * pages associated with the file like XLogFileClose() does. - */ - if (close(recvFile) != 0) - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not close log segment %s: %m", - XLogFileNameP(recvFileTLI, recvSegNo)))); - - /* - * Create .done file forcibly to prevent the streamed segment - * from being archived later. - */ - XLogFileName(xlogfname, recvFileTLI, recvSegNo); - if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS) - XLogArchiveForceDone(xlogfname); - else - XLogArchiveNotify(xlogfname); - } - recvFile = -1; + if (recvFile < 0) + { + bool use_existent = true; /* Create/use new log file */ XLByteToSeg(recptr, recvSegNo); - use_existent = true; recvFile = XLogFileInit(recvSegNo, &use_existent, true); recvFileTLI = ThisTimeLineID; recvOff = 0; @@ -1011,6 +982,15 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) LogstreamResult.Write = recptr; } + + /* + * Close the current segment if it's fully written up in the last cycle of + * the loop, to create its archive notification file soon. Otherwise WAL + * archiving of the segment will be delayed until any data in the next + * segment is received and written. + */ + if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo)) + XLogWalRcvClose(recptr); } /* @@ -1065,6 +1045,52 @@ XLogWalRcvFlush(bool dying) } } +/* + * Close the current segment. + * + * Flush the segment to disk before closing it. Otherwise we have to + * reopen and fsync it later. + * + * Create an archive notification file since the segment is known completed. + */ +static void +XLogWalRcvClose(XLogRecPtr recptr) +{ + char xlogfname[MAXFNAMELEN]; + + Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo)); + + /* + * fsync() and close current file before we switch to next one. We would + * otherwise have to reopen this file to fsync it later + */ + XLogWalRcvFlush(false); + + XLogFileName(xlogfname, recvFileTLI, recvSegNo); + + /* + * XLOG segment files will be re-read by recovery in startup process soon, + * so we don't advise the OS to release cache pages associated with the + * file like XLogFileClose() does. + */ + if (close(recvFile) != 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not close log segment %s: %m", + xlogfname))); + + /* + * Create .done file forcibly to prevent the streamed segment from being + * archived later. + */ + if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS) + XLogArchiveForceDone(xlogfname); + else + XLogArchiveNotify(xlogfname); + + recvFile = -1; +} + /* * Send reply message to primary, indicating our current WAL locations, oldest * xmin and the current time.