From 3164ce4ebea1f7ad05f252f335f449c382e6fd8f Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Mon, 25 Aug 2025 14:13:04 -0400
Subject: [PATCH v2 1/2] Eagerly flush bulkwrite strategy ring

Operations using BAS_BULKWRITE (COPY FROM and createdb) will inevitably
need to flush buffers in the strategy ring in order to reuse
them. By eagerly flushing the buffers in a larger batch, we encourage
larger writes at the kernel level and less interleaving of WAL flushes
and data file writes. The effect is mainly noticeable with multiple
parallel COPY FROMs. In this case, client backends achieve higher write
throughput and end up spending less time waiting on acquiring the lock
to flush WAL. Larger flush operations also mean less time waiting on
flushes at the kernel level.

The heuristic for eager flushing is to flush only those buffers in the
strategy ring that can be written out without flushing WAL first.

This patch is also a stepping stone toward using AIO for COPY FROM.

Reviewed-by: Kirill Reshke <reshkekirill@gmail.com>
Discussion: https://postgr.es/m/flat/CAAKRu_Yjn4mvN9NBxtmsCQSGwup45CoA4e05nhR7ADP-v0WCig%40mail.gmail.com
---
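Notes for reviewers (not part of the commit message): the kernel-level
claim above is easy to sanity-check outside Postgres. Below is a
minimal standalone sketch contrasting the two I/O patterns the message
describes: interleaving a small WAL flush with every data page write
versus flushing WAL once and issuing the data writes back to back. The
file names, page counts, and record sizes are invented for
illustration.

/* flushdemo.c -- illustrative only; build with: cc -O2 flushdemo.c */
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

#define NPAGES		64
#define PAGESIZE	8192
#define WALRECSIZE	128

int
main(void)
{
	static char page[PAGESIZE];
	int			datafd = open("data.tmp", O_CREAT | O_TRUNC | O_WRONLY, 0600);
	int			walfd = open("wal.tmp", O_CREAT | O_TRUNC | O_WRONLY, 0600);

	if (datafd < 0 || walfd < 0)
	{
		perror("open");
		return 1;
	}
	memset(page, 'x', sizeof(page));

	/*
	 * Lazy pattern: every data page write is preceded by a small WAL
	 * write and an fsync (the XLogFlush() analogue), so the kernel never
	 * sees more than one page of data at a time.
	 */
	for (int i = 0; i < NPAGES; i++)
	{
		if (write(walfd, page, WALRECSIZE) != WALRECSIZE ||
			fsync(walfd) != 0 ||
			write(datafd, page, PAGESIZE) != PAGESIZE)
		{
			perror("lazy pattern");
			return 1;
		}
	}

	/*
	 * Eager pattern: write the same WAL, flush it once, then issue the
	 * data writes back to back so the kernel can coalesce them into
	 * larger device I/Os.
	 */
	for (int i = 0; i < NPAGES; i++)
	{
		if (write(walfd, page, WALRECSIZE) != WALRECSIZE)
		{
			perror("wal write");
			return 1;
		}
	}
	if (fsync(walfd) != 0)
	{
		perror("wal flush");
		return 1;
	}
	for (int i = 0; i < NPAGES; i++)
	{
		if (write(datafd, page, PAGESIZE) != PAGESIZE)
		{
			perror("eager pattern");
			return 1;
		}
	}

	close(datafd);
	close(walfd);
	return 0;
}

Timing the two loops separately (or watching iostat while they run)
shows the batched pattern issuing fewer, larger writes, which is the
effect this patch aims for on the strategy ring.
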
 src/backend/storage/buffer/bufmgr.c   | 71 +++++++++++++++++++++++++++
 src/backend/storage/buffer/freelist.c | 67 +++++++++++++++++++++++++
 src/include/storage/buf_internals.h   |  1 +
 src/include/storage/bufmgr.h          |  2 +
 4 files changed, 141 insertions(+)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index fd7e21d96d3..46fe206ddfd 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -2341,6 +2341,74 @@ InvalidateVictimBuffer(BufferDesc *buf_hdr)
 	return true;
 }
 
+/*
+ * Pin and lock a shared buffer and then flush it. We only flush the buffer
+ * if doing so is "cheap": we must be able to take the content lock without
+ * waiting, and writing the buffer out must not require flushing WAL first.
+ * This is appropriate when flushing the buffer is useful but we don't need
+ * to guarantee that it happens.
+ */
+void
+QuickCleanBuffer(Buffer buffer, IOContext io_context)
+{
+	uint32		buf_state;
+	XLogRecPtr	lsn;
+	LWLock	   *content_lock;
+	BufferDesc *bufdesc;
+
+	Assert(BufferIsValid(buffer));
+	Assert(!BufferIsLocal(buffer));
+
+	bufdesc = GetBufferDescriptor(buffer - 1);
+
+	buf_state = LockBufHdr(bufdesc);
+
+	/*
+	 * Skip the buffer if it isn't dirty, or if the ring could not reuse it
+	 * anyway because it is pinned by another backend or was recently used.
+	 */
+	if (!(buf_state & BM_DIRTY) ||
+		BUF_STATE_GET_REFCOUNT(buf_state) > 0 ||
+		BUF_STATE_GET_USAGECOUNT(buf_state) > 1)
+	{
+		UnlockBufHdr(bufdesc, buf_state);
+		return;
+	}
+
+	ReservePrivateRefCountEntry();
+	ResourceOwnerEnlarge(CurrentResourceOwner);
+
+	/* Releases buffer header lock before acquiring content lock */
+	PinBuffer_Locked(bufdesc);
+	content_lock = BufferDescriptorGetContentLock(bufdesc);
+	if (!LWLockConditionalAcquire(content_lock, LW_SHARED))
+	{
+		UnpinBuffer(bufdesc);
+		return;
+	}
+
+	CheckBufferIsPinnedOnce(buffer);
+
+	/* Need buffer header lock to get the LSN */
+	buf_state = LockBufHdr(bufdesc);
+	lsn = BufferGetLSN(bufdesc);
+	UnlockBufHdr(bufdesc, buf_state);
+
+	if (XLogNeedsFlush(lsn))
+	{
+		UnlockReleaseBuffer(buffer);
+		return;
+	}
+
+	FlushBuffer(bufdesc, NULL, IOOBJECT_RELATION, io_context);
+
+	LWLockRelease(content_lock);
+	UnpinBuffer(bufdesc);
+
+	ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
+								  &bufdesc->tag);
+}
+
 static Buffer
 GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context)
 {
@@ -2446,6 +2514,9 @@ again:
 
 		ScheduleBufferTagForWriteback(&BackendWritebackContext, io_context,
 									  &buf_hdr->tag);
+
+		if (strategy && from_ring)
+			EagerFlushStrategyRing(strategy, io_context);
 	}
 
 
diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c
index 01909be0272..a4fefb599c7 100644
--- a/src/backend/storage/buffer/freelist.c
+++ b/src/backend/storage/buffer/freelist.c
@@ -98,6 +98,10 @@ static BufferDesc *GetBufferFromRing(BufferAccessStrategy strategy,
 static void AddBufferToRing(BufferAccessStrategy strategy,
 							BufferDesc *buf);
 
+static bool StrategySupportsEagerFlush(BufferAccessStrategy strategy);
+
+static int	StrategySweepNext(BufferAccessStrategy strategy, int current);
+
 /*
  * ClockSweepTick - Helper routine for StrategyGetBuffer()
  *
@@ -180,6 +184,31 @@ have_free_buffer(void)
 		return false;
 }
 
+/*
+ * Some BufferAccessStrategies support eager flushing: writing out dirty
+ * buffers in the ring before their slots are needed for reuse. This can
+ * produce better I/O patterns than lazily flushing each buffer just before
+ * reusing it.
+ */
+static bool
+StrategySupportsEagerFlush(BufferAccessStrategy strategy)
+{
+	Assert(strategy);
+
+	switch (strategy->btype)
+	{
+		case BAS_BULKWRITE:
+			return true;
+		case BAS_VACUUM:
+		case BAS_NORMAL:
+		case BAS_BULKREAD:
+			return false;
+		default:
+			elog(ERROR, "unrecognized buffer access strategy: %d",
+				 (int) strategy->btype);
+			return false;
+	}
+}
+
 /*
  * StrategyGetBuffer
  *
@@ -780,6 +809,44 @@ GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state)
 	return NULL;
 }
 
+/* Advance a ring sweep to the next slot, wrapping around at the end */
+static int
+StrategySweepNext(BufferAccessStrategy strategy, int current)
+{
+	int			next = current + 1;
+
+	if (next >= strategy->nbuffers)
+		next = 0;
+	return next;
+}
+
+/*
+ * Flush as many buffers in the strategy ring as we can. This encourages
+ * write batching at the kernel level and leaves the ring full of clean
+ * buffers. We skip buffers whose flushing would require flushing WAL first.
+ */
+void
+EagerFlushStrategyRing(BufferAccessStrategy strategy, IOContext io_context)
+{
+	int			sweep_start,
+				sweep_current;
+
+	if (!StrategySupportsEagerFlush(strategy))
+		return;
+
+	sweep_start = StrategySweepNext(strategy, strategy->current);
+	sweep_current = sweep_start;
+
+	do
+	{
+		Buffer		bufnum = strategy->buffers[sweep_current];
+
+		if (bufnum != InvalidBuffer)
+			QuickCleanBuffer(bufnum, io_context);
+		sweep_current = StrategySweepNext(strategy, sweep_current);
+	} while (sweep_start != sweep_current);
+}
+
 /*
  * AddBufferToRing -- add a buffer to the buffer ring
  *
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 52a71b138f7..974f494557a 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -433,6 +433,7 @@ extern void WritebackContextInit(WritebackContext *context, int *max_pending);
 extern void IssuePendingWritebacks(WritebackContext *wb_context, IOContext io_context);
 extern void ScheduleBufferTagForWriteback(WritebackContext *wb_context,
 										  IOContext io_context, BufferTag *tag);
+extern void QuickCleanBuffer(Buffer buffer, IOContext io_context);
 
 /* solely to make it easier to write tests */
 extern bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait);
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 41fdc1e7693..8214ef982aa 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -14,6 +14,7 @@
 #ifndef BUFMGR_H
 #define BUFMGR_H
 
+#include "pgstat.h"
 #include "port/pg_iovec.h"
 #include "storage/aio_types.h"
 #include "storage/block.h"
@@ -331,6 +332,7 @@ extern BufferAccessStrategy GetAccessStrategyWithSize(BufferAccessStrategyType b
 extern int	GetAccessStrategyBufferCount(BufferAccessStrategy strategy);
 extern int	GetAccessStrategyPinLimit(BufferAccessStrategy strategy);
 
+extern void EagerFlushStrategyRing(BufferAccessStrategy strategy, IOContext io_context);
 extern void FreeAccessStrategy(BufferAccessStrategy strategy);
 
 
-- 
2.43.0

