From 7ffeeb69fd626dbcc35ca13d97a8407573ea6d4a Mon Sep 17 00:00:00 2001
From: Alexey Kondratov <kondratov.aleksey@gmail.com>
Date: Wed, 28 Aug 2019 15:26:50 +0300
Subject: [PATCH v2 11/11] BGWorkers pool for streamed transactions apply
 without spilling on disk

---
 src/backend/postmaster/bgworker.c        |    3 +
 src/backend/postmaster/pgstat.c          |    3 +
 src/backend/replication/logical/proto.c  |   17 +-
 src/backend/replication/logical/worker.c | 1783 +++++++++++-----------
 src/include/pgstat.h                     |    1 +
 src/include/replication/logicalproto.h   |    4 +-
 src/include/replication/logicalworker.h  |    1 +
 7 files changed, 936 insertions(+), 876 deletions(-)

diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index f5db5a8c4a..6860df07ca 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -129,6 +129,9 @@ static const struct
 	},
 	{
 		"ApplyWorkerMain", ApplyWorkerMain
+	},
+	{
+		"LogicalApplyBgwMain", LogicalApplyBgwMain
 	}
 };
 
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index e5a4d147a7..b32994784f 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3637,6 +3637,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 		case WAIT_EVENT_HASH_GROW_BUCKETS_REINSERTING:
 			event_name = "Hash/GrowBuckets/Reinserting";
 			break;
+		case WAIT_EVENT_LOGICAL_APPLY_WORKER_READY:
+			event_name = "LogicalApplyWorkerReady";
+			break;
 		case WAIT_EVENT_LOGICAL_SYNC_DATA:
 			event_name = "LogicalSyncData";
 			break;
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 4bec9fe8b5..954ce7343a 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -789,14 +789,11 @@ logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
 	pq_sendint64(out, txn->commit_time);
 }
 
-TransactionId
+void
 logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
 {
-	TransactionId	xid;
 	uint8			flags;
 
-	xid = pq_getmsgint(in, 4);
-
 	/* read flags (unused for now) */
 	flags = pq_getmsgbyte(in);
 
@@ -807,8 +804,6 @@ logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
 	commit_data->commit_lsn = pq_getmsgint64(in);
 	commit_data->end_lsn = pq_getmsgint64(in);
 	commit_data->committime = pq_getmsgint64(in);
-
-	return xid;
 }
 
 void
@@ -823,13 +818,3 @@ logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
 	pq_sendint32(out, xid);
 	pq_sendint32(out, subxid);
 }
-
-void
-logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
-							 TransactionId *subxid)
-{
-	Assert(xid && subxid);
-
-	*xid = pq_getmsgint(in, 4);
-	*subxid = pq_getmsgint(in, 4);
-}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ca632b7dc4..ab43b12985 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -92,11 +92,16 @@
 #include "rewrite/rewriteHandler.h"
 
 #include "storage/bufmgr.h"
+// #include "storage/condition_variable.h"
+#include "storage/dsm.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "storage/spin.h"
 
 #include "tcop/tcopprot.h"
 
@@ -115,6 +120,54 @@
 #include "utils/syscache.h"
 
 #define NAPTIME_PER_CYCLE 1000	/* max sleep time between cycles (1s) */
+#define PG_LOGICAL_APPLY_SHM_MAGIC 0x79fb2447 // TODO Consider change
+
+typedef struct ParallelState
+{
+	slock_t	mutex;
+	// ConditionVariable cv;
+	bool	attached;
+	bool	ready;
+	bool	finished;
+	Oid		database_id;
+	Oid		authenticated_user_id;
+	Oid		subid;
+	Oid		stream_xid;
+	uint32	n;
+} ParallelState;
+
+typedef struct WorkerState
+{
+	TransactionId			 xid;
+	BackgroundWorkerHandle	*handle;
+	shm_mq_handle			*mq_handle;
+	dsm_segment				*dsm_seg;
+	ParallelState volatile	*pstate;
+} WorkerState;
+
+/* Apply workers hash table (initialized on first use) */
+static HTAB *ApplyWorkersHash = NULL;
+static WorkerState **ApplyWorkersIdleList = NULL;
+static uint32 pool_size = 10; /* MaxConnections default? */
+static uint32 nworkers = 0;
+static uint32 nfreeworkers = 0;
+
+/* Fields valid only for apply background workers */
+bool isLogicalApplyWorker = false;
+volatile ParallelState *MyParallelState = NULL;
+
+/* Worker setup and interactions */
+static void setup_dsm(WorkerState *wstate);
+static void setup_background_worker(WorkerState *wstate);
+static void cleanup_background_worker(dsm_segment *seg, Datum arg);
+static void handle_sigterm(SIGNAL_ARGS);
+
+static bool check_worker_status(WorkerState *wstate);
+static void wait_for_worker(WorkerState *wstate);
+static void wait_for_worker_to_finish(WorkerState *wstate);
+
+static WorkerState * find_or_start_worker(TransactionId xid, bool start);
+static void stop_worker(WorkerState *wstate);
 
 typedef struct FlushPosition
 {
@@ -143,47 +196,13 @@ bool		MySubscriptionValid = false;
 bool		in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
 
-/* fields valid only when processing streamed transaction */
+/* Fields valid only when processing streamed transaction */
 bool		in_streamed_transaction = false;
 
 static TransactionId stream_xid = InvalidTransactionId;
-static int	stream_fd = -1;
-
-typedef struct SubXactInfo
-{
-	TransactionId xid;			/* XID of the subxact */
-	off_t		offset;			/* offset in the file */
-}			SubXactInfo;
-
-static uint32 nsubxacts = 0;
-static uint32 nsubxacts_max = 0;
-static SubXactInfo * subxacts = NULL;
-static TransactionId subxact_last = InvalidTransactionId;
-
-static void subxact_filename(char *path, Oid subid, TransactionId xid);
-static void changes_filename(char *path, Oid subid, TransactionId xid);
-
-/*
- * Information about subtransactions of a given toplevel transaction.
- */
-static void subxact_info_write(Oid subid, TransactionId xid);
-static void subxact_info_read(Oid subid, TransactionId xid);
-static void subxact_info_add(TransactionId xid);
-
-/*
- * Serialize and deserialize changes for a toplevel transaction.
- */
-static void stream_cleanup_files(Oid subid, TransactionId xid);
-static void stream_open_file(Oid subid, TransactionId xid, bool first);
-static void stream_write_change(char action, StringInfo s);
-static void stream_close_file(void);
-
-/*
- * Array of serialized XIDs.
- */
-static int	nxids = 0;
-static int	maxnxids = 0;
-static TransactionId	*xids = NULL;
+static TransactionId current_xid = InvalidTransactionId;
+static TransactionId prev_xid = InvalidTransactionId;
+static uint32 nchanges = 0;
 
 static bool handle_streamed_transaction(const char action, StringInfo s);
 
@@ -199,6 +218,16 @@ static volatile sig_atomic_t got_SIGHUP = false;
 /* prototype needed because of stream_commit */
 static void apply_dispatch(StringInfo s);
 
+// /* Debug only */
+// static void
+// iter_sleep(int seconds)
+// {
+// 	for (int i = 0; i < seconds; i++)
+// 	{
+// 		pg_usleep(1 * 1000L * 1000L);
+// 	}
+// }
+
 /*
  * Should this worker apply changes for given relation.
  *
@@ -250,6 +279,107 @@ ensure_transaction(void)
 	return true;
 }
 
+/*
+ * Look up worker inside ApplyWorkersHash for requested xid.
+ * Throw error if not found or start a new one if start=true is passed.
+ */
+static WorkerState *
+find_or_start_worker(TransactionId xid, bool start)
+{
+	bool found;
+	WorkerState *entry = NULL;
+
+	Assert(TransactionIdIsValid(xid));
+
+	/* First time through, initialize apply workers hashtable */
+	if (ApplyWorkersHash == NULL)
+	{
+		HASHCTL		ctl;
+
+		MemSet(&ctl, 0, sizeof(ctl));
+		ctl.keysize = sizeof(TransactionId);
+		ctl.entrysize = sizeof(WorkerState);
+		ctl.hcxt = ApplyContext; /* Allocate ApplyWorkersHash in the ApplyContext */
+		ApplyWorkersHash = hash_create("logical apply workers hash", 8,
+									 &ctl,
+									 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+	}
+
+	Assert(ApplyWorkersHash != NULL);
+
+	/*
+	 * Find entry for requested transaction.
+	 */
+	entry = hash_search(ApplyWorkersHash, &xid, HASH_FIND, &found);
+
+	if (!found && start)
+	{
+		/* If there is at least one worker in the idle list, then take one. */
+		if (nfreeworkers > 0)
+		{
+			char action = 'R';
+
+			Assert(ApplyWorkersIdleList != NULL);
+
+			entry = ApplyWorkersIdleList[nfreeworkers - 1];
+			if (!hash_update_hash_key(ApplyWorkersHash,
+									  (void *) entry,
+									  (void *) &xid))
+				elog(ERROR, "could not reassign apply worker #%u entry from xid %u to xid %u",
+													entry->pstate->n, entry->xid, xid);
+
+			entry->xid = xid;
+			entry->pstate->finished = false;
+			entry->pstate->stream_xid = xid;
+			shm_mq_send(entry->mq_handle, 1, &action, false);
+
+			ApplyWorkersIdleList[--nfreeworkers] = NULL;
+		}
+		else
+		{
+			/* No entry in hash and no idle workers. Create a new one. */
+			entry = hash_search(ApplyWorkersHash, &xid, HASH_ENTER, &found);
+			entry->xid = xid;
+			setup_background_worker(entry);
+
+			if (nworkers == pool_size)
+			{
+				ApplyWorkersIdleList = repalloc(ApplyWorkersIdleList, pool_size + 10);
+				pool_size += 10;
+			}
+		}
+	}
+	else if (!found && !start)
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+				errmsg("could not find logical apply worker for xid %u", xid)));
+	else
+		elog(DEBUG5, "there is an existing logical apply worker for xid %u", xid);
+
+	Assert(entry != NULL);
+
+	return entry;
+}
+
+/*
+ * Gracefully teardown apply worker.
+ */
+static void
+stop_worker(WorkerState *wstate)
+{
+	/*
+	 * Sending zero-length data to worker in order to stop it.
+	 */
+	shm_mq_send(wstate->mq_handle, 0, NULL, false);
+
+	elog(LOG, "detaching DSM of apply worker #%u for xid %u",
+									wstate->pstate->n, wstate->xid);
+	dsm_detach(wstate->dsm_seg);
+
+	/* Delete worker entry */
+	(void) hash_search(ApplyWorkersHash, &wstate->xid, HASH_REMOVE, NULL);
+}
+
 /*
  * Handle streamed transactions.
  *
@@ -262,12 +392,12 @@ static bool
 handle_streamed_transaction(const char action, StringInfo s)
 {
 	TransactionId xid;
+	WorkerState *entry;
 
 	/* not in streaming mode */
-	if (!in_streamed_transaction)
+	if (!in_streamed_transaction || isLogicalApplyWorker)
 		return false;
 
-	Assert(stream_fd != -1);
 	Assert(TransactionIdIsValid(stream_xid));
 
 	/*
@@ -278,11 +408,16 @@ handle_streamed_transaction(const char action, StringInfo s)
 
 	Assert(TransactionIdIsValid(xid));
 
-	/* Add the new subxact to the array (unless already there). */
-	subxact_info_add(xid);
+	/*
+	 * Find worker for requested xid.
+	 */
+	entry = find_or_start_worker(stream_xid, false);
 
-	/* write the change to the current file */
-	stream_write_change(action, s);
+	// elog(LOG, "sending message of length=%d and raw=%s, action=%s", s->len, s->data, (char *) &action);
+	shm_mq_send(entry->mq_handle, s->len, s->data, false);
+	nchanges += 1;
+
+	// iter_sleep(3600);
 
 	return true;
 }
@@ -643,7 +778,8 @@ apply_handle_origin(StringInfo s)
 static void
 apply_handle_stream_start(StringInfo s)
 {
-	bool		first_segment;
+	bool		 first_segment;
+	WorkerState *entry;
 
 	Assert(!in_streamed_transaction);
 
@@ -652,17 +788,16 @@ apply_handle_stream_start(StringInfo s)
 
 	/* extract XID of the top-level transaction */
 	stream_xid = logicalrep_read_stream_start(s, &first_segment);
+	nchanges = 0;
 
-	/* open the spool file for this transaction */
-	stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
+	/* Find worker for requested xid */
+	entry = find_or_start_worker(stream_xid, true);
 
-	/*
-	 * if this is not the first segment, open existing file
-	 *
-	 * XXX Note that the cleanup is performed by stream_open_file.
-	 */
-	if (!first_segment)
-		subxact_info_read(MyLogicalRepWorker->subid, stream_xid);
+	SpinLockAcquire(&entry->pstate->mutex);
+	entry->pstate->ready = false;
+	SpinLockRelease(&entry->pstate->mutex);
+
+	elog(LOG, "starting streaming of xid %u", stream_xid);
 
 	pgstat_report_activity(STATE_RUNNING, NULL);
 }
@@ -673,16 +808,19 @@ apply_handle_stream_start(StringInfo s)
 static void
 apply_handle_stream_stop(StringInfo s)
 {
+	WorkerState *entry;
+	char action = 'E';
+
 	Assert(in_streamed_transaction);
 
-	/*
-	 * Close the file with serialized changes, and serialize information about
-	 * subxacts for the toplevel transaction.
-	 */
-	subxact_info_write(MyLogicalRepWorker->subid, stream_xid);
-	stream_close_file();
+	/* Find worker for requested xid */
+	entry = find_or_start_worker(stream_xid, false);
+
+	shm_mq_send(entry->mq_handle, 1, &action, false);
+	wait_for_worker(entry);
 
 	in_streamed_transaction = false;
+	elog(LOG, "stopped streaming of xid %u, %u changes streamed", stream_xid, nchanges);
 
 	pgstat_report_activity(STATE_IDLE, NULL);
 }
@@ -695,96 +833,67 @@ apply_handle_stream_abort(StringInfo s)
 {
 	TransactionId xid;
 	TransactionId subxid;
+	WorkerState *entry;
 
 	Assert(!in_streamed_transaction);
 
-	logicalrep_read_stream_abort(s, &xid, &subxid);
-
-	/*
-	 * If the two XIDs are the same, it's in fact abort of toplevel xact, so
-	 * just delete the files with serialized info.
-	 */
-	if (xid == subxid)
+	if(isLogicalApplyWorker)
 	{
-		char		path[MAXPGPATH];
+		subxid = pq_getmsgint(s, 4);
 
-		/*
-		 * XXX Maybe this should be an error instead? Can we receive abort for
-		 * a toplevel transaction we haven't received?
-		 */
+		ereport(LOG,
+				(errcode_for_file_access(),
+				errmsg("[Apply BGW #%u] aborting current transaction xid=%u, subxid=%u",
+				MyParallelState->n, GetCurrentTransactionIdIfAny(), GetCurrentSubTransactionId())));
 
-		changes_filename(path, MyLogicalRepWorker->subid, xid);
+		if (subxid == stream_xid)
+			AbortCurrentTransaction();
+		else
+		{
+			char *spname = (char *) palloc(64 * sizeof(char));
+			sprintf(spname, "savepoint_for_xid_%u", subxid);
 
-		if (unlink(path) < 0)
-			ereport(ERROR,
+			ereport(LOG,
 					(errcode_for_file_access(),
-					 errmsg("could not remove file \"%s\": %m", path)));
+					errmsg("[Apply BGW #%u] rolling back to savepoint %s", MyParallelState->n, spname)));
 
-		subxact_filename(path, MyLogicalRepWorker->subid, xid);
-
-		if (unlink(path) < 0)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not remove file \"%s\": %m", path)));
+			RollbackToSavepoint(spname);
+			CommitTransactionCommand();
+			// RollbackAndReleaseCurrentSubTransaction();
 
-		return;
+			pfree(spname);
+		}
 	}
 	else
 	{
-		/*
-		 * OK, so it's a subxact. We need to read the subxact file for the
-		 * toplevel transaction, determine the offset tracked for the subxact,
-		 * and truncate the file with changes. We also remove the subxacts
-		 * with higher offsets (or rather higher XIDs).
-		 *
-		 * We intentionally scan the array from the tail, because we're likely
-		 * aborting a change for the most recent subtransactions.
-		 *
-		 * XXX Can we rely on the subxact XIDs arriving in sorted order? That
-		 * would allow us to use binary search here.
-		 *
-		 * XXX Or perhaps we can rely on the aborts to arrive in the reverse
-		 * order, i.e. from the inner-most subxact (when nested)? In which
-		 * case we could simply check the last element.
-		 */
+		xid = pq_getmsgint(s, 4);
+		subxid = pq_getmsgint(s, 4);
 
-		int64		i;
-		int64		subidx;
-		bool		found = false;
-		char		path[MAXPGPATH];
+		/* Find worker for requested xid */
+		entry = find_or_start_worker(stream_xid, false);
 
-		subidx = -1;
-		subxact_info_read(MyLogicalRepWorker->subid, xid);
+		elog(LOG, "processing abort request of streamed transaction xid %u, subxid %u",
+			xid, subxid);
+		shm_mq_send(entry->mq_handle, s->len, s->data, false);
 
-		/* FIXME optimize the search by bsearch on sorted data */
-		for (i = nsubxacts; i > 0; i--)
+		if (subxid == stream_xid)
 		{
-			if (subxacts[i - 1].xid == subxid)
-			{
-				subidx = (i - 1);
-				found = true;
-				break;
-			}
-		}
-
-		/* We should not receive aborts for unknown subtransactions. */
-		Assert(found);
+			char action = 'F';
+			shm_mq_send(entry->mq_handle, 1, &action, false);
+			// shm_mq_send(entry->mq_handle, 0, NULL, false);
 
-		/* OK, truncate the file at the right offset. */
-		Assert((subidx >= 0) && (subidx < nsubxacts));
+			wait_for_worker_to_finish(entry);
 
-		changes_filename(path, MyLogicalRepWorker->subid, xid);
+			elog(LOG, "adding finished apply worker #%u for xid %u to the idle list",
+												entry->pstate->n, entry->xid);
+			ApplyWorkersIdleList[nfreeworkers++] = entry;
 
-		if (truncate(path, subxacts[subidx].offset))
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not truncate file \"%s\": %m", path)));
+			// elog(LOG, "detaching DSM of apply worker for xid=%u\n", entry->xid);
+			// dsm_detach(entry->dsm_seg);
 
-		/* discard the subxacts added later */
-		nsubxacts = subidx;
-
-		/* write the updated subxact list */
-		subxact_info_write(MyLogicalRepWorker->subid, xid);
+			// /* Delete worker entry */
+			// (void) hash_search(ApplyWorkersHash, &xid, HASH_REMOVE, NULL);
+		}
 	}
 }
 
@@ -794,159 +903,56 @@ apply_handle_stream_abort(StringInfo s)
 static void
 apply_handle_stream_commit(StringInfo s)
 {
-	int			fd;
 	TransactionId xid;
-	StringInfoData s2;
-	int			nchanges;
-
-	char		path[MAXPGPATH];
-	char	   *buffer = NULL;
+	WorkerState *entry;
 	LogicalRepCommitData commit_data;
 
-	MemoryContext oldcxt;
-
-	Assert(!in_streamed_transaction);
-
-	xid = logicalrep_read_stream_commit(s, &commit_data);
-
-	elog(DEBUG1, "received commit for streamed transaction %u", xid);
-
-	/* open the spool file for the committed transaction */
-	changes_filename(path, MyLogicalRepWorker->subid, xid);
-
-	elog(DEBUG1, "replaying changes from file '%s'", path);
-
-	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
-	if (fd < 0)
+	if (isLogicalApplyWorker)
 	{
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not open file \"%s\": %m",
-						path)));
-	}
-
-	/* XXX Should this be allocated in another memory context? */
+		// logicalrep_read_stream_commit(s, &commit_data);
 
-	oldcxt = MemoryContextSwitchTo(TopMemoryContext);
-
-	buffer = palloc(8192);
-	initStringInfo(&s2);
-
-	MemoryContextSwitchTo(oldcxt);
-
-	ensure_transaction();
-
-	/*
-	 * Make sure the handle apply_dispatch methods are aware we're in a remote
-	 * transaction.
-	 */
-	in_remote_transaction = true;
-	pgstat_report_activity(STATE_RUNNING, NULL);
-
-	/*
-	 * Read the entries one by one and pass them through the same logic as in
-	 * apply_dispatch.
-	 */
-	nchanges = 0;
-	while (true)
+		CommitTransactionCommand();
+	}
+	else
 	{
-		int			nbytes;
-		int			len;
-
-		/* read length of the on-disk record */
-		pgstat_report_wait_start(WAIT_EVENT_LOGICAL_CHANGES_READ);
-		nbytes = read(fd, &len, sizeof(len));
-		pgstat_report_wait_end();
-
-		/* have we reached end of the file? */
-		if (nbytes == 0)
-			break;
-
-		/* do we have a correct length? */
-		if (nbytes != sizeof(len))
-		{
-			int			save_errno = errno;
-
-			CloseTransientFile(fd);
-			errno = save_errno;
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read file: %m")));
-			return;
-		}
-
-		Assert(len > 0);
+		char action = 'F';
 
-		/* make sure we have sufficiently large buffer */
-		buffer = repalloc(buffer, len);
-
-		/* and finally read the data into the buffer */
-		pgstat_report_wait_start(WAIT_EVENT_LOGICAL_CHANGES_READ);
-		if (read(fd, buffer, len) != len)
-		{
-			int			save_errno = errno;
-
-			CloseTransientFile(fd);
-			errno = save_errno;
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read file: %m")));
-			return;
-		}
-		pgstat_report_wait_end();
+		Assert(!in_streamed_transaction);
 
-		/* copy the buffer to the stringinfo and call apply_dispatch */
-		resetStringInfo(&s2);
-		appendBinaryStringInfo(&s2, buffer, len);
+		xid = pq_getmsgint(s, 4);
+		logicalrep_read_stream_commit(s, &commit_data);
 
-		/* Ensure we are reading the data into our memory context. */
-		oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
+		elog(DEBUG1, "received commit for streamed transaction %u", xid);
 
-		apply_dispatch(&s2);
+		/* Find worker for requested xid */
+		entry = find_or_start_worker(xid, false);
 
-		MemoryContextReset(ApplyMessageContext);
+		/* Send commit message */
+		shm_mq_send(entry->mq_handle, s->len, s->data, false);
 
-		MemoryContextSwitchTo(oldcxt);
+		/* Notify worker, that we are done with this xact */
+		shm_mq_send(entry->mq_handle, 1, &action, false);
 
-		nchanges++;
+		wait_for_worker_to_finish(entry);
 
-		if (nchanges % 1000 == 0)
-			elog(DEBUG1, "replayed %d changes from file '%s'",
-				 nchanges, path);
+		elog(LOG, "adding finished apply worker #%u for xid %u to the idle list",
+											entry->pstate->n, entry->xid);
+		ApplyWorkersIdleList[nfreeworkers++] = entry;
 
 		/*
-		 * send feedback to upstream
-		 *
-		 * XXX Probably should send a valid LSN. But which one?
+		 * Update origin state so we can restart streaming from correct
+		 * position in case of crash.
 		 */
-		send_feedback(InvalidXLogRecPtr, false, false);
-	}
-
-	CloseTransientFile(fd);
-
-	/*
-	 * Update origin state so we can restart streaming from correct
-	 * position in case of crash.
-	 */
-	replorigin_session_origin_lsn = commit_data.end_lsn;
-	replorigin_session_origin_timestamp = commit_data.committime;
-
-	CommitTransactionCommand();
-	pgstat_report_stat(false);
-
-	store_flush_position(commit_data.end_lsn);
-
-	elog(DEBUG1, "replayed %d (all) changes from file '%s'",
-		 nchanges, path);
+		replorigin_session_origin_lsn = commit_data.end_lsn;
+		replorigin_session_origin_timestamp = commit_data.committime;
 
-	in_remote_transaction = false;
-	pgstat_report_activity(STATE_IDLE, NULL);
+		pgstat_report_stat(false);
 
-	/* unlink the files with serialized changes and subxact info */
-	stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+		store_flush_position(commit_data.end_lsn);
 
-	pfree(buffer);
-	pfree(s2.data);
+		in_remote_transaction = false;
+		pgstat_report_activity(STATE_IDLE, NULL);
+	}
 }
 
 /*
@@ -965,6 +971,8 @@ apply_handle_relation(StringInfo s)
 	if (handle_streamed_transaction('R', s))
 		return;
 
+	// iter_sleep(3600);
+
 	rel = logicalrep_read_rel(s);
 	logicalrep_relmap_update(rel);
 }
@@ -1407,6 +1415,38 @@ apply_dispatch(StringInfo s)
 {
 	char		action = pq_getmsgbyte(s);
 
+	if (isLogicalApplyWorker)
+	{
+		/*
+		 * Inside logical apply worker we can figure out that new subtransaction
+		 * was started if new change arrived with different xid. In that case we
+		 * can define named savepoint, so that we were able to commit/rollback it
+		 * separately later.
+		 * 
+		 * Special case is if the first change comes from subtransuction, then
+		 * we check that current_xid differs from stream_xid.
+		 */
+		current_xid = pq_getmsgint(s, 4);
+
+		if (current_xid != stream_xid
+			&& ((TransactionIdIsValid(prev_xid) && current_xid != prev_xid)
+				|| !TransactionIdIsValid(prev_xid)))
+		{
+			char *spname = (char *) palloc(64 * sizeof(char));
+			sprintf(spname, "savepoint_for_xid_%u", current_xid);
+
+			elog(LOG, "[Apply BGW #%u] defining savepoint %s", MyParallelState->n, spname);
+
+			DefineSavepoint(spname);
+			CommitTransactionCommand();
+			// BeginInternalSubTransaction(NULL);
+		}
+
+		prev_xid = current_xid;
+	}
+	// else
+	// 	elog(LOG, "Logical worker: applying dispatch for action=%s", (char *) &action);
+
 	switch (action)
 	{
 			/* BEGIN */
@@ -1435,6 +1475,7 @@ apply_dispatch(StringInfo s)
 			break;
 			/* RELATION */
 		case 'R':
+			// elog(LOG, "%s worker: applying dispatch for action=R", isLogicalApplyWorker ? "Apply" : "Logical");
 			apply_handle_relation(s);
 			break;
 			/* TYPE */
@@ -1565,12 +1606,18 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
 static void
 worker_onexit(int code, Datum arg)
 {
-	int	i;
+	HASH_SEQ_STATUS status;
+	WorkerState *entry;
 
-	elog(LOG, "cleanup files for %d transactions", nxids);
-
-	for (i = nxids-1; i >= 0; i--)
-		stream_cleanup_files(MyLogicalRepWorker->subid, xids[i]);
+	if (ApplyWorkersHash != NULL)
+	{
+		hash_seq_init(&status, ApplyWorkersHash);
+		while ((entry = (WorkerState *) hash_seq_search(&status)) != NULL)
+		{
+			stop_worker(entry);
+		}
+		hash_seq_term(&status);
+	}
 }
 
 /*
@@ -1593,6 +1640,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 	/* mark as idle, before starting to loop */
 	pgstat_report_activity(STATE_IDLE, NULL);
 
+	ApplyWorkersIdleList = palloc(sizeof(WorkerState *) * pool_size);
+
 	for (;;)
 	{
 		pgsocket	fd = PGINVALID_SOCKET;
@@ -1904,8 +1953,9 @@ maybe_reread_subscription(void)
 	Subscription *newsub;
 	bool		started_tx = false;
 
+	// TODO Probably we have to handle subscription reread in apply workers too.
 	/* When cache state is valid there is nothing to do here. */
-	if (MySubscriptionValid)
+	if (MySubscriptionValid || isLogicalApplyWorker)
 		return;
 
 	/* This function might be called inside or outside of transaction. */
@@ -2039,608 +2089,50 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
 	MySubscriptionValid = false;
 }
 
-/*
- * subxact_info_write
- *	  Store information about subxacts for a toplevel transaction.
- *
- * For each subxact we store offset of it's first change in the main file.
- * The file is always over-written as a whole, and we also include CRC32C
- * checksum of the information.
- *
- * XXX We should only store subxacts that were not aborted yet.
- *
- * XXX Maybe we should only include the checksum when the cluster is
- * initialized with checksums?
- *
- * XXX Add calls to pgstat_report_wait_start/pgstat_report_wait_end.
- */
+/* SIGHUP: set flag to reload configuration at next convenient time */
 static void
-subxact_info_write(Oid subid, TransactionId xid)
+logicalrep_worker_sighup(SIGNAL_ARGS)
 {
-	int			fd;
-	char		path[MAXPGPATH];
-	uint32		checksum;
-	Size		len;
-
-	Assert(TransactionIdIsValid(xid));
-
-	subxact_filename(path, subid, xid);
-
-	fd = OpenTransientFile(path, O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
-	if (fd < 0)
-	{
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not create file \"%s\": %m",
-						path)));
-		return;
-	}
-
-	len = sizeof(SubXactInfo) * nsubxacts;
-
-	/* compute the checksum */
-	INIT_CRC32C(checksum);
-	COMP_CRC32C(checksum, (char *) &nsubxacts, sizeof(nsubxacts));
-	COMP_CRC32C(checksum, (char *) subxacts, len);
-	FIN_CRC32C(checksum);
-
-	pgstat_report_wait_start(WAIT_EVENT_LOGICAL_SUBXACT_WRITE);
-
-	if (write(fd, &checksum, sizeof(checksum)) != sizeof(checksum))
-	{
-		int			save_errno = errno;
+	int			save_errno = errno;
 
-		CloseTransientFile(fd);
-		errno = save_errno;
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not write to file \"%s\": %m",
-						path)));
-		return;
-	}
+	got_SIGHUP = true;
 
-	if (write(fd, &nsubxacts, sizeof(nsubxacts)) != sizeof(nsubxacts))
-	{
-		int			save_errno = errno;
+	/* Waken anything waiting on the process latch */
+	SetLatch(MyLatch);
 
-		CloseTransientFile(fd);
-		errno = save_errno;
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not write to file \"%s\": %m",
-						path)));
-		return;
-	}
+	errno = save_errno;
+}
 
-	if ((len > 0) && (write(fd, subxacts, len) != len))
-	{
-		int			save_errno = errno;
+/* Logical Replication Apply worker entry point */
+void
+ApplyWorkerMain(Datum main_arg)
+{
+	int			worker_slot = DatumGetInt32(main_arg);
+	MemoryContext oldctx;
+	char		originname[NAMEDATALEN];
+	XLogRecPtr	origin_startpos;
+	char	   *myslotname;
+	WalRcvStreamOptions options;
 
-		CloseTransientFile(fd);
-		errno = save_errno;
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not write to file \"%s\": %m",
-						path)));
-		return;
-	}
+	/* Attach to slot */
+	logicalrep_worker_attach(worker_slot);
 
-	pgstat_report_wait_end();
+	/* Setup signal handling */
+	pqsignal(SIGHUP, logicalrep_worker_sighup);
+	pqsignal(SIGTERM, die);
+	BackgroundWorkerUnblockSignals();
 
 	/*
-	 * We don't need to fsync or anything, as we'll recreate the files after a
-	 * crash from scratch. So just close the file.
+	 * We don't currently need any ResourceOwner in a walreceiver process, but
+	 * if we did, we could call CreateAuxProcessResourceOwner here.
 	 */
-	CloseTransientFile(fd);
 
-	/*
-	 * But we free the memory allocated for subxact info. There might be one
-	 * exceptional transaction with many subxacts, and we don't want to keep
-	 * the memory allocated forewer.
-	 *
-	 */
-	if (subxacts)
-		pfree(subxacts);
+	/* Initialise stats to a sanish value */
+	MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
+		MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
 
-	subxacts = NULL;
-	subxact_last = InvalidTransactionId;
-	nsubxacts = 0;
-	nsubxacts_max = 0;
-}
-
-/*
- * subxact_info_read
- *	  Restore information about subxacts of a streamed transaction.
- *
- * Read information about subxacts into the global variables, and while
- * reading the information verify the checksum.
- *
- * XXX Add calls to pgstat_report_wait_start/pgstat_report_wait_end.
- *
- * XXX Do we need to allocate it in TopMemoryContext?
- */
-static void
-subxact_info_read(Oid subid, TransactionId xid)
-{
-	int			fd;
-	char		path[MAXPGPATH];
-	uint32		checksum;
-	uint32		checksum_new;
-	Size		len;
-	MemoryContext oldctx;
-
-	Assert(TransactionIdIsValid(xid));
-	Assert(!subxacts);
-	Assert(nsubxacts == 0);
-	Assert(nsubxacts_max == 0);
-
-	subxact_filename(path, subid, xid);
-
-	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
-	if (fd < 0)
-	{
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not open file \"%s\": %m",
-						path)));
-		return;
-	}
-
-	pgstat_report_wait_start(WAIT_EVENT_LOGICAL_SUBXACT_READ);
-
-	/* read the checksum */
-	if (read(fd, &checksum, sizeof(checksum)) != sizeof(checksum))
-	{
-		int			save_errno = errno;
-
-		CloseTransientFile(fd);
-		errno = save_errno;
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not read file \"%s\": %m",
-						path)));
-		return;
-	}
-
-	/* read number of subxact items */
-	if (read(fd, &nsubxacts, sizeof(nsubxacts)) != sizeof(nsubxacts))
-	{
-		int			save_errno = errno;
-
-		CloseTransientFile(fd);
-		errno = save_errno;
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not read file \"%s\": %m",
-						path)));
-		return;
-	}
-
-	pgstat_report_wait_end();
-
-	len = sizeof(SubXactInfo) * nsubxacts;
-
-	/* we keep the maximum as a power of 2 */
-	nsubxacts_max = 1 << my_log2(nsubxacts);
-
-	/* subxacts are long-lived */
-	oldctx = MemoryContextSwitchTo(TopMemoryContext);
-	subxacts = palloc(nsubxacts_max * sizeof(SubXactInfo));
-	MemoryContextSwitchTo(oldctx);
-
-	pgstat_report_wait_start(WAIT_EVENT_LOGICAL_SUBXACT_READ);
-
-	if ((len > 0) && ((read(fd, subxacts, len)) != len))
-	{
-		int			save_errno = errno;
-
-		CloseTransientFile(fd);
-		errno = save_errno;
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not read file \"%s\": %m",
-						path)));
-		return;
-	}
-
-	pgstat_report_wait_end();
-
-	/* recompute the checksum */
-	INIT_CRC32C(checksum_new);
-	COMP_CRC32C(checksum_new, (char *) &nsubxacts, sizeof(nsubxacts));
-	COMP_CRC32C(checksum_new, (char *) subxacts, len);
-	FIN_CRC32C(checksum_new);
-
-	if (checksum_new != checksum)
-		ereport(ERROR,
-				(errmsg("checksum failure when reading subxacts")));
-
-	CloseTransientFile(fd);
-}
-
-/*
- * subxact_info_add
- *	  Add information about a subxact (offset in the main file).
- *
- * XXX Do we need to allocate it in TopMemoryContext?
- */
-static void
-subxact_info_add(TransactionId xid)
-{
-	int64		i;
-
-	/*
-	 * If the XID matches the toplevel transaction, we don't want to add it.
-	 */
-	if (stream_xid == xid)
-		return;
-
-	/*
-	 * In most cases we're checking the same subxact as we've already seen in
-	 * the last call, so make ure just ignore it (this change comes later).
-	 */
-	if (subxact_last == xid)
-		return;
-
-	/* OK, remember we're processing this XID. */
-	subxact_last = xid;
-
-	/*
-	 * Check if the transaction is already present in the array of subxact. We
-	 * intentionally scan the array from the tail, because we're likely adding
-	 * a change for the most recent subtransactions.
-	 *
-	 * XXX Can we rely on the subxact XIDs arriving in sorted order? That
-	 * would allow us to use binary search here.
-	 */
-	for (i = nsubxacts; i > 0; i--)
-	{
-		/* found, so we're done */
-		if (subxacts[i - 1].xid == xid)
-			return;
-	}
-
-	/* This is a new subxact, so we need to add it to the array. */
-
-	if (nsubxacts == 0)
-	{
-		MemoryContext oldctx;
-
-		nsubxacts_max = 128;
-		oldctx = MemoryContextSwitchTo(TopMemoryContext);
-		subxacts = palloc(nsubxacts_max * sizeof(SubXactInfo));
-		MemoryContextSwitchTo(oldctx);
-	}
-	else if (nsubxacts == nsubxacts_max)
-	{
-		nsubxacts_max *= 2;
-		subxacts = repalloc(subxacts, nsubxacts_max * sizeof(SubXactInfo));
-	}
-
-	subxacts[nsubxacts].xid = xid;
-	subxacts[nsubxacts].offset = lseek(stream_fd, 0, SEEK_END);
-
-	nsubxacts++;
-}
-
-/* format filename for file containing the info about subxacts */
-static void
-subxact_filename(char *path, Oid subid, TransactionId xid)
-{
-	char		tempdirpath[MAXPGPATH];
-
-	TempTablespacePath(tempdirpath, DEFAULTTABLESPACE_OID);
-
-	/*
-	 * We might need to create the tablespace's tempfile directory, if no
-	 * one has yet done so.
-	 *
-	 * Don't check for error from mkdir; it could fail if the directory
-	 * already exists (maybe someone else just did the same thing).  If
-	 * it doesn't work then we'll bomb out when opening the file
-	 */
-	mkdir(tempdirpath, S_IRWXU);
-
-	snprintf(path, MAXPGPATH, "%s/logical-%u-%u.subxacts",
-			 tempdirpath, subid, xid);
-}
-
-/* format filename for file containing serialized changes */
-static void
-changes_filename(char *path, Oid subid, TransactionId xid)
-{
-	char		tempdirpath[MAXPGPATH];
-
-	TempTablespacePath(tempdirpath, DEFAULTTABLESPACE_OID);
-
-	/*
-	 * We might need to create the tablespace's tempfile directory, if no
-	 * one has yet done so.
-	 *
-	 * Don't check for error from mkdir; it could fail if the directory
-	 * already exists (maybe someone else just did the same thing).  If
-	 * it doesn't work then we'll bomb out when opening the file
-	 */
-	mkdir(tempdirpath, S_IRWXU);
-
-	snprintf(path, MAXPGPATH, "%s/logical-%u-%u.changes",
-			 tempdirpath, subid, xid);
-}
-
-/*
- * stream_cleanup_files
- *	  Cleanup files for a subscription / toplevel transaction.
- *
- * Remove files with serialized changes and subxact info for a particular
- * toplevel transaction. Each subscription has a separate set of files.
- *
- * Note: The files may not exists, so handle ENOENT as non-error.
- *
- * TODO: Add missing_ok flag to specify in which cases it's OK not to
- * find the files, and when it's an error.
- */
-static void
-stream_cleanup_files(Oid subid, TransactionId xid)
-{
-	int			i;
-	char		path[MAXPGPATH];
-	bool		found = false;
-
-	subxact_filename(path, subid, xid);
-
-	if ((unlink(path) < 0) && (errno != ENOENT))
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not remove file \"%s\": %m", path)));
-
-	changes_filename(path, subid, xid);
-
-	if ((unlink(path) < 0) && (errno != ENOENT))
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not remove file \"%s\": %m", path)));
-
-	/*
-	 * Cleanup the XID from the array - find the XID in the array and
-	 * remove it by shifting all the remaining elements. The array is
-	 * bound to be fairly small (maximum number of in-progress xacts,
-	 * so max_connections + max_prepared_transactions) so simply loop
-	 * through the array and find index of the XID. Then move the rest
-	 * of the array by one element to the left.
-	 *
-	 * Notice we also call this from stream_open_file for first segment
-	 * of each transaction, to deal with possible left-overs after a
-	 * crash, so it's entirely possible not to find the XID in the
-	 * array here. In that case we don't remove anything.
-	 *
-	 * XXX Perhaps it'd be better to handle this automatically after a
-	 * restart, instead of doing it over and over for each transaction.
-	 */
-	for (i = 0; i < nxids; i++)
-	{
-		if (xids[i] == xid)
-		{
-			found = true;
-			break;
-		}
-	}
-
-	if (!found)
-		return;
-
-	/*
-	 * Move the last entry from the array to the place. We don't keep
-	 * the streamed transactions sorted or anything - we only expect 
-	 * a few of them in progress (max_connections + max_prepared_xacts)
-	 * so linear search is just fine.
-	 */
-	xids[i] = xids[nxids-1];
-	nxids--;
-}
-
-/*
- * stream_open_file
- *	  Open file we'll use to serialize changes for a toplevel transaction.
- *
- * Open a file for streamed changes from a toplevel transaction identified
- * by stream_xid (global variable). If it's the first chunk of streamed
- * changes for this transaction, perform cleanup by removing existing
- * files after a possible previous crash.
- *
- * This can only be called at the beginning of a "streaming" block, i.e.
- * between stream_start/stream_stop messages from the upstream.
- */
-static void
-stream_open_file(Oid subid, TransactionId xid, bool first_segment)
-{
-	char		path[MAXPGPATH];
-	int			flags;
-
-	Assert(in_streamed_transaction);
-	Assert(OidIsValid(subid));
-	Assert(TransactionIdIsValid(xid));
-	Assert(stream_fd == -1);
-
-	/*
-	 * If this is the first segment for this transaction, try removing
-	 * existing files (if there are any, possibly after a crash).
-	 */
-	if (first_segment)
-	{
-		MemoryContext	oldcxt;
-
-		/* XXX make sure there are no previous files for this transaction */
-		stream_cleanup_files(subid, xid);
-
-		oldcxt = MemoryContextSwitchTo(TopMemoryContext);
-
-		/*
-		 * We need to remember the XIDs we spilled to files, so that we can
-		 * remove them at worker exit (e.g. after DROP SUBSCRIPTION).
-		 *
-		 * The number of XIDs we may need to track is fairly small, because
-		 * we can only stream toplevel xacts (so limited by max_connections
-		 * and max_prepared_transactions), and we only stream the large ones.
-		 * So we simply keep the XIDs in an unsorted array. If the number of
-		 * xacts gets large for some reason (e.g. very high max_connections),
-		 * a more elaborate approach might be better - e.g. sorted array, to
-		 * speed-up the lookups.
-		 */
-		if (nxids == maxnxids)	/* array of XIDs is full */
-		{
-			if (!xids)
-			{
-				maxnxids = 64;
-				xids = palloc(maxnxids * sizeof(TransactionId));
-			}
-			else
-			{
-				maxnxids = 2 * maxnxids;
-				xids = repalloc(xids, maxnxids * sizeof(TransactionId));
-			}
-		}
-
-		xids[nxids++] = xid;
-
-		MemoryContextSwitchTo(oldcxt);
-	}
-
-	changes_filename(path, subid, xid);
-
-	elog(DEBUG1, "opening file '%s' for streamed changes", path);
-
-	/*
-	 * If this is the first streamed segment, the file must not exist, so
-	 * make sure we're the ones creating it. Otherwise just open the file
-	 * for writing, in append mode.
-	 */
-	if (first_segment)
-		flags = (O_WRONLY | O_CREAT | O_EXCL | PG_BINARY);
-	else
-		flags = (O_WRONLY | O_APPEND | PG_BINARY);
-
-	stream_fd = OpenTransientFile(path, flags);
-
-	if (stream_fd < 0)
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not open file \"%s\": %m",
-						path)));
-}
-
-/*
- * stream_close_file
- *	  Close the currently open file with streamed changes.
- *
- * This can only be called at the beginning of a "streaming" block, i.e.
- * between stream_start/stream_stop messages from the upstream.
- */
-static void
-stream_close_file(void)
-{
-	Assert(in_streamed_transaction);
-	Assert(TransactionIdIsValid(stream_xid));
-	Assert(stream_fd != -1);
-
-	CloseTransientFile(stream_fd);
-
-	stream_xid = InvalidTransactionId;
-	stream_fd = -1;
-}
-
-/*
- * stream_write_change
- *	  Serialize a change to a file for the current toplevel transaction.
- *
- * The change is serialied in a simple format, with length (not including
- * the length), action code (identifying the message type) and message
- * contents (without the subxact TransactionId value).
- *
- * XXX The subxact file includes CRC32C of the contents. Maybe we should
- * include something like that here too, but doing so will not be as
- * straighforward, because we write the file in chunks.
- */
-static void
-stream_write_change(char action, StringInfo s)
-{
-	int			len;
-
-	Assert(in_streamed_transaction);
-	Assert(TransactionIdIsValid(stream_xid));
-	Assert(stream_fd != -1);
-
-	/* total on-disk size, including the action type character */
-	len = (s->len - s->cursor) + sizeof(char);
-
-	pgstat_report_wait_start(WAIT_EVENT_LOGICAL_CHANGES_WRITE);
-
-	/* first write the size */
-	if (write(stream_fd, &len, sizeof(len)) != sizeof(len))
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not serialize streamed change to file: %m")));
-
-	/* then the action */
-	if (write(stream_fd, &action, sizeof(action)) != sizeof(action))
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not serialize streamed change to file: %m")));
-
-	/* and finally the remaining part of the buffer (after the XID) */
-	len = (s->len - s->cursor);
-
-	if (write(stream_fd, &s->data[s->cursor], len) != len)
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not serialize streamed change to file: %m")));
-
-	pgstat_report_wait_end();
-}
-
-/* SIGHUP: set flag to reload configuration at next convenient time */
-static void
-logicalrep_worker_sighup(SIGNAL_ARGS)
-{
-	int			save_errno = errno;
-
-	got_SIGHUP = true;
-
-	/* Waken anything waiting on the process latch */
-	SetLatch(MyLatch);
-
-	errno = save_errno;
-}
-
-/* Logical Replication Apply worker entry point */
-void
-ApplyWorkerMain(Datum main_arg)
-{
-	int			worker_slot = DatumGetInt32(main_arg);
-	MemoryContext oldctx;
-	char		originname[NAMEDATALEN];
-	XLogRecPtr	origin_startpos;
-	char	   *myslotname;
-	WalRcvStreamOptions options;
-
-	/* Attach to slot */
-	logicalrep_worker_attach(worker_slot);
-
-	/* Setup signal handling */
-	pqsignal(SIGHUP, logicalrep_worker_sighup);
-	pqsignal(SIGTERM, die);
-	BackgroundWorkerUnblockSignals();
-
-	/*
-	 * We don't currently need any ResourceOwner in a walreceiver process, but
-	 * if we did, we could call CreateAuxProcessResourceOwner here.
-	 */
-
-	/* Initialise stats to a sanish value */
-	MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
-		MyLogicalRepWorker->reply_time = GetCurrentTimestamp();
-
-	/* Load the libpq-specific functions */
-	load_file("libpqwalreceiver", false);
+	/* Load the libpq-specific functions */
+	load_file("libpqwalreceiver", false);
 
 	/* Run as replica session replication role. */
 	SetConfigOption("session_replication_role", "replica",
@@ -2798,3 +2290,580 @@ IsLogicalWorker(void)
 {
 	return MyLogicalRepWorker != NULL;
 }
+
+/*
+ * Apply Background Worker main loop.
+ */
+void
+LogicalApplyBgwMain(Datum main_arg)
+{
+	volatile ParallelState *pst;
+
+	dsm_segment			*seg;
+	shm_toc				*toc;
+	PGPROC				*registrant;
+	shm_mq				*mq;
+	shm_mq_handle		*mqh;
+	shm_mq_result		 shmq_res;
+	// ConditionVariable	 cv;
+	LogicalRepWorker	 lrw;
+	MemoryContext		 oldcontext;
+
+	MemoryContextSwitchTo(TopMemoryContext);
+
+	/* Load the subscription into persistent memory context. */
+	ApplyContext = AllocSetContextCreate(TopMemoryContext,
+										 "ApplyContext",
+										 ALLOCSET_DEFAULT_SIZES);
+
+	oldcontext = MemoryContextSwitchTo(ApplyContext);
+
+	/*
+	 * Init the ApplyMessageContext which we clean up after each replication
+	 * protocol message.
+	 */
+	ApplyMessageContext = AllocSetContextCreate(ApplyContext,
+												"ApplyMessageContext",
+												ALLOCSET_DEFAULT_SIZES);
+
+	isLogicalApplyWorker = true;
+
+	/*
+	 * Establish signal handlers.
+	 *
+	 * We want CHECK_FOR_INTERRUPTS() to kill off this worker process just as
+	 * it would a normal user backend.  To make that happen, we establish a
+	 * signal handler that is a stripped-down version of die().
+	 */
+	pqsignal(SIGTERM, handle_sigterm);
+	BackgroundWorkerUnblockSignals();
+
+	/*
+	 * Connect to the dynamic shared memory segment.
+	 *
+	 * The backend that registered this worker passed us the ID of a shared
+	 * memory segment to which we must attach for further instructions.  In
+	 * order to attach to dynamic shared memory, we need a resource owner.
+	 * Once we've mapped the segment in our address space, attach to the table
+	 * of contents so we can locate the various data structures we'll need to
+	 * find within the segment.
+	 */
+	CurrentResourceOwner = ResourceOwnerCreate(NULL, "Logical apply worker");
+	seg = dsm_attach(DatumGetInt32(main_arg));
+	if (seg == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("unable to map dynamic shared memory segment")));
+	toc = shm_toc_attach(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg));
+	if (toc == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("bad magic number in dynamic shared memory segment")));
+
+	/*
+	 * Acquire a worker number.
+	 *
+	 * By convention, the process registering this background worker should
+	 * have stored the control structure at key 0.  We look up that key to
+	 * find it.  Our worker number gives our identity: there may be just one
+	 * worker involved in this parallel operation, or there may be many.
+	 */
+	pst = shm_toc_lookup(toc, 0, false);
+	MyParallelState = pst;
+
+	SpinLockAcquire(&pst->mutex);
+	pst->attached = true;
+	SpinLockRelease(&pst->mutex);
+
+	/*
+	 * Attach to the message queue.
+	 */
+	mq = shm_toc_lookup(toc, 1, false);
+	shm_mq_set_receiver(mq, MyProc);
+	mqh = shm_mq_attach(mq, seg, NULL);
+
+	/* Restore database connection. */
+	BackgroundWorkerInitializeConnectionByOid(pst->database_id,
+											  pst->authenticated_user_id, 0);
+
+	/*
+	 * Set the client encoding to the database encoding, since that is what
+	 * the leader will expect.
+	 */
+	SetClientEncoding(GetDatabaseEncoding());
+
+	lrw.subid = pst->subid;
+	MyLogicalRepWorker = &lrw;
+
+	stream_xid = pst->stream_xid;
+
+	StartTransactionCommand();
+	BeginTransactionBlock();
+	CommitTransactionCommand();
+	StartTransactionCommand();
+	// PushActiveSnapshot(GetTransactionSnapshot());
+
+	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
+
+	/*
+	 * Indicate that we're fully initialized and ready to begin the main part
+	 * of the parallel operation.
+	 *
+	 * Once we signal that we're ready, the user backend is entitled to assume
+	 * that our on_dsm_detach callbacks will fire before we disconnect from
+	 * the shared memory segment and exit.  Generally, that means we must have
+	 * attached to all relevant dynamic shared memory data structures by now.
+	 */
+	SpinLockAcquire(&pst->mutex);
+	pst->ready = true;
+	// cv = pst->cv;
+	// if (pst->workers_ready == pst->workers_total)
+	// {
+	//	 registrant = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid);
+	//	 if (registrant == NULL)
+	//	 {
+	//		 elog(DEBUG1, "registrant backend has exited prematurely");
+	//		 proc_exit(1);
+	//	 }
+	//	 SetLatch(&registrant->procLatch);
+	// }
+	SpinLockRelease(&pst->mutex);
+	elog(LOG, "[Apply BGW #%u] started", pst->n);
+
+	registrant = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid);
+	SetLatch(&registrant->procLatch);
+
+	for (;;)
+	{
+		void *data;
+		Size  len;
+		StringInfoData s;
+		MemoryContext	oldctx;
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* Ensure we are reading the data into our memory context. */
+		oldctx = MemoryContextSwitchTo(ApplyMessageContext);
+
+		shmq_res = shm_mq_receive(mqh, &len, &data, false);
+
+		if (shmq_res != SHM_MQ_SUCCESS)
+			break;
+
+		if (len == 0)
+		{
+			elog(LOG, "[Apply BGW #%u] got zero-length message, stopping", pst->n);
+			break;
+		}
+		else
+		{
+			s.cursor = 0;
+			s.maxlen = -1;
+			s.data = (char *) data;
+			s.len = len;
+
+			/*
+			 * We use first byte of message for additional communication between
+			 * main Logical replication worker and Apply BGWorkers, so if it
+			 * differs from 'w', then process it first.
+			 */
+			switch (pq_getmsgbyte(&s))
+			{
+				/* Stream stop */
+				case 'E':
+				{
+					in_remote_transaction = false;
+
+					SpinLockAcquire(&pst->mutex);
+					pst->ready = true;
+					SpinLockRelease(&pst->mutex);
+					SetLatch(&registrant->procLatch);
+
+					elog(LOG, "[Apply BGW #%u] ended processing streaming chunk, waiting on shm_mq_receive", pst->n);
+
+					continue;
+				}
+				/* Reassign to the new transaction */
+				case 'R':
+				{
+					elog(LOG, "[Apply BGW #%u] switching from processing xid %u to xid %u",
+											pst->n, stream_xid, pst->stream_xid);
+					stream_xid = pst->stream_xid;
+
+					StartTransactionCommand();
+					BeginTransactionBlock();
+					CommitTransactionCommand();
+					StartTransactionCommand();
+
+					MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
+
+					continue;
+				}
+				/* Finished processing xact */
+				case 'F':
+				{
+					elog(LOG, "[Apply BGW #%u] finished processing xact %u", pst->n, stream_xid);
+
+					MemoryContextSwitchTo(ApplyContext);
+
+					CommitTransactionCommand();
+					EndTransactionBlock();
+					CommitTransactionCommand();
+
+					SpinLockAcquire(&pst->mutex);
+					pst->finished = true;
+					SpinLockRelease(&pst->mutex);
+
+					continue;
+				}
+				default:
+					break;
+			}
+
+			pq_getmsgint64(&s); // Read LSN info
+			pq_getmsgint64(&s); // TODO Do we need to process it here again somehow?
+			pq_getmsgint64(&s);
+
+			/*
+			 * Make sure the handle apply_dispatch methods are aware we're in a remote
+			 * transaction.
+			 */
+			in_remote_transaction = true;
+			pgstat_report_activity(STATE_RUNNING, NULL);
+
+			elog(DEBUG5, "[Apply BGW #%u] applying dispatch for action=%s",
+									pst->n, (char *) &s.data[s.cursor]);
+			apply_dispatch(&s);
+		}
+
+		MemoryContextSwitchTo(oldctx);
+		MemoryContextReset(ApplyMessageContext);
+	}
+
+	CommitTransactionCommand();
+	EndTransactionBlock();
+	CommitTransactionCommand();
+
+	MemoryContextSwitchTo(oldcontext);
+	MemoryContextReset(ApplyContext);
+
+	SpinLockAcquire(&pst->mutex);
+	pst->finished = true;
+	// if (pst->workers_finished == pst->workers_total)
+	// {
+	//	 registrant = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid);
+	//	 if (registrant == NULL)
+	//	 {
+	//		 elog(DEBUG1, "registrant backend has exited prematurely");
+	//		 proc_exit(1);
+	//	 }
+	//	 SetLatch(&registrant->procLatch);
+	// }
+	SpinLockRelease(&pst->mutex);
+
+	elog(LOG, "[Apply BGW #%u] exiting", pst->n);
+
+	/* Signal main process that we are done. */
+	// ConditionVariableBroadcast(&cv);
+	SetLatch(&registrant->procLatch);
+
+	/*
+	 * We're done.  Explicitly detach the shared memory segment so that we
+	 * don't get a resource leak warning at commit time.  This will fire any
+	 * on_dsm_detach callbacks we've registered, as well.  Once that's done,
+	 * we can go ahead and exit.
+	 */
+	dsm_detach(seg);
+	proc_exit(0);
+}
+
+/*
+ * When we receive a SIGTERM, we set InterruptPending and ProcDiePending just
+ * like a normal backend.  The next CHECK_FOR_INTERRUPTS() will do the right
+ * thing.
+ */
+static void
+handle_sigterm(SIGNAL_ARGS)
+{
+	int save_errno = errno;
+
+	SetLatch(MyLatch);
+
+	if (!proc_exit_inprogress)
+	{
+		InterruptPending = true;
+		ProcDiePending = true;
+	}
+
+	errno = save_errno;
+}
+
+/*
+ * Set up a dynamic shared memory segment.
+ *
+ * We set up a control region that contains a ParallelState,
+ * plus one region per message queue. There are as many message queues as
+ * the number of workers.
+ */
+static void
+setup_dsm(WorkerState *wstate)
+{
+	shm_toc_estimator	 e;
+	int					 toc_key = 0;
+	Size				 segsize;
+	dsm_segment			*seg;
+	shm_toc				*toc;
+	ParallelState		*pst;
+	shm_mq				*mq;
+	int64				 queue_size = 160000000; /* 16 MB for now */
+
+	/* Ensure a valid queue size. */
+	if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("queue size must be at least %zu bytes",
+						shm_mq_minimum_size)));
+	if (queue_size != ((Size) queue_size))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("queue size overflows size_t")));
+
+	/*
+	 * Estimate how much shared memory we need.
+	 *
+	 * Because the TOC machinery may choose to insert padding of oddly-sized
+	 * requests, we must estimate each chunk separately.
+	 *
+	 * We need one key to register the location of the header, and we need
+	 * nworkers keys to track the locations of the message queues.
+	 */
+	shm_toc_initialize_estimator(&e);
+	shm_toc_estimate_chunk(&e, sizeof(ParallelState));
+	shm_toc_estimate_chunk(&e, (Size) queue_size);
+
+	shm_toc_estimate_keys(&e, 1 + 1);
+	segsize = shm_toc_estimate(&e);
+
+	/* Create the shared memory segment and establish a table of contents. */
+	seg = dsm_create(shm_toc_estimate(&e), 0);
+	toc = shm_toc_create(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg),
+						 segsize);
+
+	/* Set up the header region. */
+	pst = shm_toc_allocate(toc, sizeof(ParallelState));
+	SpinLockInit(&pst->mutex);
+	pst->attached = false;
+	pst->ready = false;
+	pst->finished = false;
+	pst->database_id = MyDatabaseId;
+	pst->subid = MyLogicalRepWorker->subid;
+	pst->stream_xid = stream_xid;
+	pst->authenticated_user_id = GetAuthenticatedUserId();
+	pst->n = nworkers + 1;
+	// ConditionVariableInit(&pst->cv);
+
+	shm_toc_insert(toc, toc_key++, pst);
+
+	/* Set up one message queue per worker, plus one. */
+	mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size),
+						(Size) queue_size);
+	shm_toc_insert(toc, toc_key++, mq);
+	shm_mq_set_sender(mq, MyProc);
+
+	/* Attach the queues. */
+	wstate->mq_handle = shm_mq_attach(mq, seg, wstate->handle);
+
+	/* Return results to caller. */
+	wstate->dsm_seg = seg;
+	wstate->pstate = pst;
+}
+
+/*
+ * Register background workers.
+ */
+static void
+setup_background_worker(WorkerState *wstate)
+{
+	MemoryContext		oldcontext;
+	BackgroundWorker	worker;
+
+	elog(LOG, "setting up apply worker #%u", nworkers + 1);
+
+	/*
+	 * TOCHECK: We need the worker_state object and the background worker handles to
+	 * which it points to be allocated in TopMemoryContext rather than
+	 * ApplyMessageContext; otherwise, they'll be destroyed before the on_dsm_detach
+	 * hooks run.
+	 */
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+
+	setup_dsm(wstate);
+
+	/*
+	 * Arrange to kill all the workers if we abort before all workers are
+	 * finished hooking themselves up to the dynamic shared memory segment.
+	 *
+	 * If we die after all the workers have finished hooking themselves up to
+	 * the dynamic shared memory segment, we'll mark the two queues to which
+	 * we're directly connected as detached, and the worker(s) connected to
+	 * those queues will exit, marking any other queues to which they are
+	 * connected as detached.  This will cause any as-yet-unaware workers
+	 * connected to those queues to exit in their turn, and so on, until
+	 * everybody exits.
+	 *
+	 * But suppose the workers which are supposed to connect to the queues to
+	 * which we're directly attached exit due to some error before they
+	 * actually attach the queues.  The remaining workers will have no way of
+	 * knowing this.  From their perspective, they're still waiting for those
+	 * workers to start, when in fact they've already died.
+	 */
+	on_dsm_detach(wstate->dsm_seg, cleanup_background_worker,
+				  PointerGetDatum(wstate));
+
+	/* Configure a worker. */
+	MemSet(&worker, 0, sizeof(BackgroundWorker));
+
+	worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
+		BGWORKER_BACKEND_DATABASE_CONNECTION;
+	worker.bgw_start_time = BgWorkerStart_ConsistentState;
+	worker.bgw_restart_time = BGW_NEVER_RESTART;
+	worker.bgw_notify_pid = MyProcPid;
+	sprintf(worker.bgw_library_name, "postgres");
+	sprintf(worker.bgw_function_name, "LogicalApplyBgwMain");
+
+	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(wstate->dsm_seg));
+
+	/* Register the workers. */
+	snprintf(worker.bgw_name, BGW_MAXLEN,
+			"logical replication apply worker #%u for subscription %u",
+										nworkers + 1, MySubscription->oid);
+	if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle))
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+					errmsg("could not register background process"),
+					errhint("You may need to increase max_worker_processes.")));
+
+	/* All done. */
+	MemoryContextSwitchTo(oldcontext);
+
+	/* Wait for worker to become ready. */
+	wait_for_worker(wstate);
+
+	/*
+	 * Once we reach this point, all workers are ready.  We no longer need to
+	 * kill them if we die; they'll die on their own as the message queues
+	 * shut down.
+	 */
+	cancel_on_dsm_detach(wstate->dsm_seg, cleanup_background_worker,
+						 PointerGetDatum(wstate));
+
+	nworkers += 1;
+}
+
+static void
+cleanup_background_worker(dsm_segment *seg, Datum arg)
+{
+	WorkerState *wstate = (WorkerState *) DatumGetPointer(arg);
+
+	TerminateBackgroundWorker(wstate->handle);
+}
+
+static void
+wait_for_worker(WorkerState *wstate)
+{
+	bool result = false;
+
+	for (;;)
+	{
+		// ConditionVariable cv;
+		bool ready;
+
+		/* If the worker is ready, we have succeeded. */
+		SpinLockAcquire(&wstate->pstate->mutex);
+		ready = wstate->pstate->ready;
+		// cv = wstate->pstate->cv;
+		SpinLockRelease(&wstate->pstate->mutex);
+		if (ready)
+		{
+			result = true;
+			break;
+		}
+
+		/* If any workers (or the postmaster) have died, we have failed. */
+		if (!check_worker_status(wstate))
+		{
+			result = false;
+			break;
+		}
+
+		/* Wait for the workers to wake us up. */
+		// ConditionVariableSleep(&cv, WAIT_EVENT_LOGICAL_APPLY_WORKER_READY);
+
+		/* Wait to be signalled. */
+		WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
+							WAIT_EVENT_LOGICAL_APPLY_WORKER_READY);
+
+		/* Reset the latch so we don't spin. */
+		ResetLatch(MyLatch);
+
+		/* An interrupt may have occurred while we were waiting. */
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	// ConditionVariableCancelSleep();
+
+	if (!result)
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+				 errmsg("one or more background workers failed to start")));
+}
+
+static bool
+check_worker_status(WorkerState *wstate)
+{
+	BgwHandleStatus status;
+	pid_t			pid;
+
+	status = GetBackgroundWorkerPid(wstate->handle, &pid);
+	if (status == BGWH_STOPPED || status == BGWH_POSTMASTER_DIED)
+		return false;
+
+	/* Otherwise, things still look OK. */
+	return true;
+}
+
+static void
+wait_for_worker_to_finish(WorkerState *wstate)
+{
+	elog(LOG, "waiting for apply worker #%u to finish processing xid %u",
+										wstate->pstate->n, wstate->xid);
+
+	for (;;)
+	{
+		// ConditionVariable cv;
+		bool finished;
+
+		/* If the worker is finished, we have succeeded. */
+		SpinLockAcquire(&wstate->pstate->mutex);
+		finished = wstate->pstate->finished;
+		// cv = wstate->pstate->cv;
+		SpinLockRelease(&wstate->pstate->mutex);
+		if (finished)
+		{
+			break;
+		}
+
+		/* Wait for the workers to wake us up. */
+		// ConditionVariableSleep(&cv, WAIT_EVENT_LOGICAL_APPLY_WORKER_READY);
+
+		/* Wait to be signalled. */
+		WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
+							WAIT_EVENT_LOGICAL_APPLY_WORKER_READY);
+
+		/* Reset the latch so we don't spin. */
+		ResetLatch(MyLatch);
+
+		/* An interrupt may have occurred while we were waiting. */
+		CHECK_FOR_INTERRUPTS();
+	}
+}
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 3a89e23488..7c72db9e83 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -819,6 +819,7 @@ typedef enum
 	WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATING,
 	WAIT_EVENT_HASH_GROW_BUCKETS_ELECTING,
 	WAIT_EVENT_HASH_GROW_BUCKETS_REINSERTING,
+	WAIT_EVENT_LOGICAL_APPLY_WORKER_READY,
 	WAIT_EVENT_LOGICAL_SYNC_DATA,
 	WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE,
 	WAIT_EVENT_MQ_INTERNAL,
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 802275311d..afb15c2736 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -122,12 +122,10 @@ extern TransactionId logicalrep_read_stream_stop(StringInfo in);
 
 extern void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
 						XLogRecPtr commit_lsn);
-extern TransactionId logicalrep_read_stream_commit(StringInfo out,
+extern void logicalrep_read_stream_commit(StringInfo out,
 					   LogicalRepCommitData *commit_data);
 
 extern void logicalrep_write_stream_abort(StringInfo out,
 							  TransactionId xid, TransactionId subxid);
-extern void logicalrep_read_stream_abort(StringInfo in,
-							 TransactionId *xid, TransactionId *subxid);
 
 #endif							/* LOGICALREP_PROTO_H */
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index e9524aefd9..30ad40247d 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -13,6 +13,7 @@
 #define LOGICALWORKER_H
 
 extern void ApplyWorkerMain(Datum main_arg);
+extern void LogicalApplyBgwMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
 
-- 
2.17.1

