From 7d30d8b7eb9888800a2456d5f9b9a58458590f4d Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sun, 9 Mar 2025 18:52:26 -0400
Subject: [PATCH v2.6 23/34] bufmgr: Use AIO in StartReadBuffers()

This finally introduces the first actual AIO user. StartReadBuffers() now uses
the AIO routines to issue IO. This converts a lot of callers to use the AIO
infrastructure.

As the implementation of StartReadBuffers() is also used by the functions for
reading individual blocks (StartReadBuffer() and through that
ReadBufferExtended()) this means all buffered read IO passes through the AIO
paths.  However, as those are synchronous reads, actually performing the IO
asynchronously would be rarely beneficial. Instead such IOs are flagged to
always be executed synchronously. This way we don't have to duplicate a fair
bit of code.

When io_method=sync is used, the IO patterns generated after this change are
the same as before, i.e. actual reads are only issued in WaitReadBuffers() and
StartReadBuffers() may issue prefetch requests.  This allows to bypass most of
the actual asynchronicity, which is important to make a change as big as this
less risky.

One thing worth calling out is that, if IO is actually executed
asynchronously, the precise meaning of what track_io_timing is measuring has
changed. Previously it tracked the time for each IO, but that does not make
sense when multiple IOs are executed concurrently. Now it only measures the
time actually spent waiting for IO.

While AIO is now actually used, the logic in read_stream.c will often prevent
using sufficiently many concurrent IOs. That will be addressed in the next
commits.

Co-authored-by: Andres Freund <andres@anarazel.de>
Co-authored-by: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/uvrtrknj4kdytuboidbhwclo4gxhswwcpgadptsjvjqcluzmah%40brqs62irg4dt
Discussion: https://postgr.es/m/20210223100344.llw5an2aklengrmn@alap3.anarazel.de
Discussion: https://postgr.es/m/stj36ea6yyhoxtqkhpieia2z4krnam7qyetc57rfezgk4zgapf@gcnactj4z56m
---
 src/include/storage/bufmgr.h        |   6 +
 src/backend/storage/buffer/bufmgr.c | 399 +++++++++++++++++++++-------
 2 files changed, 305 insertions(+), 100 deletions(-)

diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 12687fde45e..db9a4673097 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -112,6 +112,9 @@ typedef struct BufferManagerRelation
 #define READ_BUFFERS_ZERO_ON_ERROR (1 << 0)
 /* Call smgrprefetch() if I/O necessary. */
 #define READ_BUFFERS_ISSUE_ADVICE (1 << 1)
+/* IO will immediately be waited for */
+#define READ_BUFFERS_SYNCHRONOUSLY (1 << 2)
+
 
 struct ReadBuffersOperation
 {
@@ -131,6 +134,9 @@ struct ReadBuffersOperation
 	BlockNumber blocknum;
 	int			flags;
 	int16		nblocks;
+
+	PgAioWaitRef io_wref;
+	PgAioReturn io_return;
 };
 
 typedef struct ReadBuffersOperation ReadBuffersOperation;
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 149840f81ea..60df9eb8cba 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -528,6 +528,8 @@ static inline BufferDesc *BufferAlloc(SMgrRelation smgr,
 									  BlockNumber blockNum,
 									  BufferAccessStrategy strategy,
 									  bool *foundPtr, IOContext io_context);
+static bool AsyncReadBuffers(ReadBuffersOperation *operation,
+							 int *nblocks);
 static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
 						IOObject io_object, IOContext io_context);
@@ -1228,10 +1230,9 @@ ReadBuffer_common(Relation rel, SMgrRelation smgr, char smgr_persistence,
 		return buffer;
 	}
 
+	flags = READ_BUFFERS_SYNCHRONOUSLY;
 	if (mode == RBM_ZERO_ON_ERROR)
-		flags = READ_BUFFERS_ZERO_ON_ERROR;
-	else
-		flags = 0;
+		flags |= READ_BUFFERS_ZERO_ON_ERROR;
 	operation.smgr = smgr;
 	operation.rel = rel;
 	operation.persistence = persistence;
@@ -1259,6 +1260,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 
 	Assert(*nblocks > 0);
 	Assert(*nblocks <= MAX_IO_COMBINE_LIMIT);
+	Assert(*nblocks == 1 || allow_forwarding);
 
 	for (int i = 0; i < actual_nblocks; ++i)
 	{
@@ -1298,6 +1300,11 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 			else
 				bufHdr = GetBufferDescriptor(buffers[i] - 1);
 			found = pg_atomic_read_u32(&bufHdr->state) & BM_VALID;
+
+			ereport(DEBUG3,
+					errmsg("found forwarded buffer %d",
+						   buffers[i]),
+					errhidestmt(true), errhidecontext(true));
 		}
 		else
 		{
@@ -1363,25 +1370,59 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 	operation->blocknum = blockNum;
 	operation->flags = flags;
 	operation->nblocks = actual_nblocks;
+	pgaio_wref_clear(&operation->io_wref);
 
-	if (flags & READ_BUFFERS_ISSUE_ADVICE)
+	/*
+	 * When using AIO, start the IO in the background. If not, issue prefetch
+	 * requests if desired by the caller.
+	 *
+	 * The reason we have a dedicated path for IOMETHOD_SYNC here is to
+	 * de-risk the introduction of AIO somewhat. It's a large architectural
+	 * change, with lots of chances for unanticipated performance effects.
+	 *
+	 * Use of IOMETHOD_SYNC already leads to not actually performing IO
+	 * asynchronously, but without the check here we'd execute IO earlier than
+	 * we used to. Eventually this IOMETHOD_SYNC specific path should go away.
+	 */
+	if (io_method != IOMETHOD_SYNC)
 	{
 		/*
-		 * In theory we should only do this if PinBufferForBlock() had to
-		 * allocate new buffers above.  That way, if two calls to
-		 * StartReadBuffers() were made for the same blocks before
-		 * WaitReadBuffers(), only the first would issue the advice. That'd be
-		 * a better simulation of true asynchronous I/O, which would only
-		 * start the I/O once, but isn't done here for simplicity.
+		 * Try to start IO asynchronously. It's possible that no IO needs to
+		 * be started, if another backend already performed the IO.
+		 *
+		 * Note that if an IO is started, it might not cover the entire
+		 * requested range, e.g. because an intermediary block has been read
+		 * in by another backend.  In that case any "trailing" buffers we
+		 * already pinned above will be "forwarded" by read_stream.c to the
+		 * next call to StartReadBuffers(). This is signalled to the caller by
+		 * decrementing *nblocks.
 		 */
-		smgrprefetch(operation->smgr,
-					 operation->forknum,
-					 blockNum,
-					 actual_nblocks);
+		return AsyncReadBuffers(operation, nblocks);
 	}
+	else
+	{
+		operation->flags |= READ_BUFFERS_SYNCHRONOUSLY;
 
-	/* Indicate that WaitReadBuffers() should be called. */
-	return true;
+		if (flags & READ_BUFFERS_ISSUE_ADVICE)
+		{
+			/*
+			 * In theory we should only do this if PinBufferForBlock() had to
+			 * allocate new buffers above.  That way, if two calls to
+			 * StartReadBuffers() were made for the same blocks before
+			 * WaitReadBuffers(), only the first would issue the advice.
+			 * That'd be a better simulation of true asynchronous I/O, which
+			 * would only start the I/O once, but isn't done here for
+			 * simplicity.
+			 */
+			smgrprefetch(operation->smgr,
+						 operation->forknum,
+						 blockNum,
+						 actual_nblocks);
+		}
+
+		/* Indicate that WaitReadBuffers() should be called. */
+		return true;
+	}
 }
 
 /*
@@ -1449,7 +1490,7 @@ StartReadBuffer(ReadBuffersOperation *operation,
 }
 
 static inline bool
-WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
+ReadBuffersCanStartIO(Buffer buffer, bool nowait)
 {
 	if (BufferIsLocal(buffer))
 		return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1),
@@ -1462,28 +1503,163 @@ WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
 void
 WaitReadBuffers(ReadBuffersOperation *operation)
 {
-	Buffer	   *buffers;
+	IOContext	io_context;
+	IOObject	io_object;
 	int			nblocks;
-	BlockNumber blocknum;
-	ForkNumber	forknum;
-	IOContext	io_context;
-	IOObject	io_object;
-	char		persistence;
+	PgAioReturn *aio_ret;
+
+	/*
+	 * If we get here without an IO operation having been issued, io_method ==
+	 * IOMETHOD_SYNC path must have been used. In that case, we start - as we
+	 * used to before - the IO now, just before waiting.
+	 *
+	 * This path is expected to eventually go away.
+	 */
+	if (!pgaio_wref_valid(&operation->io_wref))
+	{
+		Assert(io_method == IOMETHOD_SYNC);
+
+		while (true)
+		{
+			nblocks = operation->nblocks;
+
+			if (!AsyncReadBuffers(operation, &nblocks))
+			{
+				/* all blocks were already read in concurrently */
+				Assert(nblocks == operation->nblocks);
+				return;
+			}
+
+			Assert(nblocks > 0 && nblocks <= operation->nblocks);
+
+			if (nblocks == operation->nblocks)
+			{
+				/* will wait below as if this had been normal AIO */
+				break;
+			}
+
+			/*
+			 * It's unlikely, but possible, that AsyncReadBuffers() wasn't
+			 * able to initiate IO for all the relevant buffers. In that case
+			 * we need to wait for the prior IO before issuing more IO.
+			 */
+			WaitReadBuffers(operation);
+		}
+	}
+
+	if (operation->persistence == RELPERSISTENCE_TEMP)
+	{
+		io_context = IOCONTEXT_NORMAL;
+		io_object = IOOBJECT_TEMP_RELATION;
+	}
+	else
+	{
+		io_context = IOContextForStrategy(operation->strategy);
+		io_object = IOOBJECT_RELATION;
+	}
+
+restart:
 
 	/* Find the range of the physical read we need to perform. */
 	nblocks = operation->nblocks;
-	buffers = &operation->buffers[0];
-	blocknum = operation->blocknum;
-	forknum = operation->forknum;
-	persistence = operation->persistence;
-
 	Assert(nblocks > 0);
 	Assert(nblocks <= MAX_IO_COMBINE_LIMIT);
 
+	aio_ret = &operation->io_return;
+
+	/*
+	 * For IO timing we just count the time spent waiting for the IO.
+	 *
+	 * XXX: We probably should track the IO operation, rather than its time,
+	 * separately, when initiating the IO. But right now that's not quite
+	 * allowed by the interface.
+	 */
+
+	/*
+	 * Tracking a wait even if we don't actually need to wait
+	 *
+	 * a) is not cheap
+	 *
+	 * b) reports some time as waiting, even if we never waited.
+	 */
+	if (aio_ret->result.status == ARS_UNKNOWN &&
+		!pgaio_wref_check_done(&operation->io_wref))
+	{
+		instr_time	io_start = pgstat_prepare_io_time(track_io_timing);
+
+		pgaio_wref_wait(&operation->io_wref);
+
+		/*
+		 * The IO operation itself was already counted earlier, in
+		 * AsyncReadBuffers().
+		 */
+		pgstat_count_io_op_time(io_object, io_context, IOOP_READ,
+								io_start, 0, 0);
+	}
+	else
+	{
+		Assert(pgaio_wref_check_done(&operation->io_wref));
+	}
+
+	if (aio_ret->result.status == ARS_PARTIAL)
+	{
+		/*
+		 * We'll retry below, so we just emit a debug message the server log
+		 * (or not even that in prod scenarios).
+		 */
+		pgaio_result_report(aio_ret->result, &aio_ret->target_data, DEBUG1);
+
+		/*
+		 * Try to perform the rest of the IO.  Buffers for which IO has
+		 * completed successfully will be discovered as such and not retried.
+		 */
+		nblocks = operation->nblocks;
+
+		elog(DEBUG3, "retrying IO after partial failure");
+		CHECK_FOR_INTERRUPTS();
+		AsyncReadBuffers(operation, &nblocks);
+		goto restart;
+	}
+	else if (aio_ret->result.status != ARS_OK)
+		pgaio_result_report(aio_ret->result, &aio_ret->target_data, ERROR);
+
+	if (VacuumCostActive)
+		VacuumCostBalance += VacuumCostPageMiss * nblocks;
+
+	/* NB: READ_DONE tracepoint is executed in IO completion callback */
+}
+
+/*
+ * Initiate IO for the ReadBuffersOperation. If IO is only initiated for a
+ * subset of the blocks, *nblocks is updated to reflect that.
+ *
+ * Returns true if IO was initiated, false if no IO was necessary.
+ */
+static bool
+AsyncReadBuffers(ReadBuffersOperation *operation,
+				 int *nblocks)
+{
+	int			io_buffers_len = 0;
+	Buffer	   *buffers = &operation->buffers[0];
+	int			flags = operation->flags;
+	BlockNumber blocknum = operation->blocknum;
+	ForkNumber	forknum = operation->forknum;
+	bool		did_start_io = false;
+	PgAioHandle *ioh = NULL;
+	uint32		ioh_flags = 0;
+	IOContext	io_context;
+	IOObject	io_object;
+	char		persistence;
+
+	persistence = operation->rel
+		? operation->rel->rd_rel->relpersistence
+		: RELPERSISTENCE_PERMANENT;
+
 	if (persistence == RELPERSISTENCE_TEMP)
 	{
 		io_context = IOCONTEXT_NORMAL;
 		io_object = IOOBJECT_TEMP_RELATION;
+		ioh_flags |= PGAIO_HF_REFERENCES_LOCAL;
 	}
 	else
 	{
@@ -1491,6 +1667,14 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 		io_object = IOOBJECT_RELATION;
 	}
 
+	/*
+	 * When this IO is executed synchronously, either because the caller will
+	 * immediately block waiting for the IO or because IOMETHOD_SYNC is used,
+	 * the AIO subsystem needs to know.
+	 */
+	if (flags & READ_BUFFERS_SYNCHRONOUSLY)
+		ioh_flags |= PGAIO_HF_SYNCHRONOUS;
+
 	/*
 	 * We count all these blocks as read by this backend.  This is traditional
 	 * behavior, but might turn out to be not true if we find that someone
@@ -1500,25 +1684,53 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 	 * but another backend completed the read".
 	 */
 	if (persistence == RELPERSISTENCE_TEMP)
-		pgBufferUsage.local_blks_read += nblocks;
+		pgBufferUsage.local_blks_read += *nblocks;
 	else
-		pgBufferUsage.shared_blks_read += nblocks;
+		pgBufferUsage.shared_blks_read += *nblocks;
 
-	for (int i = 0; i < nblocks; ++i)
+	pgaio_wref_clear(&operation->io_wref);
+
+	/*
+	 * Loop until we have started one IO or we discover that all buffers are
+	 * already valid.
+	 */
+	for (int i = 0; i < *nblocks; ++i)
 	{
-		int			io_buffers_len;
 		Buffer		io_buffers[MAX_IO_COMBINE_LIMIT];
 		void	   *io_pages[MAX_IO_COMBINE_LIMIT];
-		instr_time	io_start;
 		BlockNumber io_first_block;
 
 		/*
-		 * Skip this block if someone else has already completed it.  If an
-		 * I/O is already in progress in another backend, this will wait for
-		 * the outcome: either done, or something went wrong and we will
-		 * retry.
+		 * Get IO before ReadBuffersCanStartIO, as pgaio_io_acquire() might
+		 * block, which we don't want after setting IO_IN_PROGRESS.
+		 *
+		 * XXX: Should we attribute the time spent in here to the IO? If there
+		 * already are a lot of IO operations in progress, getting an IO
+		 * handle will block waiting for some other IO operation to finish.
+		 *
+		 * In most cases it'll be free to get the IO, so a timer would be
+		 * overhead. Perhaps we should use pgaio_io_acquire_nb() and only
+		 * account IO time when pgaio_io_acquire_nb() returned false?
 		 */
-		if (!WaitReadBuffersCanStartIO(buffers[i], false))
+		if (likely(!ioh))
+			ioh = pgaio_io_acquire(CurrentResourceOwner,
+								   &operation->io_return);
+
+		/*
+		 * Skip this block if someone else has already completed it.
+		 *
+		 * If an I/O is already in progress in another backend, this will wait
+		 * for the outcome: either done, or something went wrong and we will
+		 * retry. But don't wait if we have staged, but haven't issued,
+		 * another IO.
+		 *
+		 * It's safe to start IO while we have unsubmitted IO, but it'd be
+		 * better to first submit it. But right now the boolean return value
+		 * from ReadBuffersCanStartIO()/StartBufferIO() doesn't allow to
+		 * distinguish between nowait=true trigger failure and the buffer
+		 * already being valid.
+		 */
+		if (!ReadBuffersCanStartIO(buffers[i], false))
 		{
 			/*
 			 * Report this as a 'hit' for this backend, even though it must
@@ -1530,6 +1742,11 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 											  operation->smgr->smgr_rlocator.locator.relNumber,
 											  operation->smgr->smgr_rlocator.backend,
 											  true);
+
+			ereport(DEBUG3,
+					errmsg("can't start io for first buffer %u: %s",
+						   buffers[i], DebugPrintBufferRefcount(buffers[i])),
+					errhidestmt(true), errhidecontext(true));
 			continue;
 		}
 
@@ -1539,6 +1756,11 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 		io_first_block = blocknum + i;
 		io_buffers_len = 1;
 
+		ereport(DEBUG5,
+				errmsg("first prepped for io: %s, offset %d",
+					   DebugPrintBufferRefcount(io_buffers[0]), i),
+				errhidestmt(true), errhidecontext(true));
+
 		/*
 		 * How many neighboring-on-disk blocks can we scatter-read into other
 		 * buffers at the same time?  In this case we don't wait if we see an
@@ -1546,78 +1768,55 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 		 * head block, so we should get on with that I/O as soon as possible.
 		 * We'll come back to this block again, above.
 		 */
-		while ((i + 1) < nblocks &&
-			   WaitReadBuffersCanStartIO(buffers[i + 1], true))
+		while ((i + 1) < *nblocks &&
+			   ReadBuffersCanStartIO(buffers[i + 1], true))
 		{
 			/* Must be consecutive block numbers. */
 			Assert(BufferGetBlockNumber(buffers[i + 1]) ==
 				   BufferGetBlockNumber(buffers[i]) + 1);
 
+			ereport(DEBUG5,
+					errmsg("seq prepped for io: %s, offset %d",
+						   DebugPrintBufferRefcount(buffers[i + 1]),
+						   i + 1),
+					errhidestmt(true), errhidecontext(true));
+
 			io_buffers[io_buffers_len] = buffers[++i];
 			io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
 		}
 
-		io_start = pgstat_prepare_io_time(track_io_timing);
-		smgrreadv(operation->smgr, forknum, io_first_block, io_pages, io_buffers_len);
-		pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start,
-								1, io_buffers_len * BLCKSZ);
-
-		/* Verify each block we read, and terminate the I/O. */
-		for (int j = 0; j < io_buffers_len; ++j)
-		{
-			BufferDesc *bufHdr;
-			Block		bufBlock;
-
-			if (persistence == RELPERSISTENCE_TEMP)
-			{
-				bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1);
-				bufBlock = LocalBufHdrGetBlock(bufHdr);
-			}
-			else
-			{
-				bufHdr = GetBufferDescriptor(io_buffers[j] - 1);
-				bufBlock = BufHdrGetBlock(bufHdr);
-			}
-
-			/* check for garbage data */
-			if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j,
-										PIV_LOG_WARNING | PIV_REPORT_STAT))
-			{
-				if ((operation->flags & READ_BUFFERS_ZERO_ON_ERROR) || zero_damaged_pages)
-				{
-					ereport(WARNING,
-							(errcode(ERRCODE_DATA_CORRUPTED),
-							 errmsg("invalid page in block %u of relation %s; zeroing out page",
-									io_first_block + j,
-									relpath(operation->smgr->smgr_rlocator, forknum).str)));
-					memset(bufBlock, 0, BLCKSZ);
-				}
-				else
-					ereport(ERROR,
-							(errcode(ERRCODE_DATA_CORRUPTED),
-							 errmsg("invalid page in block %u of relation %s",
-									io_first_block + j,
-									relpath(operation->smgr->smgr_rlocator, forknum).str)));
-			}
-
-			/* Set BM_VALID, terminate IO, and wake up any waiters */
-			if (persistence == RELPERSISTENCE_TEMP)
-				TerminateLocalBufferIO(bufHdr, false, BM_VALID, true);
-			else
-				TerminateBufferIO(bufHdr, false, BM_VALID, true, true);
-
-			/* Report I/Os as completing individually. */
-			TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j,
-											  operation->smgr->smgr_rlocator.locator.spcOid,
-											  operation->smgr->smgr_rlocator.locator.dbOid,
-											  operation->smgr->smgr_rlocator.locator.relNumber,
-											  operation->smgr->smgr_rlocator.backend,
-											  false);
-		}
-
-		if (VacuumCostActive)
-			VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
+		pgaio_io_get_wref(ioh, &operation->io_wref);
+
+		pgaio_io_set_handle_data_32(ioh, (uint32 *) io_buffers, io_buffers_len);
+
+		pgaio_io_register_callbacks(ioh,
+									persistence == RELPERSISTENCE_TEMP ?
+									PGAIO_HCB_LOCAL_BUFFER_READV :
+									PGAIO_HCB_SHARED_BUFFER_READV,
+									flags);
+
+		pgaio_io_set_flag(ioh, ioh_flags);
+
+		did_start_io = true;
+		smgrstartreadv(ioh, operation->smgr, forknum, io_first_block,
+					   io_pages, io_buffers_len);
+		ioh = NULL;
+
+		/* not obvious what we'd use for time */
+		pgstat_count_io_op(io_object, io_context, IOOP_READ,
+						   1, io_buffers_len * BLCKSZ);
+
+		*nblocks = io_buffers_len;
+		break;
+	}
+
+	if (ioh)
+	{
+		pgaio_io_release(ioh);
+		ioh = NULL;
 	}
+
+	return did_start_io;
 }
 
 /*
-- 
2.48.1.76.g4e746b1a31.dirty

