From 240b9b57a54841dfedc4a4f8c2a692533dee3c1c Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Mon, 4 Nov 2024 20:58:36 +0200
Subject: [PATCH v3 2/3] Fix lost wakeup issue in logical replication launcher

by using a different interrupt reason for subscription changes

https://www.postgresql.org/message-id/flat/ff0663d9-8011-420f-a169-efbf57327cb5%40iki.fi#bef984f8c43d6b8a9428d2c5547fe72b
---
 src/backend/replication/logical/launcher.c | 23 ++++++++++++++--------
 src/include/storage/interrupt.h            |  5 ++++-
 2 files changed, 19 insertions(+), 9 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 1fb8790f213..08525c8a84b 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -57,7 +57,7 @@ LogicalRepWorker *MyLogicalRepWorker = NULL;
 typedef struct LogicalRepCtxStruct
 {
 	/* Supervisor process. */
-	pid_t		launcher_pid;
+	ProcNumber	launcher_procno;
 
 	/* Hash table holding last start times of subscriptions' apply workers. */
 	dsa_handle	last_start_dsa;
@@ -814,7 +814,7 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
 static void
 logicalrep_launcher_onexit(int code, Datum arg)
 {
-	LogicalRepCtx->launcher_pid = 0;
+	LogicalRepCtx->launcher_procno = INVALID_PROC_NUMBER;
 }
 
 /*
@@ -974,6 +974,7 @@ ApplyLauncherShmemInit(void)
 
 		memset(LogicalRepCtx, 0, ApplyLauncherShmemSize());
 
+		LogicalRepCtx->launcher_procno = INVALID_PROC_NUMBER;
 		LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID;
 		LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID;
 
@@ -1119,8 +1120,12 @@ ApplyLauncherWakeupAtCommit(void)
 static void
 ApplyLauncherWakeup(void)
 {
-	if (LogicalRepCtx->launcher_pid != 0)
-		kill(LogicalRepCtx->launcher_pid, SIGUSR1);
+	volatile LogicalRepCtxStruct *repctx = LogicalRepCtx;
+	ProcNumber	launcher_procno;
+
+	launcher_procno = repctx->launcher_procno;
+	if (launcher_procno != INVALID_PROC_NUMBER)
+		SendInterrupt(INTERRUPT_SUBSCRIPTION_CHANGE, launcher_procno);
 }
 
 /*
@@ -1134,8 +1139,8 @@ ApplyLauncherMain(Datum main_arg)
 
 	before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0);
 
-	Assert(LogicalRepCtx->launcher_pid == 0);
-	LogicalRepCtx->launcher_pid = MyProcPid;
+	Assert(LogicalRepCtx->launcher_procno == INVALID_PROC_NUMBER);
+	LogicalRepCtx->launcher_procno = MyProcNumber;
 
 	/* Establish signal handlers. */
 	pqsignal(SIGHUP, SignalHandlerForConfigReload);
@@ -1167,6 +1172,7 @@ ApplyLauncherMain(Datum main_arg)
 		oldctx = MemoryContextSwitchTo(subctx);
 
 		/* Start any missing workers for enabled subscriptions. */
+		ClearInterrupt(INTERRUPT_SUBSCRIPTION_CHANGE);
 		sublist = get_subscription_list();
 		foreach(lc, sublist)
 		{
@@ -1223,7 +1229,8 @@ ApplyLauncherMain(Datum main_arg)
 		MemoryContextDelete(subctx);
 
 		/* Wait for more work. */
-		rc = WaitInterrupt(1 << INTERRUPT_GENERAL_WAKEUP,
+		rc = WaitInterrupt(1 << INTERRUPT_GENERAL_WAKEUP |
+						   1 << INTERRUPT_SUBSCRIPTION_CHANGE,
 						   WL_INTERRUPT | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
 						   wait_time,
 						   WAIT_EVENT_LOGICAL_LAUNCHER_MAIN);
@@ -1250,7 +1257,7 @@ ApplyLauncherMain(Datum main_arg)
 bool
 IsLogicalLauncher(void)
 {
-	return LogicalRepCtx->launcher_pid == MyProcPid;
+	return LogicalRepCtx->launcher_procno == MyProcNumber;
 }
 
 /*
diff --git a/src/include/storage/interrupt.h b/src/include/storage/interrupt.h
index 5bcc9f2407e..359c35ee813 100644
--- a/src/include/storage/interrupt.h
+++ b/src/include/storage/interrupt.h
@@ -98,11 +98,14 @@ typedef enum
 	INTERRUPT_GENERAL_WAKEUP,
 
 	/*
-	 * INTERRUPT_RECOVERY_WAKEUP is used to wake up startup process, to tell
+	 * INTERRUPT_RECOVERY_CONTINUE is used to wake up startup process, to tell
 	 * it that it should continue WAL replay. It's sent by WAL receiver when
 	 * more WAL arrives, or when promotion is requested.
 	 */
 	INTERRUPT_RECOVERY_CONTINUE,
+
+	/* sent to logical replication launcher, when a subscription changes */
+	INTERRUPT_SUBSCRIPTION_CHANGE,
 } InterruptType;
 
 /*
-- 
2.39.5

