From 4a65eb21e12504385b1c334758e5ce53d6100896 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Wed, 22 Jan 2025 13:44:52 -0500
Subject: [PATCH v2.6 30/34] bufmgr: use AIO in checkpointer, bgwriter

This is far from ready - just included to be able to exercise AIO writes and
get some preliminary numbers.  In all likelihood this will instead be based
on-top of work by Thomas Munro instead of the preceding commit.
---
 src/include/postmaster/bgwriter.h     |   3 +-
 src/include/storage/buf_internals.h   |   2 +
 src/include/storage/bufmgr.h          |   3 +-
 src/include/storage/bufpage.h         |   1 +
 src/backend/postmaster/bgwriter.c     |  19 +-
 src/backend/postmaster/checkpointer.c |  11 +-
 src/backend/storage/buffer/bufmgr.c   | 588 +++++++++++++++++++++++---
 src/backend/storage/page/bufpage.c    |  10 +
 src/tools/pgindent/typedefs.list      |   1 +
 9 files changed, 580 insertions(+), 58 deletions(-)

diff --git a/src/include/postmaster/bgwriter.h b/src/include/postmaster/bgwriter.h
index 4fd717169f0..4208d5f2c97 100644
--- a/src/include/postmaster/bgwriter.h
+++ b/src/include/postmaster/bgwriter.h
@@ -31,7 +31,8 @@ extern void BackgroundWriterMain(const void *startup_data, size_t startup_data_l
 extern void CheckpointerMain(const void *startup_data, size_t startup_data_len) pg_attribute_noreturn();
 
 extern void RequestCheckpoint(int flags);
-extern void CheckpointWriteDelay(int flags, double progress);
+struct IOQueue;
+extern void CheckpointWriteDelay(struct IOQueue *ioq, int flags, double progress);
 
 extern bool ForwardSyncRequest(const FileTag *ftag, SyncRequestType type);
 
diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h
index 3d7ebf96a7e..3db2f786177 100644
--- a/src/include/storage/buf_internals.h
+++ b/src/include/storage/buf_internals.h
@@ -21,6 +21,8 @@
 #include "storage/buf.h"
 #include "storage/bufmgr.h"
 #include "storage/condition_variable.h"
+#include "storage/io_queue.h"
+#include "storage/latch.h"
 #include "storage/lwlock.h"
 #include "storage/procnumber.h"
 #include "storage/shmem.h"
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index a2bff99fb55..9ed4468d8e1 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -299,7 +299,8 @@ extern bool ConditionalLockBufferForCleanup(Buffer buffer);
 extern bool IsBufferCleanupOK(Buffer buffer);
 extern bool HoldingBufferPinThatDelaysRecovery(void);
 
-extern bool BgBufferSync(struct WritebackContext *wb_context);
+struct IOQueue;
+extern bool BgBufferSync(struct IOQueue *ioq, struct WritebackContext *wb_context);
 
 extern uint32 GetSoftPinLimit(void);
 extern uint32 GetSoftLocalPinLimit(void);
diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h
index 6646b6f6371..9c045e81857 100644
--- a/src/include/storage/bufpage.h
+++ b/src/include/storage/bufpage.h
@@ -509,5 +509,6 @@ extern bool PageIndexTupleOverwrite(Page page, OffsetNumber offnum,
 									Item newtup, Size newsize);
 extern char *PageSetChecksumCopy(Page page, BlockNumber blkno);
 extern void PageSetChecksumInplace(Page page, BlockNumber blkno);
+extern bool PageNeedsChecksumCopy(Page page);
 
 #endif							/* BUFPAGE_H */
diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index 72f5acceec7..6e8801a39e3 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -38,11 +38,13 @@
 #include "postmaster/auxprocess.h"
 #include "postmaster/bgwriter.h"
 #include "postmaster/interrupt.h"
+#include "storage/aio.h"
 #include "storage/aio_subsys.h"
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/condition_variable.h"
 #include "storage/fd.h"
+#include "storage/io_queue.h"
 #include "storage/lwlock.h"
 #include "storage/proc.h"
 #include "storage/procsignal.h"
@@ -90,6 +92,7 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len)
 	sigjmp_buf	local_sigjmp_buf;
 	MemoryContext bgwriter_context;
 	bool		prev_hibernate;
+	IOQueue    *ioq;
 	WritebackContext wb_context;
 
 	Assert(startup_data_len == 0);
@@ -131,6 +134,7 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len)
 											 ALLOCSET_DEFAULT_SIZES);
 	MemoryContextSwitchTo(bgwriter_context);
 
+	ioq = io_queue_create(128, 0);
 	WritebackContextInit(&wb_context, &bgwriter_flush_after);
 
 	/*
@@ -228,12 +232,22 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len)
 		/* Clear any already-pending wakeups */
 		ResetLatch(MyLatch);
 
+		/*
+		 * FIXME: this is theoretically racy, but I didn't want to copy
+		 * ProcessMainLoopInterrupts() remaining body here.
+		 */
+		if (ShutdownRequestPending)
+		{
+			io_queue_wait_all(ioq);
+			io_queue_free(ioq);
+		}
+
 		ProcessMainLoopInterrupts();
 
 		/*
 		 * Do one cycle of dirty-buffer writing.
 		 */
-		can_hibernate = BgBufferSync(&wb_context);
+		can_hibernate = BgBufferSync(ioq, &wb_context);
 
 		/* Report pending statistics to the cumulative stats system */
 		pgstat_report_bgwriter();
@@ -250,6 +264,9 @@ BackgroundWriterMain(const void *startup_data, size_t startup_data_len)
 			smgrdestroyall();
 		}
 
+		/* finish IO before sleeping, to avoid blocking other backends */
+		io_queue_wait_all(ioq);
+
 		/*
 		 * Log a new xl_running_xacts every now and then so replication can
 		 * get into a consistent state faster (think of suboverflowed
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index fda91ffd1ce..904fe167eb4 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -49,10 +49,12 @@
 #include "postmaster/bgwriter.h"
 #include "postmaster/interrupt.h"
 #include "replication/syncrep.h"
+#include "storage/aio.h"
 #include "storage/aio_subsys.h"
 #include "storage/bufmgr.h"
 #include "storage/condition_variable.h"
 #include "storage/fd.h"
+#include "storage/io_queue.h"
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
 #include "storage/pmsignal.h"
@@ -766,7 +768,7 @@ ImmediateCheckpointRequested(void)
  * fraction between 0.0 meaning none, and 1.0 meaning all done.
  */
 void
-CheckpointWriteDelay(int flags, double progress)
+CheckpointWriteDelay(IOQueue *ioq, int flags, double progress)
 {
 	static int	absorb_counter = WRITES_PER_ABSORB;
 
@@ -800,6 +802,13 @@ CheckpointWriteDelay(int flags, double progress)
 		/* Report interim statistics to the cumulative stats system */
 		pgstat_report_checkpointer();
 
+		/*
+		 * Ensure all pending IO is submitted to avoid unnecessary delays for
+		 * other processes.
+		 */
+		io_queue_wait_all(ioq);
+
+
 		/*
 		 * This sleep used to be connected to bgwriter_delay, typically 200ms.
 		 * That resulted in more frequent wakeups if not much work to do.
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 1f47edaa7b9..088a6137b60 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -52,6 +52,7 @@
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
+#include "storage/io_queue.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
@@ -76,6 +77,7 @@
 /* Bits in SyncOneBuffer's return value */
 #define BUF_WRITTEN				0x01
 #define BUF_REUSABLE			0x02
+#define BUF_CANT_MERGE			0x04
 
 #define RELS_BSEARCH_THRESHOLD		20
 
@@ -512,8 +514,6 @@ static void UnpinBuffer(BufferDesc *buf);
 static void UnpinBufferNoOwner(BufferDesc *buf);
 static void BufferSync(int flags);
 static uint32 WaitBufHdrUnlocked(BufferDesc *buf);
-static int	SyncOneBuffer(int buf_id, bool skip_recently_used,
-						  WritebackContext *wb_context);
 static void WaitIO(BufferDesc *buf);
 static void AbortBufferIO(Buffer buffer);
 static void shared_buffer_write_error_callback(void *arg);
@@ -529,6 +529,7 @@ static bool AsyncReadBuffers(ReadBuffersOperation *operation,
 static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
 						IOObject io_object, IOContext io_context);
+
 static void FindAndDropRelationBuffers(RelFileLocator rlocator,
 									   ForkNumber forkNum,
 									   BlockNumber nForkBlock,
@@ -3151,6 +3152,57 @@ UnpinBufferNoOwner(BufferDesc *buf)
 	}
 }
 
+typedef struct BuffersToWrite
+{
+	int			nbuffers;
+	BufferTag	start_at_tag;
+	uint32		max_combine;
+
+	XLogRecPtr	max_lsn;
+
+	PgAioHandle *ioh;
+	PgAioWaitRef iow;
+
+	uint64		total_writes;
+
+	Buffer		buffers[IOV_MAX];
+	PgAioBounceBuffer *bounce_buffers[IOV_MAX];
+	const void *data_ptrs[IOV_MAX];
+} BuffersToWrite;
+
+static int	PrepareToWriteBuffer(BuffersToWrite *to_write, Buffer buf,
+								 bool skip_recently_used,
+								 IOQueue *ioq, WritebackContext *wb_context);
+
+static void WriteBuffers(BuffersToWrite *to_write,
+						 IOQueue *ioq, WritebackContext *wb_context);
+
+static void
+BuffersToWriteInit(BuffersToWrite *to_write,
+				   IOQueue *ioq, WritebackContext *wb_context)
+{
+	to_write->total_writes = 0;
+	to_write->nbuffers = 0;
+	to_write->ioh = NULL;
+	pgaio_wref_clear(&to_write->iow);
+	to_write->max_lsn = InvalidXLogRecPtr;
+
+	pgaio_enter_batchmode();
+}
+
+static void
+BuffersToWriteEnd(BuffersToWrite *to_write)
+{
+	if (to_write->ioh != NULL)
+	{
+		pgaio_io_release(to_write->ioh);
+		to_write->ioh = NULL;
+	}
+
+	pgaio_exit_batchmode();
+}
+
+
 #define ST_SORT sort_checkpoint_bufferids
 #define ST_ELEMENT_TYPE CkptSortItem
 #define ST_COMPARE(a, b) ckpt_buforder_comparator(a, b)
@@ -3182,7 +3234,10 @@ BufferSync(int flags)
 	binaryheap *ts_heap;
 	int			i;
 	int			mask = BM_DIRTY;
+	IOQueue    *ioq;
 	WritebackContext wb_context;
+	BuffersToWrite to_write;
+	int			max_combine;
 
 	/*
 	 * Unless this is a shutdown checkpoint or we have been explicitly told,
@@ -3244,7 +3299,9 @@ BufferSync(int flags)
 	if (num_to_scan == 0)
 		return;					/* nothing to do */
 
+	ioq = io_queue_create(512, 0);
 	WritebackContextInit(&wb_context, &checkpoint_flush_after);
+	max_combine = Min(io_bounce_buffers, io_combine_limit);
 
 	TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan);
 
@@ -3352,48 +3409,91 @@ BufferSync(int flags)
 	 */
 	num_processed = 0;
 	num_written = 0;
+
+	BuffersToWriteInit(&to_write, ioq, &wb_context);
+
 	while (!binaryheap_empty(ts_heap))
 	{
 		BufferDesc *bufHdr = NULL;
 		CkptTsStatus *ts_stat = (CkptTsStatus *)
 			DatumGetPointer(binaryheap_first(ts_heap));
+		bool		batch_continue = true;
 
-		buf_id = CkptBufferIds[ts_stat->index].buf_id;
-		Assert(buf_id != -1);
-
-		bufHdr = GetBufferDescriptor(buf_id);
-
-		num_processed++;
+		Assert(ts_stat->num_scanned <= ts_stat->num_to_scan);
 
 		/*
-		 * We don't need to acquire the lock here, because we're only looking
-		 * at a single bit. It's possible that someone else writes the buffer
-		 * and clears the flag right after we check, but that doesn't matter
-		 * since SyncOneBuffer will then do nothing.  However, there is a
-		 * further race condition: it's conceivable that between the time we
-		 * examine the bit here and the time SyncOneBuffer acquires the lock,
-		 * someone else not only wrote the buffer but replaced it with another
-		 * page and dirtied it.  In that improbable case, SyncOneBuffer will
-		 * write the buffer though we didn't need to.  It doesn't seem worth
-		 * guarding against this, though.
+		 * Collect a batch of buffers to write out from the current
+		 * tablespace. That causes some imbalance between the tablespaces, but
+		 * that's more than outweighed by the efficiency gain due to batching.
 		 */
-		if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
+		while (batch_continue &&
+			   to_write.nbuffers < max_combine &&
+			   ts_stat->num_scanned < ts_stat->num_to_scan)
 		{
-			if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
+			buf_id = CkptBufferIds[ts_stat->index].buf_id;
+			Assert(buf_id != -1);
+
+			bufHdr = GetBufferDescriptor(buf_id);
+
+			num_processed++;
+
+			/*
+			 * We don't need to acquire the lock here, because we're only
+			 * looking at a single bit. It's possible that someone else writes
+			 * the buffer and clears the flag right after we check, but that
+			 * doesn't matter since SyncOneBuffer will then do nothing.
+			 * However, there is a further race condition: it's conceivable
+			 * that between the time we examine the bit here and the time
+			 * SyncOneBuffer acquires the lock, someone else not only wrote
+			 * the buffer but replaced it with another page and dirtied it. In
+			 * that improbable case, SyncOneBuffer will write the buffer
+			 * though we didn't need to.  It doesn't seem worth guarding
+			 * against this, though.
+			 */
+			if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
 			{
-				TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
-				PendingCheckpointerStats.buffers_written++;
-				num_written++;
+				int			result = PrepareToWriteBuffer(&to_write, buf_id + 1, false,
+														  ioq, &wb_context);
+
+				if (result & BUF_CANT_MERGE)
+				{
+					Assert(to_write.nbuffers > 0);
+					WriteBuffers(&to_write, ioq, &wb_context);
+
+					result = PrepareToWriteBuffer(&to_write, buf_id + 1, false,
+												  ioq, &wb_context);
+					Assert(result != BUF_CANT_MERGE);
+				}
+
+				if (result & BUF_WRITTEN)
+				{
+					TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
+					PendingCheckpointerStats.buffers_written++;
+					num_written++;
+				}
+				else
+				{
+					batch_continue = false;
+				}
 			}
+			else
+			{
+				if (to_write.nbuffers > 0)
+					WriteBuffers(&to_write, ioq, &wb_context);
+			}
+
+			/*
+			 * Measure progress independent of actually having to flush the
+			 * buffer - otherwise writing become unbalanced.
+			 */
+			ts_stat->progress += ts_stat->progress_slice;
+			ts_stat->num_scanned++;
+			ts_stat->index++;
 		}
 
-		/*
-		 * Measure progress independent of actually having to flush the buffer
-		 * - otherwise writing become unbalanced.
-		 */
-		ts_stat->progress += ts_stat->progress_slice;
-		ts_stat->num_scanned++;
-		ts_stat->index++;
+		if (to_write.nbuffers > 0)
+			WriteBuffers(&to_write, ioq, &wb_context);
+
 
 		/* Have all the buffers from the tablespace been processed? */
 		if (ts_stat->num_scanned == ts_stat->num_to_scan)
@@ -3411,15 +3511,23 @@ BufferSync(int flags)
 		 *
 		 * (This will check for barrier events even if it doesn't sleep.)
 		 */
-		CheckpointWriteDelay(flags, (double) num_processed / num_to_scan);
+		CheckpointWriteDelay(ioq, flags, (double) num_processed / num_to_scan);
 	}
 
+	Assert(to_write.nbuffers == 0);
+	io_queue_wait_all(ioq);
+
 	/*
 	 * Issue all pending flushes. Only checkpointer calls BufferSync(), so
 	 * IOContext will always be IOCONTEXT_NORMAL.
 	 */
 	IssuePendingWritebacks(&wb_context, IOCONTEXT_NORMAL);
 
+	io_queue_wait_all(ioq);		/* IssuePendingWritebacks might have added
+								 * more */
+	io_queue_free(ioq);
+	BuffersToWriteEnd(&to_write);
+
 	pfree(per_ts_stat);
 	per_ts_stat = NULL;
 	binaryheap_free(ts_heap);
@@ -3445,7 +3553,7 @@ BufferSync(int flags)
  * bgwriter_lru_maxpages to 0.)
  */
 bool
-BgBufferSync(WritebackContext *wb_context)
+BgBufferSync(IOQueue *ioq, WritebackContext *wb_context)
 {
 	/* info obtained from freelist.c */
 	int			strategy_buf_id;
@@ -3488,6 +3596,9 @@ BgBufferSync(WritebackContext *wb_context)
 	long		new_strategy_delta;
 	uint32		new_recent_alloc;
 
+	BuffersToWrite to_write;
+	int			max_combine;
+
 	/*
 	 * Find out where the freelist clock sweep currently is, and how many
 	 * buffer allocations have happened since our last call.
@@ -3508,6 +3619,8 @@ BgBufferSync(WritebackContext *wb_context)
 		return true;
 	}
 
+	max_combine = Min(io_bounce_buffers, io_combine_limit);
+
 	/*
 	 * Compute strategy_delta = how many buffers have been scanned by the
 	 * clock sweep since last time.  If first time through, assume none. Then
@@ -3664,11 +3777,25 @@ BgBufferSync(WritebackContext *wb_context)
 	num_written = 0;
 	reusable_buffers = reusable_buffers_est;
 
+	BuffersToWriteInit(&to_write, ioq, wb_context);
+
 	/* Execute the LRU scan */
 	while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
 	{
-		int			sync_state = SyncOneBuffer(next_to_clean, true,
-											   wb_context);
+		int			sync_state;
+
+		sync_state = PrepareToWriteBuffer(&to_write, next_to_clean + 1,
+										  true, ioq, wb_context);
+		if (sync_state & BUF_CANT_MERGE)
+		{
+			Assert(to_write.nbuffers > 0);
+
+			WriteBuffers(&to_write, ioq, wb_context);
+
+			sync_state = PrepareToWriteBuffer(&to_write, next_to_clean + 1,
+											  true, ioq, wb_context);
+			Assert(sync_state != BUF_CANT_MERGE);
+		}
 
 		if (++next_to_clean >= NBuffers)
 		{
@@ -3679,6 +3806,13 @@ BgBufferSync(WritebackContext *wb_context)
 
 		if (sync_state & BUF_WRITTEN)
 		{
+			Assert(sync_state & BUF_REUSABLE);
+
+			if (to_write.nbuffers == max_combine)
+			{
+				WriteBuffers(&to_write, ioq, wb_context);
+			}
+
 			reusable_buffers++;
 			if (++num_written >= bgwriter_lru_maxpages)
 			{
@@ -3690,6 +3824,11 @@ BgBufferSync(WritebackContext *wb_context)
 			reusable_buffers++;
 	}
 
+	if (to_write.nbuffers > 0)
+		WriteBuffers(&to_write, ioq, wb_context);
+
+	BuffersToWriteEnd(&to_write);
+
 	PendingBgWriterStats.buf_written_clean += num_written;
 
 #ifdef BGW_DEBUG
@@ -3728,8 +3867,66 @@ BgBufferSync(WritebackContext *wb_context)
 	return (bufs_to_lap == 0 && recent_alloc == 0);
 }
 
+static inline bool
+BufferTagsSameRel(const BufferTag *tag1, const BufferTag *tag2)
+{
+	return (tag1->spcOid == tag2->spcOid) &&
+		(tag1->dbOid == tag2->dbOid) &&
+		(tag1->relNumber == tag2->relNumber) &&
+		(tag1->forkNum == tag2->forkNum)
+		;
+}
+
+static bool
+CanMergeWrite(BuffersToWrite *to_write, BufferDesc *cur_buf_hdr)
+{
+	BlockNumber cur_block = cur_buf_hdr->tag.blockNum;
+
+	Assert(to_write->nbuffers > 0); /* can't merge with nothing */
+	Assert(to_write->start_at_tag.relNumber != InvalidOid);
+	Assert(to_write->start_at_tag.blockNum != InvalidBlockNumber);
+
+	Assert(to_write->ioh != NULL);
+
+	/*
+	 * First check if the blocknumber is one that we could actually merge,
+	 * that's cheaper than checking the tablespace/db/relnumber/fork match.
+	 */
+	if (to_write->start_at_tag.blockNum + to_write->nbuffers != cur_block)
+		return false;
+
+	if (!BufferTagsSameRel(&to_write->start_at_tag, &cur_buf_hdr->tag))
+		return false;
+
+	/*
+	 * Need to check with smgr how large a write we're allowed to make. To
+	 * reduce the overhead of the smgr check, only inquire once, when
+	 * processing the first to-be-merged buffer. That avoids the overhead in
+	 * the common case of writing out buffers that definitely not mergeable.
+	 */
+	if (to_write->nbuffers == 1)
+	{
+		SMgrRelation smgr;
+
+		smgr = smgropen(BufTagGetRelFileLocator(&to_write->start_at_tag), INVALID_PROC_NUMBER);
+
+		to_write->max_combine = smgrmaxcombine(smgr,
+											   to_write->start_at_tag.forkNum,
+											   to_write->start_at_tag.blockNum);
+	}
+	else
+	{
+		Assert(to_write->max_combine > 0);
+	}
+
+	if (to_write->start_at_tag.blockNum + to_write->max_combine <= cur_block)
+		return false;
+
+	return true;
+}
+
 /*
- * SyncOneBuffer -- process a single buffer during syncing.
+ * PrepareToWriteBuffer -- process a single buffer during syncing.
  *
  * If skip_recently_used is true, we don't write currently-pinned buffers, nor
  * buffers marked recently used, as these are not replacement candidates.
@@ -3738,22 +3935,50 @@ BgBufferSync(WritebackContext *wb_context)
  *	BUF_WRITTEN: we wrote the buffer.
  *	BUF_REUSABLE: buffer is available for replacement, ie, it has
  *		pin count 0 and usage count 0.
+ *	BUF_CANT_MERGE: can't combine this write with prior writes, caller needs
+ *		to issue those first
  *
  * (BUF_WRITTEN could be set in error if FlushBuffer finds the buffer clean
  * after locking it, but we don't care all that much.)
  */
 static int
-SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
+PrepareToWriteBuffer(BuffersToWrite *to_write, Buffer buf,
+					 bool skip_recently_used,
+					 IOQueue *ioq, WritebackContext *wb_context)
 {
-	BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
+	BufferDesc *cur_buf_hdr = GetBufferDescriptor(buf - 1);
+	uint32		buf_state;
 	int			result = 0;
-	uint32		buf_state;
-	BufferTag	tag;
+	XLogRecPtr	cur_buf_lsn;
+	LWLock	   *content_lock;
+	bool		may_block;
+
+	/*
+	 * Check if this buffer can be written out together with already prepared
+	 * writes. We check before we have pinned the buffer, so the buffer can be
+	 * written out and replaced between this check and us pinning the buffer -
+	 * we'll recheck below. The reason for the pre-check is that we don't want
+	 * to pin the buffer just to find out that we can't merge the IO.
+	 */
+	if (to_write->nbuffers != 0)
+	{
+		if (!CanMergeWrite(to_write, cur_buf_hdr))
+		{
+			result |= BUF_CANT_MERGE;
+			return result;
+		}
+	}
+	else
+	{
+		to_write->start_at_tag = cur_buf_hdr->tag;
+	}
 
 	/* Make sure we can handle the pin */
 	ReservePrivateRefCountEntry();
 	ResourceOwnerEnlarge(CurrentResourceOwner);
 
+	/* XXX: Should also check if we are allowed to pin one more buffer */
+
 	/*
 	 * Check whether buffer needs writing.
 	 *
@@ -3763,7 +3988,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
 	 * don't worry because our checkpoint.redo points before log record for
 	 * upcoming changes and so we are not required to write such dirty buffer.
 	 */
-	buf_state = LockBufHdr(bufHdr);
+	buf_state = LockBufHdr(cur_buf_hdr);
 
 	if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 &&
 		BUF_STATE_GET_USAGECOUNT(buf_state) == 0)
@@ -3772,40 +3997,294 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
 	}
 	else if (skip_recently_used)
 	{
+#if 0
+		elog(LOG, "at block %d: skip recent with nbuffers %d",
+			 cur_buf_hdr->tag.blockNum, to_write->nbuffers);
+#endif
 		/* Caller told us not to write recently-used buffers */
-		UnlockBufHdr(bufHdr, buf_state);
+		UnlockBufHdr(cur_buf_hdr, buf_state);
 		return result;
 	}
 
 	if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY))
 	{
 		/* It's clean, so nothing to do */
-		UnlockBufHdr(bufHdr, buf_state);
+		UnlockBufHdr(cur_buf_hdr, buf_state);
 		return result;
 	}
 
+	/* pin the buffer, from now on its identity can't change anymore */
+	PinBuffer_Locked(cur_buf_hdr);
+
+	/*
+	 * Acquire IO, if needed, now that it's likely that we'll need to write.
+	 */
+	if (to_write->ioh == NULL)
+	{
+		/* otherwise we should already have acquired a handle */
+		Assert(to_write->nbuffers == 0);
+
+		to_write->ioh = io_queue_acquire_io(ioq);
+		pgaio_io_get_wref(to_write->ioh, &to_write->iow);
+	}
+
 	/*
-	 * Pin it, share-lock it, write it.  (FlushBuffer will do nothing if the
-	 * buffer is clean by the time we've locked it.)
+	 * If we are merging, check if the buffer's identity possibly changed
+	 * while we hadn't yet pinned it.
+	 *
+	 * XXX: It might be worth checking if we still want to write the buffer
+	 * out, e.g. it could have been replaced with a buffer that doesn't have
+	 * BM_CHECKPOINT_NEEDED set.
 	 */
-	PinBuffer_Locked(bufHdr);
-	LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
+	if (to_write->nbuffers != 0)
+	{
+		if (!CanMergeWrite(to_write, cur_buf_hdr))
+		{
+			elog(LOG, "changed identity");
+			UnpinBuffer(cur_buf_hdr);
+
+			result |= BUF_CANT_MERGE;
+
+			return result;
+		}
+	}
+
+	may_block = to_write->nbuffers == 0
+		&& !pgaio_have_staged()
+		&& io_queue_is_empty(ioq)
+		;
+	content_lock = BufferDescriptorGetContentLock(cur_buf_hdr);
+
+	if (!may_block)
+	{
+		if (LWLockConditionalAcquire(content_lock, LW_SHARED))
+		{
+			/* done */
+		}
+		else if (to_write->nbuffers == 0)
+		{
+			/*
+			 * Need to wait for all prior IO to finish before blocking for
+			 * lock acquisition, to avoid the risk a deadlock due to us
+			 * waiting for another backend that is waiting for our unsubmitted
+			 * IO to complete.
+			 */
+			pgaio_submit_staged();
+			io_queue_wait_all(ioq);
+
+			elog(DEBUG2, "at block %u: can't block, nbuffers = 0",
+				 cur_buf_hdr->tag.blockNum
+				);
+
+			may_block = to_write->nbuffers == 0
+				&& !pgaio_have_staged()
+				&& io_queue_is_empty(ioq)
+				;
+			Assert(may_block);
+
+			LWLockAcquire(content_lock, LW_SHARED);
+		}
+		else
+		{
+			elog(DEBUG2, "at block %d: can't block nbuffers = %d",
+				 cur_buf_hdr->tag.blockNum,
+				 to_write->nbuffers);
 
-	FlushBuffer(bufHdr, NULL, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
+			UnpinBuffer(cur_buf_hdr);
+			result |= BUF_CANT_MERGE;
+			Assert(to_write->nbuffers > 0);
 
-	LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
+			return result;
+		}
+	}
+	else
+	{
+		LWLockAcquire(content_lock, LW_SHARED);
+	}
 
-	tag = bufHdr->tag;
+	if (!may_block)
+	{
+		if (!StartBufferIO(cur_buf_hdr, false, !may_block))
+		{
+			pgaio_submit_staged();
+			io_queue_wait_all(ioq);
 
-	UnpinBuffer(bufHdr);
+			may_block = io_queue_is_empty(ioq) && to_write->nbuffers == 0 && !pgaio_have_staged();
+
+			if (!StartBufferIO(cur_buf_hdr, false, !may_block))
+			{
+				elog(DEBUG2, "at block %d: non-waitable StartBufferIO returns false, %d",
+					 cur_buf_hdr->tag.blockNum,
+					 may_block);
+
+				/*
+				 * FIXME: can't tell whether this is because the buffer has
+				 * been cleaned
+				 */
+				if (!may_block)
+				{
+					result |= BUF_CANT_MERGE;
+					Assert(to_write->nbuffers > 0);
+				}
+				LWLockRelease(content_lock);
+				UnpinBuffer(cur_buf_hdr);
+
+				return result;
+			}
+		}
+	}
+	else
+	{
+		if (!StartBufferIO(cur_buf_hdr, false, false))
+		{
+			elog(DEBUG2, "waitable StartBufferIO returns false");
+			LWLockRelease(content_lock);
+			UnpinBuffer(cur_buf_hdr);
+
+			/*
+			 * FIXME: Historically we returned BUF_WRITTEN in this case, which
+			 * seems wrong
+			 */
+			return result;
+		}
+	}
 
 	/*
-	 * SyncOneBuffer() is only called by checkpointer and bgwriter, so
-	 * IOContext will always be IOCONTEXT_NORMAL.
+	 * Run PageGetLSN while holding header lock, since we don't have the
+	 * buffer locked exclusively in all cases.
 	 */
-	ScheduleBufferTagForWriteback(wb_context, IOCONTEXT_NORMAL, &tag);
+	buf_state = LockBufHdr(cur_buf_hdr);
+
+	cur_buf_lsn = BufferGetLSN(cur_buf_hdr);
+
+	/* To check if block content changes while flushing. - vadim 01/17/97 */
+	buf_state &= ~BM_JUST_DIRTIED;
+
+	UnlockBufHdr(cur_buf_hdr, buf_state);
+
+	to_write->buffers[to_write->nbuffers] = buf;
+	to_write->nbuffers++;
+
+	if (buf_state & BM_PERMANENT &&
+		(to_write->max_lsn == InvalidXLogRecPtr || to_write->max_lsn < cur_buf_lsn))
+	{
+		to_write->max_lsn = cur_buf_lsn;
+	}
+
+	result |= BUF_WRITTEN;
+
+	return result;
+}
+
+static void
+WriteBuffers(BuffersToWrite *to_write,
+			 IOQueue *ioq, WritebackContext *wb_context)
+{
+	SMgrRelation smgr;
+	Buffer		first_buf;
+	BufferDesc *first_buf_hdr;
+	bool		needs_checksum;
+
+	Assert(to_write->nbuffers > 0 && to_write->nbuffers <= io_combine_limit);
+
+	first_buf = to_write->buffers[0];
+	first_buf_hdr = GetBufferDescriptor(first_buf - 1);
+
+	smgr = smgropen(BufTagGetRelFileLocator(&first_buf_hdr->tag), INVALID_PROC_NUMBER);
+
+	/*
+	 * Force XLOG flush up to buffer's LSN.  This implements the basic WAL
+	 * rule that log updates must hit disk before any of the data-file changes
+	 * they describe do.
+	 *
+	 * However, this rule does not apply to unlogged relations, which will be
+	 * lost after a crash anyway.  Most unlogged relation pages do not bear
+	 * LSNs since we never emit WAL records for them, and therefore flushing
+	 * up through the buffer LSN would be useless, but harmless.  However,
+	 * GiST indexes use LSNs internally to track page-splits, and therefore
+	 * unlogged GiST pages bear "fake" LSNs generated by
+	 * GetFakeLSNForUnloggedRel.  It is unlikely but possible that the fake
+	 * LSN counter could advance past the WAL insertion point; and if it did
+	 * happen, attempting to flush WAL through that location would fail, with
+	 * disastrous system-wide consequences.  To make sure that can't happen,
+	 * skip the flush if the buffer isn't permanent.
+	 */
+	if (to_write->max_lsn != InvalidXLogRecPtr)
+		XLogFlush(to_write->max_lsn);
+
+	/*
+	 * Now it's safe to write buffer to disk. Note that no one else should
+	 * have been able to write it while we were busy with log flushing because
+	 * only one process at a time can set the BM_IO_IN_PROGRESS bit.
+	 */
+
+	for (int nbuf = 0; nbuf < to_write->nbuffers; nbuf++)
+	{
+		Buffer		cur_buf = to_write->buffers[nbuf];
+		BufferDesc *cur_buf_hdr = GetBufferDescriptor(cur_buf - 1);
+		Block		bufBlock;
+		char	   *bufToWrite;
+
+		bufBlock = BufHdrGetBlock(cur_buf_hdr);
+		needs_checksum = PageNeedsChecksumCopy((Page) bufBlock);
+
+		/*
+		 * Update page checksum if desired.  Since we have only shared lock on
+		 * the buffer, other processes might be updating hint bits in it, so
+		 * we must copy the page to a bounce buffer if we do checksumming.
+		 */
+		if (needs_checksum)
+		{
+			PgAioBounceBuffer *bb = pgaio_bounce_buffer_get();
+
+			pgaio_io_assoc_bounce_buffer(to_write->ioh, bb);
+
+			bufToWrite = pgaio_bounce_buffer_buffer(bb);
+			memcpy(bufToWrite, bufBlock, BLCKSZ);
+			PageSetChecksumInplace((Page) bufToWrite, cur_buf_hdr->tag.blockNum);
+		}
+		else
+		{
+			bufToWrite = bufBlock;
+		}
+
+		to_write->data_ptrs[nbuf] = bufToWrite;
+	}
+
+	pgaio_io_set_handle_data_32(to_write->ioh,
+								(uint32 *) to_write->buffers,
+								to_write->nbuffers);
+	pgaio_io_register_callbacks(to_write->ioh, PGAIO_HCB_SHARED_BUFFER_WRITEV, 0);
+
+	smgrstartwritev(to_write->ioh, smgr,
+					BufTagGetForkNum(&first_buf_hdr->tag),
+					first_buf_hdr->tag.blockNum,
+					to_write->data_ptrs,
+					to_write->nbuffers,
+					false);
+	pgstat_count_io_op(IOOBJECT_RELATION, IOCONTEXT_NORMAL,
+					   IOOP_WRITE, 1, BLCKSZ * to_write->nbuffers);
+
+
+	for (int nbuf = 0; nbuf < to_write->nbuffers; nbuf++)
+	{
+		Buffer		cur_buf = to_write->buffers[nbuf];
+		BufferDesc *cur_buf_hdr = GetBufferDescriptor(cur_buf - 1);
+
+		UnpinBuffer(cur_buf_hdr);
+	}
+
+	io_queue_track(ioq, &to_write->iow);
+	to_write->total_writes++;
 
-	return result | BUF_WRITTEN;
+	/* clear state for next write */
+	to_write->nbuffers = 0;
+	to_write->start_at_tag.relNumber = InvalidOid;
+	to_write->start_at_tag.blockNum = InvalidBlockNumber;
+	to_write->max_combine = 0;
+	to_write->max_lsn = InvalidXLogRecPtr;
+	to_write->ioh = NULL;
+	pgaio_wref_clear(&to_write->iow);
 }
 
 /*
@@ -4179,6 +4658,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	error_context_stack = errcallback.previous;
 }
 
+
 /*
  * RelationGetNumberOfBlocksInFork
  *		Determines the current number of pages in the specified relation fork.
diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index ecc81aacfc3..71dc6f559dd 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -1480,6 +1480,16 @@ PageIndexTupleOverwrite(Page page, OffsetNumber offnum,
 	return true;
 }
 
+bool
+PageNeedsChecksumCopy(Page page)
+{
+	if (PageIsNew(page))
+		return false;
+
+	/* If we don't need a checksum, just return the passed-in data */
+	return DataChecksumsEnabled();
+}
+
 
 /*
  * Set checksum for a page in shared buffers.
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 92ccd2e0514..c29a17a269f 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -348,6 +348,7 @@ BufferManagerRelation
 BufferStrategyControl
 BufferTag
 BufferUsage
+BuffersToWrite
 BuildAccumulator
 BuiltinScript
 BulkInsertState
-- 
2.48.1.76.g4e746b1a31.dirty

