From e8d55bde15b1779c40c7bbdb9e7d5f29a68d49a8 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Tue, 23 Jun 2026 23:30:17 +0300
Subject: [PATCH 6/6] Distinguish datachecksums worker invocations more reliably

In some corner cases, a new datachecksums worker could be launched
while an old one was still running. If you're really unlucky, the old
worker could set the worker_result in shared memory and mislead the
launcher into thinking that a newer worker invocation completed
successfully, even though it failed for some reason. That's highly
unlikely to happen in practice, as it requires several race conditions
with workers and launchers starting, failing and succeeding at just
the right moments. Nevertheless, better to tighten it up.

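To distinguish different worker invocations, assign a unique
'worker_invocation' number every time a new worker is launched. The
invocation number is passed to the worker as its main argument in
place of the database OID, which the worker now reads from shared
memory instead. In the worker, check that the invocation number
matches before setting the worker result. This ensures that the
result always belongs to the latest invocation.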
---
 src/backend/postmaster/datachecksum_state.c | 99 +++++++++++++++------
 1 file changed, 73 insertions(+), 26 deletions(-)

diff --git a/src/backend/postmaster/datachecksum_state.c b/src/backend/postmaster/datachecksum_state.c
index 1ad8fea93f0..68557c16cb9 100644
--- a/src/backend/postmaster/datachecksum_state.c
+++ b/src/backend/postmaster/datachecksum_state.c
@@ -315,11 +315,18 @@ typedef struct DataChecksumsStateStruct
 	bool		launcher_running;
 
 	/*
-	 * PID of the worker process, if it's currently running, of InvalidPid if
-	 * none. This is set by the worker launcher when it starts waiting for a
-	 * worker process to finish.
+	 * Every time a new worker is launched, it's assigned a unique invocation
+	 * number by incrementing this counter.
 	 */
-	pid_t		worker_pid;
+	uint64		worker_invocation_counter;
+
+	/*
+	 * Information about the current worker, if it's currently running.  These
+	 * are set by the worker launcher.
+	 */
+	uint64		worker_invocation;	/* unique invocation number */
+	Oid			database_oid;	/* database it's processing */
+	pid_t		worker_pid;		/* worker process's PID */
 
 	/*
 	 * These fields indicate the target state that the worker is currently
@@ -361,6 +368,8 @@ typedef struct DataChecksumsWorkerDatabase
 /* Flag set by the interrupt handler */
 static volatile sig_atomic_t abort_requested = false;
 
+static uint64 worker_invocation;
+
 /*
  * Have we set the DataChecksumsStateStruct->launcher_running flag?
  * If we have, we need to clear it before exiting!
@@ -389,10 +398,21 @@ const ShmemCallbacks DataChecksumsShmemCallbacks = {
 	.request_fn = DataChecksumsShmemRequest,
 };
 
-#define CHECK_FOR_ABORT_REQUEST() \
+#define CHECK_FOR_LAUNCHER_ABORT_REQUEST() \
+	do {															\
+		Assert(MyBackendType == B_DATACHECKSUMSWORKER_LAUNCHER);	\
+		LWLockAcquire(DataChecksumsWorkerLock, LW_SHARED);			\
+		if (DataChecksumState->launch_operation != operation) 		\
+			abort_requested = true;									\
+		LWLockRelease(DataChecksumsWorkerLock);						\
+	} while (0)
+
+#define CHECK_FOR_WORKER_ABORT_REQUEST() \
 	do {															\
+		Assert(MyBackendType == B_DATACHECKSUMSWORKER_WORKER);		\
 		LWLockAcquire(DataChecksumsWorkerLock, LW_SHARED);			\
-		if (DataChecksumState->launch_operation != operation)		\
+		if (DataChecksumState->worker_invocation != worker_invocation || \
+			DataChecksumState->launch_operation != operation) 		\
 			abort_requested = true;									\
 		LWLockRelease(DataChecksumsWorkerLock);						\
 	} while (0)
@@ -726,11 +746,7 @@ ProcessSingleRelationFork(Relation reln, ForkNumber forkNum, BufferAccessStrateg
 
 		/* Check if we are asked to abort, the abortion will bubble up. */
 		Assert(operation == ENABLE_DATACHECKSUMS);
-		LWLockAcquire(DataChecksumsWorkerLock, LW_SHARED);
-		if (DataChecksumState->launch_operation == DISABLE_DATACHECKSUMS)
-			abort_requested = true;
-		LWLockRelease(DataChecksumsWorkerLock);
-
+		CHECK_FOR_WORKER_ABORT_REQUEST();
 		if (abort_requested)
 			return false;
 
@@ -813,16 +829,23 @@ ProcessDatabase(DataChecksumsWorkerDatabase *db)
 	BackgroundWorkerHandle *bgw_handle;
 	BgwHandleStatus status;
 	pid_t		pid;
+	uint64		invocation;
 	char		activity[NAMEDATALEN + 64];
 	DataChecksumsWorkerResult result;
 
+	LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
+
 	/*
 	 * Initialize result to FAILED.  The worker will change it to SUCCESSFUL
 	 * if it completes successfully.
 	 */
-	LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
 	DataChecksumState->worker_result = DATACHECKSUMSWORKER_FAILED;
 	DataChecksumState->worker_pid = InvalidPid;
+
+	invocation = ++DataChecksumState->worker_invocation_counter;
+	DataChecksumState->worker_invocation = invocation;
+	DataChecksumState->database_oid = db->dboid;
+
 	LWLockRelease(DataChecksumsWorkerLock);
 
 	memset(&bgw, 0, sizeof(bgw));
@@ -834,7 +857,8 @@ ProcessDatabase(DataChecksumsWorkerDatabase *db)
 	snprintf(bgw.bgw_type, BGW_MAXLEN, "datachecksums worker");
 	bgw.bgw_restart_time = BGW_NEVER_RESTART;
 	bgw.bgw_notify_pid = MyProcPid;
-	bgw.bgw_main_arg = ObjectIdGetDatum(db->dboid);
+	/* pass the invocation number to the worker process */
+	bgw.bgw_main_arg = UInt64GetDatum(invocation);
 
 	/*
 	 * If there are no worker slots available, there is little we can do.  If
@@ -858,6 +882,7 @@ ProcessDatabase(DataChecksumsWorkerDatabase *db)
 		 * for it we can see a STOPPED status here without it being a failure.
 		 */
 		LWLockAcquire(DataChecksumsWorkerLock, LW_SHARED);
+		Assert(DataChecksumState->worker_invocation == invocation);
 		if (DataChecksumState->worker_result == DATACHECKSUMSWORKER_SUCCESSFUL)
 		{
 			LWLockRelease(DataChecksumsWorkerLock);
@@ -901,6 +926,7 @@ ProcessDatabase(DataChecksumsWorkerDatabase *db)
 
 	/* Save the pid of the worker so we can signal it later */
 	LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
+	Assert(DataChecksumState->worker_invocation == invocation);
 	DataChecksumState->worker_pid = pid;
 	LWLockRelease(DataChecksumsWorkerLock);
 
@@ -917,6 +943,7 @@ ProcessDatabase(DataChecksumsWorkerDatabase *db)
 				errhint("Restart the database and restart data checksum processing by calling pg_enable_data_checksums()."));
 
 	LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
+	Assert(DataChecksumState->worker_invocation == invocation);
 	result = DataChecksumState->worker_result;
 	DataChecksumState->worker_pid = InvalidPid;
 	LWLockRelease(DataChecksumsWorkerLock);
@@ -1044,7 +1071,7 @@ WaitForAllTransactionsToFinish(void)
 					errhint("Data checksums processing must be restarted manually after cluster restart."));
 
 		CHECK_FOR_INTERRUPTS();
-		CHECK_FOR_ABORT_REQUEST();
+		CHECK_FOR_LAUNCHER_ABORT_REQUEST();
 
 		if (abort_requested)
 			break;
@@ -1145,13 +1172,9 @@ again:
 			 * If the target state changed during processing then it's not a
 			 * failure, so restart processing instead.
 			 */
-			LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
-			if (DataChecksumState->launch_operation != operation)
-			{
-				LWLockRelease(DataChecksumsWorkerLock);
+			CHECK_FOR_LAUNCHER_ABORT_REQUEST();
+			if (abort_requested)
 				goto done;
-			}
-			LWLockRelease(DataChecksumsWorkerLock);
 			ereport(ERROR,
 					errcode(ERRCODE_INSUFFICIENT_RESOURCES),
 					errmsg("unable to enable data checksums in cluster"));
@@ -1520,7 +1543,7 @@ BuildRelationList(bool temp_relations, bool include_shared)
 void
 DataChecksumsWorkerMain(Datum arg)
 {
-	Oid			dboid = DatumGetObjectId(arg);
+	Oid			dboid;
 	List	   *RelationList = NIL;
 	List	   *InitialTempTableList = NIL;
 	BufferAccessStrategy strategy;
@@ -1531,6 +1554,8 @@ DataChecksumsWorkerMain(Datum arg)
 	bool		retried = false;
 #endif
 
+	worker_invocation = DatumGetUInt64(arg);
+
 	operation = ENABLE_DATACHECKSUMS;
 
 	pqsignal(SIGTERM, die);
@@ -1541,6 +1566,15 @@ DataChecksumsWorkerMain(Datum arg)
 	MyBackendType = B_DATACHECKSUMSWORKER_WORKER;
 	init_ps_display(NULL);
 
+	LWLockAcquire(DataChecksumsWorkerLock, LW_SHARED);
+	if (DataChecksumState->worker_invocation != worker_invocation)
+	{
+		LWLockRelease(DataChecksumsWorkerLock);
+		return;
+	}
+	dboid = DataChecksumState->database_oid;
+	LWLockRelease(DataChecksumsWorkerLock);
+
 	BackgroundWorkerInitializeConnectionByOid(dboid, InvalidOid,
 											  BGWORKER_BYPASS_ALLOWCONN);
 
@@ -1556,6 +1590,11 @@ DataChecksumsWorkerMain(Datum arg)
 	 */
 	InitialTempTableList = BuildRelationList(true, false);
 	LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
+	if (DataChecksumState->worker_invocation != worker_invocation)
+	{
+		LWLockRelease(DataChecksumsWorkerLock);
+		return;
+	}
 	process_shared = DataChecksumState->process_shared_catalogs;
 
 	/*
@@ -1611,7 +1650,7 @@ DataChecksumsWorkerMain(Datum arg)
 		pgstat_progress_update_param(PROGRESS_DATACHECKSUMS_RELS_DONE,
 									 ++rels_done);
 		CHECK_FOR_INTERRUPTS();
-		CHECK_FOR_ABORT_REQUEST();
+		CHECK_FOR_WORKER_ABORT_REQUEST();
 
 		if (abort_requested)
 			break;
@@ -1622,6 +1661,11 @@ DataChecksumsWorkerMain(Datum arg)
 		 * to be refreshed.
 		 */
 		LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
+		if (DataChecksumState->worker_invocation != worker_invocation)
+		{
+			LWLockRelease(DataChecksumsWorkerLock);
+			break;
+		}
 		if ((DataChecksumState->launch_cost_delay != DataChecksumState->cost_delay)
 			|| (DataChecksumState->launch_cost_limit != DataChecksumState->cost_limit))
 		{
@@ -1650,7 +1694,8 @@ DataChecksumsWorkerMain(Datum arg)
 	if (aborted || abort_requested)
 	{
 		LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
-		DataChecksumState->worker_result = DATACHECKSUMSWORKER_ABORTED;
+		if (DataChecksumState->worker_invocation == worker_invocation)
+			DataChecksumState->worker_result = DATACHECKSUMSWORKER_ABORTED;
 		LWLockRelease(DataChecksumsWorkerLock);
 		ereport(DEBUG1,
 				errmsg("data checksum processing aborted in database OID %u",
@@ -1717,12 +1762,13 @@ DataChecksumsWorkerMain(Datum arg)
 						 WAIT_EVENT_CHECKSUM_ENABLE_TEMPTABLE_WAIT);
 
 		CHECK_FOR_INTERRUPTS();
-		CHECK_FOR_ABORT_REQUEST();
+		CHECK_FOR_WORKER_ABORT_REQUEST();
 
 		if (aborted || abort_requested)
 		{
 			LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
-			DataChecksumState->worker_result = DATACHECKSUMSWORKER_ABORTED;
+			if (DataChecksumState->worker_invocation == worker_invocation)
+				DataChecksumState->worker_result = DATACHECKSUMSWORKER_ABORTED;
 			LWLockRelease(DataChecksumsWorkerLock);
 			ereport(LOG,
 					errmsg("data checksum processing aborted in database OID %u",
@@ -1737,6 +1783,7 @@ DataChecksumsWorkerMain(Datum arg)
 	pgstat_progress_end_command();
 
 	LWLockAcquire(DataChecksumsWorkerLock, LW_EXCLUSIVE);
-	DataChecksumState->worker_result = DATACHECKSUMSWORKER_SUCCESSFUL;
+	if (DataChecksumState->worker_invocation == worker_invocation)
+		DataChecksumState->worker_result = DATACHECKSUMSWORKER_SUCCESSFUL;
 	LWLockRelease(DataChecksumsWorkerLock);
 }
-- 
2.47.3

