From 0de554082f3ff6468ff352000774245b337d6d64 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sat, 31 Aug 2024 21:23:37 -0400
Subject: [PATCH v2.0 09/17] aio: Basic AIO implementation

At this point nothing can use AIO - this commit does not include any
implementations of AIO subjects / callbacks. Those will come in later commits.

Todo:
- implement "synchronous" AIO method
- split worker, io_uring methods out into separate commits
- lots of cleanup
---
 src/include/storage/aio.h                     | 308 ++++++
 src/include/storage/aio_internal.h            | 274 +++++
 src/include/storage/aio_ref.h                 |  24 +
 src/include/storage/lwlock.h                  |   1 +
 src/include/storage/lwlocklist.h              |   1 +
 src/include/utils/resowner.h                  |   7 +
 src/backend/access/transam/xact.c             |   9 +
 src/backend/postmaster/postmaster.c           |   3 +-
 src/backend/storage/aio/Makefile              |   3 +
 src/backend/storage/aio/aio.c                 | 963 +++++++++++++++++-
 src/backend/storage/aio/aio_init.c            | 318 ++++++
 src/backend/storage/aio/aio_io.c              | 111 ++
 src/backend/storage/aio/aio_subject.c         | 170 ++++
 src/backend/storage/aio/meson.build           |   3 +
 src/backend/storage/aio/method_io_uring.c     | 393 +++++++
 src/backend/storage/aio/method_worker.c       | 413 +++++++-
 src/backend/storage/lmgr/lwlock.c             |   1 +
 .../utils/activity/wait_event_names.txt       |   4 +
 src/backend/utils/misc/guc_tables.c           |  25 +
 src/backend/utils/misc/postgresql.conf.sample |   6 +
 src/backend/utils/resowner/resowner.c         |  51 +
 src/tools/pgindent/typedefs.list              |  23 +
 22 files changed, 3104 insertions(+), 7 deletions(-)
 create mode 100644 src/include/storage/aio_internal.h
 create mode 100644 src/include/storage/aio_ref.h
 create mode 100644 src/backend/storage/aio/aio_io.c
 create mode 100644 src/backend/storage/aio/aio_subject.c
 create mode 100644 src/backend/storage/aio/method_io_uring.c

diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h
index 98fafcf9bc4..65052462b02 100644
--- a/src/include/storage/aio.h
+++ b/src/include/storage/aio.h
@@ -15,9 +15,315 @@
 #define AIO_H
 
 
+#include "storage/aio_ref.h"
+#include "storage/procnumber.h"
 #include "utils/guc_tables.h"
 
 
+typedef struct PgAioHandle PgAioHandle;
+
+typedef enum PgAioOp
+{
+	/* intentionally the zero value, to help catch zeroed memory etc */
+	PGAIO_OP_INVALID = 0,
+
+	PGAIO_OP_READ,
+	PGAIO_OP_WRITE,
+
+	PGAIO_OP_FSYNC,
+
+	PGAIO_OP_FLUSH_RANGE,
+
+	PGAIO_OP_NOP,
+
+	/*
+	 * Eventually we'll additionally want at least:
+	 * - send
+	 * - recv
+	 * - accept
+	 */
+} PgAioOp;
+
+#define PGAIO_OP_COUNT	(PGAIO_OP_NOP + 1)
+
+
+/*
+ * What the IO is being performed on.
+ *
+ * PgAioSubjectID-specific behaviour should be implemented in
+ * aio_subject.c.
+ */
+typedef enum PgAioSubjectID
+{
+	/* intentionally the zero value, to help catch zeroed memory etc */
+	ASI_INVALID = 0,
+} PgAioSubjectID;
+
+#define ASI_COUNT (ASI_INVALID + 1)
+
+/*
+ * Flags for an IO that can be set with pgaio_io_set_flag().
+ */
+typedef enum PgAioHandleFlags
+{
+	AHF_REFERENCES_LOCAL = 1 << 0,
+} PgAioHandleFlags;
+
+
+/*
+ * IDs for callbacks that can be registered on an IO.
+ *
+ * Callbacks are identified by an ID rather than a function pointer. There
+ * are two main reasons:
+ *
+ * 1) Memory within PgAioHandle is precious, due to the number of PgAioHandle
+ *    structs in pre-allocated shared memory.
+ *
+ * 2) Due to EXEC_BACKEND, function pointers are not necessarily stable
+ *    between different backends, therefore function pointers cannot directly
+ *    be stored in shared memory.
+ *
+ * Without 2), we could fairly easily allow adding new callbacks, by filling
+ * an ID->pointer mapping table on demand. In the presence of 2), that's
+ * still doable, but harder, because every process has to re-register the
+ * pointers so that a local ID->"backend local pointer" mapping can be
+ * maintained.
+ */
+typedef enum PgAioHandleSharedCallbackID
+{
+	ASC_PLACEHOLDER /* empty enums are invalid */ ,
+} PgAioHandleSharedCallbackID;
+
+
+/*
+ * Data necessary for basic IO types (PgAioOp).
+ *
+ * NB: The FDs in here may *not* be relied upon for re-issuing requests (e.g.
+ * for partial reads/writes) - the FD might be from another process, or have
+ * been closed since. The only reason that is not a problem for IOs waiting
+ * to be issued is that the queue is flushed when an FD is closed.
+ */
+typedef union
+{
+	struct
+	{
+		int			fd;
+		uint16		iov_length;
+		uint64		offset;
+	}			read;
+
+	struct
+	{
+		int			fd;
+		uint16		iov_length;
+		uint64		offset;
+	}			write;
+
+	struct
+	{
+		int			fd;
+		bool		datasync;
+	}			fsync;
+
+	struct
+	{
+		int			fd;
+		uint32		nbytes;
+		uint64		offset;
+	}			flush_range;
+} PgAioOpData;
+
+
+/* XXX: Perhaps it's worth moving this to a dedicated file? */
+#include "storage/block.h"
+#include "storage/relfilelocator.h"
+
+typedef union PgAioSubjectData
+{
+	/* just as an example placeholder for later */
+	struct
+	{
+		uint32		queue_id;
+	}			wal;
+} PgAioSubjectData;
+
+
+
+typedef enum PgAioResultStatus
+{
+	ARS_UNKNOWN,
+	ARS_OK,
+	ARS_PARTIAL,
+	ARS_ERROR,
+} PgAioResultStatus;
+
+typedef struct PgAioResult
+{
+	PgAioHandleSharedCallbackID id:8;
+	PgAioResultStatus status:2;
+	uint32		error_data:22;
+	int32		result;
+} PgAioResult;
+
+typedef struct PgAioReturn
+{
+	PgAioResult result;
+	PgAioSubjectData subject_data;
+} PgAioReturn;
+
+
+typedef struct PgAioSubjectInfo
+{
+	void		(*reopen) (PgAioHandle *ioh);
+
+#ifdef NOT_YET
+	char	   *(*describe_identity) (PgAioHandle *ioh);
+#endif
+
+	const char *name;
+} PgAioSubjectInfo;
+
+
+typedef PgAioResult (*PgAioHandleSharedCallbackComplete) (PgAioHandle *ioh, PgAioResult prior_result);
+typedef void (*PgAioHandleSharedCallbackPrepare) (PgAioHandle *ioh);
+typedef void (*PgAioHandleSharedCallbackError) (PgAioResult result, const PgAioSubjectData *subject_data, int elevel);
+
+typedef struct PgAioHandleSharedCallbacks
+{
+	PgAioHandleSharedCallbackPrepare prepare;
+	PgAioHandleSharedCallbackComplete complete;
+	PgAioHandleSharedCallbackError error;
+} PgAioHandleSharedCallbacks;
+
+
+
+typedef struct PgAioBounceBuffer PgAioBounceBuffer;
+
+
+/*
+ * How many callbacks can be registered for one IO handle. Currently we only
+ * need two, but it's not hard to imagine needing a few more.
+ */
+#define AIO_MAX_SHARED_CALLBACKS	4
+
+
+
+/* AIO API */
+
+
+/* --------------------------------------------------------------------------------
+ * IO Handles
+ * --------------------------------------------------------------------------------
+ */
+
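+/*
+ * Sketch of the intended usage (illustrative only; actual callers, subject
+ * IDs and callback IDs only arrive in later commits):
+ *
+ *	PgAioReturn ioret;
+ *	PgAioHandleRef ior;
+ *	struct iovec *iov;
+ *	PgAioHandle *ioh = pgaio_io_get(CurrentResourceOwner, &ioret);
+ *
+ *	pgaio_io_get_ref(ioh, &ior);
+ *	pgaio_io_set_subject(ioh, subjid);
+ *	pgaio_io_add_shared_cb(ioh, cbid);
+ *	pgaio_io_get_iovec(ioh, &iov);
+ *	... fill iov, then ...
+ *	pgaio_io_prep_readv(ioh, fd, iovcnt, offset);
+ *
+ *	pgaio_submit_staged();
+ *	pgaio_io_ref_wait(&ior);
+ */
+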
+struct ResourceOwnerData;
+extern PgAioHandle *pgaio_io_get(struct ResourceOwnerData *resowner, PgAioReturn *ret);
+extern PgAioHandle *pgaio_io_get_nb(struct ResourceOwnerData *resowner, PgAioReturn *ret);
+
+extern void pgaio_io_release(PgAioHandle *ioh);
+extern void pgaio_io_release_resowner(dlist_node *ioh_node, bool on_error);
+
+extern void pgaio_io_get_ref(PgAioHandle *ioh, PgAioHandleRef *ior);
+
+extern void pgaio_io_set_subject(PgAioHandle *ioh, PgAioSubjectID subjid);
+extern void pgaio_io_set_flag(PgAioHandle *ioh, PgAioHandleFlags flag);
+
+extern void pgaio_io_add_shared_cb(PgAioHandle *ioh, PgAioHandleSharedCallbackID cbid);
+
+extern void pgaio_io_set_io_data_32(PgAioHandle *ioh, uint32 *data, uint8 len);
+extern void pgaio_io_set_io_data_64(PgAioHandle *ioh, uint64 *data, uint8 len);
+extern uint64 *pgaio_io_get_io_data(PgAioHandle *ioh, uint8 *len);
+
+extern void pgaio_io_prepare(PgAioHandle *ioh, PgAioOp op);
+
+extern int	pgaio_io_get_id(PgAioHandle *ioh);
+struct iovec;
+extern int	pgaio_io_get_iovec(PgAioHandle *ioh, struct iovec **iov);
+extern bool pgaio_io_has_subject(PgAioHandle *ioh);
+
+extern PgAioSubjectData *pgaio_io_get_subject_data(PgAioHandle *ioh);
+extern PgAioOpData *pgaio_io_get_op_data(PgAioHandle *ioh);
+extern ProcNumber pgaio_io_get_owner(PgAioHandle *ioh);
+
+
+
+/* --------------------------------------------------------------------------------
+ * IO References
+ * --------------------------------------------------------------------------------
+ */
+
+extern void pgaio_io_ref_clear(PgAioHandleRef *ior);
+extern bool pgaio_io_ref_valid(PgAioHandleRef *ior);
+extern int	pgaio_io_ref_get_id(PgAioHandleRef *ior);
+
+
+extern void pgaio_io_ref_wait(PgAioHandleRef *ior);
+extern bool pgaio_io_ref_check_done(PgAioHandleRef *ior);
+
+
+
+/* --------------------------------------------------------------------------------
+ * IO Result
+ * --------------------------------------------------------------------------------
+ */
+
+extern void pgaio_result_log(PgAioResult result, const PgAioSubjectData *subject_data,
+							 int elevel);
+
+
+
+/* --------------------------------------------------------------------------------
+ * Bounce Buffers
+ * --------------------------------------------------------------------------------
+ */
+
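+/*
+ * Sketch (illustrative only): a writer needing a stable copy of a page
+ * might do:
+ *
+ *	PgAioBounceBuffer *bb = pgaio_bounce_buffer_get();
+ *
+ *	memcpy(pgaio_bounce_buffer_buffer(bb), page, BLCKSZ);
+ *	pgaio_io_assoc_bounce_buffer(ioh, bb);	-- the IO now owns bb
+ */
+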
+extern PgAioBounceBuffer *pgaio_bounce_buffer_get(void);
+extern void pgaio_io_assoc_bounce_buffer(PgAioHandle *ioh, PgAioBounceBuffer *bb);
+extern uint32 pgaio_bounce_buffer_id(PgAioBounceBuffer *bb);
+extern void pgaio_bounce_buffer_release(PgAioBounceBuffer *bb);
+extern char *pgaio_bounce_buffer_buffer(PgAioBounceBuffer *bb);
+extern void pgaio_bounce_buffer_release_resowner(dlist_node *bb_node, bool on_error);
+
+
+
+/* --------------------------------------------------------------------------------
+ * Actions on multiple IOs.
+ * --------------------------------------------------------------------------------
+ */
+
+extern void pgaio_submit_staged(void);
+extern bool pgaio_have_staged(void);
+
+
+
+/* --------------------------------------------------------------------------------
+ * Low level IO preparation routines
+ *
+ * These will often be called by the code at the lowest level of initiating an
+ * IO. E.g. bufmgr.c may initiate IO for a buffer, but pgaio_io_prep_readv()
+ * will be called from within fd.c.
+ *
+ * Implemented in aio_io.c
+ * --------------------------------------------------------------------------------
+ */
+
+extern void pgaio_io_prep_readv(PgAioHandle *ioh,
+								int fd, int iovcnt, uint64 offset);
+
+extern void pgaio_io_prep_writev(PgAioHandle *ioh,
+								 int fd, int iovcnt, uint64 offset);
+
+
+
+/* --------------------------------------------------------------------------------
+ * Other
+ * --------------------------------------------------------------------------------
+ */
+
+extern void pgaio_closing_fd(int fd);
+extern void pgaio_at_xact_end(bool is_subxact, bool is_commit);
+extern void pgaio_at_error(void);
+
+
 /* GUC related */
 extern void assign_io_method(int newval, void *extra);
 
@@ -37,6 +343,8 @@ typedef enum IoMethod
 /* GUCs */
 extern const struct config_enum_entry io_method_options[];
 extern int	io_method;
+extern int	io_max_concurrency;
+extern int	io_bounce_buffers;
 
 
 #endif							/* AIO_H */
diff --git a/src/include/storage/aio_internal.h b/src/include/storage/aio_internal.h
new file mode 100644
index 00000000000..67d994cc0b1
--- /dev/null
+++ b/src/include/storage/aio_internal.h
@@ -0,0 +1,274 @@
+/*-------------------------------------------------------------------------
+ *
+ * aio_internal.h
+ *    Declarations internal to the AIO subsystem
+ *
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/aio_internal.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef AIO_INTERNAL_H
+#define AIO_INTERNAL_H
+
+
+#include "lib/ilist.h"
+#include "port/pg_iovec.h"
+#include "storage/aio.h"
+#include "storage/condition_variable.h"
+
+
+#define PGAIO_VERBOSE
+
+
+/* AFIXME */
+#define PGAIO_SUBMIT_BATCH_SIZE 32
+
+
+
+typedef enum PgAioHandleState
+{
+	/* not in use */
+	AHS_IDLE = 0,
+
+	/* returned by pgaio_io_get() */
+	AHS_HANDED_OUT,
+
+	/* pgaio_io_prep_*() has been called, but IO hasn't been submitted yet */
+	AHS_DEFINED,
+
+	/* subject's prepare() callback has been called */
+	AHS_PREPARED,
+
+	/* IO is being executed */
+	AHS_IN_FLIGHT,
+
+	/* IO finished, but result has not yet been processed */
+	AHS_REAPED,
+
+	/* IO completed, shared completion has been called */
+	AHS_COMPLETED_SHARED,
+
+	/* IO completed, local completion has been called */
+	AHS_COMPLETED_LOCAL,
+} PgAioHandleState;
+
+
+struct ResourceOwnerData;
+
+/* typedef is in public header */
+struct PgAioHandle
+{
+	PgAioHandleState state:8;
+
+	/* what are we operating on */
+	PgAioSubjectID subject:8;
+
+	/* which operation */
+	PgAioOp		op:8;
+
+	/* bitfield of PgAioHandleFlags */
+	uint8		flags;
+
+	uint8		num_shared_callbacks;
+
+	/* using the proper type here would use more space */
+	uint8		shared_callbacks[AIO_MAX_SHARED_CALLBACKS];
+
+	uint8		iovec_data_len;
+
+	/* XXX: could be optimized out with some pointer math */
+	int32		owner_procno;
+
+	/* FIXME: remove in favor of distilled_result */
+	/* raw result of the IO operation */
+	int32		result;
+
+	/* index into PgAioCtl->iovecs */
+	uint32		iovec_off;
+
+	/*
+	 * List of bounce_buffers owned by IO. It would suffice to use an index
+	 * based linked list here.
+	 */
+	slist_head	bounce_buffers;
+
+	/*
+	 * In which list the handle is registered depends on the state:
+	 * - IDLE - in per-backend idle list
+	 * - HANDED_OUT - not in a list
+	 * - DEFINED - in per-backend staged list
+	 * - PREPARED - in per-backend staged list
+	 * - IN_FLIGHT - not in any list
+	 * - REAPED - in per-reap context list
+	 * - COMPLETED_SHARED - not in any list
+	 * - COMPLETED_LOCAL - not in any list
+	 *
+	 * XXX: It probably makes sense to optimize this out to save on per-io
+	 * memory at the cost of per-backend memory.
+	 */
+	dlist_node	node;
+
+	struct ResourceOwnerData *resowner;
+	dlist_node	resowner_node;
+
+	/* incremented every time the IO handle is reused */
+	uint64		generation;
+
+	ConditionVariable cv;
+
+	/* result of shared callback, passed to issuer callback */
+	PgAioResult distilled_result;
+
+	PgAioReturn *report_return;
+
+	PgAioOpData op_data;
+
+	/*
+	 * Data necessary for shared completions. Needs to be sufficient to allow
+	 * another backend to retry an IO.
+	 */
+	PgAioSubjectData scb_data;
+};
+
+
+struct PgAioBounceBuffer
+{
+	slist_node	node;
+	struct ResourceOwnerData *resowner;
+	dlist_node	resowner_node;
+	char	   *buffer;
+};
+
+
+typedef struct PgAioPerBackend
+{
+	/* index into PgAioCtl->io_handles */
+	uint32		io_handle_off;
+
+	/* index into PgAioCtl->bounce_buffers */
+	uint32		bounce_buffers_off;
+
+	/* IO Handles that currently are not used */
+	dclist_head idle_ios;
+
+	/*
+	 * Only one IO may be returned by pgaio_io_get()/pgaio_io_get_nb() without
+	 * having been either defined (by actually associating it with an IO
+	 * operation) or released (with pgaio_io_release()). This restriction is
+	 * necessary to guarantee that we can always acquire an IO.
+	 * ->handed_out_io is used to enforce that rule.
+	 */
+	PgAioHandle *handed_out_io;
+
+	/*
+	 * IOs that are defined, but not yet submitted.
+	 */
+	dclist_head staged_ios;
+
+	/* Bounce Buffers that currently are not used */
+	slist_head	idle_bbs;
+
+	/* see handed_out_io */
+	PgAioBounceBuffer *handed_out_bb;
+} PgAioPerBackend;
+
+
+typedef struct PgAioCtl
+{
+	int			backend_state_count;
+	PgAioPerBackend *backend_state;
+
+	/*
+	 * Array of iovec structs. Each iovec is owned by a specific backend. The
+	 * allocation is in PgAioCtl to allow the maximum number of iovecs for
+	 * individual IOs to be configurable with a PGC_POSTMASTER GUC.
+	 */
+	uint64		iovec_count;
+	struct iovec *iovecs;
+
+	/*
+	 * For, e.g., an IO covering multiple buffers in shared / temp buffers, we
+	 * need to get Buffer IDs during completion to be able to change the
+	 * BufferDesc state accordingly. This space can be used to store e.g.
+	 * Buffer IDs.  Note that the actual iovec might be shorter than this,
+	 * because we combine neighboring pages into one larger iovec entry.
+	 */
+	uint64	   *iovecs_data;
+
+	/*
+	 * To perform AIO on buffers that cannot be used in place (either because
+	 * the data is not in shared memory or because we need to operate on a
+	 * copy, as is e.g. the case for writes when checksums are in use)
+	 */
+	uint64		bounce_buffers_count;
+	PgAioBounceBuffer *bounce_buffers;
+	char	   *bounce_buffers_data;
+
+	uint64		io_handle_count;
+	PgAioHandle *io_handles;
+} PgAioCtl;
+
+
+
+/*
+ * The set of callbacks that each IO method must implement.
+ */
+typedef struct IoMethodOps
+{
+	/* initialization */
+	size_t		(*shmem_size) (void);
+	void		(*shmem_init) (bool first_time);
+
+	void		(*postmaster_init) (void);
+	void		(*postmaster_child_init_local) (void);
+	void		(*postmaster_child_init) (void);
+
+	/* teardown */
+	void		(*postmaster_before_child_exit) (void);
+
+	/* handling of IOs */
+	int			(*submit) (void);
+
+	void		(*wait_one) (PgAioHandle *ioh,
+							 uint64 ref_generation);
+
+	/* properties */
+	bool		can_scatter_gather_direct;
+	bool		can_scatter_gather_buffered;
+} IoMethodOps;
+
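+/*
+ * Sketch of a minimal IO method (hypothetical - roughly what the
+ * "synchronous" method from this commit's todo list might look like):
+ *
+ *	static int
+ *	pgaio_sync_submit(void)
+ *	{
+ *		int nios = 0;
+ *
+ *		while (!dclist_is_empty(&my_aio->staged_ios))
+ *		{
+ *			dlist_node *node = dclist_head_node(&my_aio->staged_ios);
+ *			PgAioHandle *ioh = dclist_container(PgAioHandle, node, node);
+ *
+ *			pgaio_io_prepare_submit(ioh);	-- unlinks ioh from staged_ios
+ *			pgaio_io_perform_synchronously(ioh);
+ *			nios++;
+ *		}
+ *		return nios;
+ *	}
+ *
+ *	const IoMethodOps pgaio_sync_ops = {.submit = pgaio_sync_submit};
+ */
+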
+
+extern bool pgaio_io_was_recycled(PgAioHandle *ioh, uint64 ref_generation, PgAioHandleState *state);
+
+extern void pgaio_io_prepare_subject(PgAioHandle *ioh);
+extern void pgaio_io_process_completion_subject(PgAioHandle *ioh);
+extern void pgaio_io_process_completion(PgAioHandle *ioh, int result);
+extern void pgaio_io_prepare_submit(PgAioHandle *ioh);
+
+extern bool pgaio_io_needs_synchronously(PgAioHandle *ioh);
+extern void pgaio_io_perform_synchronously(PgAioHandle *ioh);
+
+extern void pgaio_io_reopen(PgAioHandle *ioh);
+
+extern const char *pgaio_io_get_subject_name(PgAioHandle *ioh);
+extern const char *pgaio_io_get_op_name(PgAioHandle *ioh);
+extern const char *pgaio_io_get_state_name(PgAioHandle *ioh);
+
+
+/* Declarations for the tables of function pointers exposed by each IO method. */
+extern const IoMethodOps pgaio_worker_ops;
+#ifdef USE_LIBURING
+extern const IoMethodOps pgaio_uring_ops;
+#endif
+
+extern const IoMethodOps *pgaio_impl;
+extern PgAioCtl *aio_ctl;
+extern PgAioPerBackend *my_aio;
+
+
+
+#endif							/* AIO_INTERNAL_H */
diff --git a/src/include/storage/aio_ref.h b/src/include/storage/aio_ref.h
new file mode 100644
index 00000000000..ad7e9ad34f3
--- /dev/null
+++ b/src/include/storage/aio_ref.h
@@ -0,0 +1,24 @@
+/*-------------------------------------------------------------------------
+ *
+ * aio_ref.h
+ *    Definition of PgAioHandleRef, which sometimes needs to be used in
+ *    headers.
+ *
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/aio_ref.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef AIO_REF_H
+#define AIO_REF_H
+
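+/*
+ * A reference to an IO handle that stays valid across handle reuse. The
+ * handle's 64 bit generation is stored split into two uint32s (presumably
+ * to avoid imposing 8 byte alignment on structs embedding a reference); it
+ * is reconstructed as
+ *
+ *	gen = ((uint64) ref->generation_upper) << 32 | ref->generation_lower;
+ *
+ * matching pgaio_io_from_ref() in aio.c.
+ */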
+typedef struct PgAioHandleRef
+{
+	uint32		aio_index;
+	uint32		generation_upper;
+	uint32		generation_lower;
+} PgAioHandleRef;
+
+#endif							/* AIO_REF_H */
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 00e8022fbad..f4e6abce327 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -217,6 +217,7 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_SUBTRANS_SLRU,
 	LWTRANCHE_XACT_SLRU,
 	LWTRANCHE_PARALLEL_VACUUM_DSA,
+	LWTRANCHE_AIO_URING_COMPLETION,
 	LWTRANCHE_FIRST_USER_DEFINED,
 }			BuiltinTrancheIds;
 
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 88dc79b2bd6..7aaccf69d1e 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -84,3 +84,4 @@ PG_LWLOCK(50, DSMRegistry)
 PG_LWLOCK(51, InjectionPoint)
 PG_LWLOCK(52, SerialControl)
 PG_LWLOCK(53, WaitLSN)
+PG_LWLOCK(54, AioWorkerSubmissionQueue)
diff --git a/src/include/utils/resowner.h b/src/include/utils/resowner.h
index 4e534bc3e70..0cdd0c13ffb 100644
--- a/src/include/utils/resowner.h
+++ b/src/include/utils/resowner.h
@@ -164,4 +164,11 @@ struct LOCALLOCK;
 extern void ResourceOwnerRememberLock(ResourceOwner owner, struct LOCALLOCK *locallock);
 extern void ResourceOwnerForgetLock(ResourceOwner owner, struct LOCALLOCK *locallock);
 
+/* special support for AIO */
+struct dlist_node;
+extern void ResourceOwnerRememberAioHandle(ResourceOwner owner, struct dlist_node *ioh_node);
+extern void ResourceOwnerForgetAioHandle(ResourceOwner owner, struct dlist_node *ioh_node);
+extern void ResourceOwnerRememberAioBounceBuffer(ResourceOwner owner, struct dlist_node *bb_node);
+extern void ResourceOwnerForgetAioBounceBuffer(ResourceOwner owner, struct dlist_node *bb_node);
+
 #endif							/* RESOWNER_H */
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 0fe1630fca8..cb4ee5dfd1f 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -52,6 +52,7 @@
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
 #include "replication/syncrep.h"
+#include "storage/aio.h"
 #include "storage/condition_variable.h"
 #include "storage/fd.h"
 #include "storage/lmgr.h"
@@ -2462,6 +2463,8 @@ CommitTransaction(void)
 	AtEOXact_LogicalRepWorkers(true);
 	pgstat_report_xact_timestamp(0);
 
+	pgaio_at_xact_end( /* is_subxact = */ false, /* is_commit = */ true);
+
 	ResourceOwnerDelete(TopTransactionResourceOwner);
 	s->curTransactionOwner = NULL;
 	CurTransactionResourceOwner = NULL;
@@ -2976,6 +2979,8 @@ AbortTransaction(void)
 		pgstat_report_xact_timestamp(0);
 	}
 
+	pgaio_at_xact_end( /* is_subxact = */ false, /* is_commit = */ false);
+
 	/*
 	 * State remains TRANS_ABORT until CleanupTransaction().
 	 */
@@ -5185,6 +5190,8 @@ CommitSubTransaction(void)
 	AtEOSubXact_PgStat(true, s->nestingLevel);
 	AtSubCommit_Snapshot(s->nestingLevel);
 
+	pgaio_at_xact_end( /* is_subxact = */ true, /* is_commit = */ true);
+
 	/*
 	 * We need to restore the upper transaction's read-only state, in case the
 	 * upper is read-write while the child is read-only; GUC will incorrectly
@@ -5350,6 +5357,8 @@ AbortSubTransaction(void)
 		AtSubAbort_Snapshot(s->nestingLevel);
 	}
 
+	pgaio_at_xact_end( /* is_subxact = */ true, /* is_commit = */ false);
+
 	/*
 	 * Restore the upper transaction's read-only state, too.  This should be
 	 * redundant with GUC's cleanup but we may as well do it for consistency
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index fc3901d5347..71930094309 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -4221,7 +4221,8 @@ maybe_reap_io_worker(int pid)
 static void
 maybe_adjust_io_workers(void)
 {
-	/* ATODO: This will need to check if io_method == worker */
+	if (!pgaio_workers_enabled())
+		return;
 
 	/*
 	 * If we're in final shutting down state, then we're just waiting for all
diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile
index 824682e7354..2a5e72a8024 100644
--- a/src/backend/storage/aio/Makefile
+++ b/src/backend/storage/aio/Makefile
@@ -10,8 +10,11 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = \
 	aio.o \
+	aio_io.o \
 	aio_init.o \
+	aio_subject.o \
 	method_worker.o \
+	method_io_uring.o \
 	read_stream.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/aio/aio.c b/src/backend/storage/aio/aio.c
index 67f6b52de91..d6f9f658b97 100644
--- a/src/backend/storage/aio/aio.c
+++ b/src/backend/storage/aio/aio.c
@@ -14,7 +14,23 @@
 
 #include "postgres.h"
 
+#include "miscadmin.h"
+#include "port/atomics.h"
 #include "storage/aio.h"
+#include "storage/aio_internal.h"
+#include "storage/bufmgr.h"
+#include "utils/resowner.h"
+#include "utils/wait_event_types.h"
+
+
+
+static void pgaio_io_reclaim(PgAioHandle *ioh);
+static void pgaio_io_resowner_register(PgAioHandle *ioh);
+static void pgaio_io_wait_for_free(void);
+static PgAioHandle *pgaio_io_from_ref(PgAioHandleRef *ior, uint64 *ref_generation);
+
+static void pgaio_bounce_buffer_wait_for_free(void);
+
 
 
 /* Options for io_method. */
@@ -26,10 +42,955 @@ const struct config_enum_entry io_method_options[] = {
 	{NULL, 0, false}
 };
 
-int			io_method = IOMETHOD_WORKER;
+int			io_method = DEFAULT_IO_METHOD;
+int			io_max_concurrency = -1;
+int			io_bounce_buffers = -1;
+
+
+/* global control for AIO */
+PgAioCtl   *aio_ctl;
+
+/* current backend's per-backend state */
+PgAioPerBackend *my_aio;
+
+
+static const IoMethodOps *pgaio_ops_table[] = {
+	[IOMETHOD_WORKER] = &pgaio_worker_ops,
+#ifdef USE_LIBURING
+	[IOMETHOD_IO_URING] = &pgaio_uring_ops,
+#endif
+};
+
+
+const IoMethodOps *pgaio_impl;
+
+
+
+/* --------------------------------------------------------------------------------
+ * "Core" IO Api
+ * --------------------------------------------------------------------------------
+ */
+
+/*
+ * AFIXME: rewrite
+ *
+ * Shared completion callbacks can be executed by any backend (otherwise there
+ * would be deadlocks). Therefore they cannot update state for the issuer of
+ * the IO. That can be done with issuer callbacks.
+ *
+ * Note that issuer callbacks are effectively executed in a critical
+ * section. This is necessary as we need to be able to execute IO in critical
+ * sections (consider e.g. WAL logging) and to be able to execute IOs we need
+ * to acquire an IO, which in turn requires executing issuer callbacks. An
+ * alternative scheme could be to defer local callback execution until a later
+ * point, but that gets complicated quickly.
+ *
+ * Therefore the typical pattern is to use an issuer callback to set some
+ * flags in backend local memory, which can then be used to error out at a
+ * later time.
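+ *
+ * For instance (hypothetical, just to illustrate the pattern): an issuer
+ * callback might do no more than set "issuer_error_pending = true" in
+ * backend local memory, with the actual ERROR raised later, outside the
+ * critical section.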
+ *
+ * NB: The issuer callback is cleared when the resowner owning the IO goes out
+ * of scope.
+ */
+PgAioHandle *
+pgaio_io_get(struct ResourceOwnerData *resowner, PgAioReturn *ret)
+{
+	PgAioHandle *h;
+
+	while (true)
+	{
+		h = pgaio_io_get_nb(resowner, ret);
+
+		if (h != NULL)
+			return h;
+
+		/*
+		 * Evidently all of this backend's handles are in use. Just wait for
+		 * some to complete.
+		 */
+		pgaio_io_wait_for_free();
+	}
+}
+
+PgAioHandle *
+pgaio_io_get_nb(struct ResourceOwnerData *resowner, PgAioReturn *ret)
+{
+	if (dclist_count(&my_aio->staged_ios) >= PGAIO_SUBMIT_BATCH_SIZE)
+	{
+		pgaio_submit_staged();
+	}
+
+	if (my_aio->handed_out_io)
+	{
+		ereport(ERROR,
+				errmsg("API violation: Only one IO can be handed out"));
+	}
+
+	if (!dclist_is_empty(&my_aio->idle_ios))
+	{
+		dlist_node *ion = dclist_pop_head_node(&my_aio->idle_ios);
+		PgAioHandle *ioh = dclist_container(PgAioHandle, node, ion);
+
+		Assert(ioh->state == AHS_IDLE);
+		Assert(ioh->owner_procno == MyProcNumber);
+
+		ioh->state = AHS_HANDED_OUT;
+		my_aio->handed_out_io = ioh;
+
+		if (resowner)
+			pgaio_io_resowner_register(ioh);
+
+		if (ret)
+			ioh->report_return = ret;
+
+		return ioh;
+	}
+
+	return NULL;
+}
+
+void
+pgaio_io_release(PgAioHandle *ioh)
+{
+	if (ioh == my_aio->handed_out_io)
+	{
+		Assert(ioh->state == AHS_HANDED_OUT);
+		Assert(ioh->resowner);
+
+		my_aio->handed_out_io = NULL;
+		pgaio_io_reclaim(ioh);
+	}
+	else
+	{
+		elog(ERROR, "release in unexpected state");
+	}
+}
+
+void
+pgaio_io_release_resowner(dlist_node *ioh_node, bool on_error)
+{
+	PgAioHandle *ioh = dlist_container(PgAioHandle, resowner_node, ioh_node);
+
+	Assert(ioh->resowner);
+
+	ResourceOwnerForgetAioHandle(ioh->resowner, &ioh->resowner_node);
+	ioh->resowner = NULL;
+
+	switch (ioh->state)
+	{
+		case AHS_IDLE:
+			elog(ERROR, "unexpected");
+			break;
+		case AHS_HANDED_OUT:
+			Assert(ioh == my_aio->handed_out_io || my_aio->handed_out_io == NULL);
+
+			if (ioh == my_aio->handed_out_io)
+			{
+				my_aio->handed_out_io = NULL;
+				if (!on_error)
+					elog(WARNING, "leaked AIO handle");
+			}
+
+			pgaio_io_reclaim(ioh);
+			break;
+		case AHS_DEFINED:
+		case AHS_PREPARED:
+			/* XXX: Should we warn about this when is_commit? */
+			pgaio_submit_staged();
+			break;
+		case AHS_IN_FLIGHT:
+		case AHS_REAPED:
+		case AHS_COMPLETED_SHARED:
+			/* this is expected to happen */
+			break;
+		case AHS_COMPLETED_LOCAL:
+			/* XXX: unclear if this ought to be possible? */
+			pgaio_io_reclaim(ioh);
+			break;
+	}
+
+	/*
+	 * Need to unregister the reporting of the IO's result, as the memory
+	 * it's referencing has likely gone away.
+	 */
+	if (ioh->report_return)
+		ioh->report_return = NULL;
+}
+
+int
+pgaio_io_get_iovec(PgAioHandle *ioh, struct iovec **iov)
+{
+	Assert(ioh->state == AHS_HANDED_OUT);
+
+	*iov = &aio_ctl->iovecs[ioh->iovec_off];
+
+	/* AFIXME: Needs to be the value at startup time */
+	return io_combine_limit;
+}
+
+PgAioSubjectData *
+pgaio_io_get_subject_data(PgAioHandle *ioh)
+{
+	return &ioh->scb_data;
+}
+
+PgAioOpData *
+pgaio_io_get_op_data(PgAioHandle *ioh)
+{
+	return &ioh->op_data;
+}
+
+ProcNumber
+pgaio_io_get_owner(PgAioHandle *ioh)
+{
+	return ioh->owner_procno;
+}
+
+bool
+pgaio_io_has_subject(PgAioHandle *ioh)
+{
+	return ioh->subject != ASI_INVALID;
+}
+
+void
+pgaio_io_set_flag(PgAioHandle *ioh, PgAioHandleFlags flag)
+{
+	Assert(ioh->state == AHS_HANDED_OUT);
+
+	ioh->flags |= flag;
+}
+
+void
+pgaio_io_set_io_data_32(PgAioHandle *ioh, uint32 *data, uint8 len)
+{
+	Assert(ioh->state == AHS_HANDED_OUT);
+
+	for (int i = 0; i < len; i++)
+		aio_ctl->iovecs_data[ioh->iovec_off + i] = data[i];
+	ioh->iovec_data_len = len;
+}
+
+uint64 *
+pgaio_io_get_io_data(PgAioHandle *ioh, uint8 *len)
+{
+	Assert(ioh->iovec_data_len > 0);
+
+	*len = ioh->iovec_data_len;
+
+	return &aio_ctl->iovecs_data[ioh->iovec_off];
+}
+
+void
+pgaio_io_set_subject(PgAioHandle *ioh, PgAioSubjectID subjid)
+{
+	Assert(ioh->state == AHS_HANDED_OUT);
+
+	ioh->subject = subjid;
+
+	elog(DEBUG3, "io:%d, op %s, subject %s, set subject",
+		 pgaio_io_get_id(ioh),
+		 pgaio_io_get_op_name(ioh),
+		 pgaio_io_get_subject_name(ioh));
+}
+
+void
+pgaio_io_get_ref(PgAioHandle *ioh, PgAioHandleRef *ior)
+{
+	Assert(ioh->state == AHS_HANDED_OUT ||
+		   ioh->state == AHS_DEFINED ||
+		   ioh->state == AHS_PREPARED);
+	Assert(ioh->generation != 0);
+
+	ior->aio_index = ioh - aio_ctl->io_handles;
+	ior->generation_upper = (uint32) (ioh->generation >> 32);
+	ior->generation_lower = (uint32) ioh->generation;
+}
+
+void
+pgaio_io_ref_clear(PgAioHandleRef *ior)
+{
+	ior->aio_index = PG_UINT32_MAX;
+}
+
+bool
+pgaio_io_ref_valid(PgAioHandleRef *ior)
+{
+	return ior->aio_index != PG_UINT32_MAX;
+}
+
+int
+pgaio_io_ref_get_id(PgAioHandleRef *ior)
+{
+	Assert(pgaio_io_ref_valid(ior));
+	return ior->aio_index;
+}
+
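+/*
+ * Return the IO's current state in *state and check whether the handle has
+ * been reused (i.e. its generation no longer matches the reference's) - if
+ * so, the referenced IO has completed. The read barrier ensures the state
+ * is read before the generation, pairing with the write barriers in
+ * pgaio_io_reclaim().
+ */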
+bool
+pgaio_io_was_recycled(PgAioHandle *ioh, uint64 ref_generation, PgAioHandleState *state)
+{
+	*state = ioh->state;
+	pg_read_barrier();
+
+	return ioh->generation != ref_generation;
+}
+
+void
+pgaio_io_ref_wait(PgAioHandleRef *ior)
+{
+	uint64		ref_generation;
+	PgAioHandleState state;
+	bool		am_owner;
+	PgAioHandle *ioh;
+
+	ioh = pgaio_io_from_ref(ior, &ref_generation);
+
+	am_owner = ioh->owner_procno == MyProcNumber;
+
+
+	if (pgaio_io_was_recycled(ioh, ref_generation, &state))
+		return;
+
+	if (am_owner)
+	{
+		if (state == AHS_DEFINED || state == AHS_PREPARED)
+		{
+			/* XXX: Arguably this should be prevented by callers? */
+			pgaio_submit_staged();
+		}
+		else if (state != AHS_IN_FLIGHT && state != AHS_REAPED && state != AHS_COMPLETED_SHARED && state != AHS_COMPLETED_LOCAL)
+		{
+			elog(PANIC, "waiting for own IO in wrong state: %d",
+				 state);
+		}
+
+		/*
+		 * Somebody else completed the IO; we need to execute the issuer
+		 * callback, so reclaim eagerly.
+		 */
+		if (state == AHS_COMPLETED_LOCAL)
+		{
+			pgaio_io_reclaim(ioh);
+
+			return;
+		}
+	}
+
+	while (true)
+	{
+		if (pgaio_io_was_recycled(ioh, ref_generation, &state))
+			return;
+
+		switch (state)
+		{
+			case AHS_IDLE:
+			case AHS_HANDED_OUT:
+				elog(ERROR, "IO in wrong state: %d", state);
+				break;
+
+			case AHS_IN_FLIGHT:
+				if (pgaio_impl->wait_one)
+				{
+					pgaio_impl->wait_one(ioh, ref_generation);
+					continue;
+				}
+				/* fallthrough */
+
+				/* waiting for owner to submit */
+			case AHS_PREPARED:
+			case AHS_DEFINED:
+				/* waiting for reaper to complete */
+				/* fallthrough */
+			case AHS_REAPED:
+				/* shouldn't be able to hit this otherwise */
+				Assert(IsUnderPostmaster);
+				/* ensure we're going to get woken up */
+				ConditionVariablePrepareToSleep(&ioh->cv);
+
+				while (!pgaio_io_was_recycled(ioh, ref_generation, &state))
+				{
+					if (state != AHS_REAPED && state != AHS_DEFINED &&
+						state != AHS_IN_FLIGHT)
+						break;
+					ConditionVariableSleep(&ioh->cv, WAIT_EVENT_AIO_COMPLETION);
+				}
+
+				ConditionVariableCancelSleep();
+				break;
+
+			case AHS_COMPLETED_SHARED:
+				/* see above */
+				if (am_owner)
+					pgaio_io_reclaim(ioh);
+				return;
+			case AHS_COMPLETED_LOCAL:
+				return;
+		}
+	}
+}
+
+bool
+pgaio_io_ref_check_done(PgAioHandleRef *ior)
+{
+	uint64		ref_generation;
+	PgAioHandleState state;
+	bool		am_owner;
+	PgAioHandle *ioh;
+
+	ioh = pgaio_io_from_ref(ior, &ref_generation);
+
+	if (pgaio_io_was_recycled(ioh, ref_generation, &state))
+		return true;
+
+
+	if (state == AHS_IDLE)
+		return true;
+
+	am_owner = ioh->owner_procno == MyProcNumber;
+
+	if (state == AHS_COMPLETED_SHARED || state == AHS_COMPLETED_LOCAL)
+	{
+		if (am_owner)
+			pgaio_io_reclaim(ioh);
+		return true;
+	}
+
+	return false;
+}
+
+int
+pgaio_io_get_id(PgAioHandle *ioh)
+{
+	Assert(ioh >= aio_ctl->io_handles &&
+		   ioh < (aio_ctl->io_handles + aio_ctl->io_handle_count));
+	return ioh - aio_ctl->io_handles;
+}
+
+const char *
+pgaio_io_get_state_name(PgAioHandle *ioh)
+{
+	switch (ioh->state)
+	{
+		case AHS_IDLE:
+			return "IDLE";
+		case AHS_HANDED_OUT:
+			return "HANDED_OUT";
+		case AHS_DEFINED:
+			return "DEFINED";
+		case AHS_PREPARED:
+			return "PREPARED";
+		case AHS_IN_FLIGHT:
+			return "IN_FLIGHT";
+		case AHS_REAPED:
+			return "REAPED";
+		case AHS_COMPLETED_SHARED:
+			return "COMPLETED_SHARED";
+		case AHS_COMPLETED_LOCAL:
+			return "COMPLETED_LOCAL";
+	}
+	pg_unreachable();
+}
+
+/*
+ * Internal, should only be called from pgaio_io_prep_*().
+ */
+void
+pgaio_io_prepare(PgAioHandle *ioh, PgAioOp op)
+{
+	Assert(ioh->state == AHS_HANDED_OUT);
+	Assert(pgaio_io_has_subject(ioh));
+
+	ioh->op = op;
+	ioh->state = AHS_DEFINED;
+	ioh->result = 0;
+
+	/* allow a new IO to be staged */
+	my_aio->handed_out_io = NULL;
+
+	dclist_push_tail(&my_aio->staged_ios, &ioh->node);
+
+	pgaio_io_prepare_subject(ioh);
+
+	ioh->state = AHS_PREPARED;
+
+	elog(DEBUG3, "io:%d: prepared %s",
+		 pgaio_io_get_id(ioh), pgaio_io_get_op_name(ioh));
+}
+
+/*
+ * Handle IO getting completed by a method.
+ */
+void
+pgaio_io_process_completion(PgAioHandle *ioh, int result)
+{
+	ioh->result = result;
+
+	pg_write_barrier();
+
+	/* FIXME: should be done in separate function */
+	ioh->state = AHS_REAPED;
+
+	pgaio_io_process_completion_subject(ioh);
+
+	/* ensure results of completion are visible before the new state */
+	pg_write_barrier();
+
+	ioh->state = AHS_COMPLETED_SHARED;
+
+	/* condition variable broadcast ensures state is visible before wakeup */
+	ConditionVariableBroadcast(&ioh->cv);
+
+	if (ioh->owner_procno == MyProcNumber)
+		pgaio_io_reclaim(ioh);
+}
+
+/*
+ * Called just before an IO method submits the IO: mark it as in flight and
+ * remove it from the staged list.
+ */
+void
+pgaio_io_prepare_submit(PgAioHandle *ioh)
+{
+	ioh->state = AHS_IN_FLIGHT;
+	pg_write_barrier();
+
+	dclist_delete_from(&my_aio->staged_ios, &ioh->node);
+}
+
+static PgAioHandle *
+pgaio_io_from_ref(PgAioHandleRef *ior, uint64 *ref_generation)
+{
+	PgAioHandle *ioh;
+
+	Assert(ior->aio_index < aio_ctl->io_handle_count);
+
+	ioh = &aio_ctl->io_handles[ior->aio_index];
+
+	*ref_generation = ((uint64) ior->generation_upper) << 32 |
+		ior->generation_lower;
+
+	Assert(*ref_generation != 0);
+
+	return ioh;
+}
+
+static void
+pgaio_io_resowner_register(PgAioHandle *ioh)
+{
+	Assert(!ioh->resowner);
+	Assert(CurrentResourceOwner);
+
+	ResourceOwnerRememberAioHandle(CurrentResourceOwner, &ioh->resowner_node);
+	ioh->resowner = CurrentResourceOwner;
+}
+
+static void
+pgaio_io_reclaim(PgAioHandle *ioh)
+{
+	/* This is only ok if it's our IO */
+	Assert(ioh->owner_procno == MyProcNumber);
+
+	ereport(DEBUG3,
+			errmsg("reclaiming io:%d, state: %s, op %s, subject %s, result: %d, distilled_result: AFIXME, report to: %p",
+				   pgaio_io_get_id(ioh),
+				   pgaio_io_get_state_name(ioh),
+				   pgaio_io_get_op_name(ioh),
+				   pgaio_io_get_subject_name(ioh),
+				   ioh->result,
+				   ioh->report_return
+				   ),
+			errhidestmt(true), errhidecontext(true));
+
+	if (ioh->report_return)
+	{
+		if (ioh->state != AHS_HANDED_OUT)
+		{
+			ioh->report_return->result = ioh->distilled_result;
+			ioh->report_return->subject_data = ioh->scb_data;
+		}
+	}
+
+	/* reclaim all associated bounce buffers */
+	if (!slist_is_empty(&ioh->bounce_buffers))
+	{
+		slist_mutable_iter it;
+
+		slist_foreach_modify(it, &ioh->bounce_buffers)
+		{
+			PgAioBounceBuffer *bb = slist_container(PgAioBounceBuffer, node, it.cur);
+
+			slist_delete_current(&it);
+
+			slist_push_head(&my_aio->idle_bbs, &bb->node);
+		}
+	}
+
+	if (ioh->resowner)
+	{
+		ResourceOwnerForgetAioHandle(ioh->resowner, &ioh->resowner_node);
+		ioh->resowner = NULL;
+	}
+
+	Assert(!ioh->resowner);
+
+	ioh->num_shared_callbacks = 0;
+	ioh->iovec_data_len = 0;
+	ioh->report_return = NULL;
+	ioh->flags = 0;
+
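+	/*
+	 * Bump the generation before marking the handle idle, with write
+	 * barriers in between; paired with the read barrier in
+	 * pgaio_io_was_recycled(), this ensures a waiter that still observes the
+	 * old generation cannot see state changes from the handle's next use.
+	 */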
+	pg_write_barrier();
+	ioh->generation++;
+	pg_write_barrier();
+	ioh->state = AHS_IDLE;
+	pg_write_barrier();
+
+	dclist_push_tail(&my_aio->idle_ios, &ioh->node);
+}
+
+static void
+pgaio_io_wait_for_free(void)
+{
+	bool		found_handed_out = false;
+	int			reclaimed = 0;
+	static uint32 lastpos = 0;
+
+	elog(DEBUG2,
+		 "waiting for self: %d pending",
+		 dclist_count(&my_aio->staged_ios));
+
+	/*
+	 * First check if any of our IOs have actually completed - when using the
+	 * worker method, that'll often be the case. We could do so as part of the
+	 * loop below, but that'd potentially lead us to wait for an IO that was
+	 * submitted earlier.
+	 */
+	for (int i = 0; i < io_max_concurrency; i++)
+	{
+		PgAioHandle *ioh = &aio_ctl->io_handles[my_aio->io_handle_off + i];
+
+		if (ioh->state == AHS_COMPLETED_SHARED)
+		{
+			pgaio_io_reclaim(ioh);
+			reclaimed++;
+		}
+	}
+
+	if (reclaimed > 0)
+		return;
+
+	if (!dclist_is_empty(&my_aio->staged_ios))
+	{
+		elog(DEBUG2, "submitting while acquiring free io");
+		pgaio_submit_staged();
+	}
+
+	for (uint32 i = lastpos; i < lastpos + io_max_concurrency; i++)
+	{
+		uint32		thisoff = my_aio->io_handle_off + (i % io_max_concurrency);
+		PgAioHandle *ioh = &aio_ctl->io_handles[thisoff];
+
+		switch (ioh->state)
+		{
+			case AHS_IDLE:
+
+				/*
+				 * While one might think that pgaio_io_get_nb() should have
+				 * succeeded, this is reachable because the IO could have
+				 * completed during the submission above.
+				 */
+				return;
+			case AHS_DEFINED:	/* should have been submitted above */
+			case AHS_PREPARED:
+			case AHS_COMPLETED_LOCAL:
+				elog(ERROR, "shouldn't get here with io:%d in state %d",
+					 pgaio_io_get_id(ioh), ioh->state);
+				break;
+			case AHS_HANDED_OUT:
+				if (found_handed_out)
+					elog(ERROR, "more than one handed out IO");
+				found_handed_out = true;
+				continue;
+			case AHS_REAPED:
+			case AHS_IN_FLIGHT:
+				{
+					PgAioHandleRef ior;
+
+					ior.aio_index = ioh - aio_ctl->io_handles;
+					ior.generation_upper = (uint32) (ioh->generation >> 32);
+					ior.generation_lower = (uint32) ioh->generation;
+
+					pgaio_io_ref_wait(&ior);
+					elog(DEBUG2, "waited for io:%d",
+						 pgaio_io_get_id(ioh));
+					lastpos = i;
+					return;
+				}
+				break;
+			case AHS_COMPLETED_SHARED:
+				/* reclaim */
+				pgaio_io_reclaim(ioh);
+				lastpos = i;
+				return;
+		}
+	}
+
+	elog(PANIC, "could not reclaim any handles");
+}
+
+
+
+/* --------------------------------------------------------------------------------
+ * Bounce Buffers
+ * --------------------------------------------------------------------------------
+ */
+
+PgAioBounceBuffer *
+pgaio_bounce_buffer_get(void)
+{
+	PgAioBounceBuffer *bb = NULL;
+	slist_node *node;
+
+	if (my_aio->handed_out_bb != NULL)
+		elog(ERROR, "can only hand out one BB");
+
+	/*
+	 * FIXME: It probably is not correct to have bounce buffers be per
+	 * backend; they use too much memory.
+	 */
+	if (slist_is_empty(&my_aio->idle_bbs))
+	{
+		pgaio_bounce_buffer_wait_for_free();
+	}
+
+	node = slist_pop_head_node(&my_aio->idle_bbs);
+	bb = slist_container(PgAioBounceBuffer, node, node);
+
+	my_aio->handed_out_bb = bb;
+
+	bb->resowner = CurrentResourceOwner;
+	ResourceOwnerRememberAioBounceBuffer(bb->resowner, &bb->resowner_node);
+
+	return bb;
+}
+
+void
+pgaio_io_assoc_bounce_buffer(PgAioHandle *ioh, PgAioBounceBuffer *bb)
+{
+	if (my_aio->handed_out_bb != bb)
+		elog(ERROR, "can only assign handed out BB");
+	my_aio->handed_out_bb = NULL;
+
+	/*
+	 * There can be many bounce buffers assigned in case of vectorized IOs.
+	 */
+	slist_push_head(&ioh->bounce_buffers, &bb->node);
+
+	/* once associated with an IO, the IO has ownership */
+	ResourceOwnerForgetAioBounceBuffer(bb->resowner, &bb->resowner_node);
+	bb->resowner = NULL;
+}
+
+uint32
+pgaio_bounce_buffer_id(PgAioBounceBuffer *bb)
+{
+	return bb - aio_ctl->bounce_buffers;
+}
+
+void
+pgaio_bounce_buffer_release(PgAioBounceBuffer *bb)
+{
+	if (my_aio->handed_out_bb != bb)
+		elog(ERROR, "can only release handed out BB");
+
+	slist_push_head(&my_aio->idle_bbs, &bb->node);
+	my_aio->handed_out_bb = NULL;
+
+	ResourceOwnerForgetAioBounceBuffer(bb->resowner, &bb->resowner_node);
+	bb->resowner = NULL;
+}
+
+void
+pgaio_bounce_buffer_release_resowner(dlist_node *bb_node, bool on_error)
+{
+	PgAioBounceBuffer *bb = dlist_container(PgAioBounceBuffer, resowner_node, bb_node);
+
+	Assert(bb->resowner);
+
+	if (!on_error)
+		elog(WARNING, "leaked AIO bounce buffer");
+
+	pgaio_bounce_buffer_release(bb);
+}
+
+char *
+pgaio_bounce_buffer_buffer(PgAioBounceBuffer *bb)
+{
+	return bb->buffer;
+}
+
+static void
+pgaio_bounce_buffer_wait_for_free(void)
+{
+	static uint32 lastpos = 0;
+
+	if (!dclist_is_empty(&my_aio->staged_ios))
+	{
+		elog(DEBUG2, "submitting while acquiring free bb");
+		pgaio_submit_staged();
+	}
+
+	for (uint32 i = lastpos; i < lastpos + io_max_concurrency; i++)
+	{
+		uint32		thisoff = my_aio->io_handle_off + (i % io_max_concurrency);
+		PgAioHandle *ioh = &aio_ctl->io_handles[thisoff];
+
+		switch (ioh->state)
+		{
+			case AHS_IDLE:
+			case AHS_HANDED_OUT:
+				continue;
+			case AHS_DEFINED:	/* should have been submitted above */
+			case AHS_PREPARED:
+				elog(ERROR, "shouldn't get here with io:%d in state %d",
+					 pgaio_io_get_id(ioh), ioh->state);
+				break;
+			case AHS_REAPED:
+			case AHS_IN_FLIGHT:
+				if (!slist_is_empty(&ioh->bounce_buffers))
+				{
+					PgAioHandleRef ior;
+
+					ior.aio_index = ioh - aio_ctl->io_handles;
+					ior.generation_upper = (uint32) (ioh->generation >> 32);
+					ior.generation_lower = (uint32) ioh->generation;
+
+					pgaio_io_ref_wait(&ior);
+					elog(DEBUG2, "waited for io:%d to reclaim BB",
+						 pgaio_io_get_id(ioh));
+
+					if (slist_is_empty(&my_aio->idle_bbs))
+						elog(WARNING, "empty after wait");
+
+					if (!slist_is_empty(&my_aio->idle_bbs))
+					{
+						lastpos = i;
+						return;
+					}
+				}
+				break;
+			case AHS_COMPLETED_SHARED:
+			case AHS_COMPLETED_LOCAL:
+				/* reclaim */
+				pgaio_io_reclaim(ioh);
+
+				if (!slist_is_empty(&my_aio->idle_bbs))
+				{
+					lastpos = i;
+					return;
+				}
+				break;
+		}
+	}
+
+	/*
+	 * The submission above could have caused the IO to complete at any time.
+	 */
+	if (slist_is_empty(&my_aio->idle_bbs))
+		elog(PANIC, "no more bbs");
+}
+
+
+
+/* --------------------------------------------------------------------------------
+ * Actions on multiple IOs.
+ * --------------------------------------------------------------------------------
+ */
+
+void
+pgaio_submit_staged(void)
+{
+	int			total_submitted = 0;
+
+	if (dclist_is_empty(&my_aio->staged_ios))
+		return;
+
+	while (!dclist_is_empty(&my_aio->staged_ios))
+	{
+		int			staged_count PG_USED_FOR_ASSERTS_ONLY = dclist_count(&my_aio->staged_ios);
+		int			did_submit;
+
+		Assert(staged_count > 0);
+
+		START_CRIT_SECTION();
+		did_submit = pgaio_impl->submit();
+		END_CRIT_SECTION();
+
+		total_submitted += did_submit;
+	}
+
+#ifdef PGAIO_VERBOSE
+	ereport(DEBUG2,
+			errmsg("submitted %d",
+				   total_submitted),
+			errhidestmt(true),
+			errhidecontext(true));
+#endif
+}
+
+bool
+pgaio_have_staged(void)
+{
+	return !dclist_is_empty(&my_aio->staged_ios);
+}
+
+
+
+/* --------------------------------------------------------------------------------
+ * Other
+ * --------------------------------------------------------------------------------
+ */
+
+/*
+ * Need to submit IOs that are staged, but not yet submitted, and that use
+ * the FD - otherwise, once the FD is reused, the IO would end up targeting
+ * something bogus.
+ */
+void
+pgaio_closing_fd(int fd)
+{
+	/*
+	 * Might be called before AIO is initialized or in a subprocess that
+	 * doesn't use AIO.
+	 */
+	if (!my_aio)
+		return;
+
+	/*
+	 * For now just submit all staged IOs - we could be more selective, but
+	 * it's probably not worth it.
+	 */
+	pgaio_submit_staged();
+}
+
+void
+pgaio_at_xact_end(bool is_subxact, bool is_commit)
+{
+	Assert(!my_aio->handed_out_io);
+	Assert(!my_aio->handed_out_bb);
+}
+
+/*
+ * Similar to pgaio_at_xact_end(..., is_commit = false), but for cases where
+ * errors happen outside of transactions.
+ */
+void
+pgaio_at_error(void)
+{
+	Assert(!my_aio->handed_out_io);
+	Assert(!my_aio->handed_out_bb);
+}
 
 
 void
 assign_io_method(int newval, void *extra)
 {
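+	/* cache the chosen method's callback table for cheap dispatch */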
+	pgaio_impl = pgaio_ops_table[newval];
 }
diff --git a/src/backend/storage/aio/aio_init.c b/src/backend/storage/aio/aio_init.c
index 1c277a7eb3b..cf3512f79fc 100644
--- a/src/backend/storage/aio/aio_init.c
+++ b/src/backend/storage/aio/aio_init.c
@@ -14,33 +14,351 @@
 
 #include "postgres.h"
 
+#include "miscadmin.h"
+#include "storage/aio.h"
 #include "storage/aio_init.h"
+#include "storage/aio_internal.h"
+#include "storage/bufmgr.h"
+#include "storage/io_worker.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
 
 
+static Size
+AioCtlShmemSize(void)
+{
+	Size		sz;
+
+	/* aio_ctl itself */
+	sz = offsetof(PgAioCtl, io_handles);
+
+	return sz;
+}
+
+static uint32
+AioProcs(void)
+{
+	/*
+	 * While AIO workers don't need their own AIO context, we can't currently
+	 * guarantee that nothing gets assigned to a ProcNumber for an IO worker if
+	 * we just subtracted MAX_IO_WORKERS.
+	 */
+	return MaxBackends + NUM_AUXILIARY_PROCS;
+}
+
+static Size
+AioBackendShmemSize(void)
+{
+	return mul_size(AioProcs(), sizeof(PgAioPerBackend));
+}
+
+static Size
+AioHandleShmemSize(void)
+{
+	Size		sz;
+
+	/* ios */
+	sz = mul_size(AioProcs(),
+				  mul_size(io_max_concurrency, sizeof(PgAioHandle)));
+
+	return sz;
+}
+
+static Size
+AioIOVShmemSize(void)
+{
+	/* FIXME: io_combine_limit is USERSET */
+	return mul_size(sizeof(struct iovec),
+					mul_size(mul_size(io_combine_limit, AioProcs()),
+							 io_max_concurrency));
+}
+
+static Size
+AioIOVDataShmemSize(void)
+{
+	/* FIXME: io_combine_limit is USERSET */
+	return mul_size(sizeof(uint64),
+					mul_size(mul_size(io_combine_limit, AioProcs()),
+							 io_max_concurrency));
+}
+
+static Size
+AioBounceBufferDescShmemSize(void)
+{
+	Size		sz;
+
+	/* PgAioBounceBuffer itself */
+	sz = mul_size(sizeof(PgAioBounceBuffer),
+				  mul_size(AioProcs(), io_bounce_buffers));
+
+	return sz;
+}
+
+static Size
+AioBounceBufferDataShmemSize(void)
+{
+	Size		sz;
+
+	/* and the associated buffer */
+	sz = mul_size(BLCKSZ,
+				  mul_size(io_bounce_buffers, AioProcs()));
+	/* memory for alignment */
+	sz += BLCKSZ;
+
+	return sz;
+}
+
+/*
+ * Choose a suitable value for io_max_concurrency.
+ *
+ * It's unlikely that we could have more IOs in flight than buffers that we
+ * would be allowed to pin.
+ *
+ * On the upper end, apply a cap too - just because shared_buffers is large,
+ * it doesn't make sense to have millions of buffers undergo IO concurrently.
+ */
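+/*
+ * For example (illustrative numbers only): with NBuffers = 16384 (128MB of
+ * shared_buffers) and ~110 procs, 16384 / 110 = 148 exceeds the cap, so 64
+ * is used; with NBuffers = 1024 and the same proc count, each backend may
+ * have 1024 / 110 = 9 IOs in flight.
+ */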
+static int
+AioChooseMaxConcurrency(void)
+{
+	uint32		max_backends;
+	int			max_proportional_pins;
+
+	/* Similar logic to LimitAdditionalPins() */
+	max_backends = MaxBackends + NUM_AUXILIARY_PROCS;
+	max_proportional_pins = NBuffers / max_backends;
+
+	max_proportional_pins = Max(max_proportional_pins, 1);
+
+	/* apply upper limit */
+	return Min(max_proportional_pins, 64);
+}
+
+/*
+ * Choose a suitable value for io_bounce_buffers.
+ *
+ * It's very unlikely to be useful to allocate more bounce buffers for each
+ * backend than the backend is allowed to pin. Additionally, as bounce
+ * buffers are currently used for writes, it seems very uncommon for more
+ * than 10% of shared_buffers to be written out concurrently.
+ *
+ * XXX: This can quickly take up significant amounts of memory; the logic
+ * should probably be fine-tuned.
+ */
+static int
+AioChooseBounceBuffers(void)
+{
+	uint32		max_backends;
+	int			max_proportional_pins;
+
+	/* Similar logic to LimitAdditionalPins() */
+	max_backends = MaxBackends + NUM_AUXILIARY_PROCS;
+	max_proportional_pins = (NBuffers / 10) / max_backends;
+
+	max_proportional_pins = Max(max_proportional_pins, 1);
+
+	/* apply upper limit */
+	return Min(max_proportional_pins, 256);
+}
+
 Size
 AioShmemSize(void)
 {
 	Size		sz = 0;
 
+	/*
+	 * If io_max_concurrency is -1, we automatically choose a suitable value.
+	 *
+	 * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
+	 * However, if the DBA explicitly set io_max_concurrency = -1 in the
+	 * config file, then PGC_S_DYNAMIC_DEFAULT will fail to override that and
+	 * we must force the matter with PGC_S_OVERRIDE.
+	 */
+	if (io_max_concurrency == -1)
+	{
+		char		buf[32];
+
+		snprintf(buf, sizeof(buf), "%d", AioChooseMaxConcurrency());
+		SetConfigOption("io_max_concurrency", buf, PGC_POSTMASTER,
+						PGC_S_DYNAMIC_DEFAULT);
+		if (io_max_concurrency == -1)	/* failed to apply it? */
+			SetConfigOption("io_max_concurrency", buf, PGC_POSTMASTER,
+							PGC_S_OVERRIDE);
+	}
+
+
+	/*
+	 * If io_bounce_buffers is -1, we automatically choose a suitable value.
+	 *
+	 * See also comment above.
+	 */
+	if (io_bounce_buffers == -1)
+	{
+		char		buf[32];
+
+		snprintf(buf, sizeof(buf), "%d", AioChooseBounceBuffers());
+		SetConfigOption("io_bounce_buffers", buf, PGC_POSTMASTER,
+						PGC_S_DYNAMIC_DEFAULT);
+		if (io_bounce_buffers == -1)	/* failed to apply it? */
+			SetConfigOption("io_bounce_buffers", buf, PGC_POSTMASTER,
+							PGC_S_OVERRIDE);
+	}
+
+	sz = add_size(sz, AioCtlShmemSize());
+	sz = add_size(sz, AioBackendShmemSize());
+	sz = add_size(sz, AioHandleShmemSize());
+	sz = add_size(sz, AioIOVShmemSize());
+	sz = add_size(sz, AioIOVDataShmemSize());
+	sz = add_size(sz, AioBounceBufferDescShmemSize());
+	sz = add_size(sz, AioBounceBufferDataShmemSize());
+
+	if (pgaio_impl->shmem_size)
+		sz = add_size(sz, pgaio_impl->shmem_size());
+
 	return sz;
 }
 
 void
 AioShmemInit(void)
 {
+	bool		found;
+	uint32		io_handle_off = 0;
+	uint32		iovec_off = 0;
+	uint32		bounce_buffers_off = 0;
+	uint32		per_backend_iovecs = io_max_concurrency * io_combine_limit;
+	uint32		per_backend_bb = io_bounce_buffers;
+	char	   *bounce_buffers_data;
+
+	aio_ctl = (PgAioCtl *)
+		ShmemInitStruct("AioCtl", AioCtlShmemSize(), &found);
+
+	if (found)
+		goto out;
+
+	memset(aio_ctl, 0, AioCtlShmemSize());
+
+	aio_ctl->io_handle_count = AioProcs() * io_max_concurrency;
+	aio_ctl->iovec_count = AioProcs() * per_backend_iovecs;
+	aio_ctl->bounce_buffers_count = AioProcs() * per_backend_bb;
+
+	aio_ctl->backend_state = (PgAioPerBackend *)
+		ShmemInitStruct("AioBackend", AioBackendShmemSize(), &found);
+
+	aio_ctl->io_handles = (PgAioHandle *)
+		ShmemInitStruct("AioHandle", AioHandleShmemSize(), &found);
+
+	aio_ctl->iovecs = ShmemInitStruct("AioIOV", AioIOVShmemSize(), &found);
+	aio_ctl->iovecs_data = ShmemInitStruct("AioIOVData", AioIOVDataShmemSize(), &found);
+
+	aio_ctl->bounce_buffers = ShmemInitStruct("AioBounceBufferDesc", AioBounceBufferDescShmemSize(), &found);
+
+	bounce_buffers_data = ShmemInitStruct("AioBounceBufferData", AioBounceBufferDataShmemSize(), &found);
+	bounce_buffers_data = (char *) TYPEALIGN(BLCKSZ, (uintptr_t) bounce_buffers_data);
+	aio_ctl->bounce_buffers_data = bounce_buffers_data;
+
+
+	/* Initialize IO handles. */
+	for (uint64 i = 0; i < aio_ctl->io_handle_count; i++)
+	{
+		PgAioHandle *ioh = &aio_ctl->io_handles[i];
+
+		ioh->op = PGAIO_OP_INVALID;
+		ioh->subject = ASI_INVALID;
+		ioh->state = AHS_IDLE;
+
+		slist_init(&ioh->bounce_buffers);
+	}
+
+	/* Initialize Bounce Buffers. */
+	for (uint64 i = 0; i < aio_ctl->bounce_buffers_count; i++)
+	{
+		PgAioBounceBuffer *bb = &aio_ctl->bounce_buffers[i];
+
+		bb->buffer = bounce_buffers_data;
+		bounce_buffers_data += BLCKSZ;
+	}
+
+
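+	/*
+	 * Partition the shared arrays among backends: each PGPROC slot gets a
+	 * contiguous range of IO handles, iovecs and bounce buffers, so a
+	 * backend can locate its own resources without locking.
+	 */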
+	for (int procno = 0; procno < AioProcs(); procno++)
+	{
+		PgAioPerBackend *bs = &aio_ctl->backend_state[procno];
+
+		bs->io_handle_off = io_handle_off;
+		io_handle_off += io_max_concurrency;
+
+		bs->bounce_buffers_off = bounce_buffers_off;
+		bounce_buffers_off += per_backend_bb;
+
+		dclist_init(&bs->idle_ios);
+		dclist_init(&bs->staged_ios);
+		slist_init(&bs->idle_bbs);
+
+		/* initialize per-backend IOs */
+		for (int i = 0; i < io_max_concurrency; i++)
+		{
+			PgAioHandle *ioh = &aio_ctl->io_handles[bs->io_handle_off + i];
+
+			ioh->generation = 1;
+			ioh->owner_procno = procno;
+			ioh->iovec_off = iovec_off;
+			ioh->iovec_data_len = 0;
+			ioh->report_return = NULL;
+			ioh->resowner = NULL;
+			ioh->num_shared_callbacks = 0;
+			ioh->distilled_result.status = ARS_UNKNOWN;
+			ioh->flags = 0;
+
+			ConditionVariableInit(&ioh->cv);
+
+			dclist_push_tail(&bs->idle_ios, &ioh->node);
+			iovec_off += io_combine_limit;
+		}
+
+		/* initialize per-backend bounce buffers */
+		for (int i = 0; i < per_backend_bb; i++)
+		{
+			PgAioBounceBuffer *bb = &aio_ctl->bounce_buffers[bs->bounce_buffers_off + i];
+
+			slist_push_head(&bs->idle_bbs, &bb->node);
+		}
+	}
+
+out:
+	/* Initialize IO method specific resources. */
+	pgaio_impl->shmem_init(!found);
 }
 
 void
 pgaio_postmaster_init(void)
 {
+	if (pgaio_impl->postmaster_init)
+		pgaio_impl->postmaster_init();
 }
 
 void
 pgaio_postmaster_child_init(void)
 {
+	/* shouldn't be initialized twice */
+	Assert(!my_aio);
+
+	if (MyBackendType == B_IO_WORKER)
+		return;
+
+	if (MyProc == NULL || MyProcNumber >= AioProcs())
+		elog(ERROR, "aio requires a normal PGPROC");
+
+	my_aio = &aio_ctl->backend_state[MyProcNumber];
+
+	if (pgaio_impl->postmaster_child_init)
+		pgaio_impl->postmaster_child_init();
 }
 
 void
 pgaio_postmaster_child_init_local(void)
 {
+	if (pgaio_impl->postmaster_child_init_local)
+		pgaio_impl->postmaster_child_init_local();
+}
+
+bool
+pgaio_workers_enabled(void)
+{
+	return io_method == IOMETHOD_WORKER;
 }
diff --git a/src/backend/storage/aio/aio_io.c b/src/backend/storage/aio/aio_io.c
new file mode 100644
index 00000000000..5b2f9ee3ba6
--- /dev/null
+++ b/src/backend/storage/aio/aio_io.c
@@ -0,0 +1,111 @@
+/*-------------------------------------------------------------------------
+ *
+ * aio_io.c
+ *    Asynchronous I/O subsystem.
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *    src/backend/storage/aio/aio_io.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "storage/aio.h"
+#include "storage/aio_internal.h"
+#include "storage/fd.h"
+#include "utils/wait_event.h"
+
+
+static void
+pgaio_io_before_prep(PgAioHandle *ioh)
+{
+	Assert(ioh->state == AHS_HANDED_OUT);
+	Assert(pgaio_io_has_subject(ioh));
+}
+
+const char *
+pgaio_io_get_op_name(PgAioHandle *ioh)
+{
+	Assert(ioh->op >= 0 && ioh->op < PGAIO_OP_COUNT);
+
+	switch (ioh->op)
+	{
+		case PGAIO_OP_INVALID:
+			return "invalid";
+		case PGAIO_OP_READ:
+			return "read";
+		case PGAIO_OP_WRITE:
+			return "write";
+		case PGAIO_OP_FSYNC:
+			return "fsync";
+		case PGAIO_OP_FLUSH_RANGE:
+			return "flush_range";
+		case PGAIO_OP_NOP:
+			return "nop";
+	}
+
+	pg_unreachable();
+}
+
+void
+pgaio_io_prep_readv(PgAioHandle *ioh,
+					int fd, int iovcnt, uint64 offset)
+{
+	pgaio_io_before_prep(ioh);
+
+	ioh->op_data.read.fd = fd;
+	ioh->op_data.read.offset = offset;
+	ioh->op_data.read.iov_length = iovcnt;
+
+	pgaio_io_prepare(ioh, PGAIO_OP_READ);
+}
+
+void
+pgaio_io_prep_writev(PgAioHandle *ioh,
+					 int fd, int iovcnt, uint64 offset)
+{
+	pgaio_io_before_prep(ioh);
+
+	ioh->op_data.write.fd = fd;
+	ioh->op_data.write.offset = offset;
+	ioh->op_data.write.iov_length = iovcnt;
+
+	pgaio_io_prepare(ioh, PGAIO_OP_WRITE);
+}
+
+
+void
+pgaio_io_perform_synchronously(PgAioHandle *ioh)
+{
+	ssize_t		result = 0;
+	struct iovec *iov = &aio_ctl->iovecs[ioh->iovec_off];
+
+	/* Perform IO. */
+	switch (ioh->op)
+	{
+		case PGAIO_OP_READ:
+			pgstat_report_wait_start(WAIT_EVENT_DATA_FILE_READ);
+			result = pg_preadv(ioh->op_data.read.fd, iov,
+							   ioh->op_data.read.iov_length,
+							   ioh->op_data.read.offset);
+			pgstat_report_wait_end();
+			break;
+		case PGAIO_OP_WRITE:
+			pgstat_report_wait_start(WAIT_EVENT_DATA_FILE_WRITE);
+			result = pg_pwritev(ioh->op_data.write.fd, iov,
+								ioh->op_data.write.iov_length,
+								ioh->op_data.write.offset);
+			pgstat_report_wait_end();
+			break;
+		default:
+			elog(ERROR, "not yet");
+	}
+
+	ioh->result = result < 0 ? -errno : result;
+
+	pgaio_io_process_completion(ioh, ioh->result);
+}
diff --git a/src/backend/storage/aio/aio_subject.c b/src/backend/storage/aio/aio_subject.c
new file mode 100644
index 00000000000..68e9e80074c
--- /dev/null
+++ b/src/backend/storage/aio/aio_subject.c
@@ -0,0 +1,170 @@
+/*-------------------------------------------------------------------------
+ *
+ * aio_subject.c
+ *    IO completion handling for IOs on different subjects
+ *
+ * XXX Write me
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *    src/backend/storage/aio/aio_subject.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "storage/aio.h"
+#include "storage/aio_internal.h"
+#include "storage/buf_internals.h"
+#include "storage/bufmgr.h"
+#include "storage/smgr.h"
+#include "utils/memutils.h"
+
+
+static const PgAioSubjectInfo *aio_subject_info[] = {
+	[ASI_INVALID] = &(PgAioSubjectInfo) {
+		.name = "invalid",
+	},
+};
+
+static const PgAioHandleSharedCallbacks *aio_shared_cbs[] = {
+};
+
+
+void
+pgaio_io_add_shared_cb(PgAioHandle *ioh, PgAioHandleSharedCallbackID cbid)
+{
+	if (cbid >= lengthof(aio_shared_cbs))
+		elog(ERROR, "callback %d is out of range", cbid);
+	if (aio_shared_cbs[cbid]->complete == NULL)
+		elog(ERROR, "callback %d is undefined", cbid);
+	if (ioh->num_shared_callbacks >= AIO_MAX_SHARED_CALLBACKS)
+		elog(PANIC, "too many callbacks, the max is %d", AIO_MAX_SHARED_CALLBACKS);
+	ioh->shared_callbacks[ioh->num_shared_callbacks] = cbid;
+
+	elog(DEBUG3, "io:%d, op %s, subject %s, adding cbid num %d, id %d",
+		 pgaio_io_get_id(ioh),
+		 pgaio_io_get_op_name(ioh),
+		 pgaio_io_get_subject_name(ioh),
+		 ioh->num_shared_callbacks + 1, cbid);
+
+	ioh->num_shared_callbacks++;
+}
+
+const char *
+pgaio_io_get_subject_name(PgAioHandle *ioh)
+{
+	Assert(ioh->subject >= 0 && ioh->subject < ASI_COUNT);
+
+	return aio_subject_info[ioh->subject]->name;
+}
+
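+/*
+ * Run the prepare callbacks registered for this IO, in reverse order of
+ * registration.
+ */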
+void
+pgaio_io_prepare_subject(PgAioHandle *ioh)
+{
+	Assert(ioh->subject > ASI_INVALID && ioh->subject < ASI_COUNT);
+	Assert(ioh->op >= 0 && ioh->op < PGAIO_OP_COUNT);
+
+	for (int i = ioh->num_shared_callbacks; i > 0; i--)
+	{
+		PgAioHandleSharedCallbackID cbid = ioh->shared_callbacks[i - 1];
+		const PgAioHandleSharedCallbacks *cbs = aio_shared_cbs[cbid];
+
+		if (!cbs->prepare)
+			continue;
+
+		elog(DEBUG3, "io:%d, op %s, subject %s, calling cbid num %d, id %d: prepare",
+			 pgaio_io_get_id(ioh),
+			 pgaio_io_get_op_name(ioh),
+			 pgaio_io_get_subject_name(ioh),
+			 i, cbid);
+		cbs->prepare(ioh);
+	}
+}
+
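+/*
+ * Run the completion callbacks registered for this IO, in reverse order of
+ * registration, each callback receiving the result returned by the previous
+ * one.  The final, "distilled", result is stored in the handle.
+ */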
+void
+pgaio_io_process_completion_subject(PgAioHandle *ioh)
+{
+	PgAioResult result;
+
+	Assert(ioh->subject >= 0 && ioh->subject < ASI_COUNT);
+	Assert(ioh->op >= 0 && ioh->op < PGAIO_OP_COUNT);
+
+	result.status = ARS_OK;		/* low level IO is always considered OK */
+	result.result = ioh->result;
+	result.id = 0;				/* FIXME */
+	result.error_data = 0;
+
+	for (int i = ioh->num_shared_callbacks; i > 0; i--)
+	{
+		PgAioHandleSharedCallbackID cbid;
+
+		cbid = ioh->shared_callbacks[i - 1];
+		elog(DEBUG3, "io:%d, op %s, subject %s, calling cbid num %d, id %d with distilled result status %d, id %u, error_data: %d, result: %d",
+			 pgaio_io_get_id(ioh),
+			 pgaio_io_get_op_name(ioh),
+			 pgaio_io_get_subject_name(ioh),
+			 i, cbid,
+			 result.status,
+			 result.id,
+			 result.error_data,
+			 result.result);
+		result = aio_shared_cbs[cbid]->complete(ioh, result);
+	}
+
+	ioh->distilled_result = result;
+
+	elog(DEBUG3, "io:%d, op %s, subject %s, distilled result status %d, id %u, error_data: %d, result: %d, raw_result %d",
+		 pgaio_io_get_id(ioh),
+		 pgaio_io_get_op_name(ioh),
+		 pgaio_io_get_subject_name(ioh),
+		 result.status,
+		 result.id,
+		 result.error_data,
+		 result.result,
+		 ioh->result);
+}
+
+void
+pgaio_io_reopen(PgAioHandle *ioh)
+{
+	Assert(ioh->subject >= 0 && ioh->subject < ASI_COUNT);
+	Assert(ioh->op >= 0 && ioh->op < PGAIO_OP_COUNT);
+
+	aio_subject_info[ioh->subject]->reopen(ioh);
+}
+
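+/*
+ * An IO has to be executed synchronously in the submitting backend if its
+ * subject provides no way to reopen the underlying file in another process.
+ */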
+bool
+pgaio_io_needs_synchronously(PgAioHandle *ioh)
+{
+	return aio_subject_info[ioh->subject]->reopen == NULL;
+}
+
+
+
+/* --------------------------------------------------------------------------------
+ * IO Result
+ * --------------------------------------------------------------------------------
+ */
+
+void
+pgaio_result_log(PgAioResult result, const PgAioSubjectData *subject_data, int elevel)
+{
+	const PgAioHandleSharedCallbacks *scb;
+
+	Assert(result.status != ARS_UNKNOWN);
+	Assert(result.status != ARS_OK);
+
+	scb = aio_shared_cbs[result.id];
+
+	if (scb->error == NULL)
+		elog(ERROR, "scb id %d does not have error callback", result.id);
+
+	scb->error(result, subject_data, elevel);
+}
diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build
index e13728b73da..8960223194a 100644
--- a/src/backend/storage/aio/meson.build
+++ b/src/backend/storage/aio/meson.build
@@ -2,7 +2,10 @@
 
 backend_sources += files(
   'aio.c',
+  'aio_io.c',
   'aio_init.c',
+  'aio_subject.c',
+  'method_io_uring.c',
   'method_worker.c',
   'read_stream.c',
 )
diff --git a/src/backend/storage/aio/method_io_uring.c b/src/backend/storage/aio/method_io_uring.c
new file mode 100644
index 00000000000..f76533b4cdc
--- /dev/null
+++ b/src/backend/storage/aio/method_io_uring.c
@@ -0,0 +1,393 @@
+/*-------------------------------------------------------------------------
+ *
+ * method_io_uring.c
+ *    AIO implementation using io_uring on Linux
+ *
+ * XXX Write me
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *    src/backend/storage/aio/method_io_uring.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#ifdef USE_LIBURING
+
+#include <liburing.h>
+
+#include "pgstat.h"
+#include "port/pg_iovec.h"
+#include "storage/aio_internal.h"
+#include "storage/fd.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+
+
+/* Entry points for IoMethodOps. */
+static size_t pgaio_uring_shmem_size(void);
+static void pgaio_uring_postmaster_init(void);
+static void pgaio_uring_shmem_init(bool first_time);
+static void pgaio_uring_postmaster_child_init(void);
+static void pgaio_uring_postmaster_child_init_local(void);
+
+static int	pgaio_uring_submit(void);
+static void pgaio_uring_wait_one(PgAioHandle *ioh, uint64 ref_generation);
+
+static void pgaio_uring_sq_from_io(PgAioHandle *ioh, struct io_uring_sqe *sqe);
+
+
+const IoMethodOps pgaio_uring_ops = {
+	.shmem_size = pgaio_uring_shmem_size,
+	.shmem_init = pgaio_uring_shmem_init,
+	.postmaster_init = pgaio_uring_postmaster_init,
+	.postmaster_child_init = pgaio_uring_postmaster_child_init,
+	.postmaster_child_init_local = pgaio_uring_postmaster_child_init_local,
+
+	.submit = pgaio_uring_submit,
+	.wait_one = pgaio_uring_wait_one,
+#if 0
+	.retry = pgaio_uring_io_retry,
+	.drain = pgaio_uring_drain,
+#endif
+	.can_scatter_gather_direct = true,
+	.can_scatter_gather_buffered = true
+};
+
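+/*
+ * Each backend that can submit IOs has its own io_uring instance in shared
+ * memory.  A ring is normally drained by its owner, but backends waiting for
+ * an IO submitted by another backend may drain that backend's ring, hence
+ * the completion_lock.
+ */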
+typedef struct PgAioUringContext
+{
+	LWLock		completion_lock;
+
+	struct io_uring io_uring_ring;
+	/* XXX: probably worth padding to a cacheline boundary here */
+} PgAioUringContext;
+
+
+static PgAioUringContext *aio_uring_contexts;
+static PgAioUringContext *my_shared_uring_context;
+
+/* io_uring local state */
+static struct io_uring local_ring;
+
+
+
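+/*
+ * IO workers are not started when io_uring is in use, so no contexts are
+ * allocated for their proc numbers.
+ */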
+static Size
+AioContextShmemSize(void)
+{
+	uint32		TotalProcs = MaxBackends + NUM_AUXILIARY_PROCS - MAX_IO_WORKERS;
+
+	return mul_size(TotalProcs, sizeof(PgAioUringContext));
+}
+
+static size_t
+pgaio_uring_shmem_size(void)
+{
+	return AioContextShmemSize();
+}
+
+static void
+pgaio_uring_shmem_init(bool first_time)
+{
+	uint32		TotalProcs = MaxBackends + NUM_AUXILIARY_PROCS - MAX_IO_WORKERS;
+	bool		found;
+
+	aio_uring_contexts = (PgAioUringContext *)
+		ShmemInitStruct("AioUring", pgaio_uring_shmem_size(), &found);
+
+	if (found)
+		return;
+
+	for (int contextno = 0; contextno < TotalProcs; contextno++)
+	{
+		PgAioUringContext *context = &aio_uring_contexts[contextno];
+		int			ret;
+
+		/*
+		 * XXX: Probably worth sharing the WQ between the different rings,
+		 * when supported by the kernel. Could also cause additional
+		 * contention, I guess?
+		 */
+#if 0
+		if (!AcquireExternalFD())
+			elog(ERROR, "No external FD available");
+#endif
+		ret = io_uring_queue_init(io_max_concurrency, &context->io_uring_ring, 0);
+		if (ret < 0)
+			elog(ERROR, "io_uring_queue_init failed: %s", strerror(-ret));
+
+		LWLockInitialize(&context->completion_lock, LWTRANCHE_AIO_URING_COMPLETION);
+	}
+}
+
+static void
+pgaio_uring_postmaster_init(void)
+{
+	uint32		TotalProcs =
+		MaxBackends + NUM_AUXILIARY_PROCS - MAX_IO_WORKERS;
+
+	for (int i = 0; i < TotalProcs; i++)
+		ReserveExternalFD();
+}
+
+static void
+pgaio_uring_postmaster_child_init(void)
+{
+	my_shared_uring_context = &aio_uring_contexts[MyProcNumber];
+}
+
+static void
+pgaio_uring_postmaster_child_init_local(void)
+{
+	int			ret;
+
+	ret = io_uring_queue_init(32, &local_ring, 0);
+	if (ret < 0)
+		elog(ERROR, "io_uring_queue_init failed: %s", strerror(-ret));
+}
+
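+/*
+ * Submit staged IOs: fill in one SQE per IO, then hand the whole batch to
+ * the kernel with a single io_uring_submit() call.  Returns the number of
+ * IOs submitted.
+ */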
+static int
+pgaio_uring_submit(void)
+{
+	PgAioHandle *ios[PGAIO_SUBMIT_BATCH_SIZE];
+	struct io_uring_sqe *sqe[PGAIO_SUBMIT_BATCH_SIZE];
+	struct io_uring *uring_instance = &my_shared_uring_context->io_uring_ring;
+	int			nios = 0;
+
+	while (!dclist_is_empty(&my_aio->staged_ios))
+	{
+		dlist_node *node;
+		PgAioHandle *ioh;
+
+		node = dclist_head_node(&my_aio->staged_ios);
+		ioh = dlist_container(PgAioHandle, node, node);
+
+		sqe[nios] = io_uring_get_sqe(uring_instance);
+		ios[nios] = ioh;
+
+		pgaio_io_prepare_submit(ioh);
+		pgaio_uring_sq_from_io(ios[nios], sqe[nios]);
+
+		nios++;
+
+		if (nios + 1 > PGAIO_SUBMIT_BATCH_SIZE)
+			break;
+	}
+
+	while (true)
+	{
+		int			ret;
+
+		pgstat_report_wait_start(WAIT_EVENT_AIO_SUBMIT);
+		ret = io_uring_submit(uring_instance);
+		pgstat_report_wait_end();
+
+		if (ret == -EINTR)
+		{
+			elog(DEBUG3, "submit EINTR, nios: %d", nios);
+			continue;
+		}
+		if (ret < 0)
+			elog(PANIC, "failed: %d/%s",
+				 ret, strerror(-ret));
+		else if (ret != nios)
+		{
+			/* likely unreachable, but if it is, we would need to re-submit */
+			elog(PANIC, "submitted only %d of %d",
+				 ret, nios);
+		}
+		else
+		{
+			elog(DEBUG3, "submit nios: %d", nios);
+		}
+		break;
+	}
+
+	return nios;
+}
+
+
+#define PGAIO_MAX_LOCAL_REAPED 16
+
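+/*
+ * Reap and process the CQEs that are ready right now.  The caller must hold
+ * the context's completion_lock.
+ */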
+static void
+pgaio_uring_drain_locked(PgAioUringContext *context)
+{
+	int			ready;
+	int			orig_ready;
+
+	/*
+	 * Don't drain more events than available right now. Otherwise it's
+	 * plausible that one backend could get stuck, for a while, receiving CQEs
+	 * without actually processing them.
+	 */
+	orig_ready = ready = io_uring_cq_ready(&context->io_uring_ring);
+
+	while (ready > 0)
+	{
+		struct io_uring_cqe *reaped_cqes[PGAIO_MAX_LOCAL_REAPED];
+		uint32		reaped;
+
+		START_CRIT_SECTION();
+		reaped =
+			io_uring_peek_batch_cqe(&context->io_uring_ring,
+									reaped_cqes,
+									Min(PGAIO_MAX_LOCAL_REAPED, ready));
+		Assert(reaped <= ready);
+
+		ready -= reaped;
+
+		for (int i = 0; i < reaped; i++)
+		{
+			struct io_uring_cqe *cqe = reaped_cqes[i];
+			PgAioHandle *ioh;
+
+			ioh = io_uring_cqe_get_data(cqe);
+			io_uring_cqe_seen(&context->io_uring_ring, cqe);
+
+			pgaio_io_process_completion(ioh, cqe->res);
+		}
+
+		END_CRIT_SECTION();
+
+		ereport(DEBUG3,
+				errmsg("drained %d/%d, now expecting %d",
+					   reaped, orig_ready, io_uring_cq_ready(&context->io_uring_ring)),
+				errhidestmt(true),
+				errhidecontext(true));
+
+	}
+}
+
+static void
+pgaio_uring_wait_one(PgAioHandle *ioh, uint64 ref_generation)
+{
+	PgAioHandleState state;
+	ProcNumber	owner_procno = ioh->owner_procno;
+	PgAioUringContext *owner_context = &aio_uring_contexts[owner_procno];
+	bool		expect_cqe;
+	int			waited = 0;
+
+	/*
+	 * We ought to have a smarter locking scheme; nearly all the time the
+	 * backend owning the ring will reap the completions, making the locking
+	 * unnecessarily expensive.
+	 */
+	LWLockAcquire(&owner_context->completion_lock, LW_EXCLUSIVE);
+
+	while (true)
+	{
+		ereport(DEBUG3,
+				errmsg("wait_one for io:%d in state %s, cycle %d",
+					   pgaio_io_get_id(ioh), pgaio_io_get_state_name(ioh), waited),
+				errhidestmt(true),
+				errhidecontext(true));
+
+		if (pgaio_io_was_recycled(ioh, ref_generation, &state) ||
+			state != AHS_IN_FLIGHT)
+		{
+			break;
+		}
+		else if (io_uring_cq_ready(&owner_context->io_uring_ring))
+		{
+			expect_cqe = true;
+		}
+		else
+		{
+			int			ret;
+			struct io_uring_cqe *cqes;
+
+			pgstat_report_wait_start(WAIT_EVENT_AIO_DRAIN);
+			ret = io_uring_wait_cqes(&owner_context->io_uring_ring, &cqes, 1, NULL, NULL);
+			pgstat_report_wait_end();
+
+			if (ret == -EINTR)
+			{
+				continue;
+			}
+			else if (ret != 0)
+			{
+				elog(PANIC, "unexpected: %d/%s: %m", ret, strerror(-ret));
+			}
+			else
+			{
+				Assert(cqes != NULL);
+				expect_cqe = true;
+				waited++;
+			}
+		}
+
+		if (expect_cqe)
+		{
+			pgaio_uring_drain_locked(owner_context);
+		}
+	}
+
+	LWLockRelease(&owner_context->completion_lock);
+
+	ereport(DEBUG3,
+			errmsg("wait_one with %d sleeps",
+				   waited),
+			errhidestmt(true),
+			errhidecontext(true));
+}
+
+static void
+pgaio_uring_sq_from_io(PgAioHandle *ioh, struct io_uring_sqe *sqe)
+{
+	struct iovec *iov;
+
+	switch (ioh->op)
+	{
+		case PGAIO_OP_READ:
+			iov = &aio_ctl->iovecs[ioh->iovec_off];
+			if (ioh->op_data.read.iov_length == 1)
+			{
+				io_uring_prep_read(sqe,
+								   ioh->op_data.read.fd,
+								   iov->iov_base,
+								   iov->iov_len,
+								   ioh->op_data.read.offset);
+			}
+			else
+			{
+				io_uring_prep_readv(sqe,
+									ioh->op_data.read.fd,
+									iov,
+									ioh->op_data.read.iov_length,
+									ioh->op_data.read.offset);
+
+			}
+			break;
+
+		case PGAIO_OP_WRITE:
+			iov = &aio_ctl->iovecs[ioh->iovec_off];
+			if (ioh->op_data.write.iov_length == 1)
+			{
+				io_uring_prep_write(sqe,
+									ioh->op_data.write.fd,
+									iov->iov_base,
+									iov->iov_len,
+									ioh->op_data.write.offset);
+			}
+			else
+			{
+				io_uring_prep_writev(sqe,
+									 ioh->op_data.write.fd,
+									 iov,
+									 ioh->op_data.write.iov_length,
+									 ioh->op_data.write.offset);
+			}
+			break;
+
+		default:
+			elog(ERROR, "not implemented");
+	}
+
+	io_uring_sqe_set_data(sqe, ioh);
+}
+
+#endif							/* USE_LIBURING */
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index 5df2eea4a03..cd79bf1fba6 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -3,6 +3,21 @@
  * method_worker.c
  *    AIO implementation using workers
  *
+ * Worker processes consume IOs from a shared memory submission queue, run
+ * traditional synchronous system calls, and perform the shared completion
+ * handling immediately.  Client code submits most requests by pushing IOs
+ * into the submission queue, and waits (if necessary) using condition
+ * variables.  Some IOs cannot be performed in another process due to lack of
+ * infrastructure for reopening the file, and must be processed synchronously
+ * by the client code when submitted.
+ *
+ * So that the submitter can make just one system call when submitting a batch
+ * of IOs, wakeups "fan out"; each woken backend can wake two more.  XXX This
+ * could be improved by using futexes instead of latches to wake N waiters.
+ *
+ * This method of AIO is available in all builds on all operating systems, and
+ * is the default.
+ *
  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
@@ -16,24 +31,299 @@
 
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
+#include "port/pg_bitutils.h"
+#include "postmaster/auxprocess.h"
 #include "postmaster/interrupt.h"
+#include "storage/aio.h"
+#include "storage/aio_internal.h"
 #include "storage/io_worker.h"
 #include "storage/ipc.h"
 #include "storage/latch.h"
 #include "storage/proc.h"
 #include "tcop/tcopprot.h"
 #include "utils/wait_event.h"
+#include "utils/ps_status.h"
+
+
+/* How many workers should each worker wake up if needed? */
+#define IO_WORKER_WAKEUP_FANOUT 2
+
+
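+/*
+ * A ring buffer of IO handle indexes, sized to a power of two so that the
+ * head and tail arithmetic can use 'mask'.  Protected by
+ * AioWorkerSubmissionQueueLock.
+ */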
+typedef struct AioWorkerSubmissionQueue
+{
+	uint32		size;
+	uint32		mask;
+	uint32		head;
+	uint32		tail;
+	uint32		ios[FLEXIBLE_ARRAY_MEMBER];
+} AioWorkerSubmissionQueue;
+
+typedef struct AioWorkerSlot
+{
+	Latch	   *latch;
+	bool		in_use;
+} AioWorkerSlot;
+
+typedef struct AioWorkerControl
+{
+	uint64		idle_worker_mask;
+	AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
+} AioWorkerControl;
+
+
+static size_t pgaio_worker_shmem_size(void);
+static void pgaio_worker_shmem_init(bool first_time);
+static void pgaio_worker_postmaster_child_init_local(void);
+
+static int	pgaio_worker_submit(void);
+
+
+const IoMethodOps pgaio_worker_ops = {
+	.shmem_size = pgaio_worker_shmem_size,
+	.shmem_init = pgaio_worker_shmem_init,
+	.postmaster_child_init_local = pgaio_worker_postmaster_child_init_local,
+	.submit = pgaio_worker_submit,
+#if 0
+	.wait_one = pgaio_worker_wait_one,
+	.retry = pgaio_worker_io_retry,
+	.drain = pgaio_worker_drain,
+#endif
+
+	.can_scatter_gather_direct = true,
+	.can_scatter_gather_buffered = true
+};
 
 
 int			io_workers = 3;
+static int	io_worker_queue_size = 64;
 
+static int	MyIoWorkerId;
+
+
+static AioWorkerSubmissionQueue *io_worker_submission_queue;
+static AioWorkerControl *io_worker_control;
+
+
+static size_t
+pgaio_worker_shmem_size(void)
+{
+	/* Must match the queue size rounding in pgaio_worker_shmem_init(). */
+	return
+		offsetof(AioWorkerSubmissionQueue, ios) +
+		sizeof(uint32) * pg_nextpower2_32(io_worker_queue_size) +
+		offsetof(AioWorkerControl, workers) +
+		sizeof(AioWorkerSlot) * io_workers;
+}
+
+static void
+pgaio_worker_shmem_init(bool first_time)
+{
+	bool		found;
+	int			size;
+
+	/* Round size up to next power of two so we can make a mask. */
+	size = pg_nextpower2_32(io_worker_queue_size);
+
+	io_worker_submission_queue =
+		ShmemInitStruct("AioWorkerSubmissionQueue",
+						offsetof(AioWorkerSubmissionQueue, ios) +
+						sizeof(uint32) * size,
+						&found);
+	if (!found)
+	{
+		io_worker_submission_queue->size = size;
+		io_worker_submission_queue->head = 0;
+		io_worker_submission_queue->tail = 0;
+	}
+
+	io_worker_control =
+		ShmemInitStruct("AioWorkerControl",
+						offsetof(AioWorkerControl, workers) +
+						sizeof(AioWorkerSlot) * io_workers,
+						&found);
+	if (!found)
+	{
+		io_worker_control->idle_worker_mask = 0;
+		for (int i = 0; i < io_workers; ++i)
+		{
+			io_worker_control->workers[i].latch = NULL;
+			io_worker_control->workers[i].in_use = false;
+		}
+	}
+}
+
+static void
+pgaio_worker_postmaster_child_init_local(void)
+{
+}
+
+
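+/*
+ * Pick an idle worker and clear its bit in the idle mask.  Returns -1 if all
+ * workers are busy.  Caller must hold AioWorkerSubmissionQueueLock.
+ */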
+static int
+pgaio_choose_idle_worker(void)
+{
+	int			worker;
+
+	if (io_worker_control->idle_worker_mask == 0)
+		return -1;
+
+	/* Find the lowest bit position, and clear it. */
+	worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
+	io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
+
+	return worker;
+}
+
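+/*
+ * Try to enqueue an IO for a worker to execute.  Returns false if the queue
+ * is full, in which case the caller executes the IO synchronously instead.
+ */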
+static bool
+pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
+{
+	AioWorkerSubmissionQueue *queue;
+	uint32		new_head;
+
+	queue = io_worker_submission_queue;
+	new_head = (queue->head + 1) & queue->mask;
+	if (new_head == queue->tail)
+	{
+		elog(DEBUG1, "full");
+		return false;			/* full */
+	}
+
+	queue->ios[queue->head] = pgaio_io_get_id(ioh);
+	queue->head = new_head;
+
+	return true;
+}
+
+static uint32
+pgaio_worker_submission_queue_consume(void)
+{
+	AioWorkerSubmissionQueue *queue;
+	uint32		result;
+
+	queue = io_worker_submission_queue;
+	if (queue->tail == queue->head)
+		return UINT32_MAX;		/* empty */
+
+	result = queue->ios[queue->tail];
+	queue->tail = (queue->tail + 1) & (queue->size - 1);
+
+	return result;
+}
+
+static uint32
+pgaio_worker_submission_queue_depth(void)
+{
+	uint32		head;
+	uint32		tail;
+
+	head = io_worker_submission_queue->head;
+	tail = io_worker_submission_queue->tail;
+
+	if (tail > head)
+		head += io_worker_submission_queue->size;
+
+	Assert(head >= tail);
+
+	return head - tail;
+}
+
+static bool
+pgaio_worker_need_synchronous(PgAioHandle *ioh)
+{
+	return
+		!IsUnderPostmaster ||
+		(ioh->flags & AHF_REFERENCES_LOCAL) ||
+		pgaio_io_needs_synchronously(ioh);
+}
+
+static void
+pgaio_worker_submit_internal(PgAioHandle *ios[], int nios)
+{
+	PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
+	int			nsync = 0;
+	Latch	   *wakeup = NULL;
+	int			worker;
+
+	Assert(nios <= PGAIO_SUBMIT_BATCH_SIZE);
+
+	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+	for (int i = 0; i < nios; ++i)
+	{
+		if (pgaio_worker_need_synchronous(ios[i]) ||
+			!pgaio_worker_submission_queue_insert(ios[i]))
+		{
+			/*
+			 * We'll do it synchronously, but only after we've sent as many as
+			 * we can to workers, to maximize concurrency.
+			 */
+			synchronous_ios[nsync++] = ios[i];
+			continue;
+		}
+
+		if (wakeup == NULL)
+		{
+			/* Choose an idle worker to wake up if we haven't already. */
+			worker = pgaio_choose_idle_worker();
+			if (worker >= 0)
+				wakeup = io_worker_control->workers[worker].latch;
+
+			ereport(DEBUG3,
+					errmsg("submission for io:%d choosing worker %d, latch %p",
+						   pgaio_io_get_id(ios[i]), worker, wakeup),
+					errhidestmt(true), errhidecontext(true));
+		}
+	}
+	LWLockRelease(AioWorkerSubmissionQueueLock);
+
+	if (wakeup)
+		SetLatch(wakeup);
+
+	/* Run whatever is left synchronously. */
+	for (int i = 0; i < nsync; ++i)
+		pgaio_io_perform_synchronously(synchronous_ios[i]);
+}
+
+static int
+pgaio_worker_submit(void)
+{
+	PgAioHandle *ios[PGAIO_SUBMIT_BATCH_SIZE];
+	int			nios = 0;
+
+	while (!dclist_is_empty(&my_aio->staged_ios))
+	{
+		dlist_node *node;
+		PgAioHandle *ioh;
+
+		node = dclist_head_node(&my_aio->staged_ios);
+		ioh = dlist_container(PgAioHandle, node, node);
+
+		pgaio_io_prepare_submit(ioh);
+
+		Assert(nios < PGAIO_SUBMIT_BATCH_SIZE);
+
+		ios[nios++] = ioh;
+
+		if (nios == PGAIO_SUBMIT_BATCH_SIZE)
+			break;
+	}
+
+	pgaio_worker_submit_internal(ios, nios);
+
+	return nios;
+}
 
 void
 IoWorkerMain(char *startup_data, size_t startup_data_len)
 {
 	sigjmp_buf	local_sigjmp_buf;
+	volatile PgAioHandle *ioh = NULL;
+	char		cmd[128];
 
 	MyBackendType = B_IO_WORKER;
+	AuxiliaryProcessMainCommon();
 
 	/* TODO review all signals */
 	pqsignal(SIGHUP, SignalHandlerForConfigReload);
@@ -49,7 +339,34 @@ IoWorkerMain(char *startup_data, size_t startup_data_len)
 	pqsignal(SIGPIPE, SIG_IGN);
 	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
 	pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);
-	sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
+
+	/* FIXME: locking */
+	MyIoWorkerId = -1;
+
+	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+
+	for (int i = 0; i < io_workers; ++i)
+	{
+		if (!io_worker_control->workers[i].in_use)
+		{
+			Assert(io_worker_control->workers[i].latch == NULL);
+			io_worker_control->workers[i].in_use = true;
+			MyIoWorkerId = i;
+			break;
+		}
+		else
+			Assert(io_worker_control->workers[i].latch != NULL);
+	}
+
+	if (MyIoWorkerId == -1)
+		elog(ERROR, "couldn't find a free worker slot");
+
+	io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
+	io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
+	LWLockRelease(AioWorkerSubmissionQueueLock);
+
+	snprintf(cmd, sizeof(cmd), "worker: %d", MyIoWorkerId);
+	set_ps_display(cmd);
 
 	/* see PostgresMain() */
 	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
@@ -64,21 +381,107 @@ IoWorkerMain(char *startup_data, size_t startup_data_len)
 		LWLockReleaseAll();
 
 		/* TODO: recover from IO errors */
+		if (ioh != NULL)
+		{
+#if 0
+			/* EINTR is treated as a retryable error */
+			pgaio_io_process_completion(unvolatize(PgAioHandle *, ioh),
+										EINTR);
+#endif
+		}
 
 		EmitErrorReport();
+
+		/* FIXME: should probably be a before-shmem-exit instead */
+		LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+		Assert(io_worker_control->workers[MyIoWorkerId].in_use);
+		Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
+
+		io_worker_control->workers[MyIoWorkerId].in_use = false;
+		io_worker_control->workers[MyIoWorkerId].latch = NULL;
+		LWLockRelease(AioWorkerSubmissionQueueLock);
+
 		proc_exit(1);
 	}
 
 	/* We can now handle ereport(ERROR) */
 	PG_exception_stack = &local_sigjmp_buf;
 
+	sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
+
 	while (!ShutdownRequestPending)
 	{
-		WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
-				  WAIT_EVENT_IO_WORKER_MAIN);
-		ResetLatch(MyLatch);
-		CHECK_FOR_INTERRUPTS();
+		uint32		io_index;
+		Latch	   *latches[IO_WORKER_WAKEUP_FANOUT];
+		int			nlatches = 0;
+		int			nwakeups = 0;
+		int			worker;
+
+		/* Try to get a job to do. */
+		LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+		if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX)
+		{
+			/*
+			 * Nothing to do.  Mark self idle.
+			 *
+			 * XXX: Invent some kind of back pressure to reduce useless
+			 * wakeups?
+			 */
+			io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
+		}
+		else
+		{
+			/* Got one.  Clear idle flag. */
+			io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
+
+			/* See if we can wake up some peers. */
+			nwakeups = Min(pgaio_worker_submission_queue_depth(),
+						   IO_WORKER_WAKEUP_FANOUT);
+			for (int i = 0; i < nwakeups; ++i)
+			{
+				if ((worker = pgaio_choose_idle_worker()) < 0)
+					break;
+				latches[nlatches++] = io_worker_control->workers[worker].latch;
+			}
+#if 0
+			if (nwakeups > 0)
+				elog(LOG, "wake %d", nwakeups);
+#endif
+		}
+		LWLockRelease(AioWorkerSubmissionQueueLock);
+
+		for (int i = 0; i < nlatches; ++i)
+			SetLatch(latches[i]);
+
+		if (io_index != UINT32_MAX)
+		{
+			ioh = &aio_ctl->io_handles[io_index];
+
+			ereport(DEBUG3,
+					errmsg("worker processing io:%d",
+						   pgaio_io_get_id(unvolatize(PgAioHandle *, ioh))),
+					errhidestmt(true), errhidecontext(true));
+
+			pgaio_io_reopen(unvolatize(PgAioHandle *, ioh));
+			pgaio_io_perform_synchronously(unvolatize(PgAioHandle *, ioh));
+
+			ioh = NULL;
+		}
+		else
+		{
+			WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
+					  WAIT_EVENT_IO_WORKER_MAIN);
+			ResetLatch(MyLatch);
+			CHECK_FOR_INTERRUPTS();
+		}
 	}
 
+	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
+	Assert(io_worker_control->workers[MyIoWorkerId].in_use);
+	Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
+
+	io_worker_control->workers[MyIoWorkerId].in_use = false;
+	io_worker_control->workers[MyIoWorkerId].latch = NULL;
+	LWLockRelease(AioWorkerSubmissionQueueLock);
+
 	proc_exit(0);
 }
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index f3d3435b1f5..63d1f905554 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -166,6 +166,7 @@ static const char *const BuiltinTrancheNames[] = {
 	[LWTRANCHE_SUBTRANS_SLRU] = "SubtransSLRU",
 	[LWTRANCHE_XACT_SLRU] = "XactSLRU",
 	[LWTRANCHE_PARALLEL_VACUUM_DSA] = "ParallelVacuumDSA",
+	[LWTRANCHE_AIO_URING_COMPLETION] = "AioUringCompletion",
 };
 
 StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 47a2c4d126b..3678f2b3e43 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -192,6 +192,9 @@ ABI_compatibility:
 
 Section: ClassName - WaitEventIO
 
+AIO_COMPLETION	"Waiting for completion callback."
+AIO_DRAIN	"Waiting for IOs to finish."
+AIO_SUBMIT	"Waiting for AIO submission."
 BASEBACKUP_READ	"Waiting for base backup to read from a file."
 BASEBACKUP_SYNC	"Waiting for data written by a base backup to reach durable storage."
 BASEBACKUP_WRITE	"Waiting for base backup to write to a file."
@@ -348,6 +351,7 @@ DSMRegistry	"Waiting to read or update the dynamic shared memory registry."
 InjectionPoint	"Waiting to read or update information related to injection points."
 SerialControl	"Waiting to read or update shared <filename>pg_serial</filename> state."
 WaitLSN	"Waiting to read or update shared Wait-for-LSN state."
+AioWorkerSubmissionQueue	"Waiting to access AIO worker submission queue."
 
 #
 # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 5670f40478a..5828072a48e 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -3214,6 +3214,31 @@ struct config_int ConfigureNamesInt[] =
 		NULL, assign_io_workers, NULL
 	},
 
+	{
+		{"io_max_concurrency",
+			PGC_POSTMASTER,
+			RESOURCES_ASYNCHRONOUS,
+			gettext_noop("Number of IOs that may be in flight in one backend."),
+			NULL,
+		},
+		&io_max_concurrency,
+		-1, -1, 1024,
+		NULL, NULL, NULL
+	},
+
+	{
+		{"io_bounce_buffers",
+			PGC_POSTMASTER,
+			RESOURCES_ASYNCHRONOUS,
+			gettext_noop("Number of IO Bounce Buffers reserved for each backend."),
+			NULL,
+			GUC_UNIT_BLOCKS
+		},
+		&io_bounce_buffers,
+		-1, -1, 4096,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"backend_flush_after", PGC_USERSET, RESOURCES_ASYNCHRONOUS,
 			gettext_noop("Number of pages after which previously performed writes are flushed to disk."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 90430381efa..1fc8336496c 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -842,6 +842,12 @@
 #io_method = worker			# (change requires restart)
 #io_workers = 3				# 1-32;
 
+#io_max_concurrency = -1		# Max number of IOs that may be in
+					# flight at the same time in one backend;
+					# -1 sets based on shared_buffers
+					# (change requires restart)
+#io_bounce_buffers = -1			# -1 sets based on shared_buffers
+					# (change requires restart)
+
 
 #------------------------------------------------------------------------------
 # CUSTOMIZED OPTIONS
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index 505534ee8d3..d1932b7393c 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -47,6 +47,8 @@
 
 #include "common/hashfn.h"
 #include "common/int.h"
+#include "lib/ilist.h"
+#include "storage/aio.h"
 #include "storage/ipc.h"
 #include "storage/predicate.h"
 #include "storage/proc.h"
@@ -155,6 +157,13 @@ struct ResourceOwnerData
 
 	/* The local locks cache. */
 	LOCALLOCK  *locks[MAX_RESOWNER_LOCKS];	/* list of owned locks */
+
+	/*
+	 * AIO handles & bounce buffers need to be registered in critical sections
+	 * and therefore cannot use the normal ResourceElem mechanism.
+	 */
+	dlist_head	aio_handles;
+	dlist_head	aio_bounce_buffers;
 };
 
 
@@ -425,6 +434,9 @@ ResourceOwnerCreate(ResourceOwner parent, const char *name)
 		parent->firstchild = owner;
 	}
 
+	dlist_init(&owner->aio_handles);
+	dlist_init(&owner->aio_bounce_buffers);
+
 	return owner;
 }
 
@@ -725,6 +737,21 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
 		 * so issue warnings.  In the abort case, just clean up quietly.
 		 */
 		ResourceOwnerReleaseAll(owner, phase, isCommit);
+
+		/* XXX: Could probably be a later phase? */
+		while (!dlist_is_empty(&owner->aio_handles))
+		{
+			dlist_node *node = dlist_head_node(&owner->aio_handles);
+
+			pgaio_io_release_resowner(node, !isCommit);
+		}
+
+		while (!dlist_is_empty(&owner->aio_bounce_buffers))
+		{
+			dlist_node *node = dlist_head_node(&owner->aio_bounce_buffers);
+
+			pgaio_bounce_buffer_release_resowner(node, !isCommit);
+		}
 	}
 	else if (phase == RESOURCE_RELEASE_LOCKS)
 	{
@@ -1082,3 +1109,27 @@ ResourceOwnerForgetLock(ResourceOwner owner, LOCALLOCK *locallock)
 	elog(ERROR, "lock reference %p is not owned by resource owner %s",
 		 locallock, owner->name);
 }
+
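+/*
+ * AIO handles and bounce buffers are tracked via bare dlist links rather
+ * than ResourceElems, so that they can be registered and forgotten within
+ * critical sections.
+ */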
+void
+ResourceOwnerRememberAioHandle(ResourceOwner owner, struct dlist_node *ioh_node)
+{
+	dlist_push_tail(&owner->aio_handles, ioh_node);
+}
+
+void
+ResourceOwnerForgetAioHandle(ResourceOwner owner, struct dlist_node *ioh_node)
+{
+	dlist_delete_from(&owner->aio_handles, ioh_node);
+}
+
+void
+ResourceOwnerRememberAioBounceBuffer(ResourceOwner owner, struct dlist_node *bb_node)
+{
+	dlist_push_tail(&owner->aio_bounce_buffers, bb_node);
+}
+
+void
+ResourceOwnerForgetAioBounceBuffer(ResourceOwner owner, struct dlist_node *bb_node)
+{
+	dlist_delete_from(&owner->aio_bounce_buffers, bb_node);
+}
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 309686627e7..be8be9fbff0 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -54,6 +54,9 @@ AggStrategy
 AggTransInfo
 Aggref
 AggregateInstrumentation
+AioWorkerControl
+AioWorkerSlot
+AioWorkerSubmissionQueue
 AlenState
 Alias
 AllocBlock
@@ -1258,6 +1261,7 @@ IntoClause
 InvalMessageArray
 InvalidationMsgsGroup
 IoMethod
+IoMethodOps
 IpcMemoryId
 IpcMemoryKey
 IpcMemoryState
@@ -2093,6 +2097,25 @@ Permutation
 PermutationStep
 PermutationStepBlocker
 PermutationStepBlockerType
+PgAioBounceBuffer
+PgAioCtl
+PgAioHandle
+PgAioHandleFlags
+PgAioHandleRef
+PgAioHandleSharedCallbackID
+PgAioHandleSharedCallbacks
+PgAioHandleState
+PgAioOp
+PgAioOpData
+PgAioPerBackend
+PgAioResult
+PgAioResultStatus
+PgAioReturn
+PgAioSubjectData
+PgAioSubjectID
+PgAioSubjectInfo
+PgAioUringContext
 PgArchData
 PgBackendGSSStatus
 PgBackendSSLStatus
-- 
2.45.2.827.g557ae147e6

