From 3763e71b4b08bef32c799e69a9f142df887095bc Mon Sep 17 00:00:00 2001
From: Antonin Houska <ah@cybertec.at>
Date: Tue, 9 Dec 2025 19:44:43 +0100
Subject: [PATCH 5/5] Use background worker to do logical decoding.

If the backend performing REPACK (CONCURRENTLY) does both the data copying
and the logical decoding, it has to "travel in time" back and forth, and
therefore has to invalidate its system caches quite a few times. (The copying
and the decoding work with different catalog snapshots.) Since the decoding
worker has caches of its own, no such cache switching is necessary.

Without the worker, it would also be difficult to switch between potentially
long-running tasks like index build and WAL decoding. (If no decoding happened
during that time, archiving / recycling of WAL segments could be suspended for
a while, which in turn might result in a full disk.)

Another problem is that, after having acquired AccessExclusiveLock (in order
to swap the files), the backend needs to both decode and apply the data
changes that took place while it was waiting for the lock. With the decoding
worker, the decoding runs all the time, so the backend only needs to apply the
changes. This can shorten the time for which the exclusive lock is held.
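
The backend side of that handshake looks roughly like this (a simplified
sketch of process_concurrent_changes() below; file_is_exported() is a made-up
placeholder for checking the sfs_valid flag under the spinlock):

    /* Tell the worker where to stop, and whether to exit afterwards. */
    SpinLockAcquire(&shared->mutex);
    shared->lsn_upto = end_of_wal;
    shared->done = done;
    SpinLockRelease(&shared->mutex);

    /* Wait until the worker has closed and exported its output file. */
    ConditionVariablePrepareToSleep(&shared->cv);
    while (!file_is_exported(shared))
        ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT);
    ConditionVariableCancelSleep();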

Note that the code added to handle ERRORs raised in the background worker
almost duplicates the existing code that does the same for other kinds of
workers (see ProcessParallelMessages() and ProcessParallelApplyMessages()).
Refactoring the existing code might be useful to reduce the duplication.
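
For reference, the propagation path of an ERROR raised in the worker is
(function names as in the patch below):

    worker:  ereport() -> mq_putmessage() via pq_redirect_to_shm_mq()
             -> SendProcSignal(PROCSIG_REPACK_MESSAGE, ...)
    backend: procsignal_sigusr1_handler() -> HandleRepackMessageInterrupt()
             -> CHECK_FOR_INTERRUPTS() -> ProcessRepackMessages()
             -> ProcessRepackMessage() -> pq_parse_errornotice()
             -> ThrowErrorData()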
---
 src/backend/access/heap/heapam_handler.c      |   44 -
 src/backend/commands/cluster.c                | 1139 +++++++++++++----
 src/backend/libpq/pqmq.c                      |    5 +
 src/backend/postmaster/bgworker.c             |    4 +
 src/backend/replication/logical/logical.c     |    6 +-
 .../pgoutput_repack/pgoutput_repack.c         |   54 +-
 src/backend/storage/ipc/procsignal.c          |    4 +
 src/backend/tcop/postgres.c                   |    4 +
 .../utils/activity/wait_event_names.txt       |    2 +
 src/include/access/tableam.h                  |    7 +-
 src/include/commands/cluster.h                |   68 +-
 src/include/storage/procsignal.h              |    1 +
 src/tools/pgindent/typedefs.list              |    4 +-
 13 files changed, 931 insertions(+), 411 deletions(-)

diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 01be29eb405..e6d630fa2f7 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -33,7 +33,6 @@
 #include "catalog/index.h"
 #include "catalog/storage.h"
 #include "catalog/storage_xlog.h"
-#include "commands/cluster.h"
 #include "commands/progress.h"
 #include "executor/executor.h"
 #include "miscadmin.h"
@@ -688,7 +687,6 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
 								 Relation OldIndex, bool use_sort,
 								 TransactionId OldestXmin,
 								 Snapshot snapshot,
-								 LogicalDecodingContext *decoding_ctx,
 								 TransactionId *xid_cutoff,
 								 MultiXactId *multi_cutoff,
 								 double *num_tuples,
@@ -710,7 +708,6 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
 	BufferHeapTupleTableSlot *hslot;
 	BlockNumber prev_cblock = InvalidBlockNumber;
 	bool		concurrent = snapshot != NULL;
-	XLogRecPtr	end_of_wal_prev = GetFlushRecPtr(NULL);
 
 	/* Remember if it's a system catalog */
 	is_system_catalog = IsSystemRelation(OldHeap);
@@ -957,31 +954,6 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
 			ct_val[1] = *num_tuples;
 			pgstat_progress_update_multi_param(2, ct_index, ct_val);
 		}
-
-		/*
-		 * Process the WAL produced by the load, as well as by other
-		 * transactions, so that the replication slot can advance and WAL does
-		 * not pile up. Use wal_segment_size as a threshold so that we do not
-		 * introduce the decoding overhead too often.
-		 *
-		 * Of course, we must not apply the changes until the initial load has
-		 * completed.
-		 *
-		 * Note that our insertions into the new table should not be decoded
-		 * as we (intentionally) do not write the logical decoding specific
-		 * information to WAL.
-		 */
-		if (concurrent)
-		{
-			XLogRecPtr	end_of_wal;
-
-			end_of_wal = GetFlushRecPtr(NULL);
-			if ((end_of_wal - end_of_wal_prev) > wal_segment_size)
-			{
-				repack_decode_concurrent_changes(decoding_ctx, end_of_wal);
-				end_of_wal_prev = end_of_wal;
-			}
-		}
 	}
 
 	if (indexScan != NULL)
@@ -1027,22 +999,6 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
 			/* Report n_tuples */
 			pgstat_progress_update_param(PROGRESS_REPACK_HEAP_TUPLES_INSERTED,
 										 n_tuples);
-
-			/*
-			 * Try to keep the amount of not-yet-decoded WAL small, like
-			 * above.
-			 */
-			if (concurrent)
-			{
-				XLogRecPtr	end_of_wal;
-
-				end_of_wal = GetFlushRecPtr(NULL);
-				if ((end_of_wal - end_of_wal_prev) > wal_segment_size)
-				{
-					repack_decode_concurrent_changes(decoding_ctx, end_of_wal);
-					end_of_wal_prev = end_of_wal;
-				}
-			}
 		}
 
 		tuplesort_end(tuplesort);
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 501bd36c23e..f2a2ec6d3e5 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -46,6 +46,8 @@
 #include "commands/tablecmds.h"
 #include "commands/vacuum.h"
 #include "executor/executor.h"
+#include "libpq/pqformat.h"
+#include "libpq/pqmq.h"
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
 #include "pgstat.h"
@@ -56,6 +58,8 @@
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
+#include "storage/procsignal.h"
+#include "tcop/tcopprot.h"
 #include "utils/acl.h"
 #include "utils/fmgroids.h"
 #include "utils/guc.h"
@@ -123,6 +127,103 @@ typedef struct ChangeDest
 	IndexInsertState *iistate;
 } ChangeDest;
 
+/*
+ * Layout of shared memory used for communication between the backend and
+ * the worker that performs logical decoding of data changes
+ */
+typedef struct DecodingWorkerShared
+{
+	/*
+	 * Once the worker has reached this LSN, it should close the current
+	 * output file and either create a new one or exit, according to the field
+	 * 'done'. If the value is InvalidXLogRecPtr, the worker should decode all
+	 * the WAL available and keep checking this field. It is OK if the worker
+	 * has already decoded records whose LSN is >= lsn_upto by the time this
+	 * field is set.
+	 */
+	XLogRecPtr	lsn_upto;
+
+	/* Exit after closing the current file? */
+	bool		done;
+
+	/* The output is stored here. */
+	SharedFileSet sfs;
+
+	/* Can backend read the file contents? */
+	bool		sfs_valid;
+
+	/* Number of the last file exported by the worker. */
+	int			last_exported;
+
+	/* Synchronize access to the fields above. */
+	slock_t		mutex;
+
+	/* Database to connect to. */
+	Oid			dbid;
+
+	/* Role to connect as. */
+	Oid			roleid;
+
+	/* Decode data changes of this relation. */
+	Oid			relid;
+
+	/* The backend uses this to wait for the worker. */
+	ConditionVariable cv;
+
+	/* Info to signal the backend. */
+	PGPROC	   *backend_proc;
+	pid_t		backend_pid;
+	ProcNumber	backend_proc_number;
+
+	/* Error queue. */
+	shm_mq	   *error_mq;
+
+	/*
+	 * Memory the queue is located in.
+	 *
+	 * For considerations on the size, see the comments of
+	 * PARALLEL_ERROR_QUEUE_SIZE.
+	 */
+#define REPACK_ERROR_QUEUE_SIZE			16384
+	char		error_queue[FLEXIBLE_ARRAY_MEMBER];
+} DecodingWorkerShared;
+
+/*
+ * Generate the output file name. If relations with the same 'relid' happen
+ * to be processed at the same time, they must be from different databases and
+ * therefore different backends must be involved. (PID is already present in
+ * the fileset name.)
+ */
+static inline void
+DecodingWorkerFileName(char *fname, Oid relid, uint32 seq)
+{
+	snprintf(fname, MAXPGPATH, "%u-%u", relid, seq);
+}
+
+/*
+ * Backend-local information to control the decoding worker.
+ */
+typedef struct DecodingWorker
+{
+	/* The worker. */
+	BackgroundWorkerHandle *handle;
+
+	/* DecodingWorkerShared is in this segment. */
+	dsm_segment *seg;
+
+	/* Handle of the error queue. */
+	shm_mq_handle *error_mqh;
+} DecodingWorker;
+
+/* Pointer to currently running decoding worker. */
+static DecodingWorker *decoding_worker = NULL;
+
+/*
+ * Is there a message sent by a repack worker that the backend needs to
+ * receive?
+ */
+volatile sig_atomic_t RepackMessagePending = false;
+
 static bool cluster_rel_recheck(RepackCommand cmd, Relation OldHeap,
 								Oid indexOid, Oid userid, LOCKMODE lmode,
 								int options);
@@ -130,7 +231,7 @@ static void check_repack_concurrently_requirements(Relation rel);
 static void rebuild_relation(Relation OldHeap, Relation index, bool verbose,
 							 bool concurrent);
 static void copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex,
-							Snapshot snapshot, LogicalDecodingContext *decoding_ctx,
+							Snapshot snapshot,
 							bool verbose,
 							bool *pSwapToastByContent,
 							TransactionId *pFreezeXid,
@@ -143,12 +244,10 @@ static List *get_tables_to_repack_partitioned(RepackCommand cmd,
 static bool cluster_is_permitted_for_relation(RepackCommand cmd,
 											  Oid relid, Oid userid);
 
-static void begin_concurrent_repack(Relation rel);
-static void end_concurrent_repack(void);
 static LogicalDecodingContext *setup_logical_decoding(Oid relid);
-static HeapTuple get_changed_tuple(char *change);
-static void apply_concurrent_changes(RepackDecodingState *dstate,
-									 ChangeDest *dest);
+static bool decode_concurrent_changes(LogicalDecodingContext *ctx,
+									  DecodingWorkerShared *shared);
+static void apply_concurrent_changes(BufFile *file, ChangeDest *dest);
 static void apply_concurrent_insert(Relation rel, HeapTuple tup,
 									IndexInsertState *iistate,
 									TupleTableSlot *index_slot);
@@ -160,9 +259,9 @@ static void apply_concurrent_delete(Relation rel, HeapTuple tup_target);
 static HeapTuple find_target_tuple(Relation rel, ChangeDest *dest,
 								   HeapTuple tup_key,
 								   TupleTableSlot *ident_slot);
-static void process_concurrent_changes(LogicalDecodingContext *decoding_ctx,
-									   XLogRecPtr end_of_wal,
-									   ChangeDest *dest);
+static void process_concurrent_changes(XLogRecPtr end_of_wal,
+									   ChangeDest *dest,
+									   bool done);
 static IndexInsertState *get_index_insert_state(Relation relation,
 												Oid ident_index_id,
 												Relation *ident_index_p);
@@ -172,7 +271,6 @@ static void free_index_insert_state(IndexInsertState *iistate);
 static void cleanup_logical_decoding(LogicalDecodingContext *ctx);
 static void rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
 											   Relation cl_index,
-											   LogicalDecodingContext *decoding_ctx,
 											   TransactionId frozenXid,
 											   MultiXactId cutoffMulti);
 static List *build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes);
@@ -182,6 +280,13 @@ static Relation process_single_relation(RepackStmt *stmt,
 										ClusterParams *params);
 static Oid	determine_clustered_index(Relation rel, bool usingindex,
 									  const char *indexname);
+static void start_decoding_worker(Oid relid);
+static void stop_decoding_worker(void);
+static void repack_worker_internal(dsm_segment *seg);
+static void export_initial_snapshot(Snapshot snapshot,
+									DecodingWorkerShared *shared);
+static Snapshot get_initial_snapshot(DecodingWorker *worker);
+static void ProcessRepackMessage(StringInfo msg);
 static const char *RepackCommandAsString(RepackCommand cmd);
 
 
@@ -604,20 +709,20 @@ cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid,
 	/* rebuild_relation does all the dirty work */
 	PG_TRY();
 	{
-		/*
-		 * For concurrent processing, make sure that our logical decoding
-		 * ignores data changes of other tables than the one we are
-		 * processing.
-		 */
-		if (concurrent)
-			begin_concurrent_repack(OldHeap);
-
 		rebuild_relation(OldHeap, index, verbose, concurrent);
 	}
 	PG_FINALLY();
 	{
 		if (concurrent)
-			end_concurrent_repack();
+		{
+			/*
+			 * During normal operation the worker has already been asked to
+			 * exit, so stopping it explicitly here matters mostly on ERROR.
+			 * However, it still seems good practice to make sure that the
+			 * worker never survives the REPACK command.
+			 */
+			stop_decoding_worker();
+		}
 	}
 	PG_END_TRY();
 
@@ -914,7 +1019,6 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, bool concurrent
 	bool		swap_toast_by_content;
 	TransactionId frozenXid;
 	MultiXactId cutoffMulti;
-	LogicalDecodingContext *decoding_ctx = NULL;
 	Snapshot	snapshot = NULL;
 #if USE_ASSERT_CHECKING
 	LOCKMODE	lmode;
@@ -928,19 +1032,36 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, bool concurrent
 	if (concurrent)
 	{
 		/*
-		 * Prepare to capture the concurrent data changes.
+		 * The worker needs to be a member of the locking group we're the leader
+		 * of. We ought to become the leader before the worker starts. The
+		 * worker will join the group as soon as it starts.
+		 *
+		 * This is to make sure that the deadlock described below is
+		 * detectable by deadlock.c: if the worker waits for a transaction to
+		 * complete and we are waiting for the worker output, then effectively
+		 * we (i.e. this backend) are waiting for that transaction.
+		 */
+		BecomeLockGroupLeader();
+
+		/*
+		 * Start the worker that decodes data changes applied while we're
+		 * copying the table contents.
 		 *
-		 * Note that this call waits for all transactions with XID already
-		 * assigned to finish. If some of those transactions is waiting for a
-		 * lock conflicting with ShareUpdateExclusiveLock on our table (e.g.
-		 * it runs CREATE INDEX), we can end up in a deadlock. Not sure this
-		 * risk is worth unlocking/locking the table (and its clustering
-		 * index) and checking again if its still eligible for REPACK
-		 * CONCURRENTLY.
+		 * Note that the worker has to wait for all transactions with an XID
+		 * already assigned to finish. If one of those transactions is
+		 * waiting for a lock conflicting with ShareUpdateExclusiveLock on our
+		 * table (e.g. it runs CREATE INDEX), we can end up in a deadlock.
+		 * Not sure this risk is worth unlocking/locking the table (and its
+		 * clustering index) and checking again if it's still eligible for
+		 * REPACK CONCURRENTLY.
+		 */
+		start_decoding_worker(tableOid);
+
+		/*
+		 * Wait until the worker has the initial snapshot and retrieve it.
 		 */
-		decoding_ctx = setup_logical_decoding(tableOid);
+		snapshot = get_initial_snapshot(decoding_worker);
 
-		snapshot = SnapBuildInitialSnapshotForRepack(decoding_ctx->snapshot_builder);
 		PushActiveSnapshot(snapshot);
 	}
 
@@ -965,7 +1086,7 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, bool concurrent
 	NewHeap = table_open(OIDNewHeap, NoLock);
 
 	/* Copy the heap data into the new table in the desired order */
-	copy_table_data(NewHeap, OldHeap, index, snapshot, decoding_ctx, verbose,
+	copy_table_data(NewHeap, OldHeap, index, snapshot, verbose,
 					&swap_toast_by_content, &frozenXid, &cutoffMulti);
 
 	/* The historic snapshot won't be needed anymore. */
@@ -989,15 +1110,11 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, bool concurrent
 
 		Assert(!swap_toast_by_content);
 		rebuild_relation_finish_concurrent(NewHeap, OldHeap, index,
-										   decoding_ctx,
 										   frozenXid, cutoffMulti);
 		PopActiveSnapshot();
 
 		pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
 									 PROGRESS_REPACK_PHASE_FINAL_CLEANUP);
-
-		/* Done with decoding. */
-		cleanup_logical_decoding(decoding_ctx);
 	}
 	else
 	{
@@ -1168,8 +1285,7 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod,
  */
 static void
 copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex,
-				Snapshot snapshot, LogicalDecodingContext *decoding_ctx,
-				bool verbose, bool *pSwapToastByContent,
+				Snapshot snapshot, bool verbose, bool *pSwapToastByContent,
 				TransactionId *pFreezeXid, MultiXactId *pCutoffMulti)
 {
 	Relation	relRelation;
@@ -1330,7 +1446,6 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex,
 	 */
 	table_relation_copy_for_cluster(OldHeap, NewHeap, OldIndex, use_sort,
 									cutoffs.OldestXmin, snapshot,
-									decoding_ctx,
 									&cutoffs.FreezeLimit,
 									&cutoffs.MultiXactCutoff,
 									&num_tuples, &tups_vacuumed,
@@ -2363,59 +2478,6 @@ RepackCommandAsString(RepackCommand cmd)
 	return "???";
 }
 
-
-/*
- * Call this function before REPACK CONCURRENTLY starts to setup logical
- * decoding. It makes sure that other users of the table put enough
- * information into WAL.
- *
- * The point is that at various places we expect that the table we're
- * processing is treated like a system catalog. For example, we need to be
- * able to scan it using a "historic snapshot" anytime during the processing
- * (as opposed to scanning only at the start point of the decoding, as logical
- * replication does during initial table synchronization), in order to apply
- * concurrent UPDATE / DELETE commands.
- *
- * Note that TOAST table needs no attention here as it's not scanned using
- * historic snapshot.
- */
-static void
-begin_concurrent_repack(Relation rel)
-{
-	Oid			toastrelid;
-
-	/*
-	 * Avoid logical decoding of other relations by this backend. The lock we
-	 * have guarantees that the actual locator cannot be changed concurrently:
-	 * TRUNCATE needs AccessExclusiveLock.
-	 */
-	Assert(CheckRelationLockedByMe(rel, ShareUpdateExclusiveLock, false));
-	repacked_rel_locator = rel->rd_locator;
-	toastrelid = rel->rd_rel->reltoastrelid;
-	if (OidIsValid(toastrelid))
-	{
-		Relation	toastrel;
-
-		/* Avoid logical decoding of other TOAST relations. */
-		toastrel = table_open(toastrelid, AccessShareLock);
-		repacked_rel_toast_locator = toastrel->rd_locator;
-		table_close(toastrel, AccessShareLock);
-	}
-}
-
-/*
- * Call this when done with REPACK CONCURRENTLY.
- */
-static void
-end_concurrent_repack(void)
-{
-	/*
-	 * Restore normal function of (future) logical decoding for this backend.
-	 */
-	repacked_rel_locator.relNumber = InvalidOid;
-	repacked_rel_toast_locator.relNumber = InvalidOid;
-}
-
 /*
  * This function is much like pg_create_logical_replication_slot() except that
  * the new slot is neither released (if anyone else could read changes from
@@ -2427,9 +2489,10 @@ static LogicalDecodingContext *
 setup_logical_decoding(Oid relid)
 {
 	Relation	rel;
-	TupleDesc	tupdesc;
+	Oid			toastrelid;
 	LogicalDecodingContext *ctx;
-	RepackDecodingState *dstate = palloc0_object(RepackDecodingState);
+	NameData	slotname;
+	RepackDecodingState *dstate;
 
 	/*
 	 * REPACK CONCURRENTLY is not allowed in a transaction block, so this
@@ -2437,21 +2500,21 @@ setup_logical_decoding(Oid relid)
 	 */
 	Assert(!TransactionIdIsValid(GetTopTransactionIdIfAny()));
 
-	/*
-	 * A single backend should not execute multiple REPACK commands at a time,
-	 * so use PID to make the slot unique.
-	 */
-	snprintf(NameStr(dstate->slotname), NAMEDATALEN, "repack_%d", MyProcPid);
-
 	/*
 	 * Check if we can use logical decoding.
 	 */
 	CheckSlotPermissions();
 	CheckLogicalDecodingRequirements();
 
-	/* RS_TEMPORARY so that the slot gets cleaned up on ERROR. */
-	ReplicationSlotCreate(NameStr(dstate->slotname), true, RS_TEMPORARY,
-						  false, false, false);
+	/*
+	 * A single backend should not execute multiple REPACK commands at a time,
+	 * so use PID to make the slot unique.
+	 *
+	 * RS_TEMPORARY so that the slot gets cleaned up on ERROR.
+	 */
+	snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid);
+	ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, false,
+						  false);
 
 	/*
 	 * Neither prepare_write nor do_write callback nor update_progress is
@@ -2473,197 +2536,238 @@ setup_logical_decoding(Oid relid)
 
 	DecodingContextFindStartpoint(ctx);
 
+	/*
+	 * decode_concurrent_changes() needs a non-blocking page_read callback.
+	 */
+	ctx->reader->routine.page_read = read_local_xlog_page_no_wait;
+
+	/*
+	 * read_local_xlog_page_no_wait() needs to be able to indicate the end of
+	 * WAL.
+	 */
+	ctx->reader->private_data = MemoryContextAllocZero(ctx->context,
+													  sizeof(ReadLocalXLogPageNoWaitPrivate));
+
+
 	/* Some WAL records should have been read. */
 	Assert(ctx->reader->EndRecPtr != InvalidXLogRecPtr);
 
+	/*
+	 * Initialize repack_current_segment so that we can notice WAL segment
+	 * boundaries.
+	 */
 	XLByteToSeg(ctx->reader->EndRecPtr, repack_current_segment,
 				wal_segment_size);
 
-	/*
-	 * Setup structures to store decoded changes.
-	 */
+	dstate = palloc0_object(RepackDecodingState);
 	dstate->relid = relid;
-	dstate->tstore = tuplestore_begin_heap(false, false,
-										   maintenance_work_mem);
 
-	/* Caller should already have the table locked. */
-	rel = table_open(relid, NoLock);
-	tupdesc = CreateTupleDescCopy(RelationGetDescr(rel));
-	dstate->tupdesc = tupdesc;
-	table_close(rel, NoLock);
+	/*
+	 * Tuple descriptor may be needed to flatten a tuple before we write it to
+	 * a file. A copy is needed because the decoding worker invalidates system
+	 * caches before it starts to do the actual work.
+	 */
+	rel = table_open(relid, AccessShareLock);
+	dstate->tupdesc = CreateTupleDescCopy(RelationGetDescr(rel));
 
-	/* Initialize the descriptor to store the changes ... */
-	dstate->tupdesc_change = CreateTemplateTupleDesc(1);
+	/* Avoid logical decoding of other relations. */
+	repacked_rel_locator = rel->rd_locator;
+	toastrelid = rel->rd_rel->reltoastrelid;
+	if (OidIsValid(toastrelid))
+	{
+		Relation	toastrel;
 
-	TupleDescInitEntry(dstate->tupdesc_change, 1, NULL, BYTEAOID, -1, 0);
-	/* ... as well as the corresponding slot. */
-	dstate->tsslot = MakeSingleTupleTableSlot(dstate->tupdesc_change,
-											  &TTSOpsMinimalTuple);
+		/* Avoid logical decoding of other TOAST relations. */
+		toastrel = table_open(toastrelid, AccessShareLock);
+		repacked_rel_toast_locator = toastrel->rd_locator;
+		table_close(toastrel, AccessShareLock);
+	}
+	table_close(rel, AccessShareLock);
 
-	dstate->resowner = ResourceOwnerCreate(CurrentResourceOwner,
-										   "logical decoding");
+	/* The file will be set as soon as we have it opened. */
+	dstate->file = NULL;
 
 	ctx->output_writer_private = dstate;
+
 	return ctx;
 }
 
 /*
- * Retrieve tuple from ConcurrentChange structure.
+ * Decode logical changes from the WAL sequence and write them to a file.
  *
- * The input data starts with the structure but it might not be appropriately
- * aligned.
+ * If true is returned, there is no more work for the worker.
  */
-static HeapTuple
-get_changed_tuple(char *change)
+static bool
+decode_concurrent_changes(LogicalDecodingContext *ctx,
+						  DecodingWorkerShared *shared)
 {
-	HeapTupleData tup_data;
-	HeapTuple	result;
-	char	   *src;
+	RepackDecodingState *dstate;
+	XLogRecPtr	lsn_upto;
+	bool		done;
+	char		fname[MAXPGPATH];
 
-	/*
-	 * Ensure alignment before accessing the fields. (This is why we can't use
-	 * heap_copytuple() instead of this function.)
-	 */
-	src = change + offsetof(ConcurrentChange, tup_data);
-	memcpy(&tup_data, src, sizeof(HeapTupleData));
+	dstate = (RepackDecodingState *) ctx->output_writer_private;
 
-	result = (HeapTuple) palloc(HEAPTUPLESIZE + tup_data.t_len);
-	memcpy(result, &tup_data, sizeof(HeapTupleData));
-	result->t_data = (HeapTupleHeader) ((char *) result + HEAPTUPLESIZE);
-	src = change + SizeOfConcurrentChange;
-	memcpy(result->t_data, src, result->t_len);
+	/* Open the output file. */
+	DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
+	dstate->file = BufFileCreateFileSet(&shared->sfs.fs, fname);
 
-	return result;
-}
+	SpinLockAcquire(&shared->mutex);
+	lsn_upto = shared->lsn_upto;
+	done = shared->done;
+	SpinLockRelease(&shared->mutex);
 
-/*
- * Decode logical changes from the WAL sequence up to end_of_wal.
- */
-void
-repack_decode_concurrent_changes(LogicalDecodingContext *ctx,
-								 XLogRecPtr end_of_wal)
-{
-	RepackDecodingState *dstate;
-	ResourceOwner resowner_old;
-
-	/*
-	 * Invalidate the "present" cache before moving to "(recent) history".
-	 */
-	InvalidateSystemCaches();
+	while (XLogRecPtrIsInvalid(lsn_upto) || ctx->reader->EndRecPtr < lsn_upto)
+	{
+		XLogRecord *record;
+		XLogSegNo	segno_new;
+		char	   *errm = NULL;
+		XLogRecPtr	end_lsn;
 
-	dstate = (RepackDecodingState *) ctx->output_writer_private;
-	resowner_old = CurrentResourceOwner;
-	CurrentResourceOwner = dstate->resowner;
+		CHECK_FOR_INTERRUPTS();
 
-	PG_TRY();
-	{
-		while (ctx->reader->EndRecPtr < end_of_wal)
+		record = XLogReadRecord(ctx->reader, &errm);
+		if (record == NULL)
 		{
-			XLogRecord *record;
-			XLogSegNo	segno_new;
-			char	   *errm = NULL;
-			XLogRecPtr	end_lsn;
+			ReadLocalXLogPageNoWaitPrivate *priv;
 
-			record = XLogReadRecord(ctx->reader, &errm);
 			if (errm)
-				elog(ERROR, "%s", errm);
-
-			if (record != NULL)
-				LogicalDecodingProcessRecord(ctx, ctx->reader);
+				ereport(ERROR, (errmsg("%s", errm)));
 
 			/*
-			 * If WAL segment boundary has been crossed, inform the decoding
-			 * system that the catalog_xmin can advance. (We can confirm more
-			 * often, but a filling a single WAL segment should not take much
-			 * time.)
+			 * In the decoding loop we do not want to get blocked when there
+			 * is no more WAL available; otherwise the loop would become
+			 * uninterruptible. The point is that the worker is only useful if
+			 * it starts decoding before lsn_upto is set. Thus it can reach
+			 * the end of WAL and find out later that it did not have to go
+			 * that far.
 			 */
-			end_lsn = ctx->reader->EndRecPtr;
-			XLByteToSeg(end_lsn, segno_new, wal_segment_size);
-			if (segno_new != repack_current_segment)
+			priv = (ReadLocalXLogPageNoWaitPrivate *)
+				ctx->reader->private_data;
+			if (priv->end_of_wal)
 			{
-				LogicalConfirmReceivedLocation(end_lsn);
-				elog(DEBUG1, "REPACK: confirmed receive location %X/%X",
-					 (uint32) (end_lsn >> 32), (uint32) end_lsn);
-				repack_current_segment = segno_new;
+				priv->end_of_wal = false;
+
+				/* Do we know how far we should get? */
+				if (XLogRecPtrIsInvalid(lsn_upto))
+				{
+					SpinLockAcquire(&shared->mutex);
+					lsn_upto = shared->lsn_upto;
+					/* 'done' should be set at the same time as 'lsn_upto' */
+					done = shared->done;
+					SpinLockRelease(&shared->mutex);
+
+					/* Check if the work happens to be complete. */
+					continue;
+				}
+
+				/* Wait a bit before we retry reading WAL. */
+				(void) WaitLatch(MyLatch,
+								 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+								 1000L,
+								 WAIT_EVENT_REPACK_WORKER_MAIN);
+
+				continue;
 			}
+			else
+				ereport(ERROR, (errmsg("could not read WAL record")));
+		}
 
-			CHECK_FOR_INTERRUPTS();
+		LogicalDecodingProcessRecord(ctx, ctx->reader);
+
+		/*
+		 * If WAL segment boundary has been crossed, inform the decoding
+		 * system that the catalog_xmin can advance.
+		 *
+		 * TODO Does it make sense to confirm more often? Segment size seems
+		 * appropriate for restart_lsn (because less than a segment cannot be
+		 * recycled anyway), however more frequent checks might be beneficial
+		 * for catalog_xmin.
+		 */
+		end_lsn = ctx->reader->EndRecPtr;
+		XLByteToSeg(end_lsn, segno_new, wal_segment_size);
+		if (segno_new != repack_current_segment)
+		{
+			LogicalConfirmReceivedLocation(end_lsn);
+			elog(DEBUG1, "REPACK: confirmed receive location %X/%X",
+				 (uint32) (end_lsn >> 32), (uint32) end_lsn);
+			repack_current_segment = segno_new;
+		}
+
+		/* Keep checking if 'lsn_upto' was specified. */
+		if (XLogRecPtrIsInvalid(lsn_upto))
+		{
+			SpinLockAcquire(&shared->mutex);
+			lsn_upto = shared->lsn_upto;
+			/* 'done' should be set at the same time as 'lsn_upto' */
+			done = shared->done;
+			SpinLockRelease(&shared->mutex);
 		}
-		InvalidateSystemCaches();
-		CurrentResourceOwner = resowner_old;
-	}
-	PG_CATCH();
-	{
-		/* clear all timetravel entries */
-		InvalidateSystemCaches();
-		CurrentResourceOwner = resowner_old;
-		PG_RE_THROW();
 	}
-	PG_END_TRY();
+
+	/*
+	 * Close the file and make it available to the backend.
+	 */
+	BufFileClose(dstate->file);
+	dstate->file = NULL;
+	SpinLockAcquire(&shared->mutex);
+	shared->lsn_upto = InvalidXLogRecPtr;
+	shared->sfs_valid = true;
+	shared->last_exported++;
+	SpinLockRelease(&shared->mutex);
+	ConditionVariableSignal(&shared->cv);
+
+	return done;
 }
 
 /*
- * Apply changes that happened during the initial load.
- *
- * Scan key is passed by caller, so it does not have to be constructed
- * multiple times. Key entries have all fields initialized, except for
- * sk_argument.
+ * Apply changes stored in 'file'.
  */
 static void
-apply_concurrent_changes(RepackDecodingState *dstate, ChangeDest *dest)
+apply_concurrent_changes(BufFile *file, ChangeDest *dest)
 {
+	char		kind;
+	uint32		t_len;
 	Relation	rel = dest->rel;
 	TupleTableSlot *index_slot,
 			   *ident_slot;
 	HeapTuple	tup_old = NULL;
 
-	if (dstate->nchanges == 0)
-		return;
-
 	/* TupleTableSlot is needed to pass the tuple to ExecInsertIndexTuples(). */
-	index_slot = MakeSingleTupleTableSlot(dstate->tupdesc, &TTSOpsHeapTuple);
+	index_slot = MakeSingleTupleTableSlot(RelationGetDescr(rel),
+										  &TTSOpsHeapTuple);
 
 	/* A slot to fetch tuples from identity index. */
 	ident_slot = table_slot_create(rel, NULL);
 
-	while (tuplestore_gettupleslot(dstate->tstore, true, false,
-								   dstate->tsslot))
+	while (true)
 	{
-		bool		shouldFree;
-		HeapTuple	tup_change,
-					tup,
+		size_t		nread;
+		HeapTuple	tup,
 					tup_exist;
-		char	   *change_raw,
-				   *src;
-		ConcurrentChange change;
-		bool		isnull[1];
-		Datum		values[1];
 
 		CHECK_FOR_INTERRUPTS();
 
-		/* Get the change from the single-column tuple. */
-		tup_change = ExecFetchSlotHeapTuple(dstate->tsslot, false, &shouldFree);
-		heap_deform_tuple(tup_change, dstate->tupdesc_change, values, isnull);
-		Assert(!isnull[0]);
-
-		/* Make sure we access aligned data. */
-		change_raw = (char *) DatumGetByteaP(values[0]);
-		src = (char *) VARDATA(change_raw);
-		memcpy(&change, src, SizeOfConcurrentChange);
+		nread = BufFileReadMaybeEOF(file, &kind, 1, true);
+		/* Are we done with the file? */
+		if (nread == 0)
+			break;
 
-		/*
-		 * Extract the tuple from the change. The tuple is copied here because
-		 * it might be assigned to 'tup_old', in which case it needs to
-		 * survive into the next iteration.
-		 */
-		tup = get_changed_tuple(src);
+		/* Read the tuple. */
+		BufFileReadExact(file, &t_len, sizeof(t_len));
+		tup = (HeapTuple) palloc(HEAPTUPLESIZE + t_len);
+		tup->t_data = (HeapTupleHeader) ((char *) tup + HEAPTUPLESIZE);
+		BufFileReadExact(file, tup->t_data, t_len);
+		tup->t_len = t_len;
+		ItemPointerSetInvalid(&tup->t_self);
+		tup->t_tableOid = RelationGetRelid(dest->rel);
 
-		if (change.kind == CHANGE_UPDATE_OLD)
+		if (kind == CHANGE_UPDATE_OLD)
 		{
 			Assert(tup_old == NULL);
 			tup_old = tup;
 		}
-		else if (change.kind == CHANGE_INSERT)
+		else if (kind == CHANGE_INSERT)
 		{
 			Assert(tup_old == NULL);
 
@@ -2671,12 +2775,11 @@ apply_concurrent_changes(RepackDecodingState *dstate, ChangeDest *dest)
 
 			pfree(tup);
 		}
-		else if (change.kind == CHANGE_UPDATE_NEW ||
-				 change.kind == CHANGE_DELETE)
+		else if (kind == CHANGE_UPDATE_NEW || kind == CHANGE_DELETE)
 		{
 			HeapTuple	tup_key;
 
-			if (change.kind == CHANGE_UPDATE_NEW)
+			if (kind == CHANGE_UPDATE_NEW)
 			{
 				tup_key = tup_old != NULL ? tup_old : tup;
 			}
@@ -2693,7 +2796,7 @@ apply_concurrent_changes(RepackDecodingState *dstate, ChangeDest *dest)
 			if (tup_exist == NULL)
 				elog(ERROR, "failed to find target tuple");
 
-			if (change.kind == CHANGE_UPDATE_NEW)
+			if (kind == CHANGE_UPDATE_NEW)
 				apply_concurrent_update(rel, tup, tup_exist, dest->iistate,
 										index_slot);
 			else
@@ -2708,26 +2811,19 @@ apply_concurrent_changes(RepackDecodingState *dstate, ChangeDest *dest)
 			pfree(tup);
 		}
 		else
-			elog(ERROR, "unrecognized kind of change: %d", change.kind);
+			elog(ERROR, "unrecognized kind of change: %d", kind);
 
 		/*
 		 * If a change was applied now, increment CID for next writes and
 		 * update the snapshot so it sees the changes we've applied so far.
 		 */
-		if (change.kind != CHANGE_UPDATE_OLD)
+		if (kind != CHANGE_UPDATE_OLD)
 		{
 			CommandCounterIncrement();
 			UpdateActiveSnapshotCommandId();
 		}
-
-		/* TTSOpsMinimalTuple has .get_heap_tuple==NULL. */
-		Assert(shouldFree);
-		pfree(tup_change);
 	}
 
-	tuplestore_clear(dstate->tstore);
-	dstate->nchanges = 0;
-
 	/* Cleanup. */
 	ExecDropSingleTupleTableSlot(index_slot);
 	ExecDropSingleTupleTableSlot(ident_slot);
@@ -2865,6 +2961,12 @@ find_target_tuple(Relation rel, ChangeDest *dest, HeapTuple tup_key,
 	/* XXX no instrumentation for now */
 	scan = index_beginscan(rel, ident_index, GetActiveSnapshot(),
 						   NULL, dest->ident_key_nentries, 0);
+
+	/*
+	 * Scan key is passed by caller, so it does not have to be constructed
+	 * multiple times. Key entries have all fields initialized, except for
+	 * sk_argument.
+	 */
 	index_rescan(scan, dest->ident_key, dest->ident_key_nentries, NULL, 0);
 
 	/* Info needed to retrieve key values from heap tuple. */
@@ -2900,25 +3002,58 @@ find_target_tuple(Relation rel, ChangeDest *dest, HeapTuple tup_key,
 }
 
 /*
- * Decode and apply concurrent changes.
+ * Decode and apply concurrent changes, up to (and including) the record whose
+ * LSN is 'end_of_wal'.
  */
 static void
-process_concurrent_changes(LogicalDecodingContext *decoding_ctx,
-						   XLogRecPtr end_of_wal, ChangeDest *dest)
+process_concurrent_changes(XLogRecPtr end_of_wal, ChangeDest *dest, bool done)
 {
-	RepackDecodingState *dstate;
+	DecodingWorkerShared *shared;
+	char		fname[MAXPGPATH];
+	BufFile    *file;
 
 	pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
 								 PROGRESS_REPACK_PHASE_CATCH_UP);
 
-	dstate = (RepackDecodingState *) decoding_ctx->output_writer_private;
+	/* Ask the worker for the file. */
+	shared = (DecodingWorkerShared *) dsm_segment_address(decoding_worker->seg);
+	SpinLockAcquire(&shared->mutex);
+	shared->lsn_upto = end_of_wal;
+	shared->done = done;
+	SpinLockRelease(&shared->mutex);
 
-	repack_decode_concurrent_changes(decoding_ctx, end_of_wal);
+	/*
+	 * The worker needs to finish processing the current WAL record. Even
+	 * if it's idle, it'll need to close the output file. Thus we're likely to
+	 * wait, so prepare for sleep.
+	 */
+	ConditionVariablePrepareToSleep(&shared->cv);
+	for (;;)
+	{
+		bool		valid;
 
-	if (dstate->nchanges == 0)
-		return;
+		SpinLockAcquire(&shared->mutex);
+		valid = shared->sfs_valid;
+		SpinLockRelease(&shared->mutex);
+
+		if (valid)
+			break;
+
+		ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT);
+	}
+	ConditionVariableCancelSleep();
 
-	apply_concurrent_changes(dstate, dest);
+	/* Open the file. */
+	DecodingWorkerFileName(fname, shared->relid, shared->last_exported);
+	file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false);
+	apply_concurrent_changes(file, dest);
+
+	/* No file is exported until the worker exports the next one. */
+	SpinLockAcquire(&shared->mutex);
+	shared->sfs_valid = false;
+	SpinLockRelease(&shared->mutex);
+
+	BufFileClose(file);
 }
 
 /*
@@ -3044,15 +3179,10 @@ cleanup_logical_decoding(LogicalDecodingContext *ctx)
 
 	dstate = (RepackDecodingState *) ctx->output_writer_private;
 
-	ExecDropSingleTupleTableSlot(dstate->tsslot);
-	FreeTupleDesc(dstate->tupdesc_change);
 	FreeTupleDesc(dstate->tupdesc);
-	tuplestore_end(dstate->tstore);
-
 	FreeDecodingContext(ctx);
 
-	ReplicationSlotRelease();
-	ReplicationSlotDrop(NameStr(dstate->slotname), false);
+	ReplicationSlotDropAcquired();
 	pfree(dstate);
 }
 
@@ -3067,7 +3197,6 @@ cleanup_logical_decoding(LogicalDecodingContext *ctx)
 static void
 rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
 								   Relation cl_index,
-								   LogicalDecodingContext *decoding_ctx,
 								   TransactionId frozenXid,
 								   MultiXactId cutoffMulti)
 {
@@ -3170,7 +3299,7 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
 	 * hold AccessExclusiveLock. (Quite some amount of WAL could have been
 	 * written during the data copying and index creation.)
 	 */
-	process_concurrent_changes(decoding_ctx, end_of_wal, &chgdst);
+	process_concurrent_changes(end_of_wal, &chgdst, false);
 
 	/*
 	 * Acquire AccessExclusiveLock on the table, its TOAST relation (if there
@@ -3266,8 +3395,11 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
 	XLogFlush(wal_insert_ptr);
 	end_of_wal = GetFlushRecPtr(NULL);
 
-	/* Apply the concurrent changes again. */
-	process_concurrent_changes(decoding_ctx, end_of_wal, &chgdst);
+	/*
+	 * Apply the concurrent changes again. Indicate that the decoding worker
+	 * won't be needed anymore.
+	 */
+	process_concurrent_changes(end_of_wal, &chgdst, true);
 
 	/* Remember info about rel before closing OldHeap */
 	relpersistence = OldHeap->rd_rel->relpersistence;
@@ -3380,3 +3512,488 @@ build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes)
 
 	return result;
 }
+
+/*
+ * Try to start a background worker to perform logical decoding of data
+ * changes applied to the relation while REPACK CONCURRENTLY is copying its
+ * contents to a new table.
+ */
+static void
+start_decoding_worker(Oid relid)
+{
+	Size		size;
+	dsm_segment *seg;
+	DecodingWorkerShared *shared;
+	shm_mq	   *mq;
+	shm_mq_handle *mqh;
+	BackgroundWorker bgw;
+
+	/* Setup shared memory. */
+	size = BUFFERALIGN(offsetof(DecodingWorkerShared, error_queue)) +
+		BUFFERALIGN(REPACK_ERROR_QUEUE_SIZE);
+	seg = dsm_create(size, 0);
+	shared = (DecodingWorkerShared *) dsm_segment_address(seg);
+	shared->lsn_upto = InvalidXLogRecPtr;
+	shared->done = false;
+	SharedFileSetInit(&shared->sfs, seg);
+	shared->sfs_valid = false;
+	shared->last_exported = -1;
+	SpinLockInit(&shared->mutex);
+	shared->dbid = MyDatabaseId;
+
+	/*
+	 * This is the UserId set in cluster_rel(). Security context shouldn't be
+	 * needed for the decoding worker.
+	 */
+	shared->roleid = GetUserId();
+	shared->relid = relid;
+	ConditionVariableInit(&shared->cv);
+	shared->backend_proc = MyProc;
+	shared->backend_pid = MyProcPid;
+	shared->backend_proc_number = MyProcNumber;
+
+	mq = shm_mq_create((char *) BUFFERALIGN(shared->error_queue),
+					   REPACK_ERROR_QUEUE_SIZE);
+	shm_mq_set_receiver(mq, MyProc);
+	mqh = shm_mq_attach(mq, seg, NULL);
+
+	memset(&bgw, 0, sizeof(bgw));
+	snprintf(bgw.bgw_name, BGW_MAXLEN,
+			 "REPACK decoding worker for relation \"%s\"",
+			 get_rel_name(relid));
+	snprintf(bgw.bgw_type, BGW_MAXLEN, "REPACK decoding worker");
+	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
+		BGWORKER_BACKEND_DATABASE_CONNECTION;
+	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+	bgw.bgw_restart_time = BGW_NEVER_RESTART;
+	snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres");
+	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "RepackWorkerMain");
+	bgw.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
+	bgw.bgw_notify_pid = MyProcPid;
+
+	decoding_worker = palloc0_object(DecodingWorker);
+	if (!RegisterDynamicBackgroundWorker(&bgw, &decoding_worker->handle))
+		ereport(ERROR,
+				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+				 errmsg("out of background worker slots"),
+				 errhint("You might need to increase \"%s\".", "max_worker_processes")));
+
+	decoding_worker->seg = seg;
+	decoding_worker->error_mqh = mqh;
+}
+
+/*
+ * Stop the decoding worker and clean up the related resources.
+ *
+ * The worker stops on its own when it knows there is no more work to do, but
+ * we need to stop it explicitly at least on ERROR in the launching backend.
+ */
+static void
+stop_decoding_worker(void)
+{
+	BgwHandleStatus status;
+
+	/* Haven't reached the worker startup? */
+	if (decoding_worker == NULL)
+		return;
+
+	/* Could not register the worker? */
+	if (decoding_worker->handle == NULL)
+		return;
+
+	TerminateBackgroundWorker(decoding_worker->handle);
+	/* The worker should really exit before the REPACK command does. */
+	HOLD_INTERRUPTS();
+	status = WaitForBackgroundWorkerShutdown(decoding_worker->handle);
+	RESUME_INTERRUPTS();
+
+	if (status == BGWH_POSTMASTER_DIED)
+		ereport(FATAL,
+				(errcode(ERRCODE_ADMIN_SHUTDOWN),
+				 errmsg("postmaster exited during REPACK command")));
+
+	shm_mq_detach(decoding_worker->error_mqh);
+
+	/*
+	 * If we could not cancel the current sleep due to ERROR, do that before
+	 * we detach from the shared memory the condition variable is located in.
+	 * Otherwise, a later attempt to cancel the sleep (e.g. during error
+	 * cleanup) would try to access that memory and fail badly.
+	 */
+	ConditionVariableCancelSleep();
+
+	dsm_detach(decoding_worker->seg);
+	pfree(decoding_worker);
+	decoding_worker = NULL;
+}
+
+/* Is this process a REPACK worker? */
+static bool is_repack_worker = false;
+
+static pid_t backend_pid;
+static ProcNumber backend_proc_number;
+
+/*
+ * See ParallelWorkerShutdown for details.
+ */
+static void
+RepackWorkerShutdown(int code, Datum arg)
+{
+	SendProcSignal(backend_pid,
+				   PROCSIG_REPACK_MESSAGE,
+				   backend_proc_number);
+
+	dsm_detach((dsm_segment *) DatumGetPointer(arg));
+}
+
+/* REPACK decoding worker entry point */
+void
+RepackWorkerMain(Datum main_arg)
+{
+	dsm_segment *seg;
+	DecodingWorkerShared *shared;
+	shm_mq	   *mq;
+	shm_mq_handle *mqh;
+
+	is_repack_worker = true;
+
+	/*
+	 * Override the default bgworker_die() with die() so we can use
+	 * CHECK_FOR_INTERRUPTS().
+	 */
+	pqsignal(SIGTERM, die);
+	BackgroundWorkerUnblockSignals();
+
+	seg = dsm_attach(DatumGetUInt32(main_arg));
+	if (seg == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("could not map dynamic shared memory segment")));
+
+	shared = (DecodingWorkerShared *) dsm_segment_address(seg);
+
+	/* Arrange to signal the leader if we exit. */
+	backend_pid = shared->backend_pid;
+	backend_proc_number = shared->backend_proc_number;
+	before_shmem_exit(RepackWorkerShutdown, PointerGetDatum(seg));
+
+	/*
+	 * Join locking group - see the comments around the call of
+	 * start_decoding_worker().
+	 */
+	if (!BecomeLockGroupMember(shared->backend_proc, backend_pid))
+		/* The leader is not running anymore. */
+		return;
+
+	/*
+	 * Setup a queue to send error messages to the backend that launched this
+	 * worker.
+	 */
+	mq = (shm_mq *) (char *) BUFFERALIGN(shared->error_queue);
+	shm_mq_set_sender(mq, MyProc);
+	mqh = shm_mq_attach(mq, seg, NULL);
+	pq_redirect_to_shm_mq(seg, mqh);
+	pq_set_parallel_leader(shared->backend_pid,
+						   shared->backend_proc_number);
+
+	/* Connect to the database. */
+	BackgroundWorkerInitializeConnectionByOid(shared->dbid, shared->roleid, 0);
+
+	repack_worker_internal(seg);
+}
+
+static void
+repack_worker_internal(dsm_segment *seg)
+{
+	DecodingWorkerShared *shared;
+	LogicalDecodingContext *decoding_ctx;
+	SharedFileSet *sfs;
+	Snapshot	snapshot;
+
+	/*
+	 * A transaction is needed to open the relation, and it also provides us
+	 * with a resource owner.
+	 */
+	StartTransactionCommand();
+
+	shared = (DecodingWorkerShared *) dsm_segment_address(seg);
+
+	/*
+	 * Not sure the spinlock is needed here - the backend should not change
+	 * anything in the shared memory until we have serialized the snapshot.
+	 */
+	SpinLockAcquire(&shared->mutex);
+	Assert(XLogRecPtrIsInvalid(shared->lsn_upto));
+	Assert(!shared->sfs_valid);
+	sfs = &shared->sfs;
+	SpinLockRelease(&shared->mutex);
+
+	SharedFileSetAttach(sfs, seg);
+
+	/*
+	 * Prepare to capture the concurrent data changes ourselves.
+	 */
+	decoding_ctx = setup_logical_decoding(shared->relid);
+
+	/* Build the initial snapshot and export it. */
+	snapshot = SnapBuildInitialSnapshotForRepack(decoding_ctx->snapshot_builder);
+	export_initial_snapshot(snapshot, shared);
+
+	/*
+	 * The worker already had to access some system catalogs during startup,
+	 * and we even had to open the relation we are processing. Now that we're
+	 * going to work with historic snapshots, the system caches must be
+	 * invalidated.
+	 */
+	InvalidateSystemCaches();
+
+	while (!decode_concurrent_changes(decoding_ctx, shared))
+		;
+
+	/* Cleanup. */
+	cleanup_logical_decoding(decoding_ctx);
+	CommitTransactionCommand();
+}
+
+/*
+ * Make snapshot available to the backend that launched the decoding worker.
+ */
+static void
+export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared)
+{
+	char		fname[MAXPGPATH];
+	BufFile    *file;
+	Size		snap_size;
+	char	   *snap_space;
+
+	snap_size = EstimateSnapshotSpace(snapshot);
+	snap_space = (char *) palloc(snap_size);
+	SerializeSnapshot(snapshot, snap_space);
+	FreeSnapshot(snapshot);
+
+	DecodingWorkerFileName(fname, shared->relid, shared->last_exported + 1);
+	file = BufFileCreateFileSet(&shared->sfs.fs, fname);
+	/* To make restoration easier, write the snapshot size first. */
+	BufFileWrite(file, &snap_size, sizeof(snap_size));
+	BufFileWrite(file, snap_space, snap_size);
+	pfree(snap_space);
+	BufFileClose(file);
+
+	/* Tell the backend that the file is available. */
+	SpinLockAcquire(&shared->mutex);
+	shared->sfs_valid = true;
+	shared->last_exported++;
+	SpinLockRelease(&shared->mutex);
+	ConditionVariableSignal(&shared->cv);
+}
+
+/*
+ * Get the initial snapshot from the decoding worker.
+ */
+static Snapshot
+get_initial_snapshot(DecodingWorker *worker)
+{
+	DecodingWorkerShared *shared;
+	char		fname[MAXPGPATH];
+	BufFile    *file;
+	Size		snap_size;
+	char	   *snap_space;
+	Snapshot	snapshot;
+
+	shared = (DecodingWorkerShared *) dsm_segment_address(worker->seg);
+
+	/*
+	 * The worker needs to initialize the logical decoding, which usually
+	 * takes some time. Therefore it makes sense to prepare for the sleep
+	 * first.
+	 */
+	ConditionVariablePrepareToSleep(&shared->cv);
+	for (;;)
+	{
+		bool		valid;
+
+		SpinLockAcquire(&shared->mutex);
+		valid = shared->sfs_valid;
+		SpinLockRelease(&shared->mutex);
+
+		if (valid)
+			break;
+
+		ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT);
+	}
+	ConditionVariableCancelSleep();
+
+	/* Read the snapshot from a file. */
+	DecodingWorkerFileName(fname, shared->relid, shared->last_exported);
+	file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false);
+	BufFileReadExact(file, &snap_size, sizeof(snap_size));
+	snap_space = (char *) palloc(snap_size);
+	BufFileReadExact(file, snap_space, snap_size);
+	BufFileClose(file);
+
+	SpinLockAcquire(&shared->mutex);
+	shared->sfs_valid = false;
+	SpinLockRelease(&shared->mutex);
+
+	/* Restore it. */
+	snapshot = RestoreSnapshot(snap_space);
+	pfree(snap_space);
+
+	return snapshot;
+}
+
+bool
+IsRepackWorker(void)
+{
+	return is_repack_worker;
+}
+
+/*
+ * Handle receipt of an interrupt indicating a repack worker message.
+ *
+ * Note: this is called within a signal handler!  All we can do is set
+ * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
+ * ProcessRepackMessages().
+ */
+void
+HandleRepackMessageInterrupt(void)
+{
+	InterruptPending = true;
+	RepackMessagePending = true;
+	SetLatch(MyLatch);
+}
+
+/*
+ * Process any queued protocol messages received from the decoding worker.
+ */
+void
+ProcessRepackMessages(void)
+{
+	MemoryContext oldcontext;
+
+	static MemoryContext hpm_context = NULL;
+
+	/*
+	 * Nothing to do if we haven't launched the worker yet or have already
+	 * terminated it.
+	 */
+	if (decoding_worker == NULL)
+		return;
+
+	/*
+	 * This is invoked from ProcessInterrupts(), and since some of the
+	 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
+	 * for recursive calls if more signals are received while this runs.  It's
+	 * unclear that recursive entry would be safe, and it doesn't seem useful
+	 * even if it is safe, so let's block interrupts until done.
+	 */
+	HOLD_INTERRUPTS();
+
+	/*
+	 * Moreover, CurrentMemoryContext might be pointing almost anywhere.  We
+	 * don't want to risk leaking data into long-lived contexts, so let's do
+	 * our work here in a private context that we can reset on each use.
+	 */
+	if (hpm_context == NULL)	/* first time through? */
+		hpm_context = AllocSetContextCreate(TopMemoryContext,
+											"ProcessRepackMessages",
+											ALLOCSET_DEFAULT_SIZES);
+	else
+		MemoryContextReset(hpm_context);
+
+	oldcontext = MemoryContextSwitchTo(hpm_context);
+
+	/* OK to process messages.  Reset the flag saying there are more to do. */
+	RepackMessagePending = false;
+
+	/*
+	 * Read as many messages as we can from the worker, but stop when no more
+	 * messages can be read without blocking.
+	 */
+	while (true)
+	{
+		shm_mq_result res;
+		Size		nbytes;
+		void	   *data;
+
+		res = shm_mq_receive(decoding_worker->error_mqh, &nbytes,
+							 &data, true);
+		if (res == SHM_MQ_WOULD_BLOCK)
+			break;
+		else if (res == SHM_MQ_SUCCESS)
+		{
+			StringInfoData msg;
+
+			initStringInfo(&msg);
+			appendBinaryStringInfo(&msg, data, nbytes);
+			ProcessRepackMessage(&msg);
+			pfree(msg.data);
+		}
+		else
+		{
+			/*
+			 * The decoding worker is special in that it exits as soon as it
+			 * has its work done. Thus the DETACHED result code is fine.
+			 */
+			Assert(res == SHM_MQ_DETACHED);
+
+			break;
+		}
+	}
+
+	MemoryContextSwitchTo(oldcontext);
+
+	/* Might as well clear the context on our way out */
+	MemoryContextReset(hpm_context);
+
+	RESUME_INTERRUPTS();
+}
+
+/*
+ * Process a single protocol message received from the decoding worker.
+ */
+static void
+ProcessRepackMessage(StringInfo msg)
+{
+	char		msgtype;
+
+	msgtype = pq_getmsgbyte(msg);
+
+	switch (msgtype)
+	{
+		case PqMsg_ErrorResponse:
+		case PqMsg_NoticeResponse:
+			{
+				ErrorData	edata;
+
+				/* Parse ErrorResponse or NoticeResponse. */
+				pq_parse_errornotice(msg, &edata);
+
+				/* Death of a worker isn't enough justification for suicide. */
+				edata.elevel = Min(edata.elevel, ERROR);
+
+				/*
+				 * If desired, add a context line to show that this is a
+				 * message propagated from a parallel worker.  Otherwise, it
+				 * can sometimes be confusing to understand what actually
+				 * happened.
+				 */
+				if (edata.context)
+					edata.context = psprintf("%s\n%s", edata.context,
+											 _("decoding worker"));
+				else
+					edata.context = pstrdup(_("decoding worker"));
+
+				/* Rethrow error or print notice. */
+				ThrowErrorData(&edata);
+
+				break;
+			}
+
+		default:
+			{
+				elog(ERROR, "unrecognized message type received from decoding worker: %c (message length %d bytes)",
+					 msgtype, msg->len);
+			}
+	}
+}
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index 2b75de0ddef..28a5a400fb1 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -14,6 +14,7 @@
 #include "postgres.h"
 
 #include "access/parallel.h"
+#include "commands/cluster.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "libpq/pqmq.h"
@@ -175,6 +176,10 @@ mq_putmessage(char msgtype, const char *s, size_t len)
 				SendProcSignal(pq_mq_parallel_leader_pid,
 							   PROCSIG_PARALLEL_APPLY_MESSAGE,
 							   pq_mq_parallel_leader_proc_number);
+			else if (IsRepackWorker())
+				SendProcSignal(pq_mq_parallel_leader_pid,
+							   PROCSIG_REPACK_MESSAGE,
+							   pq_mq_parallel_leader_proc_number);
 			else
 			{
 				Assert(IsParallelWorker());
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 142a02eb5e9..b368990e90b 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -13,6 +13,7 @@
 #include "postgres.h"
 
 #include "access/parallel.h"
+#include "commands/cluster.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -135,6 +136,9 @@ static const struct
 	},
 	{
 		"SequenceSyncWorkerMain", SequenceSyncWorkerMain
+	},
+	{
+		"RepackWorkerMain", RepackWorkerMain
 	}
 };
 
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 866f92cf799..85323ba61e2 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -205,7 +205,11 @@ StartupDecodingContext(List *output_plugin_options,
 
 	ctx->slot = slot;
 
-	ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx);
+	/*
+	 * TODO A separate patch for PG core, unless there's really a reason to
+	 * pass ctx for private_data (might extensions expect ctx?).
+	 */
+	ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, NULL);
 	if (!ctx->reader)
 		ereport(ERROR,
 				(errcode(ERRCODE_OUT_OF_MEMORY),
diff --git a/src/backend/replication/pgoutput_repack/pgoutput_repack.c b/src/backend/replication/pgoutput_repack/pgoutput_repack.c
index c8930640a0d..fb9956d392d 100644
--- a/src/backend/replication/pgoutput_repack/pgoutput_repack.c
+++ b/src/backend/replication/pgoutput_repack/pgoutput_repack.c
@@ -168,17 +168,13 @@ store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind,
 			 HeapTuple tuple)
 {
 	RepackDecodingState *dstate;
-	char	   *change_raw;
-	ConcurrentChange change;
+	char		kind_byte = (char) kind;
 	bool		flattened = false;
-	Size		size;
-	Datum		values[1];
-	bool		isnull[1];
-	char	   *dst;
 
 	dstate = (RepackDecodingState *) ctx->output_writer_private;
 
-	size = VARHDRSZ + SizeOfConcurrentChange;
+	/* Store the change kind. */
+	BufFileWrite(dstate->file, &kind_byte, 1);
 
 	/*
 	 * ReorderBufferCommit() stores the TOAST chunks in its private memory
@@ -195,46 +191,12 @@ store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind,
 		tuple = toast_flatten_tuple(tuple, dstate->tupdesc);
 		flattened = true;
 	}
+	/* Store the tuple size ... */
+	BufFileWrite(dstate->file, &tuple->t_len, sizeof(tuple->t_len));
+	/* ... and the tuple itself. */
+	BufFileWrite(dstate->file, tuple->t_data, tuple->t_len);
 
-	size += tuple->t_len;
-	if (size >= MaxAllocSize)
-		elog(ERROR, "Change is too big.");
-
-	/* Construct the change. */
-	change_raw = (char *) palloc0(size);
-	SET_VARSIZE(change_raw, size);
-
-	/*
-	 * Since the varlena alignment might not be sufficient for the structure,
-	 * set the fields in a local instance and remember where it should
-	 * eventually be copied.
-	 */
-	change.kind = kind;
-	dst = (char *) VARDATA(change_raw);
-
-	/*
-	 * Copy the tuple.
-	 *
-	 * Note: change->tup_data.t_data must be fixed on retrieval!
-	 */
-	memcpy(&change.tup_data, tuple, sizeof(HeapTupleData));
-	memcpy(dst, &change, SizeOfConcurrentChange);
-	dst += SizeOfConcurrentChange;
-	memcpy(dst, tuple->t_data, tuple->t_len);
-
-	/* The data has been copied. */
+	/* Free the flat copy if created above. */
 	if (flattened)
 		pfree(tuple);
-
-	/* Store as tuple of 1 bytea column. */
-	values[0] = PointerGetDatum(change_raw);
-	isnull[0] = false;
-	tuplestore_putvalues(dstate->tstore, dstate->tupdesc_change,
-						 values, isnull);
-
-	/* Accounting. */
-	dstate->nchanges++;
-
-	/* Cleanup. */
-	pfree(change_raw);
 }
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 087821311cc..af12144795b 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -19,6 +19,7 @@
 
 #include "access/parallel.h"
 #include "commands/async.h"
+#include "commands/cluster.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "port/pg_bitutils.h"
@@ -694,6 +695,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_PARALLEL_APPLY_MESSAGE))
 		HandleParallelApplyMessageInterrupt();
 
+	if (CheckProcSignal(PROCSIG_REPACK_MESSAGE))
+		HandleRepackMessageInterrupt();
+
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATABASE))
 		HandleRecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATABASE);
 
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 7dd75a490aa..4a4858882f0 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -36,6 +36,7 @@
 #include "access/xact.h"
 #include "catalog/pg_type.h"
 #include "commands/async.h"
+#include "commands/cluster.h"
 #include "commands/event_trigger.h"
 #include "commands/explain_state.h"
 #include "commands/prepare.h"
@@ -3541,6 +3542,9 @@ ProcessInterrupts(void)
 
 	if (ParallelApplyMessagePending)
 		ProcessParallelApplyMessages();
+
+	if (RepackMessagePending)
+		ProcessRepackMessages();
 }
 
 /*
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index f39830dbb34..cbcc8550960 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -62,6 +62,7 @@ LOGICAL_APPLY_MAIN	"Waiting in main loop of logical replication apply process."
 LOGICAL_LAUNCHER_MAIN	"Waiting in main loop of logical replication launcher process."
 LOGICAL_PARALLEL_APPLY_MAIN	"Waiting in main loop of logical replication parallel apply process."
 RECOVERY_WAL_STREAM	"Waiting in main loop of startup process for WAL to arrive, during streaming recovery."
+REPACK_WORKER_MAIN	"Waiting in main loop of REPACK decoding worker process."
 REPLICATION_SLOTSYNC_MAIN	"Waiting in main loop of slot sync worker."
 REPLICATION_SLOTSYNC_SHUTDOWN	"Waiting for slot sync worker to shut down."
 SYSLOGGER_MAIN	"Waiting in main loop of syslogger process."
@@ -153,6 +154,7 @@ RECOVERY_CONFLICT_SNAPSHOT	"Waiting for recovery conflict resolution for a vacuu
 RECOVERY_CONFLICT_TABLESPACE	"Waiting for recovery conflict resolution for dropping a tablespace."
 RECOVERY_END_COMMAND	"Waiting for <xref linkend="guc-recovery-end-command"/> to complete."
 RECOVERY_PAUSE	"Waiting for recovery to be resumed."
+REPACK_WORKER_EXPORT	"Waiting for the REPACK decoding worker to export a new output file."
 REPLICATION_ORIGIN_DROP	"Waiting for a replication origin to become inactive so it can be dropped."
 REPLICATION_SLOT_DROP	"Waiting for a replication slot to become inactive so it can be dropped."
 RESTORE_COMMAND	"Waiting for <xref linkend="guc-restore-command"/> to complete."
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index d8f76d325f9..2b983abce3e 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -22,7 +22,6 @@
 #include "access/xact.h"
 #include "commands/vacuum.h"
 #include "executor/tuptable.h"
-#include "replication/logical.h"
 #include "storage/read_stream.h"
 #include "utils/rel.h"
 #include "utils/snapshot.h"
@@ -631,7 +630,6 @@ typedef struct TableAmRoutine
 											  bool use_sort,
 											  TransactionId OldestXmin,
 											  Snapshot snapshot,
-											  LogicalDecodingContext *decoding_ctx,
 											  TransactionId *xid_cutoff,
 											  MultiXactId *multi_cutoff,
 											  double *num_tuples,
@@ -1652,7 +1650,5 @@ table_relation_copy_data(Relation rel, const RelFileLocator *newrlocator)
  * - snapshot - if != NULL, ignore data changes done by transactions that this
  *	 (MVCC) snapshot considers still in-progress or in the future.
- * - decoding_ctx - logical decoding context, to capture concurrent data
- *   changes.
  *
  * Output parameters:
  * - *xid_cutoff - rel's new relfrozenxid value, may be invalid
@@ -1666,7 +1662,6 @@ table_relation_copy_for_cluster(Relation OldTable, Relation NewTable,
 								bool use_sort,
 								TransactionId OldestXmin,
 								Snapshot snapshot,
-								LogicalDecodingContext *decoding_ctx,
 								TransactionId *xid_cutoff,
 								MultiXactId *multi_cutoff,
 								double *num_tuples,
@@ -1675,7 +1670,7 @@ table_relation_copy_for_cluster(Relation OldTable, Relation NewTable,
 {
 	OldTable->rd_tableam->relation_copy_for_cluster(OldTable, NewTable, OldIndex,
 													use_sort, OldestXmin,
-													snapshot, decoding_ctx,
+													snapshot,
 													xid_cutoff, multi_cutoff,
 													num_tuples, tups_vacuumed,
 													tups_recently_dead);
diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h
index b43a1740053..0ac70ec30d7 100644
--- a/src/include/commands/cluster.h
+++ b/src/include/commands/cluster.h
@@ -17,6 +17,7 @@
 #include "nodes/parsenodes.h"
 #include "parser/parse_node.h"
 #include "replication/logical.h"
+#include "storage/buffile.h"
 #include "storage/lock.h"
 #include "storage/relfilelocator.h"
 #include "utils/relcache.h"
@@ -47,6 +48,9 @@ typedef struct ClusterParams
 extern RelFileLocator repacked_rel_locator;
 extern RelFileLocator repacked_rel_toast_locator;
 
+/*
+ * The change kind is stored as a single byte in the output file.
+ */
 typedef enum
 {
 	CHANGE_INSERT,
@@ -55,68 +59,30 @@ typedef enum
 	CHANGE_DELETE
 } ConcurrentChangeKind;
 
-typedef struct ConcurrentChange
-{
-	/* See the enum above. */
-	ConcurrentChangeKind kind;
-
-	/*
-	 * The actual tuple.
-	 *
-	 * The tuple data follows the ConcurrentChange structure. Before use make
-	 * sure the tuple is correctly aligned (ConcurrentChange can be stored as
-	 * bytea) and that tuple->t_data is fixed.
-	 */
-	HeapTupleData tup_data;
-} ConcurrentChange;
-
-#define SizeOfConcurrentChange (offsetof(ConcurrentChange, tup_data) + \
-								sizeof(HeapTupleData))
-
 /*
  * Logical decoding state.
  *
- * Here we store the data changes that we decode from WAL while the table
- * contents is being copied to a new storage. Also the necessary metadata
- * needed to apply these changes to the table is stored here.
+ * The output plugin uses it to store the data changes that it decodes from
+ * WAL while the table contents are being copied to the new storage.
  */
 typedef struct RepackDecodingState
 {
 	/* The relation whose changes we're decoding. */
 	Oid			relid;
 
-	/* Replication slot name. */
-	NameData	slotname;
-
-	/*
-	 * Decoded changes are stored here. Although we try to avoid excessive
-	 * batches, it can happen that the changes need to be stored to disk. The
-	 * tuplestore does this transparently.
-	 */
-	Tuplestorestate *tstore;
-
-	/* The current number of changes in tstore. */
-	double		nchanges;
-
-	/*
-	 * Descriptor to store the ConcurrentChange structure serialized (bytea).
-	 * We can't store the tuple directly because tuplestore only supports
-	 * minimum tuple and we may need to transfer OID system column from the
-	 * output plugin. Also we need to transfer the change kind, so it's better
-	 * to put everything in the structure than to use 2 tuplestores "in
-	 * parallel".
-	 */
-	TupleDesc	tupdesc_change;
-
-	/* Tuple descriptor needed to update indexes. */
+	/* Tuple descriptor of the relation being processed. */
 	TupleDesc	tupdesc;
 
-	/* Slot to retrieve data from tstore. */
-	TupleTableSlot *tsslot;
-
-	ResourceOwner resowner;
+	/* The current output file. */
+	BufFile    *file;
 } RepackDecodingState;
 
+extern PGDLLIMPORT volatile sig_atomic_t RepackMessagePending;
+
+extern bool IsRepackWorker(void);
+extern void HandleRepackMessageInterrupt(void);
+extern void ProcessRepackMessages(void);
+
 extern void ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel);
 
 extern void cluster_rel(RepackCommand command, Relation OldHeap, Oid indexOid,
@@ -125,9 +91,6 @@ extern void check_index_is_clusterable(Relation OldHeap, Oid indexOid,
 									   LOCKMODE lockmode);
 extern void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal);
 
-extern void repack_decode_concurrent_changes(LogicalDecodingContext *ctx,
-											 XLogRecPtr end_of_wal);
-
 extern Oid	make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod,
 						  char relpersistence, LOCKMODE lockmode);
 extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
@@ -140,4 +103,5 @@ extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
 							 MultiXactId cutoffMulti,
 							 char newrelpersistence);
 
+extern void RepackWorkerMain(Datum main_arg);
 #endif							/* CLUSTER_H */
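
RepackWorkerMain() is the entry point the backend hands to the bgworker
machinery. A sketch of what such a registration could look like, assuming
the shared state travels in a DSM segment; "seg", the worker names, and the
error message are illustrative, not the patch's actual code:

    #include "postgres.h"
    #include "miscadmin.h"
    #include "postmaster/bgworker.h"
    #include "storage/dsm.h"

    /* Hypothetical launcher; "seg" would carry the shared repack state. */
    static BackgroundWorkerHandle *
    launch_repack_decoding_worker(dsm_segment *seg)
    {
        BackgroundWorker bgw;
        BackgroundWorkerHandle *handle;

        memset(&bgw, 0, sizeof(bgw));
        /* Logical decoding needs shmem access and a database connection. */
        bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
            BGWORKER_BACKEND_DATABASE_CONNECTION;
        bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
        bgw.bgw_restart_time = BGW_NEVER_RESTART;
        snprintf(bgw.bgw_library_name, sizeof(bgw.bgw_library_name),
                 "postgres");
        snprintf(bgw.bgw_function_name, sizeof(bgw.bgw_function_name),
                 "RepackWorkerMain");
        snprintf(bgw.bgw_name, sizeof(bgw.bgw_name),
                 "repack decoding worker");
        snprintf(bgw.bgw_type, sizeof(bgw.bgw_type), "repack worker");
        bgw.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
        bgw.bgw_notify_pid = MyProcPid; /* notify backend when worker exits */

        if (!RegisterDynamicBackgroundWorker(&bgw, &handle))
            ereport(ERROR,
                    (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
                     errmsg("could not register repack decoding worker")));

        return handle;
    }
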
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index afeeb1ca019..c0a66516b66 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -36,6 +36,7 @@ typedef enum
 	PROCSIG_BARRIER,			/* global barrier interrupt  */
 	PROCSIG_LOG_MEMORY_CONTEXT, /* ask backend to log the memory contexts */
 	PROCSIG_PARALLEL_APPLY_MESSAGE, /* Message from parallel apply workers */
+	PROCSIG_REPACK_MESSAGE,		/* Message from repack worker */
 
 	/* Recovery conflict reasons */
 	PROCSIG_RECOVERY_CONFLICT_FIRST,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 3139b14e85f..35344910f65 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -488,7 +488,6 @@ CompressFileHandle
 CompressionLocation
 CompressorState
 ComputeXidHorizonsResult
-ConcurrentChange
 ConcurrentChangeKind
 ConditionVariable
 ConditionVariableMinimallyPadded
@@ -629,6 +628,9 @@ DeclareCursorStmt
 DecodedBkpBlock
 DecodedXLogRecord
 DecodingOutputState
+DecodingWorker
+DecodingWorkerShared
+DecodingWorkerState
 DefElem
 DefElemAction
 DefaultACLInfo
-- 
2.47.3

