From e2f5cedfc35652f198f9ca3bca9a45572f8845fb Mon Sep 17 00:00:00 2001
From: Amit Khandekar <amit.khandekar@enterprisedb.com>
Date: Thu, 16 Jan 2020 10:05:15 +0530
Subject: [PATCH v6 1/5] Allow logical decoding on standby.

Allow a logical slot to be created on standby. Restrict its usage
or its creation if wal_level on primary is less than logical.
During slot creation, it's restart_lsn is set to the last replayed
LSN. Effectively, a logical slot creation on standby waits for an
xl_running_xact record to arrive from primary. Conflicting slots
would be handled in next commits.

Andres Freund and Amit Khandekar.
---
 src/backend/access/transam/xlog.c         | 11 +++++
 src/backend/replication/logical/decode.c  | 22 ++++++++-
 src/backend/replication/logical/logical.c | 37 ++++++++-------
 src/backend/replication/slot.c            | 57 +++++++++++++++--------
 src/backend/replication/walsender.c       | 10 ++--
 src/include/access/xlog.h                 |  1 +
 6 files changed, 98 insertions(+), 40 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index de2d4ee582..a666b4b935 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4967,6 +4967,17 @@ LocalProcessControlFile(bool reset)
 	ReadControlFile();
 }
 
+/*
+ * Get the wal_level from the control file. For a standby, this value should be
+ * considered as its active wal_level, because it may be different from what
+ * was originally configured on standby.
+ */
+WalLevel
+GetActiveWalLevel(void)
+{
+	return ControlFile->wal_level;
+}
+
 /*
  * Initialization of shared memory for XLOG
  */
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index c2e5e3abf8..3a072af75b 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -187,11 +187,31 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			 * can restart from there.
 			 */
 			break;
+		case XLOG_PARAMETER_CHANGE:
+		{
+			xl_parameter_change *xlrec =
+				(xl_parameter_change *) XLogRecGetData(buf->record);
+
+			/*
+			 * If wal_level on primary is reduced to less than logical, then we
+			 * want to prevent existing logical slots from being used.
+			 * Existing logical slots on standby get dropped when this WAL
+			 * record is replayed; and further, slot creation fails when the
+			 * wal level is not sufficient; but all these operations are not
+			 * synchronized, so a logical slot may creep in while the wal_level
+			 * is being reduced.  Hence this extra check.
+			 */
+			if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("logical decoding on standby requires "
+								"wal_level >= logical on master")));
+			break;
+		}
 		case XLOG_NOOP:
 		case XLOG_NEXTOID:
 		case XLOG_SWITCH:
 		case XLOG_BACKUP_END:
-		case XLOG_PARAMETER_CHANGE:
 		case XLOG_RESTORE_POINT:
 		case XLOG_FPW_CHANGE:
 		case XLOG_FPI_FOR_HINT:
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5adf253583..03463719f8 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -91,23 +91,22 @@ CheckLogicalDecodingRequirements(void)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("logical decoding requires a database connection")));
 
-	/* ----
-	 * TODO: We got to change that someday soon...
-	 *
-	 * There's basically three things missing to allow this:
-	 * 1) We need to be able to correctly and quickly identify the timeline a
-	 *	  LSN belongs to
-	 * 2) We need to force hot_standby_feedback to be enabled at all times so
-	 *	  the primary cannot remove rows we need.
-	 * 3) support dropping replication slots referring to a database, in
-	 *	  dbase_redo. There can't be any active ones due to HS recovery
-	 *	  conflicts, so that should be relatively easy.
-	 * ----
-	 */
 	if (RecoveryInProgress())
-		ereport(ERROR,
-				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("logical decoding cannot be used while in recovery")));
+	{
+		/*
+		 * This check may have race conditions, but whenever
+		 * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
+		 * verify that there are no existing logical replication slots. And to
+		 * avoid races around creating a new slot,
+		 * CheckLogicalDecodingRequirements() is called once before creating
+		 * the slot, and once when logical decoding is initially starting up.
+		 */
+		if (GetActiveWalLevel() < WAL_LEVEL_LOGICAL)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("logical decoding on standby requires "
+							"wal_level >= logical on master")));
+	}
 }
 
 /*
@@ -240,6 +239,12 @@ CreateInitDecodingContext(char *plugin,
 	LogicalDecodingContext *ctx;
 	MemoryContext old_context;
 
+	/*
+	 * On standby, this check is also required while creating the slot. Check
+	 * the comments in this function.
+	 */
+	CheckLogicalDecodingRequirements();
+
 	/* shorter lines... */
 	slot = MyReplicationSlot;
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 1cec53d748..00aa95ba15 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1016,37 +1016,56 @@ ReplicationSlotReserveWal(void)
 		/*
 		 * For logical slots log a standby snapshot and start logical decoding
 		 * at exactly that position. That allows the slot to start up more
-		 * quickly.
+		 * quickly. But on a standby we cannot do WAL writes, so just use the
+		 * replay pointer; effectively, an attempt to create a logical slot on
+		 * standby will cause it to wait for an xl_running_xact record to be
+		 * logged independently on the primary, so that a snapshot can be built
+		 * using the record.
 		 *
-		 * That's not needed (or indeed helpful) for physical slots as they'll
-		 * start replay at the last logged checkpoint anyway. Instead return
-		 * the location of the last redo LSN. While that slightly increases
-		 * the chance that we have to retry, it's where a base backup has to
-		 * start replay at.
+		 * None of this is needed (or indeed helpful) for physical slots as
+		 * they'll start replay at the last logged checkpoint anyway. Instead
+		 * return the location of the last redo LSN. While that slightly
+		 * increases the chance that we have to retry, it's where a base backup
+		 * has to start replay at.
 		 */
+		if (SlotIsPhysical(slot))
+			restart_lsn = GetRedoRecPtr();
+		else if (RecoveryInProgress())
+		{
+			restart_lsn = GetXLogReplayRecPtr(NULL);
+			/*
+			 * Replay pointer may point one past the end of the record. If that
+			 * is a XLOG page boundary, it will not be a valid LSN for the
+			 * start of a record, so bump it up past the page header.
+			 */
+			if (!XRecOffIsValid(restart_lsn))
+			{
+				if (restart_lsn % XLOG_BLCKSZ != 0)
+					elog(ERROR, "invalid replay pointer");
+				/* For the first page of a segment file, it's a long header */
+				if (XLogSegmentOffset(restart_lsn, wal_segment_size) == 0)
+					restart_lsn += SizeOfXLogLongPHD;
+				else
+					restart_lsn += SizeOfXLogShortPHD;
+			}
+		}
+		else
+			restart_lsn = GetXLogInsertRecPtr();
+
+		SpinLockAcquire(&slot->mutex);
+		slot->data.restart_lsn = restart_lsn;
+		SpinLockRelease(&slot->mutex);
+
 		if (!RecoveryInProgress() && SlotIsLogical(slot))
 		{
 			XLogRecPtr	flushptr;
 
-			/* start at current insert position */
-			restart_lsn = GetXLogInsertRecPtr();
-			SpinLockAcquire(&slot->mutex);
-			slot->data.restart_lsn = restart_lsn;
-			SpinLockRelease(&slot->mutex);
-
 			/* make sure we have enough information to start */
 			flushptr = LogStandbySnapshot();
 
 			/* and make sure it's fsynced to disk */
 			XLogFlush(flushptr);
 		}
-		else
-		{
-			restart_lsn = GetRedoRecPtr();
-			SpinLockAcquire(&slot->mutex);
-			slot->data.restart_lsn = restart_lsn;
-			SpinLockRelease(&slot->mutex);
-		}
 
 		/* prevent WAL removal as fast as possible */
 		ReplicationSlotsComputeRequiredLSN();
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 76ec3c7dd0..9274fec10b 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2819,10 +2819,12 @@ XLogSendLogical(void)
 	 * If first time through in this session, initialize flushPtr.  Otherwise,
 	 * we only need to update flushPtr if EndRecPtr is past it.
 	 */
-	if (flushPtr == InvalidXLogRecPtr)
-		flushPtr = GetFlushRecPtr();
-	else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
-		flushPtr = GetFlushRecPtr();
+	if (flushPtr == InvalidXLogRecPtr ||
+		logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
+	{
+		flushPtr = (am_cascading_walsender ?
+					GetStandbyFlushRecPtr() : GetFlushRecPtr());
+	}
 
 	/* If EndRecPtr is still past our flushPtr, it means we caught up. */
 	if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 98b033fc20..6228779df6 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -298,6 +298,7 @@ extern Size XLOGShmemSize(void);
 extern void XLOGShmemInit(void);
 extern void BootStrapXLOG(void);
 extern void LocalProcessControlFile(bool reset);
+extern WalLevel GetActiveWalLevel(void);
 extern void StartupXLOG(void);
 extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
-- 
2.20.1

