From b7a14884878ddb8ed274ced5c725886edd48b164 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C3=81lvaro=20Herrera?= <alvherre@kurilemu.de>
Date: Tue, 24 Mar 2026 15:09:03 +0100
Subject: [PATCH v44 07/10] Split cluster.h to create repack_internal.h

Most of the details of concurrent repack are only needed by cluster.c
itself, pgoutput_repack and bgworker.c, yet they force cluster.h (which
is included from several high-profile places) to pull in a number of
additional headers.  Move those details to a new header,
repack_internal.h, to keep cluster.h clean.

Also, change_useless_for_repack() is better declared in decode.h.
---
 src/backend/commands/cluster.c                | 128 ++++++++++--------
 src/backend/postmaster/bgworker.c             |   2 +-
 .../pgoutput_repack/pgoutput_repack.c         |  13 +-
 src/include/commands/cluster.h                |  57 +-------
 src/include/commands/repack_internal.h        |  59 ++++++++
 src/include/replication/decode.h              |   4 +
 6 files changed, 147 insertions(+), 116 deletions(-)
 create mode 100644 src/include/commands/repack_internal.h

diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 75556cbdafb..2c3058ba10d 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -59,6 +59,7 @@
 #include "commands/cluster.h"
 #include "commands/defrem.h"
 #include "commands/progress.h"
+#include "commands/repack_internal.h"
 #include "commands/tablecmds.h"
 #include "commands/vacuum.h"
 #include "executor/executor.h"
@@ -205,17 +206,11 @@ typedef struct DecodingWorkerShared
 	char		error_queue[FLEXIBLE_ARRAY_MEMBER];
 } DecodingWorkerShared;
 
-/*
- * Generate worker's output file name. If relations of the same 'relid' happen
- * to be processed at the same time, they must be from different databases and
- * therefore different backends must be involved. (PID is already present in
- * the fileset name.)
- */
-static inline void
-DecodingWorkerFileName(char *fname, Oid relid, uint32 seq)
-{
-	snprintf(fname, MAXPGPATH, "%u-%u", relid, seq);
-}
+/* Is this process a REPACK worker? */
+static bool is_repack_worker = false;
+
+static pid_t backend_pid;
+static ProcNumber backend_proc_number;
 
 /*
  * Backend-local information to control the decoding worker.
@@ -241,6 +236,7 @@ static DecodingWorker *decoding_worker = NULL;
  */
 volatile sig_atomic_t RepackMessagePending = false;
 
+static LOCKMODE RepackLockLevel(bool concurrent);
 static bool cluster_rel_recheck(RepackCommand cmd, Relation OldHeap,
 								Oid indexOid, Oid userid, LOCKMODE lmode,
 								int options);
@@ -298,12 +294,16 @@ static Relation process_single_relation(RepackStmt *stmt,
 										ClusterParams *params);
 static Oid	determine_clustered_index(Relation rel, bool usingindex,
 									  const char *indexname);
-static void start_decoding_worker(Oid relid);
-static void stop_decoding_worker(void);
+
+static void start_repack_decoding_worker(Oid relid);
+static void stop_repack_decoding_worker(void);
 static void repack_worker_internal(dsm_segment *seg);
 static void export_initial_snapshot(Snapshot snapshot,
 									DecodingWorkerShared *shared);
 static Snapshot get_initial_snapshot(DecodingWorker *worker);
+extern bool am_decoding_for_repack(void);
+static void DecodingWorkerFileName(char *fname, Oid relid, uint32 seq);
+
 static void ProcessRepackMessage(StringInfo msg);
 static const char *RepackCommandAsString(RepackCommand cmd);
 
@@ -369,17 +369,8 @@ ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel)
 					parser_errposition(pstate, opt->location));
 	}
 
-	/*
-	 * Determine the lock mode expected by cluster_rel().
-	 *
-	 * In the exclusive case, we obtain AccessExclusiveLock right away to
-	 * avoid lock-upgrade hazard in the single-transaction case. In the
-	 * CONCURRENTLY case, the AccessExclusiveLock will only be used at the end
-	 * of processing, supposedly for very short time. Until then, we'll have
-	 * to unlock the relation temporarily, so there's no lock-upgrade hazard.
-	 */
-	lockmode = (params.options & CLUOPT_CONCURRENT) == 0 ?
-		AccessExclusiveLock : ShareUpdateExclusiveLock;
+	/* Determine the lock mode to use. */
+	lockmode = RepackLockLevel((params.options & CLUOPT_CONCURRENT) != 0);
 
 	/*
 	 * If a single relation is specified, process it and we're done ... unless
@@ -434,6 +425,12 @@ ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel)
 										   "Repack",
 										   ALLOCSET_DEFAULT_SIZES);
 
+	/*
+	 * Since we open a new transaction for each relation, we have to check
+	 * that the relation still is what we think it is.
+	 *
+	 * In single-transaction CLUSTER, we don't need the overhead.
+	 */
 	params.options |= CLUOPT_RECHECK;
 
 	/*
@@ -544,6 +541,22 @@ ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel)
 	MemoryContextDelete(repack_context);
 }
 
+/*
+ * In the non-concurrent case, we obtain AccessExclusiveLock throughout the
+ * operation to avoid any lock-upgrade hazards.  In the concurrent case, we
+ * grab ShareUpdateExclusiveLock (just like VACUUM) for most of the
+ * processing and only acquire AccessExclusiveLock at the end, to swap the
+ * relation -- supposedly for a short time.
+ */
+static LOCKMODE
+RepackLockLevel(bool concurrent)
+{
+	if (concurrent)
+		return ShareUpdateExclusiveLock;
+	else
+		return AccessExclusiveLock;
+}
+
 /*
  * cluster_rel
  *
@@ -583,25 +596,22 @@ cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid,
 	bool		concurrent = ((params->options & CLUOPT_CONCURRENT) != 0);
 	Oid			ident_idx = InvalidOid;
 
-	/*
-	 * The lock mode is AccessExclusiveLock for normal processing and
-	 * ShareUpdateExclusiveLock for concurrent processing (so that SELECT,
-	 * INSERT, UPDATE and DELETE commands work, but cluster_rel() cannot be
-	 * called concurrently for the same relation).
-	 */
-	lmode = !concurrent ? AccessExclusiveLock : ShareUpdateExclusiveLock;
+	/* Determine the lock mode to use. */
+	lmode = RepackLockLevel(concurrent);
 
-	/* There are specific requirements on concurrent processing. */
+	/*
+	 * Check some preconditions in the concurrent case.  This also obtains the
+	 * replica index OID.
+	 */
 	if (concurrent)
 	{
 		/*
-		 * Make sure we have no XID assigned, otherwise call of
-		 * repack_setup_logical_decoding() can cause a deadlock.
+		 * Make sure we're not in a transaction block.
 		 *
-		 * The existence of transaction block actually does not imply that XID
-		 * was already assigned, but it very likely is. We might want to check
-		 * the result of GetCurrentTransactionIdIfAny() instead, but that
-		 * would be less clear from user's perspective.
+		 * The reason is that repack_setup_logical_decoding() could deadlock
+		 * if there's an XID already assigned.  It would be possible to run in
+		 * a transaction block if we had no XID, but this restriction is
+		 * simpler for users to understand and we don't lose anything.
 		 */
 		PreventInTransactionBlock(isTopLevel, "REPACK (CONCURRENTLY)");
 
@@ -626,15 +636,11 @@ cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid,
 	RestrictSearchPath();
 
 	/*
-	 * Since we may open a new transaction for each relation, we have to check
-	 * that the relation still is what we think it is.
+	 * Recheck that the relation is still what it was when we started.
 	 *
-	 * If this is a single-transaction CLUSTER, we can skip these tests. We
-	 * *must* skip the one on indisclustered since it would reject an attempt
-	 * to cluster a not-previously-clustered index.
-	 *
-	 * XXX move [some of] these comments to where the RECHECK flag is
-	 * determined?
+	 * Note that it's critical to skip this in single-relation CLUSTER;
+	 * otherwise, we would reject an attempt to cluster using a
+	 * not-previously-clustered index.
 	 */
 	if (recheck &&
 		!cluster_rel_recheck(cmd, OldHeap, indexOid, save_userid,
@@ -754,7 +760,7 @@ cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid,
 			 * However it still seems a good practice to make sure that the
 			 * worker never survives the REPACK command.
 			 */
-			stop_decoding_worker();
+			stop_repack_decoding_worker();
 		}
 	}
 	PG_END_TRY();
@@ -1093,7 +1099,7 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose,
 		 * clustering index) and checking again if it's still eligible for
 		 * REPACK CONCURRENTLY.
 		 */
-		start_decoding_worker(tableOid);
+		start_repack_decoding_worker(tableOid);
 
 		/*
 		 * Wait until the worker has the initial snapshot and retrieve it.
@@ -2538,7 +2544,6 @@ RepackCommandAsString(RepackCommand cmd)
 	return "???";				/* keep compiler quiet */
 }
 
-
 /*
  * Is this backend performing logical decoding on behalf of REPACK
  * (CONCURRENTLY) ?
@@ -3688,7 +3693,7 @@ build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes)
  * contents to a new table.
  */
 static void
-start_decoding_worker(Oid relid)
+start_repack_decoding_worker(Oid relid)
 {
 	Size		size;
 	dsm_segment *seg;
@@ -3779,7 +3784,7 @@ start_decoding_worker(Oid relid)
  * we need to stop it explicitly at least on ERROR in the launching backend.
  */
 static void
-stop_decoding_worker(void)
+stop_repack_decoding_worker(void)
 {
 	BgwHandleStatus status;
 
@@ -3817,12 +3822,6 @@ stop_decoding_worker(void)
 	decoding_worker = NULL;
 }
 
-/* Is this process a REPACK worker? */
-static bool is_repack_worker = false;
-
-static pid_t backend_pid;
-static ProcNumber backend_proc_number;
-
 /*
  * See ParallelWorkerShutdown for details.
  */
@@ -3869,7 +3868,7 @@ RepackWorkerMain(Datum main_arg)
 
 	/*
 	 * Join locking group - see the comments around the call of
-	 * start_decoding_worker().
+	 * start_repack_decoding_worker().
 	 */
 	if (!BecomeLockGroupMember(shared->backend_proc, backend_pid))
 		/* The leader is not running anymore. */
@@ -4039,6 +4038,18 @@ get_initial_snapshot(DecodingWorker *worker)
 	return snapshot;
 }
 
+/*
+ * Generate worker's output file name. If relations of the same 'relid' happen
+ * to be processed at the same time, they must be from different databases and
+ * therefore different backends must be involved. (PID is already present in
+ * the fileset name.)
+ */
+static void
+DecodingWorkerFileName(char *fname, Oid relid, uint32 seq)
+{
+	snprintf(fname, MAXPGPATH, "%u-%u", relid, seq);
+}
+
 bool
 IsRepackWorker(void)
 {
@@ -4067,7 +4078,6 @@ void
 ProcessRepackMessages(void)
 {
 	MemoryContext oldcontext;
-
 	static MemoryContext hpm_context = NULL;
 
 	/*
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index f8a8d1681e9..9e876d55e27 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -13,7 +13,7 @@
 #include "postgres.h"
 
 #include "access/parallel.h"
-#include "commands/cluster.h"
+#include "commands/repack_internal.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
diff --git a/src/backend/replication/pgoutput_repack/pgoutput_repack.c b/src/backend/replication/pgoutput_repack/pgoutput_repack.c
index 032fbd0e5b0..cc9ce615b18 100644
--- a/src/backend/replication/pgoutput_repack/pgoutput_repack.c
+++ b/src/backend/replication/pgoutput_repack/pgoutput_repack.c
@@ -13,7 +13,7 @@
 #include "postgres.h"
 
 #include "access/detoast.h"
-#include "commands/cluster.h"
+#include "commands/repack_internal.h"
 #include "replication/snapbuild.h"
 #include "utils/memutils.h"
 
@@ -176,7 +176,7 @@ repack_store_change(LogicalDecodingContext *ctx, Relation relation,
 					ConcurrentChangeKind kind, HeapTuple tuple)
 {
 	RepackDecodingState *dstate;
-	MemoryContext	oldcxt;
+	MemoryContext oldcxt;
 	BufFile    *file;
 	List	   *attrs_ext = NIL;
 	int			natt_ext;
@@ -226,7 +226,10 @@ repack_store_change(LogicalDecodingContext *ctx, Relation relation,
 
 			slot_getsomeattrs(slot, i + 1);
 
-			/* This is a non-null varlena datum, but we only care if it's out-of-line */
+			/*
+			 * This is a non-null varlena datum, but we only care if it's
+			 * out-of-line.
+			 */
 			varlen = (varlena *) DatumGetPointer(slot->tts_values[i]);
 			if (!VARATT_IS_EXTERNAL(varlen))
 				continue;
@@ -244,8 +247,8 @@ repack_store_change(LogicalDecodingContext *ctx, Relation relation,
 				 * attributes (those actually should never appear on disk), so
 				 * only TOASTed attribute can be seen here.
 				 *
-				 * FIXME in what circumstances can an ONDISK attr appear?
-				 * Why aren't these written separately?
+				 * FIXME in what circumstances can an ONDISK attr appear? Why
+				 * aren't these written separately?
 				 */
 				Assert(VARATT_IS_EXTERNAL_ONDISK(varlen));
 			}
diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h
index 1c0ac3ab4f5..1528d34fa42 100644
--- a/src/include/commands/cluster.h
+++ b/src/include/commands/cluster.h
@@ -13,17 +13,12 @@
 #ifndef CLUSTER_H
 #define CLUSTER_H
 
-#include "nodes/execnodes.h"
+#include <signal.h>
+
 #include "nodes/parsenodes.h"
 #include "parser/parse_node.h"
-#include "replication/decode.h"
-#include "postmaster/bgworker.h"
-#include "replication/logical.h"
-#include "storage/buffile.h"
 #include "storage/lockdefs.h"
-#include "storage/shm_mq.h"
 #include "utils/relcache.h"
-#include "utils/resowner.h"
 
 
 /* flag bits for ClusterParams->options */
@@ -34,55 +29,14 @@
 #define CLUOPT_ANALYZE 0x08		/* do an ANALYZE */
 #define CLUOPT_CONCURRENT 0x10	/* allow concurrent data changes */
 
-
 /* options for CLUSTER */
 typedef struct ClusterParams
 {
 	bits32		options;		/* bitmask of CLUOPT_* */
 } ClusterParams;
 
-
-/*
- * The following definitions are used by REPACK CONCURRENTLY.
- */
-
-/*
- * Stored as a single byte in the output file.
- */
-#define		CHANGE_INSERT		'i'
-#define		CHANGE_UPDATE_OLD	'u'
-#define		CHANGE_UPDATE_NEW	'U'
-#define		CHANGE_DELETE		'd'
-typedef char ConcurrentChangeKind;
-
-/*
- * Logical decoding state.
- *
- * The output plugin uses it to store the data changes that it decodes from
- * WAL while the table contents is being copied to a new storage.
- */
-typedef struct RepackDecodingState
-{
-#ifdef	USE_ASSERT_CHECKING
-	/* The relation whose changes we're decoding. */
-	Oid			relid;
-#endif
-
-	/* Per-change memory context. */
-	MemoryContext	change_cxt;
-
-	/* A tuple slot used to pass tuples back and forth */
-	TupleTableSlot	*slot;
-
-	/* The current output file. */
-	BufFile    *file;
-} RepackDecodingState;
-
 extern PGDLLIMPORT volatile sig_atomic_t RepackMessagePending;
 
-extern bool IsRepackWorker(void);
-extern void HandleRepackMessageInterrupt(void);
-extern void ProcessRepackMessages(void);
 
 extern void ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel);
 
@@ -104,8 +58,9 @@ extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
 							 MultiXactId cutoffMulti,
 							 char newrelpersistence);
 
-extern bool am_decoding_for_repack(void);
-extern bool change_useless_for_repack(XLogRecordBuffer *buf);
+extern bool IsRepackWorker(void);
+extern void HandleRepackMessageInterrupt(void);
+extern void ProcessRepackMessages(void);
+
 
-extern void RepackWorkerMain(Datum main_arg);
 #endif							/* CLUSTER_H */
diff --git a/src/include/commands/repack_internal.h b/src/include/commands/repack_internal.h
new file mode 100644
index 00000000000..f90c973f5a0
--- /dev/null
+++ b/src/include/commands/repack_internal.h
@@ -0,0 +1,59 @@
+/*-------------------------------------------------------------------------
+ *
+ * repack_internal.h
+ *	  header for REPACK internals
+ *
+ * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994-5, Regents of the University of California
+ *
+ * src/include/commands/repack_internal.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef REPACK_INTERNAL_H
+#define REPACK_INTERNAL_H
+
+#include "nodes/execnodes.h"
+#include "replication/decode.h"
+#include "postmaster/bgworker.h"
+#include "replication/logical.h"
+#include "storage/buffile.h"
+#include "storage/shm_mq.h"
+#include "utils/resowner.h"
+
+/*
+ * Stored as a single byte in the output file.
+ */
+#define		CHANGE_INSERT		'i'
+#define		CHANGE_UPDATE_OLD	'u'
+#define		CHANGE_UPDATE_NEW	'U'
+#define		CHANGE_DELETE		'd'
+typedef char ConcurrentChangeKind;
+
+/*
+ * Logical decoding state.
+ *
+ * The output plugin uses it to store the data changes that it decodes from
+ * WAL while the table contents are being copied to new storage.
+ */
+typedef struct RepackDecodingState
+{
+#ifdef	USE_ASSERT_CHECKING
+	/* The relation whose changes we're decoding. */
+	Oid			relid;
+#endif
+
+	/* Per-change memory context. */
+	MemoryContext change_cxt;
+
+	/* A tuple slot used to pass tuples back and forth */
+	TupleTableSlot *slot;
+
+	/* The current output file. */
+	BufFile    *file;
+} RepackDecodingState;
+
+
+extern void RepackWorkerMain(Datum main_arg);
+
+#endif							/* REPACK_INTERNAL_H */
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
index 49f00fc48b8..02b5393474c 100644
--- a/src/include/replication/decode.h
+++ b/src/include/replication/decode.h
@@ -31,4 +31,8 @@ extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf
 extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
 										 XLogReaderState *record);
 
+/* in commands/cluster.c */
+extern bool change_useless_for_repack(XLogRecordBuffer *buf);
+
+
 #endif
-- 
2.47.3

