From 6c86a9d9ec37d8c96e23909955a4ad0e352dcea9 Mon Sep 17 00:00:00 2001
From: Imran Zaheer <imran.zhir@gmail.com>
Date: Mon, 22 Jun 2026 12:30:30 +0500
Subject: [PATCH v4 2/4] Pipelined Recovery - Consumer Related Code

This includes the consumer-specific code for the producer-consumer
architecture for WAL replay that separates WAL decoding from the
recovery process, enabling parallel processing between different steps
 of replay.

The startup process act as consumer and will initiate the pipeline before
starting the redo loop inside PerformWalRecovery() and will connect to
the shm mq to receive the decoded records directly. Subsequently,
it will apply those records and keep on fetching new decoded records in
the main redo loop.

Finally, it will receive a shutdown message from the producer
(based on the consistency checks, the producer will take care of whether
to stop decoding or not). The consumer will exit the loop and later will
ask to fully shutdown the pipeline workers in FinishWalRecovery().

Author: Imran Zaheer <imran.zhir@gmail.com>
Idea by: Ants Aasma <ants@cybertec.at>
---
 src/backend/access/transam/xlogpipeline.c | 476 ++++++++++++++++++++++
 src/backend/access/transam/xlogrecovery.c |  74 +++-
 src/include/access/xlogpipeline.h         |  18 +
 3 files changed, 566 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xlogpipeline.c b/src/backend/access/transam/xlogpipeline.c
index ab9e6eccc15..c48d2768a9a 100644
--- a/src/backend/access/transam/xlogpipeline.c
+++ b/src/backend/access/transam/xlogpipeline.c
@@ -82,6 +82,11 @@ static dsm_segment *producer_dsm_seg = NULL;
 static shm_mq *producer_mq = NULL;
 static shm_mq_handle *producer_mq_handle = NULL;
 
+/* Local state for consumer */
+static dsm_segment *consumer_dsm_seg = NULL;
+static shm_mq *consumer_mq = NULL;
+static shm_mq_handle *consumer_mq_handle = NULL;
+
 /*
  * Flags set by interrupt handlers for later service in the redo loop.
  */
@@ -92,9 +97,13 @@ static volatile sig_atomic_t promote_signaled = false;
 static void PipelineBgwSigHupHandler(SIGNAL_ARGS);
 static void PipelineProcTriggerHandler(SIGNAL_ARGS);
 
+/* Forward declarations */
 static void wal_pipeline_cleanup_callback(int code, Datum arg);
 static Size serialize_wal_record(XLogReaderState *record, char **buffer);
+static DecodedXLogRecord *deserialize_wal_record(const char *buffer, Size len, XLogReaderState *startup_reader);
 static void cleanup_producer_resources(void);
+static void cleanup_consumer_resources(void);
+static void WalPipeline_WaitForConsumerShutdownRequest(void);
 
 /* copied from xlogrecovery.c */
 /* Parameters passed down from ReadRecord to the XLogPageRead callback. */
@@ -127,6 +136,173 @@ WalPipelineShmemInit(void *arg)
 	SpinLockInit(&WalPipelineShm->mutex);
 }
 
+/*
+ * Called by Consumer.
+ *
+ * Initialize and start the WAL pipeline. This will be called by the startup
+ * process (consumer) as a request to start the pipeline.
+ */
+void
+WalPipeline_Start(WalPipelineParams *params)
+{
+	BackgroundWorkerHandle *handle;
+	BackgroundWorker	worker;
+	shm_toc_estimator 	e;
+	WalPipelineParams  *shared_params;
+	dsm_segment		   *seg;
+	shm_toc			   *toc;
+	shm_mq			   *mq;
+	Size				queue_size;
+	Size				segsize;
+	pid_t		 		pid;
+
+	if (wal_pipeline_mq_size_mb <= 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				errmsg("invalid wal_pipeline_mq_size_mb")));
+
+	queue_size = MBToBytes(wal_pipeline_mq_size_mb);
+
+	/* Set init flag */
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	if (WalPipelineShm->initialized)
+	{
+		SpinLockRelease(&WalPipelineShm->mutex);
+		return;
+	}
+	WalPipelineShm->initialized = true;
+	SpinLockRelease(&WalPipelineShm->mutex);
+
+	/*
+	 * Estimate how much shared memory we need.
+	 *
+	 * We need one key to register the location of the WalPipelineParams, and
+	 * we need 1 key to track the location of the message queue.
+	 */
+	shm_toc_initialize_estimator(&e);
+	shm_toc_estimate_chunk(&e, sizeof(WalPipelineParams));
+	shm_toc_estimate_chunk(&e, queue_size);
+	shm_toc_estimate_keys(&e, 2);
+	segsize = shm_toc_estimate(&e);
+
+	/* Create the shared memory segment and establish a table of contents. */
+	seg = dsm_create(segsize, 0);
+	dsm_pin_segment(seg);
+	toc = shm_toc_create(PG_WAL_PIPELINE_MAGIC, dsm_segment_address(seg),
+						 segsize);
+
+	/* Setup arguments to be passed to the producer */
+	shared_params = shm_toc_allocate(toc, sizeof(WalPipelineParams));
+	shm_toc_insert(toc, 1, shared_params);
+	*shared_params = *params;
+
+	/* Setup the message queue */
+	mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
+	shm_toc_insert(toc, 2, mq);
+
+	/* update shared state */
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	WalPipelineShm->dsm_seg_handle = dsm_segment_handle(seg);
+	WalPipelineShm->consumer_pid = MyProcPid;
+	SpinLockRelease(&WalPipelineShm->mutex);
+
+	/* Set up consumer side of the queue */
+	consumer_dsm_seg = seg;
+	consumer_mq = mq;
+	shm_mq_set_receiver(consumer_mq, MyProc);
+	consumer_mq_handle = shm_mq_attach(consumer_mq, seg, NULL);
+
+	/* Register background worker */
+	memset(&worker, 0, sizeof(worker));
+	worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
+	worker.bgw_start_time = BgWorkerStart_PostmasterStart;
+	worker.bgw_restart_time = BGW_NEVER_RESTART;
+	sprintf(worker.bgw_library_name, "postgres");
+	sprintf(worker.bgw_function_name, "WalPipeline_ProducerMain");
+	snprintf(worker.bgw_name, BGW_MAXLEN, "wal pipeline producer");
+	snprintf(worker.bgw_type, BGW_MAXLEN, "wal pipeline producer");
+	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
+	worker.bgw_notify_pid = MyProcPid;
+
+	if (!RegisterDynamicBackgroundWorker(&worker, &handle))
+		goto fail;
+
+	if (WaitForBackgroundWorkerStartup(handle, &pid) != BGWH_STARTED)
+		goto fail;
+
+	/* Register cleanup callback */
+	before_shmem_exit(wal_pipeline_cleanup_callback, (Datum) 0);
+
+	ereport(LOG, (errmsg("[walpipeline] started.")));
+	return;
+
+fail:
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	WalPipelineShm->initialized = false;
+	SpinLockRelease(&WalPipelineShm->mutex);
+
+	cleanup_consumer_resources();
+
+	ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+					errmsg("could not start pipeline background worker"),
+					errhint("More details may be available in the server log.")));
+}
+
+/*
+ * Request producer shutdown.
+ * This is called by the consumer when it no longer needs records.
+ */
+static void
+WalPipeline_RequestShutdown(void)
+{
+	if (!WalPipelineShm)
+		return;
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	WalPipelineShm->shutdown_requested = true;
+	SpinLockRelease(&WalPipelineShm->mutex);
+}
+
+/*
+ * Consumer Function.
+ * Stop the WAL pipeline. This be called be the startup process
+ * (consumer). This will only be called  and the time to recovery shutdown.
+ * This function will also wait until the pipeline workers
+ * are exited.
+ */
+void
+WalPipeline_Stop(void)
+{
+	if (!WalPipelineShm || !WalPipelineShm->initialized)
+		return;
+
+	/* Ask producer to stop */
+	WalPipeline_RequestShutdown();
+
+	/* Wait for producer to exit (max 10 seconds) */
+	for (int i = 0; i < 100; i++)
+	{
+		bool producer_alive;
+
+		SpinLockAcquire(&WalPipelineShm->mutex);
+		producer_alive = (WalPipelineShm->producer_pid != 0);
+		SpinLockRelease(&WalPipelineShm->mutex);
+
+		if (!producer_alive)
+			break;
+
+		pg_usleep(100000); /* 100 ms */
+	}
+
+	cleanup_consumer_resources();
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	WalPipelineShm->initialized = false;
+	SpinLockRelease(&WalPipelineShm->mutex);
+
+	elog(LOG, "[walpipeline] shutdown");
+}
 
 /*
  * Producer Function.
@@ -353,6 +529,210 @@ WalPipeline_SendShutdown(void)
 	return (res == SHM_MQ_SUCCESS);
 }
 
+/*
+ * Consumer Function.
+ * Receive and deserialize a WAL record from the producer
+ */
+DecodedXLogRecord *
+WalPipeline_ReceiveRecord(XLogReaderState *startup_reader)
+{
+	shm_mq_result res;
+	Size        nbytes;
+	void       *data;
+	WalRecordMsgHeader *hdr;
+	DecodedXLogRecord *record;
+
+	if (!consumer_mq_handle)
+		return NULL;
+
+	/* Receive message from queue */
+	res = shm_mq_receive(consumer_mq_handle, &nbytes, &data, false);
+
+	if (res != SHM_MQ_SUCCESS)
+		elog(ERROR, "[walpipeline] consumer: failed to receive record");
+
+	hdr = (WalRecordMsgHeader *) data;
+
+	/* Handle different message types */
+	switch (hdr->msg_type)
+	{
+		case WAL_MSG_RECORD:
+			record = deserialize_wal_record((char *) data, nbytes, startup_reader);
+
+			/* Update statistics */
+			SpinLockAcquire(&WalPipelineShm->mutex);
+			WalPipelineShm->records_received++;
+			WalPipelineShm->bytes_received += nbytes;
+			WalPipelineShm->consumer_lsn = hdr->endRecPtr;
+			SpinLockRelease(&WalPipelineShm->mutex);
+
+			return record;
+
+		case WAL_MSG_SHUTDOWN:
+			elog(LOG, "[walpipeline] consumer: received shutdown message from the producer");
+			return NULL;
+
+		default:
+			elog(PANIC, "[walpipeline] consumer: unknown message type: %d",
+				 hdr->msg_type);
+			return NULL;
+	}
+}
+
+/*
+ * Consumer Function.
+ * Check if producer is still running
+ */
+bool
+WalPipeline_CheckProducerAlive(void)
+{
+	pid_t       pid;
+	bool        alive;
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	pid = WalPipelineShm->producer_pid;
+	SpinLockRelease(&WalPipelineShm->mutex);
+
+	if (pid == 0)
+		return false;
+
+	alive = (kill(pid, 0) == 0);
+
+	if (!alive)
+	{
+		SpinLockAcquire(&WalPipelineShm->mutex);
+		WalPipelineShm->producer_pid = 0;
+		SpinLockRelease(&WalPipelineShm->mutex);
+	}
+
+	return alive;
+}
+
+/*
+ * Consumer Function.
+ * Check if pipeline is active
+ */
+bool
+WalPipeline_IsActive(void)
+{
+	bool        active;
+
+	if (!WalPipelineShm)
+		return false;
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	active = WalPipelineShm->initialized && !WalPipelineShm->shutdown_requested;
+	SpinLockRelease(&WalPipelineShm->mutex);
+
+	return active;
+}
+
+/*
+ * Consumer Function.
+ * Check if pid pipeline decoder worker
+ */
+pid_t
+WalPipeline_GetProducerPid(void)
+{
+	pid_t pid;
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	pid = WalPipelineShm->producer_pid;
+	SpinLockRelease(&WalPipelineShm->mutex);
+
+	if (!(pid > 0))
+		pid = InvalidPid;
+
+	return pid;
+}
+
+/*
+ * Producer Function.
+ * Producer may can exit without waiting for the consumer, but its better to
+ * wait until consumer request shutdown. This way log messages will show
+ * no of records_sent & records_received records equal to each other.
+ */
+static void
+WalPipeline_WaitForConsumerShutdownRequest(void)
+{
+	int			iters = 0;
+
+	while (true)
+	{
+		bool		shutdown_requested;
+
+		SpinLockAcquire(&WalPipelineShm->mutex);
+		shutdown_requested = WalPipelineShm->shutdown_requested;
+		SpinLockRelease(&WalPipelineShm->mutex);
+
+		if (shutdown_requested)
+			break;
+
+		if (++iters >= MAX_SHUTDOWN_WAIT_ITERS)
+		{
+			elog(WARNING,
+					"[walpipeline] producer: timed out waiting for consumer "
+					"to acknowledge shutdown, exiting anyway");
+			break;
+		}
+
+		/* Allow SIGTERM / SIGHUP to interrupt the wait */
+		ProcessPipelineBgwInterrupts();
+
+		pg_usleep(10000);  /* sleep 10ms */
+	}
+}
+
+/*
+ * Consumer Function.
+ * Wait unless last sent record by the pipeline is applied by the
+ * startup process.
+ */
+void
+WalPipeline_WaitForConsumerCatchup(void)
+{
+	XLogRecPtr producer_lsn;
+	XLogRecPtr consumer_lsn;
+
+	for (;;)
+	{
+		SpinLockAcquire(&WalPipelineShm->mutex);
+		producer_lsn = WalPipelineShm->producer_lsn;
+		consumer_lsn = WalPipelineShm->applied_lsn;
+		SpinLockRelease(&WalPipelineShm->mutex);
+
+		if (producer_lsn == consumer_lsn)
+			return;
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* short sleep to avoid busy looping */
+		pg_usleep(50);   /* 50 microseconds */
+	}
+}
+
+/*
+ * Consumer Function.
+ * Get pipeline statistics
+ */
+void
+WalPipeline_GetStats(uint64 *records_sent, uint64 *records_received,
+					 XLogRecPtr *producer_lsn, XLogRecPtr *consumer_lsn)
+{
+	SpinLockAcquire(&WalPipelineShm->mutex);
+
+	if (records_sent)
+		*records_sent = WalPipelineShm->records_sent;
+	if (records_received)
+		*records_received = WalPipelineShm->records_received;
+	if (producer_lsn)
+		*producer_lsn = WalPipelineShm->producer_lsn;
+	if (consumer_lsn)
+		*consumer_lsn = WalPipelineShm->consumer_lsn;
+
+	SpinLockRelease(&WalPipelineShm->mutex);
+}
+
 
 /*
  * serialize_wal_record (Producer)
@@ -411,6 +791,76 @@ serialize_wal_record(XLogReaderState *xlogreader, char **outbuf)
 	return total;
 }
 
+/*
+ * deserialize_wal_record (Consumer)
+ *
+ * Unpack a buffer produced by serialize_wal_record, restore interior
+ * offsets to pointers, and attach the record to the startup reader.
+ *
+ * Data Layout:
+ * 		[WalRecordMsgHeader][DecodedXLogRecord + trailing data]
+ */
+DecodedXLogRecord *
+deserialize_wal_record(const char *buf, Size len,
+					   XLogReaderState *startup_reader)
+{
+	WalRecordMsgHeader hdr;
+	DecodedXLogRecord *dec;
+	const char *payload;
+
+	if (len < sizeof(hdr))
+		return NULL;
+
+	memcpy(&hdr, buf, sizeof(hdr));
+
+	if (hdr.decoded_size != len - sizeof(hdr))
+		return NULL;
+
+
+	/*
+	 * We don't have to copy the data to some local buffer. The decoded blks
+	 * are continous bytes in memory and hence we can just point to its location
+	 * in the sh_mq
+	 */
+	payload = buf + sizeof(hdr);
+	dec = (DecodedXLogRecord *) payload;
+
+	/*
+	 * Restore interior pointers from offsets.
+	 * Offset 0 means the original pointer was NULL.
+	 */
+	if (dec->main_data_len > 0)
+		dec->main_data = (char *)dec + (ptrdiff_t)dec->main_data;
+
+	for (int i = 0; i <= dec->max_block_id; i++)
+	{
+		DecodedBkpBlock *blk = &dec->blocks[i];
+		if (!blk->in_use)
+			continue;
+		if (blk->has_data)
+			blk->data = (char *)dec + (ptrdiff_t)blk->data;
+		if (blk->has_image)
+			blk->bkp_image = (char *)dec + (ptrdiff_t)blk->bkp_image;
+	}
+
+	/* clear the queue link — it belongs to the producer's queue */
+	dec->next = NULL;
+
+	/* Attach to reader, only updating the public parameters */
+	startup_reader->record            = dec;
+	startup_reader->ReadRecPtr        = dec->lsn;
+	startup_reader->DecodeRecPtr      = dec->lsn;
+	startup_reader->EndRecPtr         = dec->next_lsn;
+	startup_reader->NextRecPtr        = dec->next_lsn;
+	startup_reader->decode_queue_head = dec;
+	startup_reader->decode_queue_tail = dec;
+	startup_reader->missingContrecPtr = hdr.missingContrecPtr;
+	startup_reader->abortedRecPtr     = hdr.abortedRecPtr;
+	startup_reader->overwrittenRecPtr = hdr.overwrittenRecPtr;
+
+	return dec;
+}
+
 /*
  * We need to put some assertion that only pipeline worker should be touching
  * the specific code.
@@ -451,6 +901,32 @@ cleanup_producer_resources(void)
 	SpinLockRelease(&WalPipelineShm->mutex);
 }
 
+/*
+ * Clean up consumer-side resources
+ */
+static void
+cleanup_consumer_resources(void)
+{
+	if (consumer_mq_handle)
+	{
+		shm_mq_detach(consumer_mq_handle);
+		consumer_mq_handle = NULL;
+	}
+
+	if (consumer_dsm_seg)
+	{
+		dsm_unpin_segment(dsm_segment_handle(consumer_dsm_seg));
+		dsm_detach(consumer_dsm_seg);
+		consumer_dsm_seg = NULL;
+	}
+
+	consumer_mq = NULL;
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	WalPipelineShm->consumer_pid = 0;
+	WalPipelineShm->dsm_seg_handle = DSM_HANDLE_INVALID;
+	SpinLockRelease(&WalPipelineShm->mutex);
+}
 
 /*
  * Cleanup callback for process exit
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 6de4a75c6c2..9cc28c4a717 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1600,7 +1600,57 @@ ShutdownWalRecovery(void)
 	 * it, but let's do it for the sake of tidiness.
 	 */
 	if (ArchiveRecoveryRequested)
-		DisownLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+
+/*
+ * Get next record for redo.
+ *
+ * Use the pipeline if enabled for parallel decoding and receive decoded
+ * records from a shared queue, else read it directly.
+ *
+ * Parallel decoding helps us offloading some load from the CPU and hence
+ * boosting the recovery process.
+ */
+static XLogRecord *
+ReceiveRecord(XLogPrefetcher *xlogprefetcher, int emode,
+				bool fetching_ckpt, TimeLineID replayTLI,
+				XLogReaderState **localreader)
+{
+
+	XLogRecord *record = NULL;
+	XLogReaderState *reader = *localreader;
+	DecodedXLogRecord *decoded_record = NULL;
+
+	/*
+	 * If pipeline not enabled read the record directly
+	 */
+	if (!wal_pipeline_enabled)
+	{
+		record = ReadRecord(xlogprefetcher, emode, fetching_ckpt, replayTLI);
+		return record;
+	}
+
+	Assert (WalPipeline_IsActive());
+
+	/*
+	 * Get the record from the pipeline message queue
+	 */
+	decoded_record = WalPipeline_ReceiveRecord(reader);
+
+	if (decoded_record)
+	{
+		record = &decoded_record->header;
+		return record;
+	}
+	else
+	{
+		/*
+		 * We will end up here only when pipeline couldn't read more
+		 * records and have sent a shutdown msg. We will acknowldge this
+		 * and will trigger request to stop more pipelined decoding.
+		 */
+		WalPipeline_Stop();
+		return NULL;
+	}
 }
 
 /*
@@ -1615,6 +1665,12 @@ PerformWalRecovery(void)
 	bool		reachedRecoveryTarget = false;
 	TimeLineID	replayTLI;
 
+	/*
+	 * standalone backend may exist in case of pg_rewind.
+	 */
+	if (!IsUnderPostmaster)
+		wal_pipeline_enabled = false;
+
 	/*
 	 * Initialize shared variables for tracking progress of WAL replay, as if
 	 * we had just replayed the record before the REDO location (or the
@@ -1694,6 +1750,17 @@ PerformWalRecovery(void)
 
 		InRedo = true;
 
+		if(wal_pipeline_enabled)
+		{
+			/*
+			 * Startup proc parameters that pipeline should also be aware of.
+			 */
+			WalPipelineParams *params = palloc0(sizeof(WalPipelineParams));
+
+			params->ReplayTLI = replayTLI;
+			WalPipeline_Start(params);
+		}
+
 		RmgrStartup();
 
 		ereport(LOG,
@@ -1802,7 +1869,7 @@ PerformWalRecovery(void)
 			}
 
 			/* Else, try to fetch the next WAL record */
-			record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
+			record = ReceiveRecord(xlogprefetcher, LOG, false, replayTLI, &xlogreader);
 		} while (record != NULL);
 
 		/*
@@ -1811,6 +1878,9 @@ PerformWalRecovery(void)
 
 		if (reachedRecoveryTarget)
 		{
+			if (wal_pipeline_enabled)
+				WalPipeline_Stop();
+
 			if (!reachedConsistency)
 				ereport(FATAL,
 						(errmsg("requested recovery stop point is before consistent recovery point")));
diff --git a/src/include/access/xlogpipeline.h b/src/include/access/xlogpipeline.h
index 333090632b4..a8995d5c1a7 100644
--- a/src/include/access/xlogpipeline.h
+++ b/src/include/access/xlogpipeline.h
@@ -88,11 +88,29 @@ typedef struct WalPipelineShmCtl
 /* consumer may have to compute prefetecher stats */
 extern PGDLLIMPORT XLogPrefetcher *xlogprefetcher_pipelined;
 
+/*
+ * Public API functions
+ */
+
+/* Start/stop the pipeline */
+extern void WalPipeline_Start(WalPipelineParams *params);
+extern void WalPipeline_Stop(void);
 
 /* Producer functions (called by background worker) */
 extern void WalPipeline_ProducerMain(Datum main_arg);
 extern bool WalPipeline_SendRecord(XLogReaderState *record);
 extern bool WalPipeline_SendShutdown(void);
+
+/* Consumer functions (called by startup process) */
+extern DecodedXLogRecord *WalPipeline_ReceiveRecord(XLogReaderState *startup_reader);
+extern bool WalPipeline_CheckProducerAlive(void);
+
+/* Status and monitoring */
+extern bool WalPipeline_IsActive(void);
+extern pid_t WalPipeline_GetProducerPid(void);
+extern void WalPipeline_WaitForConsumerCatchup(void);
+extern void WalPipeline_GetStats(uint64 *records_sent, uint64 *records_received,
+								  XLogRecPtr *producer_lsn, XLogRecPtr *consumer_lsn);
 extern bool AmWalPipeline(void);
 
 
-- 
2.34.1

