From f9de90a0f67594065f788c0f9230c13e510694b6 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Wed, 13 May 2026 13:06:51 -0400
Subject: [PATCH v15 18/19] Write combining for background writer

Using the same logic as normal client backends, when the bgwriter is
going to write out a dirty buffer, look for preceding and following
contiguous blocks that are also dirty and in shared buffers, and write
those out as well. In situations where the working set doesn't fit in
shared memory and IOPS are constrained, this can substantially improve
performance and throughput.

This commit removes ScheduleBufferTagForWriteback(), since all callers
now use the batch variant, ScheduleBufferTagBatchForWriteback(), instead.
---
 src/backend/storage/buffer/bufmgr.c | 123 +++++++++++++++-------------
 1 file changed, 64 insertions(+), 59 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 99c738aa2b1..595eda57db9 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -692,12 +692,11 @@ static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
 						IOObject io_object, IOContext io_context);
 static BufferDesc *PrepareOrRejectEagerFlushBuffer(Buffer bufnum,
 												   BufferTag *require,
-												   XLogRecPtr *lsn);
-static void ScheduleBufferTagForWriteback(WritebackContext *wb_context,
-										  IOContext io_context, BufferTag *tag);
+												   XLogRecPtr *lsn,
+												   bool *usage_count_zero);
 static void FlushBufferBatch(BufferWriteBatch *batch, IOContext io_context);
-static void ConstructCenteredEagerWriteBatch(BufferDesc *required_bufhdr,
-											 BufferWriteBatch *batch);
+static uint32 ConstructCenteredEagerWriteBatch(BufferDesc *required_bufhdr,
+											   BufferWriteBatch *batch);
 static Buffer LookupBufferForTag(BufferTag *tag);
 static void ExtendStrategyEagerWriteBatch(BufferAccessStrategy strategy, Buffer sweep_end,
 										  uint32 batch_limit,
@@ -2644,6 +2643,7 @@ EagerCleanStrategyBuffer(BufferAccessStrategy strategy, Buffer bufnum,
 	 */
 	for (;;)
 	{
+		bool		usage_count_zero;	/* unused */
 		XLogRecPtr	next_buf_lsn;	/* unused */
 
 		if (next_bufhdr)
@@ -2698,7 +2698,8 @@ EagerCleanStrategyBuffer(BufferAccessStrategy strategy, Buffer bufnum,
 		 */
 		next_bufhdr = PrepareOrRejectEagerFlushBuffer(next_bufnum,
 													  NULL,
-													  &next_buf_lsn);
+													  &next_buf_lsn,
+													  &usage_count_zero);
 	}
 }
 
@@ -4286,8 +4287,10 @@ BgBufferSyncCleanBuffers(int lru_maxpages, WritebackContext *wb_context,
 	for (; to_scan > 0; to_scan--, clean_idx++)
 	{
 		uint64		buf_state;
+		StartBufferIOResult status;
 		BufferDesc *bufHdr;
-		BufferTag	tag;
+		Buffer		bufnum;
+		BufferWriteBatch batch;
 
 		if (reusable >= upcoming_alloc_est)
 			break;
@@ -4318,18 +4321,32 @@ BgBufferSyncCleanBuffers(int lru_maxpages, WritebackContext *wb_context,
 		ReservePrivateRefCountEntry();
 		ResourceOwnerEnlarge(CurrentResourceOwner);
 
+		/*
+		 * Any other buffers found and added to the same batch will be pinned
+		 * when they are identified. We know we want to flush this buffer,
+		 * however, so we'll pin and lock it now. Pins and locks are released
+		 * when completing the writes on all buffers in the batch.
+		 */
 		if (!PinBuffer(bufHdr, BUC_ZERO, true))
 			continue;
 
-		FlushUnlockedBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
+		bufnum = BufferDescriptorGetBuffer(bufHdr);
+		BufferLockAcquire(bufnum, bufHdr, BUFFER_LOCK_SHARE_EXCLUSIVE);
 
-		/* Snapshot the tag before unpinning */
-		tag = bufHdr->tag;
-		UnpinBuffer(bufHdr);
+		if ((status = StartBufferIO(bufnum, false, true, NULL)) !=
+			BUFFER_IO_READY_FOR_IO)
+		{
+			Assert(status == BUFFER_IO_ALREADY_DONE);
+			UnlockReleaseBuffer(bufnum);
+			continue;
+		}
 
-		ScheduleBufferTagForWriteback(wb_context, IOCONTEXT_NORMAL, &tag);
+		reusable += ConstructCenteredEagerWriteBatch(bufHdr, &batch);
+		FlushBufferBatch(&batch, IOCONTEXT_NORMAL);
+		CompleteWriteBatchIO(&batch, IOCONTEXT_NORMAL, wb_context);
+		num_written += batch.n;
 
-		if (++num_written >= lru_maxpages)
+		if (num_written >= lru_maxpages)
 		{
 			*maxwritten_clean = true;
 			break;
@@ -5170,8 +5187,11 @@ InitCenteredEagerWriteBatch(BufferDesc *required_bufhdr,
  * Construct a contiguous, fully prepared batch containing required_bufhdr.
  * This looks both forward and backwards for contiguous blocks that are dirty
  * and in shared buffers.
+ *
+ * Returns the number of adjacent buffers added to the batch that have zero
+ * usage counts. These will be reusable once their contents have been flushed.
  */
-static void
+static uint32
 ConstructCenteredEagerWriteBatch(BufferDesc *required_bufhdr,
 								 BufferWriteBatch *batch)
 {
@@ -5182,6 +5202,7 @@ ConstructCenteredEagerWriteBatch(BufferDesc *required_bufhdr,
 	BufferTag	require;
 	XLogRecPtr	lsn;
 	uint32		batch_limit;
+	uint32		zero_usage_count_buffers = 0;
 	BlockNumber blkno = required_bufhdr->tag.blockNum;
 
 	Assert(required_bufhdr);
@@ -5191,7 +5212,7 @@ ConstructCenteredEagerWriteBatch(BufferDesc *required_bufhdr,
 											  batch,
 											  &scan_start, &scan_end);
 	if (batch_limit <= 1)
-		return;
+		return zero_usage_count_buffers;
 
 	InitBufferTag(&require, &batch->reln->smgr_rlocator.locator,
 				  batch->forkno,
@@ -5206,6 +5227,7 @@ ConstructCenteredEagerWriteBatch(BufferDesc *required_bufhdr,
 	{
 		Buffer		bufnum;
 		BufferDesc *bufhdr;
+		bool		usage_count_zero;
 
 		/*
 		 * We must be sure not to process the required buffer here, as we've
@@ -5221,10 +5243,14 @@ ConstructCenteredEagerWriteBatch(BufferDesc *required_bufhdr,
 		Assert(BlockNumberIsValid(blkno));
 		bufnum = LookupBufferForTag(&require);
 		bufhdr = PrepareOrRejectEagerFlushBuffer(bufnum, &require,
-												 &lsn);
+												 &lsn,
+												 &usage_count_zero);
 		if (bufhdr == NULL)
 			break;
 
+		if (usage_count_zero)
+			zero_usage_count_buffers++;
+
 		if (lsn > batch->max_lsn)
 			batch->max_lsn = lsn;
 
@@ -5254,6 +5280,7 @@ ConstructCenteredEagerWriteBatch(BufferDesc *required_bufhdr,
 	{
 		Buffer		bufnum;
 		BufferDesc *bufhdr;
+		bool		usage_count_zero;
 
 		/*
 		 * We must be sure not to process the required buffer here, as we've
@@ -5269,13 +5296,17 @@ ConstructCenteredEagerWriteBatch(BufferDesc *required_bufhdr,
 		Assert(BlockNumberIsValid(right_blkno));
 		bufnum = LookupBufferForTag(&require);
 		bufhdr = PrepareOrRejectEagerFlushBuffer(bufnum, &require,
-												 &lsn);
+												 &lsn,
+												 &usage_count_zero);
 		if (bufhdr == NULL)
 			break;
 
 		if (lsn > batch->max_lsn)
 			batch->max_lsn = lsn;
 
+		if (usage_count_zero)
+			zero_usage_count_buffers++;
+
 		batch->bufhdrs[batch->n++] = bufhdr;
 	}
 
@@ -5289,6 +5320,8 @@ ConstructCenteredEagerWriteBatch(BufferDesc *required_bufhdr,
 		   batch->bufhdrs[0]->tag.blockNum + batch->n);
 	Assert(batch->bufhdrs[required_bufhdr->tag.blockNum -
 						  batch->bufhdrs[0]->tag.blockNum] == required_bufhdr);
+
+	return zero_usage_count_buffers;
 }
 
 /*
@@ -5304,16 +5337,23 @@ ConstructCenteredEagerWriteBatch(BufferDesc *required_bufhdr,
  * RelFileLocator and fork.
  *
  * If returning a buffer, also return its LSN.
+ *
+ * usage_count_zero is set to true if the buffer that is selected has a zero
+ * usage count. It is set to false if either the buffer is rejected or it has
+ * a non-zero usage count. It is only a hint, as we would need to hold the
+ * buffer header lock for the whole time to avoid it changing.
  */
 static BufferDesc *
 PrepareOrRejectEagerFlushBuffer(Buffer bufnum,
 								BufferTag *require,
-								XLogRecPtr *lsn)
+								XLogRecPtr *lsn,
+								bool *usage_count_zero)
 {
 	BufferDesc *bufhdr;
 	uint64		buf_state;
 
 	*lsn = InvalidXLogRecPtr;
+	*usage_count_zero = false;
 
 	if (!BufferIsValid(bufnum))
 		goto reject_buffer;
@@ -5391,6 +5431,8 @@ PrepareOrRejectEagerFlushBuffer(Buffer bufnum,
 		goto reject_buffer_unlock;
 
 	*lsn = BufferGetLSN(bufhdr);
+	*usage_count_zero =
+		BUF_STATE_GET_USAGECOUNT(pg_atomic_read_u64(&bufhdr->state)) == 0;
 
 	return bufhdr;
 
@@ -5435,6 +5477,7 @@ ExtendStrategyEagerWriteBatch(BufferAccessStrategy strategy,
 {
 	BlockNumber batch_start = batch->bufhdrs[0]->tag.blockNum;
 	BufferTag	require;
+	bool		usage_count_zero;
 
 	Assert(batch_limit > 1);
 
@@ -5480,7 +5523,8 @@ ExtendStrategyEagerWriteBatch(BufferAccessStrategy strategy,
 
 		require.blockNum = batch_start + batch->n;
 
-		bufhdr = PrepareOrRejectEagerFlushBuffer(bufnum, &require, &lsn);
+		bufhdr = PrepareOrRejectEagerFlushBuffer(bufnum, &require, &lsn,
+												 &usage_count_zero);
 
 		/*
 		 * Stop when we encounter a buffer that will break the run. Do not
@@ -8728,45 +8772,6 @@ WritebackContextInit(WritebackContext *context, int *max_pending)
 	context->nr_pending = 0;
 }
 
-/*
- * Add buffer to list of pending writeback requests.
- */
-static void
-ScheduleBufferTagForWriteback(WritebackContext *wb_context, IOContext io_context,
-							  BufferTag *tag)
-{
-	PendingWriteback *pending;
-
-	/*
-	 * As pg_flush_data() doesn't do anything with fsync disabled, there's no
-	 * point in tracking in that case.
-	 */
-	if (io_direct_flags & IO_DIRECT_DATA ||
-		!enableFsync)
-		return;
-
-	/*
-	 * Add buffer to the pending writeback array, unless writeback control is
-	 * disabled.
-	 */
-	if (*wb_context->max_pending > 0)
-	{
-		Assert(*wb_context->max_pending <= WRITEBACK_MAX_PENDING_FLUSHES);
-
-		pending = &wb_context->pending_writebacks[wb_context->nr_pending++];
-
-		pending->tag = *tag;
-	}
-
-	/*
-	 * Perform pending flushes if the writeback limit is exceeded. This
-	 * includes the case where previously an item has been added, but control
-	 * is now disabled.
-	 */
-	if (wb_context->nr_pending >= *wb_context->max_pending)
-		IssuePendingWritebacks(wb_context, io_context);
-}
-
 /*
  * Add all the blocks from a write batch that was recently issued to a list of
  * pending writeback requests. Don't call while holding buffer locks. tag
@@ -8829,7 +8834,7 @@ ScheduleBufferTagBatchForWriteback(WritebackContext *wb_context,
 
 /*
  * Issue all pending writeback requests, previously scheduled with
- * ScheduleBufferTagForWriteback, to the OS.
+ * ScheduleBufferTagBatchForWriteback, to the OS.
  *
  * Because this is only used to improve the OSs IO scheduling we try to never
  * error out - it's just a hint.
-- 
2.43.0

