From 5885605a6579e517f664422b763ec683464852d5 Mon Sep 17 00:00:00 2001
From: Imran Zaheer <imran.zhir@gmail.com>
Date: Mon, 22 Jun 2026 12:39:49 +0500
Subject: [PATCH v4 1/4] Pipelined Recovery - Producer Related Code

This includes the producer specific code for the producer-consumer
architecture for WAL replay that separates WAL decoding from the startup
process, enabling parallel processing between different steps of replay.

The producer includes a background worker that reads and decodes WAL records,
then sends them to the startup process for redo. IPC happens via shared
memory message queues (shm_mq), allowing the decoder to run ahead of
the apply process.

This will give some performance improvements in recoveries for CPU-bound
workloads.

New GUCs: wal_pipeline (default: off), wal_pipeline_queue_size (default: 128MB)

Author: Imran Zaheer <imran.zhir@gmail.com>
Idea by: Ants Aasma <ants@cybertec.at>
---
 src/backend/access/transam/Makefile           |   1 +
 src/backend/access/transam/meson.build        |   1 +
 src/backend/access/transam/xlogpipeline.c     | 567 ++++++++++++++++++
 src/backend/access/transam/xlogprefetcher.c   |   3 +-
 src/backend/access/transam/xlogrecovery.c     |   8 +-
 src/backend/postmaster/bgworker.c             |   5 +
 src/backend/postmaster/postmaster.c           |  12 +-
 src/backend/utils/misc/guc_parameters.dat     |  15 +
 src/backend/utils/misc/postgresql.conf.sample |   2 +
 src/include/access/xlog.h                     |   2 +
 src/include/access/xlogpipeline.h             | 105 ++++
 src/include/access/xlogrecovery.h             |   4 +
 src/include/storage/subsystemlist.h           |   1 +
 13 files changed, 720 insertions(+), 6 deletions(-)
 create mode 100644 src/backend/access/transam/xlogpipeline.c
 create mode 100644 src/include/access/xlogpipeline.h

diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index a32f473e0a2..ba0bf343769 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -32,6 +32,7 @@ OBJS = \
 	xlogbackup.o \
 	xlogfuncs.o \
 	xloginsert.o \
+	xlogpipeline.o \
 	xlogprefetcher.o \
 	xlogreader.o \
 	xlogrecovery.o \
diff --git a/src/backend/access/transam/meson.build b/src/backend/access/transam/meson.build
index 06aadc7f315..be37b40581d 100644
--- a/src/backend/access/transam/meson.build
+++ b/src/backend/access/transam/meson.build
@@ -20,6 +20,7 @@ backend_sources += files(
   'xlogbackup.c',
   'xlogfuncs.c',
   'xloginsert.c',
+  'xlogpipeline.c',
   'xlogprefetcher.c',
   'xlogrecovery.c',
   'xlogstats.c',
diff --git a/src/backend/access/transam/xlogpipeline.c b/src/backend/access/transam/xlogpipeline.c
new file mode 100644
index 00000000000..ab9e6eccc15
--- /dev/null
+++ b/src/backend/access/transam/xlogpipeline.c
@@ -0,0 +1,567 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogpipeline.c
+ *    WAL replay pipeline implementation
+ *
+ * This module implements a producer-consumer pipeline for WAL replay.
+ * The producer (background worker) reads and decodes WAL records in parallel
+ * with the consumer (startup process) that applies them.
+ *
+ * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *    src/backend/access/transam/xlogpipeline.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <fcntl.h>
+#include <unistd.h>
+
+#include "access/heapam_xlog.h"
+#include "access/rmgr.h"
+#include "access/xlog.h"
+#include "access/xlogpipeline.h"
+#include "access/xlogprefetcher.h"
+#include "access/xlogreader.h"
+#include "access/xlogrecord.h"
+#include "access/xlogrecovery.h"
+#include "access/xlogutils.h"
+#include "access/xlog_internal.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "postmaster/startup.h"
+#include "storage/bufmgr.h"
+#include "storage/dsm.h"
+#include "storage/ipc.h"
+#include "storage/lwlock.h"
+#include "storage/md.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/procsignal.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "storage/smgr.h"
+#include "storage/subsystems.h"
+#include "tcop/tcopprot.h"
+#include "utils/elog.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+#include "utils/rel.h"
+#include "utils/timeout.h"
+
+
+/*
+ * Convert a GUC value given in megabytes to bytes (argument is parenthesized).
+ */
+#define MBToBytes(mbvar) ((mbvar) * 1024 * 1024)
+
+/*
+ * Waiting for consumer before exiting gracefully.
+ */
+#define MAX_SHUTDOWN_WAIT_ITERS 1000	/* 1000 * 10ms = 10 seconds */
+
+
+/* Global shared memory control structure */
+WalPipelineShmCtl *WalPipelineShm = NULL;
+
+static void WalPipelineShmemRequest(void *arg);
+static void WalPipelineShmemInit(void *arg);
+
+const ShmemCallbacks WalPipelineShmemCallbacks = {
+	.request_fn = WalPipelineShmemRequest,	/* asks for the control struct */
+	.init_fn = WalPipelineShmemInit,	/* zeroes it, sets up the spinlock */
+};
+
+XLogPrefetcher *xlogprefetcher_pipelined = NULL;
+
+/* Local state for producer */
+static dsm_segment *producer_dsm_seg = NULL;
+static shm_mq *producer_mq = NULL;
+static shm_mq_handle *producer_mq_handle = NULL;
+
+/*
+ * Flags set by interrupt handlers for later service in the redo loop.
+ */
+static volatile sig_atomic_t got_SIGHUP = false;
+static volatile sig_atomic_t promote_signaled = false;
+
+/* Signal handlers */
+static void PipelineBgwSigHupHandler(SIGNAL_ARGS);
+static void PipelineProcTriggerHandler(SIGNAL_ARGS);
+
+static void wal_pipeline_cleanup_callback(int code, Datum arg);
+static Size serialize_wal_record(XLogReaderState *record, char **buffer);
+static void cleanup_producer_resources(void);
+
+/* Copied from xlogrecovery.c: ReadRecord() there casts private_data to this. */
+/* Parameters passed down from ReadRecord to the XLogPageRead callback. */
+typedef struct XLogPageReadPrivate
+{
+	int			emode;
+	bool		fetching_ckpt;	/* are we fetching a checkpoint record? */
+	bool		randAccess;
+	TimeLineID	replayTLI;
+} XLogPageReadPrivate;
+
+
+/*
+ * Request shared memory for the WAL pipeline control struct (WalPipelineShm).
+ */
+static void
+WalPipelineShmemRequest(void *arg)
+{
+	ShmemRequestStruct(.name = "WAL Pipeline Ctl",
+					   .size = sizeof(WalPipelineShmCtl),
+					   .ptr = (void **) &WalPipelineShm,
+		);
+}
+
+static void
+WalPipelineShmemInit(void *arg)
+{
+	memset(WalPipelineShm, 0, sizeof(WalPipelineShmCtl));	/* pids, LSNs, counters = 0 */
+
+	SpinLockInit(&WalPipelineShm->mutex);
+}
+
+
+/*
+ * Producer entry point: attach to the DSM segment named by main_arg, then
+ * read WAL records and ship them to the consumer until it asks us to stop,
+ * a send fails, or ReadRecord() returns NULL (treated as end of WAL).
+ */
+void
+WalPipeline_ProducerMain(Datum main_arg)
+{
+	dsm_handle	handle = DatumGetUInt32(main_arg);
+	dsm_segment *seg;
+	shm_toc    *toc;
+	WalPipelineParams *params;
+	XLogReaderState *xlogreader;
+	XLogPageReadPrivate *private;
+	XLogRecord *record;
+	TimeLineID	replayTLI = 0;
+	bool		end_of_wal = false;
+	uint64		records_sent;
+	uint64		records_received;
+
+	/*
+	 * Properly accept or ignore signals the postmaster might send us.
+	 */
+	pqsignal(SIGHUP, PipelineBgwSigHupHandler);	/* reload config file */
+	pqsignal(SIGUSR2, PipelineProcTriggerHandler);	/* promotion request */
+
+	/* Register cleanup callback */
+	before_shmem_exit(wal_pipeline_cleanup_callback, (Datum) 0);
+
+	seg = dsm_attach(handle);
+	if (seg == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("[walpipeline] producer: could not map dynamic shared memory segment")));
+
+	toc = shm_toc_attach(PG_WAL_PIPELINE_MAGIC, dsm_segment_address(seg));
+	if (toc == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("[walpipeline] producer: bad magic number in dynamic shared memory segment")));
+
+	/* Lookup params (TOC key 1) and queue (TOC key 2) */
+	params = shm_toc_lookup(toc, 1, false);
+	producer_mq = shm_toc_lookup(toc, 2, false);
+
+	/* Set up producer side of queue */
+	producer_dsm_seg = seg;
+	shm_mq_set_sender(producer_mq, MyProc);
+	producer_mq_handle = shm_mq_attach(producer_mq, seg, NULL);
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	WalPipelineShm->producer_pid = MyProcPid;
+	SpinLockRelease(&WalPipelineShm->mutex);
+
+	/* DSM is now attached, so safe to unblock the signals */
+	BackgroundWorkerUnblockSignals();
+
+	/* Set up WAL reading processor */
+	private = palloc0(sizeof(XLogPageReadPrivate));
+	xlogreader =
+		XLogReaderAllocate(wal_segment_size, NULL,
+						   XL_ROUTINE(.page_read = &XLogPageRead,
+									  .segment_open = NULL,
+									  .segment_close = wal_segment_close),
+						   private);
+
+	if (!xlogreader)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory"),
+				 errdetail("Failed while allocating a WAL reading processor.")));
+	xlogreader->system_identifier = GetSystemIdentifier();
+
+	/*
+	 * Set the WAL decode buffer size.  This limits how far ahead we can read
+	 * in the WAL.
+	 */
+	XLogReaderSetDecodeBuffer(xlogreader, NULL, wal_decode_buffer_size);
+
+	/* Import recovery state; replayTLI is not changed again in this function */
+	replayTLI = params->ReplayTLI;
+	WalPipeline_ImportRecoveryState(params);
+
+	/* Reinit the WAL prefetcher. */
+	xlogprefetcher_pipelined = XLogPrefetcherAllocate(xlogreader);
+
+	elog(LOG, "[walpipeline] producer: started at %X/%X, TLI %u",
+		 LSN_FORMAT_ARGS(params->NextRecPtr), replayTLI);
+
+	XLogPrefetcherBeginRead(xlogprefetcher_pipelined, params->NextRecPtr);
+
+	/* Promoted before launch?  Only set, so a delivered SIGUSR2 is kept. */
+	if (params->promotedBeforeLaunch)
+		promote_signaled = true;
+
+	/* Main decoding loop */
+	while (true)
+	{
+		bool shutdown_requested;
+
+		/* Check if consumer requested to stop decoding */
+		SpinLockAcquire(&WalPipelineShm->mutex);
+		shutdown_requested = WalPipelineShm->shutdown_requested;
+		SpinLockRelease(&WalPipelineShm->mutex);
+
+		if (shutdown_requested)
+		{
+			elog(DEBUG1, "[walpipeline] producer: got shutdown request from the consumer.");
+			break;
+		}
+
+		/* Read next WAL record */
+		record = ReadRecord(xlogprefetcher_pipelined, LOG, false, replayTLI);
+
+		if (record == NULL)
+		{
+			end_of_wal = true;
+			elog(DEBUG1, "[walpipeline] producer: reached end of WAL");
+			break;
+		}
+
+		/*
+		 * Successfully decoded a record. Send it to the consumer.
+		 */
+		if (!WalPipeline_SendRecord(xlogreader))
+		{
+			elog(WARNING, "[walpipeline] producer: failed to send record, queue full or detached");
+			break;
+		}
+
+		/* Update our position for monitoring */
+		SpinLockAcquire(&WalPipelineShm->mutex);
+		WalPipelineShm->producer_lsn = xlogreader->EndRecPtr;
+		SpinLockRelease(&WalPipelineShm->mutex);
+
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	if (end_of_wal)
+	{
+		/* Notify consumer we need to exit as no more records found */
+		WalPipeline_SendShutdown();
+		WalPipeline_WaitForConsumerShutdownRequest();
+	}
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	records_sent = WalPipelineShm->records_sent;
+	records_received = WalPipelineShm->records_received;
+	SpinLockRelease(&WalPipelineShm->mutex);
+
+	elog(LOG, "[walpipeline] producer: exiting: sent=" UINT64_FORMAT " received=" UINT64_FORMAT,
+		 records_sent, records_received);
+
+	/* Cleanup */
+	pfree(private);
+	XLogReaderFree(xlogreader);
+	XLogPrefetcherFree(xlogprefetcher_pipelined);
+	DisownRecoveryWakeupLatch();
+	cleanup_producer_resources();
+}
+
+/*
+ * Producer Function.
+ * Serialize a decoded WAL record and send it to the consumer, blocking while
+ * the queue is full.  Returns false when there is no queue or it is detached.
+ */
+bool
+WalPipeline_SendRecord(XLogReaderState *record)
+{
+	char	   *buffer = NULL;
+	Size		msglen;
+	shm_mq_result res;
+
+	if (!producer_mq_handle)
+		return false;
+
+	/* Serialize the record */
+	msglen = serialize_wal_record(record, &buffer);
+
+	res = shm_mq_send(producer_mq_handle, msglen, buffer, false, true);
+
+	/* shm_mq_send() copied the bytes (or failed); our copy is done with. */
+	pfree(buffer);
+
+	if (res == SHM_MQ_SUCCESS)
+	{
+		SpinLockAcquire(&WalPipelineShm->mutex);
+		WalPipelineShm->records_sent++;
+		WalPipelineShm->bytes_sent += msglen;
+		SpinLockRelease(&WalPipelineShm->mutex);
+		return true;
+	}
+
+	/*
+	 * The consumer going away must not take the whole server down with a
+	 * PANIC; report failure so that the caller stops decoding.
+	 */
+	if (res == SHM_MQ_DETACHED)
+		return false;
+
+	/* Nothing else is expected from a blocking send */
+	elog(PANIC, "[walpipeline] producer: shm_mq_send failed with result %d", res);
+	return false;				/* keep compiler quiet */
+}
+
+/*
+ * Producer Function.
+ * Send shutdown message to consumer; returns true if it was queued.
+ */
+bool
+WalPipeline_SendShutdown(void)
+{
+	WalRecordMsgHeader hdr = {0};	/* no uninitialized bytes on the wire */
+	shm_mq_result res;
+
+	if (!producer_mq_handle)
+		return false;
+
+	hdr.msg_type = WAL_MSG_SHUTDOWN;
+	hdr.endRecPtr = InvalidXLogRecPtr;
+
+	res = shm_mq_send(producer_mq_handle, sizeof(hdr), &hdr, false, true);
+	return (res == SHM_MQ_SUCCESS);
+}
+
+
+/*
+ * serialize_wal_record (Producer)
+ *
+ * Build one queue message in a freshly palloc'd buffer: a WalRecordMsgHeader
+ * immediately followed by a byte copy of the reader's DecodedXLogRecord.
+ * Pointers inside the copy are rewritten as offsets from the record start.
+ *
+ * Stores the buffer in *outbuf (caller frees) and returns its total length.
+ */
+static Size
+serialize_wal_record(XLogReaderState *xlogreader, char **outbuf)
+{
+	DecodedXLogRecord *src = xlogreader->record;
+	Size		body_len = src->size;
+	Size		msg_len = sizeof(WalRecordMsgHeader) + body_len;
+	char	   *msg = palloc(msg_len);
+	DecodedXLogRecord *dst = (DecodedXLogRecord *) (msg + sizeof(WalRecordMsgHeader));
+	WalRecordMsgHeader hdr;
+	int			blkno;
+
+	/* Header: message type, payload length and the reader's positions. */
+	hdr.msg_type = WAL_MSG_RECORD;
+	hdr.decoded_size = body_len;
+	hdr.readRecPtr = xlogreader->ReadRecPtr;
+	hdr.endRecPtr = xlogreader->EndRecPtr;
+	hdr.missingContrecPtr = xlogreader->missingContrecPtr;
+	hdr.abortedRecPtr = xlogreader->abortedRecPtr;
+	hdr.overwrittenRecPtr = xlogreader->overwrittenRecPtr;
+
+	memcpy(msg, &hdr, sizeof(hdr));
+	memcpy(dst, src, body_len);
+
+	/*
+	 * main_data and the per-block data/bkp_image pointers still hold
+	 * producer addresses.  Store each one as its distance from the start of
+	 * the source record instead, which the consumer can rebase onto its copy.
+	 */
+	if (dst->main_data_len > 0)
+		dst->main_data = (char *) ((char *) src->main_data - (char *) src);
+
+	for (blkno = 0; blkno <= dst->max_block_id; blkno++)
+	{
+		DecodedBkpBlock *from = &src->blocks[blkno];
+		DecodedBkpBlock *to = &dst->blocks[blkno];
+
+		if (!to->in_use)
+			continue;
+		if (to->has_data)
+			to->data = (char *) ((char *) from->data - (char *) src);
+		if (to->has_image)
+			to->bkp_image = (char *) ((char *) from->bkp_image - (char *) src);
+	}
+
+	*outbuf = msg;
+	return msg_len;
+}
+
+/*
+ * True when the current process is a background worker whose name starts
+ * with "wal pipeline"; lets recovery-only code assert who is calling it.
+ */
+bool
+AmWalPipeline(void)
+{
+	/* Anything without a background worker entry cannot be the producer. */
+	if (MyBackendType != B_BG_WORKER || MyBgworkerEntry == NULL)
+		return false;
+
+	/* 12 == strlen("wal pipeline"), i.e. a prefix match on the name. */
+	return strncmp(MyBgworkerEntry->bgw_name, "wal pipeline", 12) == 0;
+}
+
+/*
+ * Detach from the queue and DSM segment (if attached) and clear producer_pid.
+ */
+static void
+cleanup_producer_resources(void)
+{
+	if (producer_mq_handle)
+	{
+		shm_mq_detach(producer_mq_handle);
+		producer_mq_handle = NULL;	/* so a repeated call skips this */
+	}
+
+	if (producer_dsm_seg)
+	{
+		dsm_detach(producer_dsm_seg);
+		producer_dsm_seg = NULL;
+	}
+
+	producer_mq = NULL;			/* pointed into the segment just detached */
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	WalPipelineShm->producer_pid = 0;
+	SpinLockRelease(&WalPipelineShm->mutex);
+}
+
+
+/*
+ * Exit callback: release this process's side of the pipeline.
+ */
+static void
+wal_pipeline_cleanup_callback(int code, Datum arg)
+{
+	/*
+	 * Pick the side by process type.  producer_pid cannot be trusted here: it
+	 * is 0 both before the worker registers and after it has cleaned up, which
+	 * made the producer run the consumer cleanup.
+	 */
+	if (!AmWalPipeline())
+	{
+		cleanup_consumer_resources();
+		return;
+	}
+
+	/* Producer: only if something is still attached (not yet cleaned up). */
+	if (WalPipelineShm && (producer_mq_handle || producer_dsm_seg))
+		cleanup_producer_resources();
+}
+
+/* --------------------------------
+ *		signal handler routines
+ * --------------------------------
+ */
+
+/* SIGUSR2: set flag to finish recovery, then call WakeupRecovery() */
+static void
+PipelineProcTriggerHandler(SIGNAL_ARGS)
+{
+	promote_signaled = true;
+	WakeupRecovery();
+}
+
+/* SIGHUP: set flag; ProcessPipelineBgwInterrupts() re-reads the config file */
+static void
+PipelineBgwSigHupHandler(SIGNAL_ARGS)
+{
+	got_SIGHUP = true;
+	WakeupRecovery();
+}
+
+/*
+ * Re-read the config file.
+ *
+ * If PrimaryConnInfo, PrimarySlotName or (when no slot is configured)
+ * wal_receiver_create_temp_slot changed, request a walreceiver restart.
+ */
+static void
+PipelineRereadConfig(void)
+{
+	char	   *conninfo = pstrdup(PrimaryConnInfo);
+	char	   *slotname = pstrdup(PrimarySlotName);
+	bool		tempSlot = wal_receiver_create_temp_slot;
+	bool		conninfoChanged;
+	bool		slotnameChanged;
+	bool		tempSlotChanged = false;
+
+	ProcessConfigFile(PGC_SIGHUP);
+
+	conninfoChanged = strcmp(conninfo, PrimaryConnInfo) != 0;
+	slotnameChanged = strcmp(slotname, PrimarySlotName) != 0;
+
+	/*
+	 * wal_receiver_create_temp_slot is used only when we have no slot
+	 * configured.  We do not need to track this change if it has no effect.
+	 */
+	if (!slotnameChanged && strcmp(PrimarySlotName, "") == 0)
+		tempSlotChanged = tempSlot != wal_receiver_create_temp_slot;
+	pfree(conninfo);
+	pfree(slotname);
+
+	if (conninfoChanged || slotnameChanged || tempSlotChanged)
+		StartupRequestWalReceiverRestart();
+}
+
+bool
+IsPromoteSignaledPipeline(void)
+{
+	return promote_signaled;	/* set by the SIGUSR2 handler or at launch */
+}
+
+void
+ResetPromoteSignaledPipeline(void)
+{
+	promote_signaled = false;	/* forget a previously recorded promotion */
+}
+
+/*
+ * Service SIGHUP, exit if shutdown was requested, then CHECK_FOR_INTERRUPTS.
+ */
+void
+ProcessPipelineBgwInterrupts(void)
+{
+
+	bool shutdown_requested;
+
+	if (got_SIGHUP)
+	{
+		got_SIGHUP = false;
+		PipelineRereadConfig();
+	}
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	shutdown_requested = WalPipelineShm->shutdown_requested;
+	SpinLockRelease(&WalPipelineShm->mutex);
+
+	if (shutdown_requested)
+		proc_exit(0);			/* flag is set outside this file */
+
+	CHECK_FOR_INTERRUPTS();
+}
\ No newline at end of file
diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c
index 83a3f97a57c..ac91fdf078f 100644
--- a/src/backend/access/transam/xlogprefetcher.c
+++ b/src/backend/access/transam/xlogprefetcher.c
@@ -27,6 +27,7 @@
 
 #include "postgres.h"
 
+#include "access/xlogpipeline.h"
 #include "access/xlogprefetcher.h"
 #include "access/xlogreader.h"
 #include "catalog/pg_control.h"
@@ -355,7 +356,7 @@ XLogPrefetchReconfigure(void)
 static inline void
 XLogPrefetchIncrement(pg_atomic_uint64 *counter)
 {
-	Assert(AmStartupProcess() || !IsUnderPostmaster);
+	Assert(AmStartupProcess() || AmWalPipeline() || !IsUnderPostmaster);
 	pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
 }
 
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 4d61795b483..6de4a75c6c2 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -3104,7 +3104,7 @@ ConfirmRecoveryPaused(void)
  * (emode must be either PANIC, LOG). In standby mode, retries until a valid
  * record is available.
  */
-static XLogRecord *
+XLogRecord *
 ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 		   bool fetching_ckpt, TimeLineID replayTLI)
 {
@@ -3112,7 +3112,7 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 	XLogReaderState *xlogreader = XLogPrefetcherGetReader(xlogprefetcher);
 	XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
 
-	Assert(AmStartupProcess() || !IsUnderPostmaster);
+	Assert(AmStartupProcess() || AmWalPipeline() || !IsUnderPostmaster);
 
 	/* Pass through parameters to XLogPageRead */
 	private->fetching_ckpt = fetching_ckpt;
@@ -3273,7 +3273,7 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
  * XLogPageRead() to try fetching the record from another source, or to
  * sleep and retry.
  */
-static int
+int
 XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
 			 XLogRecPtr targetRecPtr, char *readBuf)
 {
@@ -3285,7 +3285,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
 	int			r;
 	instr_time	io_start;
 
-	Assert(AmStartupProcess() || !IsUnderPostmaster);
+	Assert(AmStartupProcess() || AmWalPipeline() || !IsUnderPostmaster);
 
 	XLByteToSeg(targetPagePtr, targetSegNo, wal_segment_size);
 	targetPageOff = XLogSegmentOffset(targetPagePtr, wal_segment_size);
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 2e4acad4f00..24b188f1af4 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -13,6 +13,7 @@
 #include "postgres.h"
 
 #include "access/parallel.h"
+#include "access/xlogpipeline.h"
 #include "commands/repack.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
@@ -166,6 +167,10 @@ static const struct
 	{
 		.fn_name = "DataChecksumsWorkerMain",
 		.fn_addr = DataChecksumsWorkerMain
+	},
+	{
+		.fn_name = "WalPipeline_ProducerMain",
+		.fn_addr = WalPipeline_ProducerMain
 	}
 };
 
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 90c7c4528e8..bc0239c0cff 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -91,6 +91,7 @@
 
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
+#include "access/xlogpipeline.h"
 #include "access/xlogrecovery.h"
 #include "common/file_perm.h"
 #include "common/pg_prng.h"
@@ -3936,12 +3937,21 @@ process_pm_pmsignal(void)
 		CheckPromoteSignal())
 	{
 		/*
-		 * Tell startup process to finish recovery.
+		 * Tell startup process to finish recovery. In case the pipeline is
+		 * enabled, also signal the pipeline worker.
 		 *
 		 * Leave the promote signal file in place and let the Startup process
 		 * do the unlink.
 		 */
 		signal_child(StartupPMChild, SIGUSR2);
+
+		if (wal_pipeline_enabled && WalPipeline_IsActive())
+		{
+			PMChild    *bgw = FindPostmasterChildByPid(WalPipeline_GetProducerPid());
+
+			if (bgw != NULL && bgw->pid > 0)	/* worker may be gone already */
+				signal_child(bgw, SIGUSR2);
+		}
 	}
 }
 
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index afaa058b046..5055b0f4f35 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -3508,6 +3508,21 @@
   boot_val => 'false',
 },
 
+{ name => 'wal_pipeline', type => 'bool', context => 'PGC_POSTMASTER', group => 'WAL_RECOVERY',
+  short_desc => 'Use parallel workers to speedup recovery.',
+  variable => 'wal_pipeline_enabled',
+  boot_val => 'false',
+},
+
+{ name => 'wal_pipeline_queue_size', type => 'int', context => 'PGC_POSTMASTER', group => 'WAL_RECOVERY',
+  short_desc => 'Size of the shared memory queue used by the WAL pipeline.',
+  flags => 'GUC_UNIT_MB',
+  variable => 'wal_pipeline_mq_size_mb',
+  boot_val => '128',
+  min => '1',
+  max => '1024',
+},
+
 { name => 'wal_receiver_create_temp_slot', type => 'bool', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY',
   short_desc => 'Sets whether a WAL receiver should create a temporary replication slot if no permanent slot is configured.',
   variable => 'wal_receiver_create_temp_slot',
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index ac38cddaaf9..4dc79db1b05 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -288,6 +288,8 @@
 #recovery_prefetch = try        # prefetch pages referenced in the WAL?
 #wal_decode_buffer_size = 512kB # lookahead window used for prefetching
                                 # (change requires restart)
+#wal_pipeline = off             # decode WAL in a background worker (change requires restart)
+#wal_pipeline_queue_size = 128MB # pipeline queue size (change requires restart)
 
 # - Archiving -
 
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 4dd98624204..9979e064325 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -40,6 +40,8 @@ extern PGDLLIMPORT int min_wal_size_mb;
 extern PGDLLIMPORT int max_wal_size_mb;
 extern PGDLLIMPORT int wal_keep_size_mb;
 extern PGDLLIMPORT int max_slot_wal_keep_size_mb;
+extern PGDLLIMPORT int wal_pipeline_mq_size_mb;
+extern PGDLLIMPORT bool wal_pipeline_enabled;
 extern PGDLLIMPORT int XLOGbuffers;
 extern PGDLLIMPORT int XLogArchiveTimeout;
 extern PGDLLIMPORT int wal_retrieve_retry_interval;
diff --git a/src/include/access/xlogpipeline.h b/src/include/access/xlogpipeline.h
new file mode 100644
index 00000000000..333090632b4
--- /dev/null
+++ b/src/include/access/xlogpipeline.h
@@ -0,0 +1,105 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogpipeline.h
+ *    WAL replay pipeline for parallel recovery
+ *
+ * This module implements a producer-consumer pipeline for WAL replay:
+ * - Producer: background worker that reads and decodes WAL records
+ * - Consumer: startup process: core redo loop
+ *
+ * The pipeline uses shared memory queues (shm_mq) to pass decoded WAL
+ * records from producer to consumer, enabling parallelism while
+ * maintaining sequential replay semantics.
+ *
+ * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
+ *
+ * src/include/access/xlogpipeline.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAL_PIPELINE_H
+#define WAL_PIPELINE_H
+
+#include "access/xlogreader.h"
+#include "access/xlogrecovery.h"
+#include "access/xlogutils.h"
+#include "storage/dsm.h"
+#include "storage/shm_mq.h"
+#include "storage/spin.h"
+
+/*
+ * Magic number for shared memory TOC
+ */
+#define PG_WAL_PIPELINE_MAGIC 0x57414C50  /* "WALP" */
+
+/*
+ * Message types sent through the pipeline
+ */
+typedef enum WalMsgType
+{
+	WAL_MSG_INVALID = 0,
+	WAL_MSG_RECORD,         /* Decoded WAL record */
+	WAL_MSG_SHUTDOWN,       /* Producer found no more records and is stopping */
+} WalMsgType;
+
+/* Wire header of every queue message; WAL_MSG_SHUTDOWN carries no payload */
+typedef struct WalRecordMsgHeader
+{
+	WalMsgType  msg_type;             /* WAL_MSG_RECORD etc */
+	uint32      decoded_size;         /* byte length of the payload that follows */
+	XLogRecPtr  readRecPtr;           /* XLogReaderState->ReadRecPtr */
+	XLogRecPtr  endRecPtr;            /* XLogReaderState->EndRecPtr */
+	XLogRecPtr  missingContrecPtr;    /* XLogReaderState->missingContrecPtr */
+	XLogRecPtr  abortedRecPtr;        /* XLogReaderState->abortedRecPtr */
+	XLogRecPtr  overwrittenRecPtr;    /* XLogReaderState->overwrittenRecPtr */
+} WalRecordMsgHeader;
+
+/*
+ * Shared memory control structure for the WAL pipeline
+ */
+typedef struct WalPipelineShmCtl
+{
+	/* Lifecycle management */
+	slock_t         mutex;          /* held around reads/writes of the fields below */
+	bool            initialized;
+	bool            shutdown_requested; /* polled by the producer; stops decoding */
+
+	/* Producer state */
+	pid_t           producer_pid;   /* 0 when no producer is registered */
+	XLogRecPtr      producer_lsn;   /* EndRecPtr of the last record sent */
+
+	/* Consumer state */
+	pid_t           consumer_pid;
+	XLogRecPtr      consumer_lsn;   /* Last LSN received by consumer */
+	XLogRecPtr      applied_lsn;   	/* Last LSN applied by consumer */
+
+	/* Queue handles; NOTE(review): shm_mq_handle pointers are process-local, confirm use */
+	dsm_handle      dsm_seg_handle;
+	shm_mq_handle   *producer_mq_handle;
+	shm_mq_handle   *consumer_mq_handle;
+
+	/* Statistics */
+	uint64          records_sent;   /* messages successfully queued by the producer */
+	uint64          records_received;
+	uint64          bytes_sent;     /* total length of those messages, headers included */
+	uint64          bytes_received;
+} WalPipelineShmCtl;
+
+/* consumer may have to compute prefetcher stats */
+extern PGDLLIMPORT XLogPrefetcher *xlogprefetcher_pipelined;
+
+
+/* Producer functions (called by background worker) */
+extern void WalPipeline_ProducerMain(Datum main_arg);
+extern bool WalPipeline_SendRecord(XLogReaderState *record);
+extern bool WalPipeline_SendShutdown(void);
+extern bool AmWalPipeline(void);
+
+
+extern void ProcessPipelineBgwInterrupts(void);
+extern bool IsPromoteSignaledPipeline(void);
+extern void ResetPromoteSignaledPipeline(void);
+/* Global shared memory pointer */
+extern WalPipelineShmCtl *WalPipelineShm;
+
+#endif   /* WAL_PIPELINE_H */
\ No newline at end of file
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index ba7750dca0b..81ac984a904 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -217,6 +217,10 @@ extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
 extern TimestampTz GetLatestXTime(void);
 extern TimestampTz GetCurrentChunkReplayStartTime(void);
 extern XLogRecPtr GetCurrentReplayRecPtr(TimeLineID *replayEndTLI);
+extern XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
+		   bool fetching_ckpt, TimeLineID replayTLI);
+extern int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
+			 XLogRecPtr targetRecPtr, char *readBuf);
 
 extern bool PromoteIsTriggered(void);
 extern bool CheckPromoteSignal(void);
diff --git a/src/include/storage/subsystemlist.h b/src/include/storage/subsystemlist.h
index 9ad619080be..e9ff6de9a1a 100644
--- a/src/include/storage/subsystemlist.h
+++ b/src/include/storage/subsystemlist.h
@@ -42,6 +42,7 @@ PG_SHMEM_SUBSYSTEM(MultiXactShmemCallbacks)
 PG_SHMEM_SUBSYSTEM(BufferManagerShmemCallbacks)
 PG_SHMEM_SUBSYSTEM(StrategyCtlShmemCallbacks)
 PG_SHMEM_SUBSYSTEM(BufTableShmemCallbacks)
+PG_SHMEM_SUBSYSTEM(WalPipelineShmemCallbacks)
 
 /* lock manager */
 PG_SHMEM_SUBSYSTEM(LockManagerShmemCallbacks)
-- 
2.34.1

