From 6c314f3e135938def75aa8b9efde86f782503d10 Mon Sep 17 00:00:00 2001
From: Antonin Houska <ah@cybertec.at>
Date: Mon, 13 Jan 2025 14:29:54 +0100
Subject: [PATCH 7/8] Introduce cluster_max_xlock_time configuration variable.

When executing VACUUM FULL / CLUSTER (CONCURRENTLY) we need the
AccessExclusiveLock to swap the relation files and that should require pretty
short time. However, on a busy system, other backends might change
non-negligible amount of data in the table while we are waiting for the
lock. Since these changes must be applied to the new storage before the swap,
the time we eventually hold the lock might become non-negligible too.

If the user is worried about this situation, he can set cluster_max_xlock_time
to the maximum time for which the exclusive lock may be held. If this amount
of time is not sufficient to complete the VACUUM FULL / CLUSTER (CONCURRENTLY)
command, ERROR is raised and the command is canceled.
---
 doc/src/sgml/config.sgml                      |  32 +++++
 doc/src/sgml/ref/cluster.sgml                 |   9 +-
 src/backend/access/heap/heapam_handler.c      |   3 +-
 src/backend/commands/cluster.c                | 133 +++++++++++++++---
 src/backend/utils/misc/guc_tables.c           |  14 ++
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/commands/cluster.h                |   5 +-
 .../injection_points/expected/cluster.out     |  74 +++++++++-
 .../injection_points/specs/cluster.spec       |  42 ++++++
 9 files changed, 293 insertions(+), 20 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 3f41a17b1f..695d1fe2a4 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -10701,6 +10701,38 @@ dynamic_library_path = 'C:\tools\postgresql;H:\my_project\lib;$libdir'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-cluster-max-xclock-time" xreflabel="cluster_max_xlock_time">
+      <term><varname>cluster_max_xlock_time</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>cluster_max_xlock_time</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        This is the maximum amount of time to hold an exclusive lock on a
+        table by commands <command>CLUSTER</command> and <command>VACUUM
+        FULL</command> with the <literal>CONCURRENTLY</literal>
+        option. Typically, these commands should not need the lock for longer
+        time than <command>TRUNCATE</command> does. However, additional time
+        might be needed if the system is too busy. (See
+        <xref linkend="sql-cluster"/> for explanation how
+        the <literal>CONCURRENTLY</literal> option works.)
+       </para>
+
+       <para>
+        If you want to restrict the lock time, set this variable to the
+        highest acceptable value. If it appears during the processing that
+        additional time is needed to release the lock, the command will be
+        cancelled.
+       </para>
+
+       <para>
+        The default value is 0, which means that the lock is not released
+        until the concurrent data changes are processed.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
    </sect1>
 
diff --git a/doc/src/sgml/ref/cluster.sgml b/doc/src/sgml/ref/cluster.sgml
index 356b40e3fe..ebb85d9d47 100644
--- a/doc/src/sgml/ref/cluster.sgml
+++ b/doc/src/sgml/ref/cluster.sgml
@@ -141,7 +141,14 @@ CLUSTER [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] [ <r
       (<xref linkend="logicaldecoding"/>) and applied before
       the <literal>ACCESS EXCLUSIVE</literal> lock is requested. Thus the lock
       is typically held only for the time needed to swap the files, which
-      should be pretty short.
+      should be pretty short. However, the time might still be noticeable if
+      too many data changes have been done to the table while
+      <command>CLUSTER</command> was waiting for the lock: those changes must
+      be processed just before the files are swapped, while the
+      <literal>ACCESS EXCLUSIVE</literal> lock is being held. If you are
+      worried about this situation, set
+      the <link linkend="guc-cluster-max-xclock-time"><varname>cluster_max_xlock_time</varname></link>
+      configuration parameter to a value that your applications can tolerate.
      </para>
 
      <para>
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index c315abac02..13c67f0e5f 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -1004,7 +1004,8 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
 			end_of_wal = GetFlushRecPtr(NULL);
 			if ((end_of_wal - end_of_wal_prev) > wal_segment_size)
 			{
-				cluster_decode_concurrent_changes(decoding_ctx, end_of_wal);
+				cluster_decode_concurrent_changes(decoding_ctx, end_of_wal,
+												  NULL);
 				end_of_wal_prev = end_of_wal;
 			}
 		}
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 8e73da73ee..dfae550123 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -17,6 +17,8 @@
  */
 #include "postgres.h"
 
+#include <sys/time.h>
+
 #include "access/amapi.h"
 #include "access/heapam.h"
 #include "access/multixact.h"
@@ -102,6 +104,15 @@ RelFileLocator	clustered_rel_toast_locator = {.relNumber = InvalidOid};
 #define CLUSTER_IN_PROGRESS_MESSAGE \
 	"relation \"%s\" is already being processed by CLUSTER CONCURRENTLY"
 
+/*
+ * The maximum time to hold AccessExclusiveLock during the final
+ * processing. Note that only the execution time of
+ * process_concurrent_changes() is included here. The very last steps like
+ * swap_relation_files() shouldn't get blocked and it'd be wrong to consider
+ * them a reason to abort otherwise completed processing.
+ */
+int			cluster_max_xlock_time = 0;
+
 /*
  * Everything we need to call ExecInsertIndexTuples().
  */
@@ -188,7 +199,8 @@ static LogicalDecodingContext *setup_logical_decoding(Oid relid,
 static HeapTuple get_changed_tuple(char *change);
 static void apply_concurrent_changes(ClusterDecodingState *dstate,
 									 Relation rel, ScanKey key, int nkeys,
-									 IndexInsertState *iistate);
+									 IndexInsertState *iistate,
+									 struct timeval *must_complete);
 static void apply_concurrent_insert(Relation rel, ConcurrentChange *change,
 									HeapTuple tup, IndexInsertState *iistate,
 									TupleTableSlot *index_slot);
@@ -205,13 +217,15 @@ static HeapTuple find_target_tuple(Relation rel, ScanKey key, int nkeys,
 								   IndexInsertState *iistate,
 								   TupleTableSlot *ident_slot,
 								   IndexScanDesc *scan_p);
-static void process_concurrent_changes(LogicalDecodingContext *ctx,
+static bool process_concurrent_changes(LogicalDecodingContext *ctx,
 									   XLogRecPtr end_of_wal,
 									   Relation rel_dst,
 									   Relation rel_src,
 									   ScanKey ident_key,
 									   int ident_key_nentries,
-									   IndexInsertState *iistate);
+									   IndexInsertState *iistate,
+									   struct timeval *must_complete);
+static bool processing_time_elapsed(struct timeval *must_complete);
 static IndexInsertState *get_index_insert_state(Relation relation,
 												Oid ident_index_id);
 static ScanKey build_identity_key(Oid ident_idx_oid, Relation rel_src,
@@ -3027,7 +3041,8 @@ get_changed_tuple(char *change)
  */
 void
 cluster_decode_concurrent_changes(LogicalDecodingContext *ctx,
-								  XLogRecPtr end_of_wal)
+								  XLogRecPtr end_of_wal,
+								  struct timeval *must_complete)
 {
 	ClusterDecodingState *dstate;
 	ResourceOwner resowner_old;
@@ -3065,6 +3080,9 @@ cluster_decode_concurrent_changes(LogicalDecodingContext *ctx,
 			if (record != NULL)
 				LogicalDecodingProcessRecord(ctx, ctx->reader);
 
+			if (processing_time_elapsed(must_complete))
+				break;
+
 			/*
 			 * If WAL segment boundary has been crossed, inform the decoding
 			 * system that the catalog_xmin can advance. (We can confirm more
@@ -3107,7 +3125,8 @@ cluster_decode_concurrent_changes(LogicalDecodingContext *ctx,
  */
 static void
 apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
-						 ScanKey key, int nkeys, IndexInsertState *iistate)
+						 ScanKey key, int nkeys, IndexInsertState *iistate,
+						 struct timeval *must_complete)
 {
 	TupleTableSlot *index_slot, *ident_slot;
 	HeapTuple	tup_old = NULL;
@@ -3137,6 +3156,9 @@ apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
 
 		CHECK_FOR_INTERRUPTS();
 
+		Assert(dstate->nchanges > 0);
+		dstate->nchanges--;
+
 		/* Get the change from the single-column tuple. */
 		tup_change = ExecFetchSlotHeapTuple(dstate->tsslot, false, &shouldFree);
 		heap_deform_tuple(tup_change, dstate->tupdesc_change, values, isnull);
@@ -3261,10 +3283,22 @@ apply_concurrent_changes(ClusterDecodingState *dstate, Relation rel,
 		/* TTSOpsMinimalTuple has .get_heap_tuple==NULL. */
 		Assert(shouldFree);
 		pfree(tup_change);
+
+		/*
+		 * If there is a limit on the time of completion, check it
+		 * now. However, make sure the loop does not break if tup_old was set
+		 * in the previous iteration. In such a case we could not resume the
+		 * processing in the next call.
+		 */
+		if (must_complete && tup_old == NULL &&
+			processing_time_elapsed(must_complete))
+			/* The next call will process the remaining changes. */
+			break;
 	}
 
-	tuplestore_clear(dstate->tstore);
-	dstate->nchanges = 0;
+	/* If we could not apply all the changes, the next call will do. */
+	if (dstate->nchanges == 0)
+		tuplestore_clear(dstate->tstore);
 
 	/* Cleanup. */
 	ExecDropSingleTupleTableSlot(index_slot);
@@ -3467,11 +3501,15 @@ find_target_tuple(Relation rel, ScanKey key, int nkeys, HeapTuple tup_key,
  * Decode and apply concurrent changes.
  *
  * Pass rel_src iff its reltoastrelid is needed.
+ *
+ * Returns true if must_complete is NULL or if managed to complete by the time
+ * *must_complete indicates.
  */
-static void
+static bool
 process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal,
 						   Relation rel_dst, Relation rel_src, ScanKey ident_key,
-						   int ident_key_nentries, IndexInsertState *iistate)
+						   int ident_key_nentries, IndexInsertState *iistate,
+						   struct timeval *must_complete)
 {
 	ClusterDecodingState *dstate;
 
@@ -3480,10 +3518,19 @@ process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal,
 
 	dstate = (ClusterDecodingState *) ctx->output_writer_private;
 
-	cluster_decode_concurrent_changes(ctx, end_of_wal);
+	cluster_decode_concurrent_changes(ctx, end_of_wal, must_complete);
+
+	if (processing_time_elapsed(must_complete))
+		/* Caller is responsible for applying the changes. */
+		return false;
 
+	/*
+	 * *must_complete not reached, so there are really no changes. (It's
+	 * possible to see no changes just because not enough time was left for
+	 * the decoding.)
+	 */
 	if (dstate->nchanges == 0)
-		return;
+		return true;
 
 	PG_TRY();
 	{
@@ -3495,7 +3542,7 @@ process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal,
 			rel_dst->rd_toastoid = rel_src->rd_rel->reltoastrelid;
 
 		apply_concurrent_changes(dstate, rel_dst, ident_key,
-								 ident_key_nentries, iistate);
+								 ident_key_nentries, iistate, must_complete);
 	}
 	PG_FINALLY();
 	{
@@ -3505,6 +3552,28 @@ process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal,
 			rel_dst->rd_toastoid = InvalidOid;
 	}
 	PG_END_TRY();
+
+	/*
+	 * apply_concurrent_changes() does check the processing time, so if some
+	 * changes are left, we ran out of time.
+	 */
+	return dstate->nchanges == 0;
+}
+
+/*
+ * Check if the current time is beyond *must_complete.
+ */
+static bool
+processing_time_elapsed(struct timeval *must_complete)
+{
+	struct timeval now;
+
+	if (must_complete == NULL)
+		return false;
+
+	gettimeofday(&now, NULL);
+
+	return timercmp(&now, must_complete, >);
 }
 
 static IndexInsertState *
@@ -3665,6 +3734,8 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
 	RelReopenInfo	*rri = NULL;
 	int		nrel;
 	Relation	*ind_refs_all, *ind_refs_p;
+	struct timeval t_end;
+	struct timeval *t_end_ptr = NULL;
 
 	/* Like in cluster_rel(). */
 	lockmode_old = ShareUpdateExclusiveLock;
@@ -3744,7 +3815,8 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
 	 */
 	process_concurrent_changes(ctx, end_of_wal, NewHeap,
 							   swap_toast_by_content ? OldHeap : NULL,
-							   ident_key, ident_key_nentries, iistate);
+							   ident_key, ident_key_nentries, iistate,
+							   NULL);
 
 	/*
 	 * Release the locks that allowed concurrent data changes, in order to
@@ -3866,9 +3938,38 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
 	end_of_wal = GetFlushRecPtr(NULL);
 
 	/* Apply the concurrent changes again. */
-	process_concurrent_changes(ctx, end_of_wal, NewHeap,
-							   swap_toast_by_content ? OldHeap : NULL,
-							   ident_key, ident_key_nentries, iistate);
+	/*
+	 * This time we have the exclusive lock on the table, so make sure that
+	 * cluster_max_xlock_time is not exceeded.
+	 */
+	if (cluster_max_xlock_time > 0)
+	{
+		int64		usec;
+		struct timeval t_start;
+
+		gettimeofday(&t_start, NULL);
+		/* Add the whole seconds. */
+		t_end.tv_sec = t_start.tv_sec + cluster_max_xlock_time / 1000;
+		/* Add the rest, expressed in microseconds. */
+		usec = t_start.tv_usec + 1000 * (cluster_max_xlock_time % 1000);
+		/* The number of microseconds could have overflown. */
+		t_end.tv_sec += usec / USECS_PER_SEC;
+		t_end.tv_usec = usec % USECS_PER_SEC;
+		t_end_ptr = &t_end;
+	}
+	/*
+	 * During testing, stop here to simulate excessive processing time.
+	 */
+	INJECTION_POINT("cluster-concurrently-after-lock");
+
+	if (!process_concurrent_changes(ctx, end_of_wal, NewHeap,
+									swap_toast_by_content ? OldHeap : NULL,
+									ident_key, ident_key_nentries, iistate,
+									t_end_ptr))
+		ereport(ERROR,
+				(errmsg("could not process concurrent data changes in time"),
+				 errhint("Please consider adjusting \"cluster_max_xlock_time\".")));
+
 
 	/* Remember info about rel before closing OldHeap */
 	relpersistence = OldHeap->rd_rel->relpersistence;
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index c9d8cd796a..7f4686f31e 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -39,6 +39,7 @@
 #include "catalog/namespace.h"
 #include "catalog/storage.h"
 #include "commands/async.h"
+#include "commands/cluster.h"
 #include "commands/event_trigger.h"
 #include "commands/tablespace.h"
 #include "commands/trigger.h"
@@ -2791,6 +2792,19 @@ struct config_int ConfigureNamesInt[] =
 		1600000000, 0, 2100000000,
 		NULL, NULL, NULL
 	},
+	{
+		{"cluster_max_xlock_time", PGC_USERSET, LOCK_MANAGEMENT,
+			gettext_noop("Maximum time for VACUUM FULL / CLUSTER (CONCURRENTLY) to keep table locked."),
+			gettext_noop(
+				"The table is locked in exclusive mode during the final stage of processing. "
+				"If the lock time exceeds this value, error is raised and the lock is "
+				"released. Set to zero if you don't care how long the lock can be held."),
+			GUC_UNIT_MS
+		},
+		&cluster_max_xlock_time,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
 
 	/*
 	 * See also CheckRequiredParameterValues() if this parameter changes
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index b2bc43383d..eef7be70c5 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -728,6 +728,7 @@ autovacuum_worker_slots = 16	# autovacuum worker slots to allocate
 #vacuum_multixact_freeze_table_age = 150000000
 #vacuum_multixact_freeze_min_age = 5000000
 #vacuum_multixact_failsafe_age = 1600000000
+#cluster_max_xlock_time = 0
 #bytea_output = 'hex'			# hex, escape
 #xmlbinary = 'base64'
 #xmloption = 'content'
diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h
index 8945e46e64..72221e71d5 100644
--- a/src/include/commands/cluster.h
+++ b/src/include/commands/cluster.h
@@ -44,6 +44,8 @@ typedef struct ClusterParams
 extern RelFileLocator	clustered_rel_locator;
 extern RelFileLocator	clustered_rel_toast_locator;
 
+extern PGDLLIMPORT int	cluster_max_xlock_time;
+
 typedef enum
 {
 	CHANGE_INSERT,
@@ -139,7 +141,8 @@ extern void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal);
 extern void check_relation_is_clusterable_concurrently(Relation rel,
 													   bool is_vacuum);
 extern void cluster_decode_concurrent_changes(LogicalDecodingContext *ctx,
-											  XLogRecPtr end_of_wal);
+											  XLogRecPtr end_of_wal,
+											  struct timeval *must_complete);
 extern Oid	make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod,
 						  char relpersistence, LOCKMODE lockmode);
 extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
diff --git a/src/test/modules/injection_points/expected/cluster.out b/src/test/modules/injection_points/expected/cluster.out
index d84fff3693..646e31448f 100644
--- a/src/test/modules/injection_points/expected/cluster.out
+++ b/src/test/modules/injection_points/expected/cluster.out
@@ -1,4 +1,4 @@
-Parsed test spec with 2 sessions
+Parsed test spec with 4 sessions
 
 starting permutation: wait_before_lock change_existing change_new change_subxact1 change_subxact2 check2 wakeup_before_lock check1
 injection_points_attach
@@ -111,3 +111,75 @@ injection_points_detach
                        
 (1 row)
 
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+
+starting permutation: wait_after_lock wakeup_after_lock
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step wait_after_lock: 
+	CLUSTER (CONCURRENTLY) clstr_test USING clstr_test_pkey;
+ <waiting ...>
+step wakeup_after_lock: 
+	SELECT injection_points_wakeup('cluster-concurrently-after-lock');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step wait_after_lock: <... completed>
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+
+starting permutation: wait_after_lock after_lock_delay wakeup_after_lock
+injection_points_attach
+-----------------------
+                       
+(1 row)
+
+step wait_after_lock: 
+	CLUSTER (CONCURRENTLY) clstr_test USING clstr_test_pkey;
+ <waiting ...>
+step after_lock_delay: 
+    SELECT pg_sleep(1.5);
+
+pg_sleep
+--------
+        
+(1 row)
+
+step wakeup_after_lock: 
+	SELECT injection_points_wakeup('cluster-concurrently-after-lock');
+
+injection_points_wakeup
+-----------------------
+                       
+(1 row)
+
+step wait_after_lock: <... completed>
+ERROR:  could not process concurrent data changes in time
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
+injection_points_detach
+-----------------------
+                       
+(1 row)
+
diff --git a/src/test/modules/injection_points/specs/cluster.spec b/src/test/modules/injection_points/specs/cluster.spec
index 5f8404c5da..9af41bac6d 100644
--- a/src/test/modules/injection_points/specs/cluster.spec
+++ b/src/test/modules/injection_points/specs/cluster.spec
@@ -127,6 +127,34 @@ step wakeup_before_lock
 	SELECT injection_points_wakeup('cluster-concurrently-before-lock');
 }
 
+session s3
+setup
+{
+	SELECT injection_points_set_local();
+	SELECT injection_points_attach('cluster-concurrently-after-lock', 'wait');
+	SET cluster_max_xlock_time TO '1s';
+}
+# Perform the initial load, lock the table in exclusive mode and wait. s4 will
+# cancel the waiting.
+step wait_after_lock
+{
+	CLUSTER (CONCURRENTLY) clstr_test USING clstr_test_pkey;
+}
+teardown
+{
+    SELECT injection_points_detach('cluster-concurrently-after-lock');
+}
+
+session s4
+step wakeup_after_lock
+{
+	SELECT injection_points_wakeup('cluster-concurrently-after-lock');
+}
+step after_lock_delay
+{
+    SELECT pg_sleep(1.5);
+}
+
 # Test if data changes introduced while one session is performing CLUSTER
 # (CONCURRENTLY) find their way into the table.
 permutation
@@ -138,3 +166,17 @@ permutation
 	check2
 	wakeup_before_lock
 	check1
+
+# Test the cluster_max_xlock_time configuration variable.
+#
+# First, cancel waiting on the injection point immediately. That way, CLUSTER
+# should complete.
+permutation
+	wait_after_lock
+	wakeup_after_lock
+# Second, cancel the waiting with a delay that violates
+# cluster_max_xlock_time.
+permutation
+	wait_after_lock
+	after_lock_delay
+	wakeup_after_lock
-- 
2.45.2

