From 908f21617940264d165ec9f2216778043fc876f1 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Wed, 3 Apr 2024 11:48:27 +0200
Subject: [PATCH v15 2/3] Make XLogCtl->log{Write,Flush}Result accessible with
 atomics

This removes the need to hold both the info_lck spinlock and
WALWriteLock to update them, and the need to hold either one to read
them: readers now use an atomic read with full barrier semantics, and
writers advance the shared values with a new atomic monotonic-advance
operation based on compare-and-swap.
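
The monotonic-advance primitive is a standard compare-and-swap loop.
As an illustration only -- the patch implements it as
pg_atomic_monotonic_advance_u64 on top of PostgreSQL's own atomics
layer, not C11 stdatomic -- the operation is roughly:

    #include <stdatomic.h>
    #include <stdint.h>

    /* Sketch: advance *ptr to at least "target", never moving it back. */
    static void
    monotonic_advance(_Atomic uint64_t *ptr, uint64_t target)
    {
        uint64_t    cur = atomic_load(ptr);

        while (cur < target)
        {
            /* On failure, the CAS refreshes "cur" with the current value. */
            if (atomic_compare_exchange_strong(ptr, &cur, target))
                break;
        }
    }

The real function additionally issues a memory barrier even when the
value is already past the target, so callers get full barrier semantics
unconditionally.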
---
 src/backend/access/transam/xlog.c | 64 +++++++++++++++++--------------
 src/include/port/atomics.h        | 37 ++++++++++++++++++
 2 files changed, 72 insertions(+), 29 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6213d99561..e2cd0e8f52 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -292,12 +292,7 @@ static bool doPageWrites;
  * LogwrtRqst indicates a byte position that we need to write and/or fsync
  * the log up to (all records before that point must be written or fsynced).
  * The positions already written/fsynced are maintained in logWriteResult
- * and logFlushResult.
- *
- * To read XLogCtl->logWriteResult or ->logFlushResult, you must hold either
- * info_lck or WALWriteLock.  To update them, you need to hold both locks.
- * The point of this arrangement is that the value can be examined by code
- * that already holds WALWriteLock without needing to grab info_lck as well.
+ * and logFlushResult using atomic access.
  * In addition to the shared variable, each backend has a private copy of
  * both in LogwrtResult, which is updated when convenient.
  *
@@ -473,12 +468,9 @@ typedef struct XLogCtlData
 	pg_time_t	lastSegSwitchTime;
 	XLogRecPtr	lastSegSwitchLSN;
 
-	/*
-	 * Protected by info_lck and WALWriteLock (you must hold either lock to
-	 * read it, but both to update)
-	 */
-	XLogRecPtr	logWriteResult; /* last byte + 1 written out */
-	XLogRecPtr	logFlushResult; /* last byte + 1 flushed */
+	/* These are accessed using atomics -- info_lck not needed */
+	pg_atomic_uint64 logWriteResult;	/* last byte + 1 written out */
+	pg_atomic_uint64 logFlushResult;	/* last byte + 1 flushed */
 
 	/*
 	 * Latest initialized page in the cache (last byte position + 1).
@@ -619,8 +611,8 @@ static XLogwrtResult LogwrtResult = {0, 0};
  */
 #define XLogUpdateLocalLogwrtResult(_target) \
 	do { \
-		_target.Write = XLogCtl->logWriteResult; \
-		_target.Flush = XLogCtl->logFlushResult; \
+		_target.Write = pg_atomic_read_membarrier_u64(&XLogCtl->logWriteResult); \
+		_target.Flush = pg_atomic_read_membarrier_u64(&XLogCtl->logFlushResult); \
 	} while (0)
 
 /*
@@ -1989,12 +1981,12 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 			if (opportunistic)
 				break;
 
-			/* Before waiting, get info_lck and update LogwrtResult */
+			/* Advance shared memory write request position */
 			SpinLockAcquire(&XLogCtl->info_lck);
 			if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
 				XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
-			XLogUpdateLocalLogwrtResult(LogwrtResult);
 			SpinLockRelease(&XLogCtl->info_lck);
+			XLogUpdateLocalLogwrtResult(LogwrtResult);
 
 			/*
 			 * Now that we have an up-to-date LogwrtResult value, see if we
@@ -2558,13 +2550,26 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	 */
 	{
 		SpinLockAcquire(&XLogCtl->info_lck);
-		XLogCtl->logWriteResult = LogwrtResult.Write;
-		XLogCtl->logFlushResult = LogwrtResult.Flush;
 		if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
 			XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
 		if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
 			XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
 		SpinLockRelease(&XLogCtl->info_lck);
+
+		pg_atomic_monotonic_advance_u64(&XLogCtl->logWriteResult,
+										LogwrtResult.Write);
+		pg_atomic_monotonic_advance_u64(&XLogCtl->logFlushResult,
+										LogwrtResult.Flush);
+
+#ifdef USE_ASSERT_CHECKING
+		{
+			XLogRecPtr	Flush = pg_atomic_read_membarrier_u64(&XLogCtl->logFlushResult);
+			XLogRecPtr	Write = pg_atomic_read_membarrier_u64(&XLogCtl->logWriteResult);
+
+			/* WAL written to disk is always ahead of WAL flushed */
+			Assert(Write >= Flush);
+		}
+#endif
 	}
 }
 
@@ -2582,12 +2587,12 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	XLogRecPtr	prevAsyncXactLSN;
 
 	SpinLockAcquire(&XLogCtl->info_lck);
-	XLogUpdateLocalLogwrtResult(LogwrtResult);
 	sleeping = XLogCtl->WalWriterSleeping;
 	prevAsyncXactLSN = XLogCtl->asyncXactLSN;
 	if (XLogCtl->asyncXactLSN < asyncXactLSN)
 		XLogCtl->asyncXactLSN = asyncXactLSN;
 	SpinLockRelease(&XLogCtl->info_lck);
+	XLogUpdateLocalLogwrtResult(LogwrtResult);
 
 	/*
 	 * If somebody else already called this function with a more aggressive
@@ -2794,8 +2799,8 @@ XLogFlush(XLogRecPtr record)
 		SpinLockAcquire(&XLogCtl->info_lck);
 		if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
 			WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
-		XLogUpdateLocalLogwrtResult(LogwrtResult);
 		SpinLockRelease(&XLogCtl->info_lck);
+		XLogUpdateLocalLogwrtResult(LogwrtResult);
 
 		/* done already? */
 		if (record <= LogwrtResult.Flush)
@@ -2949,9 +2954,9 @@ XLogBackgroundFlush(void)
 
 	/* read LogwrtResult and update local state */
 	SpinLockAcquire(&XLogCtl->info_lck);
-	XLogUpdateLocalLogwrtResult(LogwrtResult);
 	WriteRqst = XLogCtl->LogwrtRqst;
 	SpinLockRelease(&XLogCtl->info_lck);
+	XLogUpdateLocalLogwrtResult(LogwrtResult);
 
 	/* back off to last completed page boundary */
 	WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ;
@@ -3125,9 +3130,7 @@ XLogNeedsFlush(XLogRecPtr record)
 		return false;
 
 	/* read LogwrtResult and update local state */
-	SpinLockAcquire(&XLogCtl->info_lck);
 	XLogUpdateLocalLogwrtResult(LogwrtResult);
-	SpinLockRelease(&XLogCtl->info_lck);
 
 	/* check again */
 	if (record <= LogwrtResult.Flush)
@@ -4938,6 +4941,9 @@ XLOGShmemInit(void)
 	XLogCtl->InstallXLogFileSegmentActive = false;
 	XLogCtl->WalWriterSleeping = false;
 
+	pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
+	pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
+
 	SpinLockInit(&XLogCtl->Insert.insertpos_lck);
 	SpinLockInit(&XLogCtl->info_lck);
 	pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
@@ -5961,10 +5967,14 @@ StartupXLOG(void)
 		XLogCtl->InitializedUpTo = EndOfLog;
 	}
 
+	/*
+	 * Update local and shared status.  This is OK to do without any locks
+	 * because no other process can be reading or writing WAL yet.
+	 */
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
 
-	XLogCtl->logWriteResult = LogwrtResult.Write;
-	XLogCtl->logFlushResult = LogwrtResult.Flush;
+	pg_atomic_write_u64(&XLogCtl->logWriteResult, EndOfLog);
+	pg_atomic_write_u64(&XLogCtl->logFlushResult, EndOfLog);
 
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
@@ -6410,9 +6420,7 @@ GetFlushRecPtr(TimeLineID *insertTLI)
 {
 	Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE);
 
-	SpinLockAcquire(&XLogCtl->info_lck);
 	XLogUpdateLocalLogwrtResult(LogwrtResult);
-	SpinLockRelease(&XLogCtl->info_lck);
 
 	/*
 	 * If we're writing and flushing WAL, the time line can't be changing, so
@@ -9326,9 +9334,7 @@ GetXLogInsertRecPtr(void)
 XLogRecPtr
 GetXLogWriteRecPtr(void)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
 	XLogUpdateLocalLogwrtResult(LogwrtResult);
-	SpinLockRelease(&XLogCtl->info_lck);
 
 	return LogwrtResult.Write;
 }
diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h
index ff47782cdb..e962b9f6d3 100644
--- a/src/include/port/atomics.h
+++ b/src/include/port/atomics.h
@@ -570,6 +570,43 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_)
 	return pg_atomic_sub_fetch_u64_impl(ptr, sub_);
 }
 
+/*
+ * Monotonically advance the given variable using only atomic operations until
+ * it's at least the target value.
+ *
+ * Full barrier semantics (even when value is unchanged).
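+ *
+ * For example, if the variable currently holds 10, advancing it to 5 is a
+ * no-op apart from the barrier, while advancing it to 20 sets it to 20
+ * unless another process has already pushed it higher.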
+ */
+static inline void
+pg_atomic_monotonic_advance_u64(volatile pg_atomic_uint64 *ptr, uint64 target_)
+{
+	uint64		currval;
+
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(ptr, 8);
+#endif
+
+	currval = pg_atomic_read_u64_impl(ptr);
+	if (currval >= target_)
+	{
+		pg_memory_barrier();
+		return;
+	}
+
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(&currval, 8);
+#endif
+
+	while (currval < target_)
+	{
+		if (pg_atomic_compare_exchange_u64_impl(ptr, &currval, target_))
+			break;
+	}
+}
+
 #undef INSIDE_ATOMICS_H
 
 #endif							/* ATOMICS_H */
-- 
2.39.2

