From c9446bfc60ae125adea476bcc5b7d50f4bafa38b Mon Sep 17 00:00:00 2001
From: Alexey Kondratov <kondratov.aleksey@gmail.com>
Date: Sun, 29 Dec 2019 14:38:13 +0300
Subject: [PATCH v3] Make physical slot advance to be persistent

Starting from v11 pg_replication_slot_advance result was not persistent
after restart. This patch fixes that bug by proper slot flushing if it
was modified.
---
 src/backend/replication/logical/logical.c | 10 ++++++--
 src/backend/replication/slotfuncs.c       | 31 +++++++++++++++--------
 2 files changed, 29 insertions(+), 12 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 7e06615864..bdb82da39b 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1049,7 +1049,10 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 
 		SpinLockRelease(&MyReplicationSlot->mutex);
 
-		/* first write new xmin to disk, so we know what's up after a crash */
+		/*
+		 * First write new xmin and/or LSN to disk, so we know what's up
+		 * after a crash.
+		 */
 		if (updated_xmin || updated_restart)
 		{
 			ReplicationSlotMarkDirty();
@@ -1057,6 +1060,10 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 			elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
 		}
 
+		/* Compute global required LSN if restart_lsn was changed */
+		if (updated_restart)
+			ReplicationSlotsComputeRequiredLSN();
+
 		/*
 		 * Now the new xmin is safely on disk, we can let the global value
 		 * advance. We do not take ProcArrayLock or similar since we only
@@ -1070,7 +1077,6 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
 			SpinLockRelease(&MyReplicationSlot->mutex);
 
 			ReplicationSlotsComputeRequiredXmin(false);
-			ReplicationSlotsComputeRequiredLSN();
 		}
 	}
 	else
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index ba08ad405f..9d0e99afb8 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -370,6 +370,19 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
 		MyReplicationSlot->data.restart_lsn = moveto;
 		SpinLockRelease(&MyReplicationSlot->mutex);
 		retlsn = moveto;
+
+		/*
+		 * Dirty the slot as we updated data that is meant to be persistent
+		 * between restarts, flush it and re-compute global required LSN.
+		 *
+		 * If we change the order of operations and do not flush slot before
+		 * required LSN computing, then there will be a race.  Least recent WAL
+		 * segments may be already utilized, so if we crash and start again with
+		 * old restart_lsn there will be no WAL to proceed properly.
+		 */
+		ReplicationSlotMarkDirty();
+		ReplicationSlotSave();
+		ReplicationSlotsComputeRequiredLSN();
 	}
 
 	return retlsn;
@@ -463,6 +476,10 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 
 		if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
 		{
+			/*
+			 * It also takes care about computing and updating global
+			 * required xmin and LSN if needed.
+			 */
 			LogicalConfirmReceivedLocation(moveto);
 
 			/*
@@ -564,7 +581,10 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 						(uint32) (moveto >> 32), (uint32) moveto,
 						(uint32) (minlsn >> 32), (uint32) minlsn)));
 
-	/* Do the actual slot update, depending on the slot type */
+	/*
+	 * Do the actual slot update, depending on the slot type.  Slot will be
+	 * marked as dirty and flushed by pg_*_replication_slot_advance if needed.
+	 */
 	if (OidIsValid(MyReplicationSlot->data.database))
 		endlsn = pg_logical_replication_slot_advance(moveto);
 	else
@@ -573,15 +593,6 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 	values[0] = NameGetDatum(&MyReplicationSlot->data.name);
 	nulls[0] = false;
 
-	/* Update the on disk state when lsn was updated. */
-	if (XLogRecPtrIsInvalid(endlsn))
-	{
-		ReplicationSlotMarkDirty();
-		ReplicationSlotsComputeRequiredXmin(false);
-		ReplicationSlotsComputeRequiredLSN();
-		ReplicationSlotSave();
-	}
-
 	ReplicationSlotRelease();
 
 	/* Return the reached position. */

base-commit: 0ce38730ac72029f3f2c95ae80b44f5b9060cbcc
-- 
2.19.1

