From 917c654520a6d34c0da391ef74bf430cc2e0102f Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Fri, 15 Aug 2025 11:01:52 -0400
Subject: [PATCH v2 3/3] bufmgr: aio: Prototype for not waiting for
 already-in-progress IO

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/include/storage/bufmgr.h        |   1 +
 src/backend/storage/buffer/bufmgr.c | 150 ++++++++++++++++++++++++++--
 2 files changed, 142 insertions(+), 9 deletions(-)

diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 41fdc1e7693..7ddb867bc99 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -137,6 +137,7 @@ struct ReadBuffersOperation
 	int			flags;
 	int16		nblocks;
 	int16		nblocks_done;
+	bool		foreign_io;
 	PgAioWaitRef io_wref;
 	PgAioReturn io_return;
 };
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index fe470de63f2..7b245daaf4e 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1557,6 +1557,46 @@ ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait)
 		return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
 }
 
+/*
+ * Check if the buffer is already undergoing read AIO. If it is, assign the
+ * IO's wait reference to operation->io_wref, thereby allowing the caller to
+ * wait for that IO.
+ */
+static inline bool
+ReadBuffersIOAlreadyInProgress(ReadBuffersOperation *operation, Buffer buffer)
+{
+	BufferDesc *desc;
+	uint32		buf_state;
+	PgAioWaitRef iow;
+
+	pgaio_wref_clear(&iow);
+
+	if (BufferIsLocal(buffer))
+	{
+		desc = GetLocalBufferDescriptor(-buffer - 1);
+		buf_state = pg_atomic_read_u32(&desc->state);
+		if ((buf_state & BM_IO_IN_PROGRESS) && !(buf_state & BM_VALID))
+			iow = desc->io_wref;
+	}
+	else
+	{
+		desc = GetBufferDescriptor(buffer - 1);
+		buf_state = LockBufHdr(desc);
+
+		if ((buf_state & BM_IO_IN_PROGRESS) && !(buf_state & BM_VALID))
+			iow = desc->io_wref;
+		UnlockBufHdr(desc, buf_state);
+	}
+
+	if (pgaio_wref_valid(&iow))
+	{
+		operation->io_wref = iow;
+		return true;
+	}
+
+	return false;
+}
+
 /*
  * Helper for AsyncReadBuffers that tries to get the buffer ready for IO.
  */
@@ -1689,7 +1729,7 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 			 *
 			 * we first check if we already know the IO is complete.
 			 */
-			if (aio_ret->result.status == PGAIO_RS_UNKNOWN &&
+			if ((operation->foreign_io || aio_ret->result.status == PGAIO_RS_UNKNOWN) &&
 				!pgaio_wref_check_done(&operation->io_wref))
 			{
 				instr_time	io_start = pgstat_prepare_io_time(track_io_timing);
@@ -1708,11 +1748,66 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 				Assert(pgaio_wref_check_done(&operation->io_wref));
 			}
 
-			/*
-			 * We now are sure the IO completed. Check the results. This
-			 * includes reporting on errors if there were any.
-			 */
-			ProcessReadBuffersResult(operation);
+			if (unlikely(operation->foreign_io))
+			{
+				Buffer		buffer = operation->buffers[operation->nblocks_done];
+				BufferDesc *desc;
+				uint32		buf_state;
+
+				if (BufferIsLocal(buffer))
+				{
+					desc = GetLocalBufferDescriptor(-buffer - 1);
+					buf_state = pg_atomic_read_u32(&desc->state);
+				}
+				else
+				{
+					desc = GetBufferDescriptor(buffer - 1);
+					buf_state = LockBufHdr(desc);
+					UnlockBufHdr(desc, buf_state);
+				}
+
+				if (buf_state & BM_VALID)
+				{
+					operation->nblocks_done += 1;
+					Assert(operation->nblocks_done <= operation->nblocks);
+
+					/*
+					 * Report and track this as a 'hit' for this backend, even
+					 * though it must have started out as a miss in
+					 * PinBufferForBlock(). The other backend (or ourselves,
+					 * as part of a read started earlier) will track this as a
+					 * 'read'.
+					 */
+					TRACE_POSTGRESQL_BUFFER_READ_DONE(operation->forknum,
+													  operation->blocknum + operation->nblocks_done,
+													  operation->smgr->smgr_rlocator.locator.spcOid,
+													  operation->smgr->smgr_rlocator.locator.dbOid,
+													  operation->smgr->smgr_rlocator.locator.relNumber,
+													  operation->smgr->smgr_rlocator.backend,
+													  true);
+
+					if (BufferIsLocal(buffer))
+						pgBufferUsage.local_blks_hit += 1;
+					else
+						pgBufferUsage.shared_blks_hit += 1;
+
+					if (operation->rel)
+						pgstat_count_buffer_hit(operation->rel);
+
+					pgstat_count_io_op(io_object, io_context, IOOP_HIT, 1, 0);
+
+					if (VacuumCostActive)
+						VacuumCostBalance += VacuumCostPageHit;
+				}
+			}
+			else
+			{
+				/*
+				 * We now are sure the IO completed. Check the results. This
+				 * includes reporting on errors if there were any.
+				 */
+				ProcessReadBuffersResult(operation);
+			}
 		}
 
 		/*
@@ -1798,6 +1893,43 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 		io_object = IOOBJECT_RELATION;
 	}
 
+	/*
+	 * If AIO is in progress, be it in this backend or another backend, we
+	 * just associate the wait reference with the operation and wait in
+	 * WaitReadBuffers(). This turns out to be important for performance in
+	 * two workloads:
+	 *
+	 * 1) A read stream that has to read the same block multiple times within
+	 * the readahead distance. This can happen e.g. for the table accesses of
+	 * an index scan.
+	 *
+	 * 2) Concurrent scans by multiple backends on the same relation.
+	 *
+	 * If we were to synchronously wait for the in-progress IO, we'd not be
+	 * able to keep enough I/O in flight.
+	 *
+	 * If we do find there is ongoing I/O for the buffer, we set up a 1-block
+	 * ReadBuffersOperation that WaitReadBuffers then can wait on.
+	 *
+	 * It's possible that another backend starts IO on the buffer between this
+	 * check and the ReadBuffersCanStartIO(nowait = false) below. In that case
+	 * we will synchronously wait for the IO below, but the window for that is
+	 * small enough that it won't happen often enough to have a significant
+	 * performance impact.
+	 */
+	if (ReadBuffersIOAlreadyInProgress(operation, buffers[nblocks_done]))
+	{
+		*nblocks_progress = 1;
+		operation->foreign_io = true;
+
+		CheckReadBuffersOperation(operation, false);
+
+
+		return true;
+	}
+
+	operation->foreign_io = false;
+
 	/*
 	 * If zero_damaged_pages is enabled, add the READ_BUFFERS_ZERO_ON_ERROR
 	 * flag. The reason for that is that, hopefully, zero_damaged_pages isn't
@@ -1855,9 +1987,9 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	/*
 	 * Check if we can start IO on the first to-be-read buffer.
 	 *
-	 * If an I/O is already in progress in another backend, we want to wait
-	 * for the outcome: either done, or something went wrong and we will
-	 * retry.
+	 * If a synchronous I/O is in progress in another backend (it can't be
+	 * this backend), we want to wait for the outcome: either done, or
+	 * something went wrong and we will retry.
 	 */
 	if (!ReadBuffersCanStartIO(buffers[nblocks_done], false))
 	{
-- 
2.48.1.76.g4e746b1a31.dirty

