From 7031072c40494b151d9849160d403f719c60631c Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Fri, 27 Mar 2026 13:08:26 -0400
Subject: [PATCH v9 2/4] test_aio: Add read_stream test infrastructure & tests

While we have a lot of indirect coverage of read streams, there are corner
cases that are hard to test when only indirectly controlling and observing the
read stream.  This commit adds an SQL callable SRF interface for a read stream
and uses that in a few tests.

To make some of the tests possible, the injection point infrastructure in
test_aio had to be expanded to allow blocking IO completion.

Author: Andres Freund <andres@anarazel.de>
Reviewed-by: Nazir Bilal Yavuz <byavuz81@gmail.com>
Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv
---
 src/test/modules/test_aio/meson.build         |   1 +
 .../modules/test_aio/t/004_read_stream.pl     | 275 ++++++++++++++
 src/test/modules/test_aio/test_aio--1.0.sql   |  19 +-
 src/test/modules/test_aio/test_aio.c          | 336 +++++++++++++++---
 src/tools/pgindent/typedefs.list              |   1 +
 5 files changed, 582 insertions(+), 50 deletions(-)
 create mode 100644 src/test/modules/test_aio/t/004_read_stream.pl

diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build
index 18a797f3a3b..909f81d96c1 100644
--- a/src/test/modules/test_aio/meson.build
+++ b/src/test/modules/test_aio/meson.build
@@ -33,6 +33,7 @@ tests += {
       't/001_aio.pl',
       't/002_io_workers.pl',
       't/003_initdb.pl',
+      't/004_read_stream.pl',
     ],
   },
 }
diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl
new file mode 100644
index 00000000000..0d123ac0ed5
--- /dev/null
+++ b/src/test/modules/test_aio/t/004_read_stream.pl
@@ -0,0 +1,275 @@
+# Copyright (c) 2025-2026, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+use FindBin;
+use lib $FindBin::RealBin;
+
+use TestAio;
+
+
+my $node = PostgreSQL::Test::Cluster->new('test');
+$node->init();
+
+TestAio::configure($node);
+
+$node->append_conf(
+	'postgresql.conf', qq(
+max_connections=8
+io_method=worker
+));
+
+$node->start();
+test_setup($node);
+$node->stop();
+
+
+foreach my $method (TestAio::supported_io_methods())
+{
+	$node->adjust_conf('postgresql.conf', 'io_method', $method);
+	$node->start();
+	test_io_method($method, $node);
+	$node->stop();
+}
+
+done_testing();
+
+
+sub test_setup
+{
+	my $node = shift;
+
+	$node->safe_psql(
+		'postgres', qq(
+CREATE EXTENSION test_aio;
+
+CREATE TABLE largeish(k int not null) WITH (FILLFACTOR=10);
+INSERT INTO largeish(k) SELECT generate_series(1, 10000);
+));
+	ok(1, "setup");
+}
+
+
+sub test_repeated_blocks
+{
+	my $io_method = shift;
+	my $node = shift;
+
+	my $psql = $node->background_psql('postgres', on_error_stop => 0);
+
+	# Preventing larger reads makes testing easier
+	$psql->query_safe(qq/SET io_combine_limit = 1/);
+
+	# test miss of the same block twice in a row
+	$psql->query_safe(qq/SELECT evict_rel('largeish');/);
+
+	# block 0 grows the distance enough that the stream will look ahead and try
+	# to start a pending read for block 2 (and later block 4) twice before
+	# returning any buffers.
+	$psql->query_safe(
+		qq/SELECT * FROM read_stream_for_blocks('largeish',
+		   ARRAY[0, 2, 2, 4, 4]);/);
+
+	ok(1, "$io_method: stream missing the same block repeatedly");
+
+	$psql->query_safe(
+		qq/SELECT * FROM read_stream_for_blocks('largeish',
+		   ARRAY[0, 2, 2, 4, 4]);/);
+	ok(1, "$io_method: stream hitting the same block repeatedly");
+
+	# test hit of the same block twice in a row
+	$psql->query_safe(qq/SELECT evict_rel('largeish');/);
+	$psql->query_safe(
+		qq/SELECT * FROM read_stream_for_blocks('largeish',
+		   ARRAY[0, 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 0]);/);
+	ok(1, "$io_method: stream accessing same block");
+
+	# Test repeated blocks with a temp table, using invalidate_rel_block()
+	# to evict individual local buffers.
+	$psql->query_safe(
+		qq/CREATE TEMP TABLE largeish_temp(k int not null) WITH (FILLFACTOR=10);
+		   INSERT INTO largeish_temp(k) SELECT generate_series(1, 200);/);
+
+	# Evict the specific blocks we'll request to force misses
+	$psql->query_safe(qq/SELECT invalidate_rel_block('largeish_temp', 0);/);
+	$psql->query_safe(qq/SELECT invalidate_rel_block('largeish_temp', 2);/);
+	$psql->query_safe(qq/SELECT invalidate_rel_block('largeish_temp', 4);/);
+
+	$psql->query_safe(
+		qq/SELECT * FROM read_stream_for_blocks('largeish_temp',
+		   ARRAY[0, 2, 2, 4, 4]);/);
+	ok(1, "$io_method: temp stream missing the same block repeatedly");
+
+	# Now the blocks are cached, so repeated access should be hits
+	$psql->query_safe(
+		qq/SELECT * FROM read_stream_for_blocks('largeish_temp',
+		   ARRAY[0, 2, 2, 4, 4]);/);
+	ok(1, "$io_method: temp stream hitting the same block repeatedly");
+
+	$psql->quit();
+}
+
+
+sub test_inject_foreign
+{
+	my $io_method = shift;
+	my $node = shift;
+
+	my $psql_a = $node->background_psql('postgres', on_error_stop => 0);
+	my $psql_b = $node->background_psql('postgres', on_error_stop => 0);
+
+	my $pid_a = $psql_a->query_safe(qq/SELECT pg_backend_pid();/);
+
+
+	###
+	# Test read stream encountering buffers undergoing IO in another backend,
+	# with the other backend's reads succeeding.
+	###
+	$psql_a->query_safe(qq/SELECT evict_rel('largeish');/);
+
+	$psql_b->query_safe(
+		qq/SELECT inj_io_completion_wait(pid=>pg_backend_pid(),
+		   relfilenode=>pg_relation_filenode('largeish'));/);
+
+	$psql_b->{stdin} .= qq/SELECT read_rel_block_ll('largeish',
+		blockno=>5, nblocks=>1);\n/;
+	$psql_b->{run}->pump_nb();
+
+	$node->poll_query_until(
+		'postgres', qq/SELECT wait_event FROM pg_stat_activity
+			WHERE wait_event = 'completion_wait';/,
+		'completion_wait');
+
+	# Block 5 is undergoing IO in session b, so session a will move on to start
+	# a new IO for block 7.
+	$psql_a->{stdin} .= qq/SELECT array_agg(blocknum) FROM
+		read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);\n/;
+	$psql_a->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+		'AioIoCompletion');
+
+	$node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+	pump_until(
+		$psql_a->{run}, $psql_a->{timeout},
+		\$psql_a->{stdout}, qr/\{0,2,5,7\}/);
+
+	ok(1,
+		qq/$io_method: read stream encounters succeeding IO by another backend/
+	);
+
+	###
+	# Test read stream encountering buffers undergoing IO in another backend,
+	# with the other backend's reads failing.
+	###
+	$psql_a->query_safe(qq/SELECT evict_rel('largeish');/);
+
+	$psql_b->query_safe(
+		qq/SELECT inj_io_completion_wait(pid=>pg_backend_pid(),
+		   relfilenode=>pg_relation_filenode('largeish'));/);
+
+	$psql_b->query_safe(
+		qq/SELECT inj_io_short_read_attach(-errno_from_string('EIO'),
+		   pid=>pg_backend_pid(),
+		   relfilenode=>pg_relation_filenode('largeish'));/);
+
+	$psql_b->{stdin} .= qq/SELECT read_rel_block_ll('largeish',
+		blockno=>5, nblocks=>1);\n/;
+	$psql_b->{run}->pump_nb();
+
+	$node->poll_query_until(
+		'postgres',
+		qq/SELECT wait_event FROM pg_stat_activity
+		   WHERE wait_event = 'completion_wait';/,
+		'completion_wait');
+
+	$psql_a->{stdin} .= qq/SELECT array_agg(blocknum) FROM
+		read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);\n/;
+	$psql_a->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+		'AioIoCompletion');
+
+	$node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+	pump_until(
+		$psql_a->{run}, $psql_a->{timeout},
+		\$psql_a->{stdout}, qr/\{0,2,5,7\}/);
+
+	pump_until($psql_b->{run}, $psql_b->{timeout}, \$psql_b->{stderr},
+		qr/ERROR.*could not read blocks 5\.\.5/);
+	ok(1, "$io_method: injected error occurred");
+	$psql_b->{stderr} = '';
+	$psql_b->query_safe(qq/SELECT inj_io_short_read_detach();/);
+
+	ok(1,
+		qq/$io_method: read stream encounters failing IO by another backend/);
+
+
+	###
+	# Test read stream encountering two buffers that are undergoing the same
+	# IO, started by another backend.
+	###
+	$psql_a->query_safe(qq/SELECT evict_rel('largeish');/);
+
+	$psql_b->query_safe(
+		qq/SELECT inj_io_completion_wait(pid=>pg_backend_pid(),
+		   relfilenode=>pg_relation_filenode('largeish'));/);
+
+	$psql_b->{stdin} .= qq/SELECT read_rel_block_ll('largeish',
+		blockno=>2, nblocks=>3);\n/;
+	$psql_b->{run}->pump_nb();
+
+	$node->poll_query_until(
+		'postgres',
+		qq/SELECT wait_event FROM pg_stat_activity
+			WHERE wait_event = 'completion_wait';/,
+		'completion_wait');
+
+	# Blocks 2 and 4 are undergoing IO initiated by session b
+	$psql_a->{stdin} .= qq/SELECT array_agg(blocknum) FROM
+		read_stream_for_blocks('largeish', ARRAY[0, 2, 4]);\n/;
+	$psql_a->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+		'AioIoCompletion');
+
+	$node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+	pump_until(
+		$psql_a->{run}, $psql_a->{timeout},
+		\$psql_a->{stdout}, qr/\{0,2,4\}/);
+
+	ok(1, qq/$io_method: read stream encounters two buffer read in one IO/);
+
+	$psql_a->quit();
+	$psql_b->quit();
+}
+
+
+sub test_io_method
+{
+	my $io_method = shift;
+	my $node = shift;
+
+	is($node->safe_psql('postgres', 'SHOW io_method'),
+		$io_method, "$io_method: io_method set correctly");
+
+	test_repeated_blocks($io_method, $node);
+
+  SKIP:
+	{
+		skip 'Injection points not supported by this build', 1
+		  unless $ENV{enable_injection_points} eq 'yes';
+		test_inject_foreign($io_method, $node);
+	}
+}
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index 86beb563b6a..4a5a379b3c5 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -57,6 +57,13 @@ CREATE FUNCTION read_buffers(rel regclass, startblock int4, nblocks int4, OUT bl
 RETURNS SETOF record STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
+/*
+ * Read stream related functions
+ */
+CREATE FUNCTION read_stream_for_blocks(rel regclass, blocks int4[], OUT blockoff int4, OUT blocknum int4, OUT buf int4)
+RETURNS SETOF record STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
 
 /*
  * Handle related functions
@@ -98,8 +105,16 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
 /*
  * Injection point related functions
  */
-CREATE FUNCTION inj_io_short_read_attach(result int)
-RETURNS pg_catalog.void STRICT
+CREATE FUNCTION inj_io_completion_wait(pid int DEFAULT NULL, relfilenode oid DEFAULT NULL, blockno int4 DEFAULT NULL)
+RETURNS pg_catalog.void
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION inj_io_completion_continue()
+RETURNS pg_catalog.void
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION inj_io_short_read_attach(result int, pid int DEFAULT NULL, relfilenode oid DEFAULT NULL)
+RETURNS pg_catalog.void
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
 CREATE FUNCTION inj_io_short_read_detach()
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index 3e486a5806e..cb614551914 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -27,13 +27,18 @@
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/checksum.h"
+#include "storage/condition_variable.h"
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/procnumber.h"
+#include "storage/read_stream.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/injection_point.h"
 #include "utils/rel.h"
 #include "utils/tuplestore.h"
+#include "utils/wait_event.h"
 
 
 PG_MODULE_MAGIC;
@@ -41,13 +46,31 @@ PG_MODULE_MAGIC;
 
 typedef struct InjIoErrorState
 {
+	ConditionVariable cv;
+
 	bool		enabled_short_read;
 	bool		enabled_reopen;
 
+	bool		enabled_completion_wait;
+	Oid			completion_wait_relfilenode;
+	BlockNumber completion_wait_blockno;
+	pid_t		completion_wait_pid;
+	uint32		completion_wait_event;
+
 	bool		short_read_result_set;
+	Oid			short_read_relfilenode;
+	pid_t		short_read_pid;
 	int			short_read_result;
 } InjIoErrorState;
 
+typedef struct BlocksReadStreamData
+{
+	int			nblocks;
+	int			curblock;
+	uint32	   *blocks;
+} BlocksReadStreamData;
+
+
 static InjIoErrorState *inj_io_error_state;
 
 /* Shared memory init callbacks */
@@ -88,11 +111,15 @@ test_aio_shmem_startup(void)
 		/* First time through, initialize */
 		inj_io_error_state->enabled_short_read = false;
 		inj_io_error_state->enabled_reopen = false;
+		inj_io_error_state->enabled_completion_wait = false;
+
+		ConditionVariableInit(&inj_io_error_state->cv);
+		inj_io_error_state->completion_wait_event = WaitEventInjectionPointNew("completion_wait");
 
 #ifdef USE_INJECTION_POINTS
 		InjectionPointAttach("aio-process-completion-before-shared",
 							 "test_aio",
-							 "inj_io_short_read",
+							 "inj_io_completion_hook",
 							 NULL,
 							 0);
 		InjectionPointLoad("aio-process-completion-before-shared");
@@ -388,7 +415,7 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
 	if (nblocks <= 0 || nblocks > PG_IOV_MAX)
 		elog(ERROR, "nblocks is out of range");
 
-	rel = relation_open(relid, AccessExclusiveLock);
+	rel = relation_open(relid, AccessShareLock);
 
 	for (int i = 0; i < nblocks; i++)
 	{
@@ -819,6 +846,85 @@ read_buffers(PG_FUNCTION_ARGS)
 }
 
 
+static BlockNumber
+read_stream_for_blocks_cb(ReadStream *stream,
+						  void *callback_private_data,
+						  void *per_buffer_data)
+{
+	BlocksReadStreamData *stream_data = callback_private_data;
+
+	if (stream_data->curblock >= stream_data->nblocks)
+		return InvalidBlockNumber;
+	return stream_data->blocks[stream_data->curblock++];
+}
+
+PG_FUNCTION_INFO_V1(read_stream_for_blocks);
+Datum
+read_stream_for_blocks(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	ArrayType  *blocksarray = PG_GETARG_ARRAYTYPE_P(1);
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	Relation	rel;
+	BlocksReadStreamData stream_data;
+	ReadStream *stream;
+
+	InitMaterializedSRF(fcinfo, 0);
+
+	/*
+	 * We expect the input to be an N-element int4 array; verify that. We
+	 * don't need to use deconstruct_array() since the array data is just
+	 * going to look like a C array of N int4 values.
+	 */
+	if (ARR_NDIM(blocksarray) != 1 ||
+		ARR_HASNULL(blocksarray) ||
+		ARR_ELEMTYPE(blocksarray) != INT4OID)
+		elog(ERROR, "expected 1 dimensional int4 array");
+
+	stream_data.curblock = 0;
+	stream_data.nblocks = ARR_DIMS(blocksarray)[0];
+	stream_data.blocks = (uint32 *) ARR_DATA_PTR(blocksarray);
+
+	rel = relation_open(relid, AccessShareLock);
+
+	stream = read_stream_begin_relation(READ_STREAM_FULL,
+										NULL,
+										rel,
+										MAIN_FORKNUM,
+										read_stream_for_blocks_cb,
+										&stream_data,
+										0);
+
+	for (int i = 0; i < stream_data.nblocks; i++)
+	{
+		Buffer		buf = read_stream_next_buffer(stream, NULL);
+		Datum		values[3] = {0};
+		bool		nulls[3] = {0};
+
+		if (!BufferIsValid(buf))
+			elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly invalid", i);
+
+		values[0] = Int32GetDatum(i);
+		values[1] = UInt32GetDatum(stream_data.blocks[i]);
+		values[2] = UInt32GetDatum(buf);
+
+		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+
+		ReleaseBuffer(buf);
+	}
+
+	if (read_stream_next_buffer(stream, NULL) != InvalidBuffer)
+		elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly valid",
+			 stream_data.nblocks);
+
+	read_stream_end(stream);
+
+	relation_close(rel, NoLock);
+
+	return (Datum) 0;
+}
+
+
 PG_FUNCTION_INFO_V1(handle_get);
 Datum
 handle_get(PG_FUNCTION_ARGS)
@@ -889,15 +995,111 @@ batch_end(PG_FUNCTION_ARGS)
 }
 
 #ifdef USE_INJECTION_POINTS
-extern PGDLLEXPORT void inj_io_short_read(const char *name,
-										  const void *private_data,
-										  void *arg);
+extern PGDLLEXPORT void inj_io_completion_hook(const char *name,
+											   const void *private_data,
+											   void *arg);
 extern PGDLLEXPORT void inj_io_reopen(const char *name,
 									  const void *private_data,
 									  void *arg);
 
-void
-inj_io_short_read(const char *name, const void *private_data, void *arg)
+static bool
+inj_io_short_read_matches(PgAioHandle *ioh)
+{
+	PGPROC	   *io_proc;
+	int32		io_pid;
+	int32		inj_pid;
+	PgAioTargetData *td;
+
+	if (!inj_io_error_state->enabled_short_read)
+		return false;
+
+	if (!inj_io_error_state->short_read_result_set)
+		return false;
+
+	io_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
+	io_pid = io_proc->pid;
+	inj_pid = inj_io_error_state->short_read_pid;
+
+	if (inj_pid != InvalidPid && inj_pid != io_pid)
+		return false;
+
+	td = pgaio_io_get_target_data(ioh);
+
+	if (inj_io_error_state->short_read_relfilenode != InvalidOid &&
+		td->smgr.rlocator.relNumber != inj_io_error_state->short_read_relfilenode)
+		return false;
+
+	/*
+	 * Only shorten reads that are actually longer than the target size,
+	 * otherwise we can trigger over-reads.
+	 */
+	if (inj_io_error_state->short_read_result >= ioh->result)
+		return false;
+
+	return true;
+}
+
+static bool
+inj_io_completion_wait_matches(PgAioHandle *ioh)
+{
+	PGPROC	   *io_proc;
+	int32		io_pid;
+	PgAioTargetData *td;
+	int32		inj_pid;
+	BlockNumber io_blockno;
+	BlockNumber inj_blockno;
+	Oid			inj_relfilenode;
+
+	if (!inj_io_error_state->enabled_completion_wait)
+		return false;
+
+	io_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
+	io_pid = io_proc->pid;
+	inj_pid = inj_io_error_state->completion_wait_pid;
+
+	if (inj_pid != InvalidPid && inj_pid != io_pid)
+		return false;
+
+	td = pgaio_io_get_target_data(ioh);
+
+	inj_relfilenode = inj_io_error_state->completion_wait_relfilenode;
+	if (inj_relfilenode != InvalidOid &&
+		td->smgr.rlocator.relNumber != inj_relfilenode)
+		return false;
+
+	inj_blockno = inj_io_error_state->completion_wait_blockno;
+	io_blockno = td->smgr.blockNum;
+	if (inj_blockno != InvalidBlockNumber &&
+		!(inj_blockno >= io_blockno && inj_blockno < (io_blockno + td->smgr.nblocks)))
+		return false;
+
+	return true;
+}
+
+static void
+inj_io_completion_wait_hook(const char *name, const void *private_data, void *arg)
+{
+	PgAioHandle *ioh = (PgAioHandle *) arg;
+
+	if (!inj_io_completion_wait_matches(ioh))
+		return;
+
+	ConditionVariablePrepareToSleep(&inj_io_error_state->cv);
+
+	while (true)
+	{
+		if (!inj_io_completion_wait_matches(ioh))
+			break;
+
+		ConditionVariableSleep(&inj_io_error_state->cv,
+							   inj_io_error_state->completion_wait_event);
+	}
+
+	ConditionVariableCancelSleep();
+}
+
+static void
+inj_io_short_read_hook(const char *name, const void *private_data, void *arg)
 {
 	PgAioHandle *ioh = (PgAioHandle *) arg;
 
@@ -906,58 +1108,56 @@ inj_io_short_read(const char *name, const void *private_data, void *arg)
 				   inj_io_error_state->enabled_reopen),
 			errhidestmt(true), errhidecontext(true));
 
-	if (inj_io_error_state->enabled_short_read)
+	if (inj_io_short_read_matches(ioh))
 	{
+		struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
+		int32		old_result = ioh->result;
+		int32		new_result = inj_io_error_state->short_read_result;
+		int32		processed = 0;
+
+		ereport(LOG,
+				errmsg("short read inject point, changing result from %d to %d",
+					   old_result, new_result),
+				errhidestmt(true), errhidecontext(true));
+
 		/*
-		 * Only shorten reads that are actually longer than the target size,
-		 * otherwise we can trigger over-reads.
+		 * The underlying IO actually completed OK, and thus the "invalid"
+		 * portion of the IOV actually contains valid data. That can hide a
+		 * lot of problems, e.g. if we were to wrongly mark a buffer, that
+		 * wasn't read according to the shortened-read, IO as valid, the
+		 * contents would look valid and we might miss a bug.
+		 *
+		 * To avoid that, iterate through the IOV and zero out the "failed"
+		 * portion of the IO.
 		 */
-		if (inj_io_error_state->short_read_result_set
-			&& ioh->op == PGAIO_OP_READV
-			&& inj_io_error_state->short_read_result <= ioh->result)
+		for (int i = 0; i < ioh->op_data.read.iov_length; i++)
 		{
-			struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
-			int32		old_result = ioh->result;
-			int32		new_result = inj_io_error_state->short_read_result;
-			int32		processed = 0;
-
-			ereport(LOG,
-					errmsg("short read inject point, changing result from %d to %d",
-						   old_result, new_result),
-					errhidestmt(true), errhidecontext(true));
-
-			/*
-			 * The underlying IO actually completed OK, and thus the "invalid"
-			 * portion of the IOV actually contains valid data. That can hide
-			 * a lot of problems, e.g. if we were to wrongly mark a buffer,
-			 * that wasn't read according to the shortened-read, IO as valid,
-			 * the contents would look valid and we might miss a bug.
-			 *
-			 * To avoid that, iterate through the IOV and zero out the
-			 * "failed" portion of the IO.
-			 */
-			for (int i = 0; i < ioh->op_data.read.iov_length; i++)
+			if (processed + iov[i].iov_len <= new_result)
+				processed += iov[i].iov_len;
+			else if (processed <= new_result)
 			{
-				if (processed + iov[i].iov_len <= new_result)
-					processed += iov[i].iov_len;
-				else if (processed <= new_result)
-				{
-					uint32		ok_part = new_result - processed;
+				uint32		ok_part = new_result - processed;
 
-					memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
-					processed += iov[i].iov_len;
-				}
-				else
-				{
-					memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
-				}
+				memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
+				processed += iov[i].iov_len;
+			}
+			else
+			{
+				memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
 			}
-
-			ioh->result = new_result;
 		}
+
+		ioh->result = new_result;
 	}
 }
 
+void
+inj_io_completion_hook(const char *name, const void *private_data, void *arg)
+{
+	inj_io_completion_wait_hook(name, private_data, arg);
+	inj_io_short_read_hook(name, private_data, arg);
+}
+
 void
 inj_io_reopen(const char *name, const void *private_data, void *arg)
 {
@@ -971,6 +1171,42 @@ inj_io_reopen(const char *name, const void *private_data, void *arg)
 }
 #endif
 
+PG_FUNCTION_INFO_V1(inj_io_completion_wait);
+Datum
+inj_io_completion_wait(PG_FUNCTION_ARGS)
+{
+#ifdef USE_INJECTION_POINTS
+	inj_io_error_state->enabled_completion_wait = true;
+	inj_io_error_state->completion_wait_pid =
+		PG_ARGISNULL(0) ? InvalidPid : PG_GETARG_INT32(0);
+	inj_io_error_state->completion_wait_relfilenode =
+		PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1);
+	inj_io_error_state->completion_wait_blockno =
+		PG_ARGISNULL(2) ? InvalidBlockNumber : PG_GETARG_UINT32(2);
+#else
+	elog(ERROR, "injection points not supported");
+#endif
+
+	PG_RETURN_VOID();
+}
+
+PG_FUNCTION_INFO_V1(inj_io_completion_continue);
+Datum
+inj_io_completion_continue(PG_FUNCTION_ARGS)
+{
+#ifdef USE_INJECTION_POINTS
+	inj_io_error_state->enabled_completion_wait = false;
+	inj_io_error_state->completion_wait_pid = InvalidPid;
+	inj_io_error_state->completion_wait_relfilenode = InvalidOid;
+	inj_io_error_state->completion_wait_blockno = InvalidBlockNumber;
+	ConditionVariableBroadcast(&inj_io_error_state->cv);
+#else
+	elog(ERROR, "injection points not supported");
+#endif
+
+	PG_RETURN_VOID();
+}
+
 PG_FUNCTION_INFO_V1(inj_io_short_read_attach);
 Datum
 inj_io_short_read_attach(PG_FUNCTION_ARGS)
@@ -980,6 +1216,10 @@ inj_io_short_read_attach(PG_FUNCTION_ARGS)
 	inj_io_error_state->short_read_result_set = !PG_ARGISNULL(0);
 	if (inj_io_error_state->short_read_result_set)
 		inj_io_error_state->short_read_result = PG_GETARG_INT32(0);
+	inj_io_error_state->short_read_pid =
+		PG_ARGISNULL(1) ? InvalidPid : PG_GETARG_INT32(1);
+	inj_io_error_state->short_read_relfilenode =
+		PG_ARGISNULL(2) ? InvalidOid : PG_GETARG_OID(2);
 #else
 	elog(ERROR, "injection points not supported");
 #endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 112653c1680..b2c7c9e6f7c 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -309,6 +309,7 @@ BlockSampler
 BlockSamplerData
 BlockedProcData
 BlockedProcsData
+BlocksReadStreamData
 BlocktableEntry
 BloomBuildState
 BloomFilter
-- 
2.53.0.1.gb2826b52eb

