From b5719ca7a1c6257929b71c45ba8cb9da3bd53665 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot.pg@gmail.com>
Date: Wed, 28 Aug 2024 12:59:02 +0000
Subject: [PATCH v3 2/2] Add pg_stat_get_backend_io()

Adding the pg_stat_get_backend_io() function to retrieve I/O statistics for
a particular backend pid. Note this function behaves as if stats_fetch_consistency
is set to none.
---
 doc/src/sgml/monitoring.sgml           |  18 ++++
 src/backend/utils/activity/pgstat_io.c |   6 ++
 src/backend/utils/adt/pgstatfuncs.c    | 120 +++++++++++++++++++++++++
 src/include/catalog/pg_proc.dat        |   9 ++
 src/include/pgstat.h                   |   1 +
 src/test/regress/expected/stats.out    |  25 ++++++
 src/test/regress/sql/stats.sql         |  16 +++-
 7 files changed, 194 insertions(+), 1 deletion(-)
  10.9% doc/src/sgml/
  47.1% src/backend/utils/adt/
   9.2% src/include/catalog/
  15.4% src/test/regress/expected/
  14.4% src/test/regress/sql/

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index b28ca4e0ca..fb908172f8 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -4770,6 +4770,24 @@ description | Waiting for a newly initialized WAL file to reach durable storage
        </para></entry>
       </row>
 
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_stat_get_backend_io</primary>
+        </indexterm>
+        <function>pg_stat_get_backend_io</function> ( <type>integer</type> )
+        <returnvalue>setof record</returnvalue>
+       </para>
+       <para>
+        Returns I/O statistics about the backend with the specified
+        process ID. The output fields are exactly the same as the ones in the
+        <link linkend="monitoring-pg-stat-io-view"> <structname>pg_stat_io</structname></link>
+        view. This function behaves as if <varname>stats_fetch_consistency</varname>
+        is set to <literal>none</literal> (means each execution re-fetches
+        counters from shared memory).
+       </para></entry>
+      </row>
+
       <row>
        <entry role="func_table_entry"><para role="func_signature">
         <indexterm>
diff --git a/src/backend/utils/activity/pgstat_io.c b/src/backend/utils/activity/pgstat_io.c
index 611ca21720..cf6a49cf48 100644
--- a/src/backend/utils/activity/pgstat_io.c
+++ b/src/backend/utils/activity/pgstat_io.c
@@ -170,6 +170,12 @@ pgstat_fetch_my_stat_io(void)
 	return &pgStatLocal.snapshot.my_io;
 }
 
+PgStat_Backend_IO *
+pgstat_fetch_proc_stat_io(ProcNumber procNumber)
+{
+	return &pgStatLocal.shmem->io.stat[procNumber];
+}
+
 /*
  * Check if there any IO stats waiting for flush.
  */
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 4c00570396..795ff4e2f6 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1570,6 +1570,126 @@ pg_stat_get_my_io(PG_FUNCTION_ARGS)
 	return (Datum) 0;
 }
 
+Datum
+pg_stat_get_backend_io(PG_FUNCTION_ARGS)
+{
+	ReturnSetInfo *rsinfo;
+	PgStat_Backend_IO *backend_io_stats;
+	Datum		reset_time;
+	ProcNumber	procNumber;
+	PGPROC	   *proc;
+	BackendType bktype;
+	Datum		bktype_desc;
+	PgStat_BktypeIO *bktype_stats;
+
+	int			backend_pid = PG_GETARG_INT32(0);
+
+	rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	InitMaterializedSRF(fcinfo, 0);
+
+	proc = BackendPidGetProc(backend_pid);
+
+	/* Maybe an auxiliary process? */
+	if (proc == NULL)
+		proc = AuxiliaryPidGetProc(backend_pid);
+
+	/* This backend_pid does not exist */
+	if (proc != NULL)
+	{
+		procNumber = GetNumberFromPGProc(proc);
+		backend_io_stats = pgstat_fetch_proc_stat_io(procNumber);
+		bktype = backend_io_stats->bktype;
+		reset_time = TimestampTzGetDatum(backend_io_stats->stat_reset_timestamp);
+
+		bktype_desc = CStringGetTextDatum(GetBackendTypeDesc(bktype));
+		bktype_stats = &backend_io_stats->stats;
+
+		/*
+		 * In Assert builds, we can afford an extra loop through all of the
+		 * counters checking that only expected stats are non-zero, since it
+		 * keeps the non-Assert code cleaner.
+		 */
+		Assert(pgstat_bktype_io_stats_valid(bktype_stats, bktype));
+
+		for (int io_obj = 0; io_obj < IOOBJECT_NUM_TYPES; io_obj++)
+		{
+			const char *obj_name = pgstat_get_io_object_name(io_obj);
+
+			for (int io_context = 0; io_context < IOCONTEXT_NUM_TYPES; io_context++)
+			{
+				const char *context_name = pgstat_get_io_context_name(io_context);
+
+				Datum		values[IO_NUM_COLUMNS] = {0};
+				bool		nulls[IO_NUM_COLUMNS] = {0};
+
+				/*
+				 * Some combinations of BackendType, IOObject, and IOContext
+				 * are not valid for any type of IOOp. In such cases, omit the
+				 * entire row from the view.
+				 */
+				if (!pgstat_tracks_io_object(bktype, io_obj, io_context))
+					continue;
+
+				values[IO_COL_BACKEND_TYPE] = bktype_desc;
+				values[IO_COL_CONTEXT] = CStringGetTextDatum(context_name);
+				values[IO_COL_OBJECT] = CStringGetTextDatum(obj_name);
+				if (backend_io_stats->stat_reset_timestamp != 0)
+					values[IO_COL_RESET_TIME] = reset_time;
+				else
+					nulls[IO_COL_RESET_TIME] = true;
+
+				/*
+				 * Hard-code this to the value of BLCKSZ for now. Future
+				 * values could include XLOG_BLCKSZ, once WAL IO is tracked,
+				 * and constant multipliers, once non-block-oriented IO (e.g.
+				 * temporary file IO) is tracked.
+				 */
+				values[IO_COL_CONVERSION] = Int64GetDatum(BLCKSZ);
+
+				for (int io_op = 0; io_op < IOOP_NUM_TYPES; io_op++)
+				{
+					int			op_idx = pgstat_get_io_op_index(io_op);
+					int			time_idx = pgstat_get_io_time_index(io_op);
+
+					/*
+					 * Some combinations of BackendType and IOOp, of IOContext
+					 * and IOOp, and of IOObject and IOOp are not tracked. Set
+					 * these cells in the view NULL.
+					 */
+					if (pgstat_tracks_io_op(bktype, io_obj, io_context, io_op))
+					{
+						PgStat_Counter count =
+							bktype_stats->counts[io_obj][io_context][io_op];
+
+						values[op_idx] = Int64GetDatum(count);
+					}
+					else
+						nulls[op_idx] = true;
+
+					/* not every operation is timed */
+					if (time_idx == IO_COL_INVALID)
+						continue;
+
+					if (!nulls[op_idx])
+					{
+						PgStat_Counter time =
+							bktype_stats->times[io_obj][io_context][io_op];
+
+						values[time_idx] = Float8GetDatum(pg_stat_us_to_ms(time));
+					}
+					else
+						nulls[time_idx] = true;
+				}
+
+				tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
+									 values, nulls);
+			}
+		}
+	}
+
+	return (Datum) 0;
+}
+
 /*
  * Returns statistics of WAL activity
  */
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 62658e9a00..0a540a590f 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5840,6 +5840,15 @@
   proargnames => '{backend_type,object,context,reads,read_time,writes,write_time,writebacks,writeback_time,extends,extend_time,op_bytes,hits,evictions,reuses,fsyncs,fsync_time,stats_reset}',
   prosrc => 'pg_stat_get_my_io' },
 
+{ oid => '8896', descr => 'statistics: per backend type IO statistics',
+  proname => 'pg_stat_get_backend_io', prorows => '30', proretset => 't',
+  provolatile => 'v', proparallel => 'r', prorettype => 'record',
+  proargtypes => 'int4',
+  proallargtypes => '{int4,text,text,text,int8,float8,int8,float8,int8,float8,int8,float8,int8,int8,int8,int8,int8,float8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{backend_pid,backend_type,object,context,reads,read_time,writes,write_time,writebacks,writeback_time,extends,extend_time,op_bytes,hits,evictions,reuses,fsyncs,fsync_time,stats_reset}',
+  prosrc => 'pg_stat_get_backend_io' },
+
 { oid => '1136', descr => 'statistics: information about WAL activity',
   proname => 'pg_stat_get_wal', proisstrict => 'f', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => '',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 9d63b1a5b7..2a2b4a2947 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -570,6 +570,7 @@ extern void pgstat_count_io_op_time(IOObject io_object, IOContext io_context,
 
 extern PgStat_IO *pgstat_fetch_global_stat_io(void);
 extern PgStat_Backend_IO *pgstat_fetch_my_stat_io(void);
+extern PgStat_Backend_IO *pgstat_fetch_proc_stat_io(ProcNumber procNumber);
 extern const char *pgstat_get_io_context_name(IOContext io_context);
 extern const char *pgstat_get_io_object_name(IOObject io_object);
 
diff --git a/src/test/regress/expected/stats.out b/src/test/regress/expected/stats.out
index c489e528e0..aced015c2f 100644
--- a/src/test/regress/expected/stats.out
+++ b/src/test/regress/expected/stats.out
@@ -1263,12 +1263,18 @@ SELECT sum(extends) AS io_sum_shared_before_extends
   FROM pg_stat_io WHERE context = 'normal' AND object = 'relation' \gset
 SELECT sum(extends) AS my_io_sum_shared_before_extends
   FROM pg_my_stat_io WHERE context = 'normal' AND object = 'relation' \gset
+SELECT sum(extends) AS backend_io_sum_shared_before_extends
+  FROM pg_stat_get_backend_io(pg_backend_pid())
+  WHERE context = 'normal' AND object = 'relation' \gset
 SELECT sum(writes) AS writes, sum(fsyncs) AS fsyncs
   FROM pg_stat_io
   WHERE object = 'relation' \gset io_sum_shared_before_
 SELECT sum(writes) AS writes, sum(fsyncs) AS fsyncs
   FROM pg_my_stat_io
   WHERE object = 'relation' \gset my_io_sum_shared_before_
+SELECT sum(writes) AS writes, sum(fsyncs) AS fsyncs
+  FROM pg_stat_get_backend_io(pg_backend_pid())
+  WHERE object = 'relation' \gset backend_io_sum_shared_before_
 CREATE TABLE test_io_shared(a int);
 INSERT INTO test_io_shared SELECT i FROM generate_series(1,100)i;
 SELECT pg_stat_force_next_flush();
@@ -1293,6 +1299,15 @@ SELECT :my_io_sum_shared_after_extends > :my_io_sum_shared_before_extends;
  t
 (1 row)
 
+SELECT sum(extends) AS backend_io_sum_shared_after_extends
+  FROM pg_stat_get_backend_io(pg_backend_pid())
+  WHERE context = 'normal' AND object = 'relation' \gset
+SELECT :backend_io_sum_shared_after_extends > :backend_io_sum_shared_before_extends;
+ ?column? 
+----------
+ t
+(1 row)
+
 -- After a checkpoint, there should be some additional IOCONTEXT_NORMAL writes
 -- and fsyncs in the global stats (not for the backend).
 -- See comment above for rationale for two explicit CHECKPOINTs.
@@ -1553,6 +1568,8 @@ SELECT sum(evictions) + sum(reuses) + sum(extends) + sum(fsyncs) + sum(reads) +
   FROM pg_stat_io \gset
 SELECT sum(evictions) + sum(reuses) + sum(extends) + sum(fsyncs) + sum(reads) + sum(writes) + sum(writebacks) + sum(hits) AS my_io_stats_pre_reset
   FROM pg_my_stat_io \gset
+SELECT sum(evictions) + sum(reuses) + sum(extends) + sum(fsyncs) + sum(reads) + sum(writes) + sum(writebacks) + sum(hits) AS backend_io_stats_pre_reset
+  FROM pg_stat_get_backend_io(pg_backend_pid()) \gset
 SELECT pg_stat_reset_shared('io');
  pg_stat_reset_shared 
 ----------------------
@@ -1575,6 +1592,14 @@ SELECT :my_io_stats_post_reset < :my_io_stats_pre_reset;
  t
 (1 row)
 
+SELECT sum(evictions) + sum(reuses) + sum(extends) + sum(fsyncs) + sum(reads) + sum(writes) + sum(writebacks) + sum(hits) AS backend_io_stats_post_reset
+  FROM pg_stat_get_backend_io(pg_backend_pid()) \gset
+SELECT :backend_io_stats_post_reset < :backend_io_stats_pre_reset;
+ ?column? 
+----------
+ t
+(1 row)
+
 -- test BRIN index doesn't block HOT update
 CREATE TABLE brin_hot (
   id  integer PRIMARY KEY,
diff --git a/src/test/regress/sql/stats.sql b/src/test/regress/sql/stats.sql
index c95cb71652..d05009e1f5 100644
--- a/src/test/regress/sql/stats.sql
+++ b/src/test/regress/sql/stats.sql
@@ -611,12 +611,18 @@ SELECT sum(extends) AS io_sum_shared_before_extends
   FROM pg_stat_io WHERE context = 'normal' AND object = 'relation' \gset
 SELECT sum(extends) AS my_io_sum_shared_before_extends
   FROM pg_my_stat_io WHERE context = 'normal' AND object = 'relation' \gset
+SELECT sum(extends) AS backend_io_sum_shared_before_extends
+  FROM pg_stat_get_backend_io(pg_backend_pid())
+  WHERE context = 'normal' AND object = 'relation' \gset
 SELECT sum(writes) AS writes, sum(fsyncs) AS fsyncs
   FROM pg_stat_io
   WHERE object = 'relation' \gset io_sum_shared_before_
 SELECT sum(writes) AS writes, sum(fsyncs) AS fsyncs
   FROM pg_my_stat_io
   WHERE object = 'relation' \gset my_io_sum_shared_before_
+SELECT sum(writes) AS writes, sum(fsyncs) AS fsyncs
+  FROM pg_stat_get_backend_io(pg_backend_pid())
+  WHERE object = 'relation' \gset backend_io_sum_shared_before_
 CREATE TABLE test_io_shared(a int);
 INSERT INTO test_io_shared SELECT i FROM generate_series(1,100)i;
 SELECT pg_stat_force_next_flush();
@@ -626,6 +632,10 @@ SELECT :io_sum_shared_after_extends > :io_sum_shared_before_extends;
 SELECT sum(extends) AS my_io_sum_shared_after_extends
   FROM pg_my_stat_io WHERE context = 'normal' AND object = 'relation' \gset
 SELECT :my_io_sum_shared_after_extends > :my_io_sum_shared_before_extends;
+SELECT sum(extends) AS backend_io_sum_shared_after_extends
+  FROM pg_stat_get_backend_io(pg_backend_pid())
+  WHERE context = 'normal' AND object = 'relation' \gset
+SELECT :backend_io_sum_shared_after_extends > :backend_io_sum_shared_before_extends;
 
 -- After a checkpoint, there should be some additional IOCONTEXT_NORMAL writes
 -- and fsyncs in the global stats (not for the backend).
@@ -778,6 +788,8 @@ SELECT sum(evictions) + sum(reuses) + sum(extends) + sum(fsyncs) + sum(reads) +
   FROM pg_stat_io \gset
 SELECT sum(evictions) + sum(reuses) + sum(extends) + sum(fsyncs) + sum(reads) + sum(writes) + sum(writebacks) + sum(hits) AS my_io_stats_pre_reset
   FROM pg_my_stat_io \gset
+SELECT sum(evictions) + sum(reuses) + sum(extends) + sum(fsyncs) + sum(reads) + sum(writes) + sum(writebacks) + sum(hits) AS backend_io_stats_pre_reset
+  FROM pg_stat_get_backend_io(pg_backend_pid()) \gset
 SELECT pg_stat_reset_shared('io');
 SELECT sum(evictions) + sum(reuses) + sum(extends) + sum(fsyncs) + sum(reads) + sum(writes) + sum(writebacks) + sum(hits) AS io_stats_post_reset
   FROM pg_stat_io \gset
@@ -785,7 +797,9 @@ SELECT :io_stats_post_reset < :io_stats_pre_reset;
 SELECT sum(evictions) + sum(reuses) + sum(extends) + sum(fsyncs) + sum(reads) + sum(writes) + sum(writebacks) + sum(hits) AS my_io_stats_post_reset
   FROM pg_my_stat_io \gset
 SELECT :my_io_stats_post_reset < :my_io_stats_pre_reset;
-
+SELECT sum(evictions) + sum(reuses) + sum(extends) + sum(fsyncs) + sum(reads) + sum(writes) + sum(writebacks) + sum(hits) AS backend_io_stats_post_reset
+  FROM pg_stat_get_backend_io(pg_backend_pid()) \gset
+SELECT :backend_io_stats_post_reset < :backend_io_stats_pre_reset;
 
 -- test BRIN index doesn't block HOT update
 CREATE TABLE brin_hot (
-- 
2.34.1

