From 80e6fb3c4e7deab9ad8dab21adb8475aea6f4177 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Fri, 3 Apr 2026 13:02:52 -0400
Subject: [PATCH v5 5/5] Allow read_stream_reset() to not wait for IO
 completion

Not waiting for IO during read_stream_reset() can be important for performance
in cases where read streams are frequently reset before the end is
reached. Current users do not commonly do that, but the upcoming work to use a
read stream to prefetch table blocks as part of index scans can do so
frequently in some query patterns, e.g. if there is an index scan on the inner
side of a nested loop antijoin.

This takes a bit of care to do right. Just introducing support for abandoning
an AIO handle could lead to the IO's completion not being processed until the
backend exits. That's bad because it would lead to resources held onto for the
IO (e.g. buffer pins) not being released and the handle showing up in pg_aios.

To avoid that, the existing resowner cleanup is changed to wait for the IO's
completion, which guarantees that by the end of the statement the IO has
completed. We might eventually want to relax that for some operations (e.g.,
for background WAL writes or opportunistic prefetching).

Discussion: https://postgr.es/m/f3xxfrkafjxpyqxywcxricxgyizjirfceychyxsgn7bwjp5eda@kwbduhy7tfmu
---
 src/include/storage/aio.h                   |   2 +
 src/include/storage/bufmgr.h                |   1 +
 src/backend/storage/aio/aio.c               |  99 +++++++++++++++---
 src/backend/storage/aio/read_stream.c       |  31 ++++--
 src/backend/storage/buffer/bufmgr.c         |  34 +++++++
 src/test/modules/test_aio/t/001_aio.pl      | 107 ++++++++++++++++++++
 src/test/modules/test_aio/test_aio--1.0.sql |   2 +-
 src/test/modules/test_aio/test_aio.c        |  38 +++++--
 8 files changed, 282 insertions(+), 32 deletions(-)

diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h
index ec543b78409..ab7fad130bd 100644
--- a/src/include/storage/aio.h
+++ b/src/include/storage/aio.h
@@ -328,6 +328,8 @@ extern int	pgaio_wref_get_id(PgAioWaitRef *iow);
 extern void pgaio_wref_wait(PgAioWaitRef *iow);
 extern bool pgaio_wref_check_done(PgAioWaitRef *iow);
 
+extern void pgaio_wref_abandon(PgAioWaitRef *iow);
+
 
 
 /* --------------------------------------------------------------------------------
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index aa61a39d9e6..005d09905e3 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -252,6 +252,7 @@ extern bool StartReadBuffers(ReadBuffersOperation *operation,
 							 int *nblocks,
 							 int flags);
 extern bool WaitReadBuffers(ReadBuffersOperation *operation);
+extern void AbandonReadBuffers(ReadBuffersOperation *operation);
 
 extern void ReleaseBuffer(Buffer buffer);
 extern void UnlockReleaseBuffer(Buffer buffer);
diff --git a/src/backend/storage/aio/aio.c b/src/backend/storage/aio/aio.c
index 8f7e26607b9..11ab9b9acff 100644
--- a/src/backend/storage/aio/aio.c
+++ b/src/backend/storage/aio/aio.c
@@ -152,11 +152,15 @@ const IoMethodOps *pgaio_method_ops;
  * operation succeeded and details about the first failure, if any. The error
  * can be raised / logged with pgaio_result_report().
  *
- * The lifetime of the memory pointed to be *ret needs to be at least as long
- * as the passed in resowner. If the resowner releases resources before the IO
- * completes (typically due to an error), the reference to *ret will be
- * cleared. In case of resowner cleanup *ret will not be updated with the
- * results of the IO operation.
+ * The lifetime of the memory pointed to by *ret needs to be at least as long
+ * as the passed in resowner.
+ *
+ * If the resowner releases resources before the IO completes (typically due
+ * to an error), the reference to *ret will be cleared. In case of resowner
+ * cleanup *ret will not be updated with the results of the IO operation.
+ *
+ * If the caller loses interest in the IO before completion,
+ * pgaio_wref_abandon() can be used.
  */
 PgAioHandle *
 pgaio_io_acquire(struct ResourceOwnerData *resowner, PgAioReturn *ret)
@@ -278,6 +282,14 @@ pgaio_io_release_resowner(dlist_node *ioh_node, bool on_error)
 	ResourceOwnerForgetAioHandle(ioh->resowner, &ioh->resowner_node);
 	ioh->resowner = NULL;
 
+	/*
+	 * Need to unregister the reporting of the IO's result, the memory it's
+	 * referencing likely has gone away. Do so before potentially waiting
+	 * below, as the wait could otherwise cause writes to that memory.
+	 */
+	if (ioh->report_return)
+		ioh->report_return = NULL;
+
 	switch ((PgAioHandleState) ioh->state)
 	{
 		case PGAIO_HS_IDLE:
@@ -300,22 +312,32 @@ pgaio_io_release_resowner(dlist_node *ioh_node, bool on_error)
 			if (!on_error)
 				elog(WARNING, "AIO handle was not submitted");
 			pgaio_submit_staged();
-			break;
+
+			/* now that the IO is submitted, need to wait */
+			pg_fallthrough;
 		case PGAIO_HS_SUBMITTED:
 		case PGAIO_HS_COMPLETED_IO:
 		case PGAIO_HS_COMPLETED_SHARED:
 		case PGAIO_HS_COMPLETED_LOCAL:
-			/* this is expected to happen */
+
+			/*
+			 * This is expected to happen, e.g. after an error or after
+			 * pgaio_wref_abandon() was called.
+			 *
+			 * For now always wait for the IO's completion during resowner
+			 * cleanup. This provides a bound on how long after an error or
+			 * pgaio_wref_abandon() an IO handle will show up as used and how
+			 * long an uncompleted IO can cause resources to be retained.
+			 *
+			 * It is quite possible that we eventually want to support IO
+			 * operations that last longer, e.g. for WAL writes in the
+			 * background. If so we will either need to use a longer lived
+			 * resowner or add a flag controlling when this cleanup happens.
+			 */
+			pgaio_io_wait(ioh, ioh->generation);
 			break;
 	}
 
-	/*
-	 * Need to unregister the reporting of the IO's result, the memory it's
-	 * referencing likely has gone away.
-	 */
-	if (ioh->report_return)
-		ioh->report_return = NULL;
-
 	RESUME_INTERRUPTS();
 }
 
@@ -1050,6 +1072,55 @@ pgaio_wref_check_done(PgAioWaitRef *iow)
 	return false;
 }
 
+/*
+ * Declare that a wait reference to an IO, started by this backend, is not of
+ * interest to this backend anymore. Once called, the PgAioReturn *ret passed
+ * to pgaio_io_acquire[_nb]() will not be updated anymore and thus can be
+ * freed.
+ */
+void
+pgaio_wref_abandon(PgAioWaitRef *iow)
+{
+	uint64		ref_generation;
+	bool		am_owner;
+	PgAioHandle *ioh;
+	PgAioHandleState state;
+
+	ioh = pgaio_io_from_wref(iow, &ref_generation);
+
+	am_owner = ioh->owner_procno == MyProcNumber;
+
+	/*
+	 * To ensure that the IO won't be recycled while we check (e.g. during the
+	 * emission of a debug message).
+	 */
+	HOLD_INTERRUPTS();
+
+	/*
+	 * It is safe to perform this check before checking if the IO was recycled
+	 * as the owner of an IO cannot change.
+	 */
+	if (!am_owner)
+		elog(ERROR, "only IOs owned by current backend can be abandoned");
+
+	if (!pgaio_io_was_recycled(ioh, ref_generation, &state))
+	{
+		pgaio_debug_io(DEBUG3, ioh,
+					   "discarding result %p, resowner: %p",
+					   ioh->report_return, ioh->resowner);
+
+		/*
+		 * All we need to do to abandon the IO is to clear its report_return
+		 * field. Without that we could end up writing to freed/reused memory
+		 * when the IO completes.
+		 */
+		if (ioh->report_return)
+			ioh->report_return = NULL;
+	}
+
+	RESUME_INTERRUPTS();
+}
+
 
 
 /* --------------------------------------------------------------------------------
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 2adc04601b6..5e70a4f0ced 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -109,7 +109,8 @@ struct ReadStream
 	 * for IO combining even in cases where the IO subsystem can keep up at a
 	 * low read-ahead distance, as doing larger IOs is more efficient.
 	 *
-	 * Set to 0 when the end of the stream is reached.
+	 * Set to 0 when the end of the stream is reached and to -1 when the
+	 * stream is reset.
 	 */
 	int16		combine_distance;
 	int16		readahead_distance;
@@ -473,8 +474,8 @@ read_stream_start_pending_read(ReadStream *stream)
 static inline bool
 read_stream_should_look_ahead(ReadStream *stream)
 {
-	/* If the callback has signaled end-of-stream, we're done */
-	if (stream->readahead_distance == 0)
+	/* If we reached end-of-stream or a reset, we're done */
+	if (stream->readahead_distance <= 0)
 		return false;
 
 	/* never start more IOs than our cap */
@@ -538,7 +539,7 @@ read_stream_should_issue_now(ReadStream *stream)
 	 * If the callback has signaled end-of-stream, start the pending read
 	 * immediately. There is no further potential for IO combining.
 	 */
-	if (stream->readahead_distance == 0)
+	if (stream->readahead_distance <= 0)
 		return true;
 
 	/*
@@ -646,7 +647,7 @@ read_stream_look_ahead(ReadStream *stream)
 	 * stream.  In the worst case we can always make progress one buffer at a
 	 * time.
 	 */
-	Assert(stream->pinned_buffers > 0 || stream->readahead_distance == 0);
+	Assert(stream->pinned_buffers > 0 || stream->readahead_distance <= 0);
 
 	if (stream->batch_mode)
 		pgaio_exit_batchmode();
@@ -1035,8 +1036,8 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 	{
 		Assert(stream->oldest_buffer_index == stream->next_buffer_index);
 
-		/* End of stream reached?  */
-		if (stream->readahead_distance == 0)
+		/* End of stream / reset reached?  */
+		if (stream->readahead_distance <= 0)
 			return InvalidBuffer;
 
 		/*
@@ -1077,7 +1078,17 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
 		Assert(stream->ios[io_index].op.buffers ==
 			   &stream->buffers[oldest_buffer_index]);
 
-		needed_wait = WaitReadBuffers(&stream->ios[io_index].op);
+		/*
+		 * If the stream has been reset, don't even wait for the IO, just
+		 * abandon it.
+		 */
+		if (stream->readahead_distance < 0)
+		{
+			AbandonReadBuffers(&stream->ios[io_index].op);
+			needed_wait = false;
+		}
+		else
+			needed_wait = WaitReadBuffers(&stream->ios[io_index].op);
 
 		Assert(stream->ios_in_progress > 0);
 		stream->ios_in_progress--;
@@ -1303,8 +1314,8 @@ read_stream_reset(ReadStream *stream)
 	Buffer		buffer;
 
 	/* Stop looking ahead. */
-	stream->readahead_distance = 0;
-	stream->combine_distance = 0;
+	stream->readahead_distance = -1;
+	stream->combine_distance = -1;
 
 	/* Forget buffered block number and fast path state. */
 	stream->buffered_blocknum = InvalidBlockNumber;
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 5c64570020d..3167df4b06e 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -1604,6 +1604,10 @@ StartReadBuffersImpl(ReadBuffersOperation *operation,
  * buffers must remain valid until WaitReadBuffers() is called, and any
  * forwarded buffers must also be preserved for a continuing call unless
  * they are explicitly released.
+ *
+ * If true was returned, the memory underlying the ReadBuffersOperation needs
+ * to stay around until either WaitReadBuffers() or AbandonReadBuffers() is
+ * called (or an error is thrown).
  */
 bool
 StartReadBuffers(ReadBuffersOperation *operation,
@@ -2165,6 +2169,36 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 	return true;
 }
 
+/*
+ * Declare that this backend is not interested in the operation anymore. This
+ * needs to be called if StartReadBuffers() returned true and the
+ * ReadBuffersOperation is to be freed without calling WaitReadBuffers()
+ * (leaving errors aside).
+ *
+ * It is the caller's responsibility to release buffer pins (seems simpler
+ * that way, as that already is required if no IO had been necessary).
+ */
+void
+AbandonReadBuffers(ReadBuffersOperation *operation)
+{
+	PgAioWaitRef io_wref = operation->io_wref;
+
+	/* see equivalent WaitReadBuffers() check */
+	if (!pgaio_wref_valid(&io_wref) && io_method != IOMETHOD_SYNC)
+		elog(ERROR, "abandoning read operation that didn't read");
+
+	if (!pgaio_wref_valid(&io_wref))
+		return;
+
+	pgaio_wref_clear(&operation->io_wref);
+
+	/* can't abandon foreign IOs (nor do we need to) */
+	if (operation->foreign_io)
+		operation->foreign_io = false;
+	else
+		pgaio_wref_abandon(&io_wref);
+}
+
 /*
  * BufferAlloc -- subroutine for PinBufferForBlock.  Handles lookup of a shared
  *		buffer.  If no buffer exists already, selects a replacement victim and
diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl
index 63cadd64c15..1fbca2b5955 100644
--- a/src/test/modules/test_aio/t/001_aio.pl
+++ b/src/test/modules/test_aio/t/001_aio.pl
@@ -15,6 +15,10 @@ use TestAio;
 my @methods = TestAio::supported_io_methods();
 my %nodes;
 
+# Putting it inline makes perltidy do ugly things
+my $count_my_aios_query =
+  'SELECT count(*) FROM pg_aios WHERE pid = pg_backend_pid()';
+
 
 ###
 # Create and configure one instance for each io_method
@@ -1616,6 +1620,61 @@ INSERT INTO tmp_ok SELECT generate_series(1, 5000);
 			qq|SELECT blockoff, blocknum, io_reqd and not foreign_io, nblocks FROM read_buffers('$table', 1, 3)|,
 			qr/^0\|1\|t\|2\n2\|3\|f\|1$/,
 			qr/^$/);
+
+
+		###
+		# Test that abandoning IO works and that it does not cause issues.
+		###
+
+		# Test that even after abandoning IO we do wait for the IOs at the end
+		# of the statement.
+		$psql_a->query_safe(qq|SET io_combine_limit=2|);
+		$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers & abandon it",
+			qq|SELECT abandoned, count(*) FROM read_buffers('$table', 0, 6, abandon_after => 2) GROUP BY 1 ORDER BY 1|,
+			qr/^f\|1\nt\|2$/,
+			qr/^$/);
+		# Due to the end-of-statement wait there should be no IOs anymore
+		is($psql_a->query_safe($count_my_aios_query), 0,
+			"$io_method: $persistency: abandoned IO completed by end of statement"
+		);
+
+		# Test that after abandoning IO buffer access still works.
+		#
+		# First test that by just issuing another read_buffers() in the same
+		# statement (so that the abandoned IOs aren't waited-for during
+		# end-of-statement resowner handling).
+		$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers & abandon it & read again",
+			qq|
+			   SELECT SUM(nblocks) FROM read_buffers('$table', 0, 4, abandon_after => 2)
+			   UNION ALL
+			   SELECT SUM(nblocks) FROM read_buffers('$table', 0, 4)
+			   |,
+			qr/^4\n4$/,
+			qr/^$/);
+
+		# Now test that a plain SELECT needing buffers affected by abandoned
+		# IO works
+		$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers & abandon it & read again via SELECT",
+			qq|
+			   SELECT SUM(nblocks) FROM read_buffers('$table', 0, 4, abandon_after => 2)
+			   UNION ALL
+			   SELECT SUM(data) FROM $table WHERE data < 1000
+			   |,
+			qr/^4\n499500$/,
+			qr/^$/);
+		$psql_a->query_safe(qq|RESET io_combine_limit|);
 	}
 
 	# The remaining tests don't make sense for temp tables, as they are
@@ -1837,6 +1896,54 @@ read_buffers('$table', 0, 4)|,
 	$psql_a->{stdout} = '';
 
 
+	###
+	# Test that abandoning IO actually avoids waiting for IO.
+	###
+
+	# Testing not waiting only works if the IO method doesn't execute IO
+	# synchronously, which is why we fundamentally can't test with
+	# io_method=sync. Furthermore we need to work around io_method=io_uring
+	# potentially executing the IO synchronously - we can do so by making the
+	# IOs big enough (c.f. pgaio_uring_should_use_async()).
+	#
+	# To make the test reliable we have to abandon all IOs, as waiting for
+	# some IOs could lead to also consuming the completion of the IO that will
+	# trigger a wait in the completion.
+	if ($io_method ne 'sync')
+	{
+		$psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+		$psql_a->query_safe(qq|SET io_combine_limit=5|);
+		$psql_b->query_safe(
+			qq/SELECT inj_io_completion_wait(
+		   relfilenode=>pg_relation_filenode('$table'),
+		   blockno=>4);/);
+		ok(1,
+			"$io_method: $persistency: configure wait in completion of block 4"
+		);
+
+		# Need to end the wait in the completion before the statement is over,
+		# otherwise we'll wait during resowner cleanup at the end of the
+		# statement.
+		psql_like(
+			$io_method,
+			$psql_a,
+			"$persistency: read buffers abandoning blocked IO avoids wait",
+			qq|
+			   SELECT count(*) > 0 FROM read_buffers('$table', 0, 10, abandon_after => 0) WHERE abandoned
+			   UNION ALL
+			   SELECT count(*) > 0 FROM pg_aios WHERE pid = pg_backend_pid()
+			   UNION ALL
+			   SELECT inj_io_completion_continue() IS NOT NULL
+			|,
+			qr/^t\nt\nt$/,
+			qr/^$/);
+		is($psql_a->query_safe($count_my_aios_query), 0,
+			"$io_method: $persistency: abandoned IO is completed at end of statement"
+		);
+
+		$psql_a->query_safe(qq|RESET io_combine_limit|);
+	}
+
 	$psql_a->quit();
 	$psql_b->quit();
 	$psql_c->quit();
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index 762ac29512f..ff3b200cff3 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -53,7 +53,7 @@ CREATE FUNCTION buffer_call_terminate_io(buffer int, for_input bool, succeed boo
 RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
-CREATE FUNCTION read_buffers(rel regclass, startblock int4, nblocks int4, OUT blockoff int4, OUT blocknum int4, OUT io_reqd bool, OUT foreign_io bool, OUT nblocks int4, OUT buf int4[])
+CREATE FUNCTION read_buffers(rel regclass, startblock int4, nblocks int4, abandon_after int4 DEFAULT -1, OUT blockoff int4, OUT blocknum int4, OUT io_reqd bool, OUT foreign_io bool, OUT abandoned bool, OUT nblocks int4, OUT buf int4[])
 RETURNS SETOF record STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index d7530681192..c9a8b21b972 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -709,11 +709,13 @@ read_buffers(PG_FUNCTION_ARGS)
 	Oid			relid = PG_GETARG_OID(0);
 	BlockNumber startblock = PG_GETARG_UINT32(1);
 	int32		nblocks = PG_GETARG_INT32(2);
+	int32		abandon_after = PG_GETARG_INT32(3);
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	Relation	rel;
 	SMgrRelation smgr;
 	int			nblocks_done = 0;
 	int			nblocks_disp = 0;
+	int			nblocks_wait = 0;
 	int			nios = 0;
 	ReadBuffersOperation *operations;
 	Buffer	   *buffers;
@@ -735,6 +737,9 @@ read_buffers(PG_FUNCTION_ARGS)
 	rel = relation_open(relid, AccessShareLock);
 	smgr = RelationGetSmgr(rel);
 
+	if (abandon_after < 0)
+		abandon_after = nblocks;
+
 	/*
 	 * Do StartReadBuffers() until IO for all the required blocks has been
 	 * started (if required).
@@ -769,9 +774,17 @@ read_buffers(PG_FUNCTION_ARGS)
 	for (int nio = 0; nio < nios; nio++)
 	{
 		ReadBuffersOperation *operation = &operations[nio];
+		int			nblocks_this_io = nblocks_per_io[nio];
 
 		if (io_reqds[nio])
-			WaitReadBuffers(operation);
+		{
+			if (nblocks_wait < abandon_after)
+				WaitReadBuffers(operation);
+			else
+				AbandonReadBuffers(operation);
+		}
+
+		nblocks_wait += nblocks_this_io;
 	}
 
 	/*
@@ -781,8 +794,8 @@ read_buffers(PG_FUNCTION_ARGS)
 	{
 		ReadBuffersOperation *operation = &operations[nio];
 		int			nblocks_this_io = nblocks_per_io[nio];
-		Datum		values[6] = {0};
-		bool		nulls[6] = {0};
+		Datum		values[7] = {0};
+		bool		nulls[7] = {0};
 		ArrayType  *buffers_arr;
 
 		/* convert buffer array to datum array */
@@ -815,14 +828,18 @@ read_buffers(PG_FUNCTION_ARGS)
 		values[3] = BoolGetDatum(io_reqds[nio] ? operation->foreign_io : false);
 		nulls[3] = false;
 
-		/* nblocks */
-		values[4] = Int32GetDatum(nblocks_this_io);
+		/* abandoned */
+		values[4] = BoolGetDatum(nblocks_disp >= abandon_after);
 		nulls[4] = false;
 
-		/* array of buffers */
-		values[5] = PointerGetDatum(buffers_arr);
+		/* nblocks */
+		values[5] = Int32GetDatum(nblocks_this_io);
 		nulls[5] = false;
 
+		/* array of buffers */
+		values[6] = PointerGetDatum(buffers_arr);
+		nulls[6] = false;
+
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
 
 		nblocks_disp += nblocks_this_io;
@@ -1075,6 +1092,13 @@ inj_io_completion_wait_matches(PgAioHandle *ioh)
 		!(inj_blockno >= io_blockno && inj_blockno < (io_blockno + td->smgr.nblocks)))
 		return false;
 
+	ereport(LOG,
+			errmsg("wait injection point matches for IO %d, inj blockno %d, io blockno %d, io nblocks %d",
+				   pgaio_io_get_id(ioh),
+				   inj_blockno, io_blockno, td->smgr.nblocks
+				   ),
+			errhidestmt(true), errhidecontext(true));
+
 	return true;
 }
 
-- 
2.53.0.1.gb2826b52eb

