From 4d6cebab4a74ff21d790e16004b8d695b2e25cfe Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Wed, 15 Oct 2025 13:42:47 -0400
Subject: [PATCH v15 08/19] Write combining for BAS_BULKWRITE

Implement write combining for users of the bulkwrite buffer access
strategy (e.g. COPY FROM). When the buffer access strategy needs to
clean a buffer for reuse, it already opportunistically flushes some
other buffers. Now, combine any contiguous blocks from the same relation
into larger writes and issue them with smgrwritev().

The performance benefit for COPY FROM is mostly noticeable for multiple
concurrent COPY FROMs because a single COPY FROM is either CPU bound or
bound by WAL writes.

The infrastructure for flushing larger batches of IOs will be reused by
checkpointer and other processes doing writes of dirty data.

XXX: remove development-only GUCs

Author: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Discussion: https://postgr.es/m/flat/CAAKRu_bcWRvRwZUop_d9vzF9nHAiT%2B-uPzkJ%3DS3ShZ1GqeAYOw%40mail.gmail.com
---
 doc/src/sgml/config.sgml                      |  19 +
 src/backend/storage/buffer/bufmgr.c           | 523 +++++++++++++++++-
 src/backend/storage/page/bufpage.c            |  22 +
 src/backend/utils/misc/guc_parameters.dat     |   9 +
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/backend/utils/probes.d                    |   2 +
 src/include/storage/bufmgr.h                  |   2 +
 src/include/storage/bufpage.h                 |   2 +
 src/tools/pgindent/typedefs.list              |   1 +
 9 files changed, 564 insertions(+), 17 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index fa566c9e553..560c0fc8c93 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2894,6 +2894,25 @@ include_dir 'conf.d'
        </listitem>
       </varlistentry>
 
+      <varlistentry id="guc-eager-clean-max-batch-size" xreflabel="eager_clean_max_batch_size">
+       <term><varname>eager_clean_max_batch_size</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>eager_clean_max_batch_size</varname> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+        <para>
+         Controls the largest number of blocks flushed as one write by
+         eager cleaning in the buffer manager.
+         If this value is specified without units, it is taken as blocks,
+         that is <symbol>BLCKSZ</symbol> bytes, typically 8kB.
+         The maximum possible size depends on the operating system and block
+         size, but is typically 1MB on Unix and 128kB on Windows.
+         The default is 16 blocks.
+        </para>
+       </listitem>
+      </varlistentry>
+
       <varlistentry id="guc-io-combine-limit" xreflabel="io_combine_limit">
        <term><varname>io_combine_limit</varname> (<type>integer</type>)
        <indexterm>
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 0c14d3b4561..815b037a8ec 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -125,6 +125,27 @@ typedef struct PrivateRefCountEntry
 	PrivateRefCountData data;
 } PrivateRefCountEntry;
 
+/*
+ * Used to write out multiple blocks at a time in a combined IO. bufhdrs
+ * contains buffer descriptors for buffers containing adjacent blocks of the
+ * same fork of the same relation.
+ */
+typedef struct BufferWriteBatch
+{
+	ForkNumber	forkno;
+	SMgrRelation reln;
+
+	/*
+	 * While assembling the buffers, we keep track of the maximum LSN so that
+	 * we can flush WAL through this LSN before flushing the buffers.
+	 */
+	XLogRecPtr	max_lsn;
+
+	/* The number of valid buffers in bufhdrs */
+	uint32		n;
+	BufferDesc *bufhdrs[MAX_IO_COMBINE_LIMIT];
+} BufferWriteBatch;
+
 #define SH_PREFIX refcount
 #define SH_ELEMENT_TYPE PrivateRefCountEntry
 #define SH_KEY_TYPE Buffer
@@ -223,6 +244,7 @@ int			maintenance_io_concurrency = DEFAULT_MAINTENANCE_IO_CONCURRENCY;
 int			io_combine_limit = DEFAULT_IO_COMBINE_LIMIT;
 int			io_combine_limit_guc = DEFAULT_IO_COMBINE_LIMIT;
 int			io_max_combine_limit = DEFAULT_IO_COMBINE_LIMIT;
+int			eager_clean_max_batch_size = DEFAULT_EAGER_CLEAN_MAX_BATCH_SIZE;
 
 /*
  * GUC variables about triggering kernel writeback for buffers written; OS
@@ -647,6 +669,7 @@ static int	SyncOneBuffer(int buf_id, bool skip_recently_used,
 static void WaitIO(BufferDesc *buf);
 static void AbortBufferIO(Buffer buffer);
 static void shared_buffer_write_error_callback(void *arg);
+static void shared_buffer_write_batch_error_callback(void *arg);
 static void local_buffer_write_error_callback(void *arg);
 static inline BufferDesc *BufferAlloc(SMgrRelation smgr,
 									  char relpersistence,
@@ -661,14 +684,31 @@ static pg_attribute_always_inline void TrackBufferHit(IOObject io_object,
 													  IOContext io_context,
 													  Relation rel, char persistence, SMgrRelation smgr,
 													  ForkNumber forknum, BlockNumber blocknum);
+static uint32 MaxWriteBatchSize(BufferAccessStrategy strategy);
+static uint32 CurrentMaxEagerWriteBatchSize(uint32 max_batch_size);
+static uint32 InitForwardEagerWriteBatch(BufferAccessStrategy strategy,
+										 BufferDesc *required_bufhdr,
+										 BufferWriteBatch *batch);
 static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
 static void FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln,
 								IOObject io_object, IOContext io_context);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
 						IOObject io_object, IOContext io_context);
-static BufferDesc *PrepareOrRejectEagerFlushBuffer(Buffer bufnum);
+static BufferDesc *PrepareOrRejectEagerFlushBuffer(Buffer bufnum,
+												   BufferTag *require,
+												   XLogRecPtr *lsn);
 static void ScheduleBufferTagForWriteback(WritebackContext *wb_context,
 										  IOContext io_context, BufferTag *tag);
+static void FlushBufferBatch(BufferWriteBatch *batch, IOContext io_context);
+static void ExtendStrategyEagerWriteBatch(BufferAccessStrategy strategy, Buffer sweep_end,
+										  uint32 batch_limit,
+										  BufferWriteBatch *batch,
+										  int *sweep_cursor);
+static void CompleteWriteBatchIO(BufferWriteBatch *batch, IOContext io_context,
+								 WritebackContext *wb_context);
+static void ScheduleBufferTagBatchForWriteback(WritebackContext *wb_context,
+											   BufferTag tag, uint32 batch_size,
+											   IOContext io_context);
 static void FindAndDropRelationBuffers(RelFileLocator rlocator,
 									   ForkNumber forkNum,
 									   BlockNumber nForkBlock,
@@ -2569,12 +2609,34 @@ static void
 EagerCleanStrategyBuffer(BufferAccessStrategy strategy, Buffer bufnum,
 						 BufferDesc *buf_hdr, IOContext io_context)
 {
+	StartBufferIOResult status;
 	BufferDesc *next_bufhdr = buf_hdr;
 	Buffer		next_bufnum = bufnum;
 	Buffer		sweep_end = bufnum;
 	int			cursor = StrategyGetCurrentIndex(strategy);
 
-	/* Pin victim again so it stays ours even after unpinning below */
+	/* Start IO on the first buffer */
+	if ((status = StartBufferIO(bufnum, false, true, NULL)) !=
+		BUFFER_IO_READY_FOR_IO)
+	{
+		Assert(status == BUFFER_IO_ALREADY_DONE);
+
+		/*
+		 * Don't eagerly flush anything if the target buffer is already clean.
+		 * Leave the buffer pinned for the caller.
+		 */
+		BufferLockUnlock(bufnum, buf_hdr);
+		return;
+	}
+
+	/*
+	 * We pin the victim again so it stays ours even after the batch is
+	 * released. The victim buffer was pinned once by the caller and
+	 * CompleteWriteBatchIO() will release a pin, so to return a pinned buffer
+	 * back to the original caller requesting a clean buffer, we take a second
+	 * pin now. Any other members of the batch will be pinned while building
+	 * the batch.
+	 */
 	IncrBufferRefCount(bufnum);
 
 	/*
@@ -2583,14 +2645,31 @@ EagerCleanStrategyBuffer(BufferAccessStrategy strategy, Buffer bufnum,
 	 */
 	for (;;)
 	{
+		XLogRecPtr	next_buf_lsn;	/* unused */
+
 		if (next_bufhdr)
 		{
-			BufferTag	tag = next_bufhdr->tag;
+			BufferWriteBatch batch;
+			BlockNumber limit;
 
-			FlushBuffer(next_bufhdr, NULL, IOOBJECT_RELATION, io_context);
-			UnlockReleaseBuffer(next_bufnum);
-			/* Only regular backends use buffer access strategies */
-			ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context, &tag);
+			/*
+			 * After finding an eligible buffer, if we are allowed more pins
+			 * and there are more blocks in the relation, identify any of the
+			 * buffers following it which are also eligible and combine them
+			 * into a batch.
+			 *
+			 * The cursor is advanced through the ring slots consumed by the
+			 * current batch. When a buffer is rejected, cursor is left
+			 * pointing to it so that the outer loop can consider it as the
+			 * start of a new batch.
+			 */
+			limit = InitForwardEagerWriteBatch(strategy, next_bufhdr, &batch);
+			if (limit > 1)
+				ExtendStrategyEagerWriteBatch(strategy, sweep_end,
+											  limit, &batch, &cursor);
+			FlushBufferBatch(&batch, io_context);
+			/* Pins and locks released inside CompleteWriteBatchIO */
+			CompleteWriteBatchIO(&batch, io_context, &BackendWritebackContext);
 		}
 
 		/*
@@ -2614,11 +2693,13 @@ EagerCleanStrategyBuffer(BufferAccessStrategy strategy, Buffer bufnum,
 			break;
 
 		/*
-		 * Check buffer eager flush eligibility. If the buffer is ineligible,
-		 * we'll keep looking until we complete one full sweep around the
-		 * ring.
+		 * If the buffer is eligible for eager flushing, it will be the start
+		 * of a new batch. Otherwise, we'll keep looking until we complete one
+		 * full sweep around the ring.
 		 */
-		next_bufhdr = PrepareOrRejectEagerFlushBuffer(next_bufnum);
+		next_bufhdr = PrepareOrRejectEagerFlushBuffer(next_bufnum,
+													  NULL,
+													  &next_buf_lsn);
 	}
 }
 
@@ -2717,6 +2798,46 @@ ClaimVictimBuffer(BufferAccessStrategy strategy,
 	return true;
 }
 
+/*
+ * Determine the largest IO we can assemble given strategy-specific and global
+ * constraints on the number of pinned buffers and max IO size. Currently only
+ * a single write is inflight at a time, so the batch can consume all the
+ * pinned buffers this backend is allowed. Only for batches of shared
+ * (non-local) relations.
+ */
+static uint32
+MaxWriteBatchSize(BufferAccessStrategy strategy)
+{
+	uint32		result = io_combine_limit;
+	uint32		strategy_pin_limit;
+	uint32		max_pin_limit = GetPinLimit();
+
+	/* Apply pin limits */
+	result = Min(result, max_pin_limit);
+	if (strategy)
+	{
+		strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
+		result = Min(result, strategy_pin_limit);
+	}
+
+	/* Ensure forward progress */
+	result = Max(result, 1);
+
+	return result;
+}
+
+static uint32
+CurrentMaxEagerWriteBatchSize(uint32 max_batch_size)
+{
+	uint32		limit;
+
+	limit = Min(eager_clean_max_batch_size, max_batch_size);
+	limit = Min(GetAdditionalPinLimit(), limit);
+
+	/* Guarantee forward progress */
+	return Max(limit, 1);
+}
+
 static Buffer
 GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context)
 {
@@ -4711,31 +4832,84 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	error_context_stack = errcallback.previous;
 }
 
+/*
+ * Given a required buffer that is ready to flush, initialize the batch
+ * structure and place it at the beginning of the new batch. Returns the
+ * maximum number of blocks this batch may contain given the location of the
+ * block, the number of currently available pins, and various configuration
+ * GUCs.
+ */
+static uint32
+InitForwardEagerWriteBatch(BufferAccessStrategy strategy,
+						   BufferDesc *required_bufhdr, BufferWriteBatch *batch)
+{
+	uint32		limit;
+
+	Assert(required_bufhdr);
+	batch->forkno = BufTagGetForkNum(&required_bufhdr->tag);
+	batch->reln = smgropen(BufTagGetRelFileLocator(&required_bufhdr->tag),
+						   INVALID_PROC_NUMBER);
+
+	Assert(BufferIsLockedByMe(BufferDescriptorGetBuffer(required_bufhdr)));
+
+	batch->max_lsn =
+		pg_atomic_read_u64(&required_bufhdr->state) & BM_PERMANENT ?
+		BufferGetLSN(required_bufhdr) : InvalidXLogRecPtr;
+
+	limit = CurrentMaxEagerWriteBatchSize(MaxWriteBatchSize(strategy));
+	limit = Min(limit, smgrmaxcombine(batch->reln, batch->forkno,
+									  required_bufhdr->tag.blockNum));
+	Assert(BlockNumberIsValid(required_bufhdr->tag.blockNum));
+	/* Batch goes forward so required buffer is first */
+	batch->bufhdrs[0] = required_bufhdr;
+	batch->n = 1;
+
+	return limit;
+}
+
 /*
  * Prepare bufnum for eager flushing.
  *
- * Given bufnum, return its buffer descriptor, pinned and locked and ready for
- * write I/O, or NULL if this buffer does not contain a block that should be
- * eager flushed. Buffers that are accepted are suitable for writing out
+ * Given bufnum, return its buffer descriptor, pinned and locked and with
+ * BM_IO_IN_PROGRESS set, or NULL if this buffer does not contain a block that
+ * should be flushed. Buffers that are accepted are suitable for writing out
  * eagerly. The input buffer should not already be pinned.
+ *
+ * If the caller requires a particular block to be in the buffer in order to
+ * accept it, they will provide the required block number and its
+ * RelFileLocator and fork.
+ *
+ * If returning a buffer, also return its LSN.
  */
 static BufferDesc *
-PrepareOrRejectEagerFlushBuffer(Buffer bufnum)
+PrepareOrRejectEagerFlushBuffer(Buffer bufnum,
+								BufferTag *require,
+								XLogRecPtr *lsn)
 {
 	BufferDesc *bufhdr;
 	uint64		buf_state;
 
+	*lsn = InvalidXLogRecPtr;
+
 	if (!BufferIsValid(bufnum))
 		goto reject_buffer;
 
 	Assert(!BufferIsLocal(bufnum));
 
 	bufhdr = GetBufferDescriptor(bufnum - 1);
+
+	/*
+	 * Quick, unsafe checks to see if buffer even possibly contains a block
+	 * meeting our requirements. We'll recheck it all again after getting a
+	 * pin.
+	 */
+	if (require && !BufferTagsEqual(require, &bufhdr->tag))
+		goto reject_buffer;
+
 	buf_state = pg_atomic_read_u64(&bufhdr->state);
 
 	/*
-	 * Quick racy check to see if the buffer is clean, in which case we don't
-	 * need to flush it. We'll recheck if it is dirty again later before
+	 * We'll recheck if it is dirty later, when we have a pin and lock, before
 	 * actually setting BM_IO_IN_PROGRESS.
 	 */
 	if (!(buf_state & BM_DIRTY))
@@ -4773,6 +4947,10 @@ PrepareOrRejectEagerFlushBuffer(Buffer bufnum)
 
 	CheckBufferIsPinnedOnce(bufnum);
 
+	/* Now that we have the buffer pinned, recheck it's got the right block */
+	if (require && !BufferTagsEqual(require, &bufhdr->tag))
+		goto reject_buffer_unpin;
+
 	if (!BufferLockConditional(bufnum, bufhdr, BUFFER_LOCK_SHARE_EXCLUSIVE))
 		goto reject_buffer_unpin;
 
@@ -4784,6 +4962,12 @@ PrepareOrRejectEagerFlushBuffer(Buffer bufnum)
 		XLogNeedsFlush(BufferGetLSN(bufhdr)))
 		goto reject_buffer_unlock;
 
+	/* Try to start an I/O operation */
+	if (StartBufferIO(bufnum, false, true, NULL) != BUFFER_IO_READY_FOR_IO)
+		goto reject_buffer_unlock;
+
+	*lsn = BufferGetLSN(bufhdr);
+
 	return bufhdr;
 
 reject_buffer_unlock:
@@ -4796,6 +4980,181 @@ reject_buffer:
 	return NULL;
 }
 
+/*
+ * Given a starting buffer descriptor from a strategy ring that supports eager
+ * flushing, find additional buffers from the ring that can be combined into a
+ * single write batch with the starting buffer.
+ *
+ * This function will pin and content lock all of the buffers that it
+ * assembles for the IO batch. The caller is responsible for issuing the IO.
+ *
+ * We only want to look in the ring starting at the current *sweep_cursor and
+ * ending at sweep_end. This is to avoid examining the same ring buffers
+ * multiple times. The caller will ensure we make one complete sweep around
+ * the ring. This function only advances *sweep_cursor for buffers consumed
+ * and added to the batch. The caller is responsible for advancing the cursor
+ * to the next candidate batch start.
+ *
+ * batch_limit is the largest batch we are allowed to construct given the
+ * remaining blocks in the table, the number of available pins, and the
+ * current configuration.
+ *
+ * batch is an output parameter that this function will fill with the needed
+ * information to issue this IO.
+ */
+static void
+ExtendStrategyEagerWriteBatch(BufferAccessStrategy strategy,
+							  Buffer sweep_end,
+							  uint32 batch_limit,
+							  BufferWriteBatch *batch,
+							  int *sweep_cursor)
+{
+	BlockNumber batch_start = batch->bufhdrs[0]->tag.blockNum;
+	BufferTag	require;
+
+	Assert(batch_limit > 1);
+
+	/*
+	 * This should only be used to extend batches since it may reject flushing
+	 * the buffer.
+	 */
+	Assert(batch->n >= 1);
+
+	InitBufferTag(&require, &batch->reln->smgr_rlocator.locator,
+				  batch->forkno, InvalidBlockNumber);
+
+	/*
+	 * Now assemble a run of blocks to write out. Our victim buffer is already
+	 * included in the batch (at the head) and we should only start looking
+	 * forward from that buffer for adjacent blocks to include in the batch.
+	 *
+	 * batch->n is advanced in the loop body when a buffer is added.
+	 */
+	while (batch->n < batch_limit)
+	{
+		Buffer		bufnum;
+		BufferDesc *bufhdr;
+		XLogRecPtr	lsn;
+		int			next_cursor = *sweep_cursor;
+
+		/*
+		 * Don't advance the real sweep_cursor until we know if want to
+		 * consume it
+		 */
+		bufnum = StrategyNextBuffer(strategy, &next_cursor);
+
+		/* Completed one sweep of the strategy ring */
+		if (bufnum == sweep_end)
+			break;
+
+		/*
+		 * For BAS_BULKWRITE, once you hit an InvalidBuffer, the remaining
+		 * buffers in the ring will be invalid, so we're done.
+		 */
+		if (!BufferIsValid(bufnum))
+			break;
+
+		require.blockNum = batch_start + batch->n;
+
+		bufhdr = PrepareOrRejectEagerFlushBuffer(bufnum, &require, &lsn);
+
+		/*
+		 * Stop when we encounter a buffer that will break the run. Do not
+		 * consume the ring slot; the caller may consider it the start of the
+		 * next batch. If it did not belong to this batch, we want to consider
+		 * it for the next batch. If it was rejected because it wasn't dirty
+		 * or there was a concurrent user, that may have changed by now, so it
+		 * is okay to do another round of checks next time. While a diabolical
+		 * pattern where every other buffer is rejected could lead to
+		 * pinning/locking and unpinning/unlocking many buffers, that should
+		 * be rare -- especially since we spot those conditions before taking
+		 * the pin and lock.
+		 */
+		if (bufhdr == NULL)
+			break;
+
+		/* This buffer belongs to the current run, so consume this ring slot. */
+		*sweep_cursor = next_cursor;
+
+		/* Add it to the batch */
+		batch->bufhdrs[batch->n++] = bufhdr;
+
+		/*
+		 * Because we don't eagerly flush buffers that need WAL flushed, this
+		 * buffer's LSN should only be greater than the victim buffer LSN if
+		 * the victim doesn't need WAL flushing either -- in which case, we
+		 * don't really need to update max_lsn. But, it seems better to keep
+		 * the max_lsn honest -- especially since doing so is cheap.
+		 */
+		if (lsn > batch->max_lsn)
+			batch->max_lsn = lsn;
+	}
+}
+
+/*
+ * Given a prepared batch of buffers write them out as a vector.
+ */
+static void
+FlushBufferBatch(BufferWriteBatch *batch,
+				 IOContext io_context)
+{
+	BlockNumber batch_start = batch->bufhdrs[0]->tag.blockNum;
+	BlockNumber blknums[MAX_IO_COMBINE_LIMIT];
+	Block		blocks[MAX_IO_COMBINE_LIMIT];
+	instr_time	io_start;
+	ErrorContextCallback errcallback =
+	{
+		.callback = shared_buffer_write_batch_error_callback,
+		.previous = error_context_stack,
+	};
+
+	errcallback.arg = batch;
+	error_context_stack = &errcallback;
+
+	if (XLogRecPtrIsValid(batch->max_lsn))
+		XLogFlush(batch->max_lsn);
+
+	/* Should have been opened when initializing the batch */
+	Assert(batch->reln);
+
+#ifdef USE_ASSERT_CHECKING
+	for (uint32 i = 0; i < batch->n; i++)
+	{
+		BufferDesc *bufhdr = batch->bufhdrs[i];
+
+		Assert(!(pg_atomic_read_u64(&bufhdr->state) & BM_PERMANENT) ||
+			   !XLogNeedsFlush(BufferGetLSN(bufhdr)));
+		Assert(BufTagGetForkNum(&bufhdr->tag) == batch->forkno);
+		Assert(bufhdr->tag.blockNum == batch_start + i);
+	}
+#endif
+
+	TRACE_POSTGRESQL_BUFFER_BATCH_FLUSH_START(batch->forkno,
+											  batch->reln->smgr_rlocator.locator.spcOid,
+											  batch->reln->smgr_rlocator.locator.dbOid,
+											  batch->reln->smgr_rlocator.locator.relNumber,
+											  batch->reln->smgr_rlocator.backend,
+											  batch->n);
+
+	for (BlockNumber i = 0; i < batch->n; i++)
+	{
+		blknums[i] = batch_start + i;
+		blocks[i] = BufHdrGetBlock(batch->bufhdrs[i]);
+	}
+
+	PageSetBatchChecksum((Page *) blocks, blknums, batch->n);
+
+	io_start = pgstat_prepare_io_time(track_io_timing);
+
+	smgrwritev(batch->reln, batch->forkno,
+			   batch_start, (const void **) blocks, batch->n, false);
+
+	pgstat_count_io_op_time(IOOBJECT_RELATION, io_context, IOOP_WRITE,
+							io_start, 1, batch->n * BLCKSZ);
+
+	error_context_stack = errcallback.previous;
+}
+
 /*
  * Convenience wrapper around FlushBuffer() that locks/unlocks the buffer
  * before/after calling FlushBuffer().
@@ -4811,6 +5170,53 @@ FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln,
 	BufferLockUnlock(buffer, buf);
 }
 
+/*
+ * Given a previously initialized batch with buffers that have already been
+ * flushed, terminate the IO on each buffer and then unlock and unpin them.
+ * This assumes all the buffers were locked and pinned. wb_context will be
+ * modified.
+ */
+static void
+CompleteWriteBatchIO(BufferWriteBatch *batch, IOContext io_context,
+					 WritebackContext *wb_context)
+{
+	BufferTag	tag;
+	ErrorContextCallback errcallback =
+	{
+		.callback = shared_buffer_write_error_callback,
+		.previous = error_context_stack,
+	};
+
+	error_context_stack = &errcallback;
+	pgBufferUsage.shared_blks_written += batch->n;
+
+	/* Snapshot the tag before unpinning the buffer */
+	tag = batch->bufhdrs[0]->tag;
+
+	for (uint32 i = 0; i < batch->n; i++)
+	{
+		Buffer		buffer = BufferDescriptorGetBuffer(batch->bufhdrs[i]);
+
+		errcallback.arg = batch->bufhdrs[i];
+
+		/* Mark the buffer as clean and end the BM_IO_IN_PROGRESS state. */
+		TerminateBufferIO(batch->bufhdrs[i], true, 0, true, false);
+		UnlockReleaseBuffer(buffer);
+	}
+
+	TRACE_POSTGRESQL_BUFFER_BATCH_FLUSH_DONE(batch->forkno,
+											 batch->reln->smgr_rlocator.locator.spcOid,
+											 batch->reln->smgr_rlocator.locator.dbOid,
+											 batch->reln->smgr_rlocator.locator.relNumber,
+											 batch->reln->smgr_rlocator.backend,
+											 batch->n,
+											 tag.blockNum);
+
+	error_context_stack = errcallback.previous;
+
+	ScheduleBufferTagBatchForWriteback(wb_context, tag, batch->n, io_context);
+}
+
 /*
  * RelationGetNumberOfBlocksInFork
  *		Determines the current number of pages in the specified relation fork.
@@ -7646,6 +8052,36 @@ shared_buffer_write_error_callback(void *arg)
 							   BufTagGetForkNum(&bufHdr->tag)).str);
 }
 
+/*
+ * Error context callback for errors occurring during a combined write of
+ * multiple shared buffers (see FlushBufferBatch()).
+ */
+static void
+shared_buffer_write_batch_error_callback(void *arg)
+{
+	BufferWriteBatch *batch = (BufferWriteBatch *) arg;
+	BufferDesc *first_bufhdr;
+	BlockNumber start;
+
+	if (batch == NULL || batch->n == 0)
+		return;
+
+	first_bufhdr = batch->bufhdrs[0];
+	start = first_bufhdr->tag.blockNum;
+
+	/* Buffers are pinned, so we can read the tag without locking the spinlock */
+	if (batch->n == 1)
+		errcontext("writing block %u of relation \"%s\"",
+				   start,
+				   relpathperm(BufTagGetRelFileLocator(&first_bufhdr->tag),
+							   BufTagGetForkNum(&first_bufhdr->tag)).str);
+	else
+		errcontext("writing blocks %u..%u of relation \"%s\"",
+				   start, start + batch->n - 1,
+				   relpathperm(BufTagGetRelFileLocator(&first_bufhdr->tag),
+							   BufTagGetForkNum(&first_bufhdr->tag)).str);
+}
+
 /*
  * Error context callback for errors occurring during local buffer writes.
  */
@@ -7900,6 +8336,59 @@ ScheduleBufferTagForWriteback(WritebackContext *wb_context, IOContext io_context
 		IssuePendingWritebacks(wb_context, io_context);
 }
 
+/*
+ * Add all the blocks from a write batch that was recently issued to a list of
+ * pending writeback requests. Don't call while holding buffer locks. tag
+ * should be a copy of a BufferTag from a buffer in the patch from when it was
+ * still pinned. It is okay to call this function for pinned or unpinned
+ * buffers as long as the tag was saved before any pin was released.
+ */
+static void
+ScheduleBufferTagBatchForWriteback(WritebackContext *wb_context,
+								   BufferTag tag, uint32 batch_size,
+								   IOContext io_context)
+{
+	/*
+	 * As pg_flush_data() doesn't do anything with fsync disabled, there's no
+	 * point in tracking in that case.
+	 */
+	if (io_direct_flags & IO_DIRECT_DATA ||
+		!enableFsync)
+		return;
+
+	/*
+	 * Drain the queue to make room if needed. We do this even if writeback
+	 * control is disabled because it may have been previously enabled.
+	 */
+	if (wb_context->nr_pending >= *wb_context->max_pending)
+		IssuePendingWritebacks(wb_context, io_context);
+
+	/* If writeback control is disabled, leave */
+	if (*wb_context->max_pending <= 0)
+		return;
+
+	/* It is okay if n is > max_pending because we flush as we go */
+	Assert(*wb_context->max_pending <= WRITEBACK_MAX_PENDING_FLUSHES);
+
+	/*
+	 * Add buffer to the pending writeback array. All must be from same
+	 * relation.
+	 */
+	for (uint32 i = 0; i < batch_size; i++)
+	{
+		PendingWriteback *pending;
+
+		pending = &wb_context->pending_writebacks[wb_context->nr_pending++];
+		pending->tag = tag;
+
+		tag.blockNum++;
+
+		/* Perform pending flushes if writeback limit exceeded */
+		if (wb_context->nr_pending >= *wb_context->max_pending)
+			IssuePendingWritebacks(wb_context, io_context);
+	}
+}
+
 #define ST_SORT sort_pending_writebacks
 #define ST_ELEMENT_TYPE PendingWriteback
 #define ST_COMPARE(a, b) buffertag_comparator(&a->tag, &b->tag)
diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index 1fdfda59edd..186ce83a8e2 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -1528,3 +1528,25 @@ PageSetChecksum(Page page, BlockNumber blkno)
 	((PageHeader) page)->pd_checksum = pg_checksum_page(page, blkno);
 	RESUME_INTERRUPTS();
 }
+
+/*
+ * A helper to set multiple blocks' checksums
+ */
+void
+PageSetBatchChecksum(Page *pages, const BlockNumber *blknos, uint32 length)
+{
+	/* If we don't need a checksum, just return */
+	if (!DataChecksumsNeedWrite())
+		return;
+
+	HOLD_INTERRUPTS();
+	for (uint32 i = 0; i < length; i++)
+	{
+		Page		page = pages[i];
+
+		if (PageIsNew(page))
+			continue;
+		((PageHeader) page)->pd_checksum = pg_checksum_page(page, blknos[i]);
+	}
+	RESUME_INTERRUPTS();
+}
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index afaa058b046..5daf0ed5b1b 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -835,6 +835,15 @@
   options => 'dynamic_shared_memory_options',
 },
 
+{ name => 'eager_clean_max_batch_size', type => 'int', context => 'PGC_USERSET', group => 'RESOURCES_IO',
+  short_desc => 'Maximum number of blocks to flush for eager cleaning.',
+  flags => 'GUC_UNIT_BLOCKS',
+  variable => 'eager_clean_max_batch_size',
+  boot_val => 'DEFAULT_EAGER_CLEAN_MAX_BATCH_SIZE',
+  min => '1',
+  max => 'MAX_IO_COMBINE_LIMIT',
+},
+
 { name => 'effective_cache_size', type => 'int', context => 'PGC_USERSET', group => 'QUERY_TUNING_COST',
   short_desc => 'Sets the planner\'s assumption about the total size of the data caches.',
   long_desc => 'That is, the total size of the caches (kernel cache and shared buffers) used for PostgreSQL data files. This is measured in disk pages, which are normally 8 kB each.',
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index ac38cddaaf9..d2351bb1677 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -215,6 +215,7 @@
 #io_max_combine_limit = 128kB           # usually 1-128 blocks (depends on OS)
                                         # (change requires restart)
 #io_combine_limit = 128kB               # usually 1-128 blocks (depends on OS)
+#eager_clean_max_batch_size = 16        # 1-128 blocks (depends on OS)
 
 #io_method = worker                     # worker, io_uring, sync
                                         # (change requires restart)
diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d
index 1929521c6a5..e0f48c6d2d9 100644
--- a/src/backend/utils/probes.d
+++ b/src/backend/utils/probes.d
@@ -61,6 +61,8 @@ provider postgresql {
 	probe buffer__flush__done(ForkNumber, BlockNumber, Oid, Oid, Oid);
 	probe buffer__extend__start(ForkNumber, Oid, Oid, Oid, int, unsigned int);
 	probe buffer__extend__done(ForkNumber, Oid, Oid, Oid, int, unsigned int, BlockNumber);
+	probe buffer__batch__flush__start(ForkNumber, Oid, Oid, Oid, int, unsigned int);
+	probe buffer__batch__flush__done(ForkNumber, Oid, Oid, Oid, int, unsigned int, BlockNumber);
 
 	probe buffer__checkpoint__start(int);
 	probe buffer__checkpoint__sync__start();
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 6837b35fc6d..d3d7380e2e2 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -174,9 +174,11 @@ extern PGDLLIMPORT int maintenance_io_concurrency;
 
 #define MAX_IO_COMBINE_LIMIT PG_IOV_MAX
 #define DEFAULT_IO_COMBINE_LIMIT Min(MAX_IO_COMBINE_LIMIT, (128 * 1024) / BLCKSZ)
+#define DEFAULT_EAGER_CLEAN_MAX_BATCH_SIZE 16
 extern PGDLLIMPORT int io_combine_limit;	/* min of the two GUCs below */
 extern PGDLLIMPORT int io_combine_limit_guc;
 extern PGDLLIMPORT int io_max_combine_limit;
+extern PGDLLIMPORT int eager_clean_max_batch_size;
 
 extern PGDLLIMPORT int checkpoint_flush_after;
 extern PGDLLIMPORT int backend_flush_after;
diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h
index 634e1e49ee5..d13a8d4e162 100644
--- a/src/include/storage/bufpage.h
+++ b/src/include/storage/bufpage.h
@@ -538,5 +538,7 @@ extern void PageIndexTupleDeleteNoCompact(Page page, OffsetNumber offnum);
 extern bool PageIndexTupleOverwrite(Page page, OffsetNumber offnum,
 									const void *newtup, Size newsize);
 extern void PageSetChecksum(Page page, BlockNumber blkno);
+extern void PageSetBatchChecksum(Page *pages, const BlockNumber *blknos,
+								 uint32 length);
 
 #endif							/* BUFPAGE_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 2954f47d286..74dc600d742 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -369,6 +369,7 @@ BufferStrategyControl
 BufferTag
 BufferUsage
 BufferUsageCountChange
+BufferWriteBatch
 BuildAccumulator
 BuiltinScript
 BulkInsertState
-- 
2.43.0

