From f6e3c30c331f385a7bed2ee78704f6393b8d7f82 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Tue, 11 Feb 2025 14:29:37 -0500
Subject: [PATCH v2.4 14/29] aio: Add io_uring method

---
 src/include/storage/aio.h                     |   3 +
 src/include/storage/aio_internal.h            |   3 +
 src/include/storage/lwlock.h                  |   1 +
 src/backend/storage/aio/Makefile              |   1 +
 src/backend/storage/aio/aio.c                 |   6 +
 src/backend/storage/aio/meson.build           |   1 +
 src/backend/storage/aio/method_io_uring.c     | 382 ++++++++++++++++++
 src/backend/storage/lmgr/lwlock.c             |   1 +
 .../utils/activity/wait_event_names.txt       |   2 +
 src/backend/utils/misc/postgresql.conf.sample |   3 +-
 doc/src/sgml/config.sgml                      |   6 +
 src/tools/pgindent/typedefs.list              |   1 +
 12 files changed, 409 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/storage/aio/method_io_uring.c

diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h
index ca0f9b64d97..3c058c84003 100644
--- a/src/include/storage/aio.h
+++ b/src/include/storage/aio.h
@@ -24,6 +24,9 @@ typedef enum IoMethod
 {
 	IOMETHOD_SYNC = 0,
 	IOMETHOD_WORKER,
+#ifdef USE_LIBURING
+	IOMETHOD_IO_URING,
+#endif
 } IoMethod;
 
 /* We'll default to worker based execution. */
diff --git a/src/include/storage/aio_internal.h b/src/include/storage/aio_internal.h
index c6e7306ed61..ac26aff80b6 100644
--- a/src/include/storage/aio_internal.h
+++ b/src/include/storage/aio_internal.h
@@ -339,6 +339,9 @@ extern PgAioHandle *pgaio_inj_io_get(void);
 /* Declarations for the tables of function pointers exposed by each IO method. */
 extern PGDLLIMPORT const IoMethodOps pgaio_sync_ops;
 extern PGDLLIMPORT const IoMethodOps pgaio_worker_ops;
+#ifdef USE_LIBURING
+extern PGDLLIMPORT const IoMethodOps pgaio_uring_ops;
+#endif
 
 extern PGDLLIMPORT const IoMethodOps *pgaio_method_ops;
 extern PGDLLIMPORT PgAioCtl *pgaio_ctl;
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 13a7dc89980..043e8bae7a9 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -217,6 +217,7 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_SUBTRANS_SLRU,
 	LWTRANCHE_XACT_SLRU,
 	LWTRANCHE_PARALLEL_VACUUM_DSA,
+	LWTRANCHE_AIO_URING_COMPLETION,
 	LWTRANCHE_FIRST_USER_DEFINED,
 }			BuiltinTrancheIds;
 
diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile
index f51c34a37f8..c06c50771e0 100644
--- a/src/backend/storage/aio/Makefile
+++ b/src/backend/storage/aio/Makefile
@@ -14,6 +14,7 @@ OBJS = \
 	aio_init.o \
 	aio_io.o \
 	aio_target.o \
+	method_io_uring.o \
 	method_sync.o \
 	method_worker.o \
 	read_stream.o
diff --git a/src/backend/storage/aio/aio.c b/src/backend/storage/aio/aio.c
index 3eace131de2..a1282351436 100644
--- a/src/backend/storage/aio/aio.c
+++ b/src/backend/storage/aio/aio.c
@@ -64,6 +64,9 @@ static void pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation);
 const struct config_enum_entry io_method_options[] = {
 	{"sync", IOMETHOD_SYNC, false},
 	{"worker", IOMETHOD_WORKER, false},
+#ifdef USE_LIBURING
+	{"io_uring", IOMETHOD_IO_URING, false},
+#endif
 	{NULL, 0, false}
 };
 
@@ -81,6 +84,9 @@ PgAioBackend *pgaio_my_backend;
 static const IoMethodOps *const pgaio_method_ops_table[] = {
 	[IOMETHOD_SYNC] = &pgaio_sync_ops,
 	[IOMETHOD_WORKER] = &pgaio_worker_ops,
+#ifdef USE_LIBURING
+	[IOMETHOD_IO_URING] = &pgaio_uring_ops,
+#endif
 };
 
 /* callbacks for the configured io_method, set by assign_io_method */
diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build
index 74f94c6e40b..2f0f03d8071 100644
--- a/src/backend/storage/aio/meson.build
+++ b/src/backend/storage/aio/meson.build
@@ -6,6 +6,7 @@ backend_sources += files(
   'aio_init.c',
   'aio_io.c',
   'aio_target.c',
+  'method_io_uring.c',
   'method_sync.c',
   'method_worker.c',
   'read_stream.c',
diff --git a/src/backend/storage/aio/method_io_uring.c b/src/backend/storage/aio/method_io_uring.c
new file mode 100644
index 00000000000..43f7576498c
--- /dev/null
+++ b/src/backend/storage/aio/method_io_uring.c
@@ -0,0 +1,382 @@
+/*-------------------------------------------------------------------------
+ *
+ * method_io_uring.c
+ *    AIO - perform AIO using Linux' io_uring
+ *
+ * XXX Write me
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *    src/backend/storage/aio/method_io_uring.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#ifdef USE_LIBURING
+
+#include <liburing.h>
+
+#include "pgstat.h"
+#include "port/pg_iovec.h"
+#include "storage/aio_internal.h"
+#include "storage/fd.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+
+
+/* Entry points for IoMethodOps. */
+static size_t pgaio_uring_shmem_size(void);
+static void pgaio_uring_shmem_init(bool first_time);
+static void pgaio_uring_init_backend(void);
+
+static int	pgaio_uring_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);
+static void pgaio_uring_wait_one(PgAioHandle *ioh, uint64 ref_generation);
+
+static void pgaio_uring_sq_from_io(PgAioHandle *ioh, struct io_uring_sqe *sqe);
+
+
+const IoMethodOps pgaio_uring_ops = {
+	.shmem_size = pgaio_uring_shmem_size,
+	.shmem_init = pgaio_uring_shmem_init,
+	.init_backend = pgaio_uring_init_backend,
+
+	.submit = pgaio_uring_submit,
+	.wait_one = pgaio_uring_wait_one,
+};
+
+typedef struct PgAioUringContext
+{
+	LWLock		completion_lock;
+
+	struct io_uring io_uring_ring;
+	/* XXX: probably worth padding to a cacheline boundary here */
+} PgAioUringContext;
+
+
+static PgAioUringContext *pgaio_uring_contexts;
+static PgAioUringContext *pgaio_my_uring_context;
+
+/* io_uring local state */
+static struct io_uring local_ring;
+
+
+
+static Size
+pgaio_uring_context_shmem_size(void)
+{
+	uint32		TotalProcs = MaxBackends + NUM_AUXILIARY_PROCS - MAX_IO_WORKERS;
+
+	return mul_size(TotalProcs, sizeof(PgAioUringContext));
+}
+
+static size_t
+pgaio_uring_shmem_size(void)
+{
+	return pgaio_uring_context_shmem_size();
+}
+
+static void
+pgaio_uring_shmem_init(bool first_time)
+{
+	uint32		TotalProcs = MaxBackends + NUM_AUXILIARY_PROCS - MAX_IO_WORKERS;
+	bool		found;
+
+	pgaio_uring_contexts = (PgAioUringContext *)
+		ShmemInitStruct("AioUring", pgaio_uring_shmem_size(), &found);
+
+	if (found)
+		return;
+
+	for (int contextno = 0; contextno < TotalProcs; contextno++)
+	{
+		PgAioUringContext *context = &pgaio_uring_contexts[contextno];
+		int			ret;
+
+		/*
+		 * XXX: Probably worth sharing the WQ between the different rings,
+		 * when supported by the kernel. Could also cause additional
+		 * contention, I guess?
+		 */
+#if 0
+		if (!AcquireExternalFD())
+			elog(ERROR, "No external FD available");
+#endif
+		ret = io_uring_queue_init(io_max_concurrency, &context->io_uring_ring, 0);
+		if (ret < 0)
+			elog(ERROR, "io_uring_queue_init failed: %s", strerror(-ret));
+
+		LWLockInitialize(&context->completion_lock, LWTRANCHE_AIO_URING_COMPLETION);
+	}
+}
+
+static void
+pgaio_uring_init_backend(void)
+{
+	int			ret;
+
+	pgaio_my_uring_context = &pgaio_uring_contexts[MyProcNumber];
+
+	ret = io_uring_queue_init(32, &local_ring, 0);
+	if (ret < 0)
+		elog(ERROR, "io_uring_queue_init failed: %s", strerror(-ret));
+}
+
+static int
+pgaio_uring_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
+{
+	struct io_uring *uring_instance = &pgaio_my_uring_context->io_uring_ring;
+	int			in_flight_before = dclist_count(&pgaio_my_backend->in_flight_ios);
+
+	Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
+
+	for (int i = 0; i < num_staged_ios; i++)
+	{
+		PgAioHandle *ioh = staged_ios[i];
+		struct io_uring_sqe *sqe;
+
+		sqe = io_uring_get_sqe(uring_instance);
+
+		if (!sqe)
+			elog(ERROR, "io_uring submission queue is unexpectedly full");
+
+		pgaio_io_prepare_submit(ioh);
+		pgaio_uring_sq_from_io(ioh, sqe);
+
+		/*
+		 * io_uring executes IO in process context if possible. That's
+		 * generally good, as it reduces context switching. When performing a
+		 * lot of buffered IO that means that copying between page cache and
+		 * userspace memory happens in the foreground, as it can't be
+		 * offloaded to DMA hardware as is possible when using direct IO. When
+		 * executing a lot of buffered IO this causes io_uring to be slower
+		 * than worker mode, as worker mode parallelizes the copying. io_uring
+		 * can be told to offload work to worker threads instead.
+		 *
+		 * If an IO is buffered IO and we already have IOs in flight or
+		 * multiple IOs are being submitted, we thus tell io_uring to execute
+		 * the IO in the background. We don't do so for the first few IOs
+		 * being submitted as executing in this process' context has lower
+		 * latency.
+		 */
+		if (in_flight_before > 4 && (ioh->flags & PGAIO_HF_BUFFERED))
+			io_uring_sqe_set_flags(sqe, IOSQE_ASYNC);
+
+		in_flight_before++;
+	}
+
+	while (true)
+	{
+		int			ret;
+
+		pgstat_report_wait_start(WAIT_EVENT_AIO_IO_URING_SUBMIT);
+		ret = io_uring_submit(uring_instance);
+		pgstat_report_wait_end();
+
+		if (ret == -EINTR)
+		{
+			pgaio_debug(DEBUG3,
+						"aio method uring: submit EINTR, nios: %d",
+						num_staged_ios);
+			continue;
+		}
+		if (ret < 0)
+			elog(PANIC, "failed: %d/%s",
+				 ret, strerror(-ret));
+		else if (ret != num_staged_ios)
+		{
+			/* likely unreachable, but if it is, we would need to re-submit */
+			elog(PANIC, "submitted only %d of %d",
+				 ret, num_staged_ios);
+		}
+		else
+		{
+			pgaio_debug(DEBUG4,
+						"aio method uring: submitted %d IOs",
+						num_staged_ios);
+		}
+		break;
+	}
+
+	return num_staged_ios;
+}
+
+
+#define PGAIO_MAX_LOCAL_COMPLETED_IO 32
+
+static void
+pgaio_uring_drain_locked(PgAioUringContext *context)
+{
+	int			ready;
+	int			orig_ready;
+
+	/*
+	 * Don't drain more events than available right now. Otherwise it's
+	 * plausible that one backend could get stuck, for a while, receiving CQEs
+	 * without actually processing them.
+	 */
+	orig_ready = ready = io_uring_cq_ready(&context->io_uring_ring);
+
+	while (ready > 0)
+	{
+		struct io_uring_cqe *cqes[PGAIO_MAX_LOCAL_COMPLETED_IO];
+		uint32		ncqes;
+
+		START_CRIT_SECTION();
+		ncqes =
+			io_uring_peek_batch_cqe(&context->io_uring_ring,
+									cqes,
+									Min(PGAIO_MAX_LOCAL_COMPLETED_IO, ready));
+		Assert(ncqes <= ready);
+
+		ready -= ncqes;
+
+		for (int i = 0; i < ncqes; i++)
+		{
+			struct io_uring_cqe *cqe = cqes[i];
+			PgAioHandle *ioh;
+
+			ioh = io_uring_cqe_get_data(cqe);
+			io_uring_cqe_seen(&context->io_uring_ring, cqe);
+
+			pgaio_io_process_completion(ioh, cqe->res);
+		}
+
+		END_CRIT_SECTION();
+
+		pgaio_debug(DEBUG3,
+					"drained %d/%d, now expecting %d",
+					ncqes, orig_ready, io_uring_cq_ready(&context->io_uring_ring));
+	}
+}
+
+static void
+pgaio_uring_wait_one(PgAioHandle *ioh, uint64 ref_generation)
+{
+	PgAioHandleState state;
+	ProcNumber	owner_procno = ioh->owner_procno;
+	PgAioUringContext *owner_context = &pgaio_uring_contexts[owner_procno];
+	bool		expect_cqe;
+	int			waited = 0;
+
+	/*
+	 * We ought to have a smarter locking scheme, nearly all the time the
+	 * backend owning the ring will consume the completions, making the
+	 * locking unnecessarily expensive.
+	 */
+	LWLockAcquire(&owner_context->completion_lock, LW_EXCLUSIVE);
+
+	while (true)
+	{
+		pgaio_debug_io(DEBUG3, ioh,
+					   "wait_one io_gen: %llu, ref_gen: %llu, cycle %d",
+					   (long long unsigned) ref_generation,
+					   (long long unsigned) ioh->generation,
+					   waited);
+
+		if (pgaio_io_was_recycled(ioh, ref_generation, &state) ||
+			state != PGAIO_HS_SUBMITTED)
+		{
+			break;
+		}
+		else if (io_uring_cq_ready(&owner_context->io_uring_ring))
+		{
+			expect_cqe = true;
+		}
+		else
+		{
+			int			ret;
+			struct io_uring_cqe *cqes;
+
+			pgstat_report_wait_start(WAIT_EVENT_AIO_IO_URING_COMPLETION);
+			ret = io_uring_wait_cqes(&owner_context->io_uring_ring, &cqes, 1, NULL, NULL);
+			pgstat_report_wait_end();
+
+			if (ret == -EINTR)
+			{
+				continue;
+			}
+			else if (ret != 0)
+			{
+				elog(PANIC, "unexpected: %d/%s: %m", ret, strerror(-ret));
+			}
+			else
+			{
+				Assert(cqes != NULL);
+				expect_cqe = true;
+				waited++;
+			}
+		}
+
+		if (expect_cqe)
+		{
+			pgaio_uring_drain_locked(owner_context);
+		}
+	}
+
+	LWLockRelease(&owner_context->completion_lock);
+
+	pgaio_debug(DEBUG3,
+				"wait_one with %d sleeps",
+				waited);
+}
+
+static void
+pgaio_uring_sq_from_io(PgAioHandle *ioh, struct io_uring_sqe *sqe)
+{
+	struct iovec *iov;
+
+	switch (ioh->op)
+	{
+		case PGAIO_OP_READV:
+			iov = &pgaio_ctl->iovecs[ioh->iovec_off];
+			if (ioh->op_data.read.iov_length == 1)
+			{
+				io_uring_prep_read(sqe,
+								   ioh->op_data.read.fd,
+								   iov->iov_base,
+								   iov->iov_len,
+								   ioh->op_data.read.offset);
+			}
+			else
+			{
+				io_uring_prep_readv(sqe,
+									ioh->op_data.read.fd,
+									iov,
+									ioh->op_data.read.iov_length,
+									ioh->op_data.read.offset);
+
+			}
+			break;
+
+		case PGAIO_OP_WRITEV:
+			iov = &pgaio_ctl->iovecs[ioh->iovec_off];
+			if (ioh->op_data.write.iov_length == 1)
+			{
+				io_uring_prep_write(sqe,
+									ioh->op_data.write.fd,
+									iov->iov_base,
+									iov->iov_len,
+									ioh->op_data.write.offset);
+			}
+			else
+			{
+				io_uring_prep_writev(sqe,
+									 ioh->op_data.write.fd,
+									 iov,
+									 ioh->op_data.write.iov_length,
+									 ioh->op_data.write.offset);
+			}
+			break;
+
+		case PGAIO_OP_INVALID:
+			elog(ERROR, "trying to prepare invalid IO operation for execution");
+	}
+
+	io_uring_sqe_set_data(sqe, ioh);
+}
+
+#endif							/* USE_LIBURING */
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index b02625194be..9fec95dd4b7 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -166,6 +166,7 @@ static const char *const BuiltinTrancheNames[] = {
 	[LWTRANCHE_SUBTRANS_SLRU] = "SubtransSLRU",
 	[LWTRANCHE_XACT_SLRU] = "XactSLRU",
 	[LWTRANCHE_PARALLEL_VACUUM_DSA] = "ParallelVacuumDSA",
+	[LWTRANCHE_AIO_URING_COMPLETION] = "AioUringCompletion",
 };
 
 StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index cb977b049d8..a2d1a9fa4ec 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -192,6 +192,8 @@ ABI_compatibility:
 
 Section: ClassName - WaitEventIO
 
+AIO_IO_URING_SUBMIT	"Waiting for IO submission via io_uring."
+AIO_IO_URING_COMPLETION	"Waiting for IO completion via io_uring."
 AIO_IO_COMPLETION	"Waiting for IO completion."
 BASEBACKUP_READ	"Waiting for base backup to read from a file."
 BASEBACKUP_SYNC	"Waiting for data written by a base backup to reach durable storage."
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 3bb4e0d4d7d..50fde0ba2c3 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -199,7 +199,8 @@
 #maintenance_io_concurrency = 10	# 1-1000; 0 disables prefetching
 #io_combine_limit = 128kB		# usually 1-32 blocks (depends on OS)
 
-#io_method = worker			# worker, sync (change requires restart)
+#io_method = worker			# worker, io_uring, sync
+					# (change requires restart)
 #io_max_concurrency = -1		# Max number of IOs that one process
 					# can execute simultaneously
 					# -1 sets based on shared_buffers
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 2cf0120bb56..c13c1e9e95b 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2661,6 +2661,12 @@ include_dir 'conf.d'
             <literal>worker</literal> (execute asynchronous I/O using worker processes)
            </para>
           </listitem>
+          <listitem>
+           <para>
+            <literal>io_uring</literal> (execute asynchronous I/O using
+            io_uring, if available)
+           </para>
+          </listitem>
           <listitem>
            <para>
             <literal>sync</literal> (execute asynchronous I/O synchronously)
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e34727e269a..d4734b85c0d 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2131,6 +2131,7 @@ PgAioReturn
 PgAioTargetData
 PgAioTargetID
 PgAioTargetInfo
+PgAioUringContext
 PgAioWaitRef
 PgArchData
 PgBackendGSSStatus
-- 
2.48.1.76.g4e746b1a31.dirty

