From a07f930bca25979c3ab78ec0353a109bb484846e Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Tue, 18 Mar 2025 14:40:06 -0400
Subject: [PATCH v2.14 14/29] aio: Basic read_stream adjustments for real AIO

Adapt the read stream logic for real AIO:
- If AIO is enabled, we shouldn't issue advice, but if it isn't, we should
  continue issuing advice
- AIO benefits from reading ahead with direct IO
- If effective_io_concurrency=0, pass READ_BUFFERS_SYNCHRONOUSLY to
  StartReadBuffers() to ensure synchronous IO execution

There are further improvements we should consider:

- While in read_stream_look_ahead(), we can use AIO batch submission mode for
  increased efficiency. That however requires care to avoid deadlocks and thus
  done separately.
- It can be beneficial to defer starting new IOs until we can issue multiple
  IOs at once. That however requires non-trivial heuristics to decide when to
  do so.

Co-authored-by: Andres Freund <andres@anarazel.de>
Co-authored-by: Thomas Munro <thomas.munro@gmail.com>
---
 src/backend/storage/aio/read_stream.c | 39 ++++++++++++++++++---------
 1 file changed, 26 insertions(+), 13 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index c60e37e7f7f..26e5dfe77db 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -72,6 +72,7 @@
 #include "postgres.h"
 
 #include "miscadmin.h"
+#include "storage/aio.h"
 #include "storage/fd.h"
 #include "storage/smgr.h"
 #include "storage/read_stream.h"
@@ -99,6 +100,8 @@ struct ReadStream
 	int16		pinned_buffers;
 	int16		distance;
 	int16		initialized_buffers;
+	int			read_buffers_flags;
+	bool		sync_mode;		/* using io_method=sync */
 	bool		advice_enabled;
 	bool		temporary;
 
@@ -250,7 +253,7 @@ read_stream_start_pending_read(ReadStream *stream)
 		Assert(stream->next_buffer_index == stream->oldest_buffer_index);
 
 	/* Do we need to issue read-ahead advice? */
-	flags = 0;
+	flags = stream->read_buffers_flags;
 	if (stream->advice_enabled)
 	{
 		if (stream->pending_read_blocknum == stream->seq_blocknum)
@@ -261,7 +264,7 @@ read_stream_start_pending_read(ReadStream *stream)
 			 * then stay of the way of the kernel's own read-ahead.
 			 */
 			if (stream->seq_until_processed != InvalidBlockNumber)
-				flags = READ_BUFFERS_ISSUE_ADVICE;
+				flags |= READ_BUFFERS_ISSUE_ADVICE;
 		}
 		else
 		{
@@ -272,7 +275,7 @@ read_stream_start_pending_read(ReadStream *stream)
 			 */
 			stream->seq_until_processed = stream->pending_read_blocknum;
 			if (stream->pinned_buffers > 0)
-				flags = READ_BUFFERS_ISSUE_ADVICE;
+				flags |= READ_BUFFERS_ISSUE_ADVICE;
 		}
 	}
 
@@ -613,27 +616,33 @@ read_stream_begin_impl(int flags,
 		stream->per_buffer_data = (void *)
 			MAXALIGN(&stream->ios[Max(1, max_ios)]);
 
+	stream->sync_mode = io_method == IOMETHOD_SYNC;
+
 #ifdef USE_PREFETCH
 
 	/*
-	 * This system supports prefetching advice.  We can use it as long as
-	 * direct I/O isn't enabled, the caller hasn't promised sequential access
-	 * (overriding our detection heuristics), and max_ios hasn't been set to
-	 * zero.
+	 * Read-ahead advice simulating asynchronous I/O with synchronous calls.
+	 * Issue advice only if AIO is not used, direct I/O isn't enabled, the
+	 * caller hasn't promised sequential access (overriding our detection
+	 * heuristics), and max_ios hasn't been set to zero.
 	 */
-	if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
+	if (stream->sync_mode &&
+		(io_direct_flags & IO_DIRECT_DATA) == 0 &&
 		(flags & READ_STREAM_SEQUENTIAL) == 0 &&
 		max_ios > 0)
 		stream->advice_enabled = true;
 #endif
 
 	/*
-	 * For now, max_ios = 0 is interpreted as max_ios = 1 with advice disabled
-	 * above.  If we had real asynchronous I/O we might need a slightly
-	 * different definition.
+	 * Setting max_ios to zero disables AIO and advice-based pseudo AIO, but
+	 * we still need to allocate space to combine and run one I/O.  Bump it up
+	 * to one, and remember to ask for synchronous I/O only.
 	 */
 	if (max_ios == 0)
+	{
 		max_ios = 1;
+		stream->read_buffers_flags = READ_BUFFERS_SYNCHRONOUSLY;
+	}
 
 	/*
 	 * Capture stable values for these two GUC-derived numbers for the
@@ -777,6 +786,11 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 
 		if (likely(next_blocknum != InvalidBlockNumber))
 		{
+			int			flags = stream->read_buffers_flags;
+
+			if (stream->advice_enabled)
+				flags |= READ_BUFFERS_ISSUE_ADVICE;
+
 			/*
 			 * Pin a buffer for the next call.  Same buffer entry, and
 			 * arbitrary I/O entry (they're all free).  We don't have to
@@ -792,8 +806,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			if (likely(!StartReadBuffer(&stream->ios[0].op,
 										&stream->buffers[oldest_buffer_index],
 										next_blocknum,
-										stream->advice_enabled ?
-										READ_BUFFERS_ISSUE_ADVICE : 0)))
+										flags)))
 			{
 				/* Fast return. */
 				return buffer;
-- 
2.48.1.76.g4e746b1a31.dirty

