From 86a03671c3bdbaee3dabe02a9a1f8e2e23098652 Mon Sep 17 00:00:00 2001
From: Arseny Sher <sher-ars@yandex.ru>
Date: Tue, 22 Oct 2019 19:02:14 +0300
Subject: [PATCH 1/2] Fix serialized snapshot usage for new logical slots.

Previously, snapbuilder entered into SNAPBUILD_CONSISTENT immediately after
deserializing the snapshot. Generally this is incorrect because
SNAPBUILD_CONSISTENT means not just complete snapshot, but also reorderbuffer
filled with all currently running xacts. This is painless for decoding sessions
with existing slots because they won't stream anything before
confirmed_flush_lsn is reached anyway, at which point all transactions which
hadn't got into reorderbuffer would definitely finish. However, new slots might
be created too early, thus losing (not decoding) parts of transactions committed
after freshly created confirmed_flush_lsn. This can happen under the following
extremely unlucky circumstances:
 - New slot creation reserves the point in WAL from which it will start
   reading (GetXLogInsertRecPtr);
 - It logs xl_running_xacts to start assembling a snapshot;
 - Running decoding session with another slot quickly reads this
   xl_running_xacts and serializes its snapshot;
 - New slot reads xl_running_xacts and picks this snapshot up, saying that it
   is ready to stream henceforth, though its reorderbuffer is empty.

It turns out, as the comment on AllocateSnapshotBuilder explains, that
snapbuilder can't locate the *earliest* possible point for streaming on its
own. Thus, if the snapbuild user knows it (decoding session for an existing
slot), we trust it and switch into SNAPBUILD_CONSISTENT right after snapshot
deserialization, as before. However, if it is not known (new slot creation),
we switch into SNAPBUILD_FULL_SNAPSHOT and wait for all running xacts to
finish as usual.
---
 src/backend/replication/logical/logical.c   |   2 +-
 src/backend/replication/logical/snapbuild.c | 141 ++++++++++++++++++++++------
 2 files changed, 115 insertions(+), 28 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index da265f5294..67ed52e56f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -327,7 +327,7 @@ CreateInitDecodingContext(char *plugin,
 	ReplicationSlotMarkDirty();
 	ReplicationSlotSave();
 
-	ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
+	ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
 								 need_full_snapshot, false,
 								 read_page, prepare_write, do_write,
 								 update_progress);
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 0bd1d0f954..3ed178a4f8 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -165,7 +165,7 @@ struct SnapBuild
 
 	/*
 	 * Don't replay commits from an LSN < this LSN. This can be set externally
-	 * but it will also be advanced (never retreat) from within snapbuild.c.
+	 * or established by snapbuild.c once a consistent snapshot is assembled.
 	 */
 	XLogRecPtr	start_decoding_at;
 
@@ -275,7 +275,7 @@ static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutof
 
 /* serialization functions */
 static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
-static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
+static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn, TransactionId nextXid);
 
 /*
  * Return TransactionId after which the next phase of initial snapshot
@@ -309,7 +309,42 @@ SnapBuildStartNextPhaseAt(SnapBuild *builder, TransactionId at)
  * Allocate a new snapshot builder.
  *
  * xmin_horizon is the xid >= which we can be sure no catalog rows have been
- * removed, start_lsn is the LSN >= we want to replay commits.
+ * removed.
+ * start_lsn is InvalidXLogRecPtr or LSN >= we want to replay commits.
+ *  InvalidXLogRecPtr is used during slot creation; snapbuild will assemble
+ *  a consistent snapshot and (if decoding continues -- no core code does
+ *  that currently) stream all further commits.
+ *  If normal lsn is passed, caller *must* be sure that WAL reading starts
+ *  early enough to build the snapshot and pick up the first change of
+ *  earliest xact to stream. Slot creation and advancement machinery
+ *  guarantees that slot's <restart_lsn, confirmed_flush_lsn> pair always
+ *  satisfies this.
+ *
+ *	The condition is the caller's responsibility because in some cases
+ *	snapbuilder has no way to enforce this rule without risk of skipping legit
+ *	transactions. Specifically, after constructing (usually deserializing) FULL
+ *	snapshot snapbuilder must iterate over WAL further until all running xacts
+ *	from the snap finish, as we must see with correct snapshots all changes of
+ *	every transaction we are going to stream. However, snapbuild can't
+ *  *immediately* notice this point, because
+ *    - Snapbuilder updates xmin by taking it from xl_running_xacts
+ *      (c.f. SnapBuildProcessRunningXacts). Even if we guarantee that, for
+ *      each possible WAL reading starting position, there is always an
+ *      xl_running_xacts record logged right before the earliest possible
+ *      streaming point -- IOW, after all xacts which we can't stream had
+ *      finished (which is currently true btw, as slot's advancement is
+ *      considered only at xl_running_xacts) -- that would not be enough due
+ *      to races around xl_running_xacts, i.e with WAL like
+ *      [ <T1> <restart_lsn> <T1 commit> <confirmed_flush_lsn, xrx> <T2 commit> ]
+ *      T2 might be skipped if T1 is shown as running in xl_running_xacts.
+ *	  - Tracking xmin manually by recording commits is not only inefficient,
+ *      it is just not feasible because the serialized snapshot is not full: it
+ *      contains only committed catalog-modifying xacts. Thus, we can't
+ *      distinguish non-catalog-modifying xact committed before serialized
+ *      snapshot from not yet committed one.
+ *  So, trust the caller: once given start_lsn is reached, it means we must
+ *  have reached the point where all further xacts can be streamed, FULL ->
+ *  CONSISTENT transition.
  */
 SnapBuild *
 AllocateSnapshotBuilder(ReorderBuffer *reorder,
@@ -408,7 +443,8 @@ SnapBuildCurrentState(SnapBuild *builder)
 bool
 SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
 {
-	return ptr < builder->start_decoding_at;
+	return XLogRecPtrIsInvalid(builder->start_decoding_at) ||
+		ptr < builder->start_decoding_at;
 }
 
 /*
@@ -945,16 +981,16 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 		 TransactionIdPrecedes(xid, SnapBuildNextPhaseAt(builder))))
 	{
 		/* ensure that only commits after this are getting replayed */
-		if (builder->start_decoding_at <= lsn)
-			builder->start_decoding_at = lsn + 1;
+		Assert(XLogRecPtrIsInvalid(builder->start_decoding_at)	||
+			   builder->start_decoding_at > lsn);
 		return;
 	}
 
 	if (builder->state < SNAPBUILD_CONSISTENT)
 	{
 		/* ensure that only commits after this are getting replayed */
-		if (builder->start_decoding_at <= lsn)
-			builder->start_decoding_at = lsn + 1;
+		Assert(XLogRecPtrIsInvalid(builder->start_decoding_at)	||
+			   builder->start_decoding_at > lsn);
 
 		/*
 		 * If building an exportable snapshot, force xid to be tracked, even
@@ -966,6 +1002,16 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 		}
 	}
 
+	if (!XLogRecPtrIsInvalid(builder->start_decoding_at) &&
+		builder->start_decoding_at <= lsn)
+	{
+		/*
+		 * We are going to stream this xact, so must already have fine
+		 * snapshot.
+		 */
+		Assert(builder->state == SNAPBUILD_CONSISTENT);
+	}
+
 	for (nxact = 0; nxact < nsubxacts; nxact++)
 	{
 		TransactionId subxid = subxacts[nxact];
@@ -1250,10 +1296,10 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 	 */
 	if (running->oldestRunningXid == running->nextXid)
 	{
-		if (builder->start_decoding_at == InvalidXLogRecPtr ||
-			builder->start_decoding_at <= lsn)
+		if (XLogRecPtrIsInvalid(builder->start_decoding_at))
 			/* can decode everything after this */
 			builder->start_decoding_at = lsn + 1;
+		Assert(builder->start_decoding_at >= lsn);
 
 		/* As no transactions were running xmin/xmax can be trivially set. */
 		builder->xmin = running->nextXid;	/* < are finished */
@@ -1275,9 +1321,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 	}
 	/* b) valid on disk state and not building full snapshot */
 	else if (!builder->building_full_snapshot &&
-			 SnapBuildRestore(builder, lsn))
+			 SnapBuildRestore(builder, lsn, running->nextXid))
 	{
-		/* there won't be any state to cleanup */
+		/* there won't be much state to cleanup */
 		return false;
 	}
 
@@ -1358,6 +1404,10 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
 	{
 		builder->state = SNAPBUILD_CONSISTENT;
 		SnapBuildStartNextPhaseAt(builder, InvalidTransactionId);
+		if (XLogRecPtrIsInvalid(builder->start_decoding_at))
+			/* can decode everything after this */
+			builder->start_decoding_at = lsn + 1;
+		Assert(builder->start_decoding_at >= lsn);
 
 		ereport(LOG,
 				(errmsg("logical decoding found consistent point at %X/%X",
@@ -1471,8 +1521,8 @@ typedef struct SnapBuildOnDisk
 void
 SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
 {
-	if (builder->state < SNAPBUILD_CONSISTENT)
-		SnapBuildRestore(builder, lsn);
+	if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
+		SnapBuildRestore(builder, lsn, InvalidTransactionId);
 	else
 		SnapBuildSerialize(builder, lsn);
 }
@@ -1499,10 +1549,12 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
 		   builder->last_serialized_snapshot <= lsn);
 
 	/*
-	 * no point in serializing if we cannot continue to work immediately after
-	 * restoring the snapshot
+	 * No point in serializing if the snapshot is not complete.
+	 * However, a FULL snapshot is just as good as a CONSISTENT one; the
+	 * difference between these states is not a property of the snapshot, but
+	 * whether we have filled reorderbuffer with all currently running xacts.
 	 */
-	if (builder->state < SNAPBUILD_CONSISTENT)
+	if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
 		return;
 
 	/*
@@ -1688,10 +1740,15 @@ out:
 
 /*
  * Restore a snapshot into 'builder' if previously one has been stored at the
- * location indicated by 'lsn'. Returns true if successful, false otherwise.
+ * location indicated by 'lsn'.
+ * nextXid is first not yet running xid as of this lsn or InvalidTransactionId;
+ *   unless external code knows where it is safe to start streaming, we can
+ *   use the serialized snapshot only if we know which xids we must wait to
+ *   finish before we are able to stream all further commits.
+ *  Returns true if successful, false otherwise.
  */
 static bool
-SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
+SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn, TransactionId nextXid)
 {
 	SnapBuildOnDisk ondisk;
 	int			fd;
@@ -1701,7 +1758,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 	pg_crc32c	checksum;
 
 	/* no point in loading a snapshot if we're already there */
-	if (builder->state == SNAPBUILD_CONSISTENT)
+	if (builder->state >= SNAPBUILD_FULL_SNAPSHOT)
 		return false;
 
 	sprintf(path, "pg_logical/snapshots/%X-%X.snap",
@@ -1884,11 +1941,47 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 	if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
 		goto snapshot_not_interesting;
 
+	/*
+	 * Don't use snapshot if external code doesn't know where it is safe to
+	 * start streaming and we have no idea which xids to wait for.
+	 */
+	if (XLogRecPtrIsInvalid(builder->start_decoding_at) &&
+		!TransactionIdIsValid(nextXid))
+		goto snapshot_not_interesting;
 
 	/* ok, we think the snapshot is sensible, copy over everything important */
 	builder->xmin = ondisk.builder.xmin;
 	builder->xmax = ondisk.builder.xmax;
-	builder->state = ondisk.builder.state;
+	Assert(ondisk.builder.state >= SNAPBUILD_FULL_SNAPSHOT);
+
+	if (XLogRecPtrIsInvalid(builder->start_decoding_at))
+	{
+		/*
+		 * The snapshot is fine, but we must wait until every further commit
+		 * belongs to an xact we have seen since its first record.
+		 */
+		builder->state = SNAPBUILD_FULL_SNAPSHOT;
+		SnapBuildStartNextPhaseAt(builder, nextXid);
+	}
+	else
+	{
+		/*
+		 * If external code (c.f. AllocateSnapshotBuilder) knows we would pick
+		 * up all xacts in full before start_decoding_at, just go directly
+		 * into CONSISTENT. Though we probably can't stream right now (as we
+		 * haven't seen beginnings of some xacts), no xact will be streamed
+		 * before start_decoding_at, and we can't be sure to switch into
+		 * CONSISTENT later in time anyway.
+		 */
+		builder->state = SNAPBUILD_CONSISTENT;
+		SnapBuildStartNextPhaseAt(builder, InvalidTransactionId);
+
+		ereport(LOG,
+				(errmsg("logical decoding found consistent point at %X/%X",
+						(uint32) (lsn >> 32), (uint32) lsn),
+				 errdetail("Logical decoding will begin using saved snapshot.")));
+	}
+
 
 	builder->committed.xcnt = ondisk.builder.committed.xcnt;
 	/* We only allocated/stored xcnt, not xcnt_space xids ! */
@@ -1911,12 +2004,6 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 
 	ReorderBufferSetRestartPoint(builder->reorder, lsn);
 
-	Assert(builder->state == SNAPBUILD_CONSISTENT);
-
-	ereport(LOG,
-			(errmsg("logical decoding found consistent point at %X/%X",
-					(uint32) (lsn >> 32), (uint32) lsn),
-			 errdetail("Logical decoding will begin using saved snapshot.")));
 	return true;
 
 snapshot_not_interesting:
-- 
2.11.0

