From 517e55c26298decd26eab0dec4da220aa084ad35 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Wed, 12 Feb 2025 14:19:20 -0500
Subject: [PATCH v2.4 20/29] bufmgr: Use aio for StartReadBuffers()

Author:
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/include/storage/bufmgr.h        |   7 +
 src/backend/storage/buffer/bufmgr.c | 411 +++++++++++++++++++++-------
 2 files changed, 317 insertions(+), 101 deletions(-)

diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index efba4d88d7d..dc8fe197d6f 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -15,6 +15,7 @@
 #define BUFMGR_H
 
 #include "port/pg_iovec.h"
+#include "storage/aio_types.h"
 #include "storage/block.h"
 #include "storage/buf.h"
 #include "storage/bufpage.h"
@@ -111,6 +112,9 @@ typedef struct BufferManagerRelation
 #define READ_BUFFERS_ZERO_ON_ERROR (1 << 0)
 /* Call smgrprefetch() if I/O necessary. */
 #define READ_BUFFERS_ISSUE_ADVICE (1 << 1)
+/* IO will immediately be waited for */
+#define READ_BUFFERS_SYNCHRONOUSLY (1 << 2)
+
 
 struct ReadBuffersOperation
 {
@@ -130,6 +134,9 @@ struct ReadBuffersOperation
 	BlockNumber blocknum;
 	int			flags;
 	int16		nblocks;
+
+	PgAioWaitRef io_wref;
+	PgAioReturn io_return;
 };
 
 typedef struct ReadBuffersOperation ReadBuffersOperation;
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 96b54f7abdf..ee9a9f70167 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -529,6 +529,8 @@ static inline BufferDesc *BufferAlloc(SMgrRelation smgr,
 									  BlockNumber blockNum,
 									  BufferAccessStrategy strategy,
 									  bool *foundPtr, IOContext io_context);
+static bool AsyncReadBuffers(ReadBuffersOperation *operation,
+							 int *nblocks);
 static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
 						IOObject io_object, IOContext io_context);
@@ -1237,10 +1239,9 @@ ReadBuffer_common(Relation rel, SMgrRelation smgr, char smgr_persistence,
 		return buffer;
 	}
 
+	flags = READ_BUFFERS_SYNCHRONOUSLY;
 	if (mode == RBM_ZERO_ON_ERROR)
-		flags = READ_BUFFERS_ZERO_ON_ERROR;
-	else
-		flags = 0;
+		flags |= READ_BUFFERS_ZERO_ON_ERROR;
 	operation.smgr = smgr;
 	operation.rel = rel;
 	operation.persistence = persistence;
@@ -1268,6 +1269,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 
 	Assert(*nblocks > 0);
 	Assert(*nblocks <= MAX_IO_COMBINE_LIMIT);
+	Assert(*nblocks == 1 || allow_forwarding);
 
 	for (int i = 0; i < actual_nblocks; ++i)
 	{
@@ -1307,6 +1309,11 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 			else
 				bufHdr = GetBufferDescriptor(buffers[i] - 1);
 			found = pg_atomic_read_u32(&bufHdr->state) & BM_VALID;
+
+			ereport(DEBUG3,
+					errmsg("found forwarded buffer %d",
+						   buffers[i]),
+					errhidestmt(true), errhidecontext(true));
 		}
 		else
 		{
@@ -1372,25 +1379,59 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 	operation->blocknum = blockNum;
 	operation->flags = flags;
 	operation->nblocks = actual_nblocks;
+	pgaio_wref_clear(&operation->io_wref);
 
-	if (flags & READ_BUFFERS_ISSUE_ADVICE)
+	/*
+	 * When using AIO, start the IO in the background. If not, issue prefetch
+	 * requests if desired by the caller.
+	 *
+	 * The reason we have a dedicated path for IOMETHOD_SYNC here is to
+	 * de-risk the introduction of AIO somewhat. It's a large architectural
+	 * change, with lots of chances for unanticipated performance effects.
+	 *
+	 * Use of IOMETHOD_SYNC already leads to not actually performing IO
+	 * asynchronously, but without the check here we'd execute IO earlier than
+	 * we used to. Eventually this IOMETHOD_SYNC specific path should go away.
+	 */
+	if (io_method != IOMETHOD_SYNC)
 	{
 		/*
-		 * In theory we should only do this if PinBufferForBlock() had to
-		 * allocate new buffers above.  That way, if two calls to
-		 * StartReadBuffers() were made for the same blocks before
-		 * WaitReadBuffers(), only the first would issue the advice. That'd be
-		 * a better simulation of true asynchronous I/O, which would only
-		 * start the I/O once, but isn't done here for simplicity.
+		 * Try to start IO asynchronously. It's possible that no IO needs to
+		 * be started, if another backend already performed the IO.
+		 *
+		 * Note that if an IO is started, it might not cover the entire
+		 * requested range, e.g. because an intermediary block has been read
+		 * in by another backend.  In that case any "trailing" buffers we
+		 * already pinned above will be "forwarded" by read_stream.c to the
+		 * next call to StartReadBuffers(). This is signalled to the caller by
+		 * decrementing *nblocks.
 		 */
-		smgrprefetch(operation->smgr,
-					 operation->forknum,
-					 blockNum,
-					 actual_nblocks);
+		return AsyncReadBuffers(operation, nblocks);
 	}
+	else
+	{
+		operation->flags |= READ_BUFFERS_SYNCHRONOUSLY;
 
-	/* Indicate that WaitReadBuffers() should be called. */
-	return true;
+		if (flags & READ_BUFFERS_ISSUE_ADVICE)
+		{
+			/*
+			 * In theory we should only do this if PinBufferForBlock() had to
+			 * allocate new buffers above.  That way, if two calls to
+			 * StartReadBuffers() were made for the same blocks before
+			 * WaitReadBuffers(), only the first would issue the advice.
+			 * That'd be a better simulation of true asynchronous I/O, which
+			 * would only start the I/O once, but isn't done here for
+			 * simplicity.
+			 */
+			smgrprefetch(operation->smgr,
+						 operation->forknum,
+						 blockNum,
+						 actual_nblocks);
+		}
+
+		/* Indicate that WaitReadBuffers() should be called. */
+		return true;
+	}
 }
 
 /*
@@ -1458,12 +1499,31 @@ StartReadBuffer(ReadBuffersOperation *operation,
 }
 
 static inline bool
-WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
+ReadBuffersCanStartIO(Buffer buffer, bool nowait)
 {
 	if (BufferIsLocal(buffer))
 	{
 		BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1);
 
+		/*
+		 * The buffer could have IO in progress by another scan. Right now
+		 * localbuf.c doesn't use IO_IN_PROGRESS, which is why we need this
+		 * hack.
+		 *
+		 * TODO: localbuf.c should use IO_IN_PROGRESS / have an equivalent of
+		 * StartBufferIO().
+		 */
+		if (pgaio_wref_valid(&bufHdr->io_wref))
+		{
+			PgAioWaitRef iow = bufHdr->io_wref;
+
+			ereport(DEBUG3,
+					errmsg("waiting for temp buffer IO in CSIO"),
+					errhidestmt(true), errhidecontext(true));
+			pgaio_wref_wait(&iow);
+			return false;
+		}
+
 		return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0;
 	}
 	else
@@ -1473,28 +1533,163 @@ WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
 void
 WaitReadBuffers(ReadBuffersOperation *operation)
 {
-	Buffer	   *buffers;
+	IOContext	io_context;
+	IOObject	io_object;
 	int			nblocks;
-	BlockNumber blocknum;
-	ForkNumber	forknum;
-	IOContext	io_context;
-	IOObject	io_object;
-	char		persistence;
+	PgAioReturn *aio_ret;
+
+	/*
+	 * If we get here without an IO operation having been issued, io_method ==
+	 * IOMETHOD_SYNC path must have been used. In that case, we start - as we
+	 * used to before - the IO now, just before waiting.
+	 *
+	 * This path is expected to eventually go away.
+	 */
+	if (!pgaio_wref_valid(&operation->io_wref))
+	{
+		Assert(io_method == IOMETHOD_SYNC);
+
+		while (true)
+		{
+			nblocks = operation->nblocks;
+
+			if (!AsyncReadBuffers(operation, &nblocks))
+			{
+				/* all blocks were already read in concurrently */
+				Assert(nblocks == operation->nblocks);
+				return;
+			}
+
+			Assert(nblocks > 0 && nblocks <= operation->nblocks);
+
+			if (nblocks == operation->nblocks)
+			{
+				/* will wait below as if this had been normal AIO */
+				break;
+			}
+
+			/*
+			 * It's unlikely, but possible, that AsyncReadBuffers() wasn't
+			 * able to initiate IO for all the relevant buffers. In that case
+			 * we need to wait for the prior IO before issuing more IO.
+			 */
+			WaitReadBuffers(operation);
+		}
+	}
+
+	if (operation->persistence == RELPERSISTENCE_TEMP)
+	{
+		io_context = IOCONTEXT_NORMAL;
+		io_object = IOOBJECT_TEMP_RELATION;
+	}
+	else
+	{
+		io_context = IOContextForStrategy(operation->strategy);
+		io_object = IOOBJECT_RELATION;
+	}
+
+restart:
 
 	/* Find the range of the physical read we need to perform. */
 	nblocks = operation->nblocks;
-	buffers = &operation->buffers[0];
-	blocknum = operation->blocknum;
-	forknum = operation->forknum;
-	persistence = operation->persistence;
-
 	Assert(nblocks > 0);
 	Assert(nblocks <= MAX_IO_COMBINE_LIMIT);
 
+	aio_ret = &operation->io_return;
+
+	/*
+	 * For IO timing we just count the time spent waiting for the IO.
+	 *
+	 * XXX: We probably should track the IO operation, rather than its time,
+	 * separately, when initiating the IO. But right now that's not quite
+	 * allowed by the interface.
+	 */
+
+	/*
+	 * Tracking a wait even if we don't actually need to wait
+	 *
+	 * a) is not cheap
+	 *
+	 * b) reports some time as waiting, even if we never waited.
+	 */
+	if (aio_ret->result.status == ARS_UNKNOWN &&
+		!pgaio_wref_check_done(&operation->io_wref))
+	{
+		instr_time	io_start = pgstat_prepare_io_time(track_io_timing);
+
+		pgaio_wref_wait(&operation->io_wref);
+
+		/*
+		 * The IO operation itself was already counted earlier, in
+		 * AsyncReadBuffers().
+		 */
+		pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
+								io_start, 0, 0);
+	}
+	else
+	{
+		Assert(pgaio_wref_check_done(&operation->io_wref));
+	}
+
+	if (aio_ret->result.status == ARS_PARTIAL)
+	{
+		/*
+		 * We'll retry below, so we just emit a debug message the server log
+		 * (or not even that in prod scenarios).
+		 */
+		pgaio_result_report(aio_ret->result, &aio_ret->target_data, DEBUG1);
+
+		/*
+		 * Try to perform the rest of the IO.  Buffers for which IO has
+		 * completed successfully will be discovered as such and not retried.
+		 */
+		nblocks = operation->nblocks;
+
+		elog(DEBUG3, "retrying IO after partial failure");
+		CHECK_FOR_INTERRUPTS();
+		AsyncReadBuffers(operation, &nblocks);
+		goto restart;
+	}
+	else if (aio_ret->result.status != ARS_OK)
+		pgaio_result_report(aio_ret->result, &aio_ret->target_data, ERROR);
+
+	if (VacuumCostActive)
+		VacuumCostBalance += VacuumCostPageMiss * nblocks;
+
+	/* NB: READ_DONE tracepoint is executed in IO completion callback */
+}
+
+/*
+ * Initiate IO for the ReadBuffersOperation. If IO is only initiated for a
+ * subset of the blocks, *nblocks is updated to reflect that.
+ *
+ * Returns true if IO was initiated, false if no IO was necessary.
+ */
+static bool
+AsyncReadBuffers(ReadBuffersOperation *operation,
+				 int *nblocks)
+{
+	int			io_buffers_len = 0;
+	Buffer	   *buffers = &operation->buffers[0];
+	int			flags = operation->flags;
+	BlockNumber blocknum = operation->blocknum;
+	ForkNumber	forknum = operation->forknum;
+	bool		did_start_io = false;
+	PgAioHandle *ioh = NULL;
+	uint32		ioh_flags = 0;
+	IOContext	io_context;
+	IOObject	io_object;
+	char		persistence;
+
+	persistence = operation->rel
+		? operation->rel->rd_rel->relpersistence
+		: RELPERSISTENCE_PERMANENT;
+
 	if (persistence == RELPERSISTENCE_TEMP)
 	{
 		io_context = IOCONTEXT_NORMAL;
 		io_object = IOOBJECT_TEMP_RELATION;
+		ioh_flags |= PGAIO_HF_REFERENCES_LOCAL;
 	}
 	else
 	{
@@ -1502,6 +1697,14 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 		io_object = IOOBJECT_RELATION;
 	}
 
+	/*
+	 * When this IO is executed synchronously, either because the caller will
+	 * immediately block waiting for the IO or because IOMETHOD_SYNC is used,
+	 * the AIO subsystem needs to know.
+	 */
+	if (flags & READ_BUFFERS_SYNCHRONOUSLY)
+		ioh_flags |= PGAIO_HF_SYNCHRONOUS;
+
 	/*
 	 * We count all these blocks as read by this backend.  This is traditional
 	 * behavior, but might turn out to be not true if we find that someone
@@ -1511,25 +1714,53 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 	 * but another backend completed the read".
 	 */
 	if (persistence == RELPERSISTENCE_TEMP)
-		pgBufferUsage.local_blks_read += nblocks;
+		pgBufferUsage.local_blks_read += *nblocks;
 	else
-		pgBufferUsage.shared_blks_read += nblocks;
+		pgBufferUsage.shared_blks_read += *nblocks;
 
-	for (int i = 0; i < nblocks; ++i)
+	pgaio_wref_clear(&operation->io_wref);
+
+	/*
+	 * Loop until we have started one IO or we discover that all buffers are
+	 * already valid.
+	 */
+	for (int i = 0; i < *nblocks; ++i)
 	{
-		int			io_buffers_len;
 		Buffer		io_buffers[MAX_IO_COMBINE_LIMIT];
 		void	   *io_pages[MAX_IO_COMBINE_LIMIT];
-		instr_time	io_start;
 		BlockNumber io_first_block;
 
 		/*
-		 * Skip this block if someone else has already completed it.  If an
-		 * I/O is already in progress in another backend, this will wait for
-		 * the outcome: either done, or something went wrong and we will
-		 * retry.
+		 * Get IO before ReadBuffersCanStartIO, as pgaio_io_acquire() might
+		 * block, which we don't want after setting IO_IN_PROGRESS.
+		 *
+		 * XXX: Should we attribute the time spent in here to the IO? If there
+		 * already are a lot of IO operations in progress, getting an IO
+		 * handle will block waiting for some other IO operation to finish.
+		 *
+		 * In most cases it'll be free to get the IO, so a timer would be
+		 * overhead. Perhaps we should use pgaio_io_acquire_nb() and only
+		 * account IO time when pgaio_io_acquire_nb() returned false?
 		 */
-		if (!WaitReadBuffersCanStartIO(buffers[i], false))
+		if (likely(!ioh))
+			ioh = pgaio_io_acquire(CurrentResourceOwner,
+								   &operation->io_return);
+
+		/*
+		 * Skip this block if someone else has already completed it.
+		 *
+		 * If an I/O is already in progress in another backend, this will wait
+		 * for the outcome: either done, or something went wrong and we will
+		 * retry. But don't wait if we have staged, but haven't issued,
+		 * another IO.
+		 *
+		 * It's safe to start IO while we have unsubmitted IO, but it'd be
+		 * better to first submit it. But right now the boolean return value
+		 * from ReadBuffersCanStartIO()/StartBufferIO() doesn't allow to
+		 * distinguish between nowait=true trigger failure and the buffer
+		 * already being valid.
+		 */
+		if (!ReadBuffersCanStartIO(buffers[i], false))
 		{
 			/*
 			 * Report this as a 'hit' for this backend, even though it must
@@ -1541,6 +1772,11 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 											  operation->smgr->smgr_rlocator.locator.relNumber,
 											  operation->smgr->smgr_rlocator.backend,
 											  true);
+
+			ereport(DEBUG3,
+					errmsg("can't start io for first buffer %u: %s",
+						   buffers[i], DebugPrintBufferRefcount(buffers[i])),
+					errhidestmt(true), errhidecontext(true));
 			continue;
 		}
 
@@ -1550,6 +1786,11 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 		io_first_block = blocknum + i;
 		io_buffers_len = 1;
 
+		ereport(DEBUG5,
+				errmsg("first prepped for io: %s, offset %d",
+					   DebugPrintBufferRefcount(io_buffers[0]), i),
+				errhidestmt(true), errhidecontext(true));
+
 		/*
 		 * How many neighboring-on-disk blocks can we scatter-read into other
 		 * buffers at the same time?  In this case we don't wait if we see an
@@ -1557,86 +1798,54 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 		 * head block, so we should get on with that I/O as soon as possible.
 		 * We'll come back to this block again, above.
 		 */
-		while ((i + 1) < nblocks &&
-			   WaitReadBuffersCanStartIO(buffers[i + 1], true))
+		while ((i + 1) < *nblocks &&
+			   ReadBuffersCanStartIO(buffers[i + 1], true))
 		{
 			/* Must be consecutive block numbers. */
 			Assert(BufferGetBlockNumber(buffers[i + 1]) ==
 				   BufferGetBlockNumber(buffers[i]) + 1);
 
+			ereport(DEBUG5,
+					errmsg("seq prepped for io: %s, offset %d",
+						   DebugPrintBufferRefcount(buffers[i + 1]),
+						   i + 1),
+					errhidestmt(true), errhidecontext(true));
+
 			io_buffers[io_buffers_len] = buffers[++i];
 			io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
 		}
 
-		io_start = pgstat_prepare_io_time(track_io_timing);
-		smgrreadv(operation->smgr, forknum, io_first_block, io_pages, io_buffers_len);
-		pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start,
-								1, io_buffers_len * BLCKSZ);
+		pgaio_io_get_wref(ioh, &operation->io_wref);
 
-		/* Verify each block we read, and terminate the I/O. */
-		for (int j = 0; j < io_buffers_len; ++j)
-		{
-			BufferDesc *bufHdr;
-			Block		bufBlock;
+		pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len);
 
-			if (persistence == RELPERSISTENCE_TEMP)
-			{
-				bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1);
-				bufBlock = LocalBufHdrGetBlock(bufHdr);
-			}
-			else
-			{
-				bufHdr = GetBufferDescriptor(io_buffers[j] - 1);
-				bufBlock = BufHdrGetBlock(bufHdr);
-			}
+		if (persistence == RELPERSISTENCE_TEMP)
+			pgaio_io_register_callbacks(ioh, PGAIO_HCB_LOCAL_BUFFER_READV);
+		else
+			pgaio_io_register_callbacks(ioh, PGAIO_HCB_SHARED_BUFFER_READV);
 
-			/* check for garbage data */
-			if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j,
-										PIV_LOG_WARNING | PIV_REPORT_STAT))
-			{
-				if ((operation->flags & READ_BUFFERS_ZERO_ON_ERROR) || zero_damaged_pages)
-				{
-					ereport(WARNING,
-							(errcode(ERRCODE_DATA_CORRUPTED),
-							 errmsg("invalid page in block %u of relation %s; zeroing out page",
-									io_first_block + j,
-									relpath(operation->smgr->smgr_rlocator, forknum))));
-					memset(bufBlock, 0, BLCKSZ);
-				}
-				else
-					ereport(ERROR,
-							(errcode(ERRCODE_DATA_CORRUPTED),
-							 errmsg("invalid page in block %u of relation %s",
-									io_first_block + j,
-									relpath(operation->smgr->smgr_rlocator, forknum))));
-			}
+		pgaio_io_set_flag(ioh, ioh_flags);
 
-			/* Terminate I/O and set BM_VALID. */
-			if (persistence == RELPERSISTENCE_TEMP)
-			{
-				uint32		buf_state = pg_atomic_read_u32(&bufHdr->state);
+		did_start_io = true;
+		smgrstartreadv(ioh, operation->smgr, forknum, io_first_block,
+					   io_pages, io_buffers_len);
+		ioh = NULL;
 
-				buf_state |= BM_VALID;
-				pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
-			}
-			else
-			{
-				/* Set BM_VALID, terminate IO, and wake up any waiters */
-				TerminateBufferIO(bufHdr, false, BM_VALID, true, true);
-			}
+		/* not obvious what we'd use for time */
+		pgstat_count_io_op(io_object, io_context, IOOP_READ,
+						   1, io_buffers_len * BLCKSZ);
 
-			/* Report I/Os as completing individually. */
-			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j,
-											  operation->smgr->smgr_rlocator.locator.spcOid,
-											  operation->smgr->smgr_rlocator.locator.dbOid,
-											  operation->smgr->smgr_rlocator.locator.relNumber,
-											  operation->smgr->smgr_rlocator.backend,
-											  false);
-		}
+		*nblocks = io_buffers_len;
+		break;
+	}
 
-		if (VacuumCostActive)
-			VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
+	if (ioh)
+	{
+		pgaio_io_release(ioh);
+		ioh = NULL;
 	}
+
+	return did_start_io;
 }
 
 /*
-- 
2.48.1.76.g4e746b1a31.dirty

