From 533585f2fab0c29ac33d0629e5aca5d86fd013f7 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Thu, 30 Jan 2025 11:42:03 +1300
Subject: [PATCH v2.6 13/34] Support buffer forwarding in read_stream.c.

In preparation for a following change to the buffer manager, teach read
stream to keep track of buffers that were "forwarded" from one call to
StartReadBuffers() to the next.

Since StartReadBuffers() buffers argument will become an in/out
argument, we need to initialize the buffer queue entries with
InvalidBuffer.  We don't want to do that up front, because we try to
keep stream initialization cheap and code that uses the fast path stays
in one single buffer queue element.  Satisfy both goals by initializing
the queue incrementally on the first cycle.
---
 src/backend/storage/aio/read_stream.c | 108 ++++++++++++++++++++++----
 1 file changed, 94 insertions(+), 14 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 6a39bd1d92d..e58e0edf221 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -112,8 +112,10 @@ struct ReadStream
 	int16		ios_in_progress;
 	int16		queue_size;
 	int16		max_pinned_buffers;
+	int16		forwarded_buffers;
 	int16		pinned_buffers;
 	int16		distance;
+	int16		initialized_buffers;
 	bool		advice_enabled;
 	bool		temporary;
 
@@ -280,7 +282,9 @@ read_stream_start_pending_read(ReadStream *stream,
 							   bool suppress_advice)
 {
 	bool		need_wait;
+	int			requested_nblocks;
 	int			nblocks;
+	int			forwarded;
 	int			flags;
 	int16		io_index;
 	int16		overflow;
@@ -312,13 +316,34 @@ read_stream_start_pending_read(ReadStream *stream,
 		flags = 0;
 
 	/*
-	 * We say how many blocks we want to read, but may be smaller on return.
-	 * On memory-constrained systems we may be also have to ask for a smaller
-	 * read ourselves.
+	 * On buffer-constrained systems we may need to limit the I/O size by the
+	 * available pin count.
 	 */
+	requested_nblocks = Min(buffer_limit, stream->pending_read_nblocks);
+	nblocks = requested_nblocks;
 	buffer_index = stream->next_buffer_index;
 	io_index = stream->next_io_index;
-	nblocks = Min(buffer_limit, stream->pending_read_nblocks);
+
+	/*
+	 * The first time around the queue we initialize it as we go, including
+	 * the overflow zone, because otherwise the entries would appear as
+	 * forwarded buffers.  This avoids initializing the whole queue up front
+	 * in cases where it is large but we don't ever use it due to the
+	 * all-cached fast path or small scans.
+	 */
+	while (stream->initialized_buffers < buffer_index + nblocks)
+		stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
+
+	/*
+	 * Start the I/O.  Any buffers that are not InvalidBuffer will be
+	 * interpreted as already pinned, forwarded by an earlier call to
+	 * StartReadBuffers(), and must map to the expected blocks.  The nblocks
+	 * value may be smaller on return indicating the size of the I/O that
+	 * could be started.  Buffers beyond the output nblocks number may also
+	 * have been pinned without starting I/O due to various edge cases.  In
+	 * that case we'll just leave them in the queue ahead of us, "forwarded"
+	 * to the next call, avoiding the need to unpin/repin.
+	 */
 	need_wait = StartReadBuffers(&stream->ios[io_index].op,
 								 &stream->buffers[buffer_index],
 								 stream->pending_read_blocknum,
@@ -347,16 +372,35 @@ read_stream_start_pending_read(ReadStream *stream,
 		stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
 	}
 
+	/*
+	 * How many pins were acquired but forwarded to the next call?  These need
+	 * to be passed to the next StartReadBuffers() call, or released if the
+	 * stream ends early.  We need the number for accounting purposes, since
+	 * they are not counted in stream->pinned_buffers but we already hold
+	 * them.
+	 */
+	forwarded = 0;
+	while (nblocks + forwarded < requested_nblocks &&
+		   stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
+		forwarded++;
+	stream->forwarded_buffers = forwarded;
+
 	/*
 	 * We gave a contiguous range of buffer space to StartReadBuffers(), but
-	 * we want it to wrap around at queue_size.  Slide overflowing buffers to
-	 * the front of the array.
+	 * we want it to wrap around at queue_size.  Copy overflowing buffers to
+	 * the front of the array where they'll be consumed, but also leave a copy
+	 * in the overflow zone which the I/O operation has a pointer to (it needs
+	 * a contiguous array).  Both copies will be cleared when the buffers are
+	 * handed to the consumer.
 	 */
-	overflow = (buffer_index + nblocks) - stream->queue_size;
+	overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
 	if (overflow > 0)
-		memmove(&stream->buffers[0],
-				&stream->buffers[stream->queue_size],
-				sizeof(stream->buffers[0]) * overflow);
+	{
+		Assert(overflow < stream->queue_size);	/* can't overlap */
+		memcpy(&stream->buffers[0],
+			   &stream->buffers[stream->queue_size],
+			   sizeof(stream->buffers[0]) * overflow);
+	}
 
 	/* Move to the location of start of next read. */
 	read_stream_index_advance_n(stream, &buffer_index, nblocks);
@@ -381,6 +425,15 @@ read_stream_get_buffer_limit(ReadStream *stream)
 	else
 		buffers = GetAdditionalPinLimit();
 
+	/*
+	 * If we already have some forwarded buffers, we can certainly use those.
+	 * They are already pinned, and are mapped to the starting blocks of the
+	 * pending read, they just don't have any I/O started yet and are not
+	 * counted in stream->pinned_buffers.
+	 */
+	Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
+	buffers += stream->forwarded_buffers;
+
 	/*
 	 * Each stream is always allowed to try to acquire one pin if it doesn't
 	 * hold one already.  This is needed to guarantee progress, and just like
@@ -389,7 +442,7 @@ read_stream_get_buffer_limit(ReadStream *stream)
 	 * pin, ie the buffer pool is simply too small for the workload.
 	 */
 	if (buffers == 0 && stream->pinned_buffers == 0)
-		return 1;
+		buffers = 1;
 
 	/*
 	 * Otherwise, see how many additional pins the backend can currently pin,
@@ -755,10 +808,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 
 		/* Fast path assumptions. */
 		Assert(stream->ios_in_progress == 0);
+		Assert(stream->forwarded_buffers == 0);
 		Assert(stream->pinned_buffers == 1);
 		Assert(stream->distance == 1);
 		Assert(stream->pending_read_nblocks == 0);
 		Assert(stream->per_buffer_data_size == 0);
+		Assert(stream->initialized_buffers > stream->oldest_buffer_index);
 
 		/* We're going to return the buffer we pinned last time. */
 		oldest_buffer_index = stream->oldest_buffer_index;
@@ -807,6 +862,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			stream->distance = 0;
 			stream->oldest_buffer_index = stream->next_buffer_index;
 			stream->pinned_buffers = 0;
+			stream->buffers[oldest_buffer_index] = InvalidBuffer;
 		}
 
 		stream->fast_path = false;
@@ -891,10 +947,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		}
 	}
 
-#ifdef CLOBBER_FREED_MEMORY
-	/* Clobber old buffer for debugging purposes. */
+	/*
+	 * We must zap this queue entry, or else it would appear as a forwarded
+	 * buffer.  If it's potentially in the overflow zone (ie it wrapped around
+	 * the queue), also zap that copy.
+	 */
 	stream->buffers[oldest_buffer_index] = InvalidBuffer;
-#endif
+	if (oldest_buffer_index < io_combine_limit - 1)
+		stream->buffers[stream->queue_size + oldest_buffer_index] =
+			InvalidBuffer;
 
 #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
 
@@ -937,6 +998,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 #ifndef READ_STREAM_DISABLE_FAST_PATH
 	/* See if we can take the fast path for all-cached scans next time. */
 	if (stream->ios_in_progress == 0 &&
+		stream->forwarded_buffers == 0 &&
 		stream->pinned_buffers == 1 &&
 		stream->distance == 1 &&
 		stream->pending_read_nblocks == 0 &&
@@ -972,6 +1034,7 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
 void
 read_stream_reset(ReadStream *stream)
 {
+	int16		index;
 	Buffer		buffer;
 
 	/* Stop looking ahead. */
@@ -985,6 +1048,23 @@ read_stream_reset(ReadStream *stream)
 	while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
 		ReleaseBuffer(buffer);
 
+	/* Unpin any unused forwarded buffers. */
+	index = stream->next_buffer_index;
+	while (index < stream->initialized_buffers &&
+		   (buffer = stream->buffers[index]) != InvalidBuffer)
+	{
+		Assert(stream->forwarded_buffers > 0);
+		stream->forwarded_buffers--;
+		ReleaseBuffer(buffer);
+
+		stream->buffers[index] = InvalidBuffer;
+		if (index < io_combine_limit - 1)
+			stream->buffers[stream->queue_size + index] = InvalidBuffer;
+
+		read_stream_index_advance(stream, &index);
+	}
+
+	Assert(stream->forwarded_buffers == 0);
 	Assert(stream->pinned_buffers == 0);
 	Assert(stream->ios_in_progress == 0);
 
-- 
2.48.1.76.g4e746b1a31.dirty

