From f7073e7b0706e5a699e1a1e7e486c30a5346a012 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Wed, 15 Oct 2025 15:23:16 -0400
Subject: [PATCH v15 12/19] Write combining for checkpointer

When the checkpointer writes out dirty buffers, writing multiple
contiguous blocks as a single IO is a substantial performance
improvement. The checkpointer is usually bottlenecked on IO, so issuing
larger IOs leads to increased write throughput and faster checkpoints.

Author: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Reviewed-by: Soumya <bharatdbpg@gmail.com>
Discussion: https://postgr.es/m/2FA0BAC7-5413-4ABD-94CA-4398FE77750D%40gmail.com
---
 src/backend/storage/buffer/bufmgr.c | 238 ++++++++++++++++++++++++----
 src/backend/utils/probes.d          |   2 +-
 2 files changed, 209 insertions(+), 31 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 713aba1260d..7e9bdc7ef85 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -3768,8 +3768,6 @@ TrackNewBufferPin(Buffer buf)
 void
 CheckPointBuffers(int flags)
 {
-	uint64		buf_state;
-	int			buf_id;
 	int			num_to_scan;
 	int			num_spaces;
 	int			num_processed;
@@ -3780,6 +3778,8 @@ CheckPointBuffers(int flags)
 	int			i;
 	uint64		mask = BM_DIRTY;
 	WritebackContext wb_context;
+	uint32		max_batch_size;
+	BufferWriteBatch batch;
 
 	/*
 	 * Unless this is a shutdown checkpoint or we have been explicitly told,
@@ -3807,10 +3807,11 @@ CheckPointBuffers(int flags)
 	 * certainly need to be written for the next checkpoint attempt, too.
 	 */
 	num_to_scan = 0;
-	for (buf_id = 0; buf_id < NBuffers; buf_id++)
+	for (int buf_id = 0; buf_id < NBuffers; buf_id++)
 	{
 		BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
 		uint64		set_bits = 0;
+		uint64		buf_state;
 
 		/*
 		 * Header spinlock is enough to examine BM_DIRTY, see comment in
@@ -3953,48 +3954,224 @@ CheckPointBuffers(int flags)
 	 */
 	num_processed = 0;
 	num_written = 0;
+	max_batch_size = MaxWriteBatchSize(NULL);
 	while (!binaryheap_empty(ts_heap))
 	{
-		BufferDesc *bufHdr = NULL;
+		BlockNumber batch_limit = max_batch_size;
+		BlockNumber batch_start = InvalidBlockNumber;
 		CkptTsStatus *ts_stat = (CkptTsStatus *)
 			DatumGetPointer(binaryheap_first(ts_heap));
+		int			ts_end = ts_stat->index - ts_stat->num_scanned + ts_stat->num_to_scan;
+		int			processed = 0;
 
-		buf_id = CkptBufferIds[ts_stat->index].buf_id;
-		Assert(buf_id != -1);
+		batch.max_lsn = InvalidXLogRecPtr;
+		batch.n = 0;
 
-		bufHdr = GetBufferDescriptor(buf_id);
+		while (batch.n < batch_limit)
+		{
+			BufferDesc *bufHdr = NULL;
+			uint64		buf_state;
+			CkptSortItem item;
+			Buffer		bufnum;
+			StartBufferIOResult status;
 
-		num_processed++;
+			if (ProcSignalBarrierPending)
+				ProcessProcSignalBarrier();
 
-		/*
-		 * We don't need to acquire the lock here, because we're only looking
-		 * at a single bit. It's possible that someone else writes the buffer
-		 * and clears the flag right after we check, but that doesn't matter
-		 * since SyncOneBuffer will then do nothing.  However, there is a
-		 * further race condition: it's conceivable that between the time we
-		 * examine the bit here and the time SyncOneBuffer acquires the lock,
-		 * someone else not only wrote the buffer but replaced it with another
-		 * page and dirtied it.  In that improbable case, SyncOneBuffer will
-		 * write the buffer though we didn't need to.  It doesn't seem worth
-		 * guarding against this, though.
-		 */
-		if (pg_atomic_read_u64(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
-		{
-			if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
+			/* Check if we are done with this tablespace */
+			if (ts_stat->index + processed >= ts_end)
+				break;
+
+			item = CkptBufferIds[ts_stat->index + processed];
+
+			Assert(item.buf_id != -1);
+
+			bufHdr = GetBufferDescriptor(item.buf_id);
+			bufnum = BufferDescriptorGetBuffer(bufHdr);
+
+			/*
+			 * If this is the first block of the batch, then check if we need
+			 * to open a new relation. Open the relation now because we have
+			 * to determine the maximum IO size based on how many blocks
+			 * remain in the file.
+			 */
+			if (!BlockNumberIsValid(batch_start))
+			{
+				RelFileLocator rlocator = {
+					.spcOid = item.tsId,
+					.dbOid = item.dbId,
+					.relNumber = item.relNumber
+				};
+
+				Assert(batch.max_lsn == InvalidXLogRecPtr && batch.n == 0);
+				batch.forkno = item.forkNum;
+				batch_start = item.blockNum;
+				batch.reln = smgropen(rlocator, INVALID_PROC_NUMBER);
+				batch_limit = smgrmaxcombine(batch.reln, batch.forkno, batch_start);
+				batch_limit = Min(max_batch_size, batch_limit);
+				batch_limit = Min(GetAdditionalPinLimit(), batch_limit);
+				/* Guarantee progress even if at max pins */
+				batch_limit = Max(batch_limit, 1);
+			}
+
+			/*
+			 * Once we hit blocks from the next relation or fork of the
+			 * relation, break out of the loop and issue the IO we've built up
+			 * so far. It is important that we don't increment processed
+			 * because we want to start the next IO with this item.
+			 */
+			if (item.dbId != batch.reln->smgr_rlocator.locator.dbOid ||
+				item.relNumber != batch.reln->smgr_rlocator.locator.relNumber ||
+				item.forkNum != batch.forkno)
+				break;
+
+			Assert(item.tsId == batch.reln->smgr_rlocator.locator.spcOid);
+
+			/*
+			 * If the next block is not contiguous, we can't include it in the
+			 * IO we will issue. Break out of the loop and issue what we have
+			 * so far. Do not count this item as processed -- otherwise we
+			 * will end up skipping it.
+			 */
+			if (item.blockNum != batch_start + batch.n)
+				break;
+
+			/*
+			 * We don't need to acquire the lock here, because we're only
+			 * looking at a few bits. It's possible that someone else writes
+			 * the buffer and clears the flag right after we check, but that
+			 * doesn't matter since StartBufferIO will then return false.
+			 *
+			 * If the buffer doesn't need checkpointing, don't include it in
+			 * the batch we are building. And if the buffer doesn't need
+			 * flushing, we're done with the item, so count it as processed
+			 * and break out of the loop to issue the IO so far.
+			 *
+			 * It's okay for us to check if the buffer needs flushing (if it's
+			 * dirty) without holding the buffer content lock as long as we
+			 * mark pages dirty in access methods *before* logging changes
+			 * with XLogInsert(): if someone marks the buffer dirty just after
+			 * our check we don't worry because our checkpoint.redo points
+			 * before log record for upcoming changes and so we are not
+			 * required to write such a dirty buffer.
+			 */
+			buf_state = pg_atomic_read_u64(&bufHdr->state);
+			if ((buf_state & (BM_CHECKPOINT_NEEDED | BM_VALID | BM_DIRTY)) !=
+				(BM_CHECKPOINT_NEEDED | BM_VALID | BM_DIRTY))
+			{
+				processed++;
+				break;
+			}
+
+			ReservePrivateRefCountEntry();
+			ResourceOwnerEnlarge(CurrentResourceOwner);
+
+			/* If the buffer is not BM_VALID, nothing to do on this buffer */
+			if (!PinBuffer(bufHdr, BUC_ZERO, true))
+			{
+				processed++;
+				break;
+			}
+
+			/*
+			 * Now that we have a pin, we must recheck that the buffer
+			 * contains the specified block. Someone may have replaced the
+			 * block in the buffer with a different block. In that case, count
+			 * it as processed and issue the IO so far. These fields won't
+			 * change as long as we hold a pin, so we don't need a spinlock to
+			 * read them.
+			 */
+			if (!BufTagMatchesRelFileLocator(&bufHdr->tag,
+											 &batch.reln->smgr_rlocator.locator) ||
+				BufTagGetForkNum(&bufHdr->tag) != batch.forkno ||
+				bufHdr->tag.blockNum != batch_start + batch.n)
+			{
+				UnpinBuffer(bufHdr);
+				processed++;
+				break;
+			}
+
+			/*
+			 * It's conceivable that between the time we examine the buffer
+			 * header for BM_CHECKPOINT_NEEDED above and when we are now
+			 * acquiring the lock that someone else wrote the buffer out. In
+			 * that improbable case, we will write the buffer though we didn't
+			 * need to. It doesn't seem worth guarding against this, though.
+			 *
+			 * We are willing to wait for the content lock on the first IO in
+			 * the batch. However, for subsequent IOs, waiting could lead to
+			 * deadlock. We have to eventually flush all eligible buffers,
+			 * though. So, if we fail to acquire the lock on a subsequent
+			 * buffer, we break out and issue the IO we've built up so far.
+			 * Then we come back and start a new IO with that buffer as the
+			 * starting buffer. As such, we must not count the item as
+			 * processed if we end up failing to acquire the content lock.
+			 */
+			if (batch.n == 0)
+				BufferLockAcquire(bufnum, bufHdr, BUFFER_LOCK_SHARE_EXCLUSIVE);
+			else if (!BufferLockConditional(bufnum, bufHdr, BUFFER_LOCK_SHARE_EXCLUSIVE))
 			{
-				TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
-				PendingCheckpointerStats.buffers_written++;
-				num_written++;
+				UnpinBuffer(bufHdr);
+				break;
 			}
+
+			/*
+			 * If the buffer doesn't need IO, count the item as processed,
+			 * release the buffer, and break out of the loop to issue the IO
+			 * we have built up so far.
+			 */
+			if ((status = StartBufferIO(bufnum, false, true, NULL)) !=
+				BUFFER_IO_READY_FOR_IO)
+			{
+				Assert(status == BUFFER_IO_ALREADY_DONE);
+				UnlockReleaseBuffer(bufnum);
+				processed++;
+				break;
+			}
+
+			/*
+			 * Keep track of the max LSN so that we can be sure to flush
+			 * enough WAL before flushing data from the buffers. See comment
+			 * in DoFlushBuffer() for more on why we don't consider the LSNs
+			 * of unlogged relations.
+			 */
+			if (pg_atomic_read_u64(&bufHdr->state) & BM_PERMANENT)
+			{
+				XLogRecPtr	lsn = BufferGetLSN(bufHdr);
+
+				if (lsn > batch.max_lsn)
+					batch.max_lsn = lsn;
+			}
+
+			batch.bufhdrs[batch.n++] = bufHdr;
+			processed++;
 		}
 
 		/*
 		 * Measure progress independent of actually having to flush the buffer
-		 * - otherwise writing become unbalanced.
+		 * - otherwise writing becomes unbalanced.
+		 */
+		num_processed += processed;
+		ts_stat->progress += ts_stat->progress_slice * processed;
+		ts_stat->num_scanned += processed;
+		ts_stat->index += processed;
+
+		/*
+		 * If we built up an IO, issue it. There's a chance we didn't find any
+		 * items referencing buffers that needed flushing this time, but we
+		 * still want to check if we should update the heap if we examined and
+		 * processed the items.
 		 */
-		ts_stat->progress += ts_stat->progress_slice;
-		ts_stat->num_scanned++;
-		ts_stat->index++;
+		if (batch.n > 0)
+		{
+			FlushBufferBatch(&batch, IOCONTEXT_NORMAL);
+			CompleteWriteBatchIO(&batch, IOCONTEXT_NORMAL, &wb_context);
+
+			TRACE_POSTGRESQL_BUFFER_BATCH_SYNC_WRITTEN(batch.n);
+			PendingCheckpointerStats.buffers_written += batch.n;
+			num_written += batch.n;
+			batch.n = 0;
+		}
 
 		/* Have all the buffers from the tablespace been processed? */
 		if (ts_stat->num_scanned == ts_stat->num_to_scan)
@@ -4012,6 +4189,7 @@ CheckPointBuffers(int flags)
 		 *
 		 * (This will check for barrier events even if it doesn't sleep.)
 		 */
+		Assert(batch.n == 0);
 		CheckpointWriteDelay(flags, (double) num_processed / num_to_scan);
 	}
 
diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d
index e0f48c6d2d9..90169c92c26 100644
--- a/src/backend/utils/probes.d
+++ b/src/backend/utils/probes.d
@@ -68,7 +68,7 @@ provider postgresql {
 	probe buffer__checkpoint__sync__start();
 	probe buffer__checkpoint__done();
 	probe buffer__sync__start(int, int);
-	probe buffer__sync__written(int);
+	probe buffer__batch__sync__written(BlockNumber);
 	probe buffer__sync__done(int, int, int);
 
 	probe deadlock__found();
-- 
2.43.0

