From 4e1236d5300a6bac1de0d5d82a185257e99d4f47 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Thu, 1 Oct 2020 20:53:04 -0300
Subject: [PATCH 2/2] Review logical replication tablesync code

It was too obscure and it let bugs hide.  Remove optimization that let
sync worker exit ahead of time.
---
 src/backend/catalog/pg_subscription.c       |   7 +-
 src/backend/replication/logical/tablesync.c | 255 +++++++++-----------
 src/backend/replication/logical/worker.c    |  15 +-
 3 files changed, 122 insertions(+), 155 deletions(-)

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 311d46225a..86825c476b 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -361,10 +361,9 @@ GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn,
 	}
 
 	/* Get the state. */
-	d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
-						Anum_pg_subscription_rel_srsubstate, &isnull);
-	Assert(!isnull);
-	substate = DatumGetChar(d);
+	substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
+
+	/* Get the LSN */
 	d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
 						Anum_pg_subscription_rel_srsublsn, &isnull);
 	if (isnull)
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index c27d970589..4826f1d228 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1,6 +1,6 @@
 /*-------------------------------------------------------------------------
  * tablesync.c
- *	  PostgreSQL logical replication
+ *	  PostgreSQL logical replication -- initial table data synchronization
  *
  * Copyright (c) 2012-2020, PostgreSQL Global Development Group
  *
@@ -26,26 +26,30 @@
  *	   - It allows us to synchronize any tables added after the initial
  *		 synchronization has finished.
  *
- *	  The stream position synchronization works in multiple steps.
- *	   - Sync finishes copy and sets worker state as SYNCWAIT and waits for
- *		 state to change in a loop.
- *	   - Apply periodically checks tables that are synchronizing for SYNCWAIT.
- *		 When the desired state appears, it will set the worker state to
- *		 CATCHUP and starts loop-waiting until either the table state is set
- *		 to SYNCDONE or the sync worker exits.
+ *	  The stream position synchronization works in multiple steps:
+ *	   - apply worker requests a tablesync worker to start, setting the new
+ *		 table state to INIT.
+ *	   - tablesync worker starts; changes table state from INIT to DATASYNC while
+ *		 copying.
+ *	   - tablesync worker finishes the copy and sets table state to SYNCWAIT;
+ *		 waits for state change.
+ *	   - Apply worker periodically checks for tables in SYNCWAIT state.  When
+ *		 any appear, it sets the table state to CATCHUP and starts loop-waiting
+ *		 until either the table state is set to SYNCDONE or the sync worker
+ *		 exits.
  *	   - After the sync worker has seen the state change to CATCHUP, it will
  *		 read the stream and apply changes (acting like an apply worker) until
  *		 it catches up to the specified stream position.  Then it sets the
  *		 state to SYNCDONE.  There might be zero changes applied between
  *		 CATCHUP and SYNCDONE, because the sync worker might be ahead of the
  *		 apply worker.
- *	   - Once the state was set to SYNCDONE, the apply will continue tracking
+ *	   - Once the state is set to SYNCDONE, the apply will continue tracking
  *		 the table until it reaches the SYNCDONE stream position, at which
  *		 point it sets state to READY and stops tracking.  Again, there might
  *		 be zero changes in between.
  *
- *	  So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> CATCHUP ->
- *	  SYNCDONE -> READY.
+ *	  So the state progression is always: INIT -> DATASYNC -> SYNCWAIT ->
+ *	  CATCHUP -> SYNCDONE -> READY.
  *
  *	  The catalog pg_subscription_rel is used to keep information about
  *	  subscribed tables and their state.  Some transient state during data
@@ -67,6 +71,7 @@
  *			-> continue rep
  *		  apply:11
  *			-> set in catalog READY
+ *
  *	   - Sync in front:
  *		  sync:10
  *			-> set in memory SYNCWAIT
@@ -142,13 +147,14 @@ finish_sync_worker(void)
 }
 
 /*
- * Wait until the relation synchronization state is set in the catalog to the
- * expected one.
+ * Wait until the relation sync state is set in the catalog to the expected
+ * one; return true when it happens.
  *
- * Used when transitioning from CATCHUP state to SYNCDONE.
+ * Returns false if the table sync worker or the table itself have
+ * disappeared, or the table state has been reset.
  *
- * Returns false if the synchronization worker has disappeared or the table state
- * has been reset.
+ * Currently, this is used in the apply worker when transitioning from
+ * CATCHUP state to SYNCDONE.
  */
 static bool
 wait_for_relation_state_change(Oid relid, char expected_state)
@@ -162,28 +168,23 @@ wait_for_relation_state_change(Oid relid, char expected_state)
 
 		CHECK_FOR_INTERRUPTS();
 
-		/* XXX use cache invalidation here to improve performance? */
-		PushActiveSnapshot(GetLatestSnapshot());
+		InvalidateCatalogSnapshot();
 		state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
 										relid, &statelsn, true);
-		PopActiveSnapshot();
 
 		if (state == SUBREL_STATE_UNKNOWN)
-			return false;
+			break;
 
 		if (state == expected_state)
 			return true;
 
 		/* Check if the sync worker is still running and bail if not. */
 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-
-		/* Check if the opposite worker is still running and bail if not. */
-		worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
-										am_tablesync_worker() ? InvalidOid : relid,
+		worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
 										false);
 		LWLockRelease(LogicalRepWorkerLock);
 		if (!worker)
-			return false;
+			break;
 
 		(void) WaitLatch(MyLatch,
 						 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
@@ -810,6 +811,9 @@ copy_table(Relation rel)
 /*
  * Start syncing the table in the sync worker.
  *
+ * If nothing needs to be done to sync the table, we exit the worker without
+ * any further action.
+ *
  * The returned slot name is palloc'ed in current memory context.
  */
 char *
@@ -819,6 +823,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	char	   *err;
 	char		relstate;
 	XLogRecPtr	relstate_lsn;
+	Relation	rel;
+	WalRcvExecResult *res;
 
 	/* Check the state of the table synchronization. */
 	StartTransactionCommand();
@@ -832,6 +838,18 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	MyLogicalRepWorker->relstate_lsn = relstate_lsn;
 	SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
+	/*
+	 * If synchronization is already done or no longer necessary, exit now
+	 * that we've updated shared memory state.
+	 */
+	switch (relstate)
+	{
+		case SUBREL_STATE_SYNCDONE:
+		case SUBREL_STATE_READY:
+		case SUBREL_STATE_UNKNOWN:
+			finish_sync_worker();	/* doesn't return */
+	}
+
 	/*
 	 * To build a slot name for the sync work, we are limited to NAMEDATALEN -
 	 * 1 characters.  We cut the original slot name to NAMEDATALEN - 28 chars
@@ -856,134 +874,87 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 		ereport(ERROR,
 				(errmsg("could not connect to the publisher: %s", err)));
 
-	switch (MyLogicalRepWorker->relstate)
-	{
-		case SUBREL_STATE_INIT:
-		case SUBREL_STATE_DATASYNC:
-			{
-				Relation	rel;
-				WalRcvExecResult *res;
+	Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
+		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC);
 
-				SpinLockAcquire(&MyLogicalRepWorker->relmutex);
-				MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
-				MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
-				SpinLockRelease(&MyLogicalRepWorker->relmutex);
+	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+	MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
+	MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
+	SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
-				/* Update the state and make it visible to others. */
-				StartTransactionCommand();
-				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
-										   MyLogicalRepWorker->relid,
-										   MyLogicalRepWorker->relstate,
-										   MyLogicalRepWorker->relstate_lsn);
-				CommitTransactionCommand();
-				pgstat_report_stat(false);
+	/* Update the state and make it visible to others. */
+	StartTransactionCommand();
+	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+							   MyLogicalRepWorker->relid,
+							   MyLogicalRepWorker->relstate,
+							   MyLogicalRepWorker->relstate_lsn);
+	CommitTransactionCommand();
+	pgstat_report_stat(false);
 
-				/*
-				 * We want to do the table data sync in a single transaction.
-				 */
-				StartTransactionCommand();
+	/*
+	 * We want to do the table data sync in a single transaction.
+	 */
+	StartTransactionCommand();
 
-				/*
-				 * Use a standard write lock here. It might be better to
-				 * disallow access to the table while it's being synchronized.
-				 * But we don't want to block the main apply process from
-				 * working and it has to open the relation in RowExclusiveLock
-				 * when remapping remote relation id to local one.
-				 */
-				rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
+	/*
+	 * Use a standard write lock here. It might be better to disallow access
+	 * to the table while it's being synchronized. But we don't want to block
+	 * the main apply process from working and it has to open the relation in
+	 * RowExclusiveLock when remapping remote relation id to local one.
+	 */
+	rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
 
-				/*
-				 * Create a temporary slot for the sync process. We do this
-				 * inside the transaction so that we can use the snapshot made
-				 * by the slot to get existing data.
-				 */
-				res = walrcv_exec(wrconn,
-								  "BEGIN READ ONLY ISOLATION LEVEL "
-								  "REPEATABLE READ", 0, NULL);
-				if (res->status != WALRCV_OK_COMMAND)
-					ereport(ERROR,
-							(errmsg("table copy could not start transaction on publisher"),
-							 errdetail("The error was: %s", res->err)));
-				walrcv_clear_result(res);
+	/*
+	 * Start a transaction in the remote node in REPEATABLE READ mode.  This
+	 * ensures that both the replication slot we create (see below) and the
+	 * COPY are consistent with each other.
+	 */
+	res = walrcv_exec(wrconn,
+					  "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
+					  0, NULL);
+	if (res->status != WALRCV_OK_COMMAND)
+		ereport(ERROR,
+				(errmsg("table copy could not start transaction on publisher"),
+				 errdetail("The error was: %s", res->err)));
+	walrcv_clear_result(res);
 
-				/*
-				 * Create new temporary logical decoding slot.
-				 *
-				 * We'll use slot for data copy so make sure the snapshot is
-				 * used for the transaction; that way the COPY will get data
-				 * that is consistent with the lsn used by the slot to start
-				 * decoding.
-				 */
-				walrcv_create_slot(wrconn, slotname, true,
-								   CRS_USE_SNAPSHOT, origin_startpos);
+	/*
+	 * Create a new temporary logical decoding slot.  This slot will be used
+	 * for the catchup phase after COPY is done, so tell it to use the
+	 * snapshot to make the final data consistent.
+	 */
+	walrcv_create_slot(wrconn, slotname, true,
+					   CRS_USE_SNAPSHOT, origin_startpos);
 
-				PushActiveSnapshot(GetTransactionSnapshot());
-				copy_table(rel);
-				PopActiveSnapshot();
+	/* Now do the initial data copy */
+	PushActiveSnapshot(GetTransactionSnapshot());
+	copy_table(rel);
+	PopActiveSnapshot();
 
-				res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
-				if (res->status != WALRCV_OK_COMMAND)
-					ereport(ERROR,
-							(errmsg("table copy could not finish transaction on publisher"),
-							 errdetail("The error was: %s", res->err)));
-				walrcv_clear_result(res);
+	res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
+	if (res->status != WALRCV_OK_COMMAND)
+		ereport(ERROR,
+				(errmsg("table copy could not finish transaction on publisher"),
+				 errdetail("The error was: %s", res->err)));
+	walrcv_clear_result(res);
 
-				table_close(rel, NoLock);
+	table_close(rel, NoLock);
 
-				/* Make the copy visible. */
-				CommandCounterIncrement();
+	/* Make the copy visible. */
+	CommandCounterIncrement();
 
-				/*
-				 * We are done with the initial data synchronization, update
-				 * the state.
-				 */
-				SpinLockAcquire(&MyLogicalRepWorker->relmutex);
-				MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
-				MyLogicalRepWorker->relstate_lsn = *origin_startpos;
-				SpinLockRelease(&MyLogicalRepWorker->relmutex);
-
-				/* Wait for main apply worker to tell us to catchup. */
-				wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
-
-				/*----------
-				 * There are now two possible states here:
-				 * a) Sync is behind the apply.  If that's the case we need to
-				 *	  catch up with it by consuming the logical replication
-				 *	  stream up to the relstate_lsn.  For that, we exit this
-				 *	  function and continue in ApplyWorkerMain().
-				 * b) Sync is caught up with the apply.  So it can just set
-				 *	  the state to SYNCDONE and finish.
-				 *----------
-				 */
-				if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn)
-				{
-					/*
-					 * Update the new state in catalog.  No need to bother
-					 * with the shmem state as we are exiting for good.
-					 */
-					UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
-											   MyLogicalRepWorker->relid,
-											   SUBREL_STATE_SYNCDONE,
-											   *origin_startpos);
-					finish_sync_worker();
-				}
-				break;
-			}
-		case SUBREL_STATE_SYNCDONE:
-		case SUBREL_STATE_READY:
-		case SUBREL_STATE_UNKNOWN:
-
-			/*
-			 * Nothing to do here but finish.  (UNKNOWN means the relation was
-			 * removed from pg_subscription_rel before the sync worker could
-			 * start.)
-			 */
-			finish_sync_worker();
-			break;
-		default:
-			elog(ERROR, "unknown relation state \"%c\"",
-				 MyLogicalRepWorker->relstate);
-	}
+	/*
+	 * We are done with the initial data synchronization, update the state.
+	 */
+	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+	MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
+	MyLogicalRepWorker->relstate_lsn = *origin_startpos;
+	SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
+	/*
+	 * Finally, wait until the main apply worker tells us to catch up and then
+	 * return to let LogicalRepApplyLoop do it.
+	 */
+	wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
 	return slotname;
 }
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 8d5d9e05b3..8ed4cb1405 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2065,6 +2065,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 {
 	TimestampTz last_recv_timestamp = GetCurrentTimestamp();
 	bool		ping_sent = false;
+	TimeLineID	tli;
 
 	/*
 	 * Init the ApplyMessageContext which we clean up after each replication
@@ -2206,12 +2207,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
 		/* Check if we need to exit the streaming loop. */
 		if (endofstream)
-		{
-			TimeLineID	tli;
-
-			walrcv_endstreaming(wrconn, &tli);
 			break;
-		}
 
 		/*
 		 * Wait for more data or latch.  If we have unflushed transactions,
@@ -2288,6 +2284,9 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 			send_feedback(last_received, requestReply, requestReply);
 		}
 	}
+
+	/* All done */
+	walrcv_endstreaming(wrconn, &tli);
 }
 
 /*
@@ -3029,10 +3028,8 @@ ApplyWorkerMain(Datum main_arg)
 		/* This is table synchronization worker, call initial sync. */
 		syncslotname = LogicalRepSyncTableStart(&origin_startpos);
 
-		/* The slot name needs to be allocated in permanent memory context. */
-		oldctx = MemoryContextSwitchTo(ApplyContext);
-		myslotname = pstrdup(syncslotname);
-		MemoryContextSwitchTo(oldctx);
+		/* allocate slot name in long-lived context */
+		myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
 
 		pfree(syncslotname);
 	}
-- 
2.20.1

