From 6c9f6faffb23c81250298975112128b6521da429 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Mon, 10 Feb 2025 21:55:40 +1300
Subject: [PATCH v2.4 08/29] Support buffer forwarding in StartReadBuffers().

Sometimes we have to perform a short read because we hit a cached block
that ends a contiguous run of blocks requiring I/O.  We don't want
StartReadBuffers() to have to start more than one I/O, so we stop there.
We also don't want to have to unpin the cached block (and repin it
later), so previously we'd silently pretend the hit was part of the I/O,
and just leave it out of the read from disk.  Now, we'll "forward" it to
the next call.  We still write it to the buffers[] array for the caller
to pass back to us later, but it's not included in *nblocks.

This policy means that we no longer mix hits and misses in a single
operation's results, so we avoid the requirement to call
WaitReadBuffers(), which might stall, before the caller can make use of
the hits.  The caller will get the hit in the next call instead, and
know that it doesn't have to wait.  That's important for later work on
out-of-order read streams that minimize I/O stalls.

This also makes life easier for proposed work on true AIO, which
occasionally needs to split a large I/O after pinning all the buffers,
while the current coding only ever forwards a single bookending hit.

This API is natural for read_stream.c: it just leaves forwarded buffers
where they are in its circular queue, where the next call will pick them
up and continue, minimizing pin churn.

If we ever think of a good reason to disable this feature, i.e. for
other users of StartReadBuffers() that don't want to deal with forwarded
buffers, then we could add a flag for that.  For now read_steam.c is the
only user.
---
 src/include/storage/bufmgr.h        |   1 -
 src/backend/storage/buffer/bufmgr.c | 128 ++++++++++++++++++++--------
 2 files changed, 91 insertions(+), 38 deletions(-)

diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 597ecb97897..4a035f59a7d 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -130,7 +130,6 @@ struct ReadBuffersOperation
 	BlockNumber blocknum;
 	int			flags;
 	int16		nblocks;
-	int16		io_buffers_len;
 };
 
 typedef struct ReadBuffersOperation ReadBuffersOperation;
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 5ad1e2b18a9..47e1c3442b4 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1257,10 +1257,10 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 					 Buffer *buffers,
 					 BlockNumber blockNum,
 					 int *nblocks,
-					 int flags)
+					 int flags,
+					 bool allow_forwarding)
 {
 	int			actual_nblocks = *nblocks;
-	int			io_buffers_len = 0;
 	int			maxcombine = 0;
 
 	Assert(*nblocks > 0);
@@ -1270,30 +1270,80 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 	{
 		bool		found;
 
-		buffers[i] = PinBufferForBlock(operation->rel,
-									   operation->smgr,
-									   operation->persistence,
-									   operation->forknum,
-									   blockNum + i,
-									   operation->strategy,
-									   &found);
+		if (allow_forwarding && buffers[i] != InvalidBuffer)
+		{
+			BufferDesc *bufHdr;
+
+			/*
+			 * This is a buffer that was pinned by an earlier call to
+			 * StartReadBuffers(), but couldn't be handled in one operation at
+			 * that time.  The operation was split, and the caller has passed
+			 * an already pinned buffer back to us to handle the rest of the
+			 * operation.  It must continue at the expected block number.
+			 */
+			Assert(BufferGetBlockNumber(buffers[i]) == blockNum + i);
+
+			/*
+			 * It might be an already valid buffer (a hit) that followed the
+			 * final contiguous block of an earlier I/O (a miss) marking the
+			 * end of it, or a buffer that some other backend has since made
+			 * valid by performing the I/O for us, in which case we can handle
+			 * it as a hit now.  It is safe to check for a BM_VALID flag with
+			 * a relaxed load, because we got a fresh view of it while pinning
+			 * it in the previous call.
+			 *
+			 * On the other hand if we don't see BM_VALID yet, it must be an
+			 * I/O that was split by the previous call and we need to try to
+			 * start a new I/O from this block.  We're also racing against any
+			 * other backend that might start the I/O or even manage to mark
+			 * it BM_VALID after this check, BM_VALID after this check, but
+			 * StartBufferIO() will handle those cases.
+			 */
+			if (BufferIsLocal(buffers[i]))
+				bufHdr = GetLocalBufferDescriptor(-buffers[i] - 1);
+			else
+				bufHdr = GetBufferDescriptor(buffers[i] - 1);
+			found = pg_atomic_read_u32(&bufHdr->state) & BM_VALID;
+		}
+		else
+		{
+			buffers[i] = PinBufferForBlock(operation->rel,
+										   operation->smgr,
+										   operation->persistence,
+										   operation->forknum,
+										   blockNum + i,
+										   operation->strategy,
+										   &found);
+		}
 
 		if (found)
 		{
 			/*
-			 * Terminate the read as soon as we get a hit.  It could be a
-			 * single buffer hit, or it could be a hit that follows a readable
-			 * range.  We don't want to create more than one readable range,
-			 * so we stop here.
+			 * We have a hit.  If it's the first block in the requested range,
+			 * we can return it immediately and report that WaitReadBuffers()
+			 * does not need to be called.  If the initial value of *nblocks
+			 * was larger, the caller will have to call again for the rest.
 			 */
-			actual_nblocks = i + 1;
+			if (i == 0)
+			{
+				*nblocks = 1;
+				return false;
+			}
+
+			/*
+			 * Otherwise we already have an I/O to perform, but this block
+			 * can't be included as it is already valid.  Split the I/O here.
+			 * There may or may not be more blocks requiring I/O after this
+			 * one, we haven't checked, but it can't be contiguous with this
+			 * hit in the way.  We'll leave this buffer pinned, forwarding it
+			 * to the next call, avoiding the need to unpin it here and re-pin
+			 * it in the next call.
+			 */
+			actual_nblocks = i;
 			break;
 		}
 		else
 		{
-			/* Extend the readable range to cover this block. */
-			io_buffers_len++;
-
 			/*
 			 * Check how many blocks we can cover with the same IO. The smgr
 			 * implementation might e.g. be limited due to a segment boundary.
@@ -1314,15 +1364,11 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 	}
 	*nblocks = actual_nblocks;
 
-	if (likely(io_buffers_len == 0))
-		return false;
-
 	/* Populate information needed for I/O. */
 	operation->buffers = buffers;
 	operation->blocknum = blockNum;
 	operation->flags = flags;
 	operation->nblocks = actual_nblocks;
-	operation->io_buffers_len = io_buffers_len;
 
 	if (flags & READ_BUFFERS_ISSUE_ADVICE)
 	{
@@ -1337,7 +1383,7 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
 		smgrprefetch(operation->smgr,
 					 operation->forknum,
 					 blockNum,
-					 operation->io_buffers_len);
+					 actual_nblocks);
 	}
 
 	/* Indicate that WaitReadBuffers() should be called. */
@@ -1351,11 +1397,21 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
  * actual number, which may be fewer than requested.  Caller sets some of the
  * members of operation; see struct definition.
  *
+ * The initial contents of the elements of buffers up to *nblocks should
+ * either be InvalidBuffer or an already-pinned buffer that was left by an
+ * preceding call to StartReadBuffers() that had to be split.  On return, some
+ * elements of buffers may hold pinned buffers beyond the number indicated by
+ * the updated value of *nblocks.  Operations are split on boundaries known to
+ * smgr (eg md.c segment boundaries that require crossing into a different
+ * underlying file), or when already cached blocks are found in the buffer
+ * that prevent the formation of a contiguous read.
+ *
  * If false is returned, no I/O is necessary.  If true is returned, one I/O
  * has been started, and WaitReadBuffers() must be called with the same
  * operation object before the buffers are accessed.  Along with the operation
  * object, the caller-supplied array of buffers must remain valid until
- * WaitReadBuffers() is called.
+ * WaitReadBuffers() is called, and any forwarded buffers must also be
+ * preserved for a future call unless explicitly released.
  *
  * Currently the I/O is only started with optional operating system advice if
  * requested by the caller with READ_BUFFERS_ISSUE_ADVICE, and the real I/O
@@ -1369,13 +1425,18 @@ StartReadBuffers(ReadBuffersOperation *operation,
 				 int *nblocks,
 				 int flags)
 {
-	return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags);
+	return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags,
+								true /* expect forwarded buffers */ );
 }
 
 /*
  * Single block version of the StartReadBuffers().  This might save a few
  * instructions when called from another translation unit, because it is
  * specialized for nblocks == 1.
+ *
+ * This version does not support "forwarded" buffers: they cannot be created
+ * by reading only one block, and the current contents of *buffer is ignored
+ * on entry.
  */
 bool
 StartReadBuffer(ReadBuffersOperation *operation,
@@ -1386,7 +1447,8 @@ StartReadBuffer(ReadBuffersOperation *operation,
 	int			nblocks = 1;
 	bool		result;
 
-	result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags);
+	result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags,
+								  false /* single block, no forwarding */ );
 	Assert(nblocks == 1);		/* single block can't be short */
 
 	return result;
@@ -1416,24 +1478,16 @@ WaitReadBuffers(ReadBuffersOperation *operation)
 	IOObject	io_object;
 	char		persistence;
 
-	/*
-	 * Currently operations are only allowed to include a read of some range,
-	 * with an optional extra buffer that is already pinned at the end.  So
-	 * nblocks can be at most one more than io_buffers_len.
-	 */
-	Assert((operation->nblocks == operation->io_buffers_len) ||
-		   (operation->nblocks == operation->io_buffers_len + 1));
-
 	/* Find the range of the physical read we need to perform. */
-	nblocks = operation->io_buffers_len;
-	if (nblocks == 0)
-		return;					/* nothing to do */
-
+	nblocks = operation->nblocks;
 	buffers = &operation->buffers[0];
 	blocknum = operation->blocknum;
 	forknum = operation->forknum;
 	persistence = operation->persistence;
 
+	Assert(nblocks > 0);
+	Assert(nblocks <= MAX_IO_COMBINE_LIMIT);
+
 	if (persistence == RELPERSISTENCE_TEMP)
 	{
 		io_context = IOCONTEXT_NORMAL;
-- 
2.48.1.76.g4e746b1a31.dirty

