From 688ece91fb2803453088939f046c337abca85297 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Fri, 9 Oct 2020 23:23:09 -0300
Subject: [PATCH 2/2] move declarations and reindent

---
 src/backend/replication/logical/tablesync.c | 142 ++++++++++----------
 1 file changed, 68 insertions(+), 74 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 271fcb7822..f1283a7187 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -819,6 +819,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	char	   *err;
 	char		relstate;
 	XLogRecPtr	relstate_lsn;
+	Relation	rel;
+	WalRcvExecResult *res;
 
 	/* Check the state of the table synchronization. */
 	StartTransactionCommand();
@@ -873,91 +875,83 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	Assert(MyLogicalRepWorker->relstate == SUBREL_STATE_INIT ||
 		   MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC);
 
-			{
-				Relation	rel;
-				WalRcvExecResult *res;
+	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+	MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
+	MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
+	SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
-				SpinLockAcquire(&MyLogicalRepWorker->relmutex);
-				MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC;
-				MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr;
-				SpinLockRelease(&MyLogicalRepWorker->relmutex);
+	/* Update the state and make it visible to others. */
+	StartTransactionCommand();
+	UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+							   MyLogicalRepWorker->relid,
+							   MyLogicalRepWorker->relstate,
+							   MyLogicalRepWorker->relstate_lsn);
+	CommitTransactionCommand();
+	pgstat_report_stat(false);
 
-				/* Update the state and make it visible to others. */
-				StartTransactionCommand();
-				UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
-										   MyLogicalRepWorker->relid,
-										   MyLogicalRepWorker->relstate,
-										   MyLogicalRepWorker->relstate_lsn);
-				CommitTransactionCommand();
-				pgstat_report_stat(false);
+	/*
+	 * We want to do the table data sync in a single transaction.
+	 */
+	StartTransactionCommand();
 
-				/*
-				 * We want to do the table data sync in a single transaction.
-				 */
-				StartTransactionCommand();
+	/*
+	 * Use a standard write lock here. It might be better to disallow access
+	 * to the table while it's being synchronized. But we don't want to block
+	 * the main apply process from working and it has to open the relation in
+	 * RowExclusiveLock when remapping remote relation id to local one.
+	 */
+	rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
 
-				/*
-				 * Use a standard write lock here. It might be better to
-				 * disallow access to the table while it's being synchronized.
-				 * But we don't want to block the main apply process from
-				 * working and it has to open the relation in RowExclusiveLock
-				 * when remapping remote relation id to local one.
-				 */
-				rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock);
+	/*
+	 * Create a temporary slot for the sync process. We do this inside the
+	 * transaction so that we can use the snapshot made by the slot to get
+	 * existing data.
+	 */
+	res = walrcv_exec(wrconn,
+					  "BEGIN READ ONLY ISOLATION LEVEL "
+					  "REPEATABLE READ", 0, NULL);
+	if (res->status != WALRCV_OK_COMMAND)
+		ereport(ERROR,
+				(errmsg("table copy could not start transaction on publisher"),
+				 errdetail("The error was: %s", res->err)));
+	walrcv_clear_result(res);
 
-				/*
-				 * Create a temporary slot for the sync process. We do this
-				 * inside the transaction so that we can use the snapshot made
-				 * by the slot to get existing data.
-				 */
-				res = walrcv_exec(wrconn,
-								  "BEGIN READ ONLY ISOLATION LEVEL "
-								  "REPEATABLE READ", 0, NULL);
-				if (res->status != WALRCV_OK_COMMAND)
-					ereport(ERROR,
-							(errmsg("table copy could not start transaction on publisher"),
-							 errdetail("The error was: %s", res->err)));
-				walrcv_clear_result(res);
+	/*
+	 * Create new temporary logical decoding slot.
+	 *
+	 * We'll use slot for data copy so make sure the snapshot is used for the
+	 * transaction; that way the COPY will get data that is consistent with
+	 * the lsn used by the slot to start decoding.
+	 */
+	walrcv_create_slot(wrconn, slotname, true,
+					   CRS_USE_SNAPSHOT, origin_startpos);
 
-				/*
-				 * Create new temporary logical decoding slot.
-				 *
-				 * We'll use slot for data copy so make sure the snapshot is
-				 * used for the transaction; that way the COPY will get data
-				 * that is consistent with the lsn used by the slot to start
-				 * decoding.
-				 */
-				walrcv_create_slot(wrconn, slotname, true,
-								   CRS_USE_SNAPSHOT, origin_startpos);
+	PushActiveSnapshot(GetTransactionSnapshot());
+	copy_table(rel);
+	PopActiveSnapshot();
 
-				PushActiveSnapshot(GetTransactionSnapshot());
-				copy_table(rel);
-				PopActiveSnapshot();
+	res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
+	if (res->status != WALRCV_OK_COMMAND)
+		ereport(ERROR,
+				(errmsg("table copy could not finish transaction on publisher"),
+				 errdetail("The error was: %s", res->err)));
+	walrcv_clear_result(res);
 
-				res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
-				if (res->status != WALRCV_OK_COMMAND)
-					ereport(ERROR,
-							(errmsg("table copy could not finish transaction on publisher"),
-							 errdetail("The error was: %s", res->err)));
-				walrcv_clear_result(res);
+	table_close(rel, NoLock);
 
-				table_close(rel, NoLock);
+	/* Make the copy visible. */
+	CommandCounterIncrement();
 
-				/* Make the copy visible. */
-				CommandCounterIncrement();
+	/*
+	 * We are done with the initial data synchronization, update the state.
+	 */
+	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+	MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
+	MyLogicalRepWorker->relstate_lsn = *origin_startpos;
+	SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
-				/*
-				 * We are done with the initial data synchronization, update
-				 * the state.
-				 */
-				SpinLockAcquire(&MyLogicalRepWorker->relmutex);
-				MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT;
-				MyLogicalRepWorker->relstate_lsn = *origin_startpos;
-				SpinLockRelease(&MyLogicalRepWorker->relmutex);
-
-				/* Wait for main apply worker to tell us to catchup. */
-				wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
-			}
+	/* Wait for main apply worker to tell us to catchup. */
+	wait_for_worker_state_change(SUBREL_STATE_CATCHUP);
 
 	/* Finally, let caller complete the stream. */
 	return slotname;
-- 
2.20.1

