From 327fb689e01d1be5e4ee3e4564d9e659a97f5e18 Mon Sep 17 00:00:00 2001
From: Antonin Houska <ah@cybertec.at>
Date: Mon, 30 Jun 2025 19:41:43 +0200
Subject: [PATCH 6/7] Introduce repack_max_xlock_time configuration variable.

When executing REPACK CONCURRENTLY, we need the AccessExclusiveLock to swap
the relation files, and holding it should normally take only a short time.
However, on a busy system, other backends might change a non-negligible
amount of data in the table while we are waiting for the lock. Since these
changes must be applied to the new storage before the swap, the time we
eventually hold the lock might become non-negligible too.

Users worried about this situation can set repack_max_xlock_time to the
maximum time for which the exclusive lock may be held. If that amount of
time is not sufficient to complete the REPACK CONCURRENTLY command, an
ERROR is raised and the command is canceled.
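
For example, to cancel the command if the final phase cannot complete
within 100 milliseconds (the table and index names here are only
illustrative):

    SET repack_max_xlock_time = '100ms';
    REPACK CONCURRENTLY some_table USING INDEX some_table_pkey;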
---
 doc/src/sgml/config.sgml                      |  31 ++++
 doc/src/sgml/ref/repack.sgml                  |   5 +-
 src/backend/access/heap/heapam_handler.c      |   3 +-
 src/backend/commands/cluster.c                | 135 +++++++++++++++---
 src/backend/utils/misc/guc_tables.c           |  15 +-
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/commands/cluster.h                |   5 +-
 .../injection_points/expected/repack.out      |  74 +++++++++-
 .../injection_points/specs/repack.spec        |  42 ++++++
 9 files changed, 290 insertions(+), 21 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 59a0874528a..c0529005c78 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -11239,6 +11239,37 @@ dynamic_library_path = '/usr/local/lib/postgresql:$libdir'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-repack-max-xclock-time" xreflabel="repack_max_xlock_time">
+      <term><varname>repack_max_xlock_time</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>repack_max_xlock_time</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        This is the maximum amount of time for
+        which <command>REPACK</command> with
+        the <literal>CONCURRENTLY</literal> option may hold an exclusive lock
+        on a table. Typically, the command should not need the lock for
+        longer than <command>TRUNCATE</command> does. However, additional
+        time might be needed if the system is very busy. (See
+        <xref linkend="sql-repack"/> for an explanation of how
+        the <literal>CONCURRENTLY</literal> option works.)
+       </para>
+
+       <para>
+        If you want to restrict the lock time, set this variable to the
+        highest acceptable value. If it turns out during the processing that
+        more time would be needed before the lock could be released, the
+        command is canceled.
+       </para>
+
+       <para>
+        The default value is 0, which means that the lock is not released
+        until all the concurrent data changes have been processed.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
    </sect1>
 
diff --git a/doc/src/sgml/ref/repack.sgml b/doc/src/sgml/ref/repack.sgml
index 9c089a6b3d7..e1313f40599 100644
--- a/doc/src/sgml/ref/repack.sgml
+++ b/doc/src/sgml/ref/repack.sgml
@@ -192,7 +192,10 @@ REPACK [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] CONCU
       too many data changes have been done to the table while
       <command>REPACK</command> was waiting for the lock: those changes must
       be processed just before the files are swapped, while the
-      <literal>ACCESS EXCLUSIVE</literal> lock is being held.
+      <literal>ACCESS EXCLUSIVE</literal> lock is being held. If you are
+      worried about this situation, set
+      the <link linkend="guc-repack-max-xclock-time"><varname>repack_max_xlock_time</varname></link>
+      configuration parameter to a value that your applications can tolerate.
      </para>
 
      <para>
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index c829c06f769..03e722347a1 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -986,7 +986,8 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
 			end_of_wal = GetFlushRecPtr(NULL);
 			if ((end_of_wal - end_of_wal_prev) > wal_segment_size)
 			{
-				repack_decode_concurrent_changes(decoding_ctx, end_of_wal);
+				repack_decode_concurrent_changes(decoding_ctx, end_of_wal,
+												 NULL);
 				end_of_wal_prev = end_of_wal;
 			}
 		}
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index abbbfc99036..37f69f369eb 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -17,6 +17,8 @@
  */
 #include "postgres.h"
 
+#include <sys/time.h>
+
 #include "access/amapi.h"
 #include "access/heapam.h"
 #include "access/multixact.h"
@@ -89,6 +91,15 @@ typedef struct
 RelFileLocator repacked_rel_locator = {.relNumber = InvalidOid};
 RelFileLocator repacked_rel_toast_locator = {.relNumber = InvalidOid};
 
+/*
+ * The maximum time, in milliseconds, for which AccessExclusiveLock may be
+ * held during the final processing. Note that only the execution time of
+ * process_concurrent_changes() is counted here. The very last steps, such as
+ * swap_relation_files(), should not get blocked, and it would be wrong to
+ * consider them a reason to abort otherwise completed processing.
+ */
+int			repack_max_xlock_time = 0;
+
 /*
  * Everything we need to call ExecInsertIndexTuples().
  */
@@ -132,7 +143,8 @@ static LogicalDecodingContext *setup_logical_decoding(Oid relid,
 static HeapTuple get_changed_tuple(char *change);
 static void apply_concurrent_changes(RepackDecodingState *dstate,
 									 Relation rel, ScanKey key, int nkeys,
-									 IndexInsertState *iistate);
+									 IndexInsertState *iistate,
+									 struct timeval *must_complete);
 static void apply_concurrent_insert(Relation rel, ConcurrentChange *change,
 									HeapTuple tup, IndexInsertState *iistate,
 									TupleTableSlot *index_slot);
@@ -148,13 +160,15 @@ static HeapTuple find_target_tuple(Relation rel, ScanKey key, int nkeys,
 								   IndexInsertState *iistate,
 								   TupleTableSlot *ident_slot,
 								   IndexScanDesc *scan_p);
-static void process_concurrent_changes(LogicalDecodingContext *ctx,
+static bool process_concurrent_changes(LogicalDecodingContext *ctx,
 									   XLogRecPtr end_of_wal,
 									   Relation rel_dst,
 									   Relation rel_src,
 									   ScanKey ident_key,
 									   int ident_key_nentries,
-									   IndexInsertState *iistate);
+									   IndexInsertState *iistate,
+									   struct timeval *must_complete);
+static bool processing_time_elapsed(struct timeval *must_complete);
 static IndexInsertState *get_index_insert_state(Relation relation,
 												Oid ident_index_id);
 static ScanKey build_identity_key(Oid ident_idx_oid, Relation rel_src,
@@ -2352,7 +2366,8 @@ get_changed_tuple(char *change)
  */
 void
 repack_decode_concurrent_changes(LogicalDecodingContext *ctx,
-								 XLogRecPtr end_of_wal)
+								 XLogRecPtr end_of_wal,
+								 struct timeval *must_complete)
 {
 	RepackDecodingState *dstate;
 	ResourceOwner resowner_old;
@@ -2382,6 +2397,9 @@ repack_decode_concurrent_changes(LogicalDecodingContext *ctx,
 			if (record != NULL)
 				LogicalDecodingProcessRecord(ctx, ctx->reader);
 
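+			/* Stop decoding if the deadline has already elapsed. */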
+			if (processing_time_elapsed(must_complete))
+				break;
+
 			/*
 			 * If WAL segment boundary has been crossed, inform the decoding
 			 * system that the catalog_xmin can advance. (We can confirm more
@@ -2422,7 +2440,8 @@ repack_decode_concurrent_changes(LogicalDecodingContext *ctx,
  */
 static void
 apply_concurrent_changes(RepackDecodingState *dstate, Relation rel,
-						 ScanKey key, int nkeys, IndexInsertState *iistate)
+						 ScanKey key, int nkeys, IndexInsertState *iistate,
+						 struct timeval *must_complete)
 {
 	TupleTableSlot *index_slot,
 			   *ident_slot;
@@ -2452,6 +2471,9 @@ apply_concurrent_changes(RepackDecodingState *dstate, Relation rel,
 
 		CHECK_FOR_INTERRUPTS();
 
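+		/*
+		 * Account for the change we are about to process, so that we can
+		 * tell below whether the tuplestore has been drained completely.
+		 */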
+		Assert(dstate->nchanges > 0);
+		dstate->nchanges--;
+
 		/* Get the change from the single-column tuple. */
 		tup_change = ExecFetchSlotHeapTuple(dstate->tsslot, false, &shouldFree);
 		heap_deform_tuple(tup_change, dstate->tupdesc_change, values, isnull);
@@ -2552,10 +2574,22 @@ apply_concurrent_changes(RepackDecodingState *dstate, Relation rel,
 		/* TTSOpsMinimalTuple has .get_heap_tuple==NULL. */
 		Assert(shouldFree);
 		pfree(tup_change);
+
+		/*
+		 * If there is a deadline for the completion, check it now. However,
+		 * make sure the loop does not break if tup_old was set in the
+		 * previous iteration: in that case we could not resume the
+		 * processing in the next call.
+		 */
+		if (must_complete && tup_old == NULL &&
+			processing_time_elapsed(must_complete))
+			/* The next call will process the remaining changes. */
+			break;
 	}
 
-	tuplestore_clear(dstate->tstore);
-	dstate->nchanges = 0;
+	/*
+	 * Only clear the tuplestore if all the changes have been applied;
+	 * otherwise the next call will process the remaining ones.
+	 */
+	if (dstate->nchanges == 0)
+		tuplestore_clear(dstate->tstore);
 
 	/* Cleanup. */
 	ExecDropSingleTupleTableSlot(index_slot);
@@ -2737,11 +2771,15 @@ find_target_tuple(Relation rel, ScanKey key, int nkeys, HeapTuple tup_key,
  * Decode and apply concurrent changes.
  *
  * Pass rel_src iff its reltoastrelid is needed.
+ *
+ * Returns true if must_complete is NULL, or if the processing managed to
+ * complete by the time *must_complete indicates.
  */
-static void
+static bool
 process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal,
 						   Relation rel_dst, Relation rel_src, ScanKey ident_key,
-						   int ident_key_nentries, IndexInsertState *iistate)
+						   int ident_key_nentries, IndexInsertState *iistate,
+						   struct timeval *must_complete)
 {
 	RepackDecodingState *dstate;
 
@@ -2750,10 +2788,19 @@ process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal,
 
 	dstate = (RepackDecodingState *) ctx->output_writer_private;
 
-	repack_decode_concurrent_changes(ctx, end_of_wal);
+	repack_decode_concurrent_changes(ctx, end_of_wal, must_complete);
 
+	if (processing_time_elapsed(must_complete))
+		/* Caller is responsible for applying the changes. */
+		return false;
+
+	/*
+	 * Since *must_complete has not been reached, the absence of changes is
+	 * genuine. (Otherwise we might see no changes simply because not enough
+	 * time was left for the decoding.)
+	 */
 	if (dstate->nchanges == 0)
-		return;
+		return true;
 
 	PG_TRY();
 	{
@@ -2765,7 +2812,7 @@ process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal,
 			rel_dst->rd_toastoid = rel_src->rd_rel->reltoastrelid;
 
 		apply_concurrent_changes(dstate, rel_dst, ident_key,
-								 ident_key_nentries, iistate);
+								 ident_key_nentries, iistate, must_complete);
 	}
 	PG_FINALLY();
 	{
@@ -2773,6 +2820,28 @@ process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal,
 			rel_dst->rd_toastoid = InvalidOid;
 	}
 	PG_END_TRY();
+
+	/*
+	 * apply_concurrent_changes() checks the processing time itself, so if
+	 * any changes are left over, we ran out of time.
+	 */
+	return dstate->nchanges == 0;
+}
+
+/*
+ * Check if the current time is beyond *must_complete.
+ */
+static bool
+processing_time_elapsed(struct timeval *must_complete)
+{
+	struct timeval now;
+
+	if (must_complete == NULL)
+		return false;
+
+	gettimeofday(&now, NULL);
+
+	return timercmp(&now, must_complete, >);
 }
 
 static IndexInsertState *
@@ -2934,6 +3003,8 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
 	Relation   *ind_refs,
 			   *ind_refs_p;
 	int			nind;
+	struct timeval t_end;
+	struct timeval *t_end_ptr = NULL;
 
 	/* Like in cluster_rel(). */
 	lockmode_old = ShareUpdateExclusiveLock;
@@ -3029,7 +3100,8 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
 	 */
 	process_concurrent_changes(ctx, end_of_wal, NewHeap,
 							   swap_toast_by_content ? OldHeap : NULL,
-							   ident_key, ident_key_nentries, iistate);
+							   ident_key, ident_key_nentries, iistate,
+							   NULL);
 
 	/*
 	 * Acquire AccessExclusiveLock on the table, its TOAST relation (if there
@@ -3125,9 +3197,40 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
 	end_of_wal = GetFlushRecPtr(NULL);
 
 	/* Apply the concurrent changes again. */
-	process_concurrent_changes(ctx, end_of_wal, NewHeap,
-							   swap_toast_by_content ? OldHeap : NULL,
-							   ident_key, ident_key_nentries, iistate);
+
+	/*
+	 * This time we have the exclusive lock on the table, so make sure that
+	 * repack_max_xlock_time is not exceeded.
+	 */
+	if (repack_max_xlock_time > 0)
+	{
+		int64		usec;
+		struct timeval t_start;
+
+		gettimeofday(&t_start, NULL);
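+
+		/*
+		 * Compute the deadline: t_end = t_start + repack_max_xlock_time
+		 * milliseconds. For example, with repack_max_xlock_time = 1500 and
+		 * t_start = {.tv_sec = 10, .tv_usec = 900000}, the result is
+		 * t_end = {.tv_sec = 12, .tv_usec = 400000}.
+		 */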
+		/* Add the whole seconds. */
+		t_end.tv_sec = t_start.tv_sec + repack_max_xlock_time / 1000;
+		/* Add the rest, expressed in microseconds. */
+		usec = t_start.tv_usec + 1000 * (repack_max_xlock_time % 1000);
+		/* The microseconds may have added up to more than one second. */
+		t_end.tv_sec += usec / USECS_PER_SEC;
+		t_end.tv_usec = usec % USECS_PER_SEC;
+		t_end_ptr = &t_end;
+	}
+
+	/*
+	 * During testing, stop here to simulate excessive processing time.
+	 */
+	INJECTION_POINT("repack-concurrently-after-lock", NULL);
+
+	if (!process_concurrent_changes(ctx, end_of_wal, NewHeap,
+									swap_toast_by_content ? OldHeap : NULL,
+									ident_key, ident_key_nentries, iistate,
+									t_end_ptr))
+		ereport(ERROR,
+				(errmsg("could not process concurrent data changes in time"),
+				 errhint("Please consider adjusting \"repack_max_xlock_time\".")));
 
 	/* Remember info about rel before closing OldHeap */
 	relpersistence = OldHeap->rd_rel->relpersistence;
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 511dc32d519..6a373a8b65a 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -42,8 +42,9 @@
 #include "catalog/namespace.h"
 #include "catalog/storage.h"
 #include "commands/async.h"
-#include "commands/extension.h"
+#include "commands/cluster.h"
 #include "commands/event_trigger.h"
+#include "commands/extension.h"
 #include "commands/tablespace.h"
 #include "commands/trigger.h"
 #include "commands/user.h"
@@ -2839,6 +2840,18 @@ struct config_int ConfigureNamesInt[] =
 		1600000000, 0, 2100000000,
 		NULL, NULL, NULL
 	},
+	{
+		{"repack_max_xlock_time", PGC_USERSET, LOCK_MANAGEMENT,
+			gettext_noop("Maximum time for REPACK CONCURRENTLY to keep table locked."),
+			gettext_noop("The table is locked in exclusive mode during the final stage of processing. "
+						 "If the lock time exceeds this value, error is raised and the lock is "
+						 "released. Set to zero if you don't care how long the lock can be held."),
+			GUC_UNIT_MS
+		},
+		&repack_max_xlock_time,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
 
 	/*
 	 * See also CheckRequiredParameterValues() if this parameter changes
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 341f88adc87..42d32a2c198 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -765,6 +765,7 @@ autovacuum_worker_slots = 16	# autovacuum worker slots to allocate
 #lock_timeout = 0				# in milliseconds, 0 is disabled
 #idle_in_transaction_session_timeout = 0	# in milliseconds, 0 is disabled
 #idle_session_timeout = 0			# in milliseconds, 0 is disabled
+#repack_max_xlock_time = 0		# in milliseconds, 0 is disabled
 #bytea_output = 'hex'			# hex, escape
 #xmlbinary = 'base64'
 #xmloption = 'content'
diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h
index 0a7e72bc74a..4914f217267 100644
--- a/src/include/commands/cluster.h
+++ b/src/include/commands/cluster.h
@@ -59,6 +59,8 @@ typedef enum ClusterCommand
 extern RelFileLocator repacked_rel_locator;
 extern RelFileLocator repacked_rel_toast_locator;
 
+extern PGDLLIMPORT int repack_max_xlock_time;
+
 typedef enum
 {
 	CHANGE_INSERT,
@@ -134,7 +136,8 @@ extern void check_index_is_clusterable(Relation OldHeap, Oid indexOid,
 									   LOCKMODE lockmode);
 extern void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal);
 extern void repack_decode_concurrent_changes(LogicalDecodingContext *ctx,
-											 XLogRecPtr end_of_wal);
+											 XLogRecPtr end_of_wal,
+											 struct timeval *must_complete);
 extern Oid	make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod,
 						  char relpersistence, LOCKMODE lockmode);
 extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
diff --git a/src/test/modules/injection_points/expected/repack.out b/src/test/modules/injection_points/expected/repack.out
index f919087ca5b..02967ed9d48 100644
--- a/src/test/modules/injection_points/expected/repack.out
+++ b/src/test/modules/injection_points/expected/repack.out
@@ -1,4 +1,4 @@
-Parsed test spec with 2 sessions
+Parsed test spec with 4 sessions
 
 starting permutation: wait_before_lock change_existing change_new change_subxact1 change_subxact2 check2 wakeup_before_lock check1
 injection_points_attach
@@ -111,3 +111,75 @@ injection_points_detach
                        
 (1 row)
 
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+
+starting permutation: wait_after_lock wakeup_after_lock
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step wait_after_lock: 
+	REPACK CONCURRENTLY repack_test USING INDEX repack_test_pkey;
+ <waiting ...>
+step wakeup_after_lock: 
+	SELECT injection_points_wakeup('repack-concurrently-after-lock');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step wait_after_lock: <... completed>
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+
+starting permutation: wait_after_lock after_lock_delay wakeup_after_lock
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step wait_after_lock: 
+	REPACK CONCURRENTLY repack_test USING INDEX repack_test_pkey;
+ <waiting ...>
+step after_lock_delay: 
+	SELECT pg_sleep(1.5);
+
+pg_sleep
+--------
+        
+(1 row)
+
+step wakeup_after_lock: 
+	SELECT injection_points_wakeup('repack-concurrently-after-lock');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step wait_after_lock: <... completed>
+ERROR:  could not process concurrent data changes in time
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
diff --git a/src/test/modules/injection_points/specs/repack.spec b/src/test/modules/injection_points/specs/repack.spec
index a17064462ce..d0fa38dd8cd 100644
--- a/src/test/modules/injection_points/specs/repack.spec
+++ b/src/test/modules/injection_points/specs/repack.spec
@@ -130,6 +130,34 @@ step wakeup_before_lock
 	SELECT injection_points_wakeup('repack-concurrently-before-lock');
 }
 
+session s3
+setup
+{
+	SELECT injection_points_set_local();
+	SELECT injection_points_attach('repack-concurrently-after-lock', 'wait');
+	SET repack_max_xlock_time TO '1s';
+}
+# Perform the initial load, lock the table in exclusive mode and wait at the
+# injection point. Session s4 wakes the waiting session up.
+step wait_after_lock
+{
+	REPACK CONCURRENTLY repack_test USING INDEX repack_test_pkey;
+}
+teardown
+{
+	SELECT injection_points_detach('repack-concurrently-after-lock');
+}
+
+session s4
+step wakeup_after_lock
+{
+	SELECT injection_points_wakeup('repack-concurrently-after-lock');
+}
+step after_lock_delay
+{
+	SELECT pg_sleep(1.5);
+}
+
 # Test if data changes introduced while one session is performing REPACK
 # CONCURRENTLY find their way into the table.
 permutation
@@ -141,3 +169,17 @@ permutation
 	check2
 	wakeup_before_lock
 	check1
+
+# Test the repack_max_xlock_time configuration variable.
+#
+# First, wake up the session waiting on the injection point immediately. That
+# way, REPACK should complete.
+permutation
+	wait_after_lock
+	wakeup_after_lock
+# Second, wake it up only after a delay long enough to violate
+# repack_max_xlock_time.
+permutation
+	wait_after_lock
+	after_lock_delay
+	wakeup_after_lock
-- 
2.47.1

