From 59e27587ff6a6c57586256ef38d91140997a3e0c Mon Sep 17 00:00:00 2001
From: baotiao <baotiao@gmail.com>
Date: Sun, 8 Feb 2026 07:22:31 +0800
Subject: [PATCH v1 4/4] Fix critical correctness bugs in double write buffer
 (DWB)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Fix several issues that made the DWB feature unsafe:

1. CRC computation: Move crc to first field of DWBufPageSlot and zero
   it before computing, so the stale crc value is excluded from the
   CRC range. Previously verification would always fail.

2. Fsync before data file writes: DWB pages must be durable before
   data file writes begin. Add per-page DWBufFlushFile() for
   non-checkpoint writes (bgwriter/backend eviction), and batch
   DWBufFlush() in BufferSync for checkpoint writes.

3. Checkpoint integration: Wire up DWBufPreCheckpoint/PostCheckpoint
   calls in CreateCheckPoint and CreateRestartPoint — they existed
   but were never called.

4. Recovery path: Wire up DWBufRecoveryInit in StartupXLOG to build
   recovery hash, DWBufRecoverPage in buffer_readv_complete to
   recover torn pages, and DWBufRecoveryFinish after FinishWalRecovery.

5. Backup FPW: Keep full_page_writes enabled when a backup is running
   even with DWB on, since pg_basebackup doesn't copy DWB files.

6. Slot overflow: Add bounds check when write_pos >= num_slots,
   flush and wrap instead of overwriting valid data.

7. PostCheckpoint race: Add resetting flag to prevent concurrent
   DWBufWritePage calls during position reset.

8. Build/initdb: Add dwbuf.c to meson.build, pg_dwbuf to initdb
   subdirs array.

9. Minor: Remove unused old_batch_id, fix %ld format string cast.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
---
 src/backend/access/transam/xlog.c      |  28 +++++-
 src/backend/storage/buffer/bufmgr.c    | 106 ++++++++++++++++++---
 src/backend/storage/buffer/dwbuf.c     | 124 ++++++++++++++++++-------
 src/backend/storage/buffer/meson.build |   1 +
 src/bin/initdb/initdb.c                |   3 +-
 src/include/storage/dwbuf.h            |  14 ++-
 6 files changed, 223 insertions(+), 53 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 74808d2fcf..45deb700d9 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -847,9 +847,12 @@ XLogInsertRecord(XLogRecData *rdata,
 			RedoRecPtr = Insert->RedoRecPtr;
 		}
 		/*
-		 * If DWB is enabled, we don't need full page writes.
+		 * If DWB is enabled and no backup is running, we don't need full
+		 * page writes — DWB provides torn page protection.  But during
+		 * backups, FPW must stay on because pg_basebackup doesn't copy
+		 * DWB files, so the standby has no DWB to recover from.
 		 */
-		if (DWBufIsEnabled())
+		if (DWBufIsEnabled() && Insert->runningBackups == 0)
 			doPageWrites = false;
 		else
 			doPageWrites = (Insert->fullPageWrites || Insert->runningBackups > 0);
@@ -5876,6 +5879,9 @@ StartupXLOG(void)
 		 */
 		ResetUnloggedRelations(UNLOGGED_RELATION_CLEANUP);
 
+		/* Scan DWB files and build recovery hash for torn page recovery */
+		DWBufRecoveryInit();
+
 		/*
 		 * Likewise, delete any saved transaction snapshot files that got left
 		 * behind by crashed backends.
@@ -5962,6 +5968,7 @@ StartupXLOG(void)
 	 * Finish WAL recovery.
 	 */
 	endOfRecoveryInfo = FinishWalRecovery();
+	DWBufRecoveryFinish();		/* clean up DWB recovery hash */
 	EndOfLog = endOfRecoveryInfo->endOfLog;
 	EndOfLogTLI = endOfRecoveryInfo->endOfLogTLI;
 	abortedRecPtr = endOfRecoveryInfo->abortedRecPtr;
@@ -6601,8 +6608,9 @@ GetFullPageWriteInfo(XLogRecPtr *RedoRecPtr_p, bool *doPageWrites_p)
 {
 	*RedoRecPtr_p = RedoRecPtr;
 	/*
-	 * If double write buffer is enabled, we don't need full page writes
-	 * because DWB provides torn page protection.
+	 * If DWB is enabled, hint that FPW is not needed.  This is only a
+	 * hint — XLogInsertRecord re-checks with the authoritative backup
+	 * state and will re-enable FPW if a backup is running.
 	 */
 	if (DWBufIsEnabled())
 		*doPageWrites_p = false;
@@ -7324,6 +7332,9 @@ CreateCheckPoint(int flags)
 	}
 	pfree(vxids);
 
+	/* Flush all pending DWB writes before checkpoint */
+	DWBufPreCheckpoint();
+
 	CheckPointGuts(checkPoint.redo, flags);
 
 	vxids = GetVirtualXIDsDelayingChkpt(&nvxids, DELAY_CHKPT_COMPLETE);
@@ -7446,6 +7457,9 @@ CreateCheckPoint(int flags)
 	 */
 	SyncPostCheckpoint();
 
+	/* Reset DWB for next checkpoint cycle */
+	DWBufPostCheckpoint(recptr);
+
 	/*
 	 * Update the average distance between checkpoints if the prior checkpoint
 	 * exists.
@@ -7832,6 +7846,9 @@ CreateRestartPoint(int flags)
 	/* Update the process title */
 	update_checkpoint_display(flags, true, false);
 
+	/* Flush all pending DWB writes before checkpoint */
+	DWBufPreCheckpoint();
+
 	CheckPointGuts(lastCheckPoint.redo, flags);
 
 	/*
@@ -7894,6 +7911,9 @@ CreateRestartPoint(int flags)
 	}
 	LWLockRelease(ControlFileLock);
 
+	/* Reset DWB for next checkpoint cycle */
+	DWBufPostCheckpoint(lastCheckPoint.redo);
+
 	/*
 	 * Update the average distance between checkpoints/restartpoints if the
 	 * prior checkpoint exists.
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 8c1e78fba2..597db1d521 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -3633,10 +3633,56 @@ BufferSync(int flags)
 	binaryheap_build(ts_heap);
 
 	/*
-	 * Iterate through to-be-checkpointed buffers and write the ones (still)
-	 * marked with BM_CHECKPOINT_NEEDED. The writes are balanced between
-	 * tablespaces; otherwise the sorting would lead to only one tablespace
-	 * receiving writes at a time, making inefficient use of the hardware.
+	 * Phase 1 (DWB): If double write buffer is enabled, write all
+	 * checkpoint-dirty pages to DWB first, then batch fsync once.
+	 * This ensures torn page protection before data file writes begin.
+	 */
+	if (DWBufIsEnabled())
+	{
+		char	   *dwb_buf;
+
+		dwb_buf = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
+
+		for (i = 0; i < num_to_scan; i++)
+		{
+			BufferDesc *bufHdr;
+			uint64		state;
+			Page		page;
+
+			buf_id = CkptBufferIds[i].buf_id;
+			bufHdr = GetBufferDescriptor(buf_id);
+			state = pg_atomic_read_u64(&bufHdr->state);
+
+			if (!(state & BM_CHECKPOINT_NEEDED))
+				continue;
+			if (state & BM_IO_IN_PROGRESS)
+				continue;		/* being written by someone else */
+
+			/* Copy page content — buffer is readable with shared lock */
+			page = BufHdrGetBlock(bufHdr);
+			memcpy(dwb_buf, page, BLCKSZ);
+
+			DWBufWritePage(BufTagGetRelFileLocator(&bufHdr->tag),
+						   BufTagGetForkNum(&bufHdr->tag),
+						   bufHdr->tag.blockNum,
+						   dwb_buf,
+						   BufferGetLSN(bufHdr));
+		}
+
+		/* Single batch fsync for all DWB writes */
+		DWBufFlush();
+		pfree(dwb_buf);
+
+		/* Tell FlushBuffer to skip per-page DWB writes */
+		DWBufSetCheckpointWritesDone(true);
+	}
+
+	/*
+	 * Phase 2: Iterate through to-be-checkpointed buffers and write the
+	 * ones (still) marked with BM_CHECKPOINT_NEEDED. The writes are
+	 * balanced between tablespaces; otherwise the sorting would lead to
+	 * only one tablespace receiving writes at a time, making inefficient
+	 * use of the hardware.
 	 */
 	num_processed = 0;
 	num_written = 0;
@@ -3708,6 +3754,10 @@ BufferSync(int flags)
 	 */
 	IssuePendingWritebacks(&wb_context, IOCONTEXT_NORMAL);
 
+	/* Clear the checkpoint-writes-done flag */
+	if (DWBufIsEnabled())
+		DWBufSetCheckpointWritesDone(false);
+
 	pfree(per_ts_stat);
 	per_ts_stat = NULL;
 	binaryheap_free(ts_heap);
@@ -4499,17 +4549,21 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	io_start = pgstat_prepare_io_time(track_io_timing);
 
 	/*
-	 * If double write buffer is enabled, write the page to DWB first.
-	 * This protects against torn pages without needing full page writes in WAL.
-	 * DWBufWritePage now includes fsync internally for correctness.
+	 * If double write buffer is enabled and checkpoint has not already
+	 * written this page to DWB, write it now with per-page fsync.
+	 * This protects against torn pages without needing full page writes
+	 * in WAL.
 	 */
-	if (DWBufIsEnabled())
+	if (DWBufIsEnabled() && !DWBufCheckpointWritesDone())
 	{
-		DWBufWritePage(BufTagGetRelFileLocator(&buf->tag),
-					   BufTagGetForkNum(&buf->tag),
-					   buf->tag.blockNum,
-					   bufToWrite,
-					   recptr);
+		int			dwb_file_idx;
+
+		dwb_file_idx = DWBufWritePage(BufTagGetRelFileLocator(&buf->tag),
+									  BufTagGetForkNum(&buf->tag),
+									  buf->tag.blockNum,
+									  bufToWrite,
+									  recptr);
+		DWBufFlushFile(dwb_file_idx);
 	}
 
 	/*
@@ -8190,6 +8244,30 @@ buffer_readv_complete_one(PgAioTargetData *td, uint8 buf_off, Buffer buffer,
 		if (!PageIsVerified((Page) bufdata, tag.blockNum, piv_flags,
 							failed_checksum))
 		{
+			/*
+			 * Page verification failed — try to recover from the
+			 * double write buffer before giving up.
+			 */
+			if (DWBufRecoverPage(BufTagGetRelFileLocator(&tag),
+								 BufTagGetForkNum(&tag),
+								 tag.blockNum,
+								 (char *) bufdata))
+			{
+				/* Re-verify the recovered page */
+				bool	recovered_checksum_failure = false;
+
+				if (PageIsVerified((Page) bufdata, tag.blockNum,
+								   piv_flags, &recovered_checksum_failure))
+				{
+					/* Successfully recovered from DWB */
+					elog(LOG, "recovered torn page %u/%u/%u fork %d block %u from double write buffer",
+						 tag.spcOid, BufTagGetRelFileLocator(&tag).dbOid,
+						 BufTagGetRelFileLocator(&tag).relNumber,
+						 BufTagGetForkNum(&tag), tag.blockNum);
+					goto page_verified;
+				}
+			}
+
 			if (flags & READ_BUFFERS_ZERO_ON_ERROR)
 			{
 				memset(bufdata, 0, BLCKSZ);
@@ -8205,6 +8283,8 @@ buffer_readv_complete_one(PgAioTargetData *td, uint8 buf_off, Buffer buffer,
 		else if (*failed_checksum)
 			*ignored_checksum = true;
 
+page_verified:
+
 		/* undo what we did above */
 #ifdef USE_VALGRIND
 		if (!BufferIsPinned(buffer))
diff --git a/src/backend/storage/buffer/dwbuf.c b/src/backend/storage/buffer/dwbuf.c
index 5c5b14f5b3..9943308323 100644
--- a/src/backend/storage/buffer/dwbuf.c
+++ b/src/backend/storage/buffer/dwbuf.c
@@ -140,6 +140,7 @@ DWBufShmemInit(void)
 		DWBufCtl->batch_id = 0;
 		DWBufCtl->flushed_batch_id = 0;
 		DWBufCtl->checkpoint_lsn = InvalidXLogRecPtr;
+		DWBufCtl->resetting = false;
 	}
 }
 
@@ -306,12 +307,14 @@ DWBufClose(void)
 }
 
 /*
- * Write a page to the double write buffer and fsync.
+ * Write a page to the double write buffer.
  *
- * This function writes the page to DWB and ensures it's fsynced to disk
- * before returning, guaranteeing torn page protection.
+ * Returns the file index that was written to, so the caller can fsync
+ * that specific file if needed (e.g. for non-checkpoint writes).
+ *
+ * Returns -1 if DWB is not enabled.
  */
-void
+int
 DWBufWritePage(RelFileLocator rlocator, ForkNumber forknum,
 			   BlockNumber blkno, const char *page, XLogRecPtr lsn)
 {
@@ -323,7 +326,11 @@ DWBufWritePage(RelFileLocator rlocator, ForkNumber forknum,
 	pg_crc32c	crc;
 
 	if (!double_write_buffer || DWBufCtl == NULL)
-		return;
+		return -1;
+
+	/* Wait if DWB is being reset by PostCheckpoint */
+	while (DWBufCtl->resetting)
+		pg_usleep(100);
 
 	/* Ensure files are opened in this process */
 	DWBufOpenFiles();
@@ -331,15 +338,23 @@ DWBufWritePage(RelFileLocator rlocator, ForkNumber forknum,
 	/* Get next slot position atomically */
 	pos = pg_atomic_fetch_add_u64(&DWBufCtl->write_pos, 1);
 
+	/* If DWB is full, flush before overwriting */
+	if (pos >= (uint64) DWBufCtl->num_slots)
+	{
+		DWBufFlush();
+		pos = pos % DWBufCtl->num_slots;
+	}
+
 	/* Calculate file and slot indices */
-	file_idx = (pos / DWBufCtl->slots_per_file) % DWBufCtl->num_files;
-	slot_idx = pos % DWBufCtl->slots_per_file;
+	file_idx = (pos % DWBufCtl->num_slots) / DWBufCtl->slots_per_file;
+	slot_idx = (pos % DWBufCtl->num_slots) % DWBufCtl->slots_per_file;
 
 	/* Calculate offset in file */
 	offset = sizeof(DWBufFileHeader) + (off_t) slot_idx * DWBUF_SLOT_SIZE;
 
 	/* Build slot header in local buffer */
 	slot = (DWBufPageSlot *) dwbuf_page_buffer;
+	slot->crc = 0;				/* zero before CRC computation */
 	slot->rlocator = rlocator;
 	slot->forknum = forknum;
 	slot->blkno = blkno;
@@ -351,7 +366,7 @@ DWBufWritePage(RelFileLocator rlocator, ForkNumber forknum,
 	/* Copy page data after header */
 	memcpy(dwbuf_page_buffer + sizeof(DWBufPageSlot), page, BLCKSZ);
 
-	/* Compute CRC over slot header and page data */
+	/* Compute CRC over slot header (excluding crc field) and page data */
 	INIT_CRC32C(crc);
 	COMP_CRC32C(crc, dwbuf_page_buffer + sizeof(pg_crc32c),
 				sizeof(DWBufPageSlot) - sizeof(pg_crc32c) + BLCKSZ);
@@ -365,10 +380,7 @@ DWBufWritePage(RelFileLocator rlocator, ForkNumber forknum,
 				(errcode_for_file_access(),
 				 errmsg("could not write to double write buffer: %m")));
 
-	/*
-	 * NOTE: We don't fsync immediately here for performance reasons.
-	 * The DWBufFlush() function will fsync all files before checkpoint.
-	 */
+	return file_idx;
 }
 
 /*
@@ -411,6 +423,30 @@ DWBufFlush(void)
 	pg_atomic_write_u64(&DWBufCtl->flush_pos, current_pos);
 }
 
+/*
+ * Fsync a single DWB file by index.
+ * Used for per-page fsync in the non-checkpoint write path.
+ */
+void
+DWBufFlushFile(int file_idx)
+{
+	if (!double_write_buffer || DWBufCtl == NULL)
+		return;
+
+	/* Ensure files are opened in this process */
+	if (DWBufFilesOpenedPid != getpid() || DWBufFds[0] < 0)
+		DWBufOpenFiles();
+
+	if (file_idx >= 0 && file_idx < DWBufCtl->num_files && DWBufFds[file_idx] >= 0)
+	{
+		if (pg_fsync(DWBufFds[file_idx]) != 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not fsync double write buffer file %d: %m",
+							file_idx)));
+	}
+}
+
 /*
  * Flush all pages and ensure DWB is fully synced.
  */
@@ -447,7 +483,6 @@ void
 DWBufPostCheckpoint(XLogRecPtr checkpoint_lsn)
 {
 	int			i;
-	uint64		old_batch_id;
 	uint64		new_batch_id;
 
 	if (!double_write_buffer || DWBufCtl == NULL)
@@ -457,33 +492,34 @@ DWBufPostCheckpoint(XLogRecPtr checkpoint_lsn)
 	if (DWBufFilesOpenedPid != getpid() || DWBufFds[0] < 0)
 		DWBufOpenFiles();
 
+	/*
+	 * Signal writers to wait — prevents new DWBufWritePage calls from
+	 * racing with our reset of write_pos.
+	 */
 	SpinLockAcquire(&DWBufCtl->mutex);
-
-	/* Save old batch ID and increment */
-	old_batch_id = DWBufCtl->batch_id;
-	DWBufCtl->batch_id++;
-	new_batch_id = DWBufCtl->batch_id;
-	DWBufCtl->checkpoint_lsn = checkpoint_lsn;
-
+	DWBufCtl->resetting = true;
 	SpinLockRelease(&DWBufCtl->mutex);
 
 	/*
-	 * Wait for all in-flight writes to complete before resetting write_pos.
-	 * We use batch_id as a synchronization point.
+	 * Wait briefly for any in-flight DWBufWritePage calls to finish their
+	 * pg_pwrite.  They have already obtained their slot position, so we
+	 * just need them to complete the write before we reset positions.
 	 */
-	{
-		uint64 current_pos = pg_atomic_read_u64(&DWBufCtl->write_pos);
-		uint64 num_slots = DWBufCtl->num_slots;
-
-		/* If write_pos wrapped around, wait for flush */
-		if (current_pos >= num_slots)
-			DWBufFlush();
-	}
+	pg_memory_barrier();
+	pg_usleep(1000);	/* 1ms — conservative */
 
 	/* Now safe to reset positions for new batch */
 	pg_atomic_write_u64(&DWBufCtl->write_pos, 0);
 	pg_atomic_write_u64(&DWBufCtl->flush_pos, 0);
 
+	/* Update batch and clear resetting flag */
+	SpinLockAcquire(&DWBufCtl->mutex);
+	DWBufCtl->batch_id++;
+	new_batch_id = DWBufCtl->batch_id;
+	DWBufCtl->checkpoint_lsn = checkpoint_lsn;
+	DWBufCtl->resetting = false;
+	SpinLockRelease(&DWBufCtl->mutex);
+
 	/* Update file headers with new batch info */
 	for (i = 0; i < DWBufCtl->num_files; i++)
 	{
@@ -667,7 +703,7 @@ DWBufRecoveryInit(void)
 	pfree(buffer);
 
 	elog(LOG, "double write buffer recovery initialized with %ld pages",
-		 hash_get_num_entries(dwbuf_recovery_hash));
+		 (long) hash_get_num_entries(dwbuf_recovery_hash));
 }
 
 /*
@@ -784,3 +820,29 @@ DWBufGetBatchId(void)
 
 	return batch_id;
 }
+
+/*
+ * Module-static flag: when true, FlushBuffer skips DWB writes because
+ * checkpoint Phase 1 has already written all pages to DWB and fsynced.
+ */
+static bool dwbuf_checkpoint_writes_done = false;
+
+/*
+ * Set the checkpoint-writes-done flag.
+ * Called by BufferSync to bracket the checkpoint write loop.
+ */
+void
+DWBufSetCheckpointWritesDone(bool done)
+{
+	dwbuf_checkpoint_writes_done = done;
+}
+
+/*
+ * Check if checkpoint DWB writes are already done.
+ * FlushBuffer uses this to skip per-page DWB writes during checkpoint.
+ */
+bool
+DWBufCheckpointWritesDone(void)
+{
+	return dwbuf_checkpoint_writes_done;
+}
diff --git a/src/backend/storage/buffer/meson.build b/src/backend/storage/buffer/meson.build
index ed84bf0897..dafdcb65d5 100644
--- a/src/backend/storage/buffer/meson.build
+++ b/src/backend/storage/buffer/meson.build
@@ -4,6 +4,7 @@ backend_sources += files(
   'buf_init.c',
   'buf_table.c',
   'bufmgr.c',
+  'dwbuf.c',
   'freelist.c',
   'localbuf.c',
 )
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index a3980e5535..4042241877 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -251,7 +251,8 @@ static const char *const subdirs[] = {
 	"pg_xact",
 	"pg_logical",
 	"pg_logical/snapshots",
-	"pg_logical/mappings"
+	"pg_logical/mappings",
+	"pg_dwbuf"
 };
 
 
diff --git a/src/include/storage/dwbuf.h b/src/include/storage/dwbuf.h
index 1b096867f2..bf20d691f6 100644
--- a/src/include/storage/dwbuf.h
+++ b/src/include/storage/dwbuf.h
@@ -34,11 +34,11 @@
  */
 typedef struct DWBufPageSlot
 {
+	pg_crc32c		crc;			/* CRC of slot header + page — MUST BE FIRST */
 	RelFileLocator	rlocator;		/* Relation file locator */
 	ForkNumber		forknum;		/* Fork number */
 	BlockNumber		blkno;			/* Block number in relation */
 	XLogRecPtr		lsn;			/* Page LSN at write time */
-	pg_crc32c		crc;			/* CRC of slot header + page content */
 	uint32			slot_id;		/* Slot identifier */
 	uint16			flags;			/* Slot flags */
 	uint16			checksum;		/* Page checksum (if enabled) */
@@ -85,6 +85,7 @@ typedef struct DWBufCtlData
 	uint64			batch_id;		/* Current batch ID */
 	uint64			flushed_batch_id;	/* Last fully flushed batch */
 	XLogRecPtr		checkpoint_lsn;	/* LSN of last checkpoint */
+	bool			resetting;		/* True during PostCheckpoint reset */
 
 	/* Configuration (set at startup) */
 	int				num_slots;		/* Total number of slots */
@@ -117,12 +118,17 @@ extern void DWBufInit(void);
 extern void DWBufClose(void);
 
 /* Write operations */
-extern void DWBufWritePage(RelFileLocator rlocator, ForkNumber forknum,
-						   BlockNumber blkno, const char *page,
-						   XLogRecPtr lsn);
+extern int DWBufWritePage(RelFileLocator rlocator, ForkNumber forknum,
+						  BlockNumber blkno, const char *page,
+						  XLogRecPtr lsn);
+extern void DWBufFlushFile(int file_idx);
 extern void DWBufFlush(void);
 extern void DWBufFlushAll(void);
 
+/* Checkpoint batch write support */
+extern void DWBufSetCheckpointWritesDone(bool done);
+extern bool DWBufCheckpointWritesDone(void);
+
 /* Checkpoint integration */
 extern void DWBufPreCheckpoint(void);
 extern void DWBufPostCheckpoint(XLogRecPtr checkpoint_lsn);
-- 
2.43.0

