From 9d47f591937c483d5713b0cbad6d630a3c812c36 Mon Sep 17 00:00:00 2001
From: Antonin Houska <ah@cybertec.at>
Date: Tue, 16 Jun 2026 13:54:16 +0200
Subject: [PATCH 3/8] Introduce RepackDest structure.

This is for ChangeContext to handle insertions into two relations: besides the
new relation (whose file will eventually be used by the REPACKed relation), an
"auxiliary relation" is needed sometimes, in order to get the data sorted. The
concept is introduced and explained later in the patch series.
---
 src/backend/commands/repack.c    | 221 ++++++++++++++++---------------
 src/include/commands/repack.h    |  48 +++++++
 src/tools/pgindent/typedefs.list |   1 +
 3 files changed, 160 insertions(+), 110 deletions(-)

diff --git a/src/backend/commands/repack.c b/src/backend/commands/repack.c
index fa98b8b9247..04d39ad8c86 100644
--- a/src/backend/commands/repack.c
+++ b/src/backend/commands/repack.c
@@ -93,40 +93,6 @@ typedef struct
 	Oid			indexOid;
 } RelToCluster;
 
-/*
- * The first file exported by the decoding worker must contain a snapshot, the
- * following ones contain the data changes.
- */
-#define WORKER_FILE_SNAPSHOT	0
-
-/*
- * Information needed to apply concurrent data changes.
- */
-typedef struct ChangeContext
-{
-	/* The relation the changes are applied to. */
-	Relation	cc_rel;
-
-	/* Needed to update indexes of cc_rel. */
-	ResultRelInfo *cc_rri;
-	EState	   *cc_estate;
-
-	/*
-	 * Existing tuples to UPDATE and DELETE are located via this index. We
-	 * keep the scankey in partially initialized state to avoid repeated work.
-	 * sk_argument is completed on the fly.
-	 */
-	Relation	cc_ident_index;
-	ScanKey		cc_ident_key;
-	int			cc_ident_key_nentries;
-
-	/* The latest column we need to deform to have the tuple identity */
-	AttrNumber	cc_last_key_attno;
-
-	/* Sequential number of the file containing the changes. */
-	int			cc_file_seq;
-} ChangeContext;
-
 /*
  * Backend-local information to control the decoding worker.
  */
@@ -173,22 +139,19 @@ static List *get_tables_to_repack_partitioned(RepackCommand cmd,
 static bool repack_is_permitted_for_relation(RepackCommand cmd,
 											 Oid relid, Oid userid);
 
-static void apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt);
-static void apply_concurrent_insert(Relation rel, TupleTableSlot *slot,
-									ChangeContext *chgcxt);
-static void apply_concurrent_update(Relation rel, TupleTableSlot *spilled_tuple,
-									TupleTableSlot *ondisk_tuple,
-									ChangeContext *chgcxt);
+static void apply_concurrent_changes(ChangeContext *chgcxt);
+static void apply_concurrent_insert(RepackDest *dest, TupleTableSlot *slot);
+static void apply_concurrent_update(RepackDest *dest,
+									TupleTableSlot *spilled_tuple,
+									TupleTableSlot *ondisk_tuple);
 static void apply_concurrent_delete(Relation rel, TupleTableSlot *slot);
 static void restore_tuple(BufFile *file, Relation relation,
 						  TupleTableSlot *slot);
 static void adjust_toast_pointers(Relation relation, TupleTableSlot *dest,
 								  TupleTableSlot *src);
-static bool find_target_tuple(Relation rel, ChangeContext *chgcxt,
-							  TupleTableSlot *locator,
+static bool find_target_tuple(RepackDest *dest, TupleTableSlot *locator,
 							  TupleTableSlot *retrieved);
-static bool identity_key_equal(ChangeContext *chgcxt,
-							   TupleTableSlot *locator,
+static bool identity_key_equal(RepackDest *dest, TupleTableSlot *locator,
 							   TupleTableSlot *candidate);
 static void process_concurrent_changes(XLogRecPtr end_of_wal,
 									   ChangeContext *chgcxt,
@@ -197,6 +160,9 @@ static void initialize_change_context(ChangeContext *chgcxt,
 									  Relation relation,
 									  Oid ident_index_id);
 static void release_change_context(ChangeContext *chgcxt);
+static void initialize_change_dest(RepackDest *dest, Relation relation,
+								   Oid ident_index_id);
+static void release_change_dest(RepackDest *dest);
 static void rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
 											   Oid identIdx,
 											   TransactionId frozenXid,
@@ -2613,18 +2579,31 @@ RepackCommandAsString(RepackCommand cmd)
 }
 
 /*
- * Apply all the changes stored in 'file'.
+ * Apply all the changes provided by decoding worker.
  */
 static void
-apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt)
+apply_concurrent_changes(ChangeContext *chgcxt)
 {
 	ConcurrentChangeKind kind = '\0';
-	Relation	rel = chgcxt->cc_rel;
+	RepackDest *dest;
+	Relation	rel;
 	TupleTableSlot *spilled_tuple;
 	TupleTableSlot *old_update_tuple;
 	TupleTableSlot *ondisk_tuple;
 	bool		have_old_tuple = false;
 	MemoryContext oldcxt;
+	DecodingWorkerShared *shared;
+	char		fname[MAXPGPATH];
+	BufFile    *file;
+
+	dest = &chgcxt->cc_dest;
+	rel = dest->rel;
+
+	shared = (DecodingWorkerShared *) dsm_segment_address(decoding_worker->seg);
+
+	/* Open the file containing the changes. */
+	DecodingWorkerFileName(fname, shared->relid, chgcxt->cc_file_seq);
+	file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false);
 
 	spilled_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel),
 											 &TTSOpsVirtual);
@@ -2633,7 +2612,7 @@ apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt)
 	old_update_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel),
 												&TTSOpsVirtual);
 
-	oldcxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(chgcxt->cc_estate));
+	oldcxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(dest->estate));
 
 	while (true)
 	{
@@ -2682,14 +2661,14 @@ apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt)
 
 		if (kind == CHANGE_INSERT)
 		{
-			apply_concurrent_insert(rel, spilled_tuple, chgcxt);
+			apply_concurrent_insert(dest, spilled_tuple);
 		}
 		else if (kind == CHANGE_DELETE)
 		{
 			bool		found;
 
 			/* Find the tuple to be deleted */
-			found = find_target_tuple(rel, chgcxt, spilled_tuple, ondisk_tuple);
+			found = find_target_tuple(dest, spilled_tuple, ondisk_tuple);
 			if (!found)
 				elog(ERROR, "could not find target tuple");
 			apply_concurrent_delete(rel, ondisk_tuple);
@@ -2705,7 +2684,7 @@ apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt)
 				key = spilled_tuple;
 
 			/* Find the tuple to be updated or deleted. */
-			found = find_target_tuple(rel, chgcxt, key, ondisk_tuple);
+			found = find_target_tuple(dest, key, ondisk_tuple);
 			if (!found)
 				elog(ERROR, "could not find target tuple");
 
@@ -2718,7 +2697,7 @@ apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt)
 			 */
 			adjust_toast_pointers(rel, spilled_tuple, ondisk_tuple);
 
-			apply_concurrent_update(rel, spilled_tuple, ondisk_tuple, chgcxt);
+			apply_concurrent_update(dest, spilled_tuple, ondisk_tuple);
 
 			ExecClearTuple(old_update_tuple);
 			have_old_tuple = false;
@@ -2726,7 +2705,7 @@ apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt)
 		else
 			elog(ERROR, "unrecognized kind of change: %d", kind);
 
-		ResetPerTupleExprContext(chgcxt->cc_estate);
+		ResetPerTupleExprContext(dest->estate);
 	}
 
 	/* Cleanup. */
@@ -2735,6 +2714,8 @@ apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt)
 	ExecDropSingleTupleTableSlot(old_update_tuple);
 
 	MemoryContextSwitchTo(oldcxt);
+
+	BufFileClose(file);
 }
 
 /*
@@ -2742,16 +2723,15 @@ apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt)
  * table.
  */
 static void
-apply_concurrent_insert(Relation rel, TupleTableSlot *slot,
-						ChangeContext *chgcxt)
+apply_concurrent_insert(RepackDest *dest, TupleTableSlot *slot)
 {
 	/* Put the tuple in the table, but make sure it won't be decoded */
-	table_tuple_insert(rel, slot, GetCurrentCommandId(true),
+	table_tuple_insert(dest->rel, slot, GetCurrentCommandId(true),
 					   TABLE_INSERT_NO_LOGICAL, NULL);
 
 	/* Update indexes with this new tuple. */
-	ExecInsertIndexTuples(chgcxt->cc_rri,
-						  chgcxt->cc_estate,
+	ExecInsertIndexTuples(dest->rri,
+						  dest->estate,
 						  0,
 						  slot,
 						  NIL, NULL);
@@ -2763,10 +2743,10 @@ apply_concurrent_insert(Relation rel, TupleTableSlot *slot,
  * table.
  */
 static void
-apply_concurrent_update(Relation rel, TupleTableSlot *spilled_tuple,
-						TupleTableSlot *ondisk_tuple,
-						ChangeContext *chgcxt)
+apply_concurrent_update(RepackDest *dest, TupleTableSlot *spilled_tuple,
+						TupleTableSlot *ondisk_tuple)
 {
+	Relation	rel = dest->rel;
 	LockTupleMode lockmode;
 	TM_FailureData tmfd;
 	TU_UpdateIndexes update_indexes;
@@ -2794,8 +2774,8 @@ apply_concurrent_update(Relation rel, TupleTableSlot *spilled_tuple,
 
 		if (update_indexes == TU_Summarizing)
 			flags |= EIIT_ONLY_SUMMARIZING;
-		ExecInsertIndexTuples(chgcxt->cc_rri,
-							  chgcxt->cc_estate,
+		ExecInsertIndexTuples(dest->rri,
+							  dest->estate,
 							  flags,
 							  spilled_tuple,
 							  NIL, NULL);
@@ -2944,10 +2924,11 @@ adjust_toast_pointers(Relation relation, TupleTableSlot *dest, TupleTableSlot *s
  * not found, return false.
  */
 static bool
-find_target_tuple(Relation rel, ChangeContext *chgcxt, TupleTableSlot *locator,
+find_target_tuple(RepackDest *dest, TupleTableSlot *locator,
 				  TupleTableSlot *retrieved)
 {
-	Form_pg_index idx = chgcxt->cc_ident_index->rd_index;
+	Relation	rel = dest->rel;
+	Form_pg_index idx = dest->ident_index->rd_index;
 	IndexScanDesc scan;
 	bool		retval = false;
 
@@ -2958,9 +2939,9 @@ find_target_tuple(Relation rel, ChangeContext *chgcxt, TupleTableSlot *locator,
 	 *
 	 * Use the incoming tuple to finalize the scan key.
 	 */
-	for (int i = 0; i < chgcxt->cc_ident_key_nentries; i++)
+	for (int i = 0; i < dest->ident_key_nentries; i++)
 	{
-		ScanKey		entry = &chgcxt->cc_ident_key[i];
+		ScanKey		entry = &dest->ident_key[i];
 		AttrNumber	attno = idx->indkey.values[i];
 
 		entry->sk_argument = locator->tts_values[attno - 1];
@@ -2968,13 +2949,13 @@ find_target_tuple(Relation rel, ChangeContext *chgcxt, TupleTableSlot *locator,
 	}
 
 	/* XXX no instrumentation for now */
-	scan = index_beginscan(rel, chgcxt->cc_ident_index, GetActiveSnapshot(),
-						   NULL, chgcxt->cc_ident_key_nentries, 0, 0);
-	index_rescan(scan, chgcxt->cc_ident_key, chgcxt->cc_ident_key_nentries, NULL, 0);
+	scan = index_beginscan(rel, dest->ident_index, GetActiveSnapshot(),
+						   NULL, dest->ident_key_nentries, 0, 0);
+	index_rescan(scan, dest->ident_key, dest->ident_key_nentries, NULL, 0);
 	while (index_getnext_slot(scan, ForwardScanDirection, retrieved))
 	{
 		/* Be wary of temporal constraints */
-		if (scan->xs_recheck && !identity_key_equal(chgcxt, locator, retrieved))
+		if (scan->xs_recheck && !identity_key_equal(dest, locator, retrieved))
 		{
 			CHECK_FOR_INTERRUPTS();
 			continue;
@@ -2997,16 +2978,16 @@ find_target_tuple(Relation rel, ChangeContext *chgcxt, TupleTableSlot *locator,
  * used for temporal constraints.
  */
 static bool
-identity_key_equal(ChangeContext *chgcxt, TupleTableSlot *locator,
+identity_key_equal(RepackDest *dest, TupleTableSlot *locator,
 				   TupleTableSlot *candidate)
 {
-	slot_getsomeattrs(locator, chgcxt->cc_last_key_attno);
-	slot_getsomeattrs(candidate, chgcxt->cc_last_key_attno);
+	slot_getsomeattrs(locator, dest->last_key_attno);
+	slot_getsomeattrs(candidate, dest->last_key_attno);
 
-	for (int i = 0; i < chgcxt->cc_ident_key_nentries; i++)
+	for (int i = 0; i < dest->ident_key_nentries; i++)
 	{
-		ScanKey		entry = &chgcxt->cc_ident_key[i];
-		AttrNumber	attno = chgcxt->cc_ident_index->rd_index->indkey.values[i];
+		ScanKey		entry = &dest->ident_key[i];
+		AttrNumber	attno = dest->ident_index->rd_index->indkey.values[i];
 
 		Assert(attno > 0);
 
@@ -3077,7 +3058,7 @@ process_concurrent_changes(XLogRecPtr end_of_wal, ChangeContext *chgcxt, bool do
 	/* Open the file. */
 	DecodingWorkerFileName(fname, shared->relid, chgcxt->cc_file_seq);
 	file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false);
-	apply_concurrent_changes(file, chgcxt);
+	apply_concurrent_changes(chgcxt);
 
 	BufFileClose(file);
 
@@ -3093,41 +3074,65 @@ static void
 initialize_change_context(ChangeContext *chgcxt,
 						  Relation relation, Oid ident_index_id)
 {
-	chgcxt->cc_rel = relation;
+	initialize_change_dest(&chgcxt->cc_dest, relation, ident_index_id);
+
+	chgcxt->cc_file_seq = WORKER_FILE_SNAPSHOT + 1;
+}
+
+/*
+ * Free up resources taken by a ChangeContext.
+ */
+static void
+release_change_context(ChangeContext *chgcxt)
+{
+	release_change_dest(&chgcxt->cc_dest);
+}
+
+/*
+ * Initialize the RepackDest struct for the given relation, with the given
+ * index as identity index. InvalidOid can be specified to only make the
+ * relation ready for insertions.
+ */
+static void
+initialize_change_dest(RepackDest *dest, Relation relation,
+					   Oid ident_index_id)
+{
+	dest->rel = relation;
+	dest->bistate = GetBulkInsertState();
 
 	/* Only initialize fields needed by ExecInsertIndexTuples(). */
-	chgcxt->cc_estate = CreateExecutorState();
+	dest->estate = CreateExecutorState();
 
-	chgcxt->cc_rri = (ResultRelInfo *) palloc(sizeof(ResultRelInfo));
-	InitResultRelInfo(chgcxt->cc_rri, relation, 0, 0, 0);
-	ExecOpenIndices(chgcxt->cc_rri, false);
+	dest->rri = (ResultRelInfo *) palloc(sizeof(ResultRelInfo));
+	InitResultRelInfo(dest->rri, relation, 0, 0, 0);
+	ExecOpenIndices(dest->rri, false);
 
 	/*
 	 * The table's relcache entry already has the relcache entry for the
 	 * identity index; find that.
 	 */
-	chgcxt->cc_ident_index = NULL;
-	for (int i = 0; i < chgcxt->cc_rri->ri_NumIndices; i++)
+	dest->ident_index = NULL;
+	for (int i = 0; i < dest->rri->ri_NumIndices; i++)
 	{
 		Relation	ind_rel;
 
-		ind_rel = chgcxt->cc_rri->ri_IndexRelationDescs[i];
+		ind_rel = dest->rri->ri_IndexRelationDescs[i];
 		if (ind_rel->rd_id == ident_index_id)
 		{
-			chgcxt->cc_ident_index = ind_rel;
+			dest->ident_index = ind_rel;
 			break;
 		}
 	}
-	if (chgcxt->cc_ident_index == NULL)
+	if (dest->ident_index == NULL)
 		elog(ERROR, "could not find identity index");
 
 	/* Set up for scanning said identity index */
 	{
 		Form_pg_index indexForm;
 
-		indexForm = chgcxt->cc_ident_index->rd_index;
-		chgcxt->cc_ident_key_nentries = indexForm->indnkeyatts;
-		chgcxt->cc_ident_key = (ScanKey) palloc_array(ScanKeyData, indexForm->indnkeyatts);
+		indexForm = dest->ident_index->rd_index;
+		dest->ident_key_nentries = indexForm->indnkeyatts;
+		dest->ident_key = (ScanKey) palloc_array(ScanKeyData, indexForm->indnkeyatts);
 		for (int i = 0; i < indexForm->indnkeyatts; i++)
 		{
 			ScanKey		entry;
@@ -3137,12 +3142,12 @@ initialize_change_context(ChangeContext *chgcxt,
 						opcode;
 			StrategyNumber eq_strategy;
 
-			entry = &chgcxt->cc_ident_key[i];
+			entry = &dest->ident_key[i];
 
-			opfamily = chgcxt->cc_ident_index->rd_opfamily[i];
-			opcintype = chgcxt->cc_ident_index->rd_opcintype[i];
+			opfamily = dest->ident_index->rd_opfamily[i];
+			opcintype = dest->ident_index->rd_opcintype[i];
 			eq_strategy = IndexAmTranslateCompareType(COMPARE_EQ,
-													  chgcxt->cc_ident_index->rd_rel->relam,
+													  dest->ident_index->rd_rel->relam,
 													  opfamily, false);
 			if (eq_strategy == InvalidStrategy)
 				elog(ERROR, "could not find equality strategy for index operator family %u for type %u",
@@ -3161,34 +3166,30 @@ initialize_change_context(ChangeContext *chgcxt,
 						i + 1,
 						eq_strategy, opcode,
 						(Datum) 0);
-			entry->sk_collation = chgcxt->cc_ident_index->rd_indcollation[i];
+			entry->sk_collation = dest->ident_index->rd_indcollation[i];
 		}
 	}
 
 	/* Determine the last column we must deform to read the identity */
-	chgcxt->cc_last_key_attno = InvalidAttrNumber;
-	for (int i = 0; i < chgcxt->cc_ident_key_nentries; i++)
+	dest->last_key_attno = InvalidAttrNumber;
+	for (int i = 0; i < dest->ident_key_nentries; i++)
 	{
-		AttrNumber	attno = chgcxt->cc_ident_index->rd_index->indkey.values[i];
+		AttrNumber	attno = dest->ident_index->rd_index->indkey.values[i];
 
 		Assert(attno > 0);
-		chgcxt->cc_last_key_attno = Max(chgcxt->cc_last_key_attno, attno);
+		dest->last_key_attno = Max(dest->last_key_attno, attno);
 	}
-
-	chgcxt->cc_file_seq = WORKER_FILE_SNAPSHOT + 1;
 }
 
-/*
- * Free up resources taken by a ChangeContext.
- */
 static void
-release_change_context(ChangeContext *chgcxt)
+release_change_dest(RepackDest *dest)
 {
-	ExecCloseIndices(chgcxt->cc_rri);
-	FreeExecutorState(chgcxt->cc_estate);
+	FreeBulkInsertState(dest->bistate);
+	ExecCloseIndices(dest->rri);
+	FreeExecutorState(dest->estate);
 	/* XXX are these pfrees necessary? */
-	pfree(chgcxt->cc_rri);
-	pfree(chgcxt->cc_ident_key);
+	pfree(dest->rri);
+	pfree(dest->ident_key);
 }
 
 /*
diff --git a/src/include/commands/repack.h b/src/include/commands/repack.h
index 27105c10591..07f887e99f6 100644
--- a/src/include/commands/repack.h
+++ b/src/include/commands/repack.h
@@ -16,6 +16,7 @@
 #include <signal.h>
 
 #include "access/hio.h"
+#include "access/skey.h"
 #include "nodes/execnodes.h"
 #include "nodes/parsenodes.h"
 #include "parser/parse_node.h"
@@ -39,6 +40,53 @@ typedef struct ClusterParams
 
 extern PGDLLIMPORT volatile sig_atomic_t RepackMessagePending;
 
+/*
+ * Table to apply concurrent data changes to.
+ */
+typedef struct RepackDest
+{
+	/* The relation the changes are applied to. */
+	Relation	rel;
+
+	BulkInsertStateData *bistate;
+
+	/* Needed to update indexes of cc_rel. */
+	ResultRelInfo *rri;
+	EState	   *estate;
+
+	/*
+	 * Existing tuples to UPDATE and DELETE are located via this index. We
+	 * keep the scankey in partially initialized state to avoid repeated work.
+	 * sk_argument is completed on the fly.
+	 */
+	Relation	ident_index;
+	ScanKey		ident_key;
+
+	int			ident_key_nentries;
+
+	/* The latest column we need to deform to have the tuple identity */
+	AttrNumber	last_key_attno;
+} RepackDest;
+
+/*
+ * The first file exported by the decoding worker must contain a snapshot, the
+ * following ones contain the data changes.
+ */
+#define WORKER_FILE_SNAPSHOT	0
+
+/*
+ * Information needed to apply concurrent data changes.
+ *
+ * XXX Now that it's in *.h file, rename to RepackChangeContext?
+ */
+typedef struct ChangeContext
+{
+	/* The destination table. */
+	RepackDest	cc_dest;
+
+	/* Sequential number of the file containing the changes. */
+	int			cc_file_seq;
+} ChangeContext;
 
 extern void ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel);
 
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index f9eb23e52c9..b4d5abbaca7 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2660,6 +2660,7 @@ ReorderBufferTupleCidKey
 ReorderBufferUpdateProgressTxnCB
 ReorderTuple
 RepackCommand
+RepackDest
 RepackDecodingState
 RepackStmt
 ReparameterizeForeignPathByChild_function
-- 
2.52.0

