From 6ecfe2226c9068a82b7c54094db55354960a70bb Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sat, 11 Apr 2026 17:31:13 +1200
Subject: [PATCH] contrib/io_limit: Simulation of slow storage.

Only affects IOs submitted to io_method=worker.  Configured as:

  shared_preload_libraries=io_limit

  io_limit.ios_per_second=1000
  io_limit.read_per_second=200MB
  io_limit_write_per_second=100MB

Zero means no limit.

XXX Experimental hack
---
 contrib/Makefile                        |   1 +
 contrib/io_limit/Makefile               |  20 ++
 contrib/io_limit/io_limit.c             | 275 ++++++++++++++++++++++++
 contrib/io_limit/io_limit.control       |   5 +
 contrib/io_limit/meson.build            |  28 +++
 contrib/meson.build                     |   1 +
 src/backend/storage/aio/method_worker.c |  13 ++
 src/include/storage/io_worker.h         |   5 +
 8 files changed, 348 insertions(+)
 create mode 100644 contrib/io_limit/Makefile
 create mode 100644 contrib/io_limit/io_limit.c
 create mode 100644 contrib/io_limit/io_limit.control
 create mode 100644 contrib/io_limit/meson.build

diff --git a/contrib/Makefile b/contrib/Makefile
index 7d91fe77db3..48e82c53333 100644
--- a/contrib/Makefile
+++ b/contrib/Makefile
@@ -24,6 +24,7 @@ SUBDIRS = \
 		hstore		\
 		intagg		\
 		intarray	\
+		io_limit	\
 		isn		\
 		lo		\
 		ltree		\
diff --git a/contrib/io_limit/Makefile b/contrib/io_limit/Makefile
new file mode 100644
index 00000000000..da176698a17
--- /dev/null
+++ b/contrib/io_limit/Makefile
@@ -0,0 +1,20 @@
+# contrib/io_limit/Makefile
+
+MODULE_big = io_limit
+OBJS = \
+	$(WIN32RES) \
+	io_limit.o
+
+EXTENSION = io_limit
+PGFILEDESC = "io_limit - io_limit - artificially limit asynchronous I/O for tesing"
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/pg_prewarm
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/io_limit/io_limit.c b/contrib/io_limit/io_limit.c
new file mode 100644
index 00000000000..fa2ec6f1ff2
--- /dev/null
+++ b/contrib/io_limit/io_limit.c
@@ -0,0 +1,275 @@
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "port/atomics.h"
+#include "portability/instr_time.h"
+#include "storage/aio_internal.h"
+#include "storage/io_worker.h"
+#include "storage/lwlock.h"
+#include "storage/shmem.h"
+#include "utils/guc.h"
+
+/* GUCs. */
+static int	io_limit_ios_per_second = 0;
+static int	io_limit_read_per_second = 0;
+static int	io_limit_write_per_second = 0;
+
+typedef struct io_limit_control_data
+{
+	/* Whether any GUC is set to a non-zero value. */
+	bool		enabled;
+
+	/* Absolute time to wait until. */
+	pg_atomic_uint64 op_next_ns;
+	pg_atomic_uint64 read_next_ns;
+	pg_atomic_uint64 write_next_ns;
+
+	/* Limits expressed as delay intervals. */
+	LWLock		lock;
+	int			op_ns;
+	int			read_block_ns;
+	int			write_block_ns;
+}			io_limit_control_data;
+
+static io_limit_control_data * io_limit_control;
+
+static void io_limit_shmem_request(void *arg);
+static void io_limit_shmem_init(void *arg);
+
+static void assign_io_limit_ios_per_second(int newval, void *extra);
+static void assign_io_limit_read_per_second(int newval, void *extra);
+static void assign_io_limit_write_per_second(int newval, void *extra);
+static const char *show_io_limit_ios_per_second(void);
+static const char *show_io_limit_read_per_second(void);
+static const char *show_io_limit_write_per_second(void);
+
+static void io_limit_on_perform(PgAioHandle *ioh);
+
+static const ShmemCallbacks io_limit_shmem_callbacks = {
+	.request_fn = io_limit_shmem_request,
+	.init_fn = io_limit_shmem_init,
+};
+
+PG_MODULE_MAGIC_EXT(
+					.name = "io_limit",
+					.version = PG_VERSION
+);
+
+void
+_PG_init(void)
+{
+	/* Bail out if not configured in shared_preload_libraries. */
+	if (!process_shared_preload_libraries_in_progress)
+		return;
+
+	DefineCustomIntVariable("io_limit.ios_per_second",
+							"Limits IOs per second.",
+							"If set to zero, there is no limit.",
+							&io_limit_ios_per_second,
+							0,
+							0, INT_MAX,
+							PGC_USERSET,
+							0,
+							NULL,
+							assign_io_limit_ios_per_second,
+							show_io_limit_ios_per_second);
+	DefineCustomIntVariable("io_limit.read_per_second",
+							"Limits read bandwidth.",
+							"If set to zero, there is no limit.",
+							&io_limit_read_per_second,
+							0,
+							0, INT_MAX,
+							PGC_USERSET,
+							GUC_UNIT_BLOCKS,
+							NULL,
+							assign_io_limit_read_per_second,
+							show_io_limit_read_per_second);
+	DefineCustomIntVariable("io_limit.write_per_second",
+							"Limits write bandwidth.",
+							"If set to zero, there is no limit.",
+							&io_limit_write_per_second,
+							0,
+							0, INT_MAX,
+							PGC_USERSET,
+							GUC_UNIT_BLOCKS,
+							NULL,
+							assign_io_limit_write_per_second,
+							show_io_limit_write_per_second);
+
+	MarkGUCPrefixReserved("io_limit");
+	RegisterShmemCallbacks(&io_limit_shmem_callbacks);
+	pgaio_worker_set_on_perform_hook(io_limit_on_perform);
+}
+
+static void
+io_limit_shmem_request(void *arg)
+{
+	ShmemRequestStruct(.name = "io_limit",
+					   .size = sizeof(io_limit_control_data),
+					   .ptr = (void **) &io_limit_control);
+}
+
+static void
+io_limit_shmem_init(void *arg)
+{
+	memset(io_limit_control, 0, sizeof(*io_limit_control));
+	pg_atomic_init_u64(&io_limit_control->op_next_ns, 0);
+	pg_atomic_init_u64(&io_limit_control->read_next_ns, 0);
+	pg_atomic_init_u64(&io_limit_control->write_next_ns, 0);
+	LWLockInitialize(&io_limit_control->lock, LWLockNewTrancheId("io_limit"));
+
+	/* Assign initial values. */
+	assign_io_limit_ios_per_second(io_limit_ios_per_second, NULL);
+	assign_io_limit_read_per_second(io_limit_read_per_second, NULL);
+	assign_io_limit_write_per_second(io_limit_write_per_second, NULL);
+}
+
+static void
+assign_io_limit(int *wait_ns, int per_second)
+{
+	/* Ignore call from _PG_init() before ready. */
+	if (!io_limit_control)
+		return;
+
+	LWLockAcquire(&io_limit_control->lock, LW_EXCLUSIVE);
+	*wait_ns = per_second == 0 ? 0 : NS_PER_S / per_second;
+	io_limit_control->enabled =
+		io_limit_control->op_ns > 0 ||
+		io_limit_control->read_block_ns > 0 ||
+		io_limit_control->write_block_ns > 0;
+	LWLockRelease(&io_limit_control->lock);
+}
+
+static void
+assign_io_limit_ios_per_second(int newval, void *extra)
+{
+	assign_io_limit(&io_limit_control->op_ns, newval);
+}
+
+static void
+assign_io_limit_read_per_second(int newval, void *extra)
+{
+	assign_io_limit(&io_limit_control->read_block_ns, newval);
+}
+
+static void
+assign_io_limit_write_per_second(int newval, void *extra)
+{
+	assign_io_limit(&io_limit_control->write_block_ns, newval);
+}
+
+static const char *
+show_io_limit(const int *wait_ns)
+{
+	int			per_second;
+
+	LWLockAcquire(&io_limit_control->lock, LW_SHARED);
+	per_second = *wait_ns == 0 ? 0 : NS_PER_S / *wait_ns;
+	LWLockRelease(&io_limit_control->lock);
+
+	return psprintf("%d", per_second);
+}
+
+static const char *
+show_io_limit_ios_per_second(void)
+{
+	return show_io_limit(&io_limit_control->op_ns);
+}
+
+static const char *
+show_io_limit_read_per_second(void)
+{
+	return show_io_limit(&io_limit_control->read_block_ns);
+}
+
+static const char *
+show_io_limit_write_per_second(void)
+{
+	return show_io_limit(&io_limit_control->write_block_ns);
+}
+
+static BlockNumber
+io_limit_get_block_count(PgAioHandle *ioh)
+{
+	if (ioh->op == PGAIO_OP_READV ||
+		ioh->op == PGAIO_OP_WRITEV)
+	{
+		struct iovec *iov;
+		size_t		size;
+		int			iovcnt;
+
+		size = 0;
+		iovcnt = pgaio_io_get_iovec_length(ioh, &iov);
+		for (int i = 0; i < iovcnt; ++i)
+			size += iov[i].iov_len;
+
+		return size / BLCKSZ;
+	}
+
+	return 0;
+}
+
+/*
+ * Wait until *next_ns_p and advance *next_ns_p by delay_ns.
+ */
+static void
+io_limit_wait(pg_atomic_uint64 *next_ns_p, int delay_ns)
+{
+	instr_time	now;
+	uint64		now_ns;
+	uint64		next_ns;
+
+	INSTR_TIME_SET_CURRENT(now);
+	now_ns = INSTR_TIME_GET_NANOSEC(now);
+	next_ns = pg_atomic_read_u64(next_ns_p);
+
+	for (;;)
+	{
+		if (next_ns > now_ns)
+		{
+			/* Need to wait.  Delay the next op further. */
+			next_ns = pg_atomic_fetch_add_u64(next_ns_p, delay_ns);
+
+			/* Average rate maintained even with low-res sleep or EINTR. */
+			pg_usleep(((next_ns - now_ns) + 999) / 1000);
+			break;
+		}
+		else
+		{
+			/* Don't need to wait.  New next_ns is relative to now. */
+			if (pg_atomic_compare_exchange_u64(next_ns_p,
+											   &next_ns,
+											   now_ns + delay_ns))
+				break;
+		}
+	}
+}
+
+static void
+io_limit_on_perform(PgAioHandle *ioh)
+{
+	int			op_ns;
+	int			read_block_ns;
+	int			write_block_ns;
+
+	if (!io_limit_control->enabled)
+		return;
+
+	op_ns = io_limit_control->op_ns;
+	if (op_ns)
+		io_limit_wait(&io_limit_control->op_next_ns, op_ns);
+
+	if (ioh->op == PGAIO_OP_READV)
+	{
+		read_block_ns = io_limit_control->read_block_ns;
+		if (read_block_ns)
+			io_limit_wait(&io_limit_control->read_next_ns,
+						  io_limit_get_block_count(ioh) * read_block_ns);
+	}
+	else if (ioh->op == PGAIO_OP_WRITEV)
+	{
+		write_block_ns = io_limit_control->write_block_ns;
+		io_limit_wait(&io_limit_control->write_next_ns,
+					  io_limit_get_block_count(ioh) * write_block_ns);
+	}
+}
diff --git a/contrib/io_limit/io_limit.control b/contrib/io_limit/io_limit.control
new file mode 100644
index 00000000000..2f8f06c9e87
--- /dev/null
+++ b/contrib/io_limit/io_limit.control
@@ -0,0 +1,5 @@
+# io_limit extension
+comment = 'io_limit'
+default_version = '1.0'
+module_pathname = '$libdir/io_limit'
+relocatable = true
diff --git a/contrib/io_limit/meson.build b/contrib/io_limit/meson.build
new file mode 100644
index 00000000000..1d26a08de83
--- /dev/null
+++ b/contrib/io_limit/meson.build
@@ -0,0 +1,28 @@
+# Copyright (c) 2022-2026, PostgreSQL Global Development Group
+
+io_limit_sources = files(
+  'io_limit.c',
+)
+
+if host_system == 'windows'
+  io_limit_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+    '--NAME', 'io_limit',
+    '--FILEDESC', 'io_limit - artificially limit asynchronous I/O for tesing',])
+endif
+
+io_limit = shared_module('io_limit',
+  io_limit_sources,
+  kwargs: contrib_mod_args,
+)
+contrib_targets += io_limit
+
+install_data(
+  'io_limit.control',
+  kwargs: contrib_data_args,
+)
+
+tests += {
+  'name': 'io_limit',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+}
diff --git a/contrib/meson.build b/contrib/meson.build
index ebb7f83d8c5..398b0d704b5 100644
--- a/contrib/meson.build
+++ b/contrib/meson.build
@@ -34,6 +34,7 @@ subdir('hstore_plperl')
 subdir('hstore_plpython')
 subdir('intagg')
 subdir('intarray')
+subdir('io_limit')
 subdir('isn')
 subdir('jsonb_plperl')
 subdir('jsonb_plpython')
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index a5ccd506d8c..87afcf856e1 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -139,6 +139,7 @@ static int	MyIoWorkerId = -1;
 static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
 static PgAioWorkerControl *io_worker_control;
 
+static io_worker_on_perform_fn io_worker_on_perform_hook;
 
 static void
 pgaio_workerset_initialize(PgAioWorkerSet *set)
@@ -529,6 +530,9 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
 		for (int i = 0; i < nsync; ++i)
 		{
 			pgaio_io_perform_synchronously(synchronous_ios[i]);
+
+			if (io_worker_on_perform_hook)
+				io_worker_on_perform_hook(synchronous_ios[i]);
 		}
 	}
 
@@ -929,6 +933,9 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
 			 */
 			pgaio_io_perform_synchronously(ioh);
 
+			if (io_worker_on_perform_hook)
+				io_worker_on_perform_hook(ioh);
+
 			RESUME_INTERRUPTS();
 			errcallback.arg = NULL;
 		}
@@ -1024,6 +1031,12 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
 	proc_exit(0);
 }
 
+void
+pgaio_worker_set_on_perform_hook(io_worker_on_perform_fn fn)
+{
+	io_worker_on_perform_hook = fn;
+}
+
 bool
 pgaio_workers_enabled(void)
 {
diff --git a/src/include/storage/io_worker.h b/src/include/storage/io_worker.h
index c852c9f3741..c9ef49a585d 100644
--- a/src/include/storage/io_worker.h
+++ b/src/include/storage/io_worker.h
@@ -28,4 +28,9 @@ extern bool pgaio_worker_pm_test_grow_signal_sent(void);
 extern void pgaio_worker_pm_clear_grow_signal_sent(void);
 extern bool pgaio_worker_pm_test_grow(void);
 
+/* Hook to support contrib/io_limit. */
+typedef void (*io_worker_on_perform_fn) (PgAioHandle *handle);
+extern void pgaio_worker_set_on_perform_hook(io_worker_on_perform_fn fn);
+
+
 #endif							/* IO_WORKER_H */
-- 
2.53.0

