From ccf77f7584d30ceb9997a0d47bd1e3e05cab1cdc Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Fri, 24 Jan 2025 23:52:53 +1300
Subject: [PATCH v2.4 06/29] Respect pin limits accurately in read_stream.c.

Read streams pin multiple buffers at once as required to combine I/O.
This also avoids having to unpin and repin later when issuing read-ahead
advice, and will be needed for proposed work that starts "real"
asynchronous I/O.

To avoid pinning too much of the buffer pool at once, we previously used
LimitAdditionalBuffers() to avoid pinning more than this backend's fair
share of the pool as a cap.  The coding was a naive and only checked the
cap once at stream initialization.

This commit moves the check to the time of use with new bufmgr APIs from
an earlier commit, since the result might change later due to pins
acquired later outside this stream.  No extra CPU cycles are added to
the all-buffered fast-path code (it only pins one buffer at a time), but
the I/O-starting path now re-checks the limit every time using simple
arithmetic.

In practice it was difficult to exceed the limit, but you could contrive
a workload to do it using multiple CURSORs and FETCHing from sequential
scans in round-robin fashion, so that each underlying stream computes
its limit before all the others have ramped up to their full look-ahead
distance.  Therefore, no back-patch for now.

Per code review from Andres, in the course of his AIO work.

Reported-by: Andres Freund <andres@anarazel.de>
---
 src/backend/storage/aio/read_stream.c | 111 ++++++++++++++++++++++----
 1 file changed, 95 insertions(+), 16 deletions(-)

diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index edeef292f75..1a51e6eed31 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -115,6 +115,7 @@ struct ReadStream
 	int16		pinned_buffers;
 	int16		distance;
 	bool		advice_enabled;
+	bool		temporary;
 
 	/*
 	 * One-block buffer to support 'ungetting' a block number, to resolve flow
@@ -274,7 +275,9 @@ read_stream_index_retreat(ReadStream *stream, int16 *index)
 #endif
 
 static void
-read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
+read_stream_start_pending_read(ReadStream *stream,
+							   int16 buffer_limit,
+							   bool suppress_advice)
 {
 	bool		need_wait;
 	int			nblocks;
@@ -308,10 +311,14 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 	else
 		flags = 0;
 
-	/* We say how many blocks we want to read, but may be smaller on return. */
+	/*
+	 * We say how many blocks we want to read, but may be smaller on return.
+	 * On memory-constrained systems we may be also have to ask for a smaller
+	 * read ourselves.
+	 */
 	buffer_index = stream->next_buffer_index;
 	io_index = stream->next_io_index;
-	nblocks = stream->pending_read_nblocks;
+	nblocks = Min(buffer_limit, stream->pending_read_nblocks);
 	need_wait = StartReadBuffers(&stream->ios[io_index].op,
 								 &stream->buffers[buffer_index],
 								 stream->pending_read_blocknum,
@@ -360,11 +367,60 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
 	stream->pending_read_nblocks -= nblocks;
 }
 
+/*
+ * How many more buffers could we use, while respecting the soft limit?
+ */
+static int16
+read_stream_get_buffer_limit(ReadStream *stream)
+{
+	uint32		buffers;
+
+	/* Check how many local or shared pins we could acquire. */
+	if (stream->temporary)
+		buffers = GetAdditionalLocalPinLimit();
+	else
+		buffers = GetAdditionalPinLimit();
+
+	/*
+	 * Each stream is always allowed to try to acquire one pin if it doesn't
+	 * hold one already.  This is needed to guarantee progress, and just like
+	 * the simple ReadBuffer() operation in code that is not using this stream
+	 * API, if a buffer can't be pinned we'll raise an error when trying to
+	 * pin, ie the buffer pool is simply too small for the workload.
+	 */
+	if (buffers == 0 && stream->pinned_buffers == 0)
+		return 1;
+
+	/*
+	 * Otherwise, see how many additional pins the backend can currently pin,
+	 * which may be zero.  As above, this only guarantees that this backend
+	 * won't use more than its fair share if all backends can respect the soft
+	 * limit, not that a pin can actually be acquired without error.
+	 */
+	return Min(buffers, INT16_MAX);
+}
+
 static void
 read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 {
+	int16		buffer_limit;
+
+	/*
+	 * Check how many pins we could acquire now.  We do this here rather than
+	 * pushing it down into read_stream_start_pending_read(), because it
+	 * allows more flexibility in behavior when we run out of allowed pins.
+	 * Currently the policy is to start an I/O when we've run out of allowed
+	 * pins only if we have to to make progress, and otherwise to stop looking
+	 * ahead until more pins become available, so that we don't start issuing
+	 * a lot of smaller I/Os, prefering to build the largest ones we can. This
+	 * choice is debatable, but it should only really come up with the buffer
+	 * pool/connection ratio is very constrained.
+	 */
+	buffer_limit = read_stream_get_buffer_limit(stream);
+
 	while (stream->ios_in_progress < stream->max_ios &&
-		   stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
+		   stream->pinned_buffers + stream->pending_read_nblocks <
+		   Min(stream->distance, buffer_limit))
 	{
 		BlockNumber blocknum;
 		int16		buffer_index;
@@ -372,7 +428,9 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 
 		if (stream->pending_read_nblocks == io_combine_limit)
 		{
-			read_stream_start_pending_read(stream, suppress_advice);
+			read_stream_start_pending_read(stream, buffer_limit,
+										   suppress_advice);
+			buffer_limit = read_stream_get_buffer_limit(stream);
 			suppress_advice = false;
 			continue;
 		}
@@ -406,11 +464,12 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 		/* We have to start the pending read before we can build another. */
 		while (stream->pending_read_nblocks > 0)
 		{
-			read_stream_start_pending_read(stream, suppress_advice);
+			read_stream_start_pending_read(stream, buffer_limit, suppress_advice);
+			buffer_limit = read_stream_get_buffer_limit(stream);
 			suppress_advice = false;
-			if (stream->ios_in_progress == stream->max_ios)
+			if (stream->ios_in_progress == stream->max_ios || buffer_limit == 0)
 			{
-				/* And we've hit the limit.  Rewind, and stop here. */
+				/* And we've hit a limit.  Rewind, and stop here. */
 				read_stream_unget_block(stream, blocknum);
 				return;
 			}
@@ -426,16 +485,17 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
 	 * limit, preferring to give it another chance to grow to full
 	 * io_combine_limit size once more buffers have been consumed.  However,
 	 * if we've already reached io_combine_limit, or we've reached the
-	 * distance limit and there isn't anything pinned yet, or the callback has
-	 * signaled end-of-stream, we start the read immediately.
+	 * distance limit or buffer limit and there isn't anything pinned yet, or
+	 * the callback has signaled end-of-stream, we start the read immediately.
 	 */
 	if (stream->pending_read_nblocks > 0 &&
 		(stream->pending_read_nblocks == io_combine_limit ||
-		 (stream->pending_read_nblocks == stream->distance &&
+		 ((stream->pending_read_nblocks == stream->distance ||
+		   stream->pending_read_nblocks == buffer_limit) &&
 		  stream->pinned_buffers == 0) ||
 		 stream->distance == 0) &&
 		stream->ios_in_progress < stream->max_ios)
-		read_stream_start_pending_read(stream, suppress_advice);
+		read_stream_start_pending_read(stream, buffer_limit, suppress_advice);
 }
 
 /*
@@ -464,6 +524,7 @@ read_stream_begin_impl(int flags,
 	int			max_ios;
 	int			strategy_pin_limit;
 	uint32		max_pinned_buffers;
+	uint32		max_possible_buffer_limit;
 	Oid			tablespace_id;
 
 	/*
@@ -507,12 +568,23 @@ read_stream_begin_impl(int flags,
 	strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
 	max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
 
-	/* Don't allow this backend to pin more than its share of buffers. */
+	/*
+	 * Also limit by the maximum possible number of pins we could be allowed
+	 * to acquire according to bufmgr.  We may not be able to use them all due
+	 * to other pins held by this backend, but we'll enforce the dynamic limit
+	 * later when starting I/O.
+	 */
 	if (SmgrIsTemp(smgr))
-		LimitAdditionalLocalPins(&max_pinned_buffers);
+		max_possible_buffer_limit = GetSoftLocalPinLimit();
 	else
-		LimitAdditionalPins(&max_pinned_buffers);
-	Assert(max_pinned_buffers > 0);
+		max_possible_buffer_limit = GetSoftPinLimit();
+	max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
+
+	/*
+	 * The soft limit might be zero on a system configured with more
+	 * connections than buffers.  We need at least one.
+	 */
+	max_pinned_buffers = Max(1, max_pinned_buffers);
 
 	/*
 	 * We need one extra entry for buffers and per-buffer data, because users
@@ -572,6 +644,7 @@ read_stream_begin_impl(int flags,
 	stream->callback = callback;
 	stream->callback_private_data = callback_private_data;
 	stream->buffered_blocknum = InvalidBlockNumber;
+	stream->temporary = SmgrIsTemp(smgr);
 
 	/*
 	 * Skip the initial ramp-up phase if the caller says we're going to be
@@ -700,6 +773,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 			 * arbitrary I/O entry (they're all free).  We don't have to
 			 * adjust pinned_buffers because we're transferring one to caller
 			 * but pinning one more.
+			 *
+			 * In the fast path we don't need to check the pin limit.  We're
+			 * always allowed at least one pin so that progress can be made,
+			 * and that's all we need here.  Although two pins are momentarily
+			 * held at the same time, the model used here is that the stream
+			 * holds only one, and the other now belongs to the caller.
 			 */
 			if (likely(!StartReadBuffer(&stream->ios[0].op,
 										&stream->buffers[oldest_buffer_index],
-- 
2.48.1.76.g4e746b1a31.dirty

