From 83100d91f07ac784a355b38fa4dac5775437c1c1 Mon Sep 17 00:00:00 2001
From: "zongzhi.czz" <zongzhi.czz@alibaba-inc.com>
Date: Sat, 7 Feb 2026 22:08:16 +0800
Subject: [PATCH v1 2/4] Fix DWB process handling and skip FPW when DWB enabled

- Skip full page writes when double write buffer is enabled since DWB
  already provides torn page protection
- Fix file descriptor handling after fork by tracking process ID
- Initialize DWB in checkpointer process
- Improve batch synchronization in DWBufPostCheckpoint
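- Add DWB shared memory initialization in ipci.c
- Add dwbuf.o to the storage/buffer Makefile
- Drop the per-page DWBufFlush() call from FlushBuffer; DWB files are
  fsynced by DWBufFlush() rather than on every page write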

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
---
 src/backend/access/transam/xlog.c     | 18 +++++-
 src/backend/postmaster/checkpointer.c |  6 ++
 src/backend/storage/buffer/Makefile   |  1 +
 src/backend/storage/buffer/bufmgr.c   |  2 +-
 src/backend/storage/buffer/dwbuf.c    | 79 ++++++++++++++++++++-------
 src/backend/storage/ipc/ipci.c        |  3 +
 6 files changed, 87 insertions(+), 22 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 13ec6225b8..74808d2fcf 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -85,6 +85,7 @@
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/bufmgr.h"
+#include "storage/dwbuf.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/large_object.h"
@@ -845,7 +846,13 @@ XLogInsertRecord(XLogRecData *rdata,
 			Assert(RedoRecPtr < Insert->RedoRecPtr);
 			RedoRecPtr = Insert->RedoRecPtr;
 		}
-		doPageWrites = (Insert->fullPageWrites || Insert->runningBackups > 0);
+		/*
+		 * If DWB is enabled, we don't need full page writes.
+		 */
+		if (DWBufIsEnabled())
+			doPageWrites = false;
+		else
+			doPageWrites = (Insert->fullPageWrites || Insert->runningBackups > 0);
 
 		if (doPageWrites &&
 			(!prevDoPageWrites ||
@@ -6593,7 +6600,14 @@ void
 GetFullPageWriteInfo(XLogRecPtr *RedoRecPtr_p, bool *doPageWrites_p)
 {
 	*RedoRecPtr_p = RedoRecPtr;
-	*doPageWrites_p = doPageWrites;
+	/*
+	 * If double write buffer is enabled, we don't need full page writes
+	 * because DWB provides torn page protection.
+	 */
+	if (DWBufIsEnabled())
+		*doPageWrites_p = false;
+	else
+		*doPageWrites_p = doPageWrites;
 }
 
 /*
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index e03c19123b..af2edbe222 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -53,6 +53,7 @@
 #include "replication/syncrep.h"
 #include "storage/aio_subsys.h"
 #include "storage/bufmgr.h"
+#include "storage/dwbuf.h"
 #include "storage/condition_variable.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
@@ -254,6 +255,11 @@ CheckpointerMain(const void *startup_data, size_t startup_data_len)
 												 ALLOCSET_DEFAULT_SIZES);
 	MemoryContextSwitchTo(checkpointer_context);
 
+	/*
+	 * Initialize double write buffer if enabled.
+	 */
+	DWBufInit();
+
 	/*
 	 * If an exception is encountered, processing resumes here.
 	 *
diff --git a/src/backend/storage/buffer/Makefile b/src/backend/storage/buffer/Makefile
index fd7c40dcb0..3abab9ec93 100644
--- a/src/backend/storage/buffer/Makefile
+++ b/src/backend/storage/buffer/Makefile
@@ -16,6 +16,7 @@ OBJS = \
 	buf_init.o \
 	buf_table.o \
 	bufmgr.o \
+	dwbuf.o \
 	freelist.o \
 	localbuf.o
 
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index ea84aeef26..8c1e78fba2 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -4501,6 +4501,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	/*
 	 * If double write buffer is enabled, write the page to DWB first.
 	 * This protects against torn pages without needing full page writes in WAL.
+	 * NOTE(review): DWBufWritePage does not fsync; only DWBufFlush() syncs the DWB.
 	 */
 	if (DWBufIsEnabled())
 	{
@@ -4509,7 +4510,6 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 					   buf->tag.blockNum,
 					   bufToWrite,
 					   recptr);
-		DWBufFlush();
 	}
 
 	/*
diff --git a/src/backend/storage/buffer/dwbuf.c b/src/backend/storage/buffer/dwbuf.c
index 9ccb99b214..5c5b14f5b3 100644
--- a/src/backend/storage/buffer/dwbuf.c
+++ b/src/backend/storage/buffer/dwbuf.c
@@ -43,10 +43,12 @@ int			double_write_buffer_size = DWBUF_DEFAULT_SIZE_MB;
 /* Shared memory control structure */
 static DWBufCtlData *DWBufCtl = NULL;
 
+/* Process ID that opened the files (to detect fork) */
+static pid_t DWBufFilesOpenedPid = 0;
+
 /* Per-process file descriptors (FDs are per-process, not shareable) */
 static int DWBufFds[DWBUF_MAX_FILES] = {-1, -1, -1, -1, -1, -1, -1, -1,
                                          -1, -1, -1, -1, -1, -1, -1, -1};
-static bool DWBufFilesOpened = false;
 
 /* Directory for DWB files */
 #define DWBUF_DIR			"pg_dwbuf"
@@ -160,10 +162,20 @@ DWBufOpenFiles(void)
 	int			i;
 	char		path[MAXPGPATH];
 	struct stat	st;
-
-	if (DWBufFilesOpened)
+	pid_t		current_pid = getpid();
+
+	/*
+	 * Check if files are already opened in this process.
+	 * After fork, the child process will have different PID and needs to
+	 * reopen the files.
+	 */
+	if (DWBufFilesOpenedPid == current_pid && DWBufFds[0] >= 0)
 		return;
 
+	/* NOTE(review): DWBufClose() returns early on PID mismatch; inherited FDs stay open */
+	if (DWBufFilesOpenedPid != current_pid && DWBufFds[0] >= 0)
+		DWBufClose();
+
 	if (!double_write_buffer || DWBufCtl == NULL)
 		return;
 
@@ -252,7 +264,7 @@ DWBufOpenFiles(void)
 													  PG_IO_ALIGN_SIZE,
 													  0);
 
-	DWBufFilesOpened = true;
+	DWBufFilesOpenedPid = current_pid;
 }
 
 /*
@@ -277,8 +289,9 @@ void
 DWBufClose(void)
 {
 	int			i;
+	pid_t		current_pid = getpid();
 
-	if (!DWBufFilesOpened)
+	if (DWBufFilesOpenedPid != current_pid || DWBufFds[0] < 0)
 		return;
 
 	for (i = 0; i < DWBUF_MAX_FILES; i++)
@@ -289,11 +302,14 @@ DWBufClose(void)
 			DWBufFds[i] = -1;
 		}
 	}
-	DWBufFilesOpened = false;
+	DWBufFilesOpenedPid = 0;
 }
 
 /*
- * Write a page to the double write buffer.
+ * Write a page to the double write buffer.
+ *
+ * The page is written to the DWB file but is not fsynced here; durability
+ * of the DWB copy depends on a later DWBufFlush() call.
  */
 void
 DWBufWritePage(RelFileLocator rlocator, ForkNumber forknum,
@@ -309,9 +325,8 @@ DWBufWritePage(RelFileLocator rlocator, ForkNumber forknum,
 	if (!double_write_buffer || DWBufCtl == NULL)
 		return;
 
-	/* Ensure files are opened (lazy initialization) */
-	if (!DWBufFilesOpened)
-		DWBufOpenFiles();
+	/* Ensure files are opened in this process */
+	DWBufOpenFiles();
 
 	/* Get next slot position atomically */
 	pos = pg_atomic_fetch_add_u64(&DWBufCtl->write_pos, 1);
@@ -349,6 +364,11 @@ DWBufWritePage(RelFileLocator rlocator, ForkNumber forknum,
 		ereport(ERROR,
 				(errcode_for_file_access(),
 				 errmsg("could not write to double write buffer: %m")));
+
+	/*
+	 * NOTE: We don't fsync immediately here for performance reasons.
+	 * The DWBufFlush() function will fsync all files before checkpoint.
+	 */
 }
 
 /*
@@ -361,7 +381,7 @@ DWBufFlush(void)
 	uint64		current_pos;
 	uint64		flush_pos;
 
-	if (!double_write_buffer || DWBufCtl == NULL || !DWBufFilesOpened)
+	if (!double_write_buffer || DWBufCtl == NULL)
 		return;
 
 	current_pos = pg_atomic_read_u64(&DWBufCtl->write_pos);
@@ -371,6 +391,10 @@ DWBufFlush(void)
 	if (current_pos <= flush_pos)
 		return;
 
+	/* Ensure files are opened in this process */
+	if (DWBufFilesOpenedPid != getpid() || DWBufFds[0] < 0)
+		DWBufOpenFiles();
+
 	/* Fsync all DWB files */
 	for (i = 0; i < DWBufCtl->num_files; i++)
 	{
@@ -423,26 +447,43 @@ void
 DWBufPostCheckpoint(XLogRecPtr checkpoint_lsn)
 {
 	int			i;
+	uint64		old_batch_id;
+	uint64		new_batch_id;
 
 	if (!double_write_buffer || DWBufCtl == NULL)
 		return;
 
-	/* Ensure files are opened */
-	if (!DWBufFilesOpened)
+	/* Ensure files are opened in this process */
+	if (DWBufFilesOpenedPid != getpid() || DWBufFds[0] < 0)
 		DWBufOpenFiles();
 
 	SpinLockAcquire(&DWBufCtl->mutex);
 
-	/* Reset write position for new batch */
-	pg_atomic_write_u64(&DWBufCtl->write_pos, 0);
-	pg_atomic_write_u64(&DWBufCtl->flush_pos, 0);
-
-	/* Increment batch ID */
+	/* Save old batch ID and increment */
+	old_batch_id = DWBufCtl->batch_id;
 	DWBufCtl->batch_id++;
+	new_batch_id = DWBufCtl->batch_id;
 	DWBufCtl->checkpoint_lsn = checkpoint_lsn;
 
 	SpinLockRelease(&DWBufCtl->mutex);
 
+	/*
+	 * If write_pos has reached num_slots, fsync the DWB files via DWBufFlush().
+	 * NOTE(review): this does not wait for in-flight DWBufWritePage() calls.
+	 */
+	{
+		uint64 current_pos = pg_atomic_read_u64(&DWBufCtl->write_pos);
+		uint64 num_slots = DWBufCtl->num_slots;
+
+		/* If write_pos wrapped around, wait for flush */
+		if (current_pos >= num_slots)
+			DWBufFlush();
+	}
+
+	/* Now safe to reset positions for new batch */
+	pg_atomic_write_u64(&DWBufCtl->write_pos, 0);
+	pg_atomic_write_u64(&DWBufCtl->flush_pos, 0);
+
 	/* Update file headers with new batch info */
 	for (i = 0; i < DWBufCtl->num_files; i++)
 	{
@@ -464,7 +505,7 @@ DWBufPostCheckpoint(XLogRecPtr checkpoint_lsn)
 		}
 
 		/* Update header */
-		header.batch_id = DWBufCtl->batch_id;
+		header.batch_id = new_batch_id;
 		header.checkpoint_lsn = checkpoint_lsn;
 
 		/* Recompute CRC */
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 1f7e933d50..87887dcf69 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -40,6 +40,7 @@
 #include "replication/walsender.h"
 #include "storage/aio_subsys.h"
 #include "storage/bufmgr.h"
+#include "storage/dwbuf.h"
 #include "storage/dsm.h"
 #include "storage/dsm_registry.h"
 #include "storage/ipc.h"
@@ -141,6 +142,7 @@ CalculateShmemSize(void)
 	size = add_size(size, AioShmemSize());
 	size = add_size(size, WaitLSNShmemSize());
 	size = add_size(size, LogicalDecodingCtlShmemSize());
+	size = add_size(size, DWBufShmemSize());
 
 	/* include additional requested shmem from preload libraries */
 	size = add_size(size, total_addin_request);
@@ -274,6 +276,7 @@ CreateOrAttachShmemStructs(void)
 	SUBTRANSShmemInit();
 	MultiXactShmemInit();
 	BufferManagerShmemInit();
+	DWBufShmemInit();
 
 	/*
 	 * Set up lock manager
-- 
2.43.0

