From 7598d135a84b6e901645e35be41005a721d23de0 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Wed, 10 Sep 2025 14:00:02 -0400
Subject: [PATCH v2 2/3] test_aio: Add read_stream test infrastructure & tests

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/test/modules/test_aio/meson.build         |   1 +
 .../modules/test_aio/t/004_read_stream.pl     | 282 +++++++++++++++
 src/test/modules/test_aio/test_aio--1.0.sql   |  26 +-
 src/test/modules/test_aio/test_aio.c          | 342 +++++++++++++++---
 src/tools/pgindent/typedefs.list              |   1 +
 5 files changed, 601 insertions(+), 51 deletions(-)
 create mode 100644 src/test/modules/test_aio/t/004_read_stream.pl

diff --git a/src/test/modules/test_aio/meson.build b/src/test/modules/test_aio/meson.build
index 044149d02b8..d571d9da00d 100644
--- a/src/test/modules/test_aio/meson.build
+++ b/src/test/modules/test_aio/meson.build
@@ -33,6 +33,7 @@ tests += {
       't/001_aio.pl',
       't/002_io_workers.pl',
       't/003_initdb.pl',
+      't/004_read_stream.pl',
     ],
   },
 }
diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl
new file mode 100644
index 00000000000..89cfabbb1d3
--- /dev/null
+++ b/src/test/modules/test_aio/t/004_read_stream.pl
@@ -0,0 +1,282 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+use FindBin;
+use lib $FindBin::RealBin;
+
+use TestAio;
+
+
+my $node = PostgreSQL::Test::Cluster->new('test');
+$node->init();
+
+$node->append_conf(
+	'postgresql.conf', qq(
+shared_preload_libraries=test_aio
+log_min_messages = 'DEBUG3'
+log_statement=all
+log_error_verbosity=default
+restart_after_crash=false
+temp_buffers=100
+max_connections=8
+io_method=worker
+));
+
+$node->start();
+test_setup($node);
+$node->stop();
+
+# Run the tests once per supported io_method, restarting to switch methods.
+foreach my $method (TestAio::supported_io_methods())
+{
+	$node->adjust_conf('postgresql.conf', 'io_method', $method);
+	$node->start();
+	test_io_method($method, $node);
+	$node->stop();
+}
+
+done_testing();
+
+
+sub test_setup
+{
+	my ($node) = @_;
+	# Install test_aio and create the sparsely filled (FILLFACTOR=10) table.
+	$node->safe_psql(
+		'postgres', qq(
+CREATE EXTENSION test_aio;
+
+CREATE TABLE largeish(k int not null) WITH (FILLFACTOR=10);
+INSERT INTO largeish(k) SELECT generate_series(1, 10000);
+));
+	pass("setup");
+}
+
+
+sub test_repeated_blocks
+{
+	# Stream block lists containing repeats; the queries just have to succeed.
+	my ($io_method, $node) = @_;
+
+	my $psql = $node->background_psql('postgres', on_error_stop => 0);
+
+	# Disable IO combining; not having larger reads keeps the test simple
+	$psql->query_safe(
+		qq/
+SET io_combine_limit = 1;
+/);
+
+	# same block twice in a row, right after the relation was evicted
+	$psql->query_safe(
+		qq/
+SELECT evict_rel('largeish');
+/);
+	$psql->query_safe(
+		qq/
+SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 2, 4, 4]);
+/);
+	pass("$io_method: stream missing the same block repeatedly");
+
+	$psql->query_safe(
+		qq/
+SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 2, 4, 4]);
+/);
+	pass("$io_method: stream hitting the same block repeatedly");
+
+	# evict again, then walk forward and back over the same blocks
+	$psql->query_safe(
+		qq/
+SELECT evict_rel('largeish');
+/);
+	$psql->query_safe(
+		qq/
+SELECT * FROM read_stream_for_blocks('largeish', ARRAY[0, 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 0]);
+/);
+	pass("$io_method: stream accessing same block");
+
+	$psql->quit();
+}
+
+
+sub test_inject_foreign
+{
+	my $io_method = shift;
+	my $node = shift;
+	# psql_b starts IO whose completion is held up; psql_a streams over it.
+	my $psql_a = $node->background_psql('postgres', on_error_stop => 0);
+	my $psql_b = $node->background_psql('postgres', on_error_stop => 0);
+
+	my $pid_a = $psql_a->query_safe(qq/SELECT pg_backend_pid();/);
+
+
+	###
+	# Test read stream encountering buffers undergoing IO in another backend,
+	# with the other backend's reads succeeding.
+	###
+	$psql_a->query_safe(
+		qq/
+SELECT evict_rel('largeish');
+/);
+
+	$psql_b->query_safe(
+		qq/
+SELECT inj_io_completion_wait(pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish'));
+/);
+
+	$psql_b->{stdin} .= qq/
+SELECT read_rel_block_ll('largeish', blockno=>5, nblocks=>1);
+/;
+	$psql_b->{run}->pump_nb();
+
+	$node->poll_query_until(
+		'postgres', qq/
+SELECT wait_event FROM pg_stat_activity WHERE wait_event = 'completion_wait';
+/,
+		'completion_wait');
+
+	$psql_a->{stdin} .= qq/
+SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);
+/;
+	$psql_a->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+		'AioIoCompletion');
+
+	$node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+	pump_until(
+		$psql_a->{run}, $psql_a->{timeout},
+		\$psql_a->{stdout}, qr/\{0,2,5,7\}/);
+
+	ok(1,
+		qq/$io_method: read stream encounters succeeding IO by another backend/
+	);
+
+
+	###
+	# Test read stream encountering buffers undergoing IO in another backend,
+	# with the other backend's reads failing.
+	###
+	$psql_a->query_safe(
+		qq/
+SELECT evict_rel('largeish');
+/);
+
+	$psql_b->query_safe(
+		qq/
+SELECT inj_io_completion_wait(pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish'));
+/);
+
+	$psql_b->query_safe(
+		qq/
+SELECT inj_io_short_read_attach(-errno_from_string('EIO'), pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish'));
+/);
+
+	$psql_b->{stdin} .= qq/
+SELECT read_rel_block_ll('largeish', blockno=>5, nblocks=>1);
+/;
+	$psql_b->{run}->pump_nb();
+
+	$node->poll_query_until(
+		'postgres', qq/
+SELECT wait_event FROM pg_stat_activity WHERE wait_event = 'completion_wait';
+/,
+		'completion_wait');
+
+	$psql_a->{stdin} .= qq/
+SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);
+/;
+	$psql_a->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+		'AioIoCompletion');
+
+	$node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+	pump_until(
+		$psql_a->{run}, $psql_a->{timeout},
+		\$psql_a->{stdout}, qr/\{0,2,5,7\}/);
+	# Wait for psql_b's error instead of racing with a non-blocking pump.
+	pump_until($psql_b->{run}, $psql_b->{timeout}, \$psql_b->{stderr}, qr/ERROR/);
+	like(
+		$psql_b->{stderr},
+		qr/.*ERROR.*could not read blocks 5\.\.5.*$/,
+		"$io_method: injected error occurred");
+	$psql_b->{stderr} = '';
+	$psql_b->query_safe(qq/SELECT inj_io_short_read_detach();/);
+
+
+	ok(1,
+		qq/$io_method: read stream encounters failing IO by another backend/);
+
+
+	###
+	# Test read stream encountering two buffers that are undergoing the same
+	# IO, started by another backend
+	###
+	$psql_a->query_safe(
+		qq/
+SELECT evict_rel('largeish');
+/);
+
+	$psql_b->query_safe(
+		qq/
+SELECT inj_io_completion_wait(pid=>pg_backend_pid(), relfilenode=>pg_relation_filenode('largeish'));
+/);
+
+	$psql_b->{stdin} .= qq/
+SELECT read_rel_block_ll('largeish', blockno=>2, nblocks=>3);
+/;
+	$psql_b->{run}->pump_nb();
+
+	$node->poll_query_until(
+		'postgres', qq/
+SELECT wait_event FROM pg_stat_activity WHERE wait_event = 'completion_wait';
+/,
+		'completion_wait');
+
+	$psql_a->{stdin} .= qq/
+SELECT array_agg(blocknum) FROM read_stream_for_blocks('largeish', ARRAY[0, 2, 4]);
+/;
+	$psql_a->{run}->pump_nb();
+
+	$node->poll_query_until('postgres',
+		qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+		'AioIoCompletion');
+
+	$node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+	pump_until(
+		$psql_a->{run}, $psql_a->{timeout},
+		\$psql_a->{stdout}, qr/\{0,2,4\}/);
+
+	ok(1, qq/$io_method: read stream encounters two buffer read in one IO/);
+
+
+	$psql_a->quit();
+	$psql_b->quit();
+}
+
+
+sub test_io_method
+{
+	my $io_method = shift;
+	my $node = shift;
+	# Run every test for one io_method; $node must already be started.
+	test_repeated_blocks($io_method, $node);
+
+  SKIP:
+	{
+		skip 'Injection points not supported by this build', 4
+		  unless ($ENV{enable_injection_points} // '') eq 'yes';
+		test_inject_foreign($io_method, $node);
+	}
+}
diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql
index e495481c41e..da7cc03829a 100644
--- a/src/test/modules/test_aio/test_aio--1.0.sql
+++ b/src/test/modules/test_aio/test_aio--1.0.sql
@@ -33,6 +33,10 @@ CREATE FUNCTION read_rel_block_ll(
 RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
+CREATE FUNCTION evict_rel(rel regclass)
+RETURNS pg_catalog.void STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
 CREATE FUNCTION invalidate_rel_block(rel regclass, blockno int)
 RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
@@ -41,7 +45,7 @@ CREATE FUNCTION buffer_create_toy(rel regclass, blockno int4)
 RETURNS pg_catalog.int4 STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
-CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool)
+CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool, assign_io bool DEFAULT false)
 RETURNS pg_catalog.bool STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
@@ -50,6 +54,14 @@ RETURNS pg_catalog.void STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
 
+/*
+ * Read stream related functions
+ */
+CREATE FUNCTION read_stream_for_blocks(rel regclass, blocks int4[], OUT blockoff int4, OUT blocknum int4, OUT buf int4)
+RETURNS SETOF record STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+
 
 /*
  * Handle related functions
@@ -91,8 +103,16 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
 /*
  * Injection point related functions
  */
-CREATE FUNCTION inj_io_short_read_attach(result int)
-RETURNS pg_catalog.void STRICT
+CREATE FUNCTION inj_io_completion_wait(pid int DEFAULT NULL, relfilenode oid DEFAULT 0)
+RETURNS pg_catalog.void
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION inj_io_completion_continue()
+RETURNS pg_catalog.void
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION inj_io_short_read_attach(result int, pid int DEFAULT NULL, relfilenode oid DEFAULT 0)
+RETURNS pg_catalog.void
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
 CREATE FUNCTION inj_io_short_read_detach()
diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c
index c55cf6c0aac..1831e535b28 100644
--- a/src/test/modules/test_aio/test_aio.c
+++ b/src/test/modules/test_aio/test_aio.c
@@ -20,16 +20,23 @@
 
 #include "access/relation.h"
 #include "fmgr.h"
+#include "funcapi.h"
 #include "storage/aio.h"
 #include "storage/aio_internal.h"
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/checksum.h"
+#include "storage/condition_variable.h"
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/procnumber.h"
+#include "storage/read_stream.h"
+#include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/injection_point.h"
 #include "utils/rel.h"
+#include "utils/wait_event.h"
 
 
 PG_MODULE_MAGIC;
@@ -37,13 +44,30 @@ PG_MODULE_MAGIC;
 
 typedef struct InjIoErrorState
 {
+	ConditionVariable cv;
+
 	bool		enabled_short_read;
 	bool		enabled_reopen;
 
+	bool		enabled_completion_wait;
+	Oid			completion_wait_relfilenode;
+	pid_t		completion_wait_pid;
+	uint32		completion_wait_event;
+
 	bool		short_read_result_set;
+	Oid			short_read_relfilenode;
+	pid_t		short_read_pid;
 	int			short_read_result;
 } InjIoErrorState;
 
+typedef struct BlocksReadStreamData
+{
+	int			nblocks;		/* number of entries in blocks[] */
+	int			curblock;		/* index of the next entry to hand out */
+	uint32	   *blocks;			/* block numbers to stream, in order */
+} BlocksReadStreamData;
+
+
 static InjIoErrorState *inj_io_error_state;
 
 /* Shared memory init callbacks */
@@ -85,10 +109,13 @@ test_aio_shmem_startup(void)
 		inj_io_error_state->enabled_short_read = false;
 		inj_io_error_state->enabled_reopen = false;
 
+		ConditionVariableInit(&inj_io_error_state->cv);
+		inj_io_error_state->completion_wait_event = WaitEventInjectionPointNew("completion_wait");
+
 #ifdef USE_INJECTION_POINTS
 		InjectionPointAttach("aio-process-completion-before-shared",
 							 "test_aio",
-							 "inj_io_short_read",
+							 "inj_io_completion_hook",
 							 NULL,
 							 0);
 		InjectionPointLoad("aio-process-completion-before-shared");
@@ -378,7 +405,7 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
 	if (nblocks <= 0 || nblocks > PG_IOV_MAX)
 		elog(ERROR, "nblocks is out of range");
 
-	rel = relation_open(relid, AccessExclusiveLock);
+	rel = relation_open(relid, AccessShareLock);
 
 	for (int i = 0; i < nblocks; i++)
 	{
@@ -452,6 +479,27 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+PG_FUNCTION_INFO_V1(evict_rel);
+Datum
+evict_rel(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	Relation	rel;
+	int32		buffers_evicted,
+				buffers_flushed,
+				buffers_skipped;
+
+	rel = relation_open(relid, AccessExclusiveLock);
+	/* the counters are required out-parameters; their values are not used */
+	EvictRelUnpinnedBuffers(rel, &buffers_evicted, &buffers_flushed,
+							&buffers_skipped);
+
+	relation_close(rel, AccessExclusiveLock);
+
+
+	PG_RETURN_VOID();
+}
+
 PG_FUNCTION_INFO_V1(invalidate_rel_block);
 Datum
 invalidate_rel_block(PG_FUNCTION_ARGS)
@@ -604,6 +652,86 @@ buffer_call_terminate_io(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+
+static BlockNumber
+read_stream_for_blocks_cb(ReadStream *stream,
+						  void *callback_private_data,
+						  void *per_buffer_data)
+{
+	BlocksReadStreamData *stream_data = callback_private_data;
+	/* hand out blocks[] in order, InvalidBlockNumber once all were returned */
+	if (stream_data->curblock >= stream_data->nblocks)
+		return InvalidBlockNumber;
+	return stream_data->blocks[stream_data->curblock++];
+}
+
+PG_FUNCTION_INFO_V1(read_stream_for_blocks);
+Datum
+read_stream_for_blocks(PG_FUNCTION_ARGS)
+{
+	Oid			relid = PG_GETARG_OID(0);
+	ArrayType  *blocksarray = PG_GETARG_ARRAYTYPE_P(1);
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	Relation	rel;
+	BlocksReadStreamData stream_data;
+	ReadStream *stream;
+	/* Returns one (array index, block number, buffer) row per array element */
+	InitMaterializedSRF(fcinfo, 0);
+
+	/*
+	 * We expect the input to be an N-element int4 array; verify that. We
+	 * don't need to use deconstruct_array() since the array data is just
+	 * going to look like a C array of N int4 values.
+	 */
+	if (ARR_NDIM(blocksarray) != 1 ||
+		ARR_HASNULL(blocksarray) ||
+		ARR_ELEMTYPE(blocksarray) != INT4OID)
+		elog(ERROR, "expected 1 dimensional int4 array");
+
+	stream_data.curblock = 0;
+	stream_data.nblocks = ARR_DIMS(blocksarray)[0];
+	stream_data.blocks = (uint32 *) ARR_DATA_PTR(blocksarray);
+
+	rel = relation_open(relid, AccessShareLock);
+
+	stream = read_stream_begin_relation(READ_STREAM_FULL,
+										NULL,
+										rel,
+										MAIN_FORKNUM,
+										read_stream_for_blocks_cb,
+										&stream_data,
+										0);
+	/* the stream has to yield exactly one valid buffer per requested block */
+	for (int i = 0; i < stream_data.nblocks; i++)
+	{
+		Buffer		buf = read_stream_next_buffer(stream, NULL);
+		Datum		values[3] = {0};
+		bool		nulls[3] = {0};
+
+		if (!BufferIsValid(buf))
+			elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly invalid", i);
+
+		values[0] = Int32GetDatum(i);
+		values[1] = UInt32GetDatum(stream_data.blocks[i]);
+		values[2] = UInt32GetDatum(buf);
+
+		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+
+		ReleaseBuffer(buf);
+	}
+	/* ... and has to be exhausted afterwards */
+	if (read_stream_next_buffer(stream, NULL) != InvalidBuffer)
+		elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly valid",
+			 stream_data.nblocks + 1);
+
+	read_stream_end(stream);
+	/* NoLock: the AccessShareLock taken above is not released here */
+	relation_close(rel, NoLock);
+
+	return (Datum) 0;
+}
+
+
 PG_FUNCTION_INFO_V1(handle_get);
 Datum
 handle_get(PG_FUNCTION_ARGS)
@@ -674,15 +802,98 @@ batch_end(PG_FUNCTION_ARGS)
 }
 
 #ifdef USE_INJECTION_POINTS
-extern PGDLLEXPORT void inj_io_short_read(const char *name,
-										  const void *private_data,
-										  void *arg);
+extern PGDLLEXPORT void inj_io_completion_hook(const char *name,
+											   const void *private_data,
+											   void *arg);
 extern PGDLLEXPORT void inj_io_reopen(const char *name,
 									  const void *private_data,
 									  void *arg);
 
-void
-inj_io_short_read(const char *name, const void *private_data, void *arg)
+/* Should the short-read injection rewrite the result of this IO? */
+static bool
+inj_io_short_read_matches(PgAioHandle *ioh)
+{
+	PGPROC	   *owner_proc;
+	PgAioTargetData *td;
+
+	if (!inj_io_error_state->enabled_short_read ||
+		!inj_io_error_state->short_read_result_set)
+		return false;
+
+	/* the caller walks ioh->op_data.read, which is only valid for reads */
+	if (ioh->op != PGAIO_OP_READV)
+		return false;
+
+	/* a pid of 0 / a relfilenode of InvalidOid mean "don't filter" */
+	owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
+	if (inj_io_error_state->short_read_pid != 0 &&
+		inj_io_error_state->short_read_pid != owner_proc->pid)
+		return false;
+
+	td = pgaio_io_get_target_data(ioh);
+	if (inj_io_error_state->short_read_relfilenode != InvalidOid &&
+		td->smgr.rlocator.relNumber != inj_io_error_state->short_read_relfilenode)
+		return false;
+
+	/*
+	 * Only shorten reads that are actually longer than the target size,
+	 * otherwise we can trigger over-reads.
+	 */
+	if (inj_io_error_state->short_read_result >= ioh->result)
+		return false;
+
+	return true;
+}
+
+/* Should completion of this IO be held up until told to continue? */
+static bool
+inj_io_completion_wait_matches(PgAioHandle *ioh)
+{
+	PGPROC	   *owner_proc;
+	PgAioTargetData *td;
+
+	if (!inj_io_error_state->enabled_completion_wait)
+		return false;
+
+	/* as for short reads, pid 0 (SQL NULL) matches IO owned by any backend */
+	owner_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
+	if (inj_io_error_state->completion_wait_pid != 0 &&
+		inj_io_error_state->completion_wait_pid != owner_proc->pid)
+		return false;
+
+	/* likewise, a relfilenode of InvalidOid matches every relation */
+	td = pgaio_io_get_target_data(ioh);
+	if (inj_io_error_state->completion_wait_relfilenode != InvalidOid &&
+		td->smgr.rlocator.relNumber != inj_io_error_state->completion_wait_relfilenode)
+		return false;
+
+	return true;
+}
+
+static void
+inj_io_completion_wait_hook(const char *name, const void *private_data, void *arg)
+{
+	PgAioHandle *ioh = (PgAioHandle *) arg;
+	/* IOs that don't match return before touching the condition variable */
+	if (!inj_io_completion_wait_matches(ioh))
+		return;
+
+	ConditionVariablePrepareToSleep(&inj_io_error_state->cv);
+	/* sleep until the IO no longer matches, re-checking after each wakeup */
+	while (true)
+	{
+		if (!inj_io_completion_wait_matches(ioh))
+			break;
+
+		ConditionVariableSleep(&inj_io_error_state->cv,
+							   inj_io_error_state->completion_wait_event);
+	}
+
+	ConditionVariableCancelSleep();
+}
+
+static void
+inj_io_short_read_hook(const char *name, const void *private_data, void *arg)
 {
 	PgAioHandle *ioh = (PgAioHandle *) arg;
 
@@ -691,58 +902,56 @@ inj_io_short_read(const char *name, const void *private_data, void *arg)
 				   inj_io_error_state->enabled_reopen),
 			errhidestmt(true), errhidecontext(true));
 
-	if (inj_io_error_state->enabled_short_read)
+	if (inj_io_short_read_matches(ioh))
 	{
+		struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
+		int32		old_result = ioh->result;
+		int32		new_result = inj_io_error_state->short_read_result;
+		int32		processed = 0;
+
+		ereport(LOG,
+				errmsg("short read inject point, changing result from %d to %d",
+					   old_result, new_result),
+				errhidestmt(true), errhidecontext(true));
+
 		/*
-		 * Only shorten reads that are actually longer than the target size,
-		 * otherwise we can trigger over-reads.
+		 * The underlying IO actually completed OK, and thus the "invalid"
+		 * portion of the IOV actually contains valid data. That can hide a
+		 * lot of problems, e.g. if we were to wrongly mark a buffer, that
+		 * wasn't read according to the shortened-read, IO as valid, the
+		 * contents would look valid and we might miss a bug.
+		 *
+		 * To avoid that, iterate through the IOV and zero out the "failed"
+		 * portion of the IO.
 		 */
-		if (inj_io_error_state->short_read_result_set
-			&& ioh->op == PGAIO_OP_READV
-			&& inj_io_error_state->short_read_result <= ioh->result)
+		for (int i = 0; i < ioh->op_data.read.iov_length; i++)
 		{
-			struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
-			int32		old_result = ioh->result;
-			int32		new_result = inj_io_error_state->short_read_result;
-			int32		processed = 0;
-
-			ereport(LOG,
-					errmsg("short read inject point, changing result from %d to %d",
-						   old_result, new_result),
-					errhidestmt(true), errhidecontext(true));
-
-			/*
-			 * The underlying IO actually completed OK, and thus the "invalid"
-			 * portion of the IOV actually contains valid data. That can hide
-			 * a lot of problems, e.g. if we were to wrongly mark a buffer,
-			 * that wasn't read according to the shortened-read, IO as valid,
-			 * the contents would look valid and we might miss a bug.
-			 *
-			 * To avoid that, iterate through the IOV and zero out the
-			 * "failed" portion of the IO.
-			 */
-			for (int i = 0; i < ioh->op_data.read.iov_length; i++)
+			if (processed + iov[i].iov_len <= new_result)
+				processed += iov[i].iov_len;
+			else if (processed <= new_result)
 			{
-				if (processed + iov[i].iov_len <= new_result)
-					processed += iov[i].iov_len;
-				else if (processed <= new_result)
-				{
-					uint32		ok_part = new_result - processed;
+				uint32		ok_part = new_result - processed;
 
-					memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
-					processed += iov[i].iov_len;
-				}
-				else
-				{
-					memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
-				}
+				memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
+				processed += iov[i].iov_len;
+			}
+			else
+			{
+				memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
 			}
-
-			ioh->result = new_result;
 		}
+
+		ioh->result = new_result;
 	}
 }
 
+void
+inj_io_completion_hook(const char *name, const void *private_data, void *arg)
+{
+	inj_io_completion_wait_hook(name, private_data, arg);	/* may block */
+	inj_io_short_read_hook(name, private_data, arg);	/* may alter result */
+}
+
 void
 inj_io_reopen(const char *name, const void *private_data, void *arg)
 {
@@ -756,6 +965,39 @@ inj_io_reopen(const char *name, const void *private_data, void *arg)
 }
 #endif
 
+PG_FUNCTION_INFO_V1(inj_io_completion_wait);
+Datum
+inj_io_completion_wait(PG_FUNCTION_ARGS)
+{
+#ifdef USE_INJECTION_POINTS
+	inj_io_error_state->enabled_completion_wait = true;
+	inj_io_error_state->completion_wait_pid =
+		PG_ARGISNULL(0) ? 0 : PG_GETARG_INT32(0);	/* NULL is stored as 0 */
+	inj_io_error_state->completion_wait_relfilenode =
+		PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1);	/* NULL: InvalidOid */
+#else
+	elog(ERROR, "injection points not supported");
+#endif
+
+	PG_RETURN_VOID();
+}
+
+PG_FUNCTION_INFO_V1(inj_io_completion_continue);
+Datum
+inj_io_completion_continue(PG_FUNCTION_ARGS)
+{
+#ifdef USE_INJECTION_POINTS
+	inj_io_error_state->enabled_completion_wait = false;
+	inj_io_error_state->completion_wait_pid = 0;
+	inj_io_error_state->completion_wait_relfilenode = InvalidOid;
+	ConditionVariableBroadcast(&inj_io_error_state->cv);	/* wake sleepers */
+#else
+	elog(ERROR, "injection points not supported");
+#endif
+
+	PG_RETURN_VOID();
+}
+
 PG_FUNCTION_INFO_V1(inj_io_short_read_attach);
 Datum
 inj_io_short_read_attach(PG_FUNCTION_ARGS)
@@ -765,6 +1007,10 @@ inj_io_short_read_attach(PG_FUNCTION_ARGS)
 	inj_io_error_state->short_read_result_set = !PG_ARGISNULL(0);
 	if (inj_io_error_state->short_read_result_set)
 		inj_io_error_state->short_read_result = PG_GETARG_INT32(0);
+	inj_io_error_state->short_read_pid =
+		PG_ARGISNULL(1) ? 0 : PG_GETARG_INT32(1);
+	inj_io_error_state->short_read_relfilenode =
+		PG_ARGISNULL(2) ? 0 : PG_GETARG_OID(2);
 #else
 	elog(ERROR, "injection points not supported");
 #endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index a13e8162890..7c99bec3e2e 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -293,6 +293,7 @@ BlockSampler
 BlockSamplerData
 BlockedProcData
 BlockedProcsData
+BlocksReadStreamData
 BlocktableEntry
 BloomBuildState
 BloomFilter
-- 
2.48.1.76.g4e746b1a31.dirty

