From 9677b13211b05c5893a80404aa3a8d383833b9a3 Mon Sep 17 00:00:00 2001
From: Antonin Houska <ah@cybertec.at>
Date: Mon, 30 Jun 2025 19:41:43 +0200
Subject: [PATCH 7/7] Enable logical decoding transiently, only for REPACK
 CONCURRENTLY.

As REPACK CONCURRENTLY uses logical decoding, it requires wal_level to be set
to 'logical', while 'replica' is the default value. If logical replication is
not used, users will probably be reluctant to set the GUC to 'logical' because
it can affect server performance (by writing additional information to WAL)
and because it cannot be changed to 'logical' only for the time REPACK
CONCURRENTLY is running: change of this GUC requires server restart to take
effect.

This patch teaches postgres backend to recognize whether it should consider
wal_level='logical' "locally" for particular transaction, even if the
wal_level GUC is actually set to 'replica'. Also it ensures that the logical
decoding specific information is added to WAL only for the tables which are
currently being processed by REPACK CONCURRENTLY.

If the logical decoding is enabled this way, only temporary replication slots
should be created. The problem of permanent slot is that it is restored during
server restart, and the restore fails if wal_level is not "globally"
'logical'.

There is an independent work in progres to enable logical decoding transiently
[1]. ISTM that this is too "heavyweight" solution for our problem. And I think
that these two approaches are not mutually exclusive: once [1] is committed,
we only need to adjust the XLogLogicalInfoActive() macro.

[1] https://www.postgresql.org/message-id/CAD21AoCVLeLYq09pQPaWs%2BJwdni5FuJ8v2jgq-u9_uFbcp6UbA%40mail.gmail.com
---
 doc/src/sgml/ref/repack.sgml                  |   7 -
 src/backend/access/transam/parallel.c         |   8 +
 src/backend/access/transam/xact.c             | 106 ++++-
 src/backend/access/transam/xlog.c             |   1 +
 src/backend/commands/cluster.c                | 387 +++++++++++++++++-
 src/backend/replication/logical/logical.c     |   9 +-
 src/backend/storage/ipc/ipci.c                |   2 +
 src/backend/storage/ipc/standby.c             |   4 +-
 src/backend/utils/cache/inval.c               |  21 +
 src/backend/utils/cache/relcache.c            |   4 +
 src/include/access/xlog.h                     |  15 +-
 src/include/commands/cluster.h                |   5 +
 src/include/utils/inval.h                     |   2 +
 src/include/utils/rel.h                       |   9 +-
 src/test/modules/injection_points/Makefile    |   1 -
 .../modules/injection_points/logical.conf     |   1 -
 src/test/modules/injection_points/meson.build |   3 -
 src/tools/pgindent/typedefs.list              |   1 +
 18 files changed, 540 insertions(+), 46 deletions(-)
 delete mode 100644 src/test/modules/injection_points/logical.conf

diff --git a/doc/src/sgml/ref/repack.sgml b/doc/src/sgml/ref/repack.sgml
index e1313f40599..0fd767eef98 100644
--- a/doc/src/sgml/ref/repack.sgml
+++ b/doc/src/sgml/ref/repack.sgml
@@ -260,13 +260,6 @@ REPACK [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] CONCU
         </para>
        </listitem>
 
-       <listitem>
-        <para>
-          The <link linkend="guc-wal-level"><varname>wal_level</varname></link>
-          configuration parameter is less than <literal>logical</literal>.
-        </para>
-       </listitem>
-
        <listitem>
         <para>
          The <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link>
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 94db1ec3012..a33318ea7bd 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -98,6 +98,7 @@ typedef struct FixedParallelState
 	TimestampTz xact_ts;
 	TimestampTz stmt_ts;
 	SerializableXactHandle serializable_xact_handle;
+	int			wal_level_transient;
 
 	/* Mutex protects remaining fields. */
 	slock_t		mutex;
@@ -355,6 +356,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
 	fps->xact_ts = GetCurrentTransactionStartTimestamp();
 	fps->stmt_ts = GetCurrentStatementStartTimestamp();
 	fps->serializable_xact_handle = ShareSerializableXact();
+	fps->wal_level_transient = wal_level_transient;
 	SpinLockInit(&fps->mutex);
 	fps->last_xlog_end = 0;
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
@@ -1550,6 +1552,12 @@ ParallelWorkerMain(Datum main_arg)
 	/* Attach to the leader's serializable transaction, if SERIALIZABLE. */
 	AttachSerializableXact(fps->serializable_xact_handle);
 
+	/*
+	 * Restore the information whether this worker should behave as if
+	 * wal_level was WAL_LEVEL_LOGICAL..
+	 */
+	wal_level_transient = fps->wal_level_transient;
+
 	/*
 	 * We've initialized all of our state now; nothing should change
 	 * hereafter.
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 23f2de587a1..be568f70961 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -36,6 +36,7 @@
 #include "catalog/pg_enum.h"
 #include "catalog/storage.h"
 #include "commands/async.h"
+#include "commands/cluster.h"
 #include "commands/tablecmds.h"
 #include "commands/trigger.h"
 #include "common/pg_prng.h"
@@ -126,6 +127,12 @@ static FullTransactionId XactTopFullTransactionId = {InvalidTransactionId};
 static int	nParallelCurrentXids = 0;
 static TransactionId *ParallelCurrentXids;
 
+/*
+ * Have we determined the value of wal_level_transient for the current
+ * transaction?
+ */
+static bool wal_level_transient_checked = false;
+
 /*
  * Miscellaneous flag bits to record events which occur on the top level
  * transaction. These flags are only persisted in MyXactFlags and are intended
@@ -638,6 +645,7 @@ AssignTransactionId(TransactionState s)
 	bool		isSubXact = (s->parent != NULL);
 	ResourceOwner currentOwner;
 	bool		log_unknown_top = false;
+	bool		set_wal_level_transient = false;
 
 	/* Assert that caller didn't screw up */
 	Assert(!FullTransactionIdIsValid(s->fullTransactionId));
@@ -652,6 +660,32 @@ AssignTransactionId(TransactionState s)
 				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
 				 errmsg("cannot assign transaction IDs during a parallel operation")));
 
+	/*
+	 * The first call (i.e. the first write) in the transaction tree
+	 * determines whether the whole transaction assumes logical decoding or
+	 * not.
+	 */
+	if (!wal_level_transient_checked)
+	{
+		Assert(wal_level_transient == WAL_LEVEL_MINIMAL);
+
+		/*
+		 * Do not repeat the check when calling this function for parent
+		 * transactions.
+		 */
+		wal_level_transient_checked = true;
+
+		/*
+		 * Remember that the actual check is needed. We cannot do it until the
+		 * top-level transaction has its XID assigned, see comments below.
+		 *
+		 * There is no use case for overriding MINIMAL, and LOGICAL cannot be
+		 * overridden as such.
+		 */
+		if (wal_level == WAL_LEVEL_REPLICA)
+			set_wal_level_transient = true;
+	}
+
 	/*
 	 * Ensure parent(s) have XIDs, so that a child always has an XID later
 	 * than its parent.  Mustn't recurse here, or we might get a stack
@@ -681,20 +715,6 @@ AssignTransactionId(TransactionState s)
 		pfree(parents);
 	}
 
-	/*
-	 * When wal_level=logical, guarantee that a subtransaction's xid can only
-	 * be seen in the WAL stream if its toplevel xid has been logged before.
-	 * If necessary we log an xact_assignment record with fewer than
-	 * PGPROC_MAX_CACHED_SUBXIDS. Note that it is fine if didLogXid isn't set
-	 * for a transaction even though it appears in a WAL record, we just might
-	 * superfluously log something. That can happen when an xid is included
-	 * somewhere inside a wal record, but not in XLogRecord->xl_xid, like in
-	 * xl_standby_locks.
-	 */
-	if (isSubXact && XLogLogicalInfoActive() &&
-		!TopTransactionStateData.didLogXid)
-		log_unknown_top = true;
-
 	/*
 	 * Generate a new FullTransactionId and record its xid in PGPROC and
 	 * pg_subtrans.
@@ -719,6 +739,54 @@ AssignTransactionId(TransactionState s)
 	if (!isSubXact)
 		RegisterPredicateLockingXid(XidFromFullTransactionId(s->fullTransactionId));
 
+	/*
+	 * Check if this transaction should consider wal_level=logical.
+	 *
+	 * Sometimes we need to turn on the logical decoding transiently although
+	 * wal_level=WAL_LEVEL_REPLICA. Currently we do so when at least one table
+	 * is being clustered concurrently, i.e. when we should assume that
+	 * changes done by this transaction will be decoded. In such a case we
+	 * adjust the value of XLogLogicalInfoActive() by setting
+	 * wal_level_transient to LOGICAL.
+	 *
+	 * It's important not to do this check until the XID of the top-level
+	 * transaction is in ProcGlobal: if the decoding becomes mandatory right
+	 * after the check, our transaction will fail to write the necessary
+	 * information to WAL. However, if the top-level transaction is already in
+	 * ProcGlobal, its XID is guaranteed to appear in the xl_running_xacts
+	 * record and therefore the snapshot builder will not try to decode the
+	 * transaction (because it assumes it could have missed the initial part
+	 * of the transaction).
+	 *
+	 * On the other hand, if the decoding became mandatory between the actual
+	 * XID assignment and now, the transaction will WAL the decoding specific
+	 * information unnecessarily. Let's assume that such race conditions do
+	 * not happen too often.
+	 */
+	if (set_wal_level_transient)
+	{
+		/*
+		 * Check for the operation that enables the logical decoding
+		 * transiently.
+		 */
+		if (is_concurrent_repack_in_progress(InvalidOid))
+			wal_level_transient = WAL_LEVEL_LOGICAL;
+	}
+
+	/*
+	 * When wal_level=logical, guarantee that a subtransaction's xid can only
+	 * be seen in the WAL stream if its toplevel xid has been logged before.
+	 * If necessary we log an xact_assignment record with fewer than
+	 * PGPROC_MAX_CACHED_SUBXIDS. Note that it is fine if didLogXid isn't set
+	 * for a transaction even though it appears in a WAL record, we just might
+	 * superfluously log something. That can happen when an xid is included
+	 * somewhere inside a wal record, but not in XLogRecord->xl_xid, like in
+	 * xl_standby_locks.
+	 */
+	if (isSubXact && XLogLogicalInfoActive() &&
+		!TopTransactionStateData.didLogXid)
+		log_unknown_top = true;
+
 	/*
 	 * Acquire lock on the transaction XID.  (We assume this cannot block.) We
 	 * have to ensure that the lock is assigned to the transaction's own
@@ -2216,6 +2284,16 @@ StartTransaction(void)
 	if (TransactionTimeout > 0)
 		enable_timeout_after(TRANSACTION_TIMEOUT, TransactionTimeout);
 
+	/*
+	 * wal_level_transient can override wal_level for individual transactions,
+	 * which effectively enables logical decoding for them. At the moment we
+	 * don't know if this transaction will write any data changes to be
+	 * decoded. Should it do, AssignTransactionId() will check if the decoding
+	 * needs to be considered.
+	 */
+	wal_level_transient = WAL_LEVEL_MINIMAL;
+	wal_level_transient_checked = false;
+
 	ShowTransactionState("StartTransaction");
 }
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 47ffc0a2307..dc222db6a5d 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -129,6 +129,7 @@ bool		wal_recycle = true;
 bool		log_checkpoints = true;
 int			wal_sync_method = DEFAULT_WAL_SYNC_METHOD;
 int			wal_level = WAL_LEVEL_REPLICA;
+int			wal_level_transient = WAL_LEVEL_MINIMAL;
 int			CommitDelay = 0;	/* precommit delay in microseconds */
 int			CommitSiblings = 5; /* # concurrent xacts needed to sleep */
 int			wal_retrieve_retry_interval = 5000;
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 37f69f369eb..7ecef2b86fc 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -84,6 +84,14 @@ typedef struct
  * The following definitions are used for concurrent processing.
  */
 
+/*
+ * OID of the table being repacked by this backend.
+ */
+static Oid	repacked_rel = InvalidOid;
+
+/* The same for its TOAST relation. */
+static Oid	repacked_rel_toast = InvalidOid;
+
 /*
  * The locators are used to avoid logical decoding of data that we do not need
  * for our table.
@@ -135,8 +143,10 @@ static List *get_tables_to_cluster_partitioned(MemoryContext cluster_context,
 											   ClusterCommand cmd);
 static bool cluster_is_permitted_for_relation(Oid relid, Oid userid,
 											  ClusterCommand cmd);
-static void begin_concurrent_repack(Relation rel);
-static void end_concurrent_repack(void);
+static void begin_concurrent_repack(Relation rel, Relation *index_p,
+									bool *entered_p);
+static void end_concurrent_repack(bool error);
+static void cluster_before_shmem_exit_callback(int code, Datum arg);
 static LogicalDecodingContext *setup_logical_decoding(Oid relid,
 													  const char *slotname,
 													  TupleDesc tupdesc);
@@ -383,6 +393,8 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params,
 	Relation	index;
 	bool		concurrent = ((params->options & CLUOPT_CONCURRENT) != 0);
 	LOCKMODE	lmode;
+	bool		entered,
+				success;
 
 	/*
 	 * Check that the correct lock is held. The lock mode is
@@ -558,23 +570,31 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params,
 		TransferPredicateLocksToHeapRelation(OldHeap);
 
 	/* rebuild_relation does all the dirty work */
+	entered = false;
+	success = false;
 	PG_TRY();
 	{
 		/*
-		 * For concurrent processing, make sure that our logical decoding
-		 * ignores data changes of other tables than the one we are
-		 * processing.
+		 * For concurrent processing, make sure that
+		 *
+		 * 1) our logical decoding ignores data changes of other tables than
+		 * the one we are processing.
+		 *
+		 * 2) other transactions know that REPACK CONCURRENTLY is in progress
+		 * for our table, so they write sufficient information to WAL even if
+		 * wal_level is < LOGICAL.
 		 */
 		if (concurrent)
-			begin_concurrent_repack(OldHeap);
+			begin_concurrent_repack(OldHeap, &index, &entered);
 
 		rebuild_relation(OldHeap, index, verbose, concurrent, save_userid,
 						 cmd);
+		success = true;
 	}
 	PG_FINALLY();
 	{
-		if (concurrent)
-			end_concurrent_repack();
+		if (concurrent && entered)
+			end_concurrent_repack(!success);
 	}
 	PG_END_TRY();
 
@@ -2208,6 +2228,49 @@ cluster_is_permitted_for_relation(Oid relid, Oid userid, ClusterCommand cmd)
 
 #define REPL_PLUGIN_NAME	"pgoutput_repack"
 
+/*
+ * Each relation being processed by REPACK CONCURRENTLY must be in the
+ * repackedRelsHash hashtable.
+ */
+typedef struct RepackedRel
+{
+	Oid			relid;
+	Oid			dbid;
+} RepackedRel;
+
+/* Hashtable of RepackedRel elements. */
+static HTAB	   *repackedRelsHash = NULL;;
+
+/*
+ * Maximum number of entries in the hashtable.
+ *
+ * A replication slot is needed for the processing, so use this GUC to
+ * allocate memory for the hashtable. Multiply by two because TOAST relations
+ * also need to be added to the hashtable.
+ */
+#define	MAX_REPACKED_RELS	(max_replication_slots * 2)
+
+Size
+RepackShmemSize(void)
+{
+	return hash_estimate_size(MAX_REPACKED_RELS, sizeof(RepackedRel));
+}
+
+void
+RepackShmemInit(void)
+{
+	HASHCTL		info;
+
+	info.keysize = sizeof(RepackedRel);
+	info.entrysize = info.keysize;
+	repackedRelsHash = ShmemInitHash("Repacked Relations Hash",
+									 MAX_REPACKED_RELS,
+									 MAX_REPACKED_RELS,
+									 &info,
+									 HASH_ELEM | HASH_BLOBS |
+									 HASH_FIXED_SIZE);
+}
+
 /*
  * Call this function before REPACK CONCURRENTLY starts to setup logical
  * decoding. It makes sure that other users of the table put enough
@@ -2222,11 +2285,150 @@ cluster_is_permitted_for_relation(Oid relid, Oid userid, ClusterCommand cmd)
  *
  * Note that TOAST table needs no attention here as it's not scanned using
  * historic snapshot.
+ *
+ * 'index_p' is in/out argument because the function unlocks the index
+ * temporarily.
+ *
+ * 'enter_p' receives a bool value telling whether relation OID was entered
+ * into repackedRelsHash or not.
  */
 static void
-begin_concurrent_repack(Relation rel)
+begin_concurrent_repack(Relation rel, Relation *index_p, bool *entered_p)
 {
-	Oid			toastrelid;
+	Oid			relid,
+				toastrelid;
+	Relation	index = NULL;
+	Oid			indexid = InvalidOid;
+	RepackedRel key,
+			   *entry;
+	bool		found;
+	static bool before_shmem_exit_callback_setup = false;
+
+	relid = RelationGetRelid(rel);
+	index = index_p ? *index_p : NULL;
+
+	/*
+	 * Make sure that we do not leave an entry in repackedRelsHash if exiting
+	 * due to FATAL.
+	 */
+	if (!before_shmem_exit_callback_setup)
+	{
+		before_shmem_exit(cluster_before_shmem_exit_callback, 0);
+		before_shmem_exit_callback_setup = true;
+	}
+
+	memset(&key, 0, sizeof(key));
+	key.relid = relid;
+	key.dbid = MyDatabaseId;
+
+	*entered_p = false;
+	LWLockAcquire(RepackedRelsLock, LW_EXCLUSIVE);
+	entry = (RepackedRel *)
+		hash_search(repackedRelsHash, &key, HASH_ENTER_NULL, &found);
+	if (found)
+	{
+		/*
+		 * Since REPACK CONCURRENTLY takes ShareRowExclusiveLock, a conflict
+		 * should occur much earlier. However that lock may be released
+		 * temporarily, see below.  Anyway, we should complain whatever the
+		 * reason of the conflict might be.
+		 */
+		ereport(ERROR,
+				(errmsg("relation \"%s\" is already being processed by REPACK CONCURRENTLY",
+						RelationGetRelationName(rel))));
+	}
+	if (entry == NULL)
+		ereport(ERROR,
+				(errmsg("too many requests for REPACK CONCURRENTLY at a time")),
+				(errhint("Please consider increasing the \"max_replication_slots\" configuration parameter.")));
+
+	/*
+	 * Even if the insertion of TOAST relid should fail below, the caller has
+	 * to do cleanup.
+	 */
+	*entered_p = true;
+
+	/*
+	 * Enable the callback to remove the entry in case of exit. We should not
+	 * do this earlier, otherwise an attempt to insert already existing entry
+	 * could make us remove that entry (inserted by another backend) during
+	 * ERROR handling.
+	 */
+	Assert(!OidIsValid(repacked_rel));
+	repacked_rel = relid;
+
+	/*
+	 * TOAST relation is not accessed using historic snapshot, but we enter it
+	 * here to protect it from being VACUUMed by another backend. (Lock does
+	 * not help in the CONCURRENTLY case because cannot hold it continuously
+	 * till the end of the transaction.) See the comments on locking TOAST
+	 * relation in copy_table_data().
+	 */
+	toastrelid = rel->rd_rel->reltoastrelid;
+	if (OidIsValid(toastrelid))
+	{
+		key.relid = toastrelid;
+		entry = (RepackedRel *)
+			hash_search(repackedRelsHash, &key, HASH_ENTER_NULL, &found);
+		if (found)
+
+			/*
+			 * If we could enter the main fork the TOAST should succeed too.
+			 * Nevertheless, check.
+			 */
+			ereport(ERROR,
+					(errmsg("TOAST relation of \"%s\" is already being processed by REPACK CONCURRENTLY",
+							RelationGetRelationName(rel))));
+		if (entry == NULL)
+			ereport(ERROR,
+					(errmsg("too many requests for REPACK CONCURRENTLY at a time")),
+					(errhint("Please consider increasing the \"max_replication_slots\" configuration parameter.")));
+
+		Assert(!OidIsValid(repacked_rel_toast));
+		repacked_rel_toast = toastrelid;
+	}
+
+	LWLockRelease(RepackedRelsLock);
+
+	/*
+	 * Make sure that other backends are aware of the new hash entry as soon
+	 * as they open our table.
+	 */
+	CacheInvalidateRelcacheImmediate(relid);
+
+	/*
+	 * Also make sure that the existing users of the table update their
+	 * relcache entry as soon as they try to run DML commands on it.
+	 *
+	 * ShareLock is the weakest lock that conflicts with DMLs. If any backend
+	 * has a lower lock, we assume it'll accept our invalidation message when
+	 * it changes the lock mode.
+	 *
+	 * Before upgrading the lock on the relation, close the index temporarily
+	 * to avoid a deadlock if another backend running DML already has its lock
+	 * (ShareLock) on the table and waits for the lock on the index.
+	 */
+	if (index)
+	{
+		indexid = RelationGetRelid(index);
+		index_close(index, ShareUpdateExclusiveLock);
+	}
+	LockRelationOid(relid, ShareLock);
+	UnlockRelationOid(relid, ShareLock);
+	if (OidIsValid(indexid))
+	{
+		/*
+		 * Re-open the index and check that it hasn't changed while unlocked.
+		 */
+		check_index_is_clusterable(rel, indexid, ShareUpdateExclusiveLock);
+
+		/*
+		 * Return the new relcache entry to the caller. (It's been locked by
+		 * the call above.)
+		 */
+		index = index_open(indexid, NoLock);
+		*index_p = index;
+	}
 
 	/* Avoid logical decoding of other relations by this backend. */
 	repacked_rel_locator = rel->rd_locator;
@@ -2244,15 +2446,176 @@ begin_concurrent_repack(Relation rel)
 
 /*
  * Call this when done with REPACK CONCURRENTLY.
+ *
+ * 'error' tells whether the function is being called in order to handle
+ * error.
  */
 static void
-end_concurrent_repack(void)
+end_concurrent_repack(bool error)
 {
+	RepackedRel key;
+	RepackedRel *entry = NULL;
+	RepackedRel *entry_toast = NULL;
+	Oid			relid = repacked_rel;
+	Oid			toastrelid = repacked_rel_toast;
+
+	/* Remove the relation from the hash if we managed to insert one. */
+	if (OidIsValid(repacked_rel))
+	{
+		LWLockAcquire(RepackedRelsLock, LW_EXCLUSIVE);
+
+		memset(&key, 0, sizeof(key));
+		key.relid = repacked_rel;
+		key.dbid = MyDatabaseId;
+
+		entry = hash_search(repackedRelsHash, &key, HASH_REMOVE, NULL);
+
+		/* Remove the TOAST relation if there is one. */
+		if (OidIsValid(repacked_rel_toast))
+		{
+			key.relid = repacked_rel_toast;
+			entry_toast = hash_search(repackedRelsHash, &key, HASH_REMOVE,
+									  NULL);
+		}
+
+		LWLockRelease(RepackedRelsLock);
+
+		/*
+		 * Make others refresh their information whether they should still
+		 * treat the table as catalog from the perspective of writing WAL.
+		 *
+		 * XXX Unlike entering the entry into the hashtable, we do not bother
+		 * with locking and unlocking the table here:
+		 *
+		 * 1) On normal completion (and sometimes even on ERROR), the caller
+		 * is already holding AccessExclusiveLock on the table, so there
+		 * should be no relcache reference unaware of this change.
+		 *
+		 * 2) In the other cases, the worst scenario is that the other
+		 * backends will write unnecessary information to WAL until they close
+		 * the relation.
+		 *
+		 * Should we use ShareLock mode to fix 2) at least for the non-FATAL
+		 * errors? (Our before_shmem_exit callback is in charge of FATAL, and
+		 * that probably should not try to acquire any lock.)
+		 */
+		CacheInvalidateRelcacheImmediate(repacked_rel);
+
+		/*
+		 * By clearing repacked_rel we also disable
+		 * cluster_before_shmem_exit_callback().
+		 */
+		repacked_rel = InvalidOid;
+		repacked_rel_toast = InvalidOid;
+	}
+
 	/*
 	 * Restore normal function of (future) logical decoding for this backend.
 	 */
 	repacked_rel_locator.relNumber = InvalidOid;
 	repacked_rel_toast_locator.relNumber = InvalidOid;
+
+	/*
+	 * On normal completion (!error), we should not really fail to remove the
+	 * entry. But if it wasn't there for any reason, raise ERROR to make sure
+	 * the transaction is aborted: if other transactions, while changing the
+	 * contents of the relation, didn't know that REPACK CONCURRENTLY was in
+	 * progress, they could have missed to WAL enough information, and thus we
+	 * could have produced an inconsistent table contents.
+	 *
+	 * On the other hand, if we are already handling an error, there's no
+	 * reason to worry about inconsistent contents of the new storage because
+	 * the transaction is going to be rolled back anyway. Furthermore, by
+	 * raising ERROR here we'd shadow the original error.
+	 */
+	if (!error)
+	{
+		char	   *relname;
+
+		if (OidIsValid(relid) && entry == NULL)
+		{
+			relname = get_rel_name(relid);
+			if (!relname)
+				ereport(ERROR,
+						(errmsg("cache lookup failed for relation %u",
+								relid)));
+
+			ereport(ERROR,
+					(errmsg("relation \"%s\" not found among repacked relations",
+							relname)));
+		}
+
+		/*
+		 * Likewise, the TOAST relation should not have disappeared.
+		 */
+		if (OidIsValid(toastrelid) && entry_toast == NULL)
+		{
+			relname = get_rel_name(key.relid);
+			if (!relname)
+				ereport(ERROR,
+						(errmsg("cache lookup failed for relation %u",
+								key.relid)));
+
+			ereport(ERROR,
+					(errmsg("relation \"%s\" not found among repacked relations",
+							relname)));
+		}
+
+	}
+}
+
+/*
+ * A wrapper to call end_concurrent_repack() as a before_shmem_exit callback.
+ */
+static void
+cluster_before_shmem_exit_callback(int code, Datum arg)
+{
+	if (OidIsValid(repacked_rel))
+		end_concurrent_repack(true);
+}
+
+/*
+ * Check if relation is currently being processed by REPACK CONCURRENTLY.
+ *
+ * If relid is InvalidOid, check if any relation is being processed.
+ */
+bool
+is_concurrent_repack_in_progress(Oid relid)
+{
+	RepackedRel key,
+			   *entry;
+
+	/* For particular relation we need to search in the hashtable. */
+	memset(&key, 0, sizeof(key));
+	key.relid = relid;
+	key.dbid = MyDatabaseId;
+
+	LWLockAcquire(RepackedRelsLock, LW_SHARED);
+	/*
+	 * If the caller is interested whether any relation is being repacked,
+	 * just check the number of entries.
+	 */
+	if (!OidIsValid(relid))
+	{
+		long	n = hash_get_num_entries(repackedRelsHash);
+
+		LWLockRelease(RepackedRelsLock);
+		return n > 0;
+	}
+	entry = (RepackedRel *)
+		hash_search(repackedRelsHash, &key, HASH_FIND, NULL);
+	LWLockRelease(RepackedRelsLock);
+
+	return entry != NULL;
+}
+
+/*
+ * Is this backend performing REPACK CONCURRENTLY?
+ */
+bool
+is_concurrent_repack_run_by_me(void)
+{
+	return OidIsValid(repacked_rel);
 }
 
 /*
@@ -2282,7 +2645,7 @@ setup_logical_decoding(Oid relid, const char *slotname, TupleDesc tupdesc)
 	 * useful for us.
 	 *
 	 * Regarding the value of need_full_snapshot, we pass false because the
-	 * table we are processing is present in RepackedRelsHash and therefore,
+	 * table we are processing is present in repackedRelsHash and therefore,
 	 * regarding logical decoding, treated like a catalog.
 	 */
 	ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME,
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index f1eb798f3e9..5e6000db086 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -31,6 +31,7 @@
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
+#include "commands/cluster.h"
 #include "fmgr.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -114,10 +115,12 @@ CheckLogicalDecodingRequirements(void)
 
 	/*
 	 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
-	 * needs the same check.
+	 * needs the same check. (Except that only temporary slots should be
+	 * created for REPACK CONCURRENTLY, which effectively raises wal_level to
+	 * LOGICAL.)
 	 */
-
-	if (wal_level < WAL_LEVEL_LOGICAL)
+	if ((wal_level < WAL_LEVEL_LOGICAL && !is_concurrent_repack_run_by_me())
+		|| wal_level < WAL_LEVEL_REPLICA)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("logical decoding requires \"wal_level\" >= \"logical\"")));
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index e9ddf39500c..e24e1795aa9 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -151,6 +151,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, InjectionPointShmemSize());
 	size = add_size(size, SlotSyncShmemSize());
 	size = add_size(size, AioShmemSize());
+	size = add_size(size, RepackShmemSize());
 
 	/* include additional requested shmem from preload libraries */
 	size = add_size(size, total_addin_request);
@@ -344,6 +345,7 @@ CreateOrAttachShmemStructs(void)
 	WaitEventCustomShmemInit();
 	InjectionPointShmemInit();
 	AioShmemInit();
+	RepackShmemInit();
 }
 
 /*
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 7fa8d9247e0..ab30d448d42 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -1325,13 +1325,13 @@ LogStandbySnapshot(void)
 	 * record. Fortunately this routine isn't executed frequently, and it's
 	 * only a shared lock.
 	 */
-	if (wal_level < WAL_LEVEL_LOGICAL)
+	if (!XLogLogicalInfoActive())
 		LWLockRelease(ProcArrayLock);
 
 	recptr = LogCurrentRunningXacts(running);
 
 	/* Release lock if we kept it longer ... */
-	if (wal_level >= WAL_LEVEL_LOGICAL)
+	if (XLogLogicalInfoActive())
 		LWLockRelease(ProcArrayLock);
 
 	/* GetRunningTransactionData() acquired XidGenLock, we must release it */
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 02505c88b8e..ecaa2283c2a 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -1643,6 +1643,27 @@ CacheInvalidateRelcache(Relation relation)
 								 databaseId, relationId);
 }
 
+/*
+ * CacheInvalidateRelcacheImmediate
+ *		Send invalidation message for the specified relation's relcache entry.
+ *
+ * Currently this is used in REPACK CONCURRENTLY, to make sure that other
+ * backends are aware that the command is being executed for the relation.
+ */
+void
+CacheInvalidateRelcacheImmediate(Oid relid)
+{
+	SharedInvalidationMessage msg;
+
+	msg.rc.id = SHAREDINVALRELCACHE_ID;
+	msg.rc.dbId = MyDatabaseId;
+	msg.rc.relId = relid;
+	/* check AddCatcacheInvalidationMessage() for an explanation */
+	VALGRIND_MAKE_MEM_DEFINED(&msg, sizeof(msg));
+
+	SendSharedInvalidMessages(&msg, 1);
+}
+
 /*
  * CacheInvalidateRelcacheAll
  *		Register invalidation of the whole relcache at the end of command.
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 4911642fb3c..504cb8e56a8 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -1279,6 +1279,10 @@ retry:
 	/* make sure relation is marked as having no open file yet */
 	relation->rd_smgr = NULL;
 
+	/* Is REPACK CONCURRENTLY in progress? */
+	relation->rd_repack_concurrent =
+		is_concurrent_repack_in_progress(targetRelId);
+
 	/*
 	 * now we can free the memory allocated for pg_class_tuple
 	 */
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index d313099c027..a325bb1d16b 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -95,6 +95,12 @@ typedef enum RecoveryState
 
 extern PGDLLIMPORT int wal_level;
 
+/*
+ * wal_level_transient overrides wal_level if logical decoding needs to be
+ * enabled transiently.
+ */
+extern PGDLLIMPORT int wal_level_transient;
+
 /* Is WAL archiving enabled (always or only while server is running normally)? */
 #define XLogArchivingActive() \
 	(AssertMacro(XLogArchiveMode == ARCHIVE_MODE_OFF || wal_level >= WAL_LEVEL_REPLICA), XLogArchiveMode > ARCHIVE_MODE_OFF)
@@ -122,8 +128,13 @@ extern PGDLLIMPORT int wal_level;
 /* Do we need to WAL-log information required only for Hot Standby and logical replication? */
 #define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_REPLICA)
 
-/* Do we need to WAL-log information required only for logical replication? */
-#define XLogLogicalInfoActive() (wal_level >= WAL_LEVEL_LOGICAL)
+/*
+ * Do we need to WAL-log information required only for logical replication?
+ *
+ * wal_level_transient overrides wal_level if logical decoding needs to be
+ * active transiently.
+ */
+#define XLogLogicalInfoActive() (Max(wal_level, wal_level_transient) == WAL_LEVEL_LOGICAL)
 
 #ifdef WAL_DEBUG
 extern PGDLLIMPORT bool XLOG_DEBUG;
diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h
index 4914f217267..9d5a30d0689 100644
--- a/src/include/commands/cluster.h
+++ b/src/include/commands/cluster.h
@@ -150,5 +150,10 @@ extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
 							 MultiXactId cutoffMulti,
 							 char newrelpersistence);
 
+extern Size RepackShmemSize(void);
+extern void RepackShmemInit(void);
+extern bool is_concurrent_repack_in_progress(Oid relid);
+extern bool is_concurrent_repack_run_by_me(void);
+
 extern void repack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel);
 #endif							/* CLUSTER_H */
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index 9b871caef62..ae9dee394dc 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -50,6 +50,8 @@ extern void CacheInvalidateCatalog(Oid catalogId);
 
 extern void CacheInvalidateRelcache(Relation relation);
 
+extern void CacheInvalidateRelcacheImmediate(Oid relid);
+
 extern void CacheInvalidateRelcacheAll(void);
 
 extern void CacheInvalidateRelcacheByTuple(HeapTuple classTuple);
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index b552359915f..cc84592eb1f 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -253,6 +253,9 @@ typedef struct RelationData
 	bool		pgstat_enabled; /* should relation stats be counted */
 	/* use "struct" here to avoid needing to include pgstat.h: */
 	struct PgStat_TableStatus *pgstat_info; /* statistics collection area */
+
+	/* Is REPACK CONCURRENTLY being performed on this relation? */
+	bool		rd_repack_concurrent;
 } RelationData;
 
 
@@ -708,12 +711,16 @@ RelationCloseSmgr(Relation relation)
  * it would complicate decoding slightly for little gain). Note that we *do*
  * log information for user defined catalog tables since they presumably are
  * interesting to the user...
+ *
+ * If particular relations require that, the logical decoding can be active
+ * even if wal_level is REPLICA. Do not log other relations in that case.
  */
 #define RelationIsLogicallyLogged(relation) \
 	(XLogLogicalInfoActive() && \
 	 RelationNeedsWAL(relation) && \
 	 (relation)->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&	\
-	 !IsCatalogRelation(relation))
+	 !IsCatalogRelation(relation) && \
+	 (wal_level == WAL_LEVEL_LOGICAL || (relation)->rd_repack_concurrent))
 
 /* routines in utils/cache/relcache.c */
 extern void RelationIncrementReferenceCount(Relation rel);
diff --git a/src/test/modules/injection_points/Makefile b/src/test/modules/injection_points/Makefile
index 30ffe509239..e71b8a19116 100644
--- a/src/test/modules/injection_points/Makefile
+++ b/src/test/modules/injection_points/Makefile
@@ -15,7 +15,6 @@ REGRESS = injection_points hashagg reindex_conc vacuum
 REGRESS_OPTS = --dlpath=$(top_builddir)/src/test/regress
 
 ISOLATION = basic inplace syscache-update-pruned repack
-ISOLATION_OPTS = --temp-config $(top_srcdir)/src/test/modules/injection_points/logical.conf
 
 TAP_TESTS = 1
 
diff --git a/src/test/modules/injection_points/logical.conf b/src/test/modules/injection_points/logical.conf
deleted file mode 100644
index c8f264bc6cb..00000000000
--- a/src/test/modules/injection_points/logical.conf
+++ /dev/null
@@ -1 +0,0 @@
-wal_level = logical
\ No newline at end of file
diff --git a/src/test/modules/injection_points/meson.build b/src/test/modules/injection_points/meson.build
index c7daa669548..13c2b627a0b 100644
--- a/src/test/modules/injection_points/meson.build
+++ b/src/test/modules/injection_points/meson.build
@@ -51,9 +51,6 @@ tests += {
       'syscache-update-pruned',
     ],
     'runningcheck': false, # see syscache-update-pruned
-    # 'repack' requires wal_level = 'logical'.
-    'regress_args': ['--temp-config', files('logical.conf')],
-
   },
   'tap': {
     'env': {
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 879977ea41f..add58883124 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2532,6 +2532,7 @@ ReorderBufferTupleCidKey
 ReorderBufferUpdateProgressTxnCB
 ReorderTuple
 RepOriginId
+RepackedRel
 RepackDecodingState
 RepackStmt
 ReparameterizeForeignPathByChild_function
-- 
2.47.1

