From 8952bc955585d12a06cdd6d339bcbabafc772768 Mon Sep 17 00:00:00 2001
From: Arseny Sher <sher-ars@yandex.ru>
Date: Tue, 22 Oct 2019 19:02:14 +0300
Subject: [PATCH 1/2] Don't use serialized snapshots during logical slot
 creation.

snapbuild.c enters SNAPBUILD_CONSISTENT immediately after deserializing the
snapshot. Generally this is incorrect, because SNAPBUILD_CONSISTENT means not
just a complete snapshot, but also a reorderbuffer filled with all currently
running xacts. This is fine for decoding sessions with existing slots though,
as they won't stream anything before confirmed_flush_lsn is reached anyway, at
which point all transactions which hadn't got into the reorderbuffer will
definitely have finished. However, new slots might be created too early, thus
losing (not decoding) parts of transactions committed after the freshly
created confirmed_flush_lsn. This can happen under the following extremely
unlucky circumstances:
 - New slot creation reserves the point in WAL from which it will start
   reading (GetXLogInsertRecPtr);
 - It logs xl_running_xacts to start assembling a snapshot;
 - A running decoding session with another slot quickly reads this
   xl_running_xacts and serializes its snapshot;
 - The new slot reads xl_running_xacts and picks this snapshot up, declaring
   that it is ready to stream henceforth, though its reorderbuffer is empty.

Fix by forbidding the use of serialized snapshots during slot creation.
Actually we could use the snapshot if it was picked up at xl_running_xacts,
where we could perform the usual FULL -> CONSISTENT transition conservatively
via nextXid, but that seems like too much complication for too small a benefit.
---
 src/backend/replication/logical/logical.c   |  2 +-
 src/backend/replication/logical/snapbuild.c | 85 +++++++++++++++++++++++------
 2 files changed, 70 insertions(+), 17 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index da265f5294..67ed52e56f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -327,7 +327,7 @@ CreateInitDecodingContext(char *plugin,
 	ReplicationSlotMarkDirty();
 	ReplicationSlotSave();
 
-	ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
+	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
 								 need_full_snapshot, false,
 								 read_page, prepare_write, do_write,
 								 update_progress);
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 0bd1d0f954..6f503a93aa 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -165,7 +165,7 @@ struct SnapBuild
 
 	/*
 	 * Don't replay commits from an LSN < this LSN. This can be set externally
-	 * but it will also be advanced (never retreat) from within snapbuild.c.
+	 * or established by snapbuild.c once consistent snapshot is assembled.
 	 */
 	XLogRecPtr	start_decoding_at;
 
@@ -309,7 +309,16 @@ SnapBuildStartNextPhaseAt(SnapBuild *builder, TransactionId at)
  * Allocate a new snapshot builder.
  *
  * xmin_horizon is the xid >= which we can be sure no catalog rows have been
- * removed, start_lsn is the LSN >= we want to replay commits.
+ * removed.
+ * start_lsn is InvalidXLogRecPtr or LSN >= we want to replay commits.
+ *  InvalidXLogRecPtr is used during slot creation; snapbuild will assemble
+ *  a consistent snapshot and (if decoding continues -- no core code does
+ *  that currently) stream all further commits.
+ *  If normal LSN is passed, caller *must* be sure that WAL reading starts
+ *  early enough to get the snapshot and pick up the first change of
+ *  earliest xact to stream. Slot creation and advancement machinery
+ *  guarantees that slot's <restart_lsn, confirmed_flush_lsn> pair always
+ *  satisfies this.
  */
 SnapBuild *
 AllocateSnapshotBuilder(ReorderBuffer *reorder,
@@ -408,7 +417,8 @@ SnapBuildCurrentState(SnapBuild *builder)
 bool
 SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
 {
-	return ptr < builder->start_decoding_at;
+	return XLogRecPtrIsInvalid(builder->start_decoding_at) ||
+		ptr < builder->start_decoding_at;
 }
 
 /*
@@ -945,16 +955,16 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 		 TransactionIdPrecedes(xid, SnapBuildNextPhaseAt(builder))))
 	{
 		/* ensure that only commits after this are getting replayed */
-		if (builder->start_decoding_at <= lsn)
-			builder->start_decoding_at = lsn + 1;
+		Assert(XLogRecPtrIsInvalid(builder->start_decoding_at)	||
+			   builder->start_decoding_at > lsn);
 		return;
 	}
 
 	if (builder->state < SNAPBUILD_CONSISTENT)
 	{
 		/* ensure that only commits after this are getting replayed */
-		if (builder->start_decoding_at <= lsn)
-			builder->start_decoding_at = lsn + 1;
+		Assert(XLogRecPtrIsInvalid(builder->start_decoding_at)	||
+			   builder->start_decoding_at > lsn);
 
 		/*
 		 * If building an exportable snapshot, force xid to be tracked, even
@@ -966,6 +976,16 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 		}
 	}
 
+	if (!XLogRecPtrIsInvalid(builder->start_decoding_at) &&
+		builder->start_decoding_at <= lsn)
+	{
+		/*
+		 * We are going to stream this xact, so must already have fine
+		 * snapshot.
+		 */
+		Assert(builder->state == SNAPBUILD_CONSISTENT);
+	}
+
 	for (nxact = 0; nxact < nsubxacts; nxact++)
 	{
 		TransactionId subxid = subxacts[nxact];
@@ -1250,10 +1270,10 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 	 */
 	if (running->oldestRunningXid == running->nextXid)
 	{
-		if (builder->start_decoding_at == InvalidXLogRecPtr ||
-			builder->start_decoding_at <= lsn)
+		if (XLogRecPtrIsInvalid(builder->start_decoding_at))
 			/* can decode everything after this */
 			builder->start_decoding_at = lsn + 1;
+		Assert(builder->start_decoding_at >= lsn);
 
 		/* As no transactions were running xmin/xmax can be trivially set. */
 		builder->xmin = running->nextXid;	/* < are finished */
@@ -1277,7 +1297,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 	else if (!builder->building_full_snapshot &&
 			 SnapBuildRestore(builder, lsn))
 	{
-		/* there won't be any state to cleanup */
+		/* there won't be much state to cleanup */
 		return false;
 	}
 
@@ -1358,6 +1378,10 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 	{
 		builder->state = SNAPBUILD_CONSISTENT;
 		SnapBuildStartNextPhaseAt(builder, InvalidTransactionId);
+		if (XLogRecPtrIsInvalid(builder->start_decoding_at))
+			/* can decode everything after this */
+			builder->start_decoding_at = lsn + 1;
+		Assert(builder->start_decoding_at >= lsn);
 
 		ereport(LOG,
 				(errmsg("logical decoding found consistent point at %X/%X",
@@ -1499,8 +1523,12 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
 		   builder->last_serialized_snapshot <= lsn);
 
 	/*
-	 * no point in serializing if we cannot continue to work immediately after
-	 * restoring the snapshot
+	 * No point in serializing if the snapshot is not complete (< FULL).
+	 * FULL snapshot per se is just as good as CONSISTENT; difference between
+	 * these states is not a snapshot property, but whether we have filled
+	 * reorderbuffer with all currently running xacts. However, there isn't
+	 * much sense in serializing it as serialized snaps are used only for
+	 * decoding from existing slots, which had already reached CONSISTENT.
 	 */
 	if (builder->state < SNAPBUILD_CONSISTENT)
 		return;
@@ -1688,7 +1716,8 @@ out:
 
 /*
  * Restore a snapshot into 'builder' if previously one has been stored at the
- * location indicated by 'lsn'. Returns true if successful, false otherwise.
+ * location indicated by 'lsn'.
+ * Returns true if successful, false otherwise.
  */
 static bool
 SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
@@ -1704,6 +1733,16 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 	if (builder->state == SNAPBUILD_CONSISTENT)
 		return false;
 
+	/*
+	 * Don't use a serialized snapshot if we are not sure where all
+	 * currently running xacts will finish (new slot creation).
+	 * (Actually, if we came here through xl_running_xacts, we could perform
+	 * SNAPBUILD_FULL_SNAPSHOT -> SNAPBUILD_CONSISTENT transition properly,
+	 * but the added lines of code would hardly be worth the benefit.)
+	 */
+	if (builder->start_decoding_at == InvalidXLogRecPtr)
+		return false;
+
 	sprintf(path, "pg_logical/snapshots/%X-%X.snap",
 			(uint32) (lsn >> 32), (uint32) lsn);
 
@@ -1888,7 +1927,6 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 	/* ok, we think the snapshot is sensible, copy over everything important */
 	builder->xmin = ondisk.builder.xmin;
 	builder->xmax = ondisk.builder.xmax;
-	builder->state = ondisk.builder.state;
 
 	builder->committed.xcnt = ondisk.builder.committed.xcnt;
 	/* We only allocated/stored xcnt, not xcnt_space xids ! */
@@ -1901,6 +1939,23 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 	}
 	ondisk.builder.committed.xip = NULL;
 
+	/*
+	 * Right now the reorderbuffer probably lacks parts of the currently
+	 * running xacts. However, snapbuild can't determine the *earliest* spot
+	 * where decoding is safe (we'd see all xacts in full) on its own, because
+	 * this requires knowing which xids have already written something, and
+	 * such writes are outside the WAL we have read. xl_running_xacts->nextXid
+	 * doesn't help us here, because 1) an issued xid doesn't necessarily mean
+	 * the xact wrote something, so there would be a risk of arriving at
+	 * CONSISTENT a bit later than possible, which is ok for new slot creation
+	 * but unacceptable otherwise; 2) the snapshot might be serialized at some
+	 * other point.
+	 *
+	 * The slot must have provided us with a safe start_decoding_at though, so
+	 * just skip FULL -> CONSISTENT transition and set CONSISTENT directly.
+	 */
+	builder->state = SNAPBUILD_CONSISTENT;
+
 	/* our snapshot is not interesting anymore, build a new one */
 	if (builder->snapshot != NULL)
 	{
@@ -1911,8 +1966,6 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 
 	ReorderBufferSetRestartPoint(builder->reorder, lsn);
 
-	Assert(builder->state == SNAPBUILD_CONSISTENT);
-
 	ereport(LOG,
 			(errmsg("logical decoding found consistent point at %X/%X",
 					(uint32) (lsn >> 32), (uint32) lsn),
-- 
2.11.0

