From 4051dfd56fa68948c3864634f8e20fa4d386e94e Mon Sep 17 00:00:00 2001 From: Bryan Green Date: Sun, 28 Jun 2026 12:05:10 -0500 Subject: [PATCH v1 3/4] pg_dump: dispatch parallel workers in-process on Windows On Windows the parallel workers are threads in the leader's process, but they reached the leader through the same transport as Unix worker processes: pgpipe() (a loopback TCP socket pair), select(), and a byte-at-a-time string protocol. That is pointless when the workers share the leader's address space. Give each ParallelSlot an in-process channel instead -- a command slot and a response slot under a critical section, with one condition variable to wake the worker and one to wake the leader. Messages pass as malloc'd strings; dispatch writes to memory rather than a socket. The pipe also signalled worker death through EOF. The channel has none, and exit_nicely() ends only the worker thread, so a failed worker -- for instance a connection that fails when -j exceeds max_connections -- would hang the leader forever. A dying worker now sets slot->workerDied and wakes the leader, and getMessageFromWorker() returns NULL just as the Unix path does on EOF. So that a dying worker can find its own slot from the exit handler (via GetMyPSlot()) instead of being mistaken for the leader, each worker records its own thread id on entry rather than trusting _beginthreadex()'s output argument, which can lag the thread's start. The Unix path is untouched; pgpipe(), select_loop(), and readMessageFromPipe(), which are now unused on Windows, stay until the worker model is unified on threads. --- src/bin/pg_dump/parallel.c | 197 +++++++++++++++++++++++++++++++------ 1 file changed, 167 insertions(+), 30 deletions(-) diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c index 12b462375d..552a8c149e 100644 --- a/src/bin/pg_dump/parallel.c +++ b/src/bin/pg_dump/parallel.c @@ -107,6 +107,19 @@ struct ParallelSlot int pipeRevRead; /* child's end of the pipes */ int pipeRevWrite; +#ifdef WIN32 + + /* + * In-process channel used instead of the pipes when workers are threads. 
+ * A message is a malloc'd string; ownership passes to the receiver. + * Protected by msg_lock. + */ + char *cmdMsg; /* command pending for the worker, or NULL */ + char *respMsg; /* response pending for the leader, or NULL */ + bool chanClosed; /* leader closed the command channel (EOF) */ + bool workerDied; /* worker exited without sending a response */ +#endif + /* Child process/thread identity info: */ #ifdef WIN32 uintptr_t hThread; @@ -176,6 +189,15 @@ static volatile DumpSignalInformation signal_info; #ifdef WIN32 static CRITICAL_SECTION signal_info_lock; + +/* + * Synchronization for the in-process channels (see struct ParallelSlot). + * msg_lock protects the per-slot cmdMsg/respMsg/chanClosed/workerDied fields; + * worker_cv wakes a worker, leader_cv wakes the leader. + */ +static CRITICAL_SECTION msg_lock; +static CONDITION_VARIABLE worker_cv; +static CONDITION_VARIABLE leader_cv; #endif /* @@ -210,11 +232,11 @@ static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot); static int GetIdleWorker(ParallelState *pstate); static bool HasEveryWorkerTerminated(ParallelState *pstate); static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te); -static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]); +static void WaitForCommands(ArchiveHandle *AH, ParallelSlot *slot); static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait); -static char *getMessageFromLeader(int pipefd[2]); -static void sendMessageToLeader(int pipefd[2], const char *str); +static char *getMessageFromLeader(ParallelSlot *slot); +static void sendMessageToLeader(ParallelSlot *slot, const char *str); static int select_loop(int maxFd, fd_set *workerset); static char *getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker); @@ -242,6 +264,11 @@ init_parallel_dump_utils(void) mainThreadId = GetCurrentThreadId(); + /* Initialize the in-process message-channel synchronization */ + InitializeCriticalSection(&msg_lock); + 
+		InitializeConditionVariable(&worker_cv);
+		InitializeConditionVariable(&leader_cv);
+
 		/* Initialize socket access */
 		err = WSAStartup(MAKEWORD(2, 2), &wsaData);
 		if (err != 0)
@@ -314,20 +341,21 @@ archive_close_connection(int code, void *arg)
 		else
 		{
 			/*
-			 * We're a worker.  Shut down our own DB connection if any.  On
-			 * Windows, we also have to close our communication sockets, to
-			 * emulate what will happen on Unix when the worker process exits.
-			 * (Without this, if this is a premature exit, the leader would
-			 * fail to detect it because there would be no EOF condition on
-			 * the other end of the pipe.)
+			 * We're a worker.  Shut down our own DB connection if any.
 			 */
 			if (slot->AH)
 				DisconnectDatabase(&(slot->AH->public));
 
-#ifdef WIN32
-			closesocket(slot->pipeRevRead);
-			closesocket(slot->pipeRevWrite);
-#endif
+#ifdef WIN32
+			/*
+			 * Unlike a Unix pipe, the in-process channel has no EOF, and
+			 * exit_nicely() ends only this thread; tell the leader we're gone.
+			 */
+			EnterCriticalSection(&msg_lock);
+			slot->workerDied = true;
+			WakeAllConditionVariable(&leader_cv);
+			LeaveCriticalSection(&msg_lock);
+#endif
 		}
 	}
 	else
@@ -351,6 +379,17 @@ ShutdownWorkersHard(ParallelState *pstate)
 {
 	int			i;
 
+	/*
+	 * Tell any workers that are waiting for commands that they can exit.
+	 */
+#ifdef WIN32
+	EnterCriticalSection(&msg_lock);
+	for (i = 0; i < pstate->numWorkers; i++)
+		pstate->parallelSlot[i].chanClosed = true;
+	WakeAllConditionVariable(&worker_cv);
+	LeaveCriticalSection(&msg_lock);
+#else
+
 	/*
 	 * Close our write end of the sockets so that any workers waiting for
 	 * commands know they can exit.  (Note: some of the pipeWrite fields might
@@ -359,6 +398,7 @@ ShutdownWorkersHard(ParallelState *pstate)
 	 */
 	for (i = 0; i < pstate->numWorkers; i++)
 		closesocket(pstate->parallelSlot[i].pipeWrite);
+#endif
 
 	/*
 	 * Force early termination of any commands currently in progress.
@@ -783,12 +823,6 @@ set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH) static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot) { - int pipefd[2]; - - /* fetch child ends of pipes */ - pipefd[PIPE_READ] = slot->pipeRevRead; - pipefd[PIPE_WRITE] = slot->pipeRevWrite; - /* * Clone the archive so that we have our own state to work with, and in * particular our own database connection. @@ -811,7 +845,7 @@ RunWorker(ArchiveHandle *AH, ParallelSlot *slot) /* * Execute commands until done. */ - WaitForCommands(AH, pipefd); + WaitForCommands(AH, slot); /* * Disconnect from database and clean up. @@ -834,6 +868,15 @@ init_spawned_worker_win32(WorkerInfo *wi) /* Don't need WorkerInfo anymore */ free(wi); + /* + * Record our thread id so GetMyPSlot() can tell us from the leader. Do + * this before anything that might call exit_nicely(): the cleanup handler + * uses GetMyPSlot(), and mistaking a failing worker for the leader + * deadlocks shutdown. We can't trust the leader to have stored the id + * from _beginthreadex() yet, since this thread may run before that returns. + */ + slot->threadId = GetCurrentThreadId(); + /* Run the worker ... 
*/ RunWorker(AH, slot); @@ -899,11 +942,12 @@ ParallelBackupStart(ArchiveHandle *AH) uintptr_t handle; #else pid_t pid; -#endif - ParallelSlot *slot = &(pstate->parallelSlot[i]); int pipeMW[2], pipeWM[2]; +#endif + ParallelSlot *slot = &(pstate->parallelSlot[i]); +#ifndef WIN32 /* Create communication pipes for this worker */ if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0) pg_fatal("could not create communication channels: %m"); @@ -914,6 +958,7 @@ ParallelBackupStart(ArchiveHandle *AH) /* child's ends of the pipes */ slot->pipeRevRead = pipeMW[PIPE_READ]; slot->pipeRevWrite = pipeWM[PIPE_WRITE]; +#endif #ifdef WIN32 /* Create transient structure to pass args to worker function */ @@ -922,8 +967,13 @@ ParallelBackupStart(ArchiveHandle *AH) wi->AH = AH; wi->slot = slot; + /* + * The worker stores its own thread id (see init_spawned_worker_win32), + * so don't ask _beginthreadex() to report it -- that would race its + * store. + */ handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32, - wi, 0, &(slot->threadId)); + wi, 0, NULL); if (handle == 0) pg_fatal("could not create worker thread: %m"); slot->hThread = handle; @@ -1019,12 +1069,21 @@ ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate) /* There should not be any unfinished jobs */ Assert(IsEveryWorkerIdle(pstate)); + /* Tell the workers they can exit */ +#ifdef WIN32 + EnterCriticalSection(&msg_lock); + for (i = 0; i < pstate->numWorkers; i++) + pstate->parallelSlot[i].chanClosed = true; + WakeAllConditionVariable(&worker_cv); + LeaveCriticalSection(&msg_lock); +#else /* Close the sockets so that the workers know they can exit */ for (i = 0; i < pstate->numWorkers; i++) { closesocket(pstate->parallelSlot[i].pipeRead); closesocket(pstate->parallelSlot[i].pipeWrite); } +#endif /* Wait for them to exit */ WaitForTerminatingWorkers(pstate); @@ -1285,7 +1344,7 @@ lockTableForWorker(ArchiveHandle *AH, TocEntry *te) * Read and execute commands from the leader until we see EOF on the pipe. 
*/ static void -WaitForCommands(ArchiveHandle *AH, int pipefd[2]) +WaitForCommands(ArchiveHandle *AH, ParallelSlot *slot) { char *command; TocEntry *te; @@ -1295,7 +1354,7 @@ WaitForCommands(ArchiveHandle *AH, int pipefd[2]) for (;;) { - if (!(command = getMessageFromLeader(pipefd))) + if (!(command = getMessageFromLeader(slot))) { /* EOF, so done */ return; @@ -1323,7 +1382,7 @@ WaitForCommands(ArchiveHandle *AH, int pipefd[2]) /* Return status to leader */ buildWorkerResponse(AH, te, act, status, buf, sizeof(buf)); - sendMessageToLeader(pipefd, buf); + sendMessageToLeader(slot, buf); /* command was pg_malloc'd and we are responsible for free()ing it. */ free(command); @@ -1465,9 +1524,21 @@ WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode) * This function is executed in worker processes. */ static char * -getMessageFromLeader(int pipefd[2]) +getMessageFromLeader(ParallelSlot *slot) { - return readMessageFromPipe(pipefd[PIPE_READ]); +#ifdef WIN32 + char *msg; + + EnterCriticalSection(&msg_lock); + while (slot->cmdMsg == NULL && !slot->chanClosed) + SleepConditionVariableCS(&worker_cv, &msg_lock, INFINITE); + msg = slot->cmdMsg; /* NULL here means the channel was closed */ + slot->cmdMsg = NULL; + LeaveCriticalSection(&msg_lock); + return msg; +#else + return readMessageFromPipe(slot->pipeRevRead); +#endif } /* @@ -1476,12 +1547,20 @@ getMessageFromLeader(int pipefd[2]) * This function is executed in worker processes. 
*/ static void -sendMessageToLeader(int pipefd[2], const char *str) +sendMessageToLeader(ParallelSlot *slot, const char *str) { +#ifdef WIN32 + EnterCriticalSection(&msg_lock); + Assert(slot->respMsg == NULL); + slot->respMsg = pg_strdup(str); + WakeAllConditionVariable(&leader_cv); + LeaveCriticalSection(&msg_lock); +#else int len = strlen(str) + 1; - if (pipewrite(pipefd[PIPE_WRITE], str, len) != len) + if (pipewrite(slot->pipeRevWrite, str, len) != len) pg_fatal("could not write to the communication channel: %m"); +#endif } /* @@ -1530,6 +1609,53 @@ select_loop(int maxFd, fd_set *workerset) static char * getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker) { +#ifdef WIN32 + int i; + + /* + * Return the first pending response; if none and do_wait, sleep on + * leader_cv until a worker posts one. + */ + EnterCriticalSection(&msg_lock); + for (;;) + { + bool anyDied = false; + + for (i = 0; i < pstate->numWorkers; i++) + { + char *msg; + + if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus)) + continue; + msg = pstate->parallelSlot[i].respMsg; + if (msg != NULL) + { + pstate->parallelSlot[i].respMsg = NULL; + LeaveCriticalSection(&msg_lock); + *worker = i; + return msg; + } + if (pstate->parallelSlot[i].workerDied) + anyDied = true; + } + + /* + * A worker died without responding: return NULL as the Unix path does + * on EOF, so the caller reports the failure instead of hanging. 
+ */ + if (anyDied) + { + LeaveCriticalSection(&msg_lock); + return NULL; + } + if (!do_wait) + { + LeaveCriticalSection(&msg_lock); + return NULL; + } + SleepConditionVariableCS(&leader_cv, &msg_lock, INFINITE); + } +#else int i; fd_set workerset; int maxFd = -1; @@ -1585,6 +1711,7 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker) } Assert(false); return NULL; +#endif } /* @@ -1595,12 +1722,22 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker) static void sendMessageToWorker(ParallelState *pstate, int worker, const char *str) { +#ifdef WIN32 + ParallelSlot *slot = &pstate->parallelSlot[worker]; + + EnterCriticalSection(&msg_lock); + Assert(slot->cmdMsg == NULL); + slot->cmdMsg = pg_strdup(str); + WakeAllConditionVariable(&worker_cv); + LeaveCriticalSection(&msg_lock); +#else int len = strlen(str) + 1; if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len) { pg_fatal("could not write to the communication channel: %m"); } +#endif } /* -- 2.54.0.windows.1