From 997999700689d0a8dc4d01efc59b168974362992 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nathan@postgresql.org>
Date: Thu, 20 Jul 2023 10:19:08 -0700
Subject: [PATCH v2 4/4] use priority queue for pg_restore ready_list

---
 src/bin/pg_dump/pg_backup_archiver.c | 155 ++++++---------------------
 1 file changed, 35 insertions(+), 120 deletions(-)

diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 39ebcfec32..7c0e50353d 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -34,6 +34,7 @@
 #include "compress_io.h"
 #include "dumputils.h"
 #include "fe_utils/string_utils.h"
+#include "lib/binaryheap.h"
 #include "lib/stringinfo.h"
 #include "libpq/libpq-fs.h"
 #include "parallel.h"
@@ -44,24 +45,6 @@
 #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
 #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
 
-/*
- * State for tracking TocEntrys that are ready to process during a parallel
- * restore.  (This used to be a list, and we still call it that, though now
- * it's really an array so that we can apply qsort to it.)
- *
- * tes[] is sized large enough that we can't overrun it.
- * The valid entries are indexed first_te .. last_te inclusive.
- * We periodically sort the array to bring larger-by-dataLength entries to
- * the front; "sorted" is true if the valid entries are known sorted.
- */
-typedef struct _parallelReadyList
-{
-	TocEntry  **tes;			/* Ready-to-dump TocEntrys */
-	int			first_te;		/* index of first valid entry in tes[] */
-	int			last_te;		/* index of last valid entry in tes[] */
-	bool		sorted;			/* are valid entries currently sorted? */
-} ParallelReadyList;
-
 
 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
 							   const pg_compress_specification compression_spec,
@@ -110,16 +93,12 @@ static void restore_toc_entries_postfork(ArchiveHandle *AH,
 static void pending_list_header_init(TocEntry *l);
 static void pending_list_append(TocEntry *l, TocEntry *te);
 static void pending_list_remove(TocEntry *te);
-static void ready_list_init(ParallelReadyList *ready_list, int tocCount);
-static void ready_list_free(ParallelReadyList *ready_list);
-static void ready_list_insert(ParallelReadyList *ready_list, TocEntry *te);
-static void ready_list_remove(ParallelReadyList *ready_list, int i);
-static void ready_list_sort(ParallelReadyList *ready_list);
 static int	TocEntrySizeCompare(const void *p1, const void *p2);
+static int	TocEntrySizeCompareBinaryHeap(Datum p1, Datum p2, void *arg);
 static void move_to_ready_list(TocEntry *pending_list,
-							   ParallelReadyList *ready_list,
+							   binaryheap *ready_list,
 							   RestorePass pass);
-static TocEntry *pop_next_work_item(ParallelReadyList *ready_list,
+static TocEntry *pop_next_work_item(binaryheap *ready_list,
 									ParallelState *pstate);
 static void mark_dump_job_done(ArchiveHandle *AH,
 							   TocEntry *te,
@@ -134,7 +113,7 @@ static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
 static void repoint_table_dependencies(ArchiveHandle *AH);
 static void identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te);
 static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
-								ParallelReadyList *ready_list);
+								binaryheap *ready_list);
 static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
 static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
 
@@ -4023,13 +4002,15 @@ static void
 restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 							 TocEntry *pending_list)
 {
-	ParallelReadyList ready_list;
+	binaryheap *ready_list;
 	TocEntry   *next_work_item;
 
 	pg_log_debug("entering restore_toc_entries_parallel");
 
 	/* Set up ready_list with enough room for all known TocEntrys */
-	ready_list_init(&ready_list, AH->tocCount);
+	ready_list = binaryheap_allocate(AH->tocCount,
+									 TocEntrySizeCompareBinaryHeap,
+									 NULL);
 
 	/*
 	 * The pending_list contains all items that we need to restore.  Move all
@@ -4040,7 +4021,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 	 * process in the current restore pass.
 	 */
 	AH->restorePass = RESTORE_PASS_MAIN;
-	move_to_ready_list(pending_list, &ready_list, AH->restorePass);
+	move_to_ready_list(pending_list, ready_list, AH->restorePass);
 
 	/*
 	 * main parent loop
@@ -4054,7 +4035,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 	for (;;)
 	{
 		/* Look for an item ready to be dispatched to a worker */
-		next_work_item = pop_next_work_item(&ready_list, pstate);
+		next_work_item = pop_next_work_item(ready_list, pstate);
 		if (next_work_item != NULL)
 		{
 			/* If not to be restored, don't waste time launching a worker */
@@ -4064,7 +4045,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 							next_work_item->dumpId,
 							next_work_item->desc, next_work_item->tag);
 				/* Update its dependencies as though we'd completed it */
-				reduce_dependencies(AH, next_work_item, &ready_list);
+				reduce_dependencies(AH, next_work_item, ready_list);
 				/* Loop around to see if anything else can be dispatched */
 				continue;
 			}
@@ -4075,7 +4056,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 
 			/* Dispatch to some worker */
 			DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
-								   mark_restore_job_done, &ready_list);
+								   mark_restore_job_done, ready_list);
 		}
 		else if (IsEveryWorkerIdle(pstate))
 		{
@@ -4089,7 +4070,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 			/* Advance to next restore pass */
 			AH->restorePass++;
 			/* That probably allows some stuff to be made ready */
-			move_to_ready_list(pending_list, &ready_list, AH->restorePass);
+			move_to_ready_list(pending_list, ready_list, AH->restorePass);
 			/* Loop around to see if anything's now ready */
 			continue;
 		}
@@ -4119,9 +4100,9 @@ restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
 	}
 
 	/* There should now be nothing in ready_list. */
-	Assert(ready_list.first_te > ready_list.last_te);
+	Assert(binaryheap_empty(ready_list));
 
-	ready_list_free(&ready_list);
+	binaryheap_free(ready_list);
 
 	pg_log_info("finished main parallel loop");
 }
@@ -4221,77 +4202,6 @@ pending_list_remove(TocEntry *te)
 }
 
 
-/*
- * Initialize the ready_list with enough room for up to tocCount entries.
- */
-static void
-ready_list_init(ParallelReadyList *ready_list, int tocCount)
-{
-	ready_list->tes = (TocEntry **)
-		pg_malloc(tocCount * sizeof(TocEntry *));
-	ready_list->first_te = 0;
-	ready_list->last_te = -1;
-	ready_list->sorted = false;
-}
-
-/*
- * Free storage for a ready_list.
- */
-static void
-ready_list_free(ParallelReadyList *ready_list)
-{
-	pg_free(ready_list->tes);
-}
-
-/* Add te to the ready_list */
-static void
-ready_list_insert(ParallelReadyList *ready_list, TocEntry *te)
-{
-	ready_list->tes[++ready_list->last_te] = te;
-	/* List is (probably) not sorted anymore. */
-	ready_list->sorted = false;
-}
-
-/* Remove the i'th entry in the ready_list */
-static void
-ready_list_remove(ParallelReadyList *ready_list, int i)
-{
-	int			f = ready_list->first_te;
-
-	Assert(i >= f && i <= ready_list->last_te);
-
-	/*
-	 * In the typical case where the item to be removed is the first ready
-	 * entry, we need only increment first_te to remove it.  Otherwise, move
-	 * the entries before it to compact the list.  (This preserves sortedness,
-	 * if any.)  We could alternatively move the entries after i, but there
-	 * are typically many more of those.
-	 */
-	if (i > f)
-	{
-		TocEntry  **first_te_ptr = &ready_list->tes[f];
-
-		memmove(first_te_ptr + 1, first_te_ptr, (i - f) * sizeof(TocEntry *));
-	}
-	ready_list->first_te++;
-}
-
-/* Sort the ready_list into the desired order */
-static void
-ready_list_sort(ParallelReadyList *ready_list)
-{
-	if (!ready_list->sorted)
-	{
-		int			n = ready_list->last_te - ready_list->first_te + 1;
-
-		if (n > 1)
-			qsort(ready_list->tes + ready_list->first_te, n,
-				  sizeof(TocEntry *),
-				  TocEntrySizeCompare);
-		ready_list->sorted = true;
-	}
-}
-
 /* qsort comparator for sorting TocEntries by dataLength */
 static int
 TocEntrySizeCompare(const void *p1, const void *p2)
@@ -4314,6 +4224,15 @@ TocEntrySizeCompare(const void *p1, const void *p2)
 	return 0;
 }
 
+/* binaryheap comparator: negate qsort order, since binaryheap is a max-heap */
+static int
+TocEntrySizeCompareBinaryHeap(Datum p1, Datum p2, void *arg)
+{
+	const TocEntry *te1 = (const TocEntry *) DatumGetPointer(p1);
+	const TocEntry *te2 = (const TocEntry *) DatumGetPointer(p2);
+
+	return -TocEntrySizeCompare(&te1, &te2);	/* qsort API takes element addresses */
+}
 
 /*
  * Move all immediately-ready items from pending_list to ready_list.
@@ -4324,7 +4243,7 @@ TocEntrySizeCompare(const void *p1, const void *p2)
  */
 static void
 move_to_ready_list(TocEntry *pending_list,
-				   ParallelReadyList *ready_list,
+				   binaryheap *ready_list,
 				   RestorePass pass)
 {
 	TocEntry   *te;
@@ -4341,7 +4260,7 @@ move_to_ready_list(TocEntry *pending_list,
 			/* Remove it from pending_list ... */
 			pending_list_remove(te);
 			/* ... and add to ready_list */
-			ready_list_insert(ready_list, te);
+			binaryheap_add(ready_list, PointerGetDatum(te));
 		}
 	}
 }
@@ -4358,20 +4277,16 @@ move_to_ready_list(TocEntry *pending_list,
  * no remaining dependencies, but we have to check for lock conflicts.
  */
 static TocEntry *
-pop_next_work_item(ParallelReadyList *ready_list,
+pop_next_work_item(binaryheap *ready_list,
 				   ParallelState *pstate)
 {
-	/*
-	 * Sort the ready_list so that we'll tackle larger jobs first.
-	 */
-	ready_list_sort(ready_list);
-
 	/*
 	 * Search the ready_list until we find a suitable item.
 	 */
-	for (int i = ready_list->first_te; i <= ready_list->last_te; i++)
+	for (int i = 0; i < binaryheap_size(ready_list); i++)
 	{
-		TocEntry   *te = ready_list->tes[i];
+		Datum		ted = binaryheap_get_node(ready_list, i);
+		TocEntry   *te = (TocEntry *) DatumGetPointer(ted);
 		bool		conflicts = false;
 
 		/*
@@ -4397,7 +4312,7 @@ pop_next_work_item(ParallelReadyList *ready_list,
 			continue;
 
 		/* passed all tests, so this item can run */
-		ready_list_remove(ready_list, i);
+		binaryheap_remove_node(ready_list, i);
 		return te;
 	}
 
@@ -4443,7 +4358,7 @@ mark_restore_job_done(ArchiveHandle *AH,
 					  int status,
 					  void *callback_data)
 {
-	ParallelReadyList *ready_list = (ParallelReadyList *) callback_data;
+	binaryheap *ready_list = (binaryheap *) callback_data;
 
 	pg_log_info("finished item %d %s %s",
 				te->dumpId, te->desc, te->tag);
@@ -4708,7 +4623,7 @@ identify_locking_dependencies(ArchiveHandle *AH, TocEntry *te)
  */
 static void
 reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
-					ParallelReadyList *ready_list)
+					binaryheap *ready_list)
 {
 	int			i;
 
@@ -4737,7 +4652,7 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
 			/* Remove it from pending list ... */
 			pending_list_remove(otherte);
 			/* ... and add to ready_list */
-			ready_list_insert(ready_list, otherte);
+			binaryheap_add(ready_list, PointerGetDatum(otherte));
 		}
 	}
 }
-- 
2.25.1

