From 18b1f01230fe768b807c8391789a9920b6120e1a Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Tue, 5 Jan 2021 10:10:36 -0800
Subject: [PATCH v2.4 02/29] Allow lwlocks to be unowned

This is required for AIO so that the lock hold during a write can be released
in another backend. Which in turn is required to avoid the potential for
deadlocks.
---
 src/include/storage/lwlock.h      |   2 +
 src/backend/storage/lmgr/lwlock.c | 108 +++++++++++++++++++++++-------
 2 files changed, 85 insertions(+), 25 deletions(-)

diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 2aa46fd50da..13a7dc89980 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -129,6 +129,8 @@ extern bool LWLockAcquireOrWait(LWLock *lock, LWLockMode mode);
 extern void LWLockRelease(LWLock *lock);
 extern void LWLockReleaseClearVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 val);
 extern void LWLockReleaseAll(void);
+extern void LWLockDisown(LWLock *l);
+extern void LWLockReleaseDisowned(LWLock *l, LWLockMode mode);
 extern bool LWLockHeldByMe(LWLock *lock);
 extern bool LWLockAnyHeldByMe(LWLock *lock, int nlocks, size_t stride);
 extern bool LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode);
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index f1e74f184f1..b02625194be 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -1773,36 +1773,15 @@ LWLockUpdateVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 val)
 	}
 }
 
-
 /*
- * LWLockRelease - release a previously acquired lock
+ * Helper function to release lock, shared between LWLockRelease() and
+ * LWLockeleaseDisowned().
  */
-void
-LWLockRelease(LWLock *lock)
+static void
+LWLockReleaseInternal(LWLock *lock, LWLockMode mode)
 {
-	LWLockMode	mode;
 	uint32		oldstate;
 	bool		check_waiters;
-	int			i;
-
-	/*
-	 * Remove lock from list of locks held.  Usually, but not always, it will
-	 * be the latest-acquired lock; so search array backwards.
-	 */
-	for (i = num_held_lwlocks; --i >= 0;)
-		if (lock == held_lwlocks[i].lock)
-			break;
-
-	if (i < 0)
-		elog(ERROR, "lock %s is not held", T_NAME(lock));
-
-	mode = held_lwlocks[i].mode;
-
-	num_held_lwlocks--;
-	for (; i < num_held_lwlocks; i++)
-		held_lwlocks[i] = held_lwlocks[i + 1];
-
-	PRINT_LWDEBUG("LWLockRelease", lock, mode);
 
 	/*
 	 * Release my hold on lock, after that it can immediately be acquired by
@@ -1840,6 +1819,85 @@ LWLockRelease(LWLock *lock)
 		LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
 		LWLockWakeup(lock);
 	}
+}
+
+void
+LWLockReleaseDisowned(LWLock *lock, LWLockMode mode)
+{
+	LWLockReleaseInternal(lock, mode);
+}
+
+/*
+ * Stop treating lock as held by current backend.
+ *
+ * This is the code that can be shared between actually releasing a lock
+ * (LWLockRelease()) and just not tracking ownership of the lock anymore
+ * without releasing the lock (LWLockDisown()).
+ *
+ * Returns the mode in which the lock was held by the current backend.
+ *
+ * NB: This does not call RESUME_INTERRUPTS(), but leaves that responsibility
+ * of the caller.
+ *
+ * NB: This will leave lock->owner pointing to the current backend (if
+ * LOCK_DEBUG is set). This is somewhat intentional, as it makes it easier to
+ * debug cases of missing wakeups during lock release.
+ */
+static inline LWLockMode
+LWLockDisownInternal(LWLock *lock)
+{
+	LWLockMode	mode;
+	int			i;
+
+	/*
+	 * Remove lock from list of locks held.  Usually, but not always, it will
+	 * be the latest-acquired lock; so search array backwards.
+	 */
+	for (i = num_held_lwlocks; --i >= 0;)
+		if (lock == held_lwlocks[i].lock)
+			break;
+
+	if (i < 0)
+		elog(ERROR, "lock %s is not held", T_NAME(lock));
+
+	mode = held_lwlocks[i].mode;
+
+	num_held_lwlocks--;
+	for (; i < num_held_lwlocks; i++)
+		held_lwlocks[i] = held_lwlocks[i + 1];
+
+	return mode;
+}
+
+/*
+ * Stop treating lock as held by current backend.
+ *
+ * After calling this function it's the callers responsibility to ensure that
+ * the lock gets released (via LWLockReleaseDisowned()), even in case of an
+ * error. This only is desirable if the lock is going to be released in a
+ * different process than the process that acquired it.
+ */
+void
+LWLockDisown(LWLock *lock)
+{
+	LWLockDisownInternal(lock);
+
+	RESUME_INTERRUPTS();
+}
+
+/*
+ * LWLockRelease - release a previously acquired lock
+ */
+void
+LWLockRelease(LWLock *lock)
+{
+	LWLockMode	mode;
+
+	mode = LWLockDisownInternal(lock);
+
+	PRINT_LWDEBUG("LWLockRelease", lock, mode);
+
+	LWLockReleaseInternal(lock, mode);
 
 	/*
 	 * Now okay to allow cancel/die interrupts.
-- 
2.48.1.76.g4e746b1a31.dirty

