From ce69d2824f89f675a25e098b3980a304baca7ade Mon Sep 17 00:00:00 2001
From: Craig Ringer <craig@2ndquadrant.com>
Date: Mon, 23 Jan 2017 13:25:30 +0800
Subject: [PATCH 1/2] Fix race between clog truncation and lookup

There was previously no way to look up an arbitrary xid without
running the risk of having clog truncated out from under you.

This hasn 't been a problem because anything looking up xids in clog knows
they're protected by datminxid, but that's not the case for arbitrary
user-supplied XIDs. clog was truncated before we advanced oldestXid under
XidGenLock, so holding XidGenLock during a clog lookup was insufficient to
prevent the race. There's no way to look up a SLRU with soft-failure;
attempting a lookup produces an I/O error. There's also no safe way to trap and
swallow the SLRU lookup error due mainly to locking issues.

To address this, increase oldestXid under XidGenLock before we trunate clog
rather than after, so concurrent lookups of arbitrary XIDs are safe if they are
done under XidGenLock. The rest of the xid limits are still advanced after clog
truncation to ensure there's no chance of a new xid trying to access an
about-to-be-truncated clog page. In practice this can't happen anyway since we
use only half the xid space at any time, but additional guards against future
change are warranted with something this crucial.

This race also exists in a worse form on standby servers. On a standby we only
advance oldestXid when we replay the next checkpoint, so there's a much larger
window between clog truncation and subsequent updating of the limit. Fix this
by recording the oldest xid in clog truncation records and applying the
oldestXid under XidGenLock before replaying the clog truncation.

Note that There's no need to take XidGenLock for normal clog lookups protected
by datfrozenxid, only if accepting arbitrary XIDs that might not be protected by
vacuum thresholds.
---
 src/backend/access/rmgrdesc/clogdesc.c | 12 ++++++--
 src/backend/access/transam/clog.c      | 50 +++++++++++++++++++++++++++-------
 src/backend/access/transam/varsup.c    | 49 ++++++++++++++++++++++++++-------
 src/backend/access/transam/xlog.c      | 22 ++++++++++-----
 src/backend/commands/vacuum.c          |  4 +--
 src/include/access/clog.h              |  8 +++++-
 src/include/access/transam.h           |  4 +--
 7 files changed, 115 insertions(+), 34 deletions(-)

diff --git a/src/backend/access/rmgrdesc/clogdesc.c b/src/backend/access/rmgrdesc/clogdesc.c
index 352de48..ef268c5 100644
--- a/src/backend/access/rmgrdesc/clogdesc.c
+++ b/src/backend/access/rmgrdesc/clogdesc.c
@@ -23,12 +23,20 @@ clog_desc(StringInfo buf, XLogReaderState *record)
 	char	   *rec = XLogRecGetData(record);
 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
 
-	if (info == CLOG_ZEROPAGE || info == CLOG_TRUNCATE)
+	if (info == CLOG_ZEROPAGE)
 	{
 		int			pageno;
 
 		memcpy(&pageno, rec, sizeof(int));
-		appendStringInfo(buf, "%d", pageno);
+		appendStringInfo(buf, "page %d", pageno);
+	}
+	else if (info == CLOG_TRUNCATE)
+	{
+		xl_clog_truncate xlrec;
+
+		memcpy(&xlrec, rec, sizeof(xl_clog_truncate));
+		appendStringInfo(buf, "page %d; oldestXact %u",
+			xlrec.pageno, xlrec.oldestXact);
 	}
 }
 
diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c
index 1a43819..cfd1c91 100644
--- a/src/backend/access/transam/clog.c
+++ b/src/backend/access/transam/clog.c
@@ -83,7 +83,8 @@ static SlruCtlData ClogCtlData;
 static int	ZeroCLOGPage(int pageno, bool writeXlog);
 static bool CLOGPagePrecedes(int page1, int page2);
 static void WriteZeroPageXlogRec(int pageno);
-static void WriteTruncateXlogRec(int pageno);
+static void WriteTruncateXlogRec(int pageno, TransactionId oldestXact,
+								 Oid oldestXidDb);
 static void TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
 						   TransactionId *subxids, XidStatus status,
 						   XLogRecPtr lsn, int pageno);
@@ -640,11 +641,24 @@ ExtendCLOG(TransactionId newestXact)
  * the XLOG flush unless we have confirmed that there is a removable segment.
  */
 void
-TruncateCLOG(TransactionId oldestXact)
+TruncateCLOG(TransactionId oldestXact, Oid oldestxid_datoid)
 {
 	int			cutoffPage;
 
 	/*
+	 * Advance oldestXid before truncating clog, so concurrent xact status
+	 * lookups can ensure they don't attempt to access truncated-away clog.
+	 *
+	 * We must do this even if we find we can't actually truncate away any clog
+	 * pages, since we'll advance the xid limits and need oldestXmin to be
+	 * consistent with the new limits.
+	 *
+	 * Losing this on crash before a checkpoint is harmless unless we truncated
+	 * clog, in which case redo of the clog truncation will re-apply it.
+	 */
+	AdvanceOldestXid(oldestXact, oldestxid_datoid);
+
+	/*
 	 * The cutoff point is the start of the segment containing oldestXact. We
 	 * pass the *page* containing oldestXact to SimpleLruTruncate.
 	 */
@@ -654,8 +668,17 @@ TruncateCLOG(TransactionId oldestXact)
 	if (!SlruScanDirectory(ClogCtl, SlruScanDirCbReportPresence, &cutoffPage))
 		return;					/* nothing to remove */
 
-	/* Write XLOG record and flush XLOG to disk */
-	WriteTruncateXlogRec(cutoffPage);
+	/* vac_truncate_clog already advanced oldestXid */
+	Assert(TransactionIdPrecedesOrEquals(oldestXact,
+		   ShmemVariableCache->oldestXid));
+
+	/*
+	 * Write XLOG record and flush XLOG to disk. We record the oldest xid we're
+	 * keeping information about here so we can ensure that it's always ahead
+	 * of clog truncation in case we crash, and so a standby finds out the new
+	 * valid xid before the next checkpoint.
+	 */
+	WriteTruncateXlogRec(cutoffPage, oldestXact, oldestxid_datoid);
 
 	/* Now we can remove the old CLOG segment(s) */
 	SimpleLruTruncate(ClogCtl, cutoffPage);
@@ -704,12 +727,17 @@ WriteZeroPageXlogRec(int pageno)
  * in TruncateCLOG().
  */
 static void
-WriteTruncateXlogRec(int pageno)
+WriteTruncateXlogRec(int pageno, TransactionId oldestXact, Oid oldestXactDb)
 {
 	XLogRecPtr	recptr;
+	xl_clog_truncate xlrec;
+
+	xlrec.pageno = pageno;
+	xlrec.oldestXact = oldestXact;
+	xlrec.oldestXactDb = oldestXactDb;
 
 	XLogBeginInsert();
-	XLogRegisterData((char *) (&pageno), sizeof(int));
+	XLogRegisterData((char *) (&xlrec), sizeof(xl_clog_truncate));
 	recptr = XLogInsert(RM_CLOG_ID, CLOG_TRUNCATE);
 	XLogFlush(recptr);
 }
@@ -742,17 +770,19 @@ clog_redo(XLogReaderState *record)
 	}
 	else if (info == CLOG_TRUNCATE)
 	{
-		int			pageno;
+		xl_clog_truncate xlrec;
 
-		memcpy(&pageno, XLogRecGetData(record), sizeof(int));
+		memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_clog_truncate));
 
 		/*
 		 * During XLOG replay, latest_page_number isn't set up yet; insert a
 		 * suitable value to bypass the sanity test in SimpleLruTruncate.
 		 */
-		ClogCtl->shared->latest_page_number = pageno;
+		ClogCtl->shared->latest_page_number = xlrec.pageno;
+
+		AdvanceOldestXid(xlrec.oldestXact, xlrec.oldestXactDb);
 
-		SimpleLruTruncate(ClogCtl, pageno);
+		SimpleLruTruncate(ClogCtl, xlrec.pageno);
 	}
 	else
 		elog(PANIC, "clog_redo: unknown op code %u", info);
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index fc084c5..9aacb0f 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -259,19 +259,52 @@ ReadNewTransactionId(void)
 }
 
 /*
- * Determine the last safe XID to allocate given the currently oldest
+ * Advance the cluster-wide oldestXid.
+ *
+ * We must ensure that this never goes backwards, otherwise the xid limits set
+ * in SetTransactionIdLimit(...) could be insufficiently conservative if two
+ * vacuums race, with the lower oldestXmin winning then the higher xid limits
+ * winning.
+ */
+void
+AdvanceOldestXid(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
+{
+	LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
+	if (TransactionIdPrecedes(ShmemVariableCache->oldestXid,
+		oldest_datfrozenxid))
+	{
+		ShmemVariableCache->oldestXid = oldest_datfrozenxid;
+		ShmemVariableCache->oldestXidDB = oldest_datoid;
+	}
+	LWLockRelease(XidGenLock);
+}
+
+/*
+ * Determine the last safe XID to allocate using the currently oldest
  * datfrozenxid (ie, the oldest XID that might exist in any database
  * of our cluster), and the OID of the (or a) database with that value.
  */
 void
-SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
+SetTransactionIdLimit(void)
 {
 	TransactionId xidVacLimit;
 	TransactionId xidWarnLimit;
 	TransactionId xidStopLimit;
 	TransactionId xidWrapLimit;
 	TransactionId curXid;
+	Oid			  oldest_datoid;
+	TransactionId oldest_datfrozenxid;
+
+	LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
 
+	/*
+	 * We fetch oldest_datfrozenxid and oldest_datoid from shmem to ensure that
+	 * if some other vacuum has advanced oldestXid since we did, we use the
+	 * latest values and the limits never go backwards.
+	 */
+	oldest_datfrozenxid = ShmemVariableCache->oldestXid;
+	oldest_datoid = ShmemVariableCache->oldestXidDB;
+	
 	Assert(TransactionIdIsNormal(oldest_datfrozenxid));
 
 	/*
@@ -284,6 +317,7 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
 	xidWrapLimit = oldest_datfrozenxid + (MaxTransactionId >> 1);
 	if (xidWrapLimit < FirstNormalTransactionId)
 		xidWrapLimit += FirstNormalTransactionId;
+	ShmemVariableCache->xidWrapLimit = xidWrapLimit;
 
 	/*
 	 * We'll refuse to continue assigning XIDs in interactive mode once we get
@@ -296,6 +330,7 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
 	xidStopLimit = xidWrapLimit - 1000000;
 	if (xidStopLimit < FirstNormalTransactionId)
 		xidStopLimit -= FirstNormalTransactionId;
+	ShmemVariableCache->xidStopLimit = xidStopLimit;
 
 	/*
 	 * We'll start complaining loudly when we get within 10M transactions of
@@ -310,6 +345,7 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
 	xidWarnLimit = xidStopLimit - 10000000;
 	if (xidWarnLimit < FirstNormalTransactionId)
 		xidWarnLimit -= FirstNormalTransactionId;
+	ShmemVariableCache->xidWarnLimit = xidWarnLimit;
 
 	/*
 	 * We'll start trying to force autovacuums when oldest_datfrozenxid gets
@@ -329,15 +365,8 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
 	xidVacLimit = oldest_datfrozenxid + autovacuum_freeze_max_age;
 	if (xidVacLimit < FirstNormalTransactionId)
 		xidVacLimit += FirstNormalTransactionId;
-
-	/* Grab lock for just long enough to set the new limit values */
-	LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
-	ShmemVariableCache->oldestXid = oldest_datfrozenxid;
 	ShmemVariableCache->xidVacLimit = xidVacLimit;
-	ShmemVariableCache->xidWarnLimit = xidWarnLimit;
-	ShmemVariableCache->xidStopLimit = xidStopLimit;
-	ShmemVariableCache->xidWrapLimit = xidWrapLimit;
-	ShmemVariableCache->oldestXidDB = oldest_datoid;
+
 	curXid = ShmemVariableCache->nextXid;
 	LWLockRelease(XidGenLock);
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 2f5d603..823f4bd 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4877,7 +4877,8 @@ BootStrapXLOG(void)
 	ShmemVariableCache->nextOid = checkPoint.nextOid;
 	ShmemVariableCache->oidCount = 0;
 	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
-	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+	AdvanceOldestXid(checkPoint.oldestXid, checkPoint.oldestXidDB);
+	SetTransactionIdLimit();
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
 	SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId);
 
@@ -6471,7 +6472,8 @@ StartupXLOG(void)
 	ShmemVariableCache->nextOid = checkPoint.nextOid;
 	ShmemVariableCache->oidCount = 0;
 	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
-	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+	AdvanceOldestXid(checkPoint.oldestXid, checkPoint.oldestXidDB);
+	SetTransactionIdLimit();
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
 	SetCommitTsLimit(checkPoint.oldestCommitTsXid,
 					 checkPoint.newestCommitTsXid);
@@ -9456,7 +9458,11 @@ xlog_redo(XLogReaderState *record)
 
 		MultiXactAdvanceOldest(checkPoint.oldestMulti,
 							   checkPoint.oldestMultiDB);
-		SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+		/*
+		 * No need to call AdvanceOldestXid, startup or an earlier clog trunate
+		 * record will have already advanced it. Just advance the limits.
+		 */
+		SetTransactionIdLimit();
 
 		/*
 		 * If we see a shutdown checkpoint while waiting for an end-of-backup
@@ -9553,10 +9559,12 @@ xlog_redo(XLogReaderState *record)
 		 */
 		MultiXactAdvanceOldest(checkPoint.oldestMulti,
 							   checkPoint.oldestMultiDB);
-		if (TransactionIdPrecedes(ShmemVariableCache->oldestXid,
-								  checkPoint.oldestXid))
-			SetTransactionIdLimit(checkPoint.oldestXid,
-								  checkPoint.oldestXidDB);
+		/*
+		 * We don't need to AdvanceOldestXid here; StartupXLOG or a clog
+		 * truncation record will ensure it's up to date, and we just update
+		 * the limits here based on what's already in shmem.
+		 */
+		SetTransactionIdLimit();
 		/* ControlFile->checkPointCopy always tracks the latest ckpt XID */
 		ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
 		ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 812fb4a..5ec307f 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -1164,7 +1164,7 @@ vac_truncate_clog(TransactionId frozenXID,
 	/*
 	 * Truncate CLOG, multixact and CommitTs to the oldest computed value.
 	 */
-	TruncateCLOG(frozenXID);
+	TruncateCLOG(frozenXID, oldestxid_datoid);
 	TruncateCommitTs(frozenXID);
 	TruncateMultiXact(minMulti, minmulti_datoid);
 
@@ -1174,7 +1174,7 @@ vac_truncate_clog(TransactionId frozenXID,
 	 * for an(other) autovac cycle if needed.   XXX should we avoid possibly
 	 * signalling twice?
 	 */
-	SetTransactionIdLimit(frozenXID, oldestxid_datoid);
+	SetTransactionIdLimit();
 	SetMultiXactIdLimit(minMulti, minmulti_datoid);
 }
 
diff --git a/src/include/access/clog.h b/src/include/access/clog.h
index 2894bd5..60a9e11 100644
--- a/src/include/access/clog.h
+++ b/src/include/access/clog.h
@@ -28,6 +28,12 @@ typedef int XidStatus;
 #define TRANSACTION_STATUS_ABORTED			0x02
 #define TRANSACTION_STATUS_SUB_COMMITTED	0x03
 
+typedef struct xl_clog_truncate
+{
+	int pageno;
+	TransactionId oldestXact;
+	Oid oldestXactDb;
+} xl_clog_truncate;
 
 extern void TransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
 				   TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
@@ -42,7 +48,7 @@ extern void TrimCLOG(void);
 extern void ShutdownCLOG(void);
 extern void CheckPointCLOG(void);
 extern void ExtendCLOG(TransactionId newestXact);
-extern void TruncateCLOG(TransactionId oldestXact);
+extern void TruncateCLOG(TransactionId oldestXact, Oid oldestxid_datoid);
 
 /* XLOG stuff */
 #define CLOG_ZEROPAGE		0x00
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index 522c104..95cdef1 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -171,8 +171,8 @@ extern XLogRecPtr TransactionIdGetCommitLSN(TransactionId xid);
 /* in transam/varsup.c */
 extern TransactionId GetNewTransactionId(bool isSubXact);
 extern TransactionId ReadNewTransactionId(void);
-extern void SetTransactionIdLimit(TransactionId oldest_datfrozenxid,
-					  Oid oldest_datoid);
+extern void AdvanceOldestXid(TransactionId oldest_datfrozenxid, Oid oldest_datoid);
+extern void SetTransactionIdLimit(void);
 extern bool ForceTransactionIdLimitUpdate(void);
 extern Oid	GetNewObjectId(void);
 
-- 
2.5.5

