From 0d79ede7a92225d90715a5370581c83b521cb63f Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Fri, 31 Jan 2025 13:46:35 -0500
Subject: [PATCH v2.4 12/29] aio: Add worker method

---
 src/include/storage/aio.h                     |   5 +-
 src/include/storage/aio_internal.h            |   1 +
 src/include/storage/lwlocklist.h              |   1 +
 src/backend/storage/aio/aio.c                 |   2 +
 src/backend/storage/aio/aio_init.c            |   9 +
 src/backend/storage/aio/method_worker.c       | 435 +++++++++++++++++-
 .../utils/activity/wait_event_names.txt       |   1 +
 src/backend/utils/misc/guc_tables.c           |  13 +
 src/backend/utils/misc/postgresql.conf.sample |   3 +-
 doc/src/sgml/config.sgml                      |  21 +
 src/tools/pgindent/typedefs.list              |   3 +
 11 files changed, 483 insertions(+), 11 deletions(-)

diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h
index d87cfe96b20..ca0f9b64d97 100644
--- a/src/include/storage/aio.h
+++ b/src/include/storage/aio.h
@@ -23,10 +23,11 @@
 typedef enum IoMethod
 {
 	IOMETHOD_SYNC = 0,
+	IOMETHOD_WORKER,
 } IoMethod;
 
-/* We'll default to synchronous execution. */
-#define DEFAULT_IO_METHOD IOMETHOD_SYNC
+/* We'll default to worker based execution. */
+#define DEFAULT_IO_METHOD IOMETHOD_WORKER
 
 
 /*
diff --git a/src/include/storage/aio_internal.h b/src/include/storage/aio_internal.h
index e980b06c1f3..c6e7306ed61 100644
--- a/src/include/storage/aio_internal.h
+++ b/src/include/storage/aio_internal.h
@@ -338,6 +338,7 @@ extern PgAioHandle *pgaio_inj_io_get(void);
 
 /* Declarations for the tables of function pointers exposed by each IO method. */
 extern PGDLLIMPORT const IoMethodOps pgaio_sync_ops;
+extern PGDLLIMPORT const IoMethodOps pgaio_worker_ops;
 
 extern PGDLLIMPORT const IoMethodOps *pgaio_method_ops;
 extern PGDLLIMPORT PgAioCtl *pgaio_ctl;
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index cf565452382..932024b1b0b 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -83,3 +83,4 @@ PG_LWLOCK(49, WALSummarizer)
 PG_LWLOCK(50, DSMRegistry)
 PG_LWLOCK(51, InjectionPoint)
 PG_LWLOCK(52, SerialControl)
+PG_LWLOCK(53, AioWorkerSubmissionQueue)
diff --git a/src/backend/storage/aio/aio.c b/src/backend/storage/aio/aio.c
index 0483580d644..3eace131de2 100644
--- a/src/backend/storage/aio/aio.c
+++ b/src/backend/storage/aio/aio.c
@@ -63,6 +63,7 @@ static void pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation);
 /* Options for io_method. */
 const struct config_enum_entry io_method_options[] = {
 	{"sync", IOMETHOD_SYNC, false},
+	{"worker", IOMETHOD_WORKER, false},
 	{NULL, 0, false}
 };
 
@@ -79,6 +80,7 @@ PgAioBackend *pgaio_my_backend;
 
 static const IoMethodOps *const pgaio_method_ops_table[] = {
 	[IOMETHOD_SYNC] = &pgaio_sync_ops,
+	[IOMETHOD_WORKER] = &pgaio_worker_ops,
 };
 
 /* callbacks for the configured io_method, set by assign_io_method */
diff --git a/src/backend/storage/aio/aio_init.c b/src/backend/storage/aio/aio_init.c
index 4223cd1bfd6..87eac5e961c 100644
--- a/src/backend/storage/aio/aio_init.c
+++ b/src/backend/storage/aio/aio_init.c
@@ -18,6 +18,7 @@
 #include "storage/aio.h"
 #include "storage/aio_internal.h"
 #include "storage/aio_subsys.h"
+#include "storage/io_worker.h"
 #include "storage/ipc.h"
 #include "storage/proc.h"
 #include "storage/shmem.h"
@@ -39,6 +40,11 @@ AioCtlShmemSize(void)
 static uint32
 AioProcs(void)
 {
+	/*
+	 * While AIO workers don't need their own AIO context, we can't currently
+	 * guarantee nothing gets assigned to the a ProcNumber for an IO worker if
+	 * we just subtracted MAX_IO_WORKERS.
+	 */
 	return MaxBackends + NUM_AUXILIARY_PROCS;
 }
 
@@ -211,6 +217,9 @@ pgaio_init_backend(void)
 	/* shouldn't be initialized twice */
 	Assert(!pgaio_my_backend);
 
+	if (MyBackendType == B_IO_WORKER)
+		return;
+
 	if (MyProc == NULL || MyProcNumber >= AioProcs())
 		elog(ERROR, "aio requires a normal PGPROC");
 
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index 942f1609af2..d1b52cba2cd 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -3,6 +3,21 @@
  * method_worker.c
  *    AIO - perform AIO using worker processes
  *
+ * Worker processes consume IOs from a shared memory submission queue, run
+ * traditional synchronous system calls, and perform the shared completion
+ * handling immediately.  Client code submits most requests by pushing IOs
+ * into the submission queue, and waits (if necessary) using condition
+ * variables.  Some IOs cannot be performed in another process due to lack of
+ * infrastructure for reopening the file, and must processed synchronously by
+ * the client code when submitted.
+ *
+ * So that the submitter can make just one system call when submitting a batch
+ * of IOs, wakeups "fan out"; each woken backend can wake two more.  XXX This
+ * could be improved by using futexes instead of latches to wake N waiters.
+ *
+ * This method of AIO is available in all builds on all operating systems, and
+ * is the default.
+ *
  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
@@ -16,30 +31,329 @@
 
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
+#include "port/pg_bitutils.h"
 #include "postmaster/auxprocess.h"
 #include "postmaster/interrupt.h"
+#include "storage/aio.h"
+#include "storage/aio_internal.h"
 #include "storage/aio_subsys.h"
 #include "storage/io_worker.h"
 #include "storage/ipc.h"
 #include "storage/latch.h"
 #include "storage/proc.h"
 #include "tcop/tcopprot.h"
+#include "utils/ps_status.h"
 #include "utils/wait_event.h"
 
 
+/* How many workers should each worker wake up if needed? */
+#define IO_WORKER_WAKEUP_FANOUT 2
+
+
+typedef struct AioWorkerSubmissionQueue
+{
+	uint32		size;
+	uint32		mask;
+	uint32		head;
+	uint32		tail;
+	uint32		ios[FLEXIBLE_ARRAY_MEMBER];
+} AioWorkerSubmissionQueue;
+
+typedef struct AioWorkerSlot
+{
+	Latch	   *latch;
+	bool		in_use;
+} AioWorkerSlot;
+
+typedef struct AioWorkerControl
+{
+	uint64		idle_worker_mask;
+	AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
+} AioWorkerControl;
+
+
+static size_t pgaio_worker_shmem_size(void);
+static void pgaio_worker_shmem_init(bool first_time);
+
+static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh);
+static int	pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
+
+
+const IoMethodOps pgaio_worker_ops = {
+	.shmem_size = pgaio_worker_shmem_size,
+	.shmem_init = pgaio_worker_shmem_init,
+
+	.needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
+	.submit = pgaio_worker_submit,
+};
+
+
 /* GUCs */
 int			io_workers = 3;
 
 
+static int	io_worker_queue_size = 64;
+static int	MyIoWorkerId;
+static AioWorkerSubmissionQueue *io_worker_submission_queue;
+static AioWorkerControl *io_worker_control;
+
+
+static size_t
+pgaio_worker_shmem_size(void)
+{
+	return
+		offsetof(AioWorkerSubmissionQueue, ios) +
+		sizeof(uint32) * MAX_IO_WORKERS * io_max_concurrency +
+		offsetof(AioWorkerControl, workers) +
+		sizeof(AioWorkerSlot) * MAX_IO_WORKERS;
+}
+
+static void
+pgaio_worker_shmem_init(bool first_time)
+{
+	bool		found;
+	int			size;
+
+	/* Round size up to next power of two so we can make a mask. */
+	size = pg_nextpower2_32(io_worker_queue_size);
+
+	io_worker_submission_queue =
+		ShmemInitStruct("AioWorkerSubmissionQueue",
+						offsetof(AioWorkerSubmissionQueue, ios) +
+						sizeof(uint32) * size,
+						&found);
+	if (!found)
+	{
+		io_worker_submission_queue->size = size;
+		io_worker_submission_queue->head = 0;
+		io_worker_submission_queue->tail = 0;
+	}
+
+	io_worker_control =
+		ShmemInitStruct("AioWorkerControl",
+						offsetof(AioWorkerControl, workers) +
+						sizeof(AioWorkerSlot) * io_workers,
+						&found);
+	if (!found)
+	{
+		io_worker_control->idle_worker_mask = 0;
+		for (int i = 0; i < io_workers; ++i)
+		{
+			io_worker_control->workers[i].latch = NULL;
+			io_worker_control->workers[i].in_use = false;
+		}
+	}
+}
+
+static int
+pgaio_choose_idle_worker(void)
+{
+	int			worker;
+
+	if (io_worker_control->idle_worker_mask == 0)
+		return -1;
+
+	/* Find the lowest bit position, and clear it. */
+	worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
+	io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
+
+	return worker;
+}
+
+static bool
+pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
+{
+	AioWorkerSubmissionQueue *queue;
+	uint32		new_head;
+
+	queue = io_worker_submission_queue;
+	new_head = (queue->head + 1) & (queue->size - 1);
+	if (new_head == queue->tail)
+	{
+		pgaio_debug(DEBUG1, "io queue is full, at %u elements",
+					io_worker_submission_queue->size);
+		return false;			/* full */
+	}
+
+	queue->ios[queue->head] = pgaio_io_get_id(ioh);
+	queue->head = new_head;
+
+	return true;
+}
+
+static uint32
+pgaio_worker_submission_queue_consume(void)
+{
+	AioWorkerSubmissionQueue *queue;
+	uint32		result;
+
+	queue = io_worker_submission_queue;
+	if (queue->tail == queue->head)
+		return UINT32_MAX;		/* empty */
+
+	result = queue->ios[queue->tail];
+	queue->tail = (queue->tail + 1) & (queue->size - 1);
+
+	return result;
+}
+
+static uint32
+pgaio_worker_submission_queue_depth(void)
+{
+	uint32		head;
+	uint32		tail;
+
+	head = io_worker_submission_queue->head;
+	tail = io_worker_submission_queue->tail;
+
+	if (tail > head)
+		head += io_worker_submission_queue->size;
+
+	Assert(head >= tail);
+
+	return head - tail;
+}
+
+static bool
+pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
+{
+	return
+		!IsUnderPostmaster
+		|| ioh->flags & PGAIO_HF_REFERENCES_LOCAL
+		|| !pgaio_io_can_reopen(ioh);
+}
+
+static void
+pgaio_worker_submit_internal(int nios, PgAioHandle *ios[])
+{
+	PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
+	int			nsync = 0;
+	Latch	   *wakeup = NULL;
+	int			worker;
+
+	Assert(nios <= PGAIO_SUBMIT_BATCH_SIZE);
+
+	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+	for (int i = 0; i < nios; ++i)
+	{
+		Assert(!pgaio_worker_needs_synchronous_execution(ios[i]));
+		if (!pgaio_worker_submission_queue_insert(ios[i]))
+		{
+			/*
+			 * We'll do it synchronously, but only after we've sent as many as
+			 * we can to workers, to maximize concurrency.
+			 */
+			synchronous_ios[nsync++] = ios[i];
+			continue;
+		}
+
+		if (wakeup == NULL)
+		{
+			/* Choose an idle worker to wake up if we haven't already. */
+			worker = pgaio_choose_idle_worker();
+			if (worker >= 0)
+				wakeup = io_worker_control->workers[worker].latch;
+
+			pgaio_debug_io(DEBUG4, ios[i],
+						   "choosing worker %d",
+						   worker);
+		}
+	}
+	LWLockRelease(AioWorkerSubmissionQueueLock);
+
+	if (wakeup)
+		SetLatch(wakeup);
+
+	/* Run whatever is left synchronously. */
+	if (nsync > 0)
+	{
+		for (int i = 0; i < nsync; ++i)
+		{
+			pgaio_io_perform_synchronously(synchronous_ios[i]);
+		}
+	}
+}
+
+static int
+pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
+{
+	for (int i = 0; i < num_staged_ios; i++)
+	{
+		PgAioHandle *ioh = staged_ios[i];
+
+		pgaio_io_prepare_submit(ioh);
+	}
+
+	pgaio_worker_submit_internal(num_staged_ios, staged_ios);
+
+	return num_staged_ios;
+}
+
+/*
+ * on_shmem_exit() callback that releases the worker's slot in
+ * io_worker_control.
+ */
+static void
+pgaio_worker_die(int code, Datum arg)
+{
+	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+	Assert(io_worker_control->workers[MyIoWorkerId].in_use);
+	Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
+
+	io_worker_control->workers[MyIoWorkerId].in_use = false;
+	io_worker_control->workers[MyIoWorkerId].latch = NULL;
+	LWLockRelease(AioWorkerSubmissionQueueLock);
+}
+
+/*
+ * Register the worker in shared memory, assign MyWorkerId and register a
+ * shutdown callback to release registration.
+ */
+static void
+pgaio_worker_register(void)
+{
+	MyIoWorkerId = -1;
+
+	/*
+	 * XXX: This could do with more fine-grained locking. But it's also not
+	 * very common for the number of workers to change at the moment...
+	 */
+	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+
+	for (int i = 0; i < io_workers; ++i)
+	{
+		if (!io_worker_control->workers[i].in_use)
+		{
+			Assert(io_worker_control->workers[i].latch == NULL);
+			io_worker_control->workers[i].in_use = true;
+			MyIoWorkerId = i;
+			break;
+		}
+		else
+			Assert(io_worker_control->workers[i].latch != NULL);
+	}
+
+	if (MyIoWorkerId == -1)
+		elog(ERROR, "couldn't find a free worker slot");
+
+	io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
+	io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
+	LWLockRelease(AioWorkerSubmissionQueueLock);
+
+	on_shmem_exit(pgaio_worker_die, 0);
+}
+
 void
 IoWorkerMain(char *startup_data, size_t startup_data_len)
 {
 	sigjmp_buf	local_sigjmp_buf;
+	PgAioHandle *volatile error_ioh = NULL;
+	volatile int error_errno = 0;
+	char		cmd[128];
 
 	MyBackendType = B_IO_WORKER;
 	AuxiliaryProcessMainCommon();
 
-	/* TODO review all signals */
 	pqsignal(SIGHUP, SignalHandlerForConfigReload);
 	pqsignal(SIGINT, die);		/* to allow manually triggering worker restart */
 
@@ -53,7 +367,12 @@ IoWorkerMain(char *startup_data, size_t startup_data_len)
 	pqsignal(SIGPIPE, SIG_IGN);
 	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
 	pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);
-	sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
+
+	/* also registers a shutdown callback to unregister */
+	pgaio_worker_register();
+
+	sprintf(cmd, "io worker: %d", MyIoWorkerId);
+	set_ps_display(cmd);
 
 	/* see PostgresMain() */
 	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
@@ -61,6 +380,27 @@ IoWorkerMain(char *startup_data, size_t startup_data_len)
 		error_context_stack = NULL;
 		HOLD_INTERRUPTS();
 
+		/*
+		 * In the - very unlikely - case that the IO failed in a way that
+		 * raises an error we need to mark the IO as failed.
+		 *
+		 * Need to do just enough error recovery so that we can mark the IO as
+		 * failed and then exit (postmaster will start a new worker).
+		 */
+		LWLockReleaseAll();
+
+		if (error_ioh != NULL)
+		{
+			/* should never fail without setting error_errno */
+			Assert(error_errno != 0);
+
+			errno = error_errno;
+
+			START_CRIT_SECTION();
+			pgaio_io_process_completion(error_ioh, -error_errno);
+			END_CRIT_SECTION();
+		}
+
 		EmitErrorReport();
 
 		proc_exit(1);
@@ -69,12 +409,92 @@ IoWorkerMain(char *startup_data, size_t startup_data_len)
 	/* We can now handle ereport(ERROR) */
 	PG_exception_stack = &local_sigjmp_buf;
 
+	sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
+
 	while (!ShutdownRequestPending)
 	{
-		WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
-				  WAIT_EVENT_IO_WORKER_MAIN);
-		ResetLatch(MyLatch);
-		CHECK_FOR_INTERRUPTS();
+		uint32		io_index;
+		Latch	   *latches[IO_WORKER_WAKEUP_FANOUT];
+		int			nlatches = 0;
+		int			nwakeups = 0;
+		int			worker;
+
+		/* Try to get a job to do. */
+		LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+		if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX)
+		{
+			/*
+			 * Nothing to do.  Mark self idle.
+			 *
+			 * XXX: Invent some kind of back pressure to reduce useless
+			 * wakeups?
+			 */
+			io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
+		}
+		else
+		{
+			/* Got one.  Clear idle flag. */
+			io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
+
+			/* See if we can wake up some peers. */
+			nwakeups = Min(pgaio_worker_submission_queue_depth(),
+						   IO_WORKER_WAKEUP_FANOUT);
+			for (int i = 0; i < nwakeups; ++i)
+			{
+				if ((worker = pgaio_choose_idle_worker()) < 0)
+					break;
+				latches[nlatches++] = io_worker_control->workers[worker].latch;
+			}
+		}
+		LWLockRelease(AioWorkerSubmissionQueueLock);
+
+		for (int i = 0; i < nlatches; ++i)
+			SetLatch(latches[i]);
+
+		if (io_index != UINT32_MAX)
+		{
+			PgAioHandle *ioh = NULL;
+
+			ioh = &pgaio_ctl->io_handles[io_index];
+			error_ioh = ioh;
+
+			pgaio_debug_io(DEBUG4, ioh,
+						   "worker %d processing IO",
+						   MyIoWorkerId);
+
+			/*
+			 * It's very unlikely, but possible, that reopen fails. E.g. due
+			 * to memory allocations failing or file permissions changing or
+			 * such.  In that case we need to fail the IO.
+			 *
+			 * There's not really a good errno we can report here.
+			 */
+			error_errno = ENOENT;
+			pgaio_io_reopen(ioh);
+
+			/*
+			 * To be able to exercise the reopen-fails path, allow injection
+			 * points to trigger a failure at this point.
+			 */
+			pgaio_io_call_inj(ioh, "AIO_WORKER_AFTER_REOPEN");
+
+			error_errno = 0;
+			error_ioh = NULL;
+
+			/*
+			 * We don't expect this to ever fail, no need to keep error_ioh
+			 * around. pgaio_io_perform_synchronously() contains a critical
+			 * section.
+			 */
+			pgaio_io_perform_synchronously(ioh);
+		}
+		else
+		{
+			WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
+					  WAIT_EVENT_IO_WORKER_MAIN);
+			ResetLatch(MyLatch);
+			CHECK_FOR_INTERRUPTS();
+		}
 	}
 
 	proc_exit(0);
@@ -83,6 +503,5 @@ IoWorkerMain(char *startup_data, size_t startup_data_len)
 bool
 pgaio_workers_enabled(void)
 {
-	/* placeholder for future commit */
-	return false;
+	return io_method == IOMETHOD_WORKER;
 }
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index d18b788b8a1..cb977b049d8 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -348,6 +348,7 @@ WALSummarizer	"Waiting to read or update WAL summarization state."
 DSMRegistry	"Waiting to read or update the dynamic shared memory registry."
 InjectionPoint	"Waiting to read or update information related to injection points."
 SerialControl	"Waiting to read or update shared <filename>pg_serial</filename> state."
+AioWorkerSubmissionQueue	"Waiting to access AIO worker submission queue."
 
 #
 # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index b7c84d061e2..15954f42d4e 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -74,6 +74,7 @@
 #include "storage/aio.h"
 #include "storage/bufmgr.h"
 #include "storage/bufpage.h"
+#include "storage/io_worker.h"
 #include "storage/large_object.h"
 #include "storage/pg_shmem.h"
 #include "storage/predicate.h"
@@ -3265,6 +3266,18 @@ struct config_int ConfigureNamesInt[] =
 		check_io_max_concurrency, NULL, NULL
 	},
 
+	{
+		{"io_workers",
+			PGC_SIGHUP,
+			RESOURCES_IO,
+			gettext_noop("Number of IO worker processes, for io_method=worker."),
+			NULL,
+		},
+		&io_workers,
+		3, 1, MAX_IO_WORKERS,
+		NULL, assign_io_workers, NULL
+	},
+
 	{
 		{"backend_flush_after", PGC_USERSET, RESOURCES_IO,
 			gettext_noop("Number of pages after which previously performed writes are flushed to disk."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 186bc47b700..3bb4e0d4d7d 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -199,11 +199,12 @@
 #maintenance_io_concurrency = 10	# 1-1000; 0 disables prefetching
 #io_combine_limit = 128kB		# usually 1-32 blocks (depends on OS)
 
-#io_method = sync			# sync (change requires restart)
+#io_method = worker			# worker, sync (change requires restart)
 #io_max_concurrency = -1		# Max number of IOs that one process
 					# can execute simultaneously
 					# -1 sets based on shared_buffers
 					# (change requires restart)
+#io_workers = 3				# 1-32;
 
 # - Worker Processes -
 
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 0306827afbd..2cf0120bb56 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2656,6 +2656,11 @@ include_dir 'conf.d'
          Selects the method for executing asynchronous I/O.
          Possible values are:
          <itemizedlist>
+          <listitem>
+           <para>
+            <literal>worker</literal> (execute asynchronous I/O using worker processes)
+           </para>
+          </listitem>
           <listitem>
            <para>
             <literal>sync</literal> (execute asynchronous I/O synchronously)
@@ -2669,6 +2674,22 @@ include_dir 'conf.d'
        </listitem>
       </varlistentry>
 
+      <varlistentry id="guc-io-workers" xreflabel="io_workers">
+       <term><varname>io_workers</varname> (<type>int</type>)
+       <indexterm>
+        <primary><varname>io_workers</varname> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+        <para>
+         Selects the number of I/O worker processes to use. The default is 3.
+        </para>
+        <para>
+         Only has an effect if <xref linkend="guc-max-wal-senders"/> is set to
+         <literal>worker</literal>.
+        </para>
+       </listitem>
+      </varlistentry>
      </variablelist>
     </sect2>
 
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 14a338e4308..e34727e269a 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -55,6 +55,9 @@ AggStrategy
 AggTransInfo
 Aggref
 AggregateInstrumentation
+AioWorkerControl
+AioWorkerSlot
+AioWorkerSubmissionQueue
 AlenState
 Alias
 AllocBlock
-- 
2.48.1.76.g4e746b1a31.dirty

