From 380df16ade93dd0f3dc6cda8e9786607c228732a Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Fri, 26 Oct 2018 10:07:05 +0900
Subject: [PATCH v21 2/2] Check removal of in-reading segment file.

Checkpoints can recycle a segment file while it is being read by
ReadRecord and that leads to an apparently odd error message during
logical decoding. This patch explicitly checks that then error out
immediately.  Reading a recycled file is safe. Inconsistency caused by
overwrites as a new segment are caught by page/record validation. So
this is only for keeping consistency with the wal_status shown in
pg_replication_slots.
---
 src/backend/access/transam/xlogreader.c | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index f3fea5132f..90a9649f61 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -270,7 +270,9 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 	uint32		pageHeaderSize;
 	bool		gotheader;
 	int			readOff;
-
+#ifndef FRONTEND
+	XLogSegNo	targetSegNo;
+#endif
 	/*
 	 * randAccess indicates whether to verify the previous-record pointer of
 	 * the record we're reading.  We only do this if we're reading
@@ -314,6 +316,22 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
 	targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
 	targetRecOff = RecPtr % XLOG_BLCKSZ;
 
+#ifndef FRONTEND
+	/*
+	 * Although It's safe that the current segment is recycled as a new
+	 * segment since we check the page/record header at reading, it leads to
+	 * an apparently strange error message when logical replication, which can
+	 * be prevented by explicitly checking if the current segment is removed.
+	 */
+	XLByteToSeg(targetPagePtr, targetSegNo, state->segcxt.ws_segsize);
+	if (targetSegNo <= XLogGetLastRemovedSegno())
+	{
+		report_invalid_record(state,
+							  "WAL segment for LSN %X/%X has been removed",
+							  (uint32)(RecPtr >> 32), (uint32) RecPtr);
+		goto err;
+	}
+#endif
 	/*
 	 * Read the page containing the record into state->readBuf. Request enough
 	 * byte to cover the whole record header, or at least the part of it that
-- 
2.20.1

