diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index e9e8698..5774093 100644
*** a/src/bin/pg_dump/parallel.c
--- b/src/bin/pg_dump/parallel.c
***************
*** 35,43 ****
   * the required action (dump or restore) and returns a malloc'd status string.
   * The status string is passed back to the master where it is interpreted by
   * AH->MasterEndParallelItemPtr, another format-specific routine.  That
!  * function can update state or catalog information on the master's side,
   * depending on the reply from the worker process.  In the end it returns a
!  * status code, which is 0 for successful execution.
   *
   * Remember that we have forked off the workers only after we have read in
   * the catalog.  That's why our worker processes can also access the catalog
--- 35,45 ----
   * the required action (dump or restore) and returns a malloc'd status string.
   * The status string is passed back to the master where it is interpreted by
   * AH->MasterEndParallelItemPtr, another format-specific routine.  That
!  * function can update format-specific information on the master's side,
   * depending on the reply from the worker process.  In the end it returns a
!  * status code, which we pass to the ParallelCompletionPtr callback function
!  * that was passed to DispatchJobForTocEntry().  The callback function does
!  * state updating for the master control logic in pg_backup_archiver.c.
   *
   * Remember that we have forked off the workers only after we have read in
   * the catalog.  That's why our worker processes can also access the catalog
***************
*** 48,60 ****
   * In the master process, the workerStatus field for each worker has one of
   * the following values:
   *		WRKR_IDLE: it's waiting for a command
!  *		WRKR_WORKING: it's been sent a command
!  *		WRKR_FINISHED: it's returned a result
   *		WRKR_TERMINATED: process ended
-  * The FINISHED state indicates that the worker is idle, but we've not yet
-  * dealt with the status code it returned from the prior command.
-  * ReapWorkerStatus() extracts the unhandled command status value and sets
-  * the workerStatus back to WRKR_IDLE.
   */
  
  #include "postgres_fe.h"
--- 50,57 ----
   * In the master process, the workerStatus field for each worker has one of
   * the following values:
   *		WRKR_IDLE: it's waiting for a command
!  *		WRKR_WORKING: it's working on a command
   *		WRKR_TERMINATED: process ended
   */
  
  #include "postgres_fe.h"
***************
*** 75,80 ****
--- 72,79 ----
  #define PIPE_READ							0
  #define PIPE_WRITE							1
  
+ #define NO_SLOT (-1)			/* Failure result for GetIdleWorker() */
+ 
  #ifdef WIN32
  
  /*
*************** static void archive_close_connection(int
*** 135,143 ****
--- 134,145 ----
  static void ShutdownWorkersHard(ParallelState *pstate);
  static void WaitForTerminatingWorkers(ParallelState *pstate);
  static void RunWorker(ArchiveHandle *AH, int pipefd[2]);
+ static int	GetIdleWorker(ParallelState *pstate);
  static bool HasEveryWorkerTerminated(ParallelState *pstate);
  static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
  static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
+ static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
+ 				bool do_wait);
  static char *getMessageFromMaster(int pipefd[2]);
  static void sendMessageToMaster(int pipefd[2], const char *str);
  static int	select_loop(int maxFd, fd_set *workerset);
*************** archive_close_connection(int code, void 
*** 309,316 ****
  			 * fail to detect it because there would be no EOF condition on
  			 * the other end of the pipe.)
  			 */
! 			if (slot->args->AH)
! 				DisconnectDatabase(&(slot->args->AH->public));
  
  #ifdef WIN32
  			closesocket(slot->pipeRevRead);
--- 311,318 ----
  			 * fail to detect it because there would be no EOF condition on
  			 * the other end of the pipe.)
  			 */
! 			if (slot->AH)
! 				DisconnectDatabase(&(slot->AH->public));
  
  #ifdef WIN32
  			closesocket(slot->pipeRevRead);
*************** ParallelBackupStart(ArchiveHandle *AH)
*** 571,579 ****
  						  strerror(errno));
  
  		pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
! 		pstate->parallelSlot[i].args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
! 		pstate->parallelSlot[i].args->AH = NULL;
! 		pstate->parallelSlot[i].args->te = NULL;
  
  		/* master's ends of the pipes */
  		pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ];
--- 573,582 ----
  						  strerror(errno));
  
  		pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
! 		pstate->parallelSlot[i].AH = NULL;
! 		pstate->parallelSlot[i].te = NULL;
! 		pstate->parallelSlot[i].callback = NULL;
! 		pstate->parallelSlot[i].callback_data = NULL;
  
  		/* master's ends of the pipes */
  		pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ];
*************** ParallelBackupStart(ArchiveHandle *AH)
*** 628,637 ****
  			 * state information and also clones the database connection which
  			 * both seem kinda helpful.
  			 */
! 			pstate->parallelSlot[i].args->AH = CloneArchive(AH);
  
  			/* Run the worker ... */
! 			RunWorker(pstate->parallelSlot[i].args->AH, pipefd);
  
  			/* We can just exit(0) when done */
  			exit(0);
--- 631,640 ----
  			 * state information and also clones the database connection which
  			 * both seem kinda helpful.
  			 */
! 			pstate->parallelSlot[i].AH = CloneArchive(AH);
  
  			/* Run the worker ... */
! 			RunWorker(pstate->parallelSlot[i].AH, pipefd);
  
  			/* We can just exit(0) when done */
  			exit(0);
*************** ParallelBackupEnd(ArchiveHandle *AH, Par
*** 703,722 ****
  }
  
  /*
!  * Dispatch a job to some free worker (caller must ensure there is one!)
   *
   * te is the TocEntry to be processed, act is the action to be taken on it.
   */
  void
! DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te,
! 					   T_Action act)
  {
  	int			worker;
  	char	   *arg;
  
! 	/* our caller makes sure that at least one worker is idle */
! 	worker = GetIdleWorker(pstate);
! 	Assert(worker != NO_SLOT);
  
  	/* Construct and send command string */
  	arg = (AH->MasterStartParallelItemPtr) (AH, te, act);
--- 706,733 ----
  }
  
  /*
!  * Dispatch a job to some free worker.
   *
   * te is the TocEntry to be processed, act is the action to be taken on it.
+  * callback is the function to call on completion of the job.
+  *
+  * If no worker is currently available, this will block, and previously
+  * registered callback functions may be called.
   */
  void
! DispatchJobForTocEntry(ArchiveHandle *AH,
! 					   ParallelState *pstate,
! 					   TocEntry *te,
! 					   T_Action act,
! 					   ParallelCompletionPtr callback,
! 					   void *callback_data)
  {
  	int			worker;
  	char	   *arg;
  
! 	/* Get a worker, waiting if none are idle */
! 	while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
! 		WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
  
  	/* Construct and send command string */
  	arg = (AH->MasterStartParallelItemPtr) (AH, te, act);
*************** DispatchJobForTocEntry(ArchiveHandle *AH
*** 727,740 ****
  
  	/* Remember worker is busy, and which TocEntry it's working on */
  	pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
! 	pstate->parallelSlot[worker].args->te = te;
  }
  
  /*
   * Find an idle worker and return its slot number.
   * Return NO_SLOT if none are idle.
   */
! int
  GetIdleWorker(ParallelState *pstate)
  {
  	int			i;
--- 738,753 ----
  
  	/* Remember worker is busy, and which TocEntry it's working on */
  	pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
! 	pstate->parallelSlot[worker].te = te;
! 	pstate->parallelSlot[worker].callback = callback;
! 	pstate->parallelSlot[worker].callback_data = callback_data;
  }
  
  /*
   * Find an idle worker and return its slot number.
   * Return NO_SLOT if none are idle.
   */
! static int
  GetIdleWorker(ParallelState *pstate)
  {
  	int			i;
*************** WaitForCommands(ArchiveHandle *AH, int p
*** 932,948 ****
   * immediately if there is none available.
   *
   * When we get a status message, we let MasterEndParallelItemPtr process it,
!  * then save the resulting status code and switch the worker's state to
!  * WRKR_FINISHED.  Later, caller must call ReapWorkerStatus() to verify
!  * that the status was "OK" and push the worker back to IDLE state.
   *
!  * XXX Rube Goldberg would be proud of this API, but no one else should be.
   *
   * XXX is it worth checking for more than one status message per call?
   * It seems somewhat unlikely that multiple workers would finish at exactly
   * the same time.
   */
! void
  ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
  {
  	int			worker;
--- 945,960 ----
   * immediately if there is none available.
   *
   * When we get a status message, we let MasterEndParallelItemPtr process it,
!  * then pass the resulting status code to the callback function that was
!  * specified to DispatchJobForTocEntry, then reset the worker status to IDLE.
   *
!  * Returns true if we collected a status message, else false.
   *
   * XXX is it worth checking for more than one status message per call?
   * It seems somewhat unlikely that multiple workers would finish at exactly
   * the same time.
   */
! static bool
  ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
  {
  	int			worker;
*************** ListenToWorkers(ArchiveHandle *AH, Paral
*** 956,989 ****
  		/* If do_wait is true, we must have detected EOF on some socket */
  		if (do_wait)
  			exit_horribly(modulename, "a worker process died unexpectedly\n");
! 		return;
  	}
  
  	/* Process it and update our idea of the worker's status */
  	if (messageStartsWith(msg, "OK "))
  	{
! 		TocEntry   *te = pstate->parallelSlot[worker].args->te;
  		char	   *statusString;
  
  		if (messageStartsWith(msg, "OK RESTORE "))
  		{
  			statusString = msg + strlen("OK RESTORE ");
! 			pstate->parallelSlot[worker].status =
  				(AH->MasterEndParallelItemPtr)
  				(AH, te, statusString, ACT_RESTORE);
  		}
  		else if (messageStartsWith(msg, "OK DUMP "))
  		{
  			statusString = msg + strlen("OK DUMP ");
! 			pstate->parallelSlot[worker].status =
  				(AH->MasterEndParallelItemPtr)
  				(AH, te, statusString, ACT_DUMP);
  		}
  		else
  			exit_horribly(modulename,
  						  "invalid message received from worker: \"%s\"\n",
  						  msg);
! 		pstate->parallelSlot[worker].workerStatus = WRKR_FINISHED;
  	}
  	else
  		exit_horribly(modulename,
--- 968,1006 ----
  		/* If do_wait is true, we must have detected EOF on some socket */
  		if (do_wait)
  			exit_horribly(modulename, "a worker process died unexpectedly\n");
! 		return false;
  	}
  
  	/* Process it and update our idea of the worker's status */
  	if (messageStartsWith(msg, "OK "))
  	{
! 		ParallelSlot *slot = &pstate->parallelSlot[worker];
! 		TocEntry   *te = slot->te;
  		char	   *statusString;
+ 		int			status;
  
  		if (messageStartsWith(msg, "OK RESTORE "))
  		{
  			statusString = msg + strlen("OK RESTORE ");
! 			status =
  				(AH->MasterEndParallelItemPtr)
  				(AH, te, statusString, ACT_RESTORE);
+ 			slot->callback(AH, te, status, slot->callback_data);
  		}
  		else if (messageStartsWith(msg, "OK DUMP "))
  		{
  			statusString = msg + strlen("OK DUMP ");
! 			status =
  				(AH->MasterEndParallelItemPtr)
  				(AH, te, statusString, ACT_DUMP);
+ 			slot->callback(AH, te, status, slot->callback_data);
  		}
  		else
  			exit_horribly(modulename,
  						  "invalid message received from worker: \"%s\"\n",
  						  msg);
! 		slot->workerStatus = WRKR_IDLE;
! 		slot->te = NULL;
  	}
  	else
  		exit_horribly(modulename,
*************** ListenToWorkers(ArchiveHandle *AH, Paral
*** 992,1101 ****
  
  	/* Free the string returned from getMessageFromWorker */
  	free(msg);
- }
- 
- /*
-  * Check to see if any worker is in WRKR_FINISHED state.  If so,
-  * return its command status code into *status, reset it to IDLE state,
-  * and return its slot number.  Otherwise return NO_SLOT.
-  *
-  * This function is executed in the master process.
-  */
- int
- ReapWorkerStatus(ParallelState *pstate, int *status)
- {
- 	int			i;
  
! 	for (i = 0; i < pstate->numWorkers; i++)
! 	{
! 		if (pstate->parallelSlot[i].workerStatus == WRKR_FINISHED)
! 		{
! 			*status = pstate->parallelSlot[i].status;
! 			pstate->parallelSlot[i].status = 0;
! 			pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
! 			return i;
! 		}
! 	}
! 	return NO_SLOT;
  }
  
  /*
!  * Wait, if necessary, until we have at least one idle worker.
!  * Reap worker status as necessary to move FINISHED workers to IDLE state.
   *
!  * We assume that no extra processing is required when reaping a finished
!  * command, except for checking that the status was OK (zero).
!  * Caution: that assumption means that this function can only be used in
!  * parallel dump, not parallel restore, because the latter has a more
!  * complex set of rules about handling status.
   *
   * This function is executed in the master process.
   */
  void
! EnsureIdleWorker(ArchiveHandle *AH, ParallelState *pstate)
  {
! 	int			ret_worker;
! 	int			work_status;
  
! 	for (;;)
  	{
! 		int			nTerm = 0;
! 
! 		while ((ret_worker = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
! 		{
! 			if (work_status != 0)
! 				exit_horribly(modulename, "error processing a parallel work item\n");
! 
! 			nTerm++;
! 		}
! 
! 		/*
! 		 * We need to make sure that we have an idle worker before dispatching
! 		 * the next item. If nTerm > 0 we already have that (quick check).
! 		 */
! 		if (nTerm > 0)
! 			return;
! 
! 		/* explicit check for an idle worker */
! 		if (GetIdleWorker(pstate) != NO_SLOT)
! 			return;
  
  		/*
! 		 * If we have no idle worker, read the result of one or more workers
! 		 * and loop the loop to call ReapWorkerStatus() on them
  		 */
! 		ListenToWorkers(AH, pstate, true);
! 	}
! }
! 
! /*
!  * Wait for all workers to be idle.
!  * Reap worker status as necessary to move FINISHED workers to IDLE state.
!  *
!  * We assume that no extra processing is required when reaping a finished
!  * command, except for checking that the status was OK (zero).
!  * Caution: that assumption means that this function can only be used in
!  * parallel dump, not parallel restore, because the latter has a more
!  * complex set of rules about handling status.
!  *
!  * This function is executed in the master process.
!  */
! void
! EnsureWorkersFinished(ArchiveHandle *AH, ParallelState *pstate)
! {
! 	int			work_status;
  
! 	if (!pstate || pstate->numWorkers == 1)
! 		return;
  
! 	/* Waiting for the remaining worker processes to finish */
! 	while (!IsEveryWorkerIdle(pstate))
! 	{
! 		if (ReapWorkerStatus(pstate, &work_status) == NO_SLOT)
! 			ListenToWorkers(AH, pstate, true);
! 		else if (work_status != 0)
! 			exit_horribly(modulename,
! 						  "error processing a parallel work item\n");
  	}
  }
  
--- 1009,1087 ----
  
  	/* Free the string returned from getMessageFromWorker */
  	free(msg);
  
! 	return true;
  }
  
  /*
!  * Check for status results from workers, waiting if necessary.
   *
!  * Available wait modes are:
!  * WFW_NO_WAIT: reap any available status, but don't block
!  * WFW_GOT_STATUS: wait for at least one more worker to finish
!  * WFW_ONE_IDLE: wait for at least one worker to be idle
!  * WFW_ALL_IDLE: wait for all workers to be idle
!  *
!  * Any received results are passed to MasterEndParallelItemPtr and then
!  * to the callback specified to DispatchJobForTocEntry.
   *
   * This function is executed in the master process.
   */
  void
! WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
  {
! 	bool		do_wait = false;
  
! 	/*
! 	 * In GOT_STATUS mode, always block waiting for a message, since we can't
! 	 * return till we get something.  In other modes, we don't block the first
! 	 * time through the loop.
! 	 */
! 	if (mode == WFW_GOT_STATUS)
  	{
! 		/* Assert that caller knows what it's doing */
! 		Assert(!IsEveryWorkerIdle(pstate));
! 		do_wait = true;
! 	}
  
+ 	for (;;)
+ 	{
  		/*
! 		 * Check for status messages, even if we don't need to block.  We do
! 		 * not try very hard to reap all available messages, though, since
! 		 * there's unlikely to be more than one.
  		 */
! 		if (ListenToWorkers(AH, pstate, do_wait))
! 		{
! 			/*
! 			 * If we got a message, we are done by definition for GOT_STATUS
! 			 * mode, and we can also be certain that there's at least one idle
! 			 * worker.  So we're done in all but ALL_IDLE mode.
! 			 */
! 			if (mode != WFW_ALL_IDLE)
! 				return;
! 		}
  
! 		/* Check whether we must wait for new status messages */
! 		switch (mode)
! 		{
! 			case WFW_NO_WAIT:
! 				return;			/* never wait */
! 			case WFW_GOT_STATUS:
! 				Assert(false);	/* can't get here, because we waited */
! 				break;
! 			case WFW_ONE_IDLE:
! 				if (GetIdleWorker(pstate) != NO_SLOT)
! 					return;
! 				break;
! 			case WFW_ALL_IDLE:
! 				if (IsEveryWorkerIdle(pstate))
! 					return;
! 				break;
! 		}
  
! 		/* Loop back, and this time wait for something to happen */
! 		do_wait = true;
  	}
  }
  
diff --git a/src/bin/pg_dump/parallel.h b/src/bin/pg_dump/parallel.h
index 8d70428..7be0f7c 100644
*** a/src/bin/pg_dump/parallel.h
--- b/src/bin/pg_dump/parallel.h
***************
*** 2,15 ****
   *
   * parallel.h
   *
!  *	Parallel support header file for the pg_dump archiver
   *
   * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
   * Portions Copyright (c) 1994, Regents of the University of California
   *
-  *	The author is not responsible for loss or damages that may
-  *	result from its use.
-  *
   * IDENTIFICATION
   *		src/bin/pg_dump/parallel.h
   *
--- 2,12 ----
   *
   * parallel.h
   *
!  *	Parallel support for pg_dump and pg_restore
   *
   * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
   * Portions Copyright (c) 1994, Regents of the University of California
   *
   * IDENTIFICATION
   *		src/bin/pg_dump/parallel.h
   *
***************
*** 21,51 ****
  
  #include "pg_backup_archiver.h"
  
  typedef enum
  {
- 	WRKR_TERMINATED = 0,
  	WRKR_IDLE,
  	WRKR_WORKING,
! 	WRKR_FINISHED
  } T_WorkerStatus;
  
! /* Arguments needed for a worker process */
! typedef struct ParallelArgs
! {
! 	ArchiveHandle *AH;
! 	TocEntry   *te;
! } ParallelArgs;
! 
! /* State for each parallel activity slot */
  typedef struct ParallelSlot
  {
! 	ParallelArgs *args;
! 	T_WorkerStatus workerStatus;
! 	int			status;
  	int			pipeRead;		/* master's end of the pipes */
  	int			pipeWrite;
  	int			pipeRevRead;	/* child's end of the pipes */
  	int			pipeRevWrite;
  #ifdef WIN32
  	uintptr_t	hThread;
  	unsigned int threadId;
--- 18,70 ----
  
  #include "pg_backup_archiver.h"
  
+ /* Function to call in master process on completion of a worker task */
+ typedef void (*ParallelCompletionPtr) (ArchiveHandle *AH,
+ 												   TocEntry *te,
+ 												   int status,
+ 												   void *callback_data);
+ 
+ /* Wait options for WaitForWorkers */
+ typedef enum
+ {
+ 	WFW_NO_WAIT,
+ 	WFW_GOT_STATUS,
+ 	WFW_ONE_IDLE,
+ 	WFW_ALL_IDLE
+ } WFW_WaitOption;
+ 
+ /* Worker process statuses */
  typedef enum
  {
  	WRKR_IDLE,
  	WRKR_WORKING,
! 	WRKR_TERMINATED
  } T_WorkerStatus;
  
! /*
!  * Per-parallel-worker state of parallel.c.
!  *
!  * Much of this is valid only in the master process (or, on Windows, should
!  * be touched only by the master thread).  But the AH field should be touched
!  * only by workers.  The pipe descriptors are valid everywhere.
!  */
  typedef struct ParallelSlot
  {
! 	T_WorkerStatus workerStatus;	/* see enum above */
! 
! 	/* These fields are valid if workerStatus == WRKR_WORKING: */
! 	TocEntry   *te;				/* item being worked on */
! 	ParallelCompletionPtr callback;		/* function to call on completion */
! 	void	   *callback_data;	/* passthru data for it */
! 
! 	ArchiveHandle *AH;			/* Archive data worker is using */
! 
  	int			pipeRead;		/* master's end of the pipes */
  	int			pipeWrite;
  	int			pipeRevRead;	/* child's end of the pipes */
  	int			pipeRevWrite;
+ 
+ 	/* Child process/thread identity info: */
  #ifdef WIN32
  	uintptr_t	hThread;
  	unsigned int threadId;
*************** typedef struct ParallelSlot
*** 54,65 ****
  #endif
  } ParallelSlot;
  
! #define NO_SLOT (-1)
! 
  typedef struct ParallelState
  {
! 	int			numWorkers;
! 	ParallelSlot *parallelSlot;
  } ParallelState;
  
  #ifdef WIN32
--- 73,83 ----
  #endif
  } ParallelSlot;
  
! /* Overall state for parallel.c */
  typedef struct ParallelState
  {
! 	int			numWorkers;		/* allowed number of workers */
! 	ParallelSlot *parallelSlot; /* array of numWorkers slots */
  } ParallelState;
  
  #ifdef WIN32
*************** extern DWORD mainThreadId;
*** 69,85 ****
  
  extern void init_parallel_dump_utils(void);
  
- extern int	GetIdleWorker(ParallelState *pstate);
  extern bool IsEveryWorkerIdle(ParallelState *pstate);
! extern void ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait);
! extern int	ReapWorkerStatus(ParallelState *pstate, int *status);
! extern void EnsureIdleWorker(ArchiveHandle *AH, ParallelState *pstate);
! extern void EnsureWorkersFinished(ArchiveHandle *AH, ParallelState *pstate);
  
  extern ParallelState *ParallelBackupStart(ArchiveHandle *AH);
  extern void DispatchJobForTocEntry(ArchiveHandle *AH,
  					   ParallelState *pstate,
! 					   TocEntry *te, T_Action act);
  extern void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate);
  
  extern void checkAborting(ArchiveHandle *AH);
--- 87,103 ----
  
  extern void init_parallel_dump_utils(void);
  
  extern bool IsEveryWorkerIdle(ParallelState *pstate);
! extern void WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate,
! 			   WFW_WaitOption mode);
  
  extern ParallelState *ParallelBackupStart(ArchiveHandle *AH);
  extern void DispatchJobForTocEntry(ArchiveHandle *AH,
  					   ParallelState *pstate,
! 					   TocEntry *te,
! 					   T_Action act,
! 					   ParallelCompletionPtr callback,
! 					   void *callback_data);
  extern void ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate);
  
  extern void checkAborting(ArchiveHandle *AH);
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index ad8e132..83e899b 100644
*** a/src/bin/pg_dump/pg_backup_archiver.c
--- b/src/bin/pg_dump/pg_backup_archiver.c
*************** static void par_list_remove(TocEntry *te
*** 97,105 ****
  static TocEntry *get_next_work_item(ArchiveHandle *AH,
  				   TocEntry *ready_list,
  				   ParallelState *pstate);
! static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
! 			   int worker, int status,
! 			   ParallelState *pstate);
  static void fix_dependencies(ArchiveHandle *AH);
  static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
  static void repoint_table_dependencies(ArchiveHandle *AH);
--- 97,110 ----
  static TocEntry *get_next_work_item(ArchiveHandle *AH,
  				   TocEntry *ready_list,
  				   ParallelState *pstate);
! static void mark_dump_job_done(ArchiveHandle *AH,
! 				   TocEntry *te,
! 				   int status,
! 				   void *callback_data);
! static void mark_restore_job_done(ArchiveHandle *AH,
! 					  TocEntry *te,
! 					  int status,
! 					  void *callback_data);
  static void fix_dependencies(ArchiveHandle *AH);
  static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
  static void repoint_table_dependencies(ArchiveHandle *AH);
*************** WriteDataChunks(ArchiveHandle *AH, Paral
*** 2348,2355 ****
  			 * If we are in a parallel backup, then we are always the master
  			 * process.  Dispatch each data-transfer job to a worker.
  			 */
! 			EnsureIdleWorker(AH, pstate);
! 			DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP);
  		}
  		else
  			WriteDataChunksForTocEntry(AH, te);
--- 2353,2360 ----
  			 * If we are in a parallel backup, then we are always the master
  			 * process.  Dispatch each data-transfer job to a worker.
  			 */
! 			DispatchJobForTocEntry(AH, pstate, te, ACT_DUMP,
! 								   mark_dump_job_done, NULL);
  		}
  		else
  			WriteDataChunksForTocEntry(AH, te);
*************** WriteDataChunks(ArchiveHandle *AH, Paral
*** 2358,2366 ****
  	/*
  	 * If parallel, wait for workers to finish.
  	 */
! 	EnsureWorkersFinished(AH, pstate);
  }
  
  void
  WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
  {
--- 2363,2394 ----
  	/*
  	 * If parallel, wait for workers to finish.
  	 */
! 	if (pstate && pstate->numWorkers > 1)
! 		WaitForWorkers(AH, pstate, WFW_ALL_IDLE);
! }
! 
! 
! /*
!  * Callback function that's invoked in the master process after a step has
!  * been parallel dumped.
!  *
!  * We don't need to do anything except check for worker failure.
!  */
! static void
! mark_dump_job_done(ArchiveHandle *AH,
! 				   TocEntry *te,
! 				   int status,
! 				   void *callback_data)
! {
! 	ahlog(AH, 1, "finished item %d %s %s\n",
! 		  te->dumpId, te->desc, te->tag);
! 
! 	if (status != 0)
! 		exit_horribly(modulename, "worker process failed: exit code %d\n",
! 					  status);
  }
  
+ 
  void
  WriteDataChunksForTocEntry(ArchiveHandle *AH, TocEntry *te)
  {
*************** static void
*** 3744,3754 ****
  restore_toc_entries_parallel(ArchiveHandle *AH, ParallelState *pstate,
  							 TocEntry *pending_list)
  {
- 	int			work_status;
  	bool		skipped_some;
  	TocEntry	ready_list;
  	TocEntry   *next_work_item;
- 	int			ret_child;
  
  	ahlog(AH, 2, "entering restore_toc_entries_parallel\n");
  
--- 3772,3780 ----
*************** restore_toc_entries_parallel(ArchiveHand
*** 3825,3878 ****
  
  			par_list_remove(next_work_item);
  
! 			DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE);
  		}
  		else
  		{
  			/* at least one child is working and we have nothing ready. */
  		}
  
! 		for (;;)
! 		{
! 			int			nTerm = 0;
! 
! 			/*
! 			 * In order to reduce dependencies as soon as possible and
! 			 * especially to reap the status of workers who are working on
! 			 * items that pending items depend on, we do a non-blocking check
! 			 * for ended workers first.
! 			 *
! 			 * However, if we do not have any other work items currently that
! 			 * workers can work on, we do not busy-loop here but instead
! 			 * really wait for at least one worker to terminate. Hence we call
! 			 * ListenToWorkers(..., ..., do_wait = true) in this case.
! 			 */
! 			ListenToWorkers(AH, pstate, !next_work_item);
! 
! 			while ((ret_child = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
! 			{
! 				nTerm++;
! 				mark_work_done(AH, &ready_list, ret_child, work_status, pstate);
! 			}
! 
! 			/*
! 			 * We need to make sure that we have an idle worker before
! 			 * re-running the loop. If nTerm > 0 we already have that (quick
! 			 * check).
! 			 */
! 			if (nTerm > 0)
! 				break;
! 
! 			/* if nobody terminated, explicitly check for an idle worker */
! 			if (GetIdleWorker(pstate) != NO_SLOT)
! 				break;
! 
! 			/*
! 			 * If we have no idle worker, read the result of one or more
! 			 * workers and loop the loop to call ReapWorkerStatus() on them.
! 			 */
! 			ListenToWorkers(AH, pstate, true);
! 		}
  	}
  
  	ahlog(AH, 1, "finished main parallel loop\n");
--- 3851,3879 ----
  
  			par_list_remove(next_work_item);
  
! 			DispatchJobForTocEntry(AH, pstate, next_work_item, ACT_RESTORE,
! 								   mark_restore_job_done, &ready_list);
  		}
  		else
  		{
  			/* at least one child is working and we have nothing ready. */
  		}
  
! 		/*
! 		 * Before dispatching another job, check to see if anything has
! 		 * finished.  We should check every time through the loop so as to
! 		 * reduce dependencies as soon as possible.  If we were unable to
! 		 * dispatch any job this time through, wait until some worker finishes
! 		 * (and, hopefully, unblocks some pending item).  If we did dispatch
! 		 * something, continue as soon as there's at least one idle worker.
! 		 * Note that in either case, there's guaranteed to be at least one
! 		 * idle worker when we return to the top of the loop.  This ensures we
! 		 * won't block inside DispatchJobForTocEntry, which would be
! 		 * undesirable: we'd rather postpone dispatching until we see what's
! 		 * been unblocked by finished jobs.
! 		 */
! 		WaitForWorkers(AH, pstate,
! 					   next_work_item ? WFW_ONE_IDLE : WFW_GOT_STATUS);
  	}
  
  	ahlog(AH, 1, "finished main parallel loop\n");
*************** get_next_work_item(ArchiveHandle *AH, To
*** 3999,4007 ****
  		int			count = 0;
  
  		for (k = 0; k < pstate->numWorkers; k++)
! 			if (pstate->parallelSlot[k].args->te != NULL &&
! 				pstate->parallelSlot[k].args->te->section == SECTION_DATA)
  				count++;
  		if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers)
  			pref_non_data = false;
  	}
--- 4000,4010 ----
  		int			count = 0;
  
  		for (k = 0; k < pstate->numWorkers; k++)
! 		{
! 			if (pstate->parallelSlot[k].workerStatus == WRKR_WORKING &&
! 				pstate->parallelSlot[k].te->section == SECTION_DATA)
  				count++;
+ 		}
  		if (pstate->numWorkers == 0 || count * 4 < pstate->numWorkers)
  			pref_non_data = false;
  	}
*************** get_next_work_item(ArchiveHandle *AH, To
*** 4018,4030 ****
  		 * that a currently running item also needs lock on, or vice versa. If
  		 * so, we don't want to schedule them together.
  		 */
! 		for (i = 0; i < pstate->numWorkers && !conflicts; i++)
  		{
  			TocEntry   *running_te;
  
  			if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING)
  				continue;
! 			running_te = pstate->parallelSlot[i].args->te;
  
  			if (has_lock_conflicts(te, running_te) ||
  				has_lock_conflicts(running_te, te))
--- 4021,4033 ----
  		 * that a currently running item also needs lock on, or vice versa. If
  		 * so, we don't want to schedule them together.
  		 */
! 		for (i = 0; i < pstate->numWorkers; i++)
  		{
  			TocEntry   *running_te;
  
  			if (pstate->parallelSlot[i].workerStatus != WRKR_WORKING)
  				continue;
! 			running_te = pstate->parallelSlot[i].te;
  
  			if (has_lock_conflicts(te, running_te) ||
  				has_lock_conflicts(running_te, te))
*************** get_next_work_item(ArchiveHandle *AH, To
*** 4065,4074 ****
   * our work is finished, the master process will assign us a new work item.
   */
  int
! parallel_restore(ParallelArgs *args)
  {
- 	ArchiveHandle *AH = args->AH;
- 	TocEntry   *te = args->te;
  	int			status;
  
  	_doSetFixedOutputState(AH);
--- 4068,4075 ----
   * our work is finished, the master process will assign us a new work item.
   */
  int
! parallel_restore(ArchiveHandle *AH, TocEntry *te)
  {
  	int			status;
  
  	_doSetFixedOutputState(AH);
*************** parallel_restore(ParallelArgs *args)
*** 4085,4106 ****
  
  
  /*
!  * Housekeeping to be done after a step has been parallel restored.
   *
!  * Clear the appropriate slot, free all the extra memory we allocated,
!  * update status, and reduce the dependency count of any dependent items.
   */
  static void
! mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
! 			   int worker, int status,
! 			   ParallelState *pstate)
  {
! 	TocEntry   *te = NULL;
! 
! 	te = pstate->parallelSlot[worker].args->te;
! 
! 	if (te == NULL)
! 		exit_horribly(modulename, "could not find slot of finished worker\n");
  
  	ahlog(AH, 1, "finished item %d %s %s\n",
  		  te->dumpId, te->desc, te->tag);
--- 4086,4103 ----
  
  
  /*
!  * Callback function that's invoked in the master process after a step has
!  * been parallel restored.
   *
!  * Update status and reduce the dependency count of any dependent items.
   */
  static void
! mark_restore_job_done(ArchiveHandle *AH,
! 					  TocEntry *te,
! 					  int status,
! 					  void *callback_data)
  {
! 	TocEntry   *ready_list = (TocEntry *) callback_data;
  
  	ahlog(AH, 1, "finished item %d %s %s\n",
  		  te->dumpId, te->desc, te->tag);
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 4aa7190..c3e64cb 100644
*** a/src/bin/pg_dump/pg_backup_archiver.h
--- b/src/bin/pg_dump/pg_backup_archiver.h
*************** typedef z_stream *z_streamp;
*** 111,117 ****
  
  typedef struct _archiveHandle ArchiveHandle;
  typedef struct _tocEntry TocEntry;
- struct ParallelArgs;
  struct ParallelState;
  
  #define READ_ERROR_EXIT(fd) \
--- 111,116 ----
*************** struct _tocEntry
*** 372,378 ****
  	int			nLockDeps;		/* number of such dependencies */
  };
  
! extern int	parallel_restore(struct ParallelArgs *args);
  extern void on_exit_close_archive(Archive *AHX);
  
  extern void warn_or_exit_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt,...) pg_attribute_printf(3, 4);
--- 371,377 ----
  	int			nLockDeps;		/* number of such dependencies */
  };
  
! extern int	parallel_restore(ArchiveHandle *AH, TocEntry *te);
  extern void on_exit_close_archive(Archive *AHX);
  
  extern void warn_or_exit_horribly(ArchiveHandle *AH, const char *modulename, const char *fmt,...) pg_attribute_printf(3, 4);
diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c
index 66329dc..c4f487a 100644
*** a/src/bin/pg_dump/pg_backup_custom.c
--- b/src/bin/pg_dump/pg_backup_custom.c
*************** _WorkerJobRestoreCustom(ArchiveHandle *A
*** 820,832 ****
  	 */
  	const int	buflen = 64;
  	char	   *buf = (char *) pg_malloc(buflen);
- 	ParallelArgs pargs;
  	int			status;
  
! 	pargs.AH = AH;
! 	pargs.te = te;
! 
! 	status = parallel_restore(&pargs);
  
  	snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
  			 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
--- 820,828 ----
  	 */
  	const int	buflen = 64;
  	char	   *buf = (char *) pg_malloc(buflen);
  	int			status;
  
! 	status = parallel_restore(AH, te);
  
  	snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
  			 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 27c6190..129d761 100644
*** a/src/bin/pg_dump/pg_backup_directory.c
--- b/src/bin/pg_dump/pg_backup_directory.c
*************** _WorkerJobRestoreDirectory(ArchiveHandle
*** 838,850 ****
  	 */
  	const int	buflen = 64;
  	char	   *buf = (char *) pg_malloc(buflen);
- 	ParallelArgs pargs;
  	int			status;
  
! 	pargs.AH = AH;
! 	pargs.te = te;
! 
! 	status = parallel_restore(&pargs);
  
  	snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
  			 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
--- 838,846 ----
  	 */
  	const int	buflen = 64;
  	char	   *buf = (char *) pg_malloc(buflen);
  	int			status;
  
! 	status = parallel_restore(AH, te);
  
  	snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status,
  			 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
