diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 60de3be92c..5b07eef3aa 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -125,6 +125,7 @@ static void WalRcvDie(int code, Datum arg); static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(bool dying); +static void XLogWalRcvClose(XLogRecPtr recptr); static void XLogWalRcvSendReply(bool force, bool requestReply); static void XLogWalRcvSendHSFeedback(bool immed); static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime); @@ -883,42 +884,11 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) { int segbytes; - if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo, wal_segment_size)) - { - /* - * fsync() and close current file before we switch to next one. We - * would otherwise have to reopen this file to fsync it later - */ - if (recvFile >= 0) - { - char xlogfname[MAXFNAMELEN]; - - XLogWalRcvFlush(false); - - XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); - - /* - * XLOG segment files will be re-read by recovery in startup - * process soon, so we don't advise the OS to release cache - * pages associated with the file like XLogFileClose() does. - */ - if (close(recvFile) != 0) - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not close log segment %s: %m", - xlogfname))); - - /* - * Create .done file forcibly to prevent the streamed segment - * from being archived later. - */ - if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS) - XLogArchiveForceDone(xlogfname); - else - XLogArchiveNotify(xlogfname, true); - } - recvFile = -1; + /* Close the current segment if it's completed */ + XLogWalRcvClose(recptr); + if (recvFile < 0) + { /* Create/use new log file */ XLByteToSeg(recptr, recvSegNo, wal_segment_size); recvFile = XLogFileInit(recvSegNo); @@ -967,6 +937,14 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) /* Update shared-memory status */ pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write); + + /* + * Close the current segment if it's fully written up in the last cycle of + * the loop, to create its archive notification file soon. Otherwise WAL + * archiving of the segment will be delayed until any data in the next + * segment is received and written. + */ + XLogWalRcvClose(recptr); } /* @@ -1020,6 +998,53 @@ XLogWalRcvFlush(bool dying) } } +/* + * Close the current segment if it's completed. + * + * Flush the segment to disk before closing it. Otherwise we have to + * reopen and fsync it later. + * + * Create an archive notification file since the segment is known completed. + */ +static void +XLogWalRcvClose(XLogRecPtr recptr) +{ + if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size)) + { + char xlogfname[MAXFNAMELEN]; + + /* + * fsync() and close current file before we switch to next one. We + * would otherwise have to reopen this file to fsync it later + */ + XLogWalRcvFlush(false); + + XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size); + + /* + * XLOG segment files will be re-read by recovery in startup process + * soon, so we don't advise the OS to release cache pages associated + * with the file like XLogFileClose() does. + */ + if (close(recvFile) != 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not close log segment %s: %m", + xlogfname))); + + /* + * Create .done file forcibly to prevent the streamed segment from + * being archived later. + */ + if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS) + XLogArchiveForceDone(xlogfname); + else + XLogArchiveNotify(xlogfname, true); + + recvFile = -1; + } +} + /* * Send reply message to primary, indicating our current WAL locations, oldest * xmin and the current time.