From b2295700fb9e1d01b03bdebca3c71692941144c9 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sat, 23 Apr 2016 19:18:00 -0700
Subject: [PATCH] Emit invalidations to standby for transactions without xid.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

So far, when a transaction with pending invalidations, but without an
assigned xid, committed, we simply ignored those invalidation
messages. That's problematic, because those are actually sent for a
reason.

Known symptoms of this include that existing sessions on a hot-standby
replica sometimes fail to notice new concurrently built indexes and
visibility map updates.

The solution is to WAL log such invalidations in transactions without an
xid. We considered to alternatively force-assign an xid, but that'd be
problematic for vacuum, which might be run in systems with few xids.

Important: This adds a new WAL record, but as the patch has to be
back-patched, we can't bump the WAL page magic. This means that standbys
have to be updated before primaries; otherwise
"PANIC: standby_redo: unknown op code 32" errors can be encountered.

XXX:

Reported-By: Васильев Дмитрий, Masahiko Sawada
Discussion:
    CAB-SwXY6oH=9twBkXJtgR4UC1NqT-vpYAtxCseME62ADwyK5OA@mail.gmail.com
    CAD21AoDpZ6Xjg=gFrGPnSn4oTRRcwK1EBrWCq9OqOHuAcMMC=w@mail.gmail.com
---
 src/backend/access/rmgrdesc/standbydesc.c       | 54 +++++++++++++++++++++++++
 src/backend/access/rmgrdesc/xactdesc.c          | 30 ++------------
 src/backend/access/transam/xact.c               | 18 +++++++++
 src/backend/replication/logical/decode.c        |  9 +++++
 src/backend/replication/logical/reorderbuffer.c | 53 +++++++++++++++---------
 src/backend/storage/ipc/standby.c               | 35 ++++++++++++++++
 src/backend/utils/cache/inval.c                 |  5 ++-
 src/include/replication/reorderbuffer.h         |  2 +
 src/include/storage/standby.h                   |  2 +
 src/include/storage/standbydefs.h               | 21 ++++++++++
 10 files changed, 181 insertions(+), 48 deletions(-)

diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index 4872cfb..e6172cc 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -58,6 +58,14 @@ standby_desc(StringInfo buf, XLogReaderState *record)
 
 		standby_desc_running_xacts(buf, xlrec);
 	}
+	else if (info == XLOG_INVALIDATIONS)
+	{
+		xl_invalidations *xlrec = (xl_invalidations *) rec;
+
+		standby_desc_invalidations(buf, xlrec->nmsgs, xlrec->msgs,
+								   xlrec->dbId, xlrec->tsId,
+								   xlrec->relcacheInitFileInval);
+	}
 }
 
 const char *
@@ -73,7 +81,53 @@ standby_identify(uint8 info)
 		case XLOG_RUNNING_XACTS:
 			id = "RUNNING_XACTS";
 			break;
+		case XLOG_INVALIDATIONS:
+			id = "INVALIDATIONS";
+			break;
 	}
 
 	return id;
 }
+
+/*
+ * This routine is used by both standby_desc and xact_desc, because
+ * transaction commits and XLOG_INVALIDATIONS messages contain invalidations;
+ * it seems pointless to duplicate the code.
+ */
+void
+standby_desc_invalidations(StringInfo buf,
+						   int nmsgs, SharedInvalidationMessage *msgs,
+						   Oid dbId, Oid tsId,
+						   bool relcacheInitFileInval)
+{
+	int i;
+
+	if (relcacheInitFileInval)
+		appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
+						 dbId, tsId);
+
+	appendStringInfoString(buf, "; inval msgs:");
+	for (i = 0; i < nmsgs; i++)
+	{
+		SharedInvalidationMessage *msg = &msgs[i];
+
+		if (msg->id >= 0)
+			appendStringInfo(buf, " catcache %d", msg->id);
+		else if (msg->id == SHAREDINVALCATALOG_ID)
+			appendStringInfo(buf, " catalog %u", msg->cat.catId);
+		else if (msg->id == SHAREDINVALRELCACHE_ID)
+			appendStringInfo(buf, " relcache %u", msg->rc.relId);
+		/* not expected, but print something anyway */
+		else if (msg->id == SHAREDINVALSMGR_ID)
+			appendStringInfoString(buf, " smgr");
+		/* not expected, but print something anyway */
+		else if (msg->id == SHAREDINVALRELMAP_ID)
+			appendStringInfoString(buf, " relmap");
+		else if (msg->id == SHAREDINVALRELMAP_ID)
+			appendStringInfo(buf, " relmap db %u", msg->rm.dbId);
+		else if (msg->id == SHAREDINVALSNAPSHOT_ID)
+			appendStringInfo(buf, " snapshot %u", msg->sn.relId);
+		else
+			appendStringInfo(buf, " unknown id %d", msg->id);
+	}
+}
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index e8a334c..6f07c5c 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -18,6 +18,7 @@
 #include "access/xact.h"
 #include "catalog/catalog.h"
 #include "storage/sinval.h"
+#include "storage/standbydefs.h"
 #include "utils/timestamp.h"
 
 /*
@@ -203,32 +204,9 @@ xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId
 	}
 	if (parsed.nmsgs > 0)
 	{
-		if (XactCompletionRelcacheInitFileInval(parsed.xinfo))
-			appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
-							 parsed.dbId, parsed.tsId);
-
-		appendStringInfoString(buf, "; inval msgs:");
-		for (i = 0; i < parsed.nmsgs; i++)
-		{
-			SharedInvalidationMessage *msg = &parsed.msgs[i];
-
-			if (msg->id >= 0)
-				appendStringInfo(buf, " catcache %d", msg->id);
-			else if (msg->id == SHAREDINVALCATALOG_ID)
-				appendStringInfo(buf, " catalog %u", msg->cat.catId);
-			else if (msg->id == SHAREDINVALRELCACHE_ID)
-				appendStringInfo(buf, " relcache %u", msg->rc.relId);
-			/* not expected, but print something anyway */
-			else if (msg->id == SHAREDINVALSMGR_ID)
-				appendStringInfoString(buf, " smgr");
-			/* not expected, but print something anyway */
-			else if (msg->id == SHAREDINVALRELMAP_ID)
-				appendStringInfoString(buf, " relmap");
-			else if (msg->id == SHAREDINVALSNAPSHOT_ID)
-				appendStringInfo(buf, " snapshot %u", msg->sn.relId);
-			else
-				appendStringInfo(buf, " unknown id %d", msg->id);
-		}
+		standby_desc_invalidations(
+			buf, parsed.nmsgs, parsed.msgs, parsed.dbId, parsed.tsId,
+			XactCompletionRelcacheInitFileInval(parsed.xinfo));
 	}
 
 	if (XactCompletionForceSyncCommit(parsed.xinfo))
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 7e37331..95690ff 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1164,6 +1164,24 @@ RecordTransactionCommit(void)
 		Assert(nchildren == 0);
 
 		/*
+		 * Transactions without an assigned xid can contain invalidation
+		 * messages (e.g. explicit relcache invalidations or catcache
+		 * invalidations for inplace updates); standbys need to process
+		 * those. We can't emit a commit record without an xid, and we don't
+		 * want to force assigning an xid, because that'd be problematic for
+		 * e.g. vacuum.  Hence we emit a bespoke record for the
+		 * invalidations. We don't want to use that in case a commit record is
+		 * emitted, so they happen synchronously with commits (besides not
+		 * wanting to emit more WAL recoreds).
+		 */
+		if (nmsgs != 0)
+		{
+			LogStandbyInvalidations(nmsgs, invalMessages,
+									RelcacheInitFileInval);
+			wrote_xlog = true; /* not strictly necessary */
+		}
+
+		/*
 		 * If we didn't create XLOG entries, we're done here; otherwise we
 		 * should trigger flushing those entries the same as a commit record
 		 * would.  This will primarily happen for HOT pruning and the like; we
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 0cdb0b8..0c248f0 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -327,6 +327,15 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			break;
 		case XLOG_STANDBY_LOCK:
 			break;
+		case XLOG_INVALIDATIONS:
+			{
+				xl_invalidations *invalidations =
+					(xl_invalidations *) XLogRecGetData(r);
+
+				ReorderBufferImmediateInvalidation(
+					ctx->reorder, invalidations->nmsgs, invalidations->msgs);
+			}
+			break;
 		default:
 			elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
 	}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 4520708..57821c3 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1810,26 +1810,8 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 	 * catalog and we need to update the caches according to that.
 	 */
 	if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
-	{
-		bool		use_subtxn = IsTransactionOrTransactionBlock();
-
-		if (use_subtxn)
-			BeginInternalSubTransaction("replay");
-
-		/*
-		 * Force invalidations to happen outside of a valid transaction - that
-		 * way entries will just be marked as invalid without accessing the
-		 * catalog. That's advantageous because we don't need to setup the
-		 * full state necessary for catalog access.
-		 */
-		if (use_subtxn)
-			AbortCurrentTransaction();
-
-		ReorderBufferExecuteInvalidations(rb, txn);
-
-		if (use_subtxn)
-			RollbackAndReleaseCurrentSubTransaction();
-	}
+		ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
+										   txn->invalidations);
 	else
 		Assert(txn->ninvalidations == 0);
 
@@ -1837,6 +1819,37 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 	ReorderBufferCleanupTXN(rb, txn);
 }
 
+/*
+ * Execute invalidations happening outside the context of a decoded
+ * transaction. That currently happens either for xid-less commits
+ * (c.f. RecordTransactionCommit()) or for invalidations in uninteresting
+ * transactions (via ReorderBufferForget()).
+ */
+void
+ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
+								   SharedInvalidationMessage *invalidations)
+{
+	bool		use_subtxn = IsTransactionOrTransactionBlock();
+	int			i;
+
+	if (use_subtxn)
+		BeginInternalSubTransaction("replay");
+
+	/*
+	 * Force invalidations to happen outside of a valid transaction - that
+	 * way entries will just be marked as invalid without accessing the
+	 * catalog. That's advantageous because we don't need to setup the
+	 * full state necessary for catalog access.
+	 */
+	if (use_subtxn)
+		AbortCurrentTransaction();
+
+	for (i = 0; i < ninvalidations; i++)
+		LocalExecuteInvalidationMessage(&invalidations[i]);
+
+	if (use_subtxn)
+		RollbackAndReleaseCurrentSubTransaction();
+}
 
 /*
  * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 6a9bf84..762dfa6 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -825,6 +825,16 @@ standby_redo(XLogReaderState *record)
 
 		ProcArrayApplyRecoveryInfo(&running);
 	}
+	else if (info == XLOG_INVALIDATIONS)
+	{
+		xl_invalidations *xlrec = (xl_invalidations *) XLogRecGetData(record);
+
+		ProcessCommittedInvalidationMessages(xlrec->msgs,
+											 xlrec->nmsgs,
+											 xlrec->relcacheInitFileInval,
+											 xlrec->dbId,
+											 xlrec->tsId);
+	}
 	else
 		elog(PANIC, "standby_redo: unknown op code %u", info);
 }
@@ -1068,3 +1078,28 @@ LogAccessExclusiveLockPrepare(void)
 	 */
 	(void) GetTopTransactionId();
 }
+
+/*
+ * Emit WAL for invalidations. This currently is only used for commits without
+ * an xid but which contain invalidations.
+ */
+void
+LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
+						bool relcacheInitFileInval)
+{
+	xl_invalidations xlrec;
+
+	/* prepare record */
+	memset(&xlrec, 0, sizeof(xlrec));
+	xlrec.dbId = MyDatabaseId;
+	xlrec.tsId = MyDatabaseTableSpace;
+	xlrec.relcacheInitFileInval = relcacheInitFileInval;
+	xlrec.nmsgs = nmsgs;
+
+	/* perform insertion */
+	XLogBeginInsert();
+	XLogRegisterData((char *) (&xlrec), MinSizeOfInvalidations);
+	XLogRegisterData((char *) msgs,
+					 nmsgs * sizeof(SharedInvalidationMessage));
+	XLogInsert(RM_STANDBY_ID, XLOG_INVALIDATIONS);
+}
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 924bebb..5803518 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -842,8 +842,9 @@ xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
 }
 
 /*
- * ProcessCommittedInvalidationMessages is executed by xact_redo_commit()
- * to process invalidation messages added to commit records.
+ * ProcessCommittedInvalidationMessages is executed by xact_redo_commit() or
+ * standby_redo() to process invalidation messages. Currently that happens
+ * only at end-of-xact.
  *
  * Relcache init file invalidation requires processing both
  * before and after we send the SI messages. See AtEOXact_Inval()
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 4c54953..e070894 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -391,6 +391,8 @@ void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn
 						 CommandId cmin, CommandId cmax, CommandId combocid);
 void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
 							  Size nmsgs, SharedInvalidationMessage *msgs);
+void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations,
+										SharedInvalidationMessage *invalidations);
 void		ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
 void		ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
 bool		ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index aafc9b8..5205884 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -85,5 +85,7 @@ extern void LogAccessExclusiveLock(Oid dbOid, Oid relOid);
 extern void LogAccessExclusiveLockPrepare(void);
 
 extern XLogRecPtr LogStandbySnapshot(void);
+extern void LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
+									bool relcacheInitFileInval);
 
 #endif   /* STANDBY_H */
diff --git a/src/include/storage/standbydefs.h b/src/include/storage/standbydefs.h
index 609d06e..bd3c97f 100644
--- a/src/include/storage/standbydefs.h
+++ b/src/include/storage/standbydefs.h
@@ -17,17 +17,23 @@
 #include "access/xlogreader.h"
 #include "lib/stringinfo.h"
 #include "storage/lockdefs.h"
+#include "storage/sinval.h"
 
 /* Recovery handlers for the Standby Rmgr (RM_STANDBY_ID) */
 extern void standby_redo(XLogReaderState *record);
 extern void standby_desc(StringInfo buf, XLogReaderState *record);
 extern const char *standby_identify(uint8 info);
+extern void standby_desc_invalidations(StringInfo buf,
+									   int nmsgs, SharedInvalidationMessage *msgs,
+									   Oid dbId, Oid tsId,
+									   bool relcacheInitFileInval);
 
 /*
  * XLOG message types
  */
 #define XLOG_STANDBY_LOCK			0x00
 #define XLOG_RUNNING_XACTS			0x10
+#define XLOG_INVALIDATIONS			0x20
 
 typedef struct xl_standby_locks
 {
@@ -50,4 +56,19 @@ typedef struct xl_running_xacts
 	TransactionId xids[FLEXIBLE_ARRAY_MEMBER];
 } xl_running_xacts;
 
+/*
+ * Invalidations for standby, currently only when transactions without an
+ * assigned xid commit.
+ */
+typedef struct xl_invalidations
+{
+	Oid			dbId;			/* MyDatabaseId */
+	Oid			tsId;			/* MyDatabaseTableSpace */
+	bool		relcacheInitFileInval;	/* invalidate relcache init file */
+	int			nmsgs;			/* number of shared inval msgs */
+	SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER];
+} xl_invalidations;
+
+#define MinSizeOfInvalidations offsetof(xl_invalidations, msgs)
+
 #endif   /* STANDBYDEFS_H */
-- 
2.7.0.229.g701fa7f

