From 10414785dc5c7661da2f39fe3db78b335478dfe0 Mon Sep 17 00:00:00 2001
From: Imran Zaheer <imran.zhir@gmail.com>
Date: Mon, 22 Jun 2026 12:33:53 +0500
Subject: [PATCH v4 3/4] Pipelined Recovery - Decoupling startup and producer
 states.

Before this patch, most states/variables were statically defined
under xlogrecovery.c as they would only be needed by the startup process.
However, to implement pipelining, the producer (parallel decoder)
should also be aware of these static states. Some of these states stays
constant throughout decoding/recovery, while others may change during
recovery.

To address this, WalPipelineParams contains a set of all local states
that the startup process had before it started the pipeline. These
states are passed to the producer, which updates them accordingly
before entering the decoding loop. This is done using
WalPipeline_ExportRecoveryState() and WalPipeline_ImportRecoveryState().

WalPipeline_ExportRecoveryState --> saves all the local startup proc states
WalPipeline_ImportRecoveryState --> apply those states to the producer context

To manage the states that may keep changing during the recovery, we
added them to the shared memory in XLogRecoveryCtlData. Now whenever
the startup proc updates any of such local states the shared state
is also upated accordingly,  so that the pipeline stay aware of this
update during the decoding.
---
 src/backend/access/transam/xlog.c         |  15 +-
 src/backend/access/transam/xlogrecovery.c | 518 ++++++++++++++++++++--
 src/backend/storage/ipc/standby.c         |   1 +
 src/include/access/xlogpipeline.h         |  46 ++
 src/include/access/xlogrecovery.h         |  85 ++++
 5 files changed, 629 insertions(+), 36 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index a81912b7441..8cdc1119987 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -59,6 +59,7 @@
 #include "access/xlog_internal.h"
 #include "access/xlogarchive.h"
 #include "access/xloginsert.h"
+#include "access/xlogpipeline.h"
 #include "access/xlogreader.h"
 #include "access/xlogrecovery.h"
 #include "access/xlogutils.h"
@@ -6269,6 +6270,12 @@ StartupXLOG(void)
 
 				ProcArrayApplyRecoveryInfo(&running);
 			}
+
+			/*
+			 * Finaly update the shared standby state, so that pipeline worker
+			 * stay consistent with the startup process.
+			 */
+			SetSharedHotStandbyState();
 		}
 
 		/*
@@ -8925,6 +8932,12 @@ xlog_redo(XLogReaderState *record)
 			running.xids = xids;
 
 			ProcArrayApplyRecoveryInfo(&running);
+
+			/*
+			 * Update the shared standby state, so that pipeline worker
+			 * stay consistent with the startup process.
+			 */
+			SetSharedHotStandbyState();
 		}
 
 		/* ControlFile->checkPointCopy always tracks the latest ckpt XID */
@@ -10149,7 +10162,7 @@ GetOldestRestartPoint(XLogRecPtr *oldrecptr, TimeLineID *oldtli)
 void
 XLogShutdownWalRcv(void)
 {
-	Assert(AmStartupProcess() || !IsUnderPostmaster);
+	Assert(AmStartupProcess() || AmWalPipeline() || !IsUnderPostmaster);
 
 	ShutdownWalRcv();
 	ResetInstallXLogFileSegmentActive();
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 9cc28c4a717..ba367ec7b63 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -35,6 +35,7 @@
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "access/xlogarchive.h"
+#include "access/xlogpipeline.h"
 #include "access/xlogprefetcher.h"
 #include "access/xlogreader.h"
 #include "access/xlogrecovery.h"
@@ -100,6 +101,8 @@ int			recovery_min_apply_delay = 0;
 char	   *PrimaryConnInfo = NULL;
 char	   *PrimarySlotName = NULL;
 bool		wal_receiver_create_temp_slot = false;
+bool		wal_pipeline_enabled = false;
+int			wal_pipeline_mq_size_mb = 128;
 
 /*
  * recoveryTargetTimeLineGoal: what the user requested, if any
@@ -206,17 +209,6 @@ typedef struct XLogPageReadPrivate
 /* flag to tell XLogPageRead that we have started replaying */
 static bool InRedo = false;
 
-/*
- * Codes indicating where we got a WAL file from during recovery, or where
- * to attempt to get one.
- */
-typedef enum
-{
-	XLOG_FROM_ANY = 0,			/* request to read WAL from any source */
-	XLOG_FROM_ARCHIVE,			/* restored using restore_command */
-	XLOG_FROM_PG_WAL,			/* existing file in pg_wal */
-	XLOG_FROM_STREAM,			/* streamed from primary */
-} XLogSource;
 
 /* human-readable names for XLogSources, for debugging output */
 static const char *const xlogSourceNames[] = {"any", "archive", "pg_wal", "stream"};
@@ -365,12 +357,6 @@ static void recoveryPausesHere(bool endOfRecovery);
 static bool recoveryApplyDelay(XLogReaderState *record);
 static void ConfirmRecoveryPaused(void);
 
-static XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher,
-							  int emode, bool fetching_ckpt,
-							  TimeLineID replayTLI);
-
-static int	XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
-						 int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
 static XLogPageReadResult WaitForWALToBecomeAvailable(XLogRecPtr RecPtr,
 													  bool randAccess,
 													  bool fetching_ckpt,
@@ -392,6 +378,7 @@ static bool HotStandbyActiveInReplay(void);
 
 static void SetCurrentChunkStartTime(TimestampTz xtime);
 static void SetLatestXTime(TimestampTz xtime);
+static void WalPipeline_ExportRecoveryState(WalPipelineParams *params);
 
 /*
  * Register shared memory for WAL recovery
@@ -412,9 +399,27 @@ XLogRecoveryShmemInit(void *arg)
 
 	SpinLockInit(&XLogRecoveryCtl->info_lck);
 	InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+	InitSharedLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch);
 	ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV);
 }
 
+/*
+ * We may not be able to share expectedTLEs list across the sharedmemory.
+ * For now just trigger the startup process (consumer) to
+ * reread the timelinehistory file  whenever pipeline updates the value for
+ * expectedTLEs. So the consumer proc will update expectedTLEs  locally.
+ */
+static void
+PipelineConsumerExpectedTLEsUpdateTLI(TimeLineID recoveryTargetTLI)
+{
+	if (wal_pipeline_enabled)
+	{
+		SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+		XLogRecoveryCtl->expectedTLEsUpdateTLI = recoveryTargetTLI;
+		SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	}
+}
+
 /*
  * A thin wrapper to enable StandbyMode and do other preparatory work as
  * needed.
@@ -429,8 +434,12 @@ EnableStandbyMode(void)
 	 * standby as it will always be in recovery unless promoted. We disable
 	 * startup progress timeout in standby mode to avoid calling
 	 * startup_progress_timeout_handler() unnecessarily.
+	 *
+	 * This function could also be called from the pipeline worker.
+	 * But timeout could only be dsiabled by the startup process.
 	 */
-	disable_startup_progress_timeout();
+	if (!AmWalPipeline())
+		disable_startup_progress_timeout();
 }
 
 /*
@@ -490,7 +499,10 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
 	 * recovery, if required.
 	 */
 	if (ArchiveRecoveryRequested)
+	{
+		OwnLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch);
 		OwnLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+	}
 
 	/*
 	 * Set the WAL reading processor now, as it will be needed when reading
@@ -962,6 +974,14 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
 		minRecoveryPointTLI = 0;
 	}
 
+	/* update shared state. */
+	if (wal_pipeline_enabled)
+	{
+		SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+		XLogRecoveryCtl->InArchiveRecovery = InArchiveRecovery;
+		SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	}
+
 	/*
 	 * Start recovery assuming that the final record isn't lost.
 	 */
@@ -973,6 +993,12 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
 	*haveTblspcMap_ptr = haveTblspcMap;
 }
 
+void DisownRecoveryWakeupLatch(void)
+{
+	if (ArchiveRecoveryRequested)
+		DisownLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+}
+
 /*
  * See if there are any recovery signal files and if so, set state for
  * recovery.
@@ -1402,6 +1428,55 @@ read_tablespace_map(List **tablespaces)
 	return true;
 }
 
+static void
+ResetStatesIfPipelined(void)
+{
+	if (wal_pipeline_enabled)
+	{
+		/*
+		 * Invalidate ptrs pointing the pipeline shared message queue.
+		 *
+		 * We do this because instead of pointing to the decode_buffer (the
+		 * normal i.e. pipeline-off) reader would be pointing to the
+		 * sh_mq (ring buffer) to get the next decoded record. And we cannot
+		 * continue like this for future xlog read i.e. FinishWalRecovery().
+		 *
+		 * See under deserialize_wal_record() for more details.
+		 */
+		xlogreader->record = NULL;
+		xlogreader->decode_queue_head = NULL;
+		xlogreader->decode_queue_tail = NULL;
+
+		/*
+		 * Invalidate contents of internal buffer before read attempt.
+		 *
+		 * This is needed because we were reading from the pipelined reader
+		 * so far and updating the local buffer public states accordingly.
+		 * So better to invalidate the any cached state that was there
+		 * before the pipeline started, and force a reread for the future
+		 * xlog reads i.e. refetching the last record in FinishWalRecovery().
+		 */
+		xlogreader->readLen = 0;
+		xlogreader->seg.ws_segno = 0;
+		xlogreader->segoff = 0;
+
+		/*
+		 * Update the startup proc local state with the pipeline state,
+		 * because pipeline was doing the main decoding and hence have the more
+		 * updated information.
+		 */
+		SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+		InArchiveRecovery = XLogRecoveryCtl->InArchiveRecovery;
+		missingContrecPtr = XLogRecoveryCtl->missingContrecPtr;
+		abortedRecPtr = XLogRecoveryCtl->abortedRecPtr;
+
+		/* only update if found a new timeline */
+		if (recoveryTargetTLI < XLogRecoveryCtl->recoveryTargetTLI)
+			recoveryTargetTLI = XLogRecoveryCtl->recoveryTargetTLI;
+		SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	}
+}
+
 /*
  * Finish WAL recovery.
  *
@@ -1421,6 +1496,8 @@ FinishWalRecovery(void)
 	TimeLineID	lastRecTLI;
 	XLogRecPtr	endOfLog;
 
+	ResetStatesIfPipelined();
+
 	/*
 	 * Kill WAL receiver, if it's still running, before we continue to write
 	 * the startup checkpoint and aborted-contrecord records. It will trump
@@ -1478,6 +1555,7 @@ FinishWalRecovery(void)
 		lastRec = XLogRecoveryCtl->lastReplayedReadRecPtr;
 		lastRecTLI = XLogRecoveryCtl->lastReplayedTLI;
 	}
+
 	XLogPrefetcherBeginRead(xlogprefetcher, lastRec);
 	(void) ReadRecord(xlogprefetcher, PANIC, false, lastRecTLI);
 	endOfLog = xlogreader->EndRecPtr;
@@ -1600,6 +1678,17 @@ ShutdownWalRecovery(void)
 	 * it, but let's do it for the sake of tidiness.
 	 */
 	if (ArchiveRecoveryRequested)
+	{
+		DisownLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch);
+
+		/*
+		 * Only disown the latch if we (startup process) were the owner
+		 * i.e. pipeline disabled.
+		 */
+		if (!wal_pipeline_enabled)
+			DisownLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+	}
+}
 
 /*
  * Get next record for redo.
@@ -1758,6 +1847,7 @@ PerformWalRecovery(void)
 			WalPipelineParams *params = palloc0(sizeof(WalPipelineParams));
 
 			params->ReplayTLI = replayTLI;
+			WalPipeline_ExportRecoveryState(params);
 			WalPipeline_Start(params);
 		}
 
@@ -1953,7 +2043,9 @@ static void
 ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *replayTLI)
 {
 	ErrorContextCallback errcallback;
+	bool 		wakeupWalRcvr = false;
 	bool		switchedTLI = false;
+	bool		pipeline_enabled_standby = false;
 
 	/* Setup error traceback support for ereport() */
 	errcallback.callback = rm_redo_error_callback;
@@ -2054,6 +2146,22 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 	XLogRecoveryCtl->lastReplayedReadRecPtr = xlogreader->ReadRecPtr;
 	XLogRecoveryCtl->lastReplayedEndRecPtr = xlogreader->EndRecPtr;
 	XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
+
+	/*
+	 * As the pipeline (producer) was running way ahead of the startup proc
+	 * (consumer), see if the producer asked to wakeup the wal_reciever by
+	 * updating the value of `WakeupWalRcvrRecPtr`.
+	 */
+	if (XLogRecoveryCtl->WakeupWalRcvrRecPtr != InvalidXLogRecPtr &&
+		xlogreader->EndRecPtr >= XLogRecoveryCtl->WakeupWalRcvrRecPtr)
+	{
+		Assert(wal_pipeline_enabled);
+
+		XLogRecoveryCtl->WakeupWalRcvrRecPtr = InvalidXLogRecPtr;
+		wakeupWalRcvr = true;
+	}
+
+	pipeline_enabled_standby = XLogRecoveryCtl->stanbyEnabled;
 	SpinLockRelease(&XLogRecoveryCtl->info_lck);
 
 	/* ------
@@ -2085,8 +2193,11 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 	 * If rm_redo called XLogRequestWalReceiverReply, then we wake up the
 	 * receiver so that it notices the updated lastReplayedEndRecPtr and sends
 	 * a reply to the primary.
+	 *
+	 * Also wakeup in case requested by the pipeline decoder before waiting
+	 * for more wal.
 	 */
-	if (doRequestWalReceiverReply)
+	if (doRequestWalReceiverReply || wakeupWalRcvr)
 	{
 		doRequestWalReceiverReply = false;
 		WalRcvRequestApplyReply();
@@ -2107,6 +2218,16 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 		/* Reset the prefetcher. */
 		XLogPrefetchReconfigure();
 	}
+
+	/*
+	 * Conusmer should also enable the standby if pipline have.
+	 */
+	if (pipeline_enabled_standby)
+		EnableStandbyMode();
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	WalPipelineShm->applied_lsn = xlogreader->EndRecPtr;
+	SpinLockRelease(&WalPipelineShm->mutex);
 }
 
 /*
@@ -2232,12 +2353,27 @@ CheckRecoveryConsistency(void)
 
 	Assert(InArchiveRecovery);
 
-	/*
-	 * assume that we are called in the startup process, and hence don't need
-	 * a lock to read lastReplayedEndRecPtr
-	 */
-	lastReplayedEndRecPtr = XLogRecoveryCtl->lastReplayedEndRecPtr;
-	lastReplayedTLI = XLogRecoveryCtl->lastReplayedTLI;
+
+	if (AmStartupProcess())
+	{
+		/*
+		 * assume that we are called in the startup process, and hence don't need
+		 * a lock to read lastReplayedEndRecPtr
+		 */
+		lastReplayedEndRecPtr = XLogRecoveryCtl->lastReplayedEndRecPtr;
+		lastReplayedTLI = XLogRecoveryCtl->lastReplayedTLI;
+	}
+	else
+	{
+		/*
+		 * We could be in the pipeline worker, so update the shared states.
+		 */
+		SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+		lastReplayedEndRecPtr = XLogRecoveryCtl->lastReplayedEndRecPtr;
+		lastReplayedTLI = XLogRecoveryCtl->lastReplayedTLI;
+		standbyState = XLogRecoveryCtl->standbyState;
+		SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	}
 
 	/*
 	 * Have we reached the point where our base backup was completed?
@@ -2424,12 +2560,28 @@ static void
 checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI, TimeLineID prevTLI,
 					TimeLineID replayTLI)
 {
+
+
 	/* Check that the record agrees on what the current (old) timeline is */
 	if (prevTLI != replayTLI)
 		ereport(PANIC,
 				(errmsg("unexpected previous timeline ID %u (current timeline ID %u) in checkpoint record",
 						prevTLI, replayTLI)));
 
+
+	/* Pipeline may have updated the expectedTLEs */
+	if (wal_pipeline_enabled)
+	{
+		TimeLineID	targetTLI;
+
+		SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+		targetTLI = XLogRecoveryCtl->expectedTLEsUpdateTLI;
+		SpinLockRelease(&XLogRecoveryCtl->info_lck);
+
+		if (targetTLI)
+			expectedTLEs = readTimeLineHistory(targetTLI);
+	}
+
 	/*
 	 * The new timeline better be in the list of timelines we expect to see,
 	 * according to the timeline history. It should also not decrease.
@@ -2971,8 +3123,13 @@ getRecoveryStopReason(void)
 static void
 recoveryPausesHere(bool endOfRecovery)
 {
-	/* Don't pause unless users can connect! */
-	if (!LocalHotStandbyActive)
+	/*
+	 * Don't pause unless users can connect!
+	 *
+	 * As this function could be called from the pipeline process so better
+	 * to rely on the shared state then checking LocalHotStandbyActive.
+	 */
+	if (!HotStandbyActive())
 		return;
 
 	/* Don't pause after standby promotion has been triggered */
@@ -3077,7 +3234,7 @@ recoveryApplyDelay(XLogReaderState *record)
 
 	while (true)
 	{
-		ResetLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+		ResetLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch);
 
 		/* This might change recovery_min_apply_delay. */
 		ProcessStartupProcInterrupts();
@@ -3102,7 +3259,7 @@ recoveryApplyDelay(XLogReaderState *record)
 
 		elog(DEBUG2, "recovery apply delay %ld milliseconds", msecs);
 
-		(void) WaitLatch(&XLogRecoveryCtl->recoveryWakeupLatch,
+		(void) WaitLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch,
 						 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
 						 msecs,
 						 WAIT_EVENT_RECOVERY_APPLY_DELAY);
@@ -3217,6 +3374,17 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 			{
 				abortedRecPtr = xlogreader->abortedRecPtr;
 				missingContrecPtr = xlogreader->missingContrecPtr;
+
+				/*
+				 * Also update the shared state if necessary
+				 */
+				if (wal_pipeline_enabled)
+				{
+					SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+					XLogRecoveryCtl->abortedRecPtr = abortedRecPtr;
+					XLogRecoveryCtl->missingContrecPtr = missingContrecPtr;
+					SpinLockRelease(&XLogRecoveryCtl->info_lck);
+				}
 			}
 
 			if (readFile >= 0)
@@ -3284,9 +3452,31 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 			if (!InArchiveRecovery && ArchiveRecoveryRequested &&
 				!fetching_ckpt)
 			{
+				/*
+				 * Wait for the startup process to apply the last sent record
+				 * by the pipeline, otherwise we will fail the consistency
+				 * check as all the records decoded by the pipeline have not
+				 * arrived/consumed by the consumer (statup proc) yet.
+				 */
+				if (wal_pipeline_enabled && AmWalPipeline())
+					WalPipeline_WaitForConsumerCatchup();
+
 				ereport(DEBUG1,
 						(errmsg_internal("reached end of WAL in pg_wal, entering archive recovery")));
 				InArchiveRecovery = true;
+
+				if (wal_pipeline_enabled)
+				{
+					/* also update the shared state */
+					SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+					XLogRecoveryCtl->InArchiveRecovery = InArchiveRecovery;
+
+					/* update startup proc (consumer) about the standbymode */
+					if (StandbyModeRequested)
+						XLogRecoveryCtl->stanbyEnabled = true;
+					SpinLockRelease(&XLogRecoveryCtl->info_lck);
+				}
+
 				if (StandbyModeRequested)
 					EnableStandbyMode();
 
@@ -3783,7 +3973,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 							 LSN_FORMAT_ARGS(RecPtr));
 
 						/* Do background tasks that might benefit us later. */
-						KnownAssignedTransactionIdsIdleMaintenance();
+						if (AmStartupProcess())
+							KnownAssignedTransactionIdsIdleMaintenance();
 
 						(void) WaitLatch(&XLogRecoveryCtl->recoveryWakeupLatch,
 										 WL_LATCH_SET | WL_TIMEOUT |
@@ -3795,6 +3986,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 
 						/* Handle interrupt signals of startup process */
 						ProcessStartupProcInterrupts();
+						if (wal_pipeline_enabled)
+							ProcessPipelineBgwInterrupts();
 					}
 					last_fail_time = now;
 					currentSource = XLOG_FROM_ARCHIVE;
@@ -3820,6 +4013,15 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 				 xlogSourceNames[oldSource], xlogSourceNames[currentSource],
 				 lastSourceFailed ? "failure" : "success");
 
+		if (wal_pipeline_enabled)
+		{
+			/* also update the shared state */
+			SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+			XLogRecoveryCtl->currentSource = currentSource;
+			pendingWalRcvRestart = XLogRecoveryCtl->pendingWalRcvRestart;
+			SpinLockRelease(&XLogRecoveryCtl->info_lck);
+		}
+
 		/*
 		 * We've now handled possible failure. Try to read from the chosen
 		 * source.
@@ -3894,6 +4096,14 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 					}
 					pendingWalRcvRestart = false;
 
+					if (wal_pipeline_enabled)
+					{
+						/* also update the shared state */
+						SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+						XLogRecoveryCtl->pendingWalRcvRestart = false;
+						SpinLockRelease(&XLogRecoveryCtl->info_lck);
+					}
+
 					/*
 					 * Launch walreceiver if needed.
 					 *
@@ -3971,6 +4181,15 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 							if (latestChunkStart <= RecPtr)
 							{
 								XLogReceiptTime = GetCurrentTimestamp();
+
+								if (wal_pipeline_enabled)
+								{
+									/* also update the shared state */
+									SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+									XLogRecoveryCtl->XLogReceiptTime = XLogReceiptTime;
+									SpinLockRelease(&XLogRecoveryCtl->info_lck);
+								}
+
 								SetCurrentChunkStartTime(XLogReceiptTime);
 							}
 						}
@@ -3999,7 +4218,12 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 						if (readFile < 0)
 						{
 							if (!expectedTLEs)
+							{
 								expectedTLEs = readTimeLineHistory(recoveryTargetTLI);
+								PipelineConsumerExpectedTLEsUpdateTLI(recoveryTargetTLI);
+							}
+
+
 							readFile = XLogFileRead(readSegNo, receiveTLI,
 													XLOG_FROM_STREAM, false);
 							Assert(readFile >= 0);
@@ -4009,6 +4233,13 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 							/* just make sure source info is correct... */
 							readSource = XLOG_FROM_STREAM;
 							XLogReceiptSource = XLOG_FROM_STREAM;
+							if (wal_pipeline_enabled)
+							{
+								/* also update the shared state */
+								SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+								XLogRecoveryCtl->XLogReceiptSource = XLogReceiptSource;
+								SpinLockRelease(&XLogRecoveryCtl->info_lck);
+							}
 							return XLREAD_SUCCESS;
 						}
 						break;
@@ -4046,15 +4277,40 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 					 */
 					if (!streaming_reply_sent)
 					{
-						WalRcvRequestApplyReply();
-						streaming_reply_sent = true;
+						if (wal_pipeline_enabled && AmWalPipeline())
+						{
+							/*
+							 * In case of pipeline enabled, we cannot just call
+							 * WalRcvRequestApplyReply() directly as the
+							 * consumer (startup proc) running behined the
+							 * pipeline producer and haven't actually replayed
+							 * all the wal received from the wal_receiver yet.
+							 *
+							 * So in order to make wakeup consistent, note the
+							 * lsn and consumer will only wakeup walrcvr when
+							 * the lsn is replayed.
+							 */
+							SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+							XLogRecoveryCtl->WakeupWalRcvrRecPtr = flushedUpto;
+							SpinLockRelease(&XLogRecoveryCtl->info_lck);
+							streaming_reply_sent = true;
+						}
+						else
+						{
+							WalRcvRequestApplyReply();
+							streaming_reply_sent = true;
+						}
 					}
 
 					/* Do any background tasks that might benefit us later. */
-					KnownAssignedTransactionIdsIdleMaintenance();
+					if (AmStartupProcess())
+						KnownAssignedTransactionIdsIdleMaintenance();
 
 					/* Update pg_stat_recovery_prefetch before sleeping. */
-					XLogPrefetcherComputeStats(xlogprefetcher);
+					if (AmWalPipeline())
+						XLogPrefetcherComputeStats(xlogprefetcher_pipelined);
+					else
+						XLogPrefetcherComputeStats(xlogprefetcher);
 
 					/*
 					 * Wait for more WAL to arrive, when we will be woken
@@ -4085,6 +4341,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 		 * process.
 		 */
 		ProcessStartupProcInterrupts();
+		if (wal_pipeline_enabled)
+			ProcessPipelineBgwInterrupts();
 	}
 
 	return XLREAD_FAIL;			/* not reached */
@@ -4250,6 +4508,18 @@ rescanLatestTimeLine(TimeLineID replayTLI, XLogRecPtr replayLSN)
 	list_free_deep(expectedTLEs);
 	expectedTLEs = newExpectedTLEs;
 
+	/* Update shared state */
+	if (wal_pipeline_enabled)
+	{
+		PipelineConsumerExpectedTLEsUpdateTLI(newtarget);
+
+		/* XXX May be we can combine the spin locks with about call. */
+		SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+		XLogRecoveryCtl->recoveryTargetTLI = recoveryTargetTLI;
+		SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	}
+
+
 	/*
 	 * As in StartupXLOG(), try to ensure we have all the history files
 	 * between the old target and new target in pg_wal.
@@ -4338,6 +4608,15 @@ XLogFileRead(XLogSegNo segno, TimeLineID tli,
 		if (source != XLOG_FROM_STREAM)
 			XLogReceiptTime = GetCurrentTimestamp();
 
+		if (wal_pipeline_enabled)
+		{
+			/* also update the shared state */
+			SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+			XLogRecoveryCtl->XLogReceiptTime = XLogReceiptTime;
+			XLogRecoveryCtl->XLogReceiptSource = XLogReceiptSource;
+			SpinLockRelease(&XLogRecoveryCtl->info_lck);
+		}
+
 		return fd;
 	}
 	if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
@@ -4422,7 +4701,10 @@ XLogFileReadAnyTLI(XLogSegNo segno, XLogSource source)
 			{
 				elog(DEBUG1, "got WAL segment from archive");
 				if (!expectedTLEs)
+				{
 					expectedTLEs = tles;
+					PipelineConsumerExpectedTLEsUpdateTLI(recoveryTargetTLI);
+				}
 				return fd;
 			}
 		}
@@ -4433,7 +4715,10 @@ XLogFileReadAnyTLI(XLogSegNo segno, XLogSource source)
 			if (fd != -1)
 			{
 				if (!expectedTLEs)
+				{
 					expectedTLEs = tles;
+					PipelineConsumerExpectedTLEsUpdateTLI(recoveryTargetTLI);
+				}
 				return fd;
 			}
 		}
@@ -4455,16 +4740,52 @@ XLogFileReadAnyTLI(XLogSegNo segno, XLogSource source)
 void
 StartupRequestWalReceiverRestart(void)
 {
+
+	/*
+	 * currentSource is also defined as pipeline shared state variable.
+	 * Update the state before procedding.
+	 */
+	if (wal_pipeline_enabled)
+	{
+		SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+		currentSource = XLogRecoveryCtl->currentSource;
+		SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	}
+
 	if (currentSource == XLOG_FROM_STREAM && WalRcvRunning())
 	{
 		ereport(LOG,
 				(errmsg("WAL receiver process shutdown requested")));
 
 		pendingWalRcvRestart = true;
+
+		/*
+		 * pendingWalRcvRestart is also defined as pipeline shared state variable.
+		 * Update the state before procedding.
+		 */
+		if (wal_pipeline_enabled)
+		{
+			SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+			XLogRecoveryCtl->pendingWalRcvRestart = pendingWalRcvRestart;
+			SpinLockRelease(&XLogRecoveryCtl->info_lck);
+		}
 	}
 }
 
 
+/*
+ * standbyState is also defined as a shared state. Pipeline worker can also
+ * update its value, so always confirm the shared state before procedding.
+ */
+void
+SetSharedHotStandbyState(void)
+{
+	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	XLogRecoveryCtl->standbyState = standbyState;
+	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+}
+
+
 /*
  * Has a standby promotion already been triggered?
  *
@@ -4516,11 +4837,18 @@ CheckForStandbyTrigger(void)
 	if (LocalPromoteIsTriggered)
 		return true;
 
-	if (IsPromoteSignaled() && CheckPromoteSignal())
+	/*
+	 * Check if the startup process was signaled for promotion. In case we are
+	 * calling this function from the pipeline, we need to check the promotion
+	 * signals for the pipeline worker instead.
+	 */
+	if ((IsPromoteSignaledPipeline() || IsPromoteSignaled()) &&
+		CheckPromoteSignal())
 	{
 		ereport(LOG, (errmsg("received promote request")));
 		RemovePromoteSignalFiles();
 		ResetPromoteSignaled();
+		ResetPromoteSignaledPipeline();
 		SetPromoteIsTriggered();
 		return true;
 	}
@@ -4559,6 +4887,7 @@ void
 WakeupRecovery(void)
 {
 	SetLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+	SetLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch);
 }
 
 /*
@@ -4728,6 +5057,17 @@ GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream)
 	 */
 	Assert(InRecovery);
 
+	/*
+	 * If pipeline enabled, get the updated state.
+	 */
+	if (wal_pipeline_enabled)
+	{
+		SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+		XLogReceiptTime = XLogRecoveryCtl->XLogReceiptTime;
+		XLogReceiptSource = XLogRecoveryCtl->XLogReceiptSource;
+		SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	}
+
 	*rtime = XLogReceiptTime;
 	*fromStream = (XLogReceiptSource == XLOG_FROM_STREAM);
 }
@@ -4813,6 +5153,114 @@ RecoveryRequiresIntParameter(const char *param_name, int currValue, int minValue
 	}
 }
 
+/*
+ * Called by the startup process before launching the pipeline worker.
+ * Exports recovery state variables into WalPipelineParams so the
+ * producer background worker can initialize itself with the same
+ * environment.
+ */
+static void
+WalPipeline_ExportRecoveryState(WalPipelineParams *params)
+{
+	/*
+	 * In order to start decoding through the pipeline,
+	 * these variables should be saved and then restored later.
+	 */
+	params->NextRecPtr = xlogreader->NextRecPtr;
+	params->recoveryTargetTLI = recoveryTargetTLI;
+	params->StandbyModeRequested = StandbyModeRequested;
+	params->StandbyMode = StandbyMode;
+	params->ArchiveRecoveryRequested = ArchiveRecoveryRequested;
+	params->InArchiveRecovery = InArchiveRecovery;
+	params->minRecoveryPointTLI = minRecoveryPointTLI;
+	params->minRecoveryPoint = minRecoveryPoint;
+	params->InRedo = InRedo;
+	params->currentSource = currentSource;
+	params->lastSourceFailed = lastSourceFailed;
+	params->pendingWalRcvRestart = pendingWalRcvRestart;
+	params->RedoStartTLI = RedoStartTLI;
+	params->CheckPointLoc = CheckPointLoc;
+	params->CheckPointTLI = CheckPointTLI;
+	params->RedoStartLSN = RedoStartLSN;
+	params->standbyState = standbyState;
+	params->flushedUpto = flushedUpto;
+	params->receiveTLI = receiveTLI;
+	params->abortedRecPtr = abortedRecPtr;
+	params->missingContrecPtr = missingContrecPtr;
+	params->backupEndRequired = backupEndRequired;
+	params->backupStartPoint = backupStartPoint;
+	params->backupEndPoint = backupEndPoint;
+	params->curFileTLI = curFileTLI;
+
+	/*
+	 * Saving promotion signal now, otherwise we won't be able hanlde this
+	 * signal becuase prodcuer process not started yet and startup proc won't
+	 * be responsible for decoding.
+	 */
+	params->promotedBeforeLaunch = IsPromoteSignaled() && CheckPromoteSignal();
+
+	/*
+	 * The pipeline will do the waiting in this case startup proc should disown
+	 * the latch.
+	 */
+	DisownRecoveryWakeupLatch();
+
+	/*
+	 * Update shared state before starting.
+	 */
+	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	XLogRecoveryCtl->InArchiveRecovery = InArchiveRecovery;
+	XLogRecoveryCtl->pendingWalRcvRestart = pendingWalRcvRestart;
+	XLogRecoveryCtl->abortedRecPtr = abortedRecPtr;
+	XLogRecoveryCtl->missingContrecPtr = missingContrecPtr;
+	XLogRecoveryCtl->currentSource = currentSource;
+	XLogRecoveryCtl->standbyState = standbyState;
+	XLogRecoveryCtl->XLogReceiptSource = XLogReceiptSource;
+	XLogRecoveryCtl->XLogReceiptTime = XLogReceiptTime;
+	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+}
+
+/*
+ * Called by the producer background worker on startup.
+ * Imports recovery state from WalPipelineParams into the worker's
+ * own local variables, mirroring the environment the startup process
+ * had when it launched the pipeline.
+ */
+void
+WalPipeline_ImportRecoveryState(WalPipelineParams *params)
+{
+	StandbyMode = params->StandbyMode;
+	StandbyModeRequested = params->StandbyModeRequested;
+	ArchiveRecoveryRequested = params->ArchiveRecoveryRequested;
+	InArchiveRecovery = params->InArchiveRecovery;
+	recoveryTargetTLI = params->recoveryTargetTLI;
+	minRecoveryPointTLI = params->minRecoveryPointTLI;
+	minRecoveryPoint = params->minRecoveryPoint;
+	currentSource = params->currentSource;
+	lastSourceFailed = params->lastSourceFailed;
+	pendingWalRcvRestart = params->pendingWalRcvRestart;
+	RedoStartTLI = params->RedoStartTLI;
+	RedoStartLSN = params->RedoStartLSN;
+	standbyState = params->standbyState;
+	CheckPointLoc = params->CheckPointLoc;
+	CheckPointTLI = params->CheckPointTLI;
+	flushedUpto = params->flushedUpto;
+	receiveTLI = params->receiveTLI;
+	abortedRecPtr = params->abortedRecPtr;
+	missingContrecPtr = params->missingContrecPtr;
+	InRedo = params->InRedo;
+	backupEndRequired = params->backupEndRequired;
+	backupStartPoint = params->backupStartPoint;
+	backupEndPoint = params->backupEndPoint;
+	curFileTLI = params->curFileTLI;
+	InRecovery = true;
+
+	/*
+	 * As pipeline will be reading the wal, so better to own the latch to wait at.
+	 */
+	if (ArchiveRecoveryRequested)
+		OwnLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+}
 
 /*
  * GUC check_hook for primary_slot_name
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index de9092fdf5b..f7f31a916a7 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -1197,6 +1197,7 @@ standby_redo(XLogReaderState *record)
 		running.xids = xlrec->xids;
 
 		ProcArrayApplyRecoveryInfo(&running);
+		SetSharedHotStandbyState();
 
 		/*
 		 * The startup process currently has no convenient way to schedule
diff --git a/src/include/access/xlogpipeline.h b/src/include/access/xlogpipeline.h
index a8995d5c1a7..c47cc72121c 100644
--- a/src/include/access/xlogpipeline.h
+++ b/src/include/access/xlogpipeline.h
@@ -54,6 +54,52 @@ typedef struct WalRecordMsgHeader
 	XLogRecPtr  overwrittenRecPtr;    /* XLogReaderState->overwrittenRecPtr */
 } WalRecordMsgHeader;
 
+/*
+ * Parameters passed from StartupXLOG (consumer side)
+ * to the WAL pipeline producer background worker.
+ */
+typedef struct WalPipelineParams
+{
+	bool		StandbyMode;
+	bool		StandbyModeRequested;
+	bool		ArchiveRecoveryRequested;
+	bool		InArchiveRecovery;
+	bool		InRedo;
+	bool 		lastSourceFailed;
+	bool 		pendingWalRcvRestart;
+	bool 		backupEndRequired;
+	bool 		promotedBeforeLaunch;
+
+	TimeLineID  RedoStartTLI;
+	TimeLineID  CheckPointTLI;
+	TimeLineID  recoveryTargetTLI;
+	TimeLineID	minRecoveryPointTLI;
+	TimeLineID  ReplayTLI;
+	TimeLineID	receiveTLI;
+
+	XLogRecPtr 	backupStartPoint;
+	XLogRecPtr 	backupEndPoint;
+	XLogRecPtr  CheckPointLoc;
+	XLogRecPtr  RedoStartLSN;
+	XLogRecPtr  NextRecPtr;
+	XLogRecPtr	minRecoveryPoint;
+	XLogRecPtr 	flushedUpto;
+	XLogRecPtr 	abortedRecPtr;
+	XLogRecPtr 	missingContrecPtr;
+
+	int	readFile;
+	XLogSegNo readSegNo;
+	uint32 readOff;
+	uint32 readLen;
+	XLogSource readSource;
+	TimeLineID curFileTLI;
+
+
+	HotStandbyState standbyState;
+	XLogSource 	currentSource;
+
+} WalPipelineParams;
+
 /*
  * Shared memory control structure for the WAL pipeline
  */
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index 81ac984a904..45357a25cad 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -11,7 +11,9 @@
 #ifndef XLOGRECOVERY_H
 #define XLOGRECOVERY_H
 
+#include "access/xlogprefetcher.h"
 #include "access/xlogreader.h"
+#include "access/xlogutils.h"
 #include "catalog/pg_control.h"
 #include "lib/stringinfo.h"
 #include "storage/condition_variable.h"
@@ -60,6 +62,17 @@ typedef enum RecoveryPauseState
 	RECOVERY_PAUSED,			/* recovery is paused */
 } RecoveryPauseState;
 
+/* Codes indicating where we got a WAL file from during recovery, or where
+ * to attempt to get one.
+ */
+typedef enum
+{
+	XLOG_FROM_ANY = 0,			/* request to read WAL from any source */
+	XLOG_FROM_ARCHIVE,			/* restored using restore_command */
+	XLOG_FROM_PG_WAL,			/* existing file in pg_wal */
+	XLOG_FROM_STREAM,			/* streamed from primary */
+} XLogSource;
+
 /*
  * Shared-memory state for WAL recovery.
  */
@@ -94,6 +107,14 @@ typedef struct XLogRecoveryCtlData
 	 */
 	Latch		recoveryWakeupLatch;
 
+	/*
+	 * In case pipeline enabled we will need two latches. One that can be used
+	 * by the pipeline for WAL waiting and other that can be used by the
+	 * startup process for the apply delay. Before this we had only one latch
+	 * for both cases.
+	 */
+	Latch		recoveryApplyDelayLatch;
+
 	/*
 	 * Last record successfully replayed.
 	 */
@@ -121,6 +142,65 @@ typedef struct XLogRecoveryCtlData
 	ConditionVariable recoveryNotPausedCV;
 
 	slock_t		info_lck;		/* locks shared variables shown above */
+
+	/* ------------------------------------------------------------------
+	 * Variables use for IPC between pipeline and the startup proc.
+	 * These are also the static variables in xlogrecovery.c but there state
+	 * keep on changing. So we added them as the shared states so that both
+	 * the pipeline and the startup proc stay synced if any of these state
+	 * changes.
+	 * ------------------------------------------------------------------
+	 */
+
+	/*
+	 * Pipeline could be waiting for the startup process to catchup with the
+	 * decoder. This could happend when no wait wal is available from the
+	 * current resource and now pipline have change the wal srouce
+	 * i.e enabling standby if requested.
+	 */
+	bool		pipeline_waiting;
+	bool		InArchiveRecovery;
+	bool		pendingWalRcvRestart;
+	bool		stanbyEnabled;
+
+	/*
+	 * The target TLI for which expectedTLEs should be recomputed by the
+	 * consumer
+	 */
+	TimeLineID	expectedTLEsUpdateTLI;
+
+	/*
+	 * We also export the recoveryTargetTLI as a WalPipelineParams. But other
+	 * than passing the initial state, the recoveryTargetTLI can also change
+	 * it state during the decoding by the prodcuer (see rescanLatestTimeLine()).
+	 *
+	 * This mean at the end of the recovery,  the startup process should aware
+	 * of any such state changes done by the producer. To handle this
+	 * ResetStatesIfPipelined() will update the startup local recoveryTargetTLI
+	 * with updated one, on FinishWalRecovery().
+	*/
+	TimeLineID	recoveryTargetTLI;
+
+	/*
+	 * Normaly we wakeup walrcvr after specific records have been applied, as
+	 * decoding and apllying are sequential so we wakeup after enough records
+	 * decoded read.
+	 *
+	 * But in case of pipeline reads (decoded records) could be ahead of the
+	 * consumer apply loop. We cannot wakeup wal rcvr based on how much records
+	 * decoded, so we tell consumer to wakeup after only after a specific lsn
+	 * (WakeupWalRcvrRecPtr set by the pipeline) has beed replayed.
+	 */
+	XLogRecPtr	WakeupWalRcvrRecPtr;
+
+	XLogRecPtr	abortedRecPtr;
+	XLogRecPtr	missingContrecPtr;
+
+	XLogSource	currentSource;
+	XLogSource	XLogReceiptSource;
+
+	HotStandbyState standbyState;
+	TimestampTz		XLogReceiptTime;
 } XLogRecoveryCtlData;
 
 extern PGDLLIMPORT XLogRecoveryCtlData *XLogRecoveryCtl;
@@ -205,6 +285,8 @@ typedef struct
 	bool		recovery_signal_file_found;
 } EndOfWalRecoveryInfo;
 
+struct WalPipelineParams;   /* forward declaration */
+
 extern EndOfWalRecoveryInfo *FinishWalRecovery(void);
 extern void ShutdownWalRecovery(void);
 extern void RemovePromoteSignalFiles(void);
@@ -225,11 +307,14 @@ extern int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, i
 extern bool PromoteIsTriggered(void);
 extern bool CheckPromoteSignal(void);
 extern void WakeupRecovery(void);
+extern void DisownRecoveryWakeupLatch(void);
+extern void SetSharedHotStandbyState(void);
 
 extern void StartupRequestWalReceiverRestart(void);
 extern void XLogRequestWalReceiverReply(void);
 
 extern void RecoveryRequiresIntParameter(const char *param_name, int currValue, int minValue);
+extern void WalPipeline_ImportRecoveryState(struct WalPipelineParams *params);
 
 extern void xlog_outdesc(StringInfo buf, XLogReaderState *record);
 
-- 
2.34.1

