From a08a59674ab0297ec89e8790adc4f7c65a0ab6f0 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Wed, 22 Jan 2025 13:44:52 -0500
Subject: [PATCH v2.4 26/29] bufmgr: use AIO in checkpointer, bgwriter

This is far from ready - just included to be able to exercise AIO writes and
get some preliminary numbers.  In all likelihood this will be based on top
of work by Thomas Munro rather than on the preceding commit.
---
 src/include/postmaster/bgwriter.h     |   3 +-
 src/include/storage/buf_internals.h   |   2 +
 src/include/storage/bufmgr.h          |   3 +-
 src/include/storage/bufpage.h         |   1 +
 src/backend/postmaster/bgwriter.c     |  19 +-
 src/backend/postmaster/checkpointer.c |  11 +-
 src/backend/storage/buffer/bufmgr.c   | 588 +++++++++++++++++++++++---
 src/backend/storage/page/bufpage.c    |  10 +
 src/tools/pgindent/typedefs.list      |   1 +
 9 files changed, 580 insertions(+), 58 deletions(-)

diff --git a/src/include/postmaster/bgwriter.h b/src/include/postmaster/bgwriter.h
index 2d5854e6879..517c40cd804 100644
--- a/src/include/postmaster/bgwriter.h
+++ b/src/include/postmaster/bgwriter.h
@@ -31,7 +31,8 @@ extern void BackgroundWriterMain(char *startup_data, size_t startup_data_len) pg
 extern void CheckpointerMain(char *startup_data, size_t startup_data_len) pg_attribute_noreturn();
 
 extern void RequestCheckpoint(int flags);
-extern void CheckpointWriteDelay(int flags, double progress);
+struct IOQueue;
+extern void CheckpointWriteDelay(struct IOQueue *ioq, int flags, double progress);
 
 extern bool ForwardSyncRequest(const FileTag *ftag, SyncRequestType type);
 
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 396642415bc..1d3a936837b 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -21,6 +21,8 @@
 #include "storage/buf.h"
 #include "storage/bufmgr.h"
 #include "storage/condition_variable.h"
+#include "storage/io_queue.h"
+#include "storage/latch.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
 #include "storage/smgr.h"
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 655885ff2d0..2cc7d47661f 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -304,7 +304,8 @@ extern bool ConditionalLockBufferForCleanup(Buffer buffer);
 extern bool IsBufferCleanupOK(Buffer buffer);
 extern bool HoldingBufferPinThatDelaysRecovery(void);
 
-extern bool BgBufferSync(struct WritebackContext *wb_context);
+struct IOQueue;
+extern bool BgBufferSync(struct IOQueue *ioq, struct WritebackContext *wb_context);
 
 extern uint32 GetSoftPinLimit(void);
 extern uint32 GetSoftLocalPinLimit(void);
diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h
index 6646b6f6371..9c045e81857 100644
--- a/src/include/storage/bufpage.h
+++ b/src/include/storage/bufpage.h
@@ -509,5 +509,6 @@ extern bool PageIndexTupleOverwrite(Page page, OffsetNumber offnum,
 									Item newtup, Size newsize);
 extern char *PageSetChecksumCopy(Page page, BlockNumber blkno);
 extern void PageSetChecksumInplace(Page page, BlockNumber blkno);
+extern bool PageNeedsChecksumCopy(Page page);
 
 #endif							/* BUFPAGE_H */
diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index ec1225c433f..1208926c0c9 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -38,11 +38,13 @@
 #include "postmaster/auxprocess.h"
 #include "postmaster/bgwriter.h"
 #include "postmaster/interrupt.h"
+#include "storage/aio.h"
 #include "storage/aio_subsys.h"
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/condition_variable.h"
 #include "storage/fd.h"
+#include "storage/io_queue.h"
 #include "storage/lwlock.h"
 #include "storage/proc.h"
 #include "storage/procsignal.h"
@@ -90,6 +92,7 @@ BackgroundWriterMain(char *startup_data, size_t startup_data_len)
 	sigjmp_buf	local_sigjmp_buf;
 	MemoryContext bgwriter_context;
 	bool		prev_hibernate;
+	IOQueue    *ioq;
 	WritebackContext wb_context;
 
 	Assert(startup_data_len == 0);
@@ -131,6 +134,7 @@ BackgroundWriterMain(char *startup_data, size_t startup_data_len)
 											 ALLOCSET_DEFAULT_SIZES);
 	MemoryContextSwitchTo(bgwriter_context);
 
+	ioq = io_queue_create(128, 0);
 	WritebackContextInit(&wb_context, &bgwriter_flush_after);
 
 	/*
@@ -228,12 +232,22 @@ BackgroundWriterMain(char *startup_data, size_t startup_data_len)
 		/* Clear any already-pending wakeups */
 		ResetLatch(MyLatch);
 
+		/*
+		 * FIXME: this is theoretically racy, but I didn't want to copy the
+		 * remaining body of HandleMainLoopInterrupts() here.
+		 */
+		if (ShutdownRequestPending)
+		{
+			io_queue_wait_all(ioq);
+			io_queue_free(ioq);
+		}
+
 		HandleMainLoopInterrupts();
 
 		/*
 		 * Do one cycle of dirty-buffer writing.
 		 */
-		can_hibernate = BgBufferSync(&wb_context);
+		can_hibernate = BgBufferSync(ioq, &wb_context);
 
 		/* Report pending statistics to the cumulative stats system */
 		pgstat_report_bgwriter();
@@ -250,6 +264,9 @@ BackgroundWriterMain(char *startup_data, size_t startup_data_len)
 			smgrdestroyall();
 		}
 
+		/* finish IO before sleeping, to avoid blocking other backends */
+		io_queue_wait_all(ioq);
+
 		/*
 		 * Log a new xl_running_xacts every now and then so replication can
 		 * get into a consistent state faster (think of suboverflowed
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index d254b2d1587..89baec0dd69 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -49,10 +49,12 @@
 #include "postmaster/bgwriter.h"
 #include "postmaster/interrupt.h"
 #include "replication/syncrep.h"
+#include "storage/aio.h"
 #include "storage/aio_subsys.h"
 #include "storage/bufmgr.h"
 #include "storage/condition_variable.h"
 #include "storage/fd.h"
+#include "storage/io_queue.h"
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
 #include "storage/pmsignal.h"
@@ -766,7 +768,7 @@ ImmediateCheckpointRequested(void)
  * fraction between 0.0 meaning none, and 1.0 meaning all done.
  */
 void
-CheckpointWriteDelay(int flags, double progress)
+CheckpointWriteDelay(IOQueue *ioq, int flags, double progress)
 {
 	static int	absorb_counter = WRITES_PER_ABSORB;
 
@@ -800,6 +802,13 @@ CheckpointWriteDelay(int flags, double progress)
 		/* Report interim statistics to the cumulative stats system */
 		pgstat_report_checkpointer();
 
+		/*
+		 * Ensure all pending IO is submitted and completed, to avoid
+		 * unnecessary delays for other processes.
+		 */
+		io_queue_wait_all(ioq);
+
 		/*
 		 * This sleep used to be connected to bgwriter_delay, typically 200ms.
 		 * That resulted in more frequent wakeups if not much work to do.
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 65acc891ef1..8b835d962d9 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -52,6 +52,7 @@
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
+#include "storage/io_queue.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
@@ -77,6 +78,7 @@
 /* Bits in SyncOneBuffer's return value */
 #define BUF_WRITTEN				0x01
 #define BUF_REUSABLE			0x02
+#define BUF_CANT_MERGE			0x04
 
 #define RELS_BSEARCH_THRESHOLD		20
 
@@ -513,8 +515,6 @@ static void UnpinBuffer(BufferDesc *buf);
 static void UnpinBufferNoOwner(BufferDesc *buf);
 static void BufferSync(int flags);
 static uint32 WaitBufHdrUnlocked(BufferDesc *buf);
-static int	SyncOneBuffer(int buf_id, bool skip_recently_used,
-						  WritebackContext *wb_context);
 static void WaitIO(BufferDesc *buf);
 static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
 							  uint32 set_flag_bits, bool forget_owner,
@@ -3182,6 +3183,57 @@ UnpinBufferNoOwner(BufferDesc *buf)
 	}
 }
 
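+/*
+ * State for collecting a run of consecutive dirty buffers that can be
+ * written out as a single vectored (smgrstartwritev()) AIO write.
+ */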
+typedef struct BuffersToWrite
+{
+	int			nbuffers;
+	BufferTag	start_at_tag;
+	uint32		max_combine;
+
+	XLogRecPtr	max_lsn;
+
+	PgAioHandle *ioh;
+	PgAioWaitRef iow;
+
+	uint64		total_writes;
+
+	Buffer		buffers[IOV_MAX];
+	PgAioBounceBuffer *bounce_buffers[IOV_MAX];
+	const void *data_ptrs[IOV_MAX];
+} BuffersToWrite;
+
+static int	PrepareToWriteBuffer(BuffersToWrite *to_write, Buffer buf,
+								 bool skip_recently_used,
+								 IOQueue *ioq, WritebackContext *wb_context);
+
+static void WriteBuffers(BuffersToWrite *to_write,
+						 IOQueue *ioq, WritebackContext *wb_context);
+
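+/*
+ * Reset batch state and enter AIO batch mode.  Every batch built up with
+ * PrepareToWriteBuffer() / WriteBuffers() needs to be wrapped in a
+ * BuffersToWriteInit() / BuffersToWriteEnd() pair.
+ */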
+static void
+BuffersToWriteInit(BuffersToWrite *to_write,
+				   IOQueue *ioq, WritebackContext *wb_context)
+{
+	to_write->total_writes = 0;
+	to_write->nbuffers = 0;
+	to_write->ioh = NULL;
+	pgaio_wref_clear(&to_write->iow);
+	to_write->max_lsn = InvalidXLogRecPtr;
+
+	pgaio_enter_batchmode();
+}
+
+static void
+BuffersToWriteEnd(BuffersToWrite *to_write)
+{
+	if (to_write->ioh != NULL)
+	{
+		pgaio_io_release(to_write->ioh);
+		to_write->ioh = NULL;
+	}
+
+	pgaio_exit_batchmode();
+}
+
+
 #define ST_SORT sort_checkpoint_bufferids
 #define ST_ELEMENT_TYPE CkptSortItem
 #define ST_COMPARE(a, b) ckpt_buforder_comparator(a, b)
@@ -3213,7 +3265,10 @@ BufferSync(int flags)
 	binaryheap *ts_heap;
 	int			i;
 	int			mask = BM_DIRTY;
+	IOQueue    *ioq;
 	WritebackContext wb_context;
+	BuffersToWrite to_write;
+	int			max_combine;
 
 	/*
 	 * Unless this is a shutdown checkpoint or we have been explicitly told,
@@ -3275,7 +3330,9 @@ BufferSync(int flags)
 	if (num_to_scan == 0)
 		return;					/* nothing to do */
 
+	ioq = io_queue_create(512, 0);
 	WritebackContextInit(&wb_context, &checkpoint_flush_after);
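+
+	/*
+	 * A single merged write may not span more than io_combine_limit blocks,
+	 * and, since pages needing a checksum are copied into bounce buffers
+	 * before being written (see WriteBuffers()), not more than
+	 * io_bounce_buffers blocks either.
+	 */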
+	max_combine = Min(io_bounce_buffers, io_combine_limit);
 
 	TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan);
 
@@ -3383,48 +3440,91 @@ BufferSync(int flags)
 	 */
 	num_processed = 0;
 	num_written = 0;
+
+	BuffersToWriteInit(&to_write, ioq, &wb_context);
+
 	while (!binaryheap_empty(ts_heap))
 	{
 		BufferDesc *bufHdr = NULL;
 		CkptTsStatus *ts_stat = (CkptTsStatus *)
 			DatumGetPointer(binaryheap_first(ts_heap));
+		bool		batch_continue = true;
 
-		buf_id = CkptBufferIds[ts_stat->index].buf_id;
-		Assert(buf_id != -1);
-
-		bufHdr = GetBufferDescriptor(buf_id);
-
-		num_processed++;
+		Assert(ts_stat->num_scanned <= ts_stat->num_to_scan);
 
 		/*
-		 * We don't need to acquire the lock here, because we're only looking
-		 * at a single bit. It's possible that someone else writes the buffer
-		 * and clears the flag right after we check, but that doesn't matter
-		 * since SyncOneBuffer will then do nothing.  However, there is a
-		 * further race condition: it's conceivable that between the time we
-		 * examine the bit here and the time SyncOneBuffer acquires the lock,
-		 * someone else not only wrote the buffer but replaced it with another
-		 * page and dirtied it.  In that improbable case, SyncOneBuffer will
-		 * write the buffer though we didn't need to.  It doesn't seem worth
-		 * guarding against this, though.
+		 * Collect a batch of buffers to write out from the current
+		 * tablespace. That causes some imbalance between the tablespaces, but
+		 * that's more than outweighed by the efficiency gain due to batching.
 		 */
-		if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
+		while (batch_continue &&
+			   to_write.nbuffers < max_combine &&
+			   ts_stat->num_scanned < ts_stat->num_to_scan)
 		{
-			if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
+			buf_id = CkptBufferIds[ts_stat->index].buf_id;
+			Assert(buf_id != -1);
+
+			bufHdr = GetBufferDescriptor(buf_id);
+
+			num_processed++;
+
+			/*
+			 * We don't need to acquire the lock here, because we're only
+			 * looking at a single bit. It's possible that someone else writes
+			 * the buffer and clears the flag right after we check, but that
+			 * doesn't matter since PrepareToWriteBuffer will then do nothing.
+			 * However, there is a further race condition: it's conceivable
+			 * that between the time we examine the bit here and the time
+			 * PrepareToWriteBuffer acquires the lock, someone else not only wrote
+			 * the buffer but replaced it with another page and dirtied it. In
+			 * that improbable case, PrepareToWriteBuffer will write the buffer
+			 * though we didn't need to.  It doesn't seem worth guarding
+			 * against this, though.
+			 */
+			if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
 			{
-				TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
-				PendingCheckpointerStats.buffers_written++;
-				num_written++;
+				int			result = PrepareToWriteBuffer(&to_write, buf_id + 1, false,
+														  ioq, &wb_context);
+
+				if (result & BUF_CANT_MERGE)
+				{
+					Assert(to_write.nbuffers > 0);
+					WriteBuffers(&to_write, ioq, &wb_context);
+
+					result = PrepareToWriteBuffer(&to_write, buf_id + 1, false,
+												  ioq, &wb_context);
+					Assert(result != BUF_CANT_MERGE);
+				}
+
+				if (result & BUF_WRITTEN)
+				{
+					TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
+					PendingCheckpointerStats.buffers_written++;
+					num_written++;
+				}
+				else
+				{
+					batch_continue = false;
+				}
 			}
+			else
+			{
+				if (to_write.nbuffers > 0)
+					WriteBuffers(&to_write, ioq, &wb_context);
+			}
+
+			/*
+			 * Measure progress independent of actually having to flush the
+			 * buffer - otherwise writing becomes unbalanced.
+			 */
+			ts_stat->progress += ts_stat->progress_slice;
+			ts_stat->num_scanned++;
+			ts_stat->index++;
 		}
 
-		/*
-		 * Measure progress independent of actually having to flush the buffer
-		 * - otherwise writing become unbalanced.
-		 */
-		ts_stat->progress += ts_stat->progress_slice;
-		ts_stat->num_scanned++;
-		ts_stat->index++;
+		if (to_write.nbuffers > 0)
+			WriteBuffers(&to_write, ioq, &wb_context);
+
 
 		/* Have all the buffers from the tablespace been processed? */
 		if (ts_stat->num_scanned == ts_stat->num_to_scan)
@@ -3442,15 +3542,23 @@ BufferSync(int flags)
 		 *
 		 * (This will check for barrier events even if it doesn't sleep.)
 		 */
-		CheckpointWriteDelay(flags, (double) num_processed / num_to_scan);
+		CheckpointWriteDelay(ioq, flags, (double) num_processed / num_to_scan);
 	}
 
+	Assert(to_write.nbuffers == 0);
+	io_queue_wait_all(ioq);
+
 	/*
 	 * Issue all pending flushes. Only checkpointer calls BufferSync(), so
 	 * IOContext will always be IOCONTEXT_NORMAL.
 	 */
 	IssuePendingWritebacks(&wb_context, IOCONTEXT_NORMAL);
 
+	/* IssuePendingWritebacks() might have added more IO to the queue */
+	io_queue_wait_all(ioq);
+	io_queue_free(ioq);
+	BuffersToWriteEnd(&to_write);
+
 	pfree(per_ts_stat);
 	per_ts_stat = NULL;
 	binaryheap_free(ts_heap);
@@ -3476,7 +3584,7 @@ BufferSync(int flags)
  * bgwriter_lru_maxpages to 0.)
  */
 bool
-BgBufferSync(WritebackContext *wb_context)
+BgBufferSync(IOQueue *ioq, WritebackContext *wb_context)
 {
 	/* info obtained from freelist.c */
 	int			strategy_buf_id;
@@ -3519,6 +3627,9 @@ BgBufferSync(WritebackContext *wb_context)
 	long		new_strategy_delta;
 	uint32		new_recent_alloc;
 
+	BuffersToWrite to_write;
+	int			max_combine;
+
 	/*
 	 * Find out where the freelist clock sweep currently is, and how many
 	 * buffer allocations have happened since our last call.
@@ -3539,6 +3650,8 @@ BgBufferSync(WritebackContext *wb_context)
 		return true;
 	}
 
+	max_combine = Min(io_bounce_buffers, io_combine_limit);
+
 	/*
 	 * Compute strategy_delta = how many buffers have been scanned by the
 	 * clock sweep since last time.  If first time through, assume none. Then
@@ -3695,11 +3808,25 @@ BgBufferSync(WritebackContext *wb_context)
 	num_written = 0;
 	reusable_buffers = reusable_buffers_est;
 
+	BuffersToWriteInit(&to_write, ioq, wb_context);
+
 	/* Execute the LRU scan */
 	while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
 	{
-		int			sync_state = SyncOneBuffer(next_to_clean, true,
-											   wb_context);
+		int			sync_state;
+
+		sync_state = PrepareToWriteBuffer(&to_write, next_to_clean + 1,
+										  true, ioq, wb_context);
+		if (sync_state & BUF_CANT_MERGE)
+		{
+			Assert(to_write.nbuffers > 0);
+
+			WriteBuffers(&to_write, ioq, wb_context);
+
+			sync_state = PrepareToWriteBuffer(&to_write, next_to_clean + 1,
+											  true, ioq, wb_context);
+			Assert(sync_state != BUF_CANT_MERGE);
+		}
 
 		if (++next_to_clean >= NBuffers)
 		{
@@ -3710,6 +3837,13 @@ BgBufferSync(WritebackContext *wb_context)
 
 		if (sync_state & BUF_WRITTEN)
 		{
+			Assert(sync_state & BUF_REUSABLE);
+
+			if (to_write.nbuffers == max_combine)
+			{
+				WriteBuffers(&to_write, ioq, wb_context);
+			}
+
 			reusable_buffers++;
 			if (++num_written >= bgwriter_lru_maxpages)
 			{
@@ -3721,6 +3855,11 @@ BgBufferSync(WritebackContext *wb_context)
 			reusable_buffers++;
 	}
 
+	if (to_write.nbuffers > 0)
+		WriteBuffers(&to_write, ioq, wb_context);
+
+	BuffersToWriteEnd(&to_write);
+
 	PendingBgWriterStats.buf_written_clean += num_written;
 
 #ifdef BGW_DEBUG
@@ -3759,8 +3898,66 @@ BgBufferSync(WritebackContext *wb_context)
 	return (bufs_to_lap == 0 && recent_alloc == 0);
 }
 
+static inline bool
+BufferTagsSameRel(const BufferTag *tag1, const BufferTag *tag2)
+{
+	return (tag1->spcOid == tag2->spcOid) &&
+		(tag1->dbOid == tag2->dbOid) &&
+		(tag1->relNumber == tag2->relNumber) &&
+		(tag1->forkNum == tag2->forkNum);
+}
+
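+/*
+ * Check whether cur_buf_hdr's block can be added to the vectored write
+ * being collected in to_write, i.e. whether it is the block immediately
+ * following the blocks collected so far, in the same relation fork, within
+ * the write size permitted by smgr.
+ */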
+static bool
+CanMergeWrite(BuffersToWrite *to_write, BufferDesc *cur_buf_hdr)
+{
+	BlockNumber cur_block = cur_buf_hdr->tag.blockNum;
+
+	Assert(to_write->nbuffers > 0); /* can't merge with nothing */
+	Assert(to_write->start_at_tag.relNumber != InvalidOid);
+	Assert(to_write->start_at_tag.blockNum != InvalidBlockNumber);
+
+	Assert(to_write->ioh != NULL);
+
+	/*
+	 * First check if the block number is one that we could actually merge;
+	 * that's cheaper than checking that tablespace/db/relnumber/fork match.
+	 */
+	if (to_write->start_at_tag.blockNum + to_write->nbuffers != cur_block)
+		return false;
+
+	if (!BufferTagsSameRel(&to_write->start_at_tag, &cur_buf_hdr->tag))
+		return false;
+
+	/*
+	 * Need to check with smgr how large a write we're allowed to make. To
+	 * reduce the overhead of the smgr check, only inquire once, when
+	 * processing the first to-be-merged buffer. That avoids the overhead in
+	 * the common case of writing out buffers that are definitely not
+	 * mergeable.
+	 */
+	if (to_write->nbuffers == 1)
+	{
+		SMgrRelation smgr;
+
+		smgr = smgropen(BufTagGetRelFileLocator(&to_write->start_at_tag), INVALID_PROC_NUMBER);
+
+		to_write->max_combine = smgrmaxcombine(smgr,
+											   to_write->start_at_tag.forkNum,
+											   to_write->start_at_tag.blockNum);
+	}
+	else
+	{
+		Assert(to_write->max_combine > 0);
+	}
+
+	if (to_write->start_at_tag.blockNum + to_write->max_combine <= cur_block)
+		return false;
+
+	return true;
+}
+
 /*
- * SyncOneBuffer -- process a single buffer during syncing.
+ * PrepareToWriteBuffer -- process a single buffer during syncing.
  *
  * If skip_recently_used is true, we don't write currently-pinned buffers, nor
  * buffers marked recently used, as these are not replacement candidates.
@@ -3769,22 +3966,50 @@ BgBufferSync(WritebackContext *wb_context)
  *	BUF_WRITTEN: we wrote the buffer.
  *	BUF_REUSABLE: buffer is available for replacement, ie, it has
  *		pin count 0 and usage count 0.
+ *	BUF_CANT_MERGE: can't combine this write with prior writes, caller needs
+ *		to issue those first
  *
  * (BUF_WRITTEN could be set in error if FlushBuffer finds the buffer clean
  * after locking it, but we don't care all that much.)
  */
 static int
-SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
+PrepareToWriteBuffer(BuffersToWrite *to_write, Buffer buf,
+					 bool skip_recently_used,
+					 IOQueue *ioq, WritebackContext *wb_context)
 {
-	BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
+	BufferDesc *cur_buf_hdr = GetBufferDescriptor(buf - 1);
+	uint32		buf_state;
 	int			result = 0;
-	uint32		buf_state;
-	BufferTag	tag;
+	XLogRecPtr	cur_buf_lsn;
+	LWLock	   *content_lock;
+	bool		may_block;
+
+	/*
+	 * Check if this buffer can be written out together with already prepared
+	 * writes. We check before we have pinned the buffer, so the buffer can be
+	 * written out and replaced between this check and us pinning the buffer -
+	 * we'll recheck below. The reason for the pre-check is that we don't want
+	 * to pin the buffer just to find out that we can't merge the IO.
+	 */
+	if (to_write->nbuffers != 0)
+	{
+		if (!CanMergeWrite(to_write, cur_buf_hdr))
+		{
+			result |= BUF_CANT_MERGE;
+			return result;
+		}
+	}
+	else
+	{
+		to_write->start_at_tag = cur_buf_hdr->tag;
+	}
 
 	/* Make sure we can handle the pin */
 	ReservePrivateRefCountEntry();
 	ResourceOwnerEnlarge(CurrentResourceOwner);
 
+	/* XXX: Should also check if we are allowed to pin one more buffer */
+
 	/*
 	 * Check whether buffer needs writing.
 	 *
@@ -3794,7 +4019,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
 	 * don't worry because our checkpoint.redo points before log record for
 	 * upcoming changes and so we are not required to write such dirty buffer.
 	 */
-	buf_state = LockBufHdr(bufHdr);
+	buf_state = LockBufHdr(cur_buf_hdr);
 
 	if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 &&
 		BUF_STATE_GET_USAGECOUNT(buf_state) == 0)
@@ -3803,40 +4028,294 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
 	}
 	else if (skip_recently_used)
 	{
 		/* Caller told us not to write recently-used buffers */
-		UnlockBufHdr(bufHdr, buf_state);
+		UnlockBufHdr(cur_buf_hdr, buf_state);
 		return result;
 	}
 
 	if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY))
 	{
 		/* It's clean, so nothing to do */
-		UnlockBufHdr(bufHdr, buf_state);
+		UnlockBufHdr(cur_buf_hdr, buf_state);
 		return result;
 	}
 
+	/* pin the buffer; from now on its identity can't change anymore */
+	PinBuffer_Locked(cur_buf_hdr);
+
+	/*
+	 * Acquire an IO handle, if needed, now that it's likely that we'll need
+	 * to write.
+	 */
+	if (to_write->ioh == NULL)
+	{
+		/* otherwise we should already have acquired a handle */
+		Assert(to_write->nbuffers == 0);
+
+		to_write->ioh = io_queue_acquire_io(ioq);
+		pgaio_io_get_wref(to_write->ioh, &to_write->iow);
+	}
+
 	/*
-	 * Pin it, share-lock it, write it.  (FlushBuffer will do nothing if the
-	 * buffer is clean by the time we've locked it.)
+	 * If we are merging, check if the buffer's identity possibly changed
+	 * while we hadn't yet pinned it.
+	 *
+	 * XXX: It might be worth checking if we still want to write the buffer
+	 * out, e.g. it could have been replaced with a buffer that doesn't have
+	 * BM_CHECKPOINT_NEEDED set.
 	 */
-	PinBuffer_Locked(bufHdr);
-	LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
+	if (to_write->nbuffers != 0)
+	{
+		if (!CanMergeWrite(to_write, cur_buf_hdr))
+		{
+			elog(LOG, "changed identity");
+			UnpinBuffer(cur_buf_hdr);
+
+			result |= BUF_CANT_MERGE;
+
+			return result;
+		}
+	}
+
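+	/*
+	 * We may only perform blocking operations (like waiting for the content
+	 * lock below) if no batched or queued IO is pending - otherwise we could
+	 * deadlock, see below.
+	 */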
+	may_block = to_write->nbuffers == 0 &&
+		!pgaio_have_staged() &&
+		io_queue_is_empty(ioq);
+	content_lock = BufferDescriptorGetContentLock(cur_buf_hdr);
+
+	if (!may_block)
+	{
+		if (LWLockConditionalAcquire(content_lock, LW_SHARED))
+		{
+			/* done */
+		}
+		else if (to_write->nbuffers == 0)
+		{
+			/*
+			 * Need to wait for all prior IO to finish before blocking for
+			 * lock acquisition, to avoid the risk of a deadlock due to us
+			 * waiting for another backend that is waiting for our unsubmitted
+			 * IO to complete.
+			 */
+			pgaio_submit_staged();
+			io_queue_wait_all(ioq);
+
+			elog(DEBUG2, "at block %u: can't block, nbuffers = 0",
+				 cur_buf_hdr->tag.blockNum);
+
+			may_block = to_write->nbuffers == 0 &&
+				!pgaio_have_staged() &&
+				io_queue_is_empty(ioq);
+			Assert(may_block);
+
+			LWLockAcquire(content_lock, LW_SHARED);
+		}
+		else
+		{
+			elog(DEBUG2, "at block %u: can't block, nbuffers = %d",
+				 cur_buf_hdr->tag.blockNum,
+				 to_write->nbuffers);
 
-	FlushBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
+			UnpinBuffer(cur_buf_hdr);
+			result |= BUF_CANT_MERGE;
+			Assert(to_write->nbuffers > 0);
 
-	LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
+			return result;
+		}
+	}
+	else
+	{
+		LWLockAcquire(content_lock, LW_SHARED);
+	}
 
-	tag = bufHdr->tag;
+	if (!may_block)
+	{
+		if (!StartBufferIO(cur_buf_hdr, false, !may_block))
+		{
+			pgaio_submit_staged();
+			io_queue_wait_all(ioq);
 
-	UnpinBuffer(bufHdr);
+			may_block = io_queue_is_empty(ioq) && to_write->nbuffers == 0 && !pgaio_have_staged();
+
+			if (!StartBufferIO(cur_buf_hdr, false, !may_block))
+			{
+				elog(DEBUG2, "at block %u: non-waitable StartBufferIO returns false, may_block = %d",
+					 cur_buf_hdr->tag.blockNum,
+					 may_block);
+
+				/*
+				 * FIXME: can't tell whether this is because the buffer has
+				 * been cleaned
+				 */
+				if (!may_block)
+				{
+					result |= BUF_CANT_MERGE;
+					Assert(to_write->nbuffers > 0);
+				}
+				LWLockRelease(content_lock);
+				UnpinBuffer(cur_buf_hdr);
+
+				return result;
+			}
+		}
+	}
+	else
+	{
+		if (!StartBufferIO(cur_buf_hdr, false, false))
+		{
+			elog(DEBUG2, "waitable StartBufferIO returns false");
+			LWLockRelease(content_lock);
+			UnpinBuffer(cur_buf_hdr);
+
+			/*
+			 * FIXME: Historically we returned BUF_WRITTEN in this case, which
+			 * seems wrong
+			 */
+			return result;
+		}
+	}
 
 	/*
-	 * SyncOneBuffer() is only called by checkpointer and bgwriter, so
-	 * IOContext will always be IOCONTEXT_NORMAL.
+	 * Run PageGetLSN while holding header lock, since we don't have the
+	 * buffer locked exclusively in all cases.
 	 */
-	ScheduleBufferTagForWriteback(wb_context, IOCONTEXT_NORMAL, &tag);
+	buf_state = LockBufHdr(cur_buf_hdr);
+
+	cur_buf_lsn = BufferGetLSN(cur_buf_hdr);
+
+	/* To check if block content changes while flushing. - vadim 01/17/97 */
+	buf_state &= ~BM_JUST_DIRTIED;
+
+	UnlockBufHdr(cur_buf_hdr, buf_state);
+
+	to_write->buffers[to_write->nbuffers] = buf;
+	to_write->nbuffers++;
+
+	if ((buf_state & BM_PERMANENT) &&
+		(to_write->max_lsn == InvalidXLogRecPtr || to_write->max_lsn < cur_buf_lsn))
+	{
+		to_write->max_lsn = cur_buf_lsn;
+	}
+
+	result |= BUF_WRITTEN;
+
+	return result;
+}
+
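+/*
+ * Issue a single vectored write for the buffers collected in to_write,
+ * after flushing WAL far enough to cover the highest LSN among them.  On
+ * return the batch state is reset, ready to collect the next batch.
+ */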
+static void
+WriteBuffers(BuffersToWrite *to_write,
+			 IOQueue *ioq, WritebackContext *wb_context)
+{
+	SMgrRelation smgr;
+	Buffer		first_buf;
+	BufferDesc *first_buf_hdr;
+	bool		needs_checksum;
+
+	Assert(to_write->nbuffers > 0 && to_write->nbuffers <= io_combine_limit);
+
+	first_buf = to_write->buffers[0];
+	first_buf_hdr = GetBufferDescriptor(first_buf - 1);
+
+	smgr = smgropen(BufTagGetRelFileLocator(&first_buf_hdr->tag), INVALID_PROC_NUMBER);
+
+	/*
+	 * Force XLOG flush up to buffer's LSN.  This implements the basic WAL
+	 * rule that log updates must hit disk before any of the data-file changes
+	 * they describe do.
+	 *
+	 * However, this rule does not apply to unlogged relations, which will be
+	 * lost after a crash anyway.  Most unlogged relation pages do not bear
+	 * LSNs since we never emit WAL records for them, and therefore flushing
+	 * up through the buffer LSN would be useless, but harmless.  However,
+	 * GiST indexes use LSNs internally to track page-splits, and therefore
+	 * unlogged GiST pages bear "fake" LSNs generated by
+	 * GetFakeLSNForUnloggedRel.  It is unlikely but possible that the fake
+	 * LSN counter could advance past the WAL insertion point; and if it did
+	 * happen, attempting to flush WAL through that location would fail, with
+	 * disastrous system-wide consequences.  To make sure that can't happen,
+	 * skip the flush if the buffer isn't permanent.
+	 */
+	if (to_write->max_lsn != InvalidXLogRecPtr)
+		XLogFlush(to_write->max_lsn);
+
+	/*
+	 * Now it's safe to write buffer to disk. Note that no one else should
+	 * have been able to write it while we were busy with log flushing because
+	 * only one process at a time can set the BM_IO_IN_PROGRESS bit.
+	 */
+
+	for (int nbuf = 0; nbuf < to_write->nbuffers; nbuf++)
+	{
+		Buffer		cur_buf = to_write->buffers[nbuf];
+		BufferDesc *cur_buf_hdr = GetBufferDescriptor(cur_buf - 1);
+		Block		bufBlock;
+		char	   *bufToWrite;
+
+		bufBlock = BufHdrGetBlock(cur_buf_hdr);
+		needs_checksum = PageNeedsChecksumCopy((Page) bufBlock);
+
+		/*
+		 * Update the page checksum if desired.  Since we have only a shared
+		 * lock on the buffer, other processes might be updating hint bits in
+		 * it, so we must copy the page to a bounce buffer if we do
+		 * checksumming.
+		 */
+		if (needs_checksum)
+		{
+			PgAioBounceBuffer *bb = pgaio_bounce_buffer_get();
+
+			pgaio_io_assoc_bounce_buffer(to_write->ioh, bb);
+
+			bufToWrite = pgaio_bounce_buffer_buffer(bb);
+			memcpy(bufToWrite, bufBlock, BLCKSZ);
+			PageSetChecksumInplace((Page) bufToWrite, cur_buf_hdr->tag.blockNum);
+		}
+		else
+		{
+			bufToWrite = bufBlock;
+		}
+
+		to_write->data_ptrs[nbuf] = bufToWrite;
+	}
+
+	pgaio_io_set_handle_data_32(to_write->ioh,
+								(uint32 *) to_write->buffers,
+								to_write->nbuffers);
+	pgaio_io_register_callbacks(to_write->ioh, PGAIO_HCB_SHARED_BUFFER_WRITEV);
+
+	smgrstartwritev(to_write->ioh, smgr,
+					BufTagGetForkNum(&first_buf_hdr->tag),
+					first_buf_hdr->tag.blockNum,
+					to_write->data_ptrs,
+					to_write->nbuffers,
+					false);
+	pgstat_count_io_op(IOOBJECT_RELATION, IOCONTEXT_NORMAL,
+					   IOOP_WRITE, 1, BLCKSZ * to_write->nbuffers);
+
+	for (int nbuf = 0; nbuf < to_write->nbuffers; nbuf++)
+	{
+		Buffer		cur_buf = to_write->buffers[nbuf];
+		BufferDesc *cur_buf_hdr = GetBufferDescriptor(cur_buf - 1);
+
+		UnpinBuffer(cur_buf_hdr);
+	}
+
+	io_queue_track(ioq, &to_write->iow);
+	to_write->total_writes++;
 
-	return result | BUF_WRITTEN;
+	/* clear state for next write */
+	to_write->nbuffers = 0;
+	to_write->start_at_tag.relNumber = InvalidOid;
+	to_write->start_at_tag.blockNum = InvalidBlockNumber;
+	to_write->max_combine = 0;
+	to_write->max_lsn = InvalidXLogRecPtr;
+	to_write->ioh = NULL;
+	pgaio_wref_clear(&to_write->iow);
 }
 
 /*
diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index 91da73dda8b..c4a78dc96d2 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -1480,6 +1480,16 @@ PageIndexTupleOverwrite(Page page, OffsetNumber offnum,
 	return true;
 }
 
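+/*
+ * PageNeedsChecksumCopy
+ *		Does writing out this page require computing its checksum on a copy?
+ *
+ * A writer holding only a shared lock on the buffer cannot checksum the
+ * page in place, as hint bits might be set concurrently.
+ */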
+bool
+PageNeedsChecksumCopy(Page page)
+{
+	if (PageIsNew(page))
+		return false;
+
+	/* A copy is needed only if a checksum has to be computed */
+	return DataChecksumsEnabled();
+}
+
 
 /*
  * Set checksum for a page in shared buffers.
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index d084c476ec8..d1d8758566e 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -348,6 +348,7 @@ BufferManagerRelation
 BufferStrategyControl
 BufferTag
 BufferUsage
+BuffersToWrite
 BuildAccumulator
 BuiltinScript
 BulkInsertState
-- 
2.48.1.76.g4e746b1a31.dirty

