From c3858e25c03f8f949cf1564e52236b5647427cbb Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Fri, 3 Apr 2026 13:01:26 -0400
Subject: [PATCH v5 3/5] read stream: Split decision about look ahead for AIO
 and combining

In a subsequent commit the read-ahead distance will only be increased when
waiting for IO. Without further work that would cause a regression: As IO
combining and read-ahead are currently controlled by the same mechanism, we
would end up not allowing IO combining when never needing to wait for IO (as
the distance ends up too small to allow for full sized IOs), which can
increase CPU overhead. A typical reason not to have to wait for IO completion
at a low look-ahead distance is the use of io_uring with the to-be-read data
already in the page cache. But even with io_method=worker, the IO submission
rate may be low enough for the worker to keep up.

One might think that we could just always perform IO combining, but doing so
at the start of a scan can cause performance regressions:

1) Performing a large IO commonly has a higher latency than smaller IOs. That
   is not a problem once reading ahead far enough, but at the start of a stream
   it can lead to longer waits for IO completion.

2) Sometimes read streams will not be read to completion. Immediately starting
   with full sized IOs leads to more wasted effort. This is not commonly an
   issue with existing read stream users, but the upcoming use of read streams
   to fetch table pages as part of an index scan frequently encounters this.

One of the comments in read_stream_should_look_ahead() refers to a motivation
that only really exists as of the next commit, but without that comment the
code would not make sense on its own.

Reviewed-by: Nazir Bilal Yavuz <byavuz81@gmail.com>
Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Discussion: https://postgr.es/m/f3xxfrkafjxpyqxywcxricxgyizjirfceychyxsgn7bwjp5eda@kwbduhy7tfmu
Discussion: https://postgr.es/m/CA+hUKGL2PhFyDoqrHefqasOnaXhSg48t1phs3VM8BAdrZqKZkw@mail.gmail.com
---
 src/backend/storage/aio/read_stream.c | 141 +++++++++++++++++++-------
 1 file changed, 107 insertions(+), 34 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 3499776d210..a63e66988e1 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -98,10 +98,23 @@ struct ReadStream
 	int16		max_pinned_buffers;
 	int16		forwarded_buffers;
 	int16		pinned_buffers;
-	int16		distance;
+
+	/*
+	 * Limit of how far, in blocks, to look ahead for IO combining and for
+	 * read-ahead.
+	 *
+	 * The limits for read-ahead and combining are handled separately to allow
+	 * for IO combining even in cases where the IO subsystem can keep up at a
+	 * low read-ahead distance, as doing larger IOs is more efficient.
+	 *
+	 * Set to 0 when the end of the stream is reached.
+	 */
+	int16		combine_distance;
+	int16		readahead_distance;
 	uint16		distance_decay_holdoff;
 	int16		initialized_buffers;
-	int16		resume_distance;
+	int16		resume_readahead_distance;
+	int16		resume_combine_distance;
 	int			read_buffers_flags;
 	bool		sync_mode;		/* using io_method=sync */
 	bool		batch_mode;		/* READ_STREAM_USE_BATCHING */
@@ -332,8 +345,8 @@ read_stream_start_pending_read(ReadStream *stream)
 
 		/* Shrink distance: no more look-ahead until buffers are released. */
 		new_distance = stream->pinned_buffers + buffer_limit;
-		if (stream->distance > new_distance)
-			stream->distance = new_distance;
+		if (stream->readahead_distance > new_distance)
+			stream->readahead_distance = new_distance;
 
 		/* Unless we have nothing to give the consumer, stop here. */
 		if (stream->pinned_buffers > 0)
@@ -374,12 +387,23 @@ read_stream_start_pending_read(ReadStream *stream)
 		 * perform IO asynchronously when starting out with a small look-ahead
 		 * distance.
 		 */
-		if (stream->distance > 1 && stream->ios_in_progress == 0)
+		if (stream->ios_in_progress == 0)
 		{
-			if (stream->distance_decay_holdoff == 0)
-				stream->distance--;
-			else
+			if (stream->distance_decay_holdoff > 0)
 				stream->distance_decay_holdoff--;
+			else
+			{
+				if (stream->readahead_distance > 1)
+					stream->readahead_distance--;
+
+				/*
+				 * XXX: Should we actually reduce this at any time other than
+				 * a reset? For now we have to, as this is also a condition
+				 * for re-enabling fast_path.
+				 */
+				if (stream->combine_distance > 1)
+					stream->combine_distance--;
+			}
 		}
 	}
 	else
@@ -448,20 +472,42 @@ static inline bool
 read_stream_should_look_ahead(ReadStream *stream)
 {
 	/* If the callback has signaled end-of-stream, we're done */
-	if (stream->distance == 0)
+	if (stream->readahead_distance == 0)
 		return false;
 
 	/* never start more IOs than our cap */
 	if (stream->ios_in_progress >= stream->max_ios)
 		return false;
 
+	/*
+	 * Allow looking further ahead if we are in the process of building a
+	 * larger IO, the IO is not yet big enough and we don't yet have IO in
+	 * flight.  Note that this is allowed even if we have reached the
+	 * read-ahead limit (but not the buffer pin limit, combine_distance is
+	 * capped by it and we are checking for pinned_buffers == 0).
+	 *
+	 * The reason this is restricted to not yet having an IO in flight is that
+	 * once we are actually reading ahead, we will not issue IOs before they
+	 * have reached the full size (or can't be grown further). But we *have*
+	 * to issue an IO once pinned_buffers == 0, otherwise there won't be a
+	 * buffer to return to the caller.
+	 *
+	 * This is important for cases where either effective_io_concurrency is
+	 * low or we never need to wait for IO and thus are not increasing the
+	 * distance. Without this we would end up with lots of small IOs.
+	 */
+	if (stream->pending_read_nblocks > 0 &&
+		stream->pinned_buffers == 0 &&
+		stream->pending_read_nblocks < stream->combine_distance)
+		return true;
+
 	/*
 	 * Don't start more read-ahead if that'd put us over the distance limit
 	 * for doing read-ahead. As stream->distance is capped by
 	 * max_pinned_buffers, this prevents us from looking ahead so far that it
 	 * would put us over the pin limit.
 	 */
-	if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->distance)
+	if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->readahead_distance)
 		return false;
 
 	return true;
@@ -490,14 +536,14 @@ read_stream_should_issue_now(ReadStream *stream)
 	 * If the callback has signaled end-of-stream, start the pending read
 	 * immediately. There is no further potential for IO combining.
 	 */
-	if (stream->distance == 0)
+	if (stream->readahead_distance == 0)
 		return true;
 
 	/*
-	 * If we've already reached io_combine_limit, there's no chance of growing
+	 * If we've already reached combine_distance, there's no chance of growing
 	 * the read further.
 	 */
-	if (pending_read_nblocks >= stream->io_combine_limit)
+	if (pending_read_nblocks >= stream->combine_distance)
 		return true;
 
 	/*
@@ -550,7 +596,8 @@ read_stream_look_ahead(ReadStream *stream)
 		if (blocknum == InvalidBlockNumber)
 		{
 			/* End of stream. */
-			stream->distance = 0;
+			stream->readahead_distance = 0;
+			stream->combine_distance = 0;
 			break;
 		}
 
@@ -597,7 +644,7 @@ read_stream_look_ahead(ReadStream *stream)
 	 * stream.  In the worst case we can always make progress one buffer at a
 	 * time.
 	 */
-	Assert(stream->pinned_buffers > 0 || stream->distance == 0);
+	Assert(stream->pinned_buffers > 0 || stream->readahead_distance == 0);
 
 	if (stream->batch_mode)
 		pgaio_exit_batchmode();
@@ -787,10 +834,17 @@ read_stream_begin_impl(int flags,
 	 * doing full io_combine_limit sized reads.
 	 */
 	if (flags & READ_STREAM_FULL)
-		stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
+	{
+		stream->readahead_distance = Min(max_pinned_buffers, stream->io_combine_limit);
+		stream->combine_distance = Min(max_pinned_buffers, stream->io_combine_limit);
+	}
 	else
-		stream->distance = 1;
-	stream->resume_distance = stream->distance;
+	{
+		stream->readahead_distance = 1;
+		stream->combine_distance = 1;
+	}
+	stream->resume_readahead_distance = stream->readahead_distance;
+	stream->resume_combine_distance = stream->combine_distance;
 
 	/*
 	 * Since we always access the same relation, we can initialize parts of
@@ -889,7 +943,8 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		Assert(stream->ios_in_progress == 0);
 		Assert(stream->forwarded_buffers == 0);
 		Assert(stream->pinned_buffers == 1);
-		Assert(stream->distance == 1);
+		Assert(stream->readahead_distance == 1);
+		Assert(stream->combine_distance == 1);
 		Assert(stream->pending_read_nblocks == 0);
 		Assert(stream->per_buffer_data_size == 0);
 		Assert(stream->initialized_buffers > stream->oldest_buffer_index);
@@ -963,7 +1018,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		else
 		{
 			/* No more blocks, end of stream. */
-			stream->distance = 0;
+			stream->readahead_distance = 0;
 			stream->oldest_buffer_index = stream->next_buffer_index;
 			stream->pinned_buffers = 0;
 			stream->buffers[oldest_buffer_index] = InvalidBuffer;
@@ -979,7 +1034,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		Assert(stream->oldest_buffer_index == stream->next_buffer_index);
 
 		/* End of stream reached?  */
-		if (stream->distance == 0)
+		if (stream->readahead_distance == 0)
 			return InvalidBuffer;
 
 		/*
@@ -993,7 +1048,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		/* End of stream reached? */
 		if (stream->pinned_buffers == 0)
 		{
-			Assert(stream->distance == 0);
+			Assert(stream->readahead_distance == 0);
 			return InvalidBuffer;
 		}
 	}
@@ -1014,7 +1069,10 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
 	{
 		int16		io_index = stream->oldest_io_index;
-		int32		distance;	/* wider temporary value, clamped below */
+
+		/* wider temporary values, clamped below */
+		int32		readahead_distance;
+		int32		combine_distance;
 
 		/* Sanity check that we still agree on the buffers. */
 		Assert(stream->ios[io_index].op.buffers ==
@@ -1027,10 +1085,18 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		if (++stream->oldest_io_index == stream->max_ios)
 			stream->oldest_io_index = 0;
 
-		/* Look-ahead distance ramps up rapidly after we do I/O. */
-		distance = stream->distance * 2;
-		distance = Min(distance, stream->max_pinned_buffers);
-		stream->distance = distance;
+		/*
+		 * Read-ahead and IO combining distances ramp up rapidly after we do
+		 * I/O.
+		 */
+		readahead_distance = stream->readahead_distance * 2;
+		readahead_distance = Min(readahead_distance, stream->max_pinned_buffers);
+		stream->readahead_distance = readahead_distance;
+
+		combine_distance = stream->combine_distance * 2;
+		combine_distance = Min(combine_distance, stream->io_combine_limit);
+		combine_distance = Min(combine_distance, stream->max_pinned_buffers);
+		stream->combine_distance = combine_distance;
 
 		/*
 		 * As we needed IO, prevent distance from being reduced within our
@@ -1111,7 +1177,8 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	if (stream->ios_in_progress == 0 &&
 		stream->forwarded_buffers == 0 &&
 		stream->pinned_buffers == 1 &&
-		stream->distance == 1 &&
+		stream->readahead_distance == 1 &&
+		stream->combine_distance == 1 &&
 		stream->pending_read_nblocks == 0 &&
 		stream->per_buffer_data_size == 0)
 	{
@@ -1157,8 +1224,10 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
 BlockNumber
 read_stream_pause(ReadStream *stream)
 {
-	stream->resume_distance = stream->distance;
-	stream->distance = 0;
+	stream->resume_readahead_distance = stream->readahead_distance;
+	stream->resume_combine_distance = stream->combine_distance;
+	stream->readahead_distance = 0;
+	stream->combine_distance = 0;
 	return InvalidBlockNumber;
 }
 
@@ -1170,7 +1239,8 @@ read_stream_pause(ReadStream *stream)
 void
 read_stream_resume(ReadStream *stream)
 {
-	stream->distance = stream->resume_distance;
+	stream->readahead_distance = stream->resume_readahead_distance;
+	stream->combine_distance = stream->resume_combine_distance;
 }
 
 /*
@@ -1186,7 +1256,8 @@ read_stream_reset(ReadStream *stream)
 	Buffer		buffer;
 
 	/* Stop looking ahead. */
-	stream->distance = 0;
+	stream->readahead_distance = 0;
+	stream->combine_distance = 0;
 
 	/* Forget buffered block number and fast path state. */
 	stream->buffered_blocknum = InvalidBlockNumber;
@@ -1218,8 +1289,10 @@ read_stream_reset(ReadStream *stream)
 	Assert(stream->ios_in_progress == 0);
 
 	/* Start off assuming data is cached. */
-	stream->distance = 1;
-	stream->resume_distance = stream->distance;
+	stream->readahead_distance = 1;
+	stream->combine_distance = 1;
+	stream->resume_readahead_distance = stream->readahead_distance;
+	stream->resume_combine_distance = stream->combine_distance;
 	stream->distance_decay_holdoff = 0;
 }
 
-- 
2.53.0.1.gb2826b52eb

