From 4173fc5000c9101604e6c64a795322771cb0687a Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Thu, 2 Sep 2021 17:21:46 -0400
Subject: [PATCH v4 1/4] Implement FIRST_IS_ABORTED_CONTRECORD

---
 src/backend/access/transam/xlog.c       | 55 +++++++++++++++++++++++--
 src/backend/access/transam/xlogreader.c | 39 +++++++++++++++++-
 src/include/access/xlog_internal.h      | 14 ++++++-
 src/include/access/xlogreader.h         |  3 ++
 4 files changed, 105 insertions(+), 6 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index e51a7a749d..49912483d5 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -586,6 +586,8 @@ typedef struct XLogCtlData
 	XLogRecPtr	replicationSlotMinLSN;	/* oldest LSN needed by any slot */
 
 	XLogSegNo	lastRemovedSegNo;	/* latest removed/recycled XLOG segment */
+	XLogRecPtr	abortedContrecordPtr; /* LSN of incomplete record at end of
+									   * WAL */
 
 	/* Fake LSN counter, for unlogged relations. Protected by ulsn_lck. */
 	XLogRecPtr	unloggedLSN;
@@ -848,6 +850,7 @@ static XLogSource XLogReceiptSource = XLOG_FROM_ANY;
 /* State information for XLOG reading */
 static XLogRecPtr ReadRecPtr;	/* start of last record read */
 static XLogRecPtr EndRecPtr;	/* end+1 of last record read */
+static XLogRecPtr abortedContrecordPtr;	/* end+1 of incomplete record */
 
 /*
  * Local copies of equivalent fields in the control file.  When running
@@ -2246,6 +2249,31 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 		if (!Insert->forcePageWrites)
 			NewPage->xlp_info |= XLP_BKP_REMOVABLE;
 
+		/*
+		 * If the last page ended with an aborted partial continuation record,
+		 * mark the new page to indicate that the partial record can be
+		 * omitted.
+		 *
+		 * This happens only once at the end of recovery, so there's no race
+		 * condition here.
+		 */
+		if (XLogCtl->abortedContrecordPtr >= NewPageBeginPtr)
+		{
+#ifdef WAL_DEBUG
+			if (XLogCtl->abortedContrecordPtr != NewPageBeginPtr)
+				elog(PANIC, "inconsistent aborted contrecord location %X/%X, expected %X/%X",
+					 LSN_FORMAT_ARGS(XLogCtl->abortedContrecordPtr),
+					 LSN_FORMAT_ARGS(NewPageBeginPtr));
+			if (XLOG_DEBUG)
+				ereport(LOG,
+						(errmsg_internal("setting XLP_FIRST_IS_ABORTED_PARTIAL flag at %X/%X",
+										 LSN_FORMAT_ARGS(NewPageBeginPtr))));
+#endif
+			NewPage->xlp_info |= XLP_FIRST_IS_ABORTED_PARTIAL;
+
+			XLogCtl->abortedContrecordPtr = InvalidXLogRecPtr;
+		}
+
 		/*
 		 * If first page of an XLOG segment file, make it a long header.
 		 */
@@ -4392,6 +4420,7 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
 		record = XLogReadRecord(xlogreader, &errormsg);
 		ReadRecPtr = xlogreader->ReadRecPtr;
 		EndRecPtr = xlogreader->EndRecPtr;
+		abortedContrecordPtr = xlogreader->abortedContrecordPtr;
 		if (record == NULL)
 		{
 			if (readFile >= 0)
@@ -7691,10 +7720,30 @@ StartupXLOG(void)
 	/*
 	 * Re-fetch the last valid or last applied record, so we can identify the
 	 * exact endpoint of what we consider the valid portion of WAL.
+	 *
+	 * When recovery ended in an incomplete record, continue writing from the
+	 * point where it went missing.  This leaves behind an initial part of
+	 * broken record, which rescues downstream which have already received
+	 * that first part.
 	 */
-	XLogBeginRead(xlogreader, LastRec);
-	record = ReadRecord(xlogreader, PANIC, false);
-	EndOfLog = EndRecPtr;
+	if (XLogRecPtrIsInvalid(abortedContrecordPtr))
+	{
+		XLogBeginRead(xlogreader, LastRec);
+		record = ReadRecord(xlogreader, PANIC, false);
+		EndOfLog = EndRecPtr;
+	}
+	else
+	{
+#ifdef WAL_DEBUG
+		if (XLOG_DEBUG)
+			ereport(LOG,
+					(errmsg_internal("recovery overwriting broken contrecord at %X/%X (EndRecPtr: %X/%X)",
+									 LSN_FORMAT_ARGS(abortedContrecordPtr),
+									 LSN_FORMAT_ARGS(EndRecPtr))));
+#endif
+		EndOfLog = abortedContrecordPtr;
+		XLogCtl->abortedContrecordPtr = abortedContrecordPtr;
+	}
 
 	/*
 	 * EndOfLogTLI is the TLI in the filename of the XLOG segment containing
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 5cf74e181a..dbfa6d3562 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -278,6 +278,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 				total_len;
 	uint32		targetRecOff;
 	uint32		pageHeaderSize;
+	bool		assembled;
 	bool		gotheader;
 	int			readOff;
 
@@ -293,6 +294,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 	state->errormsg_buf[0] = '\0';
 
 	ResetDecoder(state);
+	state->abortedContrecordPtr = InvalidXLogRecPtr;
 
 	RecPtr = state->EndRecPtr;
 
@@ -319,7 +321,9 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 		randAccess = true;
 	}
 
+restart:
 	state->currRecPtr = RecPtr;
+	assembled = false;
 
 	targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
 	targetRecOff = RecPtr % XLOG_BLCKSZ;
@@ -415,6 +419,8 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 		char	   *buffer;
 		uint32		gotlen;
 
+		assembled = true;
+
 		/*
 		 * Enlarge readRecordBuf as needed.
 		 */
@@ -442,14 +448,28 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 			readOff = ReadPageInternal(state, targetPagePtr,
 									   Min(total_len - gotlen + SizeOfXLogShortPHD,
 										   XLOG_BLCKSZ));
-
 			if (readOff < 0)
 				goto err;
 
 			Assert(SizeOfXLogShortPHD <= readOff);
 
-			/* Check that the continuation on next page looks valid */
 			pageHeader = (XLogPageHeader) state->readBuf;
+
+			/*
+			 * If we were expecting a continuation record and got an "aborted
+			 * partial" flag, that means the continuation record was lost.
+			 * Ignore the record we were reading, since we now know it's broken
+			 * and lost forever, and restart the read by assuming the address
+			 * to read is the location where we found this flag.
+			 */
+			if (pageHeader->xlp_info & XLP_FIRST_IS_ABORTED_PARTIAL)
+			{
+				ResetDecoder(state);
+				RecPtr = targetPagePtr;
+				goto restart;
+			}
+
+			/* Check that the continuation on next page looks valid */
 			if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
 			{
 				report_invalid_record(state,
@@ -551,6 +571,21 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 		return NULL;
 
 err:
+	if (assembled)
+	{
+		/*
+		 * We get here when a record that spans multiple pages needs to be
+		 * assembled, but something went wrong -- perhaps a contrecord piece
+		 * was lost.  We deal with this by setting abortedContrecordPtr to the
+		 * location of the piece we failed to read, or the start of the page
+		 * we read where validation failed.  If caller is WAL replay, it will
+		 * know that recovery ended and that this is where to start writing
+		 * future WAL marking the next piece with XLP_FIRST_IS_ABORTED_PARTIAL,
+		 * which will in turn signal downstream WAL consumers that the broken
+		 * WAL record here is to be ignored.
+		 */
+		state->abortedContrecordPtr = targetPagePtr;
+	}
 
 	/*
 	 * Invalidate the read state. We might read from a different source after
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 3b5eceff65..9bc72b4c95 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -76,8 +76,20 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
 #define XLP_LONG_HEADER				0x0002
 /* This flag indicates backup blocks starting in this page are optional */
 #define XLP_BKP_REMOVABLE			0x0004
+/*
+ * This flag marks a record that replaces a missing contrecord.
+ * When on WAL replay we expect a continuation record at the start of
+ * a page that is not there, recovery ends but the checkpoint record
+ * that follows is marked with this flag, which indicates WAL readers
+ * that the incomplete record is to be skipped, and that WAL reading
+ * is to be resumed here.  This is useful for downstream consumers of
+ * WAL which have already received (the first half of) the original
+ * broken WAL record, such as via archive_command or physical streaming
+ * replication, which we cannot "rewind".
+ */
+#define XLP_FIRST_IS_ABORTED_PARTIAL 0x0008
 /* All defined flag bits in xlp_info (used for validity checking of header) */
-#define XLP_ALL_FLAGS				0x0007
+#define XLP_ALL_FLAGS				0x000F
 
 #define XLogPageHeaderSize(hdr)		\
 	(((hdr)->xlp_info & XLP_LONG_HEADER) ? SizeOfXLogLongPHD : SizeOfXLogShortPHD)
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 21d200d3df..96e5eab1c9 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -175,6 +175,9 @@ struct XLogReaderState
 	XLogRecPtr	ReadRecPtr;		/* start of last record read */
 	XLogRecPtr	EndRecPtr;		/* end+1 of last record read */
 
+	/* end+1 of incomplete record at end of WAL */
+	XLogRecPtr	abortedContrecordPtr;
+
 
 	/* ----------------------------------------
 	 * Decoded representation of current record
-- 
2.30.2

