From 460e366fe8797f2e44b9e4c1ed7500120af7da30 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Wed, 22 Jan 2025 13:44:40 -0500
Subject: [PATCH v2.6 09/34] aio: Add pg_aios view

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/include/catalog/pg_proc.dat      |  10 ++
 src/backend/catalog/system_views.sql |   3 +
 src/backend/storage/aio/Makefile     |   1 +
 src/backend/storage/aio/aio_funcs.c  | 219 +++++++++++++++++++++++++++
 src/backend/storage/aio/meson.build  |   1 +
 src/test/regress/expected/rules.out  |  16 ++
 6 files changed, 250 insertions(+)
 create mode 100644 src/backend/storage/aio/aio_funcs.c

diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index cede992b6e2..f416ed387d0 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12469,4 +12469,14 @@
   proargtypes => 'int4',
   prosrc => 'gist_stratnum_common' },
 
+# AIO related functions
+{ oid => '9200', descr => 'information about in-progress asynchronous IOs',
+  proname => 'pg_get_aios', prorows => '100', proretset => 't',
+  provolatile => 'v', proparallel => 'r', prorettype => 'record', proargtypes => '',
+  proallargtypes => '{int4,int4,int8,text,text,int8,int8,text,int2,int4,text,text,bool,bool,bool}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{pid,io_id,io_generation,state,operation,offset,length,target,handle_data_len,raw_result,result,target_desc,f_sync,f_localmem,f_buffered}',
+  prosrc => 'pg_get_aios' },
+
+
 ]
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index a4d2cfdcaf5..73902a763d1 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1390,3 +1390,6 @@ CREATE VIEW pg_stat_subscription_stats AS
 
 CREATE VIEW pg_wait_events AS
     SELECT * FROM pg_get_wait_events();
+
+CREATE VIEW pg_aios AS
+    SELECT * FROM pg_get_aios();
diff --git a/src/backend/storage/aio/Makefile b/src/backend/storage/aio/Makefile
index c06c50771e0..3f2469cc399 100644
--- a/src/backend/storage/aio/Makefile
+++ b/src/backend/storage/aio/Makefile
@@ -11,6 +11,7 @@ include $(top_builddir)/src/Makefile.global
 OBJS = \
 	aio.o \
 	aio_callback.o \
+	aio_funcs.o \
 	aio_init.o \
 	aio_io.o \
 	aio_target.o \
diff --git a/src/backend/storage/aio/aio_funcs.c b/src/backend/storage/aio/aio_funcs.c
new file mode 100644
index 00000000000..58883fd61d8
--- /dev/null
+++ b/src/backend/storage/aio/aio_funcs.c
@@ -0,0 +1,219 @@
+/*-------------------------------------------------------------------------
+ *
+ * aio_funcs.c
+ *    AIO - SQL interface for AIO
+ *
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *    src/backend/storage/aio/aio_funcs.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "storage/aio.h"
+#include "storage/aio_internal.h"
+#include "utils/builtins.h"
+#include "funcapi.h"
+#include "storage/proc.h"
+
+
+/*
+ * Byte length of an iovec.
+ */
+static size_t
+iov_byte_length(const struct iovec *iov, int cnt)
+{
+	size_t		len = 0;
+
+	for (int i = 0; i < cnt; i++)
+	{
+		len += iov[i].iov_len;
+	}
+
+	return len;
+}
+
+Datum
+pg_get_aios(PG_FUNCTION_ARGS)
+{
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+
+	InitMaterializedSRF(fcinfo, 0);
+
+#define PG_GET_AIOS_COLS	15
+
+	for (uint64 i = 0; i < pgaio_ctl->io_handle_count; i++)
+	{
+		PgAioHandle *live_ioh = &pgaio_ctl->io_handles[i];
+		uint32		ioh_id = pgaio_io_get_id(live_ioh);
+		Datum		values[PG_GET_AIOS_COLS] = {0};
+		bool		nulls[PG_GET_AIOS_COLS] = {0};
+		ProcNumber	owner;
+		PGPROC	   *owner_proc;
+		int32		owner_pid;
+		PgAioHandleState start_state;
+		uint64		start_generation;
+		PgAioHandle ioh_copy;
+		struct iovec iov_copy[PG_IOV_MAX];
+
+retry:
+
+		/*
+		 * There is no lock that could prevent the state of the IO to advance
+		 * concurrently - and we don't want to introduce one, as that would
+		 * introduce atomics into a very common path. Instead we
+		 *
+		 * 1) Determine the state + generation of the IO.
+		 *
+		 * 2) Copy the IO to local memory.
+		 *
+		 * 3) Check if state or generation of the IO changed. If the state
+		 * changed, retry, if the generation changed don't display the IO.
+		 */
+
+		/* 1) from above */
+		start_generation = live_ioh->generation;
+		pg_read_barrier();
+		start_state = live_ioh->state;
+
+		if (start_state == PGAIO_HS_IDLE)
+			continue;
+
+		/* 2) from above */
+		memcpy(&ioh_copy, live_ioh, sizeof(PgAioHandle));
+
+		/*
+		 * Safe to copy even if no iovec is used - we always reserve the
+		 * required space.
+		 */
+		memcpy(&iov_copy, &pgaio_ctl->iovecs[ioh_copy.iovec_off],
+			   PG_IOV_MAX * sizeof(struct iovec));
+
+		/*
+		 * Copy information about owner before 3) below, if the process exited
+		 * it'd have to wait for the IO to finish first, which we would detect
+		 * in 3).
+		 */
+		owner = ioh_copy.owner_procno;
+		owner_proc = GetPGProcByNumber(owner);
+		owner_pid = owner_proc->pid;
+
+		/* 3) from above */
+		pg_read_barrier();
+
+		/*
+		 * The IO completed and a new one was started with the same ID. Don't
+		 * display it - it really started after this function was called.
+		 * There be a risk of a livelock if we just retried endlessly, if IOs
+		 * complete very quickly.
+		 */
+		if (live_ioh->generation != start_generation)
+			continue;
+
+		/*
+		 * The IOs state changed while we were "rendering" it. Just start from
+		 * scratch. There's no risk of a livelock here, as an IO has a limited
+		 * sets of states it can be in, and state changes go only in a single
+		 * direction.
+		 */
+		if (live_ioh->state != start_state)
+			goto retry;
+
+		/*
+		 * Now that we have copied the IO into local memory and checked that
+		 * it's still in the same state, we are not allowed to access "live"
+		 * memory anymore. To make it slightly easier to catch such cases, set
+		 * the "live" pointers to NULL.
+		 */
+		live_ioh = NULL;
+		owner_proc = NULL;
+
+
+		/* column: owning pid */
+		if (owner_pid != 0)
+			values[0] = Int32GetDatum(owner_pid);
+		else
+			nulls[0] = false;
+
+		/* column: IO's id */
+		values[1] = ioh_id;
+
+		/* column: IO's generation */
+		values[2] = Int64GetDatum(start_generation);
+
+		/* column: IO's state */
+		values[3] = CStringGetTextDatum(pgaio_io_get_state_name(&ioh_copy));
+
+		/*
+		 * If the IO is in PGAIO_HS_HANDED_OUT state, none of it's fields are
+		 * valid yet (or are in the process of being set). Therefore we don't
+		 * want to display any other columns.
+		 */
+		if (start_state == PGAIO_HS_HANDED_OUT)
+		{
+			memset(nulls + 4, 1, (lengthof(nulls) - 4) * sizeof(bool));
+			goto display;
+		}
+
+		/* column: IO's operation */
+		values[4] = CStringGetTextDatum(pgaio_io_get_op_name(&ioh_copy));
+
+		/* columns: details about the IO's operation */
+		switch (ioh_copy.op)
+		{
+			case PGAIO_OP_INVALID:
+				nulls[5] = true;
+				nulls[6] = true;
+				break;
+			case PGAIO_OP_READV:
+				values[5] = Int64GetDatum(ioh_copy.op_data.read.offset);
+				values[6] =
+					Int64GetDatum(iov_byte_length(iov_copy, ioh_copy.op_data.read.iov_length));
+				break;
+			case PGAIO_OP_WRITEV:
+				values[5] = Int64GetDatum(ioh_copy.op_data.write.offset);
+				values[6] =
+					Int64GetDatum(iov_byte_length(iov_copy, ioh_copy.op_data.write.iov_length));
+				break;
+		}
+
+		/* column: IO's target */
+		values[7] = CStringGetTextDatum(pgaio_io_get_target_name(&ioh_copy));
+
+		/* column: length of IO's data array */
+		values[8] = Int16GetDatum(ioh_copy.handle_data_len);
+
+		/* column: raw result (i.e. some form of syscall return value) */
+		if (start_state == PGAIO_HS_COMPLETED_IO
+			|| start_state == PGAIO_HS_COMPLETED_SHARED)
+			values[9] = Int32GetDatum(ioh_copy.result);
+		else
+			nulls[9] = true;
+
+		/*
+		 * column: result in the higher level representation (unknown if not
+		 * finished)
+		 */
+		values[10] =
+			CStringGetTextDatum(pgaio_result_status_string(ioh_copy.distilled_result.status));
+
+		/* column: target description */
+		values[11] = CStringGetTextDatum(pgaio_io_get_target_description(&ioh_copy));
+
+		/* columns: one for each flag */
+		values[12] = BoolGetDatum(ioh_copy.flags & PGAIO_HF_SYNCHRONOUS);
+		values[13] = BoolGetDatum(ioh_copy.flags & PGAIO_HF_REFERENCES_LOCAL);
+		values[14] = BoolGetDatum(ioh_copy.flags & PGAIO_HF_BUFFERED);
+
+display:
+
+		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+	}
+
+	return (Datum) 0;
+}
diff --git a/src/backend/storage/aio/meson.build b/src/backend/storage/aio/meson.build
index 2f0f03d8071..da6df2d3654 100644
--- a/src/backend/storage/aio/meson.build
+++ b/src/backend/storage/aio/meson.build
@@ -3,6 +3,7 @@
 backend_sources += files(
   'aio.c',
   'aio_callback.c',
+  'aio_funcs.c',
   'aio_init.c',
   'aio_io.c',
   'aio_target.c',
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 62f69ac20b2..6570d0cddce 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1286,6 +1286,22 @@ drop table cchild;
 SELECT viewname, definition FROM pg_views
 WHERE schemaname = 'pg_catalog'
 ORDER BY viewname;
+pg_aios| SELECT pid,
+    io_id,
+    io_generation,
+    state,
+    operation,
+    "offset",
+    length,
+    target,
+    handle_data_len,
+    raw_result,
+    result,
+    target_desc,
+    f_sync,
+    f_localmem,
+    f_buffered
+   FROM pg_get_aios() pg_get_aios(pid, io_id, io_generation, state, operation, "offset", length, target, handle_data_len, raw_result, result, target_desc, f_sync, f_localmem, f_buffered);
 pg_available_extension_versions| SELECT e.name,
     e.version,
     (x.extname IS NOT NULL) AS installed,
-- 
2.48.1.76.g4e746b1a31.dirty

