From 7cdb018181f0f71bf9651f7abc427c675bc484cc Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Tue, 18 Mar 2025 14:40:06 -0400
Subject: [PATCH v2.14 22/29] bufmgr: Implement AIO write support

As of this commit there are no users of these AIO facilities; they will be
added in later commits.

Problems with AIO writes:

- Write logic needs to be rebased on top of the patch series to not hint-bit
  dirty buffers (i.e. not modify buffer contents) while IO is going on

  The performance impact of doing the memory copies is rather substantial:
  on Intel, memory bandwidth is *the* IO bottleneck even for just the checksum
  computation, without any copy. That makes the memory copy needed for
  something like bounce buffers hurt really badly.

  And the memory usage of bounce buffers is also really concerning.

  And even without checksums, several filesystems *really* don't like buffers
  getting modified during DIO writes, which I think means we would have to use
  bounce buffers for *all* writes. That would impose a *very* substantial
  overhead (basically removing the benefit of DMA happening off-cpu).

- I think it requires new lwlock.c infrastructure (as v1 of aio had) to make
  LockBuffer(BUFFER_LOCK_EXCLUSIVE) etc. wait for in-progress writes in a
  concurrency-safe manner.

  I can think of ways to solve this purely in bufmgr.c, but only in ways that
  would cause other problems (e.g. setting BM_IO_IN_PROGRESS before waiting
  for an exclusive lock) and/or would be expensive; see the sketch below.
---
 src/include/storage/aio.h              |   4 +-
 src/include/storage/bufmgr.h           |   2 +
 src/backend/storage/aio/aio_callback.c |   2 +
 src/backend/storage/buffer/bufmgr.c    | 188 ++++++++++++++++++++++++-
 4 files changed, 190 insertions(+), 6 deletions(-)

diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h
index f91f0afc5a5..72d5680e767 100644
--- a/src/include/storage/aio.h
+++ b/src/include/storage/aio.h
@@ -197,11 +197,13 @@ typedef enum PgAioHandleCallbackID
 	PGAIO_HCB_MD_WRITEV,
 
 	PGAIO_HCB_SHARED_BUFFER_READV,
+	PGAIO_HCB_SHARED_BUFFER_WRITEV,
 
 	PGAIO_HCB_LOCAL_BUFFER_READV,
+	PGAIO_HCB_LOCAL_BUFFER_WRITEV,
 } PgAioHandleCallbackID;
 
-#define PGAIO_HCB_MAX	PGAIO_HCB_LOCAL_BUFFER_READV
+#define PGAIO_HCB_MAX	PGAIO_HCB_LOCAL_BUFFER_WRITEV
 StaticAssertDecl(PGAIO_HCB_MAX <= (1 << PGAIO_RESULT_ID_BITS),
 				 "PGAIO_HCB_MAX is too big for PGAIO_RESULT_ID_BITS");
 
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index f2192ceb271..492feab0cb5 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -174,7 +174,9 @@ extern PGDLLIMPORT int backend_flush_after;
 extern PGDLLIMPORT int bgwriter_flush_after;
 
 extern const PgAioHandleCallbacks aio_shared_buffer_readv_cb;
+extern const PgAioHandleCallbacks aio_shared_buffer_writev_cb;
 extern const PgAioHandleCallbacks aio_local_buffer_readv_cb;
+extern const PgAioHandleCallbacks aio_local_buffer_writev_cb;
 
 /* in buf_init.c */
 extern PGDLLIMPORT char *BufferBlocks;
diff --git a/src/backend/storage/aio/aio_callback.c b/src/backend/storage/aio/aio_callback.c
index 67026f5f6b4..f9c4599ee21 100644
--- a/src/backend/storage/aio/aio_callback.c
+++ b/src/backend/storage/aio/aio_callback.c
@@ -44,8 +44,10 @@ static const PgAioHandleCallbacksEntry aio_handle_cbs[] = {
 	CALLBACK_ENTRY(PGAIO_HCB_MD_WRITEV, aio_md_writev_cb),
 
 	CALLBACK_ENTRY(PGAIO_HCB_SHARED_BUFFER_READV, aio_shared_buffer_readv_cb),
+	CALLBACK_ENTRY(PGAIO_HCB_SHARED_BUFFER_WRITEV, aio_shared_buffer_writev_cb),
 
 	CALLBACK_ENTRY(PGAIO_HCB_LOCAL_BUFFER_READV, aio_local_buffer_readv_cb),
+	CALLBACK_ENTRY(PGAIO_HCB_LOCAL_BUFFER_WRITEV, aio_local_buffer_writev_cb),
 #undef CALLBACK_ENTRY
 };
 
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index c65d3916211..688d74f997c 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -5536,7 +5536,15 @@ LockBuffer(Buffer buffer, int mode)
 	else if (mode == BUFFER_LOCK_SHARE)
 		LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_SHARED);
 	else if (mode == BUFFER_LOCK_EXCLUSIVE)
+	{
+		/*
+		 * FIXME: Wait for AIO writes, otherwise there would be a risk of
+		 * deadlock. This isn't entirely trivial to do in a race-free way: IO
+		 * could be started between us checking whether there is IO and
+		 * enqueueing ourselves for the lock.
+		 */
 		LWLockAcquire(BufferDescriptorGetContentLock(buf), LW_EXCLUSIVE);
+	}
 	else
 		elog(ERROR, "unrecognized buffer lock mode: %d", mode);
 }
@@ -5551,6 +5559,19 @@ ConditionalLockBuffer(Buffer buffer)
 {
 	BufferDesc *buf;
 
+	/*
+	 * FIXME: Wait for AIO writes. Some code does not deal well with
+	 * ConditionalLockBuffer() continuously failing, e.g.
+	 * spginsert()->spgdoinsert() ends up busy-looping (easily reproducible by
+	 * just making this function always fail and running the regression
+	 * tests). While that code could be fixed, it'd be hard to find all
+	 * problematic places.
+	 *
+	 * It would be OK to wait for the IO here, as waiting for IO completion
+	 * does not require acquiring any locks that could lead to an undetected
+	 * deadlock or the like.
+	 */
+
 	Assert(BufferIsPinned(buffer));
 	if (BufferIsLocal(buffer))
 		return true;			/* act as though we got it */
@@ -5614,9 +5635,8 @@ LockBufferForCleanup(Buffer buffer)
 	CheckBufferIsPinnedOnce(buffer);
 
 	/*
-	 * We do not yet need to be worried about in-progress AIOs holding a pin,
-	 * as we only support doing reads via AIO and this function can only be
-	 * called once the buffer is valid (i.e. no read can be in flight).
+	 * FIXME: See AIO related comments in LockBuffer() and
+	 * ConditionalLockBuffer()
 	 */
 
 	/* Nobody else to wait for */
@@ -5629,6 +5649,11 @@ LockBufferForCleanup(Buffer buffer)
 	{
 		uint32		buf_state;
 
+		/*
+		 * FIXME: LockBuffer()'s handling of in-progress writes (once
+		 * implemented) should suffice to deal with deadlock risk.
+		 */
+
 		/* Try to acquire lock */
 		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 		buf_state = LockBufHdr(bufHdr);
@@ -5776,7 +5801,13 @@ ConditionalLockBufferForCleanup(Buffer buffer)
 
 	Assert(BufferIsValid(buffer));
 
-	/* see AIO related comment in LockBufferForCleanup() */
+	/*
+	 * FIXME: Should wait for IO for the same reason as in
+	 * ConditionalLockBuffer(). Needs to happen before the
+	 * ConditionalLockBuffer() call below, as we'd never reach the
+	 * ConditionalLockBuffer() call due to the buffer pin held for the
+	 * duration of the IO.
+	 */
 
 	if (BufferIsLocal(buffer))
 	{
@@ -5833,7 +5864,10 @@ IsBufferCleanupOK(Buffer buffer)
 
 	Assert(BufferIsValid(buffer));
 
-	/* see AIO related comment in LockBufferForCleanup() */
+	/*
+	 * FIXME: See AIO related comments in LockBuffer() and
+	 * ConditionalLockBuffer()
+	 */
 
 	if (BufferIsLocal(buffer))
 	{
@@ -7133,12 +7167,129 @@ buffer_readv_report(PgAioResult result, const PgAioTargetData *td,
 			affected_count > 1 ? errhint_internal(hint_mult, affected_count - 1) : 0);
 }
 
+/*
+ * Helper for AIO writev completion callbacks, supporting both shared and temp
+ * buffers. Gets called once for each buffer in a multi-page write.
+ */
+static pg_attribute_always_inline PgAioResult
+buffer_writev_complete_one(uint8 buf_off, Buffer buffer, uint8 flags,
+						   bool failed, bool is_temp)
+{
+	BufferDesc *buf_hdr = is_temp ?
+		GetLocalBufferDescriptor(-buffer - 1)
+		: GetBufferDescriptor(buffer - 1);
+	PgAioResult result = {.status = PGAIO_RS_OK};
+	bool		clear_dirty;
+	uint32		set_flag_bits;
+
+#ifdef USE_ASSERT_CHECKING
+	{
+		uint32		buf_state = pg_atomic_read_u32(&buf_hdr->state);
+
+		Assert(buf_state & BM_VALID);
+		Assert(buf_state & BM_TAG_VALID);
+		/* temp buffers don't use BM_IO_IN_PROGRESS */
+		if (!is_temp)
+			Assert(buf_state & BM_IO_IN_PROGRESS);
+		Assert(buf_state & BM_DIRTY);
+	}
+#endif
+
+	clear_dirty = failed ? false : true;
+	set_flag_bits = failed ? BM_IO_ERROR : 0;
+
+	if (is_temp)
+		TerminateLocalBufferIO(buf_hdr, clear_dirty, set_flag_bits, true);
+	else
+		TerminateBufferIO(buf_hdr, clear_dirty, set_flag_bits, false, true);
+
+	/*
+	 * The initiator of the IO is no longer managing the content lock (it was
+	 * disowned via LWLockDisown()); we are, so release it here.
+	 */
+	if (!is_temp)
+		LWLockReleaseDisowned(BufferDescriptorGetContentLock(buf_hdr),
+							  LW_SHARED);
+
+	/* FIXME: tracepoint */
+
+	return result;
+}
+
+/*
+ * Perform completion handling of a single AIO write. This write may cover
+ * multiple blocks / buffers.
+ *
+ * Shared between shared and local buffers, to reduce code duplication.
+ */
+static pg_attribute_always_inline PgAioResult
+buffer_writev_complete(PgAioHandle *ioh, PgAioResult prior_result,
+					   uint8 cb_data, bool is_temp)
+{
+	PgAioResult result = prior_result;
+	PgAioTargetData *td = pgaio_io_get_target_data(ioh);
+	uint64	   *io_data;
+	uint8		handle_data_len;
+
+	if (is_temp)
+	{
+		Assert(td->smgr.is_temp);
+		Assert(pgaio_io_get_owner(ioh) == MyProcNumber);
+	}
+	else
+		Assert(!td->smgr.is_temp);
+
+	/*
+	 * Iterate over all the buffers affected by this IO and call the
+	 * appropriate per-buffer completion function for each buffer.
+	 */
+	io_data = pgaio_io_get_handle_data(ioh, &handle_data_len);
+	for (uint8 buf_off = 0; buf_off < handle_data_len; buf_off++)
+	{
+		Buffer		buf = io_data[buf_off];
+		PgAioResult buf_result;
+		bool		failed;
+
+		Assert(BufferIsValid(buf));
+
+		/*
+		 * If the entire IO failed at a lower level, each buffer needs to be
+		 * marked as failed. In case of a partial write, some buffers may
+		 * still be ok.
+		 */
+		failed =
+			prior_result.status == PGAIO_RS_ERROR
+			|| prior_result.result <= buf_off;
+
+		buf_result = buffer_writev_complete_one(buf_off, buf, cb_data, failed,
+												is_temp);
+
+		/*
+		 * If there wasn't any prior error and the IO for this page failed in
+		 * some form, set the whole IO's result to the page's result and
+		 * report it.
+		if (result.status != PGAIO_RS_ERROR && buf_result.status != PGAIO_RS_OK)
+		{
+			result = buf_result;
+			pgaio_result_report(result, td, LOG);
+		}
+	}
+
+	return result;
+}
+
 static void
 shared_buffer_readv_stage(PgAioHandle *ioh, uint8 cb_data)
 {
 	buffer_stage_common(ioh, false, false);
 }
 
+static void
+shared_buffer_writev_stage(PgAioHandle *ioh, uint8 cb_data)
+{
+	buffer_stage_common(ioh, true, false);
+}
+
 static PgAioResult
 shared_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result,
 							 uint8 cb_data)
@@ -7182,6 +7333,13 @@ shared_buffer_readv_complete_local(PgAioHandle *ioh, PgAioResult prior_result,
 	return prior_result;
 }
 
+static PgAioResult
+shared_buffer_writev_complete(PgAioHandle *ioh, PgAioResult prior_result,
+							  uint8 cb_data)
+{
+	return buffer_writev_complete(ioh, prior_result, cb_data, false);
+}
+
 static void
 local_buffer_readv_stage(PgAioHandle *ioh, uint8 cb_data)
 {
@@ -7195,6 +7353,17 @@ local_buffer_readv_complete(PgAioHandle *ioh, PgAioResult prior_result,
 	return buffer_readv_complete(ioh, prior_result, cb_data, true);
 }
 
+static void
+local_buffer_writev_stage(PgAioHandle *ioh, uint8 cb_data)
+{
+	/*
+	 * Currently this is unreachable as the only write support is for
+	 * checkpointer / bgwriter, which don't deal with local buffers.
+	 */
+	elog(ERROR, "should be unreachable");
+}
+
+
 /* readv callback is passed READ_BUFFERS_* flags as callback data */
 const PgAioHandleCallbacks aio_shared_buffer_readv_cb = {
 	.stage = shared_buffer_readv_stage,
@@ -7204,6 +7373,11 @@ const PgAioHandleCallbacks aio_shared_buffer_readv_cb = {
 	.report = buffer_readv_report,
 };
 
+const PgAioHandleCallbacks aio_shared_buffer_writev_cb = {
+	.stage = shared_buffer_writev_stage,
+	.complete_shared = shared_buffer_writev_complete,
+};
+
 /* readv callback is passed READ_BUFFERS_* flags as callback data */
 const PgAioHandleCallbacks aio_local_buffer_readv_cb = {
 	.stage = local_buffer_readv_stage,
@@ -7217,3 +7391,7 @@ const PgAioHandleCallbacks aio_local_buffer_readv_cb = {
 	.complete_local = local_buffer_readv_complete,
 	.report = buffer_readv_report,
 };
+
+const PgAioHandleCallbacks aio_local_buffer_writev_cb = {
+	.stage = local_buffer_writev_stage,
+};
-- 
2.48.1.76.g4e746b1a31.dirty

