From d954d754656b9bc05da4c2edc0a4bad8b3118091 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Tue, 2 Sep 2025 12:43:24 -0400
Subject: [PATCH v3 3/9] Eagerly flush bulkwrite strategy ring

Operations using BAS_BULKWRITE (COPY FROM and createdb) will inevitably
need to flush buffers in the strategy ring in order to reuse
them. By eagerly flushing the buffers in a larger batch, we encourage
larger writes at the kernel level and less interleaving of WAL flushes
and data file writes. The effect is mainly noticeable with multiple
parallel COPY FROMs. In this case, client backends achieve higher write
throughput and end up spending less time waiting on acquiring the lock
to flush WAL. Larger flush operations also mean less time waiting for
flush operations at the kernel level as well.

The heuristic for eager flushing is to only flush those buffers in the
strategy ring whose flushing does not require first flushing WAL.

This patch also is a stepping stone toward AIO writes.

Reviewed-by: Kirill Reshke <reshkekirill@gmail.com> (earlier version)
Discussion: https://postgr.es/m/flat/CAAKRu_Yjn4mvN9NBxtmsCQSGwup45CoA4e05nhR7ADP-v0WCig%40mail.gmail.com
---
 src/backend/storage/buffer/bufmgr.c   | 166 +++++++++++++++++++++++++-
 src/backend/storage/buffer/freelist.c |  63 ++++++++++
 src/include/storage/buf_internals.h   |   3 +
 3 files changed, 229 insertions(+), 3 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 84ff5e0f1bf..90f36a04c19 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -534,6 +534,11 @@ static void DoFlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object
 						  IOContext io_context, XLogRecPtr buffer_lsn);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
 						IOObject io_object, IOContext io_context);
+static BufferDesc *next_strat_buf_to_flush(BufferAccessStrategy strategy, XLogRecPtr *lsn);
+static BufferDesc *PrepareOrRejectEagerFlushBuffer(Buffer bufnum, BlockNumber require,
+												   RelFileLocator *rlocator,
+												   bool skip_pinned,
+												   XLogRecPtr *max_lsn);
 static void CleanVictimBuffer(BufferAccessStrategy strategy,
 							  BufferDesc *bufdesc, uint32 *buf_state, bool from_ring);
 static void FindAndDropRelationBuffers(RelFileLocator rlocator,
@@ -4253,6 +4258,31 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 		DoFlushBuffer(buf, reln, io_object, io_context, lsn);
 }
 
+/*
+ * Returns the buffer descriptor of the buffer containing the next block we
+ * should eagerly flush, or NULL when there are no further buffers to
+ * consider writing out.
+ */
+static BufferDesc *
+next_strat_buf_to_flush(BufferAccessStrategy strategy,
+						XLogRecPtr *lsn)
+{
+	Buffer		bufnum;
+	BufferDesc *bufdesc;
+
+	while ((bufnum = StrategySweepNextBuffer(strategy)) != InvalidBuffer)
+	{
+		if ((bufdesc = PrepareOrRejectEagerFlushBuffer(bufnum,
+													   InvalidBlockNumber,
+													   NULL,
+													   true,
+													   lsn)) != NULL)
+			return bufdesc;
+	}
+
+	return NULL;
+}
+
 /*
  * Prepare to write and write a dirty victim buffer.
  */
@@ -4263,6 +4293,7 @@ CleanVictimBuffer(BufferAccessStrategy strategy,
 
 	XLogRecPtr	max_lsn = InvalidXLogRecPtr;
 	LWLock	   *content_lock;
+	bool		first_buffer = true;
 	IOContext	io_context = IOContextForStrategy(strategy);
 
 	Assert(*buf_state & BM_DIRTY);
@@ -4271,11 +4302,140 @@ CleanVictimBuffer(BufferAccessStrategy strategy,
 	if (!PrepareFlushBuffer(bufdesc, buf_state, &max_lsn))
 		return;
 
-	DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn);
+	if (from_ring && strategy_supports_eager_flush(strategy))
+	{
+		/* Clean victim buffer and find more to flush opportunistically */
+		StartStrategySweep(strategy);
+		do
+		{
+			DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn);
+			content_lock = BufferDescriptorGetContentLock(bufdesc);
+			LWLockRelease(content_lock);
+			ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
+										  &bufdesc->tag);
+			/* We leave the first buffer pinned for the caller */
+			if (!first_buffer)
+				UnpinBuffer(bufdesc);
+			first_buffer = false;
+		} while ((bufdesc = next_strat_buf_to_flush(strategy, &max_lsn)) != NULL);
+	}
+	else
+	{
+		DoFlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context, max_lsn);
+		content_lock = BufferDescriptorGetContentLock(bufdesc);
+		LWLockRelease(content_lock);
+		ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
+									  &bufdesc->tag);
+	}
+}
+
+/*
+ * Prepare bufdesc for eager flushing.
+ *
+ * Given bufnum, returns the buffer descriptor of the buffer we will
+ * opportunistically flush, or NULL if this buffer does not contain a
+ * block that should be flushed.
+ *
+ * require is the BlockNumber required by the caller. Some callers may require
+ * a specific BlockNumber to be in bufnum because they are assembling a
+ * contiguous run of blocks.
+ *
+ * If the caller needs the block to be from a specific relation, rlocator will
+ * be provided.
+ */
+BufferDesc *
+PrepareOrRejectEagerFlushBuffer(Buffer bufnum, BlockNumber require,
+								RelFileLocator *rlocator, bool skip_pinned,
+								XLogRecPtr *max_lsn)
+{
+	BufferDesc *bufdesc;
+	uint32		buf_state;
+	XLogRecPtr	lsn;
+	BlockNumber blknum;
+	LWLock	   *content_lock;
+
+	if (!BufferIsValid(bufnum))
+		return NULL;
+
+	Assert(!BufferIsLocal(bufnum));
+
+	bufdesc = GetBufferDescriptor(bufnum - 1);
+
+	/* Block may need to be in a specific relation */
+	if (rlocator &&
+		!RelFileLocatorEquals(BufTagGetRelFileLocator(&bufdesc->tag),
+							  *rlocator))
+		return NULL;
+
+	/* Must do this before taking the buffer header spinlock. */
+	ResourceOwnerEnlarge(CurrentResourceOwner);
+	ReservePrivateRefCountEntry();
+
+	buf_state = LockBufHdr(bufdesc);
+
+	if (!(buf_state & BM_DIRTY) || !(buf_state & BM_VALID))
+		goto except_unlock_header;
+
+	/* We don't include used buffers in batches */
+	if (skip_pinned &&
+		(BUF_STATE_GET_REFCOUNT(buf_state) > 0 ||
+		 BUF_STATE_GET_USAGECOUNT(buf_state) > 1))
+		goto except_unlock_header;
+
+	/* Get page LSN while holding header lock */
+	lsn = BufferGetLSN(bufdesc);
+
+	PinBuffer_Locked(bufdesc);
+	CheckBufferIsPinnedOnce(bufnum);
+
+	blknum = BufferGetBlockNumber(bufnum);
+	Assert(BlockNumberIsValid(blknum));
+
+	/* If we'll have to flush WAL to flush the block, we're done */
+	if (buf_state & BM_PERMANENT && XLogNeedsFlush(lsn))
+		goto except_unpin_buffer;
+
+	/* We only include contiguous blocks in the run */
+	if (BlockNumberIsValid(require) && blknum != require)
+		goto except_unpin_buffer;
+
 	content_lock = BufferDescriptorGetContentLock(bufdesc);
+	if (!LWLockConditionalAcquire(content_lock, LW_SHARED))
+		goto except_unpin_buffer;
+
+	/*
+	 * Now that we have the content lock, we need to recheck if we need to
+	 * flush WAL.
+	 */
+	buf_state = LockBufHdr(bufdesc);
+	lsn = BufferGetLSN(bufdesc);
+	UnlockBufHdr(bufdesc, buf_state);
+
+	if (buf_state & BM_PERMANENT && XLogNeedsFlush(lsn))
+		goto except_unlock_content;
+
+	/* Try to start an I/O operation. */
+	if (!StartBufferIO(bufdesc, false, true))
+		goto except_unlock_content;
+
+	if (lsn > *max_lsn)
+		*max_lsn = lsn;
+	buf_state = LockBufHdr(bufdesc);
+	buf_state &= ~BM_JUST_DIRTIED;
+	UnlockBufHdr(bufdesc, buf_state);
+
+	return bufdesc;
+
+except_unlock_content:
 	LWLockRelease(content_lock);
-	ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
-								  &bufdesc->tag);
+
+except_unpin_buffer:
+	UnpinBuffer(bufdesc);
+	return NULL;
+
+except_unlock_header:
+	UnlockBufHdr(bufdesc, buf_state);
+	return NULL;
 }
 
 /*
diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c
index a90a7ed4e16..e26a546bc99 100644
--- a/src/backend/storage/buffer/freelist.c
+++ b/src/backend/storage/buffer/freelist.c
@@ -75,6 +75,15 @@ typedef struct BufferAccessStrategyData
 	 */
 	int			current;
 
+	/*
+	 * If the strategy supports eager flushing, we may initiate a sweep of the
+	 * strategy ring, flushing all the dirty buffers we can cheaply flush.
+	 * sweep_start and sweep_current keep track of a given sweep so we don't
+	 * loop around the ring infinitely.
+	 */
+	int			sweep_start;
+	int			sweep_current;
+
 	/*
 	 * Array of buffer numbers.  InvalidBuffer (that is, zero) indicates we
 	 * have not yet selected a buffer for this ring slot.  For allocation
@@ -156,6 +165,31 @@ ClockSweepTick(void)
 	return victim;
 }
 
+/*
+ * Some BufferAccessStrategies support eager flushing -- which is flushing
+ * buffers in the ring before they are needed. This can lead to better I/O
+ * patterns than lazily flushing buffers directly before reusing them.
+ */
+bool
+strategy_supports_eager_flush(BufferAccessStrategy strategy)
+{
+	Assert(strategy);
+
+	switch (strategy->btype)
+	{
+		case BAS_BULKWRITE:
+			return true;
+		case BAS_VACUUM:
+		case BAS_NORMAL:
+		case BAS_BULKREAD:
+			return false;
+		default:
+			elog(ERROR, "unrecognized buffer access strategy: %d",
+				 (int) strategy->btype);
+			return false;
+	}
+}
+
 /*
  * StrategyGetBuffer
  *
@@ -270,6 +304,35 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state, bool *from_r
 	}
 }
 
+/*
+ * Return the next buffer in the ring or InvalidBuffer if the current sweep is
+ * over.
+ */
+Buffer
+StrategySweepNextBuffer(BufferAccessStrategy strategy)
+{
+	strategy->sweep_current++;
+	if (strategy->sweep_current >= strategy->nbuffers)
+		strategy->sweep_current = 0;
+
+	if (strategy->sweep_current == strategy->sweep_start)
+		return InvalidBuffer;
+
+	return strategy->buffers[strategy->sweep_current];
+}
+
+/*
+ * Start a sweep of the strategy ring.
+ */
+void
+StartStrategySweep(BufferAccessStrategy strategy)
+{
+	if (!strategy)
+		return;
+	strategy->sweep_start = strategy->sweep_current = strategy->current;
+}
+
+
 /*
  * StrategySyncStart -- tell BgBufferSync where to start syncing
  *
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index b1b81f31419..7963d1189a6 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -437,6 +437,9 @@ extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag
 
 
 /* freelist.c */
+extern bool strategy_supports_eager_flush(BufferAccessStrategy strategy);
+extern Buffer StrategySweepNextBuffer(BufferAccessStrategy strategy);
+extern void StartStrategySweep(BufferAccessStrategy strategy);
 extern IOContext IOContextForStrategy(BufferAccessStrategy strategy);
 extern BufferDesc *StrategyGetBuffer(BufferAccessStrategy strategy,
 									 uint32 *buf_state, bool *from_ring);
-- 
2.43.0

