From dd9b53878faeedba921ea7027e98ddbb433e8647 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Tue, 2 Feb 2021 14:03:43 -0300
Subject: [PATCH v7 1/3] Change XLogCtl->LogwrtResult to use atomic ops
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Currently, access to LogwrtResult is protected by a spinlock.  This
becomes severely contended in some scenarios, such as with a largish
replication flock: walsenders all calling GetFlushRecPtr repeatedly
cause the processor to heat up to the point where eggs can be fried on top.

This can be reduced to a non-problem by replacing XLogCtl->LogwrtResult
with a struct containing a pair of atomically accessed variables.
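
For illustration (not part of the patch), the core of the new
pg_atomic_monotonic_advance_u64 primitive added below is a
compare-and-swap loop that only ever moves a shared position forward.
Here is a minimal standalone C11 sketch of the same pattern, with
invented names (monotonic_advance, shared_pos) standing in for the
pg_atomic API:

    #include <stdatomic.h>
    #include <stdint.h>
    #include <stdio.h>

    /* stands in for a shared position such as XLogCtl->LogwrtResult.Write */
    static _Atomic uint64_t shared_pos;

    /* Advance *ptr to at least 'target'; never move it backward. */
    static void
    monotonic_advance(_Atomic uint64_t *ptr, uint64_t target)
    {
        uint64_t cur = atomic_load(ptr);

        while (cur < target)
        {
            /* on failure, 'cur' is reloaded with the current value */
            if (atomic_compare_exchange_weak(ptr, &cur, target))
                break;
        }
    }

    int
    main(void)
    {
        monotonic_advance(&shared_pos, 100);
        monotonic_advance(&shared_pos, 50);   /* no-op: would move backward */
        printf("%llu\n", (unsigned long long) atomic_load(&shared_pos));
        return 0;
    }

Concurrent writers can race to publish their results without any lock,
and the shared value can only advance, which is exactly the property the
write and flush positions need.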

In addition, the two members of the process-local copy of these values
(kept in the freestanding LogwrtResult struct) are now refreshed
separately, each on its own schedule, rather than always being copied
from shared memory together.  This is not strictly necessary, but it
makes it clearer that the write and flush positions advance
independently.
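
Concretely, code that previously read the shared copy under the
spinlock, as in

    SpinLockAcquire(&XLogCtl->info_lck);
    LogwrtResult = XLogCtl->LogwrtResult;
    SpinLockRelease(&XLogCtl->info_lck);

now does a single lock-free read of just the member it needs, as in
GetFlushRecPtr:

    LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);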

Author: Álvaro Herrera <alvherre@alvh.no-ip.org>
Discussion: https://postgr.es/m/20200831182156.GA3983@alvherre.pgsql
---
 src/backend/access/transam/xlog.c | 85 +++++++++++++++----------------
 src/include/port/atomics.h        | 29 +++++++++++
 src/tools/pgindent/typedefs.list  |  1 +
 3 files changed, 72 insertions(+), 43 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 4ac3871c74..6f2eb494fe 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -285,16 +285,13 @@ static bool doPageWrites;
  *
  * LogwrtRqst indicates a byte position that we need to write and/or fsync
  * the log up to (all records before that point must be written or fsynced).
- * LogwrtResult indicates the byte positions we have already written/fsynced.
- * These structs are identical but are declared separately to indicate their
- * slightly different functions.
+ * LogwrtResult indicates the byte positions we have already written/fsynced.
+ * These structs are similar but are declared separately to indicate their
+ * slightly different functions; in addition, the latter is read and written
+ * using atomic operations.
  *
- * To read XLogCtl->LogwrtResult, you must hold either info_lck or
- * WALWriteLock.  To update it, you need to hold both locks.  The point of
- * this arrangement is that the value can be examined by code that already
- * holds WALWriteLock without needing to grab info_lck as well.  In addition
- * to the shared variable, each backend has a private copy of LogwrtResult,
- * which is updated when convenient.
+ * In addition to the shared variable, each backend has a private copy of
+ * LogwrtResult, each member of which is separately updated when convenient.
  *
  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
  * (protected by info_lck), but we don't need to cache any copies of it.
@@ -317,6 +314,12 @@ static bool doPageWrites;
  *----------
  */
 
+typedef struct XLogwrtAtomic
+{
+	pg_atomic_uint64 Write;		/* last byte + 1 written out */
+	pg_atomic_uint64 Flush;		/* last byte + 1 flushed */
+} XLogwrtAtomic;
+
 typedef struct XLogwrtRqst
 {
 	XLogRecPtr	Write;			/* last byte + 1 to write out */
@@ -480,6 +483,7 @@ typedef struct XLogCtlData
 {
 	XLogCtlInsert Insert;
 
+	XLogwrtAtomic LogwrtResult; /* uses atomics */
 	/* Protected by info_lck: */
 	XLogwrtRqst LogwrtRqst;
 	XLogRecPtr	RedoRecPtr;		/* a recent copy of Insert->RedoRecPtr */
@@ -497,12 +501,6 @@ typedef struct XLogCtlData
 	pg_time_t	lastSegSwitchTime;
 	XLogRecPtr	lastSegSwitchLSN;
 
-	/*
-	 * Protected by info_lck and WALWriteLock (you must hold either lock to
-	 * read it, but both to update)
-	 */
-	XLogwrtResult LogwrtResult;
-
 	/*
 	 * Latest initialized page in the cache (last byte position + 1).
 	 *
@@ -622,7 +620,7 @@ static ControlFileData *ControlFile = NULL;
 static int	UsableBytesInSegment;
 
 /*
- * Private, possibly out-of-date copy of shared LogwrtResult.
+ * Private, possibly out-of-date copy of shared XLogCtl->LogwrtResult.
  * See discussion above.
  */
 static XLogwrtResult LogwrtResult = {0, 0};
@@ -932,8 +930,6 @@ XLogInsertRecord(XLogRecData *rdata,
 		/* advance global request to include new block(s) */
 		if (XLogCtl->LogwrtRqst.Write < EndPos)
 			XLogCtl->LogwrtRqst.Write = EndPos;
-		/* update local result copy while I have the chance */
-		LogwrtResult = XLogCtl->LogwrtResult;
 		SpinLockRelease(&XLogCtl->info_lck);
 	}
 
@@ -1811,6 +1807,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 	 * Now that we have the lock, check if someone initialized the page
 	 * already.
 	 */
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 	while (upto >= XLogCtl->InitializedUpTo || opportunistic)
 	{
 		nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
@@ -1830,17 +1827,17 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 			if (opportunistic)
 				break;
 
-			/* Before waiting, get info_lck and update LogwrtResult */
+			/* Advance shared memory write request position */
 			SpinLockAcquire(&XLogCtl->info_lck);
 			if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
 				XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
-			LogwrtResult = XLogCtl->LogwrtResult;
 			SpinLockRelease(&XLogCtl->info_lck);
 
 			/*
-			 * Now that we have an up-to-date LogwrtResult value, see if we
-			 * still need to write it or if someone else already did.
+			 * Before waiting, update LogwrtResult.Write and see if we still need
+			 * to write it or if someone else already did.
 			 */
+			LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 			if (LogwrtResult.Write < OldPageRqstPtr)
 			{
 				/*
@@ -1855,7 +1852,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 
 				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 
-				LogwrtResult = XLogCtl->LogwrtResult;
+				LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 				if (LogwrtResult.Write >= OldPageRqstPtr)
 				{
 					/* OK, someone wrote it already */
@@ -2101,7 +2098,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	/*
 	 * Update local LogwrtResult (caller probably did this already, but...)
 	 */
-	LogwrtResult = XLogCtl->LogwrtResult;
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 
 	/*
 	 * Since successive pages in the xlog cache are consecutively allocated,
@@ -2316,6 +2313,9 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 
 	Assert(npages == 0);
 
+	/* Publish current write result position */
+	pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Write, LogwrtResult.Write);
+
 	/*
 	 * If asked to flush, do so
 	 */
@@ -2352,16 +2352,16 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 		LogwrtResult.Flush = LogwrtResult.Write;
 	}
 
+	/* Publish current flush result position */
+	pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Flush, LogwrtResult.Flush);
+
 	/*
-	 * Update shared-memory status
-	 *
-	 * We make sure that the shared 'request' values do not fall behind the
+	 * Make sure that the shared 'request' values do not fall behind the
 	 * 'result' values.  This is not absolutely essential, but it saves some
 	 * code in a couple of places.
 	 */
 	{
 		SpinLockAcquire(&XLogCtl->info_lck);
-		XLogCtl->LogwrtResult = LogwrtResult;
 		if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
 			XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
 		if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
@@ -2381,8 +2381,8 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	XLogRecPtr	WriteRqstPtr = asyncXactLSN;
 	bool		sleeping;
 
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
 	sleeping = XLogCtl->WalWriterSleeping;
 	if (XLogCtl->asyncXactLSN < asyncXactLSN)
 		XLogCtl->asyncXactLSN = asyncXactLSN;
@@ -2587,10 +2587,10 @@ XLogFlush(XLogRecPtr record)
 		SpinLockAcquire(&XLogCtl->info_lck);
 		if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
 			WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
-		LogwrtResult = XLogCtl->LogwrtResult;
 		SpinLockRelease(&XLogCtl->info_lck);
 
 		/* done already? */
+		LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 		if (record <= LogwrtResult.Flush)
 			break;
 
@@ -2618,7 +2618,7 @@ XLogFlush(XLogRecPtr record)
 		}
 
 		/* Got the lock; recheck whether request is satisfied */
-		LogwrtResult = XLogCtl->LogwrtResult;
+		LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 		if (record <= LogwrtResult.Flush)
 		{
 			LWLockRelease(WALWriteLock);
@@ -2742,7 +2742,6 @@ XLogBackgroundFlush(void)
 
 	/* read LogwrtResult and update local state */
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
 	WriteRqst = XLogCtl->LogwrtRqst;
 	SpinLockRelease(&XLogCtl->info_lck);
 
@@ -2750,8 +2749,10 @@ XLogBackgroundFlush(void)
 	WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ;
 
 	/* if we have already flushed that far, consider async commit records */
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	if (WriteRqst.Write <= LogwrtResult.Flush)
 	{
+		pg_memory_barrier();
 		SpinLockAcquire(&XLogCtl->info_lck);
 		WriteRqst.Write = XLogCtl->asyncXactLSN;
 		SpinLockRelease(&XLogCtl->info_lck);
@@ -2767,6 +2768,7 @@ XLogBackgroundFlush(void)
 	{
 		if (openLogFile >= 0)
 		{
+			LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 			if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
 								 wal_segment_size))
 			{
@@ -2826,7 +2828,8 @@ XLogBackgroundFlush(void)
 	/* now wait for any in-progress insertions to finish and get write lock */
 	WaitXLogInsertionsToFinish(WriteRqst.Write);
 	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-	LogwrtResult = XLogCtl->LogwrtResult;
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	if (WriteRqst.Write > LogwrtResult.Write ||
 		WriteRqst.Flush > LogwrtResult.Flush)
 	{
@@ -2914,9 +2917,7 @@ XLogNeedsFlush(XLogRecPtr record)
 		return false;
 
 	/* read LogwrtResult and update local state */
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 
 	/* check again */
 	if (record <= LogwrtResult.Flush)
@@ -5489,7 +5490,9 @@ StartupXLOG(void)
 
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
 
-	XLogCtl->LogwrtResult = LogwrtResult;
+	/* XXX OK to write without WALWriteLock? */
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Write, EndOfLog);
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Flush, EndOfLog);
 
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
@@ -5925,9 +5928,7 @@ GetFlushRecPtr(TimeLineID *insertTLI)
 {
 	Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE);
 
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 
 	/*
 	 * If we're writing and flushing WAL, the time line can't be changing, so
@@ -9083,9 +9084,7 @@ GetXLogInsertRecPtr(void)
 XLogRecPtr
 GetXLogWriteRecPtr(void)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 
 	return LogwrtResult.Write;
 }
diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h
index 9550e04aaa..7abed1785a 100644
--- a/src/include/port/atomics.h
+++ b/src/include/port/atomics.h
@@ -519,6 +519,35 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_)
 	return pg_atomic_sub_fetch_u64_impl(ptr, sub_);
 }
 
+/*
+ * Monotonically advance the given variable using only atomic operations until
+ * it's at least the target value.
+ *
+ * Full barrier semantics (even when value is unchanged).
+ */
+static inline void
+pg_atomic_monotonic_advance_u64(volatile pg_atomic_uint64 *ptr, uint64 target_)
+{
+	uint64		currval;
+
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(ptr, 8);
+#endif
+
+	currval = pg_atomic_read_u64(ptr);
+	if (currval >= target_)
+	{
+		pg_memory_barrier();
+		return;
+	}
+	/* on CAS failure, currval is reloaded with the current value */
+	while (currval < target_)
+	{
+		if (pg_atomic_compare_exchange_u64(ptr, &currval, target_))
+			break;
+	}
+}
+
 #undef INSIDE_ATOMICS_H
 
 #endif							/* ATOMICS_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 93d5190508..5fed02537a 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2959,6 +2959,7 @@ XLogRecoveryCtlData
 XLogRedoAction
 XLogSegNo
 XLogSource
+XLogwrtAtomic
 XLogwrtResult
 XLogwrtRqst
 XPVIV
-- 
2.30.2

