From 63160d69f734bdb2ed45aa839ec2903f12194d50 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Tue, 12 May 2020 19:39:57 -0400
Subject: [PATCH] Adjust walsender usage of xlogreader, simplify APIs

Per comments from Kyotaro Horiguchi
---
 src/backend/access/transam/xlogreader.c | 35 +++++----
 src/backend/access/transam/xlogutils.c  | 16 ++--
 src/backend/replication/walsender.c     | 98 ++++++++++++-------------
 src/bin/pg_waldump/pg_waldump.c         | 16 ++--
 src/include/access/xlogreader.h         | 20 ++---
 src/include/access/xlogutils.h          |  3 +-
 6 files changed, 83 insertions(+), 105 deletions(-)

diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 7cee8b92c9..aae3fee24c 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1044,14 +1044,12 @@ err:
 
 /*
  * Helper function to ease writing of XLogRoutine->page_read callbacks.
- * If this function is used, caller must supply an open_segment callback in
+ * If this function is used, caller must supply a segment_open callback in
  * 'state', as that is used here.
  *
  * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
  * fetched from timeline 'tli'.
  *
- * 'seg/segcxt' identify the last segment used.
- *
  * Returns true if succeeded, false if an error occurs, in which case
  * 'errinfo' receives error details.
  *
@@ -1061,7 +1059,6 @@ err:
 bool
 WALRead(XLogReaderState *state,
 		char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
-		WALOpenSegment *seg, WALSegmentContext *segcxt,
 		WALReadError *errinfo)
 {
 	char	   *p;
@@ -1078,34 +1075,36 @@ WALRead(XLogReaderState *state,
 		int			segbytes;
 		int			readbytes;
 
-		startoff = XLogSegmentOffset(recptr, segcxt->ws_segsize);
+		startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
 
 		/*
 		 * If the data we want is not in a segment we have open, close what we
 		 * have (if anything) and open the next one, using the caller's
 		 * provided openSegment callback.
 		 */
-		if (seg->ws_file < 0 ||
-			!XLByteInSeg(recptr, seg->ws_segno, segcxt->ws_segsize) ||
-			tli != seg->ws_tli)
+		if (state->seg.ws_file < 0 ||
+			!XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) ||
+			tli != state->seg.ws_tli)
 		{
 			XLogSegNo	nextSegNo;
 
-			if (seg->ws_file >= 0)
+			if (state->seg.ws_file >= 0)
 				state->routine.segment_close(state);
 
-			XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize);
-			seg->ws_file = state->routine.segment_open(state, nextSegNo,
-													   segcxt, &tli);
+			XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
+			state->routine.segment_open(state, nextSegNo, &tli);
+
+			/* This shouldn't happen -- indicates a bug in segment_open */
+			Assert(state->seg.ws_file >= 0);
 
 			/* Update the current segment info. */
-			seg->ws_tli = tli;
-			seg->ws_segno = nextSegNo;
+			state->seg.ws_tli = tli;
+			state->seg.ws_segno = nextSegNo;
 		}
 
 		/* How many bytes are within this segment? */
-		if (nbytes > (segcxt->ws_segsize - startoff))
-			segbytes = segcxt->ws_segsize - startoff;
+		if (nbytes > (state->segcxt.ws_segsize - startoff))
+			segbytes = state->segcxt.ws_segsize - startoff;
 		else
 			segbytes = nbytes;
 
@@ -1115,7 +1114,7 @@ WALRead(XLogReaderState *state,
 
 		/* Reset errno first; eases reporting non-errno-affecting errors */
 		errno = 0;
-		readbytes = pg_pread(seg->ws_file, p, segbytes, (off_t) startoff);
+		readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff);
 
 #ifndef FRONTEND
 		pgstat_report_wait_end();
@@ -1127,7 +1126,7 @@ WALRead(XLogReaderState *state,
 			errinfo->wre_req = segbytes;
 			errinfo->wre_read = readbytes;
 			errinfo->wre_off = startoff;
-			errinfo->wre_seg = *seg;
+			errinfo->wre_seg = state->seg;
 			return false;
 		}
 
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 0bb69447c2..322b0e8ff5 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -784,18 +784,17 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
 }
 
 /* XLogReaderRoutine->segment_open callback for local pg_wal files */
-int
+void
 wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo,
-				 WALSegmentContext *segcxt, TimeLineID *tli_p)
+				 TimeLineID *tli_p)
 {
 	TimeLineID	tli = *tli_p;
 	char		path[MAXPGPATH];
-	int			fd;
 
-	XLogFilePath(path, tli, nextSegNo, segcxt->ws_segsize);
-	fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-	if (fd >= 0)
-		return fd;
+	XLogFilePath(path, tli, nextSegNo, state->segcxt.ws_segsize);
+	state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+	if (state->seg.ws_file >= 0)
+		return;
 
 	if (errno == ENOENT)
 		ereport(ERROR,
@@ -807,8 +806,6 @@ wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo,
 				(errcode_for_file_access(),
 				 errmsg("could not open file \"%s\": %m",
 						path)));
-
-	return -1;					/* keep compiler quiet */
 }
 
 /* stock XLogReaderRoutine->segment_close callback */
@@ -947,7 +944,6 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
 	 * zero-padded up to the page boundary if it's incomplete.
 	 */
 	if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli,
-				 &state->seg, &state->segcxt,
 				 &errinfo))
 		WALReadRaiseError(&errinfo);
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 9f14b99231..3367aa98f8 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -129,8 +129,14 @@ bool		log_replication_commands = false;
  */
 bool		wake_wal_senders = false;
 
-static WALOpenSegment *sendSeg = NULL;
-static WALSegmentContext *sendCxt = NULL;
+/*
+ * Physical walsender does not use xlogreader to read WAL, but it does use a
+ * fake one to keep state.  Logical walsender uses a proper xlogreader.  Both
+ * keep the 'xlogreader' pointer to the right one, for the sake of common
+ * routines.
+ */
+static XLogReaderState fake_xlogreader;
+static XLogReaderState *xlogreader;
 
 /*
  * These variables keep track of the state of the timeline we're currently
@@ -248,8 +254,8 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
 static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
 static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 
-static int	WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
-							  WALSegmentContext *segcxt, TimeLineID *tli_p);
+static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
+							  TimeLineID *tli_p);
 static void UpdateSpillStats(LogicalDecodingContext *ctx);
 
 
@@ -280,12 +286,19 @@ InitWalSender(void)
 	/* Initialize empty timestamp buffer for lag tracking. */
 	lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
 
-	/* Make sure we can remember the current read position in XLOG. */
-	sendSeg = (WALOpenSegment *)
-		MemoryContextAlloc(TopMemoryContext, sizeof(WALOpenSegment));
-	sendCxt = (WALSegmentContext *)
-		MemoryContextAlloc(TopMemoryContext, sizeof(WALSegmentContext));
-	WALOpenSegmentInit(sendSeg, sendCxt, wal_segment_size, NULL);
+	/*
+	 * Prepare physical walsender's fake xlogreader struct.  Logical walsender
+	 * does this later.
+	 */
+	if (!am_db_walsender)
+	{
+		xlogreader = &fake_xlogreader;
+		xlogreader->routine =
+			*XL_ROUTINE(.segment_open = WalSndSegmentOpen,
+						.segment_close = wal_segment_close);
+		WALOpenSegmentInit(&xlogreader->seg, &xlogreader->segcxt,
+						   wal_segment_size, NULL);
+	}
 }
 
 /*
@@ -302,11 +315,8 @@ WalSndErrorCleanup(void)
 	ConditionVariableCancelSleep();
 	pgstat_report_wait_end();
 
-	if (sendSeg->ws_file >= 0)
-	{
-		close(sendSeg->ws_file);
-		sendSeg->ws_file = -1;
-	}
+	if (xlogreader->seg.ws_file >= 0)
+		wal_segment_close(xlogreader);
 
 	if (MyReplicationSlot != NULL)
 		ReplicationSlotRelease();
@@ -837,11 +847,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 				 cur_page,
 				 targetPagePtr,
 				 XLOG_BLCKSZ,
-				 sendSeg->ws_tli,	/* Pass the current TLI because only
+				 state->seg.ws_tli, /* Pass the current TLI because only
 									 * WalSndSegmentOpen controls whether new
 									 * TLI is needed. */
-				 sendSeg,
-				 sendCxt,
 				 &errinfo))
 		WALReadRaiseError(&errinfo);
 
@@ -852,8 +860,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
 	 * read() succeeds in that case, but the data we tried to read might
 	 * already have been overwritten with new WAL records.
 	 */
-	XLByteToSeg(targetPagePtr, segno, sendCxt->ws_segsize);
-	CheckXLogRemoved(segno, sendSeg->ws_tli);
+	XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
+	CheckXLogRemoved(segno, state->seg.ws_tli);
 
 	return count;
 }
@@ -1176,6 +1184,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 										 .segment_close = wal_segment_close),
 							  WalSndPrepareWrite, WalSndWriteData,
 							  WalSndUpdateProgress);
+	xlogreader = logical_decoding_ctx->reader;
 
 	WalSndSetState(WALSNDSTATE_CATCHUP);
 
@@ -2447,13 +2456,11 @@ WalSndKill(int code, Datum arg)
 }
 
 /* XLogReaderRoutine->segment_open callback */
-static int
-WalSndSegmentOpen(XLogReaderState *state,
-				  XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+static void
+WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
 				  TimeLineID *tli_p)
 {
 	char		path[MAXPGPATH];
-	int			fd;
 
 	/*-------
 	 * When reading from a historic timeline, and there is a timeline switch
@@ -2484,15 +2491,15 @@ WalSndSegmentOpen(XLogReaderState *state,
 	{
 		XLogSegNo	endSegNo;
 
-		XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize);
-		if (sendSeg->ws_segno == endSegNo)
+		XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
+		if (state->seg.ws_segno == endSegNo)
 			*tli_p = sendTimeLineNextTLI;
 	}
 
-	XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize);
-	fd = BasicOpenFile(path, O_RDONLY | PG_BINARY);
-	if (fd >= 0)
-		return fd;
+	XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
+	state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
+	if (state->seg.ws_file >= 0)
+		return;
 
 	/*
 	 * If the file is not found, assume it's because the standby asked for a
@@ -2515,7 +2522,6 @@ WalSndSegmentOpen(XLogReaderState *state,
 				(errcode_for_file_access(),
 				 errmsg("could not open file \"%s\": %m",
 						path)));
-	return -1;					/* keep compiler quiet */
 }
 
 /*
@@ -2537,12 +2543,6 @@ XLogSendPhysical(void)
 	Size		nbytes;
 	XLogSegNo	segno;
 	WALReadError errinfo;
-	static XLogReaderState fake_xlogreader =
-	{
-		/* Fake xlogreader state for WALRead */
-		.routine.segment_open = WalSndSegmentOpen,
-		.routine.segment_close = wal_segment_close
-	};
 
 	/* If requested switch the WAL sender to the stopping state. */
 	if (got_STOPPING)
@@ -2685,9 +2685,8 @@ XLogSendPhysical(void)
 	if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
 	{
 		/* close the current file. */
-		if (sendSeg->ws_file >= 0)
-			close(sendSeg->ws_file);
-		sendSeg->ws_file = -1;
+		if (xlogreader->seg.ws_file >= 0)
+			wal_segment_close(xlogreader);
 
 		/* Send CopyDone */
 		pq_putmessage_noblock('c', NULL, 0);
@@ -2760,21 +2759,19 @@ XLogSendPhysical(void)
 	enlargeStringInfo(&output_message, nbytes);
 
 retry:
-	if (!WALRead(&fake_xlogreader,
+	if (!WALRead(xlogreader,
 				 &output_message.data[output_message.len],
 				 startptr,
 				 nbytes,
-				 sendSeg->ws_tli,	/* Pass the current TLI because only
-									 * WalSndSegmentOpen controls whether new
-									 * TLI is needed. */
-				 sendSeg,
-				 sendCxt,
+				 xlogreader->seg.ws_tli,	/* Pass the current TLI because
+											 * only WalSndSegmentOpen controls
+											 * whether new TLI is needed. */
 				 &errinfo))
 		WALReadRaiseError(&errinfo);
 
 	/* See logical_read_xlog_page(). */
-	XLByteToSeg(startptr, segno, sendCxt->ws_segsize);
-	CheckXLogRemoved(segno, sendSeg->ws_tli);
+	XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
+	CheckXLogRemoved(segno, xlogreader->seg.ws_tli);
 
 	/*
 	 * During recovery, the currently-open WAL file might be replaced with the
@@ -2792,10 +2789,9 @@ retry:
 		walsnd->needreload = false;
 		SpinLockRelease(&walsnd->mutex);
 
-		if (reload && sendSeg->ws_file >= 0)
+		if (reload && xlogreader->seg.ws_file >= 0)
 		{
-			close(sendSeg->ws_file);
-			sendSeg->ws_file = -1;
+			wal_segment_close(xlogreader);
 
 			goto retry;
 		}
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index e29f65500f..d1a0678935 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -280,17 +280,15 @@ identify_target_directory(char *directory, char *fname)
 }
 
 /* pg_waldump's XLogReaderRoutine->segment_open callback */
-static int
-WALDumpOpenSegment(XLogReaderState *state,
-				   XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+static void
+WALDumpOpenSegment(XLogReaderState *state, XLogSegNo nextSegNo,
 				   TimeLineID *tli_p)
 {
 	TimeLineID	tli = *tli_p;
 	char		fname[MAXPGPATH];
-	int			fd;
 	int			tries;
 
-	XLogFileName(fname, tli, nextSegNo, segcxt->ws_segsize);
+	XLogFileName(fname, tli, nextSegNo, state->segcxt.ws_segsize);
 
 	/*
 	 * In follow mode there is a short period of time after the server has
@@ -300,9 +298,9 @@ WALDumpOpenSegment(XLogReaderState *state,
 	 */
 	for (tries = 0; tries < 10; tries++)
 	{
-		fd = open_file_in_directory(segcxt->ws_dir, fname);
-		if (fd >= 0)
-			return fd;
+		state->seg.ws_file = open_file_in_directory(state->segcxt.ws_dir, fname);
+		if (state->seg.ws_file >= 0)
+			return;
 		if (errno == ENOENT)
 		{
 			int			save_errno = errno;
@@ -318,7 +316,6 @@ WALDumpOpenSegment(XLogReaderState *state,
 	}
 
 	fatal_error("could not find file \"%s\": %m", fname);
-	return -1;					/* keep compiler quiet */
 }
 
 /*
@@ -356,7 +353,6 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
 	}
 
 	if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline,
-				 &state->seg, &state->segcxt,
 				 &errinfo))
 	{
 		WALOpenSegment *seg = &errinfo.wre_seg;
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 80cf62acb7..c21b0ba972 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -63,10 +63,9 @@ typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
 							   int reqLen,
 							   XLogRecPtr targetRecPtr,
 							   char *readBuf);
-typedef int (*WALSegmentOpenCB) (XLogReaderState *xlogreader,
-								 XLogSegNo nextSegNo,
-								 WALSegmentContext *segcxt,
-								 TimeLineID *tli_p);
+typedef void (*WALSegmentOpenCB) (XLogReaderState *xlogreader,
+								  XLogSegNo nextSegNo,
+								  TimeLineID *tli_p);
 typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader);
 
 typedef struct XLogReaderRoutine
@@ -94,21 +93,16 @@ typedef struct XLogReaderRoutine
 	XLogPageReadCB page_read;
 
 	/*
-	 * Callback to open the specified WAL segment for reading.  The file
-	 * descriptor of the opened segment shall be returned.  In case of
+	 * Callback to open the specified WAL segment for reading.  ->seg.ws_file
+	 * shall be set to the file descriptor of the opened segment.  In case of
 	 * failure, an error shall be raised by the callback and it shall not
 	 * return.
 	 *
 	 * "nextSegNo" is the number of the segment to be opened.
 	 *
-	 * "segcxt" is additional information about the segment.
-	 *
 	 * "tli_p" is an input/output argument. WALRead() uses it to pass the
 	 * timeline in which the new segment should be found, but the callback can
 	 * use it to return the TLI that it actually opened.
-	 *
-	 * BasicOpenFile() is the preferred way to open the segment file in
-	 * backend code, whereas open(2) should be used in frontend.
 	 */
 	WALSegmentOpenCB segment_open;
 
@@ -301,9 +295,7 @@ typedef struct WALReadError
 
 extern bool WALRead(XLogReaderState *state,
 					char *buf, XLogRecPtr startptr, Size count,
-					TimeLineID tli, WALOpenSegment *seg,
-					WALSegmentContext *segcxt,
-					WALReadError *errinfo);
+					TimeLineID tli, WALReadError *errinfo);
 
 /* Functions for decoding an XLogRecord */
 
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 68ce815476..e59b6cf3a9 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -50,9 +50,8 @@ extern void FreeFakeRelcacheEntry(Relation fakerel);
 extern int	read_local_xlog_page(XLogReaderState *state,
 								 XLogRecPtr targetPagePtr, int reqLen,
 								 XLogRecPtr targetRecPtr, char *cur_page);
-extern int	wal_segment_open(XLogReaderState *state,
+extern void wal_segment_open(XLogReaderState *state,
 							 XLogSegNo nextSegNo,
-							 WALSegmentContext *segcxt,
 							 TimeLineID *tli_p);
 extern void wal_segment_close(XLogReaderState *state);
 
-- 
2.20.1

