From fd18aa6530277487fcf2c602c570f36c274bbc81 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Tue, 5 Aug 2025 14:35:09 +1200
Subject: [PATCH v3] Give StartReadBuffers() a more robust interface.

Remove the need for read_stream.c to initialize and clear
stream->buffers[] elements that will be passed to StartReadBuffers(),
and to count them on return.  Instead, add an explicit npinned argument
to tell StartReadBuffers() how many buffers were already pinned by the
previous call (if it was a short read), and to learn how many are pinned
beyond *nblocks after this call returns (if it is a short read).  It is
an in/out argument.  The output of one call can be passed directly as
input to the next call, just like the buffers themselves.

The new function argument count still fits in registers in Unixoid
calling conventions.  It doesn't on Windows/x86, but it didn't before
either.  The single-buffer StartReadBuffer() specialization is not
affected.

read_stream.c now uses stream->pending_read_npinned to carry that number
from call to call, replacing stream->forwarded_buffers.  It no longer
maintains it itself, which now seems a little fragile, layer-violating
and expensive.  It still needs to consult the value when respecting pin
limits, but other than that it's entirely StartReadBuffers()' job to
worry about it.  The new member variable name better reflects its
relationship with pending_read_{blocknum,nblocks}.  It is the number of
buffers in that range that were already pinned by an earlier short read.
After a short read, _blocknum advances, _nblocks shrinks, and _npinned
has already been filled in by StartReadBuffers(), so we're ready for the
next call at next_buffer_index, where *npinned buffers have been placed
by StartReadBuffers().

Assertions about the identity of forwarded buffers are removed, because
StartReadBuffers() already has such assertions, and assertions about
queue entries after that are no longer relevant.

Reviewed-by: Xuneng Zhou <xunengzhou@gmail.com>
Discussion: https://postgr.es/m/19006-80fcaaf69000377e%40postgresql.org
---
 src/backend/storage/aio/read_stream.c | 107 +++++---------------------
 src/backend/storage/buffer/bufmgr.c   |  44 +++++++----
 src/include/storage/bufmgr.h          |   1 +
 src/test/modules/test_aio/test_aio.c  |  11 +++
 4 files changed, 60 insertions(+), 103 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 2374b4cd507..a6169188a53 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -99,7 +99,6 @@ struct ReadStream
 	int16		ios_in_progress;
 	int16		queue_size;
 	int16		max_pinned_buffers;
-	int16		forwarded_buffers;
 	int16		pinned_buffers;
 
 	/*
@@ -115,7 +114,6 @@ struct ReadStream
 	int16		combine_distance;
 	int16		readahead_distance;
 	uint16		distance_decay_holdoff;
-	int16		initialized_buffers;
 	int16		resume_readahead_distance;
 	int16		resume_combine_distance;
 	int			read_buffers_flags;
@@ -147,6 +145,7 @@ struct ReadStream
 	/* The read operation we are currently preparing. */
 	BlockNumber pending_read_blocknum;
 	int16		pending_read_nblocks;
+	int			pending_read_npinned;
 
 	/* Space for buffers and optional per-buffer private data. */
 	size_t		per_buffer_data_size;
@@ -318,10 +317,8 @@ static bool
 read_stream_start_pending_read(ReadStream *stream)
 {
 	bool		need_wait;
-	int			requested_nblocks;
 	int			nblocks;
 	int			flags;
-	int			forwarded;
 	int16		io_index;
 	int16		overflow;
 	int16		buffer_index;
@@ -335,33 +332,12 @@ read_stream_start_pending_read(ReadStream *stream)
 	Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
 		   stream->max_pinned_buffers);
 
-#ifdef USE_ASSERT_CHECKING
 	/* We had better not be overwriting an existing pinned buffer. */
 	if (stream->pinned_buffers > 0)
 		Assert(stream->next_buffer_index != stream->oldest_buffer_index);
 	else
 		Assert(stream->next_buffer_index == stream->oldest_buffer_index);
 
-	/*
-	 * Pinned buffers forwarded by a preceding StartReadBuffers() call that
-	 * had to split the operation should match the leading blocks of this
-	 * following StartReadBuffers() call.
-	 */
-	Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
-	for (int i = 0; i < stream->forwarded_buffers; ++i)
-		Assert(BufferGetBlockNumber(stream->buffers[stream->next_buffer_index + i]) ==
-			   stream->pending_read_blocknum + i);
-
-	/*
-	 * Check that we've cleared the queue/overflow entries corresponding to
-	 * the rest of the blocks covered by this read, unless it's the first go
-	 * around and we haven't even initialized them yet.
-	 */
-	for (int i = stream->forwarded_buffers; i < stream->pending_read_nblocks; ++i)
-		Assert(stream->next_buffer_index + i >= stream->initialized_buffers ||
-			   stream->buffers[stream->next_buffer_index + i] == InvalidBuffer);
-#endif
-
 	/* Do we need to issue read-ahead advice? */
 	flags = stream->read_buffers_flags;
 	if (stream->advice_enabled)
@@ -402,9 +378,9 @@ read_stream_start_pending_read(ReadStream *stream)
 		buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
 	else
 		buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
-	Assert(stream->forwarded_buffers <= stream->pending_read_nblocks);
+	Assert(stream->pending_read_npinned <= stream->pending_read_nblocks);
 
-	buffer_limit += stream->forwarded_buffers;
+	buffer_limit += stream->pending_read_npinned;
 	buffer_limit = Min(buffer_limit, PG_INT16_MAX);
 
 	if (buffer_limit == 0 && stream->pinned_buffers == 0)
@@ -431,20 +407,15 @@ read_stream_start_pending_read(ReadStream *stream)
 
 	/*
 	 * We say how many blocks we want to read, but it may be smaller on return
-	 * if the buffer manager decides to shorten the read.  Initialize buffers
-	 * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
-	 * and keep the original nblocks number so we can check for forwarded
-	 * buffers as output, below.
+	 * if the buffer manager decides to shorten the read.
 	 */
 	buffer_index = stream->next_buffer_index;
 	io_index = stream->next_io_index;
-	while (stream->initialized_buffers < buffer_index + nblocks)
-		stream->buffers[stream->initialized_buffers++] = InvalidBuffer;
-	requested_nblocks = nblocks;
 	need_wait = StartReadBuffers(&stream->ios[io_index].op,
 								 &stream->buffers[buffer_index],
 								 stream->pending_read_blocknum,
 								 &nblocks,
+								 &stream->pending_read_npinned,
 								 flags);
 	stream->pinned_buffers += nblocks;
 
@@ -502,28 +473,15 @@ read_stream_start_pending_read(ReadStream *stream)
 		read_stream_count_io(stream, nblocks, stream->ios_in_progress);
 	}
 
-	/*
-	 * How many pins were acquired but forwarded to the next call?  These need
-	 * to be passed to the next StartReadBuffers() call by leaving them
-	 * exactly where they are in the queue, or released if the stream ends
-	 * early.  We need the number for accounting purposes, since they are not
-	 * counted in stream->pinned_buffers but we already hold them.
-	 */
-	forwarded = 0;
-	while (nblocks + forwarded < requested_nblocks &&
-		   stream->buffers[buffer_index + nblocks + forwarded] != InvalidBuffer)
-		forwarded++;
-	stream->forwarded_buffers = forwarded;
-
 	/*
 	 * We gave a contiguous range of buffer space to StartReadBuffers(), but
 	 * we want it to wrap around at queue_size.  Copy overflowing buffers to
 	 * the front of the array where they'll be consumed, but also leave a copy
 	 * in the overflow zone which the I/O operation has a pointer to (it needs
-	 * a contiguous array).  Both copies will be cleared when the buffers are
-	 * handed to the consumer.
+	 * a contiguous array).
 	 */
-	overflow = (buffer_index + nblocks + forwarded) - stream->queue_size;
+	overflow = (buffer_index + nblocks + stream->pending_read_npinned) -
+		stream->queue_size;
 	if (overflow > 0)
 	{
 		Assert(overflow < stream->queue_size);	/* can't overlap */
@@ -542,6 +500,7 @@ read_stream_start_pending_read(ReadStream *stream)
 	/* Adjust the pending read to cover the remaining portion, if any. */
 	stream->pending_read_blocknum += nblocks;
 	stream->pending_read_nblocks -= nblocks;
+	Assert(stream->pending_read_nblocks >= stream->pending_read_npinned);
 
 	return true;
 }
@@ -1036,13 +995,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 
 		/* Fast path assumptions. */
 		Assert(stream->ios_in_progress == 0);
-		Assert(stream->forwarded_buffers == 0);
 		Assert(stream->pinned_buffers == 1);
 		Assert(stream->readahead_distance == 1);
 		Assert(stream->combine_distance == 1);
 		Assert(stream->pending_read_nblocks == 0);
+		Assert(stream->pending_read_npinned == 0);
 		Assert(stream->per_buffer_data_size == 0);
-		Assert(stream->initialized_buffers > stream->oldest_buffer_index);
 
 		/* We're going to return the buffer we pinned last time. */
 		oldest_buffer_index = stream->oldest_buffer_index;
@@ -1124,7 +1082,6 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			stream->combine_distance = 0;
 			stream->oldest_buffer_index = stream->next_buffer_index;
 			stream->pinned_buffers = 0;
-			stream->buffers[oldest_buffer_index] = InvalidBuffer;
 		}
 
 		stream->fast_path = false;
@@ -1274,16 +1231,6 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			stream->seq_until_processed = InvalidBlockNumber;
 	}
 
-	/*
-	 * We must zap this queue entry, or else it would appear as a forwarded
-	 * buffer.  If it's potentially in the overflow zone (ie from a
-	 * multi-block I/O that wrapped around the queue), also zap the copy.
-	 */
-	stream->buffers[oldest_buffer_index] = InvalidBuffer;
-	if (oldest_buffer_index < stream->io_combine_limit - 1)
-		stream->buffers[stream->queue_size + oldest_buffer_index] =
-			InvalidBuffer;
-
 #if defined(CLOBBER_FREED_MEMORY) || defined(USE_VALGRIND)
 
 	/*
@@ -1329,26 +1276,13 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 #ifndef READ_STREAM_DISABLE_FAST_PATH
 	/* See if we can take the fast path for all-cached scans next time. */
 	if (stream->ios_in_progress == 0 &&
-		stream->forwarded_buffers == 0 &&
 		stream->pinned_buffers == 1 &&
 		stream->readahead_distance == 1 &&
 		stream->combine_distance == 1 &&
 		stream->pending_read_nblocks == 0 &&
+		stream->pending_read_npinned == 0 &&
 		stream->per_buffer_data_size == 0)
 	{
-		/*
-		 * The fast path spins on one buffer entry repeatedly instead of
-		 * rotating through the whole queue and clearing the entries behind
-		 * it.  If the buffer it starts with happened to be forwarded between
-		 * StartReadBuffers() calls and also wrapped around the circular queue
-		 * partway through, then a copy also exists in the overflow zone, and
-		 * it won't clear it out as the regular path would.  Do that now, so
-		 * it doesn't need code for that.
-		 */
-		if (stream->oldest_buffer_index < stream->io_combine_limit - 1)
-			stream->buffers[stream->queue_size + stream->oldest_buffer_index] =
-				InvalidBuffer;
-
 		stream->fast_path = true;
 	}
 #endif
@@ -1421,24 +1355,19 @@ read_stream_reset(ReadStream *stream)
 	while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
 		ReleaseBuffer(buffer);
 
-	/* Unpin any unused forwarded buffers. */
+	/*
+	 * Unpin any unused forwarded buffers.  See comments above
+	 * StartReadBuffers() for why we do this.
+	 */
 	index = stream->next_buffer_index;
-	while (index < stream->initialized_buffers &&
-		   (buffer = stream->buffers[index]) != InvalidBuffer)
+	while (stream->pending_read_npinned > 0)
 	{
-		Assert(stream->forwarded_buffers > 0);
-		stream->forwarded_buffers--;
-		ReleaseBuffer(buffer);
-
-		stream->buffers[index] = InvalidBuffer;
-		if (index < stream->io_combine_limit - 1)
-			stream->buffers[stream->queue_size + index] = InvalidBuffer;
-
+		ReleaseBuffer(stream->buffers[index]);
+		stream->pending_read_npinned--;
 		if (++index == stream->queue_size)
 			index = 0;
 	}
 
-	Assert(stream->forwarded_buffers == 0);
 	Assert(stream->pinned_buffers == 0);
 	Assert(stream->ios_in_progress == 0);
 
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 3cc0b0bdd92..b966a107b9a 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1369,16 +1369,20 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 					 Buffer *buffers,
 					 BlockNumber blockNum,
 					 int *nblocks,
+					 int *npinned,
 					 int flags,
 					 bool allow_forwarding)
 {
 	int			actual_nblocks = *nblocks;
+	int			actual_npinned = *npinned;
 	int			maxcombine = 0;
 	bool		did_start_io;
 	IOContext	io_context;
 	IOObject	io_object;
 
+	Assert(*npinned == 0 || allow_forwarding);
 	Assert(*nblocks == 1 || allow_forwarding);
+	Assert(*npinned <= *nblocks);
 	Assert(*nblocks > 0);
 	Assert(*nblocks <= MAX_IO_COMBINE_LIMIT);
 
@@ -1397,7 +1401,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 	{
 		bool		found;
 
-		if (allow_forwarding && buffers[i] != InvalidBuffer)
+		if (allow_forwarding && i < actual_npinned)
 		{
 			BufferDesc *bufHdr;
 
@@ -1443,6 +1447,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 										   operation->strategy,
 										   io_object, io_context,
 										   &found);
+			actual_npinned++;
 		}
 
 		if (found)
@@ -1456,6 +1461,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 			if (i == 0)
 			{
 				*nblocks = 1;
+				*npinned = actual_npinned - 1;
 
 #ifdef USE_ASSERT_CHECKING
 
@@ -1579,22 +1585,30 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 
 	CheckReadBuffersOperation(operation, !did_start_io);
 
+	/* Pinned buffers not included in final nblocks, forwarded to next call. */
+	*npinned = actual_npinned - *nblocks;
+
 	return did_start_io;
 }
 
 /*
  * Begin reading a range of blocks beginning at blockNum and extending for
- * *nblocks.  *nblocks and the buffers array are in/out parameters.  On entry,
- * the buffers elements covered by *nblocks must hold either InvalidBuffer or
- * buffers forwarded by an earlier call to StartReadBuffers() that was split
- * and is now being continued.  On return, *nblocks holds the number of blocks
- * accepted by this operation.  If it is less than the original number then
- * this operation has been split, but buffer elements up to the original
- * requested size may hold forwarded buffers to be used for a continuing
- * operation.  The caller must either start a new I/O beginning at the block
- * immediately following the blocks accepted by this call and pass those
- * buffers back in, or release them if it chooses not to.  It shouldn't make
- * any other use of or assumptions about forwarded buffers.
+ * *nblocks.  *nblocks, *npinned and the buffers array are in/out parameters.
+ *
+ * On entry, *nblocks is the desire number of block to read.  On exit, it may
+ * be a smaller number if the operation was split.
+ *
+ * On entry, *npinned is the number of pinned buffers forwarded by an earlier
+ * operation that was split.  On exit, it is the number forwarded by this call,
+ * which should be passed to the following call when continuing to read the
+ * same sequence of blocks, along with the corresponding buffers.
+ *
+ * When buffers are forwarded, as reported by *npinned, the caller must either
+ * start a new I/O beginning at the block immediately following the blocks
+ * accepted by this call (*nblocks on exit) and pass those buffers back in head
+ * position, or release them if it chooses not to.  They are located in the
+ * buffers array beginning at index *nblocks (on exit).  It shouldn't make any
+ * other use of or assumptions about forwarded buffers.
  *
  * If false is returned, no I/O is necessary and the buffers covered by
  * *nblocks on exit are valid and ready to be accessed.  If true is returned,
@@ -1610,9 +1624,10 @@ StartReadBuffers(ReadBuffersOperation *operation,
 				 Buffer *buffers,
 				 BlockNumber blockNum,
 				 int *nblocks,
+				 int *npinned,
 				 int flags)
 {
-	return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags,
+	return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, npinned, flags,
 								true /* expect forwarded buffers */ );
 }
 
@@ -1631,9 +1646,10 @@ StartReadBuffer(ReadBuffersOperation *operation,
 				int flags)
 {
 	int			nblocks = 1;
+	int			npinned = 0;
 	bool		result;
 
-	result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags,
+	result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, &npinned, flags,
 								  false /* single block, no forwarding */ );
 	Assert(nblocks == 1);		/* single block can't be short */
 
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 6837b35fc6d..262dc4262d0 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -250,6 +250,7 @@ extern bool StartReadBuffers(ReadBuffersOperation *operation,
 							 Buffer *buffers,
 							 BlockNumber blockNum,
 							 int *nblocks,
+							 int *npinned,
 							 int flags);
 extern bool WaitReadBuffers(ReadBuffersOperation *operation);
 
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index 35efba1a5e3..e7c4864136c 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -709,6 +709,7 @@ read_buffers(PG_FUNCTION_ARGS)
 	Datum	   *buffers_datum;
 	bool	   *io_reqds;
 	int		   *nblocks_per_io;
+	int			npinned;
 
 	Assert(nblocks > 0);
 
@@ -724,6 +725,15 @@ read_buffers(PG_FUNCTION_ARGS)
 	rel = relation_open(relid, AccessShareLock);
 	smgr = RelationGetSmgr(rel);
 
+	/*
+	 * Pins might be forwarded between calls, if IOs are split after pins are
+	 * obtained.  Such buffers will be provided to the next call in the
+	 * buffers array, and their count will be carrier between calls in this
+	 * variable. (Only if we decided to give up the loop below would we need
+	 * to consult this, to unpin them.)
+	 */
+	npinned = 0;
+
 	/*
 	 * Do StartReadBuffers() until IO for all the required blocks has been
 	 * started (if required).
@@ -744,6 +754,7 @@ read_buffers(PG_FUNCTION_ARGS)
 										  &buffers[nblocks_done],
 										  startblock + nblocks_done,
 										  &nblocks_this_io,
+										  &npinned,
 										  0);
 		nblocks_per_io[nios] = nblocks_this_io;
 		nios++;
-- 
2.53.0

