From 6a82771fbab938cd10e14de7490341c3b546dfc6 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Tue, 16 Jun 2026 22:06:03 +0300
Subject: [PATCH v13 4/9] Refactor PARALLEL_MESSAGE procsignal signaling

When a message is sent to a shm_mq in pq_putmessage(), the sender
sends a PROCSIG_PARALLEL_MESSAGE procsignal to the receiver, to notify
it of the new message. However, in case of a parallel apply worker or
a repack worker, we used PROCSIG_PARALLEL_APPLY_MESSAGE or
PROCSIG_REPACK_MESSAGE instead. Remove those extra procsignals, always
use PROCSIG_PARALLEL_MESSAGE, and move the logic to the receiving side
instead to check all different kinds of incoming messages.

This deduplicates the code between ProcessParallelMessages(),
ProcessParallelApplyMessages(), and ProcessRepackMessages() to set up
the short-lived memory context and to hold interrupts.
---
 src/backend/access/transam/parallel.c         | 41 +++++++++---
 src/backend/commands/repack.c                 | 60 +-----------------
 src/backend/commands/repack_worker.c          |  2 +-
 src/backend/libpq/pqmq.c                      | 20 +-----
 .../replication/logical/applyparallelworker.c | 63 ++-----------------
 src/backend/storage/ipc/procsignal.c          |  6 --
 src/backend/tcop/postgres.c                   |  8 +--
 src/include/access/parallel.h                 |  2 +-
 src/include/replication/logicalworker.h       |  1 -
 src/include/storage/procsignal.h              |  2 -
 10 files changed, 46 insertions(+), 159 deletions(-)

diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 89e9d224eec..5c92e05ad86 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -26,6 +26,7 @@
 #include "catalog/pg_enum.h"
 #include "catalog/storage.h"
 #include "commands/async.h"
+#include "commands/repack.h"
 #include "commands/vacuum.h"
 #include "executor/execParallel.h"
 #include "libpq/libpq.h"
@@ -34,6 +35,7 @@
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
 #include "pgstat.h"
+#include "replication/logicalworker.h"
 #include "storage/ipc.h"
 #include "storage/predicate.h"
 #include "storage/proc.h"
@@ -160,6 +162,7 @@ static const struct
 };
 
 /* Private functions. */
+static void ProcessParallelMessages(void);
 static void ProcessParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
 static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
 static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
@@ -1054,9 +1057,8 @@ HandleParallelMessageInterrupt(void)
  * Process any queued protocol messages received from parallel workers.
  */
 void
-ProcessParallelMessages(void)
+ProcessParallelMessageInterrupt(void)
 {
-	dlist_iter	iter;
 	MemoryContext oldcontext;
 
 	static MemoryContext hpm_context = NULL;
@@ -1087,6 +1089,34 @@ ProcessParallelMessages(void)
 	/* OK to process messages.  Reset the flag saying there are more to do. */
 	ParallelMessagePending = false;
 
+	/* Process messages from parallel query workers */
+	ProcessParallelMessages();
+
+	/* Process messages from REPACK CONCURRENTLY workers */
+	ProcessRepackMessages();
+
+	/* Process messages from replication parallel apply workers */
+	ProcessParallelApplyMessages();
+
+	MemoryContextSwitchTo(oldcontext);
+
+	/* Might as well clear the context on our way out */
+	MemoryContextReset(hpm_context);
+
+	RESUME_INTERRUPTS();
+}
+
+/*
+ * Process any incoming messages from parallel workers.
+ *
+ * This is called from CHECK_FOR_INTERRUPTS(), but in a short-lived memory
+ * context.
+ */
+static void
+ProcessParallelMessages(void)
+{
+	dlist_iter	iter;
+
 	dlist_foreach(iter, &pcxt_list)
 	{
 		ParallelContext *pcxt;
@@ -1130,13 +1160,6 @@ ProcessParallelMessages(void)
 			}
 		}
 	}
-
-	MemoryContextSwitchTo(oldcontext);
-
-	/* Might as well clear the context on our way out */
-	MemoryContextReset(hpm_context);
-
-	RESUME_INTERRUPTS();
 }
 
 /*
diff --git a/src/backend/commands/repack.c b/src/backend/commands/repack.c
index 4d177c868bb..a2a4c286772 100644
--- a/src/backend/commands/repack.c
+++ b/src/backend/commands/repack.c
@@ -145,12 +145,6 @@ typedef struct DecodingWorker
 /* Pointer to currently running decoding worker. */
 static DecodingWorker *decoding_worker = NULL;
 
-/*
- * Is there a message sent by a repack worker that the backend needs to
- * receive?
- */
-volatile sig_atomic_t RepackMessagePending = false;
-
 static LOCKMODE RepackLockLevel(bool concurrent);
 static bool cluster_rel_recheck(RepackCommand cmd, Relation OldHeap,
 								Oid indexOid, Oid userid, LOCKMODE lmode,
@@ -3647,30 +3641,15 @@ DecodingWorkerFileName(char *fname, Oid relid, uint32 seq)
 	snprintf(fname, MAXPGPATH, "%u-%u", relid, seq);
 }
 
-/*
- * Handle receipt of an interrupt indicating a repack worker message.
- *
- * Note: this is called within a signal handler!  All we can do is set
- * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
- * ProcessRepackMessages().
- */
-void
-HandleRepackMessageInterrupt(void)
-{
-	InterruptPending = true;
-	RepackMessagePending = true;
-	SetLatch(MyLatch);
-}
-
 /*
  * Process any queued protocol messages received from the repack worker.
+ *
+ * This is called from CHECK_FOR_INTERRUPTS(), but in a short-lived memory
+ * context.
  */
 void
 ProcessRepackMessages(void)
 {
-	MemoryContext oldcontext;
-	static MemoryContext hpm_context = NULL;
-
 	/*
 	 * Nothing to do if we haven't launched the worker yet or have already
 	 * terminated it.
@@ -3678,32 +3657,6 @@ ProcessRepackMessages(void)
 	if (decoding_worker == NULL)
 		return;
 
-	/*
-	 * This is invoked from ProcessInterrupts(), and since some of the
-	 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
-	 * for recursive calls if more signals are received while this runs.  It's
-	 * unclear that recursive entry would be safe, and it doesn't seem useful
-	 * even if it is safe, so let's block interrupts until done.
-	 */
-	HOLD_INTERRUPTS();
-
-	/*
-	 * Moreover, CurrentMemoryContext might be pointing almost anywhere.  We
-	 * don't want to risk leaking data into long-lived contexts, so let's do
-	 * our work here in a private context that we can reset on each use.
-	 */
-	if (hpm_context == NULL)	/* first time through? */
-		hpm_context = AllocSetContextCreate(TopMemoryContext,
-											"ProcessRepackMessages",
-											ALLOCSET_DEFAULT_SIZES);
-	else
-		MemoryContextReset(hpm_context);
-
-	oldcontext = MemoryContextSwitchTo(hpm_context);
-
-	/* OK to process messages.  Reset the flag saying there are more to do. */
-	RepackMessagePending = false;
-
 	/*
 	 * Read as many messages as we can from the worker, but stop when no more
 	 * messages can be read from the worker without blocking.
@@ -3738,13 +3691,6 @@ ProcessRepackMessages(void)
 			break;
 		}
 	}
-
-	MemoryContextSwitchTo(oldcontext);
-
-	/* Might as well clear the context on our way out */
-	MemoryContextReset(hpm_context);
-
-	RESUME_INTERRUPTS();
 }
 
 /*
diff --git a/src/backend/commands/repack_worker.c b/src/backend/commands/repack_worker.c
index db9ff057cc6..9ff2e14904c 100644
--- a/src/backend/commands/repack_worker.c
+++ b/src/backend/commands/repack_worker.c
@@ -172,7 +172,7 @@ RepackWorkerShutdown(int code, Datum arg)
 	DecodingWorkerShared *shared = (DecodingWorkerShared *) DatumGetPointer(arg);
 
 	SendProcSignal(shared->backend_pid,
-				   PROCSIG_REPACK_MESSAGE,
+				   PROCSIG_PARALLEL_MESSAGE,
 				   shared->backend_proc_number);
 
 	dsm_detach(worker_dsm_segment);
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index d038a9da515..238fb73ebd8 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -172,23 +172,9 @@ mq_putmessage(char msgtype, const char *s, size_t len)
 		result = shm_mq_sendv(pq_mq_handle, iov, 2, true, true);
 
 		if (pq_mq_parallel_leader_pid != 0)
-		{
-			if (IsLogicalParallelApplyWorker())
-				SendProcSignal(pq_mq_parallel_leader_pid,
-							   PROCSIG_PARALLEL_APPLY_MESSAGE,
-							   pq_mq_parallel_leader_proc_number);
-			else if (AmRepackWorker())
-				SendProcSignal(pq_mq_parallel_leader_pid,
-							   PROCSIG_REPACK_MESSAGE,
-							   pq_mq_parallel_leader_proc_number);
-			else
-			{
-				Assert(IsParallelWorker());
-				SendProcSignal(pq_mq_parallel_leader_pid,
-							   PROCSIG_PARALLEL_MESSAGE,
-							   pq_mq_parallel_leader_proc_number);
-			}
-		}
+			SendProcSignal(pq_mq_parallel_leader_pid,
+						   PROCSIG_PARALLEL_MESSAGE,
+						   pq_mq_parallel_leader_proc_number);
 
 		if (result != SHM_MQ_WOULD_BLOCK)
 			break;
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 012d55e9d3d..9535abef122 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -241,12 +241,6 @@ static List *ParallelApplyWorkerPool = NIL;
  */
 ParallelApplyWorkerShared *MyParallelShared = NULL;
 
-/*
- * Is there a message sent by a parallel apply worker that the leader apply
- * worker needs to receive?
- */
-volatile sig_atomic_t ParallelApplyMessagePending = false;
-
 /*
  * Cache the parallel apply worker information required for applying the
  * current streaming transaction. It is used to save the cost of searching the
@@ -857,7 +851,7 @@ static void
 pa_shutdown(int code, Datum arg)
 {
 	SendProcSignal(MyLogicalRepWorker->leader_pid,
-				   PROCSIG_PARALLEL_APPLY_MESSAGE,
+				   PROCSIG_PARALLEL_MESSAGE,
 				   INVALID_PROC_NUMBER);
 
 	dsm_detach((dsm_segment *) DatumGetPointer(arg));
@@ -997,21 +991,6 @@ ParallelApplyWorkerMain(Datum main_arg)
 	Assert(false);
 }
 
-/*
- * Handle receipt of an interrupt indicating a parallel apply worker message.
- *
- * Note: this is called within a signal handler! All we can do is set a flag
- * that will cause the next CHECK_FOR_INTERRUPTS() to invoke
- * ProcessParallelApplyMessages().
- */
-void
-HandleParallelApplyMessageInterrupt(void)
-{
-	InterruptPending = true;
-	ParallelApplyMessagePending = true;
-	/* latch will be set by procsignal_sigusr1_handler */
-}
-
 /*
  * Process a single protocol message received from a single parallel apply
  * worker.
@@ -1076,40 +1055,15 @@ ProcessParallelApplyMessage(StringInfo msg)
 }
 
 /*
- * Handle any queued protocol messages received from parallel apply workers.
+ * Process any queued protocol messages received from parallel apply workers.
+ *
+ * This is called from CHECK_FOR_INTERRUPTS(), but in a short-lived memory
+ * context.
  */
 void
 ProcessParallelApplyMessages(void)
 {
 	ListCell   *lc;
-	MemoryContext oldcontext;
-
-	static MemoryContext hpam_context = NULL;
-
-	/*
-	 * This is invoked from ProcessInterrupts(), and since some of the
-	 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
-	 * for recursive calls if more signals are received while this runs. It's
-	 * unclear that recursive entry would be safe, and it doesn't seem useful
-	 * even if it is safe, so let's block interrupts until done.
-	 */
-	HOLD_INTERRUPTS();
-
-	/*
-	 * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
-	 * don't want to risk leaking data into long-lived contexts, so let's do
-	 * our work here in a private context that we can reset on each use.
-	 */
-	if (!hpam_context)			/* first time through? */
-		hpam_context = AllocSetContextCreate(TopMemoryContext,
-											 "ProcessParallelApplyMessages",
-											 ALLOCSET_DEFAULT_SIZES);
-	else
-		MemoryContextReset(hpam_context);
-
-	oldcontext = MemoryContextSwitchTo(hpam_context);
-
-	ParallelApplyMessagePending = false;
 
 	foreach(lc, ParallelApplyWorkerPool)
 	{
@@ -1145,13 +1099,6 @@ ProcessParallelApplyMessages(void)
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 					 errmsg("lost connection to the logical replication parallel apply worker")));
 	}
-
-	MemoryContextSwitchTo(oldcontext);
-
-	/* Might as well clear the context on our way out */
-	MemoryContextReset(hpam_context);
-
-	RESUME_INTERRUPTS();
 }
 
 /*
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 1397f65f67b..00b640f2d53 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -713,12 +713,6 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_LOG_MEMORY_CONTEXT))
 		HandleLogMemoryContextInterrupt();
 
-	if (CheckProcSignal(PROCSIG_PARALLEL_APPLY_MESSAGE))
-		HandleParallelApplyMessageInterrupt();
-
-	if (CheckProcSignal(PROCSIG_REPACK_MESSAGE))
-		HandleRepackMessageInterrupt();
-
 	if (CheckProcSignal(PROCSIG_SLOTSYNC_MESSAGE))
 		HandleSlotSyncMessageInterrupt();
 
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 1f2fc3cce9b..80449b0b3cd 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -3604,19 +3604,13 @@ ProcessInterrupts(void)
 		ProcessProcSignalBarrier();
 
 	if (ParallelMessagePending)
-		ProcessParallelMessages();
+		ProcessParallelMessageInterrupt();
 
 	if (LogMemoryContextPending)
 		ProcessLogMemoryContextInterrupt();
 
-	if (ParallelApplyMessagePending)
-		ProcessParallelApplyMessages();
-
 	if (SlotSyncShutdownPending)
 		ProcessSlotSyncMessage();
-
-	if (RepackMessagePending)
-		ProcessRepackMessages();
 }
 
 /*
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index 60f857675e0..0ed0a88d831 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -73,7 +73,7 @@ extern void DestroyParallelContext(ParallelContext *pcxt);
 extern bool ParallelContextActive(void);
 
 extern void HandleParallelMessageInterrupt(void);
-extern void ProcessParallelMessages(void);
+extern void ProcessParallelMessageInterrupt(void);
 extern void AtEOXact_Parallel(bool isCommit);
 extern void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId);
 extern void ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end);
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 7d748a28da8..f6994e9a1a1 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -24,7 +24,6 @@ extern void SequenceSyncWorkerMain(Datum main_arg);
 extern bool IsLogicalWorker(void);
 extern bool IsLogicalParallelApplyWorker(void);
 
-extern void HandleParallelApplyMessageInterrupt(void);
 extern void ProcessParallelApplyMessages(void);
 
 extern void LogicalRepWorkersWakeupAtCommit(Oid subid);
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index aaa158bfd66..691314e26bf 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -35,9 +35,7 @@ typedef enum
 	PROCSIG_WALSND_INIT_STOPPING,	/* ask walsenders to prepare for shutdown  */
 	PROCSIG_BARRIER,			/* global barrier interrupt  */
 	PROCSIG_LOG_MEMORY_CONTEXT, /* ask backend to log the memory contexts */
-	PROCSIG_PARALLEL_APPLY_MESSAGE, /* Message from parallel apply workers */
 	PROCSIG_SLOTSYNC_MESSAGE,	/* ask slot synchronization to stop */
-	PROCSIG_REPACK_MESSAGE,		/* Message from repack worker */
 	PROCSIG_RECOVERY_CONFLICT,	/* backend is blocking recovery, check
 								 * PGPROC->pendingRecoveryConflicts for the
 								 * reason */
-- 
2.47.3

