From 05a9903295cb3b57ca9144217e89f0aac27277b5 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathan@postgresql.org>
Date: Wed, 15 May 2024 12:07:10 -0500
Subject: [PATCH v1 1/1] parallel get relinfos

---
 src/bin/pg_upgrade/info.c        | 266 +++++++++++++++++++++++--------
 src/tools/pgindent/typedefs.list |   1 +
 2 files changed, 202 insertions(+), 65 deletions(-)

diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index 95c22a7200..bb28e262c7 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -11,6 +11,7 @@
 
 #include "access/transam.h"
 #include "catalog/pg_class_d.h"
+#include "fe_utils/string_utils.h"
 #include "pg_upgrade.h"
 
 static void create_rel_filename_map(const char *old_data, const char *new_data,
@@ -22,13 +23,16 @@ static void report_unmatched_relation(const RelInfo *rel, const DbInfo *db,
 static void free_db_and_rel_infos(DbInfoArr *db_arr);
 static void get_template0_info(ClusterInfo *cluster);
 static void get_db_infos(ClusterInfo *cluster);
-static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo);
+static void start_rel_infos_query(PGconn *conn);
+static void get_rel_infos_result(PGconn *conn, DbInfo *dbinfo);
 static void free_rel_infos(RelInfoArr *rel_arr);
 static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
 static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
-static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check);
-static void get_db_subscription_count(DbInfo *dbinfo);
+static void start_old_cluster_logical_slot_infos_query(PGconn *conn, bool live_check);
+static void get_old_cluster_logical_slot_infos_result(PGconn *conn, DbInfo *dbinfo);
+static void start_db_sub_count_query(PGconn *conn, DbInfo *dbinfo);
+static void get_db_sub_count_result(PGconn *conn, DbInfo *dbinfo);
 
 
 /*
@@ -268,6 +272,16 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db)
 			   reloid, db->db_name, reldesc);
 }
 
+typedef enum
+{
+	UNUSED,
+	CONN_STARTED,
+	CONNECTING,
+	STARTED_RELINFO_QUERY,
+	STARTED_LOGICAL_QUERY,
+	STARTED_SUBSCRIPTION_QUERY,
+} InfoState;
+
 /*
  * get_db_rel_and_slot_infos()
  *
@@ -279,7 +293,12 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db)
 void
 get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check)
 {
-	int			dbnum;
+	int			dbnum = 0;
+	int			dbnum_proc = 0;
+	InfoState  *states;
+	int		   *dbs;
+	PGconn	  **conns;
+	int			jobs = (user_opts.jobs < 1) ? 1 : user_opts.jobs;
 
 	if (cluster->dbarr.dbs != NULL)
 		free_db_and_rel_infos(&cluster->dbarr);
@@ -287,20 +306,103 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check)
 	get_template0_info(cluster);
 	get_db_infos(cluster);
 
-	for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-	{
-		DbInfo	   *pDbInfo = &cluster->dbarr.dbs[dbnum];
+	states = (InfoState *) pg_malloc(sizeof(InfoState *) * jobs);
+	dbs = (int *) pg_malloc(sizeof(int) * jobs);
+	conns = (PGconn **) pg_malloc(sizeof(PGconn *) * jobs);
 
-		get_rel_infos(cluster, pDbInfo);
+	for (int i = 0; i < jobs; i++)
+		states[i] = UNUSED;
 
-		/*
-		 * Retrieve the logical replication slots infos and the subscriptions
-		 * count for the old cluster.
-		 */
-		if (cluster == &old_cluster)
+	while (dbnum < cluster->dbarr.ndbs)
+	{
+		for (int i = 0; i < jobs; i++)
 		{
-			get_old_cluster_logical_slot_infos(pDbInfo, live_check);
-			get_db_subscription_count(pDbInfo);
+			switch (states[i])
+			{
+				case UNUSED:
+					if (dbnum_proc < cluster->dbarr.ndbs)
+					{
+						PQExpBufferData conn_opts;
+
+						dbs[i] = dbnum_proc++;
+
+						/* Build connection string with proper quoting */
+						initPQExpBuffer(&conn_opts);
+						appendPQExpBufferStr(&conn_opts, "dbname=");
+						appendConnStrVal(&conn_opts, cluster->dbarr.dbs[dbs[i]].db_name);
+						appendPQExpBufferStr(&conn_opts, " user=");
+						appendConnStrVal(&conn_opts, os_info.user);
+						appendPQExpBuffer(&conn_opts, " port=%d", cluster->port);
+						if (cluster->sockdir)
+						{
+							appendPQExpBufferStr(&conn_opts, " host=");
+							appendConnStrVal(&conn_opts, cluster->sockdir);
+						}
+
+						conns[i] = PQconnectStart(conn_opts.data);
+						termPQExpBuffer(&conn_opts);
+						states[i] = CONNECTING;
+					}
+					break;
+				case CONNECTING:
+					if (PQconnectPoll(conns[i]) == PGRES_POLLING_FAILED)
+					{
+						pg_log(PG_REPORT, "%s", PQerrorMessage(conns[i]));
+						exit(1);
+					}
+					if (PQconnectPoll(conns[i]) == PGRES_POLLING_OK)
+						states[i] = CONN_STARTED;
+					break;
+				case CONN_STARTED:
+					if (PQstatus(conns[i]) == CONNECTION_OK)
+					{
+						start_rel_infos_query(conns[i]);
+						states[i] = STARTED_RELINFO_QUERY;
+					}
+					break;
+				case STARTED_RELINFO_QUERY:
+					if (PQisBusy(conns[i]))
+						PQconsumeInput(conns[i]);
+					else
+					{
+						get_rel_infos_result(conns[i], &cluster->dbarr.dbs[dbs[i]]);
+
+						if (cluster == &old_cluster &&
+							GET_MAJOR_VERSION(old_cluster.major_version) >= 1700)
+						{
+							start_old_cluster_logical_slot_infos_query(conns[i], live_check);
+							states[i] = STARTED_LOGICAL_QUERY;
+						}
+						else
+						{
+							dbnum++;
+							PQfinish(conns[i]);
+							states[i] = UNUSED;
+						}
+					}
+					break;
+				case STARTED_LOGICAL_QUERY:
+					if (PQisBusy(conns[i]))
+						PQconsumeInput(conns[i]);
+					else
+					{
+						get_old_cluster_logical_slot_infos_result(conns[i], &cluster->dbarr.dbs[dbs[i]]);
+						start_db_sub_count_query(conns[i], &cluster->dbarr.dbs[dbs[i]]);
+						states[i] = STARTED_SUBSCRIPTION_QUERY;
+					}
+					break;
+				case STARTED_SUBSCRIPTION_QUERY:
+					if (PQisBusy(conns[i]))
+						PQconsumeInput(conns[i]);
+					else
+					{
+						get_db_sub_count_result(conns[i], &cluster->dbarr.dbs[dbs[i]]);
+						dbnum++;
+						PQfinish(conns[i]);
+						states[i] = UNUSED;
+					}
+					break;
+			}
 		}
 	}
 
@@ -450,29 +552,9 @@ get_db_infos(ClusterInfo *cluster)
  * This allows later processing to match up old and new databases efficiently.
  */
 static void
-get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
+start_rel_infos_query(PGconn *conn)
 {
-	PGconn	   *conn = connectToServer(cluster,
-									   dbinfo->db_name);
-	PGresult   *res;
-	RelInfo    *relinfos;
-	int			ntups;
-	int			relnum;
-	int			num_rels = 0;
-	char	   *nspname = NULL;
-	char	   *relname = NULL;
-	char	   *tablespace = NULL;
-	int			i_spclocation,
-				i_nspname,
-				i_relname,
-				i_reloid,
-				i_indtable,
-				i_toastheap,
-				i_relfilenumber,
-				i_reltablespace;
 	char		query[QUERY_ALLOC];
-	char	   *last_namespace = NULL,
-			   *last_tablespace = NULL;
 
 	query[0] = '\0';			/* initialize query string to empty */
 
@@ -552,7 +634,38 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
 			 "     ON c.reltablespace = t.oid "
 			 "ORDER BY 1;");
 
-	res = executeQueryOrDie(conn, "%s", query);
+	if (PQsendQuery(conn, query) == 0)
+	{
+		/* TODO: fail */
+	}
+}
+
+static void
+get_rel_infos_result(PGconn *conn, DbInfo *dbinfo)
+{
+	PGresult   *res = PQgetResult(conn);
+	RelInfo    *relinfos;
+	int			ntups;
+	int			relnum;
+	int			num_rels = 0;
+	char	   *nspname = NULL;
+	char	   *relname = NULL;
+	char	   *tablespace = NULL;
+	int			i_spclocation,
+				i_nspname,
+				i_relname,
+				i_reloid,
+				i_indtable,
+				i_toastheap,
+				i_relfilenumber,
+				i_reltablespace;
+	char	   *last_namespace = NULL,
+			   *last_tablespace = NULL;
+
+	if (PQgetResult(conn) != NULL)
+	{
+		/* TODO: fail */
+	}
 
 	ntups = PQntuples(res);
 
@@ -622,8 +735,6 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
 	}
 	PQclear(res);
 
-	PQfinish(conn);
-
 	dbinfo->rel_arr.rels = relinfos;
 	dbinfo->rel_arr.nrels = num_rels;
 }
@@ -645,19 +756,14 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
  * are included.
  */
 static void
-get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
+start_old_cluster_logical_slot_infos_query(PGconn *conn, bool live_check)
 {
-	PGconn	   *conn;
-	PGresult   *res;
-	LogicalSlotInfo *slotinfos = NULL;
-	int			num_slots;
+	char		query[QUERY_ALLOC];
 
 	/* Logical slots can be migrated since PG17. */
 	if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600)
 		return;
 
-	conn = connectToServer(&old_cluster, dbinfo->db_name);
-
 	/*
 	 * Fetch the logical replication slot information. The check whether the
 	 * slot is considered caught up is done by an upgrade function. This
@@ -675,16 +781,34 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
 	 * started and stopped several times causing any temporary slots to be
 	 * removed.
 	 */
-	res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, "
-							"%s as caught_up, invalidation_reason IS NOT NULL as invalid "
-							"FROM pg_catalog.pg_replication_slots "
-							"WHERE slot_type = 'logical' AND "
-							"database = current_database() AND "
-							"temporary IS FALSE;",
-							live_check ? "FALSE" :
-							"(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
-							"ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
-							"END)");
+	snprintf(query, sizeof(query), "SELECT slot_name, plugin, two_phase, failover, "
+			 "%s as caught_up, invalidation_reason IS NOT NULL as invalid "
+			 "FROM pg_catalog.pg_replication_slots "
+			 "WHERE slot_type = 'logical' AND "
+			 "database = current_database() AND "
+			 "temporary IS FALSE;",
+			 live_check ? "FALSE" :
+			 "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
+			 "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
+			 "END)");
+
+	if (PQsendQuery(conn, query) == 0)
+	{
+		/* TODO: fail */
+	}
+}
+
+static void
+get_old_cluster_logical_slot_infos_result(PGconn *conn, DbInfo *dbinfo)
+{
+	PGresult   *res = PQgetResult(conn);
+	LogicalSlotInfo *slotinfos = NULL;
+	int			num_slots;
+
+	if (PQgetResult(conn) != NULL)
+	{
+		/* TODO: fail */
+	}
 
 	num_slots = PQntuples(res);
 
@@ -720,7 +844,6 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
 	}
 
 	PQclear(res);
-	PQfinish(conn);
 
 	dbinfo->slot_arr.slots = slotinfos;
 	dbinfo->slot_arr.nslots = num_slots;
@@ -757,23 +880,36 @@ count_old_cluster_logical_slots(void)
  * not be able to upgrade the logical replication clusters completely.
  */
 static void
-get_db_subscription_count(DbInfo *dbinfo)
+start_db_sub_count_query(PGconn *conn, DbInfo *dbinfo)
 {
-	PGconn	   *conn;
-	PGresult   *res;
+	char		query[QUERY_ALLOC];
 
 	/* Subscriptions can be migrated since PG17. */
 	if (GET_MAJOR_VERSION(old_cluster.major_version) < 1700)
 		return;
 
-	conn = connectToServer(&old_cluster, dbinfo->db_name);
-	res = executeQueryOrDie(conn, "SELECT count(*) "
-							"FROM pg_catalog.pg_subscription WHERE subdbid = %u",
-							dbinfo->db_oid);
+	snprintf(query, sizeof(query), "SELECT count(*) "
+			 "FROM pg_catalog.pg_subscription WHERE subdbid = %u",
+			 dbinfo->db_oid);
+	if (PQsendQuery(conn, query) == 0)
+	{
+		/* TODO: fail */
+	}
+}
+
+static void
+get_db_sub_count_result(PGconn *conn, DbInfo *dbinfo)
+{
+	PGresult   *res = PQgetResult(conn);
+
+	if (PQgetResult(conn) != NULL)
+	{
+		/* TODO: fail */
+	}
+
 	dbinfo->nsubs = atoi(PQgetvalue(res, 0, 0));
 
 	PQclear(res);
-	PQfinish(conn);
 }
 
 /*
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 2b83c340fb..015019b18d 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1225,6 +1225,7 @@ IndxInfo
 InferClause
 InferenceElem
 InfoItem
+InfoState
 InhInfo
 InheritableSocket
 InitSampleScan_function
-- 
2.25.1

