From 0242daf5622cf649ad88975ce991c851388f8c99 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathandbossart@gmail.com>
Date: Wed, 5 Oct 2022 10:31:35 -0700
Subject: [PATCH v4 1/2] Move WAL receivers' non-shared state to a new struct.

This is preparatory work for a follow-up change that will revamp
the wakeup mechanism for periodic tasks that WAL receivers must
perform.
---
 src/backend/replication/walreceiver.c | 90 ++++++++++++++-------------
 1 file changed, 48 insertions(+), 42 deletions(-)

diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 6cbb67c92a..89985c54cf 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -116,6 +116,14 @@ static struct
 	XLogRecPtr	Flush;			/* last byte + 1 flushed in the standby */
 }			LogstreamResult;
 
+/*
+ * A struct to keep track of non-shared state.
+ */
+typedef struct WalRcvInfo
+{
+	TimeLineID	startpointTLI;
+} WalRcvInfo;
+
 static StringInfoData reply_message;
 static StringInfoData incoming_message;
 
@@ -123,12 +131,12 @@ static StringInfoData incoming_message;
 static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
 static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
 static void WalRcvDie(int code, Datum arg);
-static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len,
-								 TimeLineID tli);
-static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr,
-							TimeLineID tli);
-static void XLogWalRcvFlush(bool dying, TimeLineID tli);
-static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
+static void XLogWalRcvProcessMsg(WalRcvInfo *state, unsigned char type,
+								 char *buf, Size len);
+static void XLogWalRcvWrite(WalRcvInfo *state, char *buf, Size nbytes,
+							XLogRecPtr recptr);
+static void XLogWalRcvFlush(WalRcvInfo *state, bool dying);
+static void XLogWalRcvClose(WalRcvInfo *state, XLogRecPtr recptr);
 static void XLogWalRcvSendReply(bool force, bool requestReply);
 static void XLogWalRcvSendHSFeedback(bool immed);
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
@@ -175,7 +183,6 @@ WalReceiverMain(void)
 	char		slotname[NAMEDATALEN];
 	bool		is_temp_slot;
 	XLogRecPtr	startpoint;
-	TimeLineID	startpointTLI;
 	TimeLineID	primaryTLI;
 	bool		first_stream;
 	WalRcvData *walrcv = WalRcv;
@@ -185,6 +192,7 @@ WalReceiverMain(void)
 	char	   *err;
 	char	   *sender_host = NULL;
 	int			sender_port = 0;
+	WalRcvInfo	state = {0};
 
 	/*
 	 * WalRcv should be set up already (if we are a backend, we inherit this
@@ -238,7 +246,7 @@ WalReceiverMain(void)
 	strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
 	is_temp_slot = walrcv->is_temp_slot;
 	startpoint = walrcv->receiveStart;
-	startpointTLI = walrcv->receiveStartTLI;
+	state.startpointTLI = walrcv->receiveStartTLI;
 
 	/*
 	 * At most one of is_temp_slot and slotname can be set; otherwise,
@@ -258,7 +266,7 @@ WalReceiverMain(void)
 	pg_atomic_write_u64(&WalRcv->writtenUpto, 0);
 
 	/* Arrange to clean up at walreceiver exit */
-	on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
+	on_shmem_exit(WalRcvDie, PointerGetDatum(&state));
 
 	/* Properly accept or ignore signals the postmaster might send us */
 	pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
@@ -345,11 +353,11 @@ WalReceiverMain(void)
 		 * Confirm that the current timeline of the primary is the same or
 		 * ahead of ours.
 		 */
-		if (primaryTLI < startpointTLI)
+		if (primaryTLI < state.startpointTLI)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 					 errmsg("highest timeline %u of the primary is behind recovery timeline %u",
-							primaryTLI, startpointTLI)));
+							primaryTLI, state.startpointTLI)));
 
 		/*
 		 * Get any missing history files. We do this always, even when we're
@@ -361,7 +369,7 @@ WalReceiverMain(void)
 		 * but let's avoid the confusion of timeline id collisions where we
 		 * can.
 		 */
-		WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
+		WalRcvFetchTimeLineHistoryFiles(state.startpointTLI, primaryTLI);
 
 		/*
 		 * Create temporary replication slot if requested, and update slot
@@ -396,17 +404,17 @@ WalReceiverMain(void)
 		options.logical = false;
 		options.startpoint = startpoint;
 		options.slotname = slotname[0] != '\0' ? slotname : NULL;
-		options.proto.physical.startpointTLI = startpointTLI;
+		options.proto.physical.startpointTLI = state.startpointTLI;
 		if (walrcv_startstreaming(wrconn, &options))
 		{
 			if (first_stream)
 				ereport(LOG,
 						(errmsg("started streaming WAL from primary at %X/%X on timeline %u",
-								LSN_FORMAT_ARGS(startpoint), startpointTLI)));
+								LSN_FORMAT_ARGS(startpoint), state.startpointTLI)));
 			else
 				ereport(LOG,
 						(errmsg("restarted WAL streaming at %X/%X on timeline %u",
-								LSN_FORMAT_ARGS(startpoint), startpointTLI)));
+								LSN_FORMAT_ARGS(startpoint), state.startpointTLI)));
 			first_stream = false;
 
 			/* Initialize LogstreamResult and buffers for processing messages */
@@ -464,8 +472,8 @@ WalReceiverMain(void)
 							 */
 							last_recv_timestamp = GetCurrentTimestamp();
 							ping_sent = false;
-							XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
-												 startpointTLI);
+							XLogWalRcvProcessMsg(&state, buf[0], &buf[1],
+												 len - 1);
 						}
 						else if (len == 0)
 							break;
@@ -474,7 +482,7 @@ WalReceiverMain(void)
 							ereport(LOG,
 									(errmsg("replication terminated by primary server"),
 									 errdetail("End of WAL reached on timeline %u at %X/%X.",
-											   startpointTLI,
+											   state.startpointTLI,
 											   LSN_FORMAT_ARGS(LogstreamResult.Write))));
 							endofwal = true;
 							break;
@@ -490,7 +498,7 @@ WalReceiverMain(void)
 					 * let the startup process and primary server know about
 					 * them.
 					 */
-					XLogWalRcvFlush(false, startpointTLI);
+					XLogWalRcvFlush(&state, false);
 				}
 
 				/* Check if we need to exit the streaming loop. */
@@ -596,12 +604,12 @@ WalReceiverMain(void)
 			 * know about when we began streaming, fetch its timeline history
 			 * file now.
 			 */
-			WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
+			WalRcvFetchTimeLineHistoryFiles(state.startpointTLI, primaryTLI);
 		}
 		else
 			ereport(LOG,
 					(errmsg("primary server contains no more WAL on requested timeline %u",
-							startpointTLI)));
+							state.startpointTLI)));
 
 		/*
 		 * End of WAL reached on the requested timeline. Close the last
@@ -611,7 +619,7 @@ WalReceiverMain(void)
 		{
 			char		xlogfname[MAXFNAMELEN];
 
-			XLogWalRcvFlush(false, startpointTLI);
+			XLogWalRcvFlush(&state, false);
 			XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
 			if (close(recvFile) != 0)
 				ereport(PANIC,
@@ -631,7 +639,7 @@ WalReceiverMain(void)
 		recvFile = -1;
 
 		elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
-		WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
+		WalRcvWaitForStartPosition(&startpoint, &state.startpointTLI);
 	}
 	/* not reached */
 }
@@ -779,12 +787,10 @@ static void
 WalRcvDie(int code, Datum arg)
 {
 	WalRcvData *walrcv = WalRcv;
-	TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
-
-	Assert(*startpointTLI_p != 0);
+	WalRcvInfo *state = (WalRcvInfo *) DatumGetPointer(arg);
 
 	/* Ensure that all WAL records received are flushed to disk */
-	XLogWalRcvFlush(true, *startpointTLI_p);
+	XLogWalRcvFlush(state, true);
 
 	/* Mark ourselves inactive in shared memory */
 	SpinLockAcquire(&walrcv->mutex);
@@ -814,7 +820,7 @@ WalRcvDie(int code, Datum arg)
  * Accept the message from XLOG stream, and process it.
  */
 static void
-XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
+XLogWalRcvProcessMsg(WalRcvInfo *state, unsigned char type, char *buf, Size len)
 {
 	int			hdrlen;
 	XLogRecPtr	dataStart;
@@ -844,7 +850,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
 
 				buf += hdrlen;
 				len -= hdrlen;
-				XLogWalRcvWrite(buf, len, dataStart, tli);
+				XLogWalRcvWrite(state, buf, len, dataStart);
 				break;
 			}
 		case 'k':				/* Keepalive */
@@ -881,12 +887,12 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
  * Write XLOG data to disk.
  */
 static void
-XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
+XLogWalRcvWrite(WalRcvInfo *state, char *buf, Size nbytes, XLogRecPtr recptr)
 {
 	int			startoff;
 	int			byteswritten;
 
-	Assert(tli != 0);
+	Assert(state->startpointTLI != 0);
 
 	while (nbytes > 0)
 	{
@@ -894,14 +900,14 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 
 		/* Close the current segment if it's completed */
 		if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
-			XLogWalRcvClose(recptr, tli);
+			XLogWalRcvClose(state, recptr);
 
 		if (recvFile < 0)
 		{
 			/* Create/use new log file */
 			XLByteToSeg(recptr, recvSegNo, wal_segment_size);
-			recvFile = XLogFileInit(recvSegNo, tli);
-			recvFileTLI = tli;
+			recvFile = XLogFileInit(recvSegNo, state->startpointTLI);
+			recvFileTLI = state->startpointTLI;
 		}
 
 		/* Calculate the start offset of the received logs */
@@ -954,7 +960,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 	 * segment is received and written.
 	 */
 	if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
-		XLogWalRcvClose(recptr, tli);
+		XLogWalRcvClose(state, recptr);
 }
 
 /*
@@ -964,15 +970,15 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
  * an error, so we skip sending a reply in that case.
  */
 static void
-XLogWalRcvFlush(bool dying, TimeLineID tli)
+XLogWalRcvFlush(WalRcvInfo *state, bool dying)
 {
-	Assert(tli != 0);
+	Assert(state->startpointTLI != 0);
 
 	if (LogstreamResult.Flush < LogstreamResult.Write)
 	{
 		WalRcvData *walrcv = WalRcv;
 
-		issue_xlog_fsync(recvFile, recvSegNo, tli);
+		issue_xlog_fsync(recvFile, recvSegNo, state->startpointTLI);
 
 		LogstreamResult.Flush = LogstreamResult.Write;
 
@@ -982,7 +988,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
 		{
 			walrcv->latestChunkStart = walrcv->flushedUpto;
 			walrcv->flushedUpto = LogstreamResult.Flush;
-			walrcv->receivedTLI = tli;
+			walrcv->receivedTLI = state->startpointTLI;
 		}
 		SpinLockRelease(&walrcv->mutex);
 
@@ -1019,18 +1025,18 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
  * Create an archive notification file since the segment is known completed.
  */
 static void
-XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
+XLogWalRcvClose(WalRcvInfo *state, XLogRecPtr recptr)
 {
 	char		xlogfname[MAXFNAMELEN];
 
 	Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size));
-	Assert(tli != 0);
+	Assert(state->startpointTLI != 0);
 
 	/*
 	 * fsync() and close current file before we switch to next one. We would
 	 * otherwise have to reopen this file to fsync it later
 	 */
-	XLogWalRcvFlush(false, tli);
+	XLogWalRcvFlush(state, false);
 
 	XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
 
-- 
2.25.1

