From 2ffba516edba39e7119e27c50e1c03296e860da9 Mon Sep 17 00:00:00 2001
From: Thomas Munro
Date: Sat, 11 Apr 2026 17:31:13 +1200
Subject: [PATCH v1 1/2] contrib/io_limit: Simulation of slow storage.

Only affects IOs submitted to io_method=worker.  Configured as:

shared_preload_libraries=io_limit
io_limit.ios_per_second=1000
io_limit.read_per_second=200MB
io_limit.write_per_second=100MB

Zero means no limit.

XXX Experimental hack
---
 contrib/Makefile                        |   1 +
 contrib/io_limit/Makefile               |  20 ++
 contrib/io_limit/io_limit.c             | 275 ++++++++++++++++++++++++
 contrib/io_limit/io_limit.control       |   5 +
 contrib/io_limit/meson.build            |  28 +++
 contrib/meson.build                     |   1 +
 src/backend/storage/aio/method_worker.c |  13 ++
 src/include/storage/io_worker.h         |   5 +
 8 files changed, 348 insertions(+)
 create mode 100644 contrib/io_limit/Makefile
 create mode 100644 contrib/io_limit/io_limit.c
 create mode 100644 contrib/io_limit/io_limit.control
 create mode 100644 contrib/io_limit/meson.build

diff --git a/contrib/Makefile b/contrib/Makefile
index 7d91fe77db3..48e82c53333 100644
--- a/contrib/Makefile
+++ b/contrib/Makefile
@@ -24,6 +24,7 @@ SUBDIRS = \
 		hstore		\
 		intagg		\
 		intarray	\
+		io_limit	\
 		isn		\
 		lo		\
 		ltree		\
diff --git a/contrib/io_limit/Makefile b/contrib/io_limit/Makefile
new file mode 100644
index 00000000000..da176698a17
--- /dev/null
+++ b/contrib/io_limit/Makefile
@@ -0,0 +1,20 @@
+# contrib/io_limit/Makefile
+
+MODULE_big = io_limit
+OBJS = \
+	$(WIN32RES) \
+	io_limit.o
+
+EXTENSION = io_limit
+PGFILEDESC = "io_limit - artificially limit asynchronous I/O for testing"
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/io_limit
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/io_limit/io_limit.c b/contrib/io_limit/io_limit.c
new file mode 100644
index 00000000000..fa2ec6f1ff2
--- /dev/null
+++ b/contrib/io_limit/io_limit.c
@@ -0,0 +1,275 @@
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "port/atomics.h"
+#include "portability/instr_time.h"
+#include "storage/aio_internal.h"
+#include "storage/io_worker.h"
+#include "storage/lwlock.h"
+#include "storage/shmem.h"
+#include "utils/guc.h"
+
+/* GUCs. */
+static int	io_limit_ios_per_second = 0;
+static int	io_limit_read_per_second = 0;
+static int	io_limit_write_per_second = 0;
+
+typedef struct io_limit_control_data
+{
+	/* Whether any GUC is set to a non-zero value. */
+	bool		enabled;
+
+	/* Absolute time to wait until. */
+	pg_atomic_uint64 op_next_ns;
+	pg_atomic_uint64 read_next_ns;
+	pg_atomic_uint64 write_next_ns;
+
+	/* Limits expressed as delay intervals. */
+	LWLock		lock;
+	int			op_ns;
+	int			read_block_ns;
+	int			write_block_ns;
+} io_limit_control_data;
+
+static io_limit_control_data * io_limit_control;
+
+static void io_limit_shmem_request(void *arg);
+static void io_limit_shmem_init(void *arg);
+
+static void assign_io_limit_ios_per_second(int newval, void *extra);
+static void assign_io_limit_read_per_second(int newval, void *extra);
+static void assign_io_limit_write_per_second(int newval, void *extra);
+static const char *show_io_limit_ios_per_second(void);
+static const char *show_io_limit_read_per_second(void);
+static const char *show_io_limit_write_per_second(void);
+
+static void io_limit_on_perform(PgAioHandle *ioh);
+
+static const ShmemCallbacks io_limit_shmem_callbacks = {
+	.request_fn = io_limit_shmem_request,
+	.init_fn = io_limit_shmem_init,
+};
+
+PG_MODULE_MAGIC_EXT(
+					.name = "io_limit",
+					.version = PG_VERSION
+);
+
+void
+_PG_init(void)
+{
+	/* Bail out if not configured in shared_preload_libraries. */
+	if (!process_shared_preload_libraries_in_progress)
+		return;
+
+	DefineCustomIntVariable("io_limit.ios_per_second",
+							"Limits IOs per second.",
+							"If set to zero, there is no limit.",
+							&io_limit_ios_per_second,
+							0,
+							0, INT_MAX,
+							PGC_USERSET,
+							0,
+							NULL,
+							assign_io_limit_ios_per_second,
+							show_io_limit_ios_per_second);
+	DefineCustomIntVariable("io_limit.read_per_second",
+							"Limits read bandwidth.",
+							"If set to zero, there is no limit.",
+							&io_limit_read_per_second,
+							0,
+							0, INT_MAX,
+							PGC_USERSET,
+							GUC_UNIT_BLOCKS,
+							NULL,
+							assign_io_limit_read_per_second,
+							show_io_limit_read_per_second);
+	DefineCustomIntVariable("io_limit.write_per_second",
+							"Limits write bandwidth.",
+							"If set to zero, there is no limit.",
+							&io_limit_write_per_second,
+							0,
+							0, INT_MAX,
+							PGC_USERSET,
+							GUC_UNIT_BLOCKS,
+							NULL,
+							assign_io_limit_write_per_second,
+							show_io_limit_write_per_second);
+
+	MarkGUCPrefixReserved("io_limit");
+	RegisterShmemCallbacks(&io_limit_shmem_callbacks);
+	pgaio_worker_set_on_perform_hook(io_limit_on_perform);
+}
+
+static void
+io_limit_shmem_request(void *arg)
+{
+	ShmemRequestStruct(.name = "io_limit",
+					   .size = sizeof(io_limit_control_data),
+					   .ptr = (void **) &io_limit_control);
+}
+
+static void
+io_limit_shmem_init(void *arg)
+{
+	memset(io_limit_control, 0, sizeof(*io_limit_control));
+	pg_atomic_init_u64(&io_limit_control->op_next_ns, 0);
+	pg_atomic_init_u64(&io_limit_control->read_next_ns, 0);
+	pg_atomic_init_u64(&io_limit_control->write_next_ns, 0);
+	LWLockInitialize(&io_limit_control->lock, LWLockNewTrancheId("io_limit"));
+
+	/* Assign initial values. */
+	assign_io_limit_ios_per_second(io_limit_ios_per_second, NULL);
+	assign_io_limit_read_per_second(io_limit_read_per_second, NULL);
+	assign_io_limit_write_per_second(io_limit_write_per_second, NULL);
+}
+
+static void
+assign_io_limit(int *wait_ns, int per_second)
+{
+	/* Ignore call from _PG_init() before ready. */
+	if (!io_limit_control)
+		return;
+
+	LWLockAcquire(&io_limit_control->lock, LW_EXCLUSIVE);
+	*wait_ns = per_second == 0 ? 0 : NS_PER_S / per_second;
+	io_limit_control->enabled =
+		io_limit_control->op_ns > 0 ||
+		io_limit_control->read_block_ns > 0 ||
+		io_limit_control->write_block_ns > 0;
+	LWLockRelease(&io_limit_control->lock);
+}
+
+static void
+assign_io_limit_ios_per_second(int newval, void *extra)
+{
+	assign_io_limit(&io_limit_control->op_ns, newval);
+}
+
+static void
+assign_io_limit_read_per_second(int newval, void *extra)
+{
+	assign_io_limit(&io_limit_control->read_block_ns, newval);
+}
+
+static void
+assign_io_limit_write_per_second(int newval, void *extra)
+{
+	assign_io_limit(&io_limit_control->write_block_ns, newval);
+}
+
+static const char *
+show_io_limit(const int *wait_ns)
+{
+	int			per_second;
+
+	LWLockAcquire(&io_limit_control->lock, LW_SHARED);
+	per_second = *wait_ns == 0 ? 0 : NS_PER_S / *wait_ns;
+	LWLockRelease(&io_limit_control->lock);
+
+	return psprintf("%d", per_second);
+}
+
+static const char *
+show_io_limit_ios_per_second(void)
+{
+	return show_io_limit(&io_limit_control->op_ns);
+}
+
+static const char *
+show_io_limit_read_per_second(void)
+{
+	return show_io_limit(&io_limit_control->read_block_ns);
+}
+
+static const char *
+show_io_limit_write_per_second(void)
+{
+	return show_io_limit(&io_limit_control->write_block_ns);
+}
+
+static BlockNumber
+io_limit_get_block_count(PgAioHandle *ioh)
+{
+	if (ioh->op == PGAIO_OP_READV ||
+		ioh->op == PGAIO_OP_WRITEV)
+	{
+		struct iovec *iov;
+		size_t		size;
+		int			iovcnt;
+
+		size = 0;
+		iovcnt = pgaio_io_get_iovec_length(ioh, &iov);
+		for (int i = 0; i < iovcnt; ++i)
+			size += iov[i].iov_len;
+
+		return size / BLCKSZ;
+	}
+
+	return 0;
+}
+
+/*
+ * Wait until *next_ns_p and advance *next_ns_p by delay_ns.
+ */
+static void
+io_limit_wait(pg_atomic_uint64 *next_ns_p, int delay_ns)
+{
+	instr_time	now;
+	uint64		now_ns;
+	uint64		next_ns;
+
+	INSTR_TIME_SET_CURRENT(now);
+	now_ns = INSTR_TIME_GET_NANOSEC(now);
+	next_ns = pg_atomic_read_u64(next_ns_p);
+
+	for (;;)
+	{
+		if (next_ns > now_ns)
+		{
+			/* Need to wait.  Delay the next op further. */
+			next_ns = pg_atomic_fetch_add_u64(next_ns_p, delay_ns);
+
+			/* Average rate maintained even with low-res sleep or EINTR. */
+			pg_usleep(((next_ns - now_ns) + 999) / 1000);
+			break;
+		}
+		else
+		{
+			/* Don't need to wait.  New next_ns is relative to now. */
+			if (pg_atomic_compare_exchange_u64(next_ns_p,
+											   &next_ns,
+											   now_ns + delay_ns))
+				break;
+		}
+	}
+}
+
+static void
+io_limit_on_perform(PgAioHandle *ioh)
+{
+	int			op_ns;
+	int			read_block_ns;
+	int			write_block_ns;
+
+	if (!io_limit_control->enabled)
+		return;
+
+	op_ns = io_limit_control->op_ns;
+	if (op_ns)
+		io_limit_wait(&io_limit_control->op_next_ns, op_ns);
+
+	if (ioh->op == PGAIO_OP_READV)
+	{
+		read_block_ns = io_limit_control->read_block_ns;
+		if (read_block_ns)
+			io_limit_wait(&io_limit_control->read_next_ns,
+						  io_limit_get_block_count(ioh) * read_block_ns);
+	}
+	else if (ioh->op == PGAIO_OP_WRITEV)
+	{
+		write_block_ns = io_limit_control->write_block_ns;
+		if (write_block_ns)
+			io_limit_wait(&io_limit_control->write_next_ns, io_limit_get_block_count(ioh) * write_block_ns);
+	}
+}
diff --git a/contrib/io_limit/io_limit.control b/contrib/io_limit/io_limit.control
new file mode 100644
index 00000000000..2f8f06c9e87
--- /dev/null
+++ b/contrib/io_limit/io_limit.control
@@ -0,0 +1,5 @@
+# io_limit extension
+comment = 'io_limit'
+default_version = '1.0'
+module_pathname = '$libdir/io_limit'
+relocatable = true
diff --git a/contrib/io_limit/meson.build b/contrib/io_limit/meson.build
new file mode 100644
index 00000000000..1d26a08de83
--- /dev/null
+++ b/contrib/io_limit/meson.build
@@ -0,0 +1,28 @@
+# Copyright (c) 2022-2026, PostgreSQL Global Development Group
+
+io_limit_sources = files(
+  'io_limit.c',
+)
+
+if host_system == 'windows'
+  io_limit_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+    '--NAME', 'io_limit',
+    '--FILEDESC', 'io_limit - artificially limit asynchronous I/O for testing',])
+endif
+
+io_limit = shared_module('io_limit',
+  io_limit_sources,
+  kwargs: contrib_mod_args,
+)
+contrib_targets += io_limit
+
+install_data(
+  'io_limit.control',
+  kwargs: contrib_data_args,
+)
+
+tests += {
+  'name': 'io_limit',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+}
diff --git a/contrib/meson.build b/contrib/meson.build
index ebb7f83d8c5..398b0d704b5 100644
--- a/contrib/meson.build
+++ b/contrib/meson.build
@@ -34,6 +34,7 @@ subdir('hstore_plperl')
 subdir('hstore_plpython')
 subdir('intagg')
 subdir('intarray')
+subdir('io_limit')
 subdir('isn')
 subdir('jsonb_plperl')
 subdir('jsonb_plpython')
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index a5ccd506d8c..87afcf856e1 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -139,6 +139,7 @@ static int	MyIoWorkerId = -1;
 static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
 static PgAioWorkerControl *io_worker_control;
 
+static io_worker_on_perform_fn io_worker_on_perform_hook;
 
 static void
 pgaio_workerset_initialize(PgAioWorkerSet *set)
@@ -529,6 +530,9 @@ pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
 		for (int i = 0; i < nsync; ++i)
 		{
 			pgaio_io_perform_synchronously(synchronous_ios[i]);
+
+			if (io_worker_on_perform_hook)
+				io_worker_on_perform_hook(synchronous_ios[i]);
 		}
 	}
 
@@ -929,6 +933,9 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
 			 */
 			pgaio_io_perform_synchronously(ioh);
 
+			if (io_worker_on_perform_hook)
+				io_worker_on_perform_hook(ioh);
+
 			RESUME_INTERRUPTS();
 			errcallback.arg = NULL;
 		}
@@ -1024,6 +1031,12 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
 	proc_exit(0);
 }
 
+void
+pgaio_worker_set_on_perform_hook(io_worker_on_perform_fn fn)
+{
+	io_worker_on_perform_hook = fn;
+}
+
 bool
 pgaio_workers_enabled(void)
 {
diff --git a/src/include/storage/io_worker.h b/src/include/storage/io_worker.h
index c852c9f3741..c9ef49a585d 100644
--- a/src/include/storage/io_worker.h
+++ b/src/include/storage/io_worker.h
@@ -28,4 +28,9 @@ extern bool pgaio_worker_pm_test_grow_signal_sent(void);
 extern void pgaio_worker_pm_clear_grow_signal_sent(void);
 extern bool pgaio_worker_pm_test_grow(void);
 
+/* Hook to support contrib/io_limit. */
+typedef void (*io_worker_on_perform_fn) (PgAioHandle *handle);
+extern void pgaio_worker_set_on_perform_hook(io_worker_on_perform_fn fn);
+
+
 #endif							/* IO_WORKER_H */

base-commit: 80156cee06b9d257251d72379ac43f9b88bd13e1
-- 
2.52.0