From 8a990476cee50cb336f6be488616c82c4129557e Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Wed, 13 May 2026 12:45:55 -0400
Subject: [PATCH v15 15/19] Write combining for client backends

When flushing a dirty buffer, check if the blocks preceding and
following it are in shared buffers and whether or not they are dirty.
If they are, flush them together with the victim buffer.

Author: Melanie Plageman <melanieplageman@gmail.com>
Earlier version Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Discussion: https://postgr.es/m/flat/0198DBB9-4A76-49E4-87F8-43D46DD0FD76%40gmail.com#1d8677fc75dc8b39f0eb5dd6bbafb65d
---
 src/backend/storage/buffer/bufmgr.c | 274 +++++++++++++++++++++++++++-
 src/backend/storage/smgr/md.c       |  36 ++++
 src/backend/storage/smgr/smgr.c     |  24 +++
 src/include/storage/md.h            |   3 +
 src/include/storage/smgr.h          |   3 +
 5 files changed, 334 insertions(+), 6 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 6c82e7d4039..abe78c39306 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -682,6 +682,9 @@ static uint32 CurrentMaxEagerWriteBatchSize(uint32 max_batch_size);
 static uint32 InitForwardEagerWriteBatch(BufferAccessStrategy strategy,
 										 BufferDesc *required_bufhdr,
 										 BufferWriteBatch *batch);
+static BlockNumber InitCenteredEagerWriteBatch(BufferDesc *required_bufhdr,
+											   BufferWriteBatch *batch,
+											   BlockNumber *scan_start, BlockNumber *scan_end);
 static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
 static void FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln,
 								IOObject io_object, IOContext io_context);
@@ -693,6 +696,9 @@ static BufferDesc *PrepareOrRejectEagerFlushBuffer(Buffer bufnum,
 static void ScheduleBufferTagForWriteback(WritebackContext *wb_context,
 										  IOContext io_context, BufferTag *tag);
 static void FlushBufferBatch(BufferWriteBatch *batch, IOContext io_context);
+static void ConstructCenteredEagerWriteBatch(BufferDesc *required_bufhdr,
+											 BufferWriteBatch *batch);
+static Buffer LookupBufferForTag(BufferTag *tag);
 static void ExtendStrategyEagerWriteBatch(BufferAccessStrategy strategy, Buffer sweep_end,
 										  uint32 batch_limit,
 										  BufferWriteBatch *batch,
@@ -2696,6 +2702,49 @@ EagerCleanStrategyBuffer(BufferAccessStrategy strategy, Buffer bufnum,
 	}
 }
 
+/*
+ * Given a target victim buffer, clean it and look for additional adjacent
+ * blocks that are already in shared buffers and dirty to eagerly flush along
+ * with it in a single write. The victim buffer must be already pinned and
+ * locked and will remain pinned upon return.
+ */
+static void
+EagerCleanBuffer(Buffer bufnum, BufferDesc *buf_hdr, IOContext io_context,
+				 WritebackContext *wb_context)
+{
+	BufferWriteBatch batch;
+	StartBufferIOResult status;
+
+	/* Start IO on the victim buffer */
+	if ((status = StartBufferIO(bufnum, false, true, NULL)) !=
+		BUFFER_IO_READY_FOR_IO)
+	{
+		Assert(status == BUFFER_IO_ALREADY_DONE);
+
+		/*
+		 * Don't eagerly flush anything if the victim buffer is already clean.
+		 * Leave the buffer pinned for the caller.
+		 */
+		BufferLockUnlock(bufnum, buf_hdr);
+		return;
+	}
+
+	/*
+	 * Pin victim again so it stays ours even after batch released. See
+	 * EagerCleanStrategyBuffer() for more details.
+	 */
+	IncrBufferRefCount(bufnum);
+
+	/*
+	 * If we are allowed more pins and there are more blocks in the relation
+	 * and the victim buffer's block's preceding and/or following blocks are
+	 * eligible for eager flushing, combine them into a batch.
+	 */
+	ConstructCenteredEagerWriteBatch(buf_hdr, &batch);
+	FlushBufferBatch(&batch, io_context);
+	CompleteWriteBatchIO(&batch, io_context, wb_context);
+}
+
 /*
  * Helper to claim a victim buffer -- which is invalidating its existing
  * contents (including flushing the old contents first if needed).
@@ -2769,12 +2818,7 @@ ClaimVictimBuffer(BufferAccessStrategy strategy,
 		if (strategy && from_ring && StrategySupportsEagerFlush(strategy))
 			EagerCleanStrategyBuffer(strategy, bufnum, buf_hdr, io_context);
 		else
-		{
-			FlushBuffer(buf_hdr, NULL, IOOBJECT_RELATION, io_context);
-			BufferLockUnlock(bufnum, buf_hdr);
-			ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
-										  &buf_hdr->tag);
-		}
+			EagerCleanBuffer(bufnum, buf_hdr, io_context, &BackendWritebackContext);
 	}
 
 	/*
@@ -4948,6 +4992,32 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	error_context_stack = errcallback.previous;
 }
 
+static Buffer
+LookupBufferForTag(BufferTag *tag)
+{
+	uint32		hash;
+	LWLock	   *partition_lock;
+	int			buf_id;
+
+	hash = BufTableHashCode(tag);
+	partition_lock = BufMappingPartitionLock(hash);
+
+	/*
+	 * We can release the partition lock as soon as we've done the lookup. The
+	 * buffer may be evicted and reused before we pin it, but
+	 * PrepareOrRejectEagerFlushBuffer() rechecks the buffer's tag after
+	 * pinning it, so it will reject the buffer if that happens.
+	 */
+	LWLockAcquire(partition_lock, LW_SHARED);
+	buf_id = BufTableLookup(tag, hash);
+	LWLockRelease(partition_lock);
+
+	if (buf_id < 0)
+		return InvalidBuffer;
+
+	return buf_id + 1;
+}
+
 /*
  * Given a required buffer that is ready to flush, initialize the batch
  * structure and place it at the beginning of the new batch. Returns the
@@ -4983,6 +5053,198 @@ InitForwardEagerWriteBatch(BufferAccessStrategy strategy,
 	return limit;
 }
 
+/*
+ * Given a required bufhdr, set up a batch that will include it and may
+ * include blocks from its same segment, preceding and/or following. Returns
+ * the current maximum number of blocks that can be in this batch given the
+ * location of the block in the file, the current available pins, and various
+ * configuration GUCs.
+ */
+static BlockNumber
+InitCenteredEagerWriteBatch(BufferDesc *required_bufhdr,
+							BufferWriteBatch *batch,
+							BlockNumber *scan_start, BlockNumber *scan_end)
+{
+	BlockNumber batch_limit;
+	BlockNumber required_block;
+	BlockNumber bound_start,
+				bound_end;
+
+	Assert(required_bufhdr);
+	batch->forkno = BufTagGetForkNum(&required_bufhdr->tag);
+	batch->reln = smgropen(BufTagGetRelFileLocator(&required_bufhdr->tag),
+						   INVALID_PROC_NUMBER);
+
+	Assert(BufferIsLockedByMe(BufferDescriptorGetBuffer(required_bufhdr)));
+	batch->max_lsn =
+		pg_atomic_read_u64(&required_bufhdr->state) & BM_PERMANENT ?
+		BufferGetLSN(required_bufhdr) : InvalidXLogRecPtr;
+
+	batch->n = 0;
+
+	batch_limit = CurrentMaxEagerWriteBatchSize(MaxWriteBatchSize(NULL));
+
+	/* If we can only write out the required buffer, do that */
+	if (batch_limit <= 1)
+	{
+		batch->bufhdrs[0] = required_bufhdr;
+		batch->n = 1;
+		return 1;
+	}
+
+	required_block = required_bufhdr->tag.blockNum;
+	Assert(BlockNumberIsValid(required_block));
+
+	/*
+	 * This shouldn't fail but because we are eagerly constructing a batch,
+	 * let's not make it a source of runtime failures and instead cope. If we
+	 * fail later, that should hopefully be a more useful error.
+	 */
+	if (!smgrblockbounds(batch->reln, batch->forkno, required_block,
+						 &bound_start, &bound_end))
+	{
+		batch->bufhdrs[0] = required_bufhdr;
+		batch->n = 1;
+		return 1;
+	}
+
+	Assert(bound_start <= required_block);
+	Assert(required_block <= bound_end);
+
+	/* Limit the scan to batch_limit in either direction */
+	*scan_start = required_block - Min(required_block - bound_start,
+									   batch_limit - 1);
+	*scan_end = required_block + Min(bound_end - required_block,
+									 batch_limit - 1);
+
+	return batch_limit;
+}
+
+/*
+ * Construct a contiguous, fully prepared batch containing required_bufhdr.
+ * This looks both forward and backwards for contiguous blocks that are dirty
+ * and in shared buffers.
+ */
+static void
+ConstructCenteredEagerWriteBatch(BufferDesc *required_bufhdr,
+								 BufferWriteBatch *batch)
+{
+	BufferDesc *left_bufhdrs[MAX_IO_COMBINE_LIMIT];
+	uint32		left_count = 0;
+	BlockNumber scan_start,
+				scan_end;
+	BufferTag	require;
+	XLogRecPtr	lsn;
+	uint32		batch_limit;
+	BlockNumber blkno = required_bufhdr->tag.blockNum;
+
+	Assert(required_bufhdr);
+	Assert(BufferIsLockedByMe(BufferDescriptorGetBuffer(required_bufhdr)));
+
+	batch_limit = InitCenteredEagerWriteBatch(required_bufhdr,
+											  batch,
+											  &scan_start, &scan_end);
+	if (batch_limit <= 1)
+		return;
+
+	InitBufferTag(&require, &batch->reln->smgr_rlocator.locator,
+				  batch->forkno,
+				  InvalidBlockNumber);
+
+	/*
+	 * Loop backwards from our required block, looking for up to batch_limit
+	 * contiguous blocks, but stop once we hit the start of the current
+	 * file/segment.
+	 */
+	while (blkno > scan_start && left_count < batch_limit - 1)
+	{
+		Buffer		bufnum;
+		BufferDesc *bufhdr;
+
+		/*
+		 * We must be sure not to process the required buffer here, as we've
+		 * already confirmed it is dirty and needs to be written out and
+		 * pinned and locked it.
+		 */
+		require.blockNum = --blkno;
+
+		/*
+		 * blkno is guaranteed to be valid because we checked scan_start
+		 * before decrementing it.
+		 */
+		Assert(BlockNumberIsValid(blkno));
+		bufnum = LookupBufferForTag(&require);
+		bufhdr = PrepareOrRejectEagerFlushBuffer(bufnum, &require,
+												 &lsn);
+		if (bufhdr == NULL)
+			break;
+
+		if (lsn > batch->max_lsn)
+			batch->max_lsn = lsn;
+
+		left_bufhdrs[left_count++] = bufhdr;
+	}
+
+	/*
+	 * Copy those left side buffer descriptors we found into the final result
+	 * batch.
+	 */
+	while (left_count > 0)
+	{
+		batch->bufhdrs[batch->n++] = left_bufhdrs[left_count - 1];
+		left_count--;
+	}
+
+	/* Now we know where our required buffer will be in the batch */
+	batch->bufhdrs[batch->n++] = required_bufhdr;
+
+	/*
+	 * Loop forward from our required block and put any contiguous blocks we
+	 * find, up to batch_limit, in our bufhdrs array. Stop if we hit the end
+	 * of the segment.
+	 */
+	for (BlockNumber right_blkno = required_bufhdr->tag.blockNum;
+		 right_blkno < scan_end && batch->n < batch_limit;)
+	{
+		Buffer		bufnum;
+		BufferDesc *bufhdr;
+
+		/*
+		 * We must be sure not to process the required buffer here, as we've
+		 * already confirmed it is dirty and needs to be written out and
+		 * pinned and locked it.
+		 */
+		require.blockNum = ++right_blkno;
+
+		/*
+		 * right_blkno is guaranteed to be valid because we checked scan_end
+		 * before incrementing it.
+		 */
+		Assert(BlockNumberIsValid(right_blkno));
+		bufnum = LookupBufferForTag(&require);
+		bufhdr = PrepareOrRejectEagerFlushBuffer(bufnum, &require,
+												 &lsn);
+		if (bufhdr == NULL)
+			break;
+
+		if (lsn > batch->max_lsn)
+			batch->max_lsn = lsn;
+
+		batch->bufhdrs[batch->n++] = bufhdr;
+	}
+
+	/*
+	 * Batch validations, including that we actually added the required buffer
+	 * to the batch and it is at the expected location in the array.
+	 */
+	Assert(batch->n > 0);
+	Assert(batch->bufhdrs[0]->tag.blockNum <= required_bufhdr->tag.blockNum);
+	Assert(required_bufhdr->tag.blockNum <
+		   batch->bufhdrs[0]->tag.blockNum + batch->n);
+	Assert(batch->bufhdrs[required_bufhdr->tag.blockNum -
+						  batch->bufhdrs[0]->tag.blockNum] == required_bufhdr);
+}
+
 /*
  * Prepare bufnum for eager flushing.
  *
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index dee29037b16..3256e2a9891 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -852,6 +852,42 @@ mdmaxcombine(SMgrRelation reln, ForkNumber forknum,
 	return RELSEG_SIZE - segoff;
 }
 
+/*
+ * Return the start and end block numbers of the segment containing blocknum,
+ * clamped to the current relation fork size.
+ */
+bool
+mdblockbounds(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+			  BlockNumber *start, BlockNumber *end)
+{
+	BlockNumber nblocks;
+	BlockNumber max_block;
+	BlockNumber segstart;
+	BlockNumber segend;
+
+	nblocks = mdnblocks(reln, forknum);
+	if (nblocks == 0 || blocknum >= nblocks)
+		return false;
+
+	max_block = nblocks - 1;
+	segstart = (blocknum / ((BlockNumber) RELSEG_SIZE)) *
+		((BlockNumber) RELSEG_SIZE);
+	segend = segstart + ((BlockNumber) RELSEG_SIZE - 1);
+
+	/*
+	 * segend is how big the relation fork could be, but if it is the last
+	 * segment of the relation, it may not be full. Be sure segend does not
+	 * exceed the size of the relation.
+	 */
+	if (segend > max_block)
+		segend = max_block;
+
+	*start = segstart;
+	*end = segend;
+
+	return true;
+}
+
 /*
  * mdreadv() -- Read the specified blocks from a relation.
  */
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 5391640d861..e5d29e7c168 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -104,6 +104,9 @@ typedef struct f_smgr
 								  BlockNumber blocknum, int nblocks);
 	uint32		(*smgr_maxcombine) (SMgrRelation reln, ForkNumber forknum,
 									BlockNumber blocknum);
+	bool		(*smgr_blockbounds) (SMgrRelation reln, ForkNumber forknum,
+									 BlockNumber blocknum,
+									 BlockNumber *start, BlockNumber *end);
 	void		(*smgr_readv) (SMgrRelation reln, ForkNumber forknum,
 							   BlockNumber blocknum,
 							   void **buffers, BlockNumber nblocks);
@@ -139,6 +142,7 @@ static const f_smgr smgrsw[] = {
 		.smgr_zeroextend = mdzeroextend,
 		.smgr_prefetch = mdprefetch,
 		.smgr_maxcombine = mdmaxcombine,
+		.smgr_blockbounds = mdblockbounds,
 		.smgr_readv = mdreadv,
 		.smgr_startreadv = mdstartreadv,
 		.smgr_writev = mdwritev,
@@ -706,6 +710,26 @@ smgrmaxcombine(SMgrRelation reln, ForkNumber forknum,
 	return ret;
 }
 
+/*
+ * Return the start and end block numbers of the storage manager's combinable
+ * range containing blocknum.
+ *
+ * Returns false if blocknum is not within the current relation fork size.
+ */
+bool
+smgrblockbounds(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+				BlockNumber *start, BlockNumber *end)
+{
+	bool		ret;
+
+	HOLD_INTERRUPTS();
+	ret = smgrsw[reln->smgr_which].smgr_blockbounds(reln, forknum, blocknum,
+													start, end);
+	RESUME_INTERRUPTS();
+
+	return ret;
+}
+
 /*
  * smgrreadv() -- read a particular block range from a relation into the
  *				 supplied buffers.
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index b8d10329eb8..8bbf5f0b94a 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -37,6 +37,9 @@ extern bool mdprefetch(SMgrRelation reln, ForkNumber forknum,
 					   BlockNumber blocknum, int nblocks);
 extern uint32 mdmaxcombine(SMgrRelation reln, ForkNumber forknum,
 						   BlockNumber blocknum);
+extern bool mdblockbounds(SMgrRelation reln, ForkNumber forknum,
+						  BlockNumber blocknum,
+						  BlockNumber *start, BlockNumber *end);
 extern void mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 					void **buffers, BlockNumber nblocks);
 extern void mdstartreadv(PgAioHandle *ioh,
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index 09bd42fcf4b..79a7f2c53c7 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -97,6 +97,9 @@ extern bool smgrprefetch(SMgrRelation reln, ForkNumber forknum,
 						 BlockNumber blocknum, int nblocks);
 extern uint32 smgrmaxcombine(SMgrRelation reln, ForkNumber forknum,
 							 BlockNumber blocknum);
+extern bool smgrblockbounds(SMgrRelation reln, ForkNumber forknum,
+							BlockNumber blocknum,
+							BlockNumber *start, BlockNumber *end);
 extern void smgrreadv(SMgrRelation reln, ForkNumber forknum,
 					  BlockNumber blocknum,
 					  void **buffers, BlockNumber nblocks);
-- 
2.43.0

