From 6e61b1b2d2202c23674f27ba80cd50ff596fb95b Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Tue, 18 Nov 2025 09:22:28 -0500
Subject: [PATCH v6 12/14] Require share-exclusive lock to set hint bits

At the moment hint bits can be set with just a share lock on a page (and in
one place even without any lock). Because of this we need to copy pages while
writing them out, as otherwise the checksum could be corrupted.

The need to copy the page is problematic for implementing AIO writes:

1) Instead of just needing a single buffer for a copied page, we need one for
   each page that's potentially undergoing IO.
2) To be able to use the "worker" AIO implementation, the copied page needs to
   reside in shared memory.

It also causes problems for using unbuffered/direct IO, independent of AIO:
some filesystems, RAID implementations, ... do not tolerate the data being
written out changing during the write. E.g. they may compute internal
checksums that are invalidated by concurrent modifications, leading to
filesystem errors (as is the case with btrfs).

It is also just plain odd to allow modifications of buffers that are only
share locked.

To address these issues, this commit changes the rules so that modifications
to pages are no longer allowed while holding only a share lock. Instead, the
new share-exclusive lock (introduced in FIXME XXXX TODO) allows at most one
backend to modify a buffer while other backends have the same page share
locked. An existing share lock can be upgraded to a share-exclusive lock if
there are no conflicting locks. For that,
BufferBeginSetHintBits()/BufferFinishSetHintBits() and BufferSetHintBits16()
have been introduced.
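
As a rough illustration of the intended calling pattern (a sketch only, with
placeholder variables; the real call sites are in the diff below):

    /* several changes on one page: upgrade the content lock once */
    if (BufferBeginSetHintBits(buf))
    {
        ItemIdMarkDead(itemid);
        opaque->btpo_flags |= BTP_HAS_GARBAGE;
        BufferFinishSetHintBits(buf, /* mark_dirty */ true, /* buffer_std */ true);
    }

    /* a single 16-bit hint field: the cheaper one-shot variant */
    BufferSetHintBits16(&tuple->t_infomask,
                        tuple->t_infomask | HEAP_XMIN_COMMITTED, buf);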

The biggest change to adapt to this is in heapam. To avoid performance
regressions for sequential scans that need to set a lot of hint bits, the
cost of BufferBeginSetHintBits() has to be amortized over cases where hint
bits are set at a high frequency. For that, HeapTupleSatisfiesMVCCBatch()
uses the new SetHintBitsExt(), which defers BufferFinishSetHintBits() until
all hint bits on a page have been set.  Conversely, to avoid regressions in
cases where we can't set hint bits in bulk (because we're looking only at
individual tuples), BufferSetHintBits16() is used when setting hint bits
without batching.
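
The batched path in HeapTupleSatisfiesMVCCBatch() thus ends up looking roughly
like this (simplified from the diff below):

    SetHintBitsState state = SHB_INITIAL;

    for (int i = 0; i < ntups; i++)
        visible[i] = HeapTupleSatisfiesMVCC(&tuples[i], snapshot, buffer,
                                            &state);

    /* pay for the lock upgrade at most once per page, and only if needed */
    if (state == SHB_ENABLED)
        BufferFinishSetHintBits(buffer, true, true);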

Several other places also need to be adapted, but those changes are
comparatively simpler.

After this we no longer need to copy buffers to write them out. That change
is done separately, however.

TODO:
- Address FIXMEs
- reflow parts of storage/buffer/README that I didn't reindent to make the
  diff more readable

Discussion: https://postgr.es/m/fvfmkr5kk4nyex56ejgxj3uzi63isfxovp2biecb4bspbjrze7@az2pljabhnff
Discussion: https://postgr.es/m/stj36ea6yyhoxtqkhpieia2z4krnam7qyetc57rfezgk4zgapf%40gcnactj4z56m
---
 src/include/storage/bufmgr.h                |   4 +
 src/backend/access/gist/gistget.c           |  19 +-
 src/backend/access/hash/hashutil.c          |  10 +-
 src/backend/access/heap/heapam_visibility.c | 124 ++++++++--
 src/backend/access/nbtree/nbtinsert.c       |  28 ++-
 src/backend/access/nbtree/nbtutils.c        |  16 +-
 src/backend/storage/buffer/README           |  32 ++-
 src/backend/storage/buffer/bufmgr.c         | 241 +++++++++++++++-----
 src/backend/storage/freespace/freespace.c   |  20 +-
 src/backend/storage/freespace/fsmpage.c     |  11 +-
 src/tools/pgindent/typedefs.list            |   1 +
 11 files changed, 392 insertions(+), 114 deletions(-)

diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 8e442492d4d..afa16afffc9 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -302,6 +302,10 @@ extern void BufferGetTag(Buffer buffer, RelFileLocator *rlocator,
 
 extern void MarkBufferDirtyHint(Buffer buffer, bool buffer_std);
 
+extern bool BufferSetHintBits16(uint16 *ptr, uint16 val, Buffer buffer);
+extern bool BufferBeginSetHintBits(Buffer buffer);
+extern void BufferFinishSetHintBits(Buffer buffer, bool mark_dirty, bool buffer_std);
+
 extern void UnlockBuffers(void);
 extern void UnlockBuffer(Buffer buffer);
 extern void LockBufferInternal(Buffer buffer, BufferLockMode mode);
diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c
index 9ba45acfff3..956ece6bed5 100644
--- a/src/backend/access/gist/gistget.c
+++ b/src/backend/access/gist/gistget.c
@@ -63,11 +63,7 @@ gistkillitems(IndexScanDesc scan)
 	 * safe.
 	 */
 	if (BufferGetLSNAtomic(buffer) != so->curPageLSN)
-	{
-		UnlockReleaseBuffer(buffer);
-		so->numKilled = 0;		/* reset counter */
-		return;
-	}
+		goto unlock;
 
 	Assert(GistPageIsLeaf(page));
 
@@ -77,6 +73,16 @@ gistkillitems(IndexScanDesc scan)
 	 */
 	for (i = 0; i < so->numKilled; i++)
 	{
+		if (!killedsomething)
+		{
+			/*
+			 * Use hint bit infrastructure to be allowed to modify the page
+			 * without holding an exclusive lock.
+			 */
+			if (!BufferBeginSetHintBits(buffer))
+				goto unlock;
+		}
+
 		offnum = so->killedItems[i];
 		iid = PageGetItemId(page, offnum);
 		ItemIdMarkDead(iid);
@@ -86,9 +92,10 @@ gistkillitems(IndexScanDesc scan)
 	if (killedsomething)
 	{
 		GistMarkPageHasGarbage(page);
-		MarkBufferDirtyHint(buffer, true);
+		BufferFinishSetHintBits(buffer, true, true);
 	}
 
+unlock:
 	UnlockReleaseBuffer(buffer);
 
 	/*
diff --git a/src/backend/access/hash/hashutil.c b/src/backend/access/hash/hashutil.c
index f41233fcd07..d1d603770b2 100644
--- a/src/backend/access/hash/hashutil.c
+++ b/src/backend/access/hash/hashutil.c
@@ -593,6 +593,13 @@ _hash_kill_items(IndexScanDesc scan)
 
 			if (ItemPointerEquals(&ituple->t_tid, &currItem->heapTid))
 			{
+				/*
+				 * Use hint bit infrastructure to be allowed to modify the
+				 * page without holding an exclusive lock.
+				 */
+				if (!BufferBeginSetHintBits(so->currPos.buf))
+					goto unlock_page;
+
 				/* found the item */
 				ItemIdMarkDead(iid);
 				killedsomething = true;
@@ -610,9 +617,10 @@ _hash_kill_items(IndexScanDesc scan)
 	if (killedsomething)
 	{
 		opaque->hasho_flag |= LH_PAGE_HAS_DEAD_TUPLES;
-		MarkBufferDirtyHint(buf, true);
+		BufferFinishSetHintBits(so->currPos.buf, true, true);
 	}
 
+unlock_page:
 	if (so->hashso_bucket_buf == so->currPos.buf ||
 		havePin)
 		LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK);
diff --git a/src/backend/access/heap/heapam_visibility.c b/src/backend/access/heap/heapam_visibility.c
index 5645cfd8a49..630ba7df167 100644
--- a/src/backend/access/heap/heapam_visibility.c
+++ b/src/backend/access/heap/heapam_visibility.c
@@ -80,10 +80,38 @@
 
 
 /*
- * SetHintBits()
+ * To be allowed to set hint bits, SetHintBits() needs to call
+ * BufferBeginSetHintBits(). However, that's not free, and some callsites call
+ * SetHintBits() on many tuples in a row. For those it makes sense to amortize
+ * the cost of BufferBeginSetHintBits(). Additionally it's desirable to defer
+ * the cost of BufferBeginSetHintBits() until a hint bit needs to actually be
+ * set. This enum serves as the necessary state space passed to
+ * SetHintBitsExt().
+ */
+typedef enum SetHintBitsState
+{
+	/* not yet checked if hint bits may be set */
+	SHB_INITIAL,
+	/* failed to get permission to set hint bits, don't check again */
+	SHB_DISABLED,
+	/* allowed to set hint bits */
+	SHB_ENABLED,
+} SetHintBitsState;
+
+/*
+ * SetHintBitsExt()
  *
  * Set commit/abort hint bits on a tuple, if appropriate at this time.
  *
+ * To be allowed to set a hint bit on a tuple, the page must not be undergoing
+ * IO at this time (otherwise we could e.g. corrupt PG's page checksum or even
+ * the filesystem's, as is known to happen with btrfs).
+ *
+ * The right to set a hint bit can be acquired on a page level with
+ * BufferBeginSetHintBits(). Only a single backend gets the right to set hint
+ * bits at a time.  Alternatively, if called with a NULL SetHintBitsState*,
+ * hint bits are set with BufferSetHintBits16().
+ *
  * It is only safe to set a transaction-committed hint bit if we know the
  * transaction's commit record is guaranteed to be flushed to disk before the
  * buffer, or if the table is temporary or unlogged and will be obliterated by
@@ -111,24 +139,68 @@
  * InvalidTransactionId if no check is needed.
  */
 static inline void
-SetHintBits(HeapTupleHeader tuple, Buffer buffer,
-			uint16 infomask, TransactionId xid)
+SetHintBitsExt(HeapTupleHeader tuple, Buffer buffer,
+			   uint16 infomask, TransactionId xid, SetHintBitsState *state)
 {
 	if (TransactionIdIsValid(xid))
 	{
-		/* NB: xid must be known committed here! */
-		XLogRecPtr	commitLSN = TransactionIdGetCommitLSN(xid);
+		if (BufferIsPermanent(buffer))
+		{
+			/* NB: xid must be known committed here! */
+			XLogRecPtr	commitLSN = TransactionIdGetCommitLSN(xid);
+
+			if (XLogNeedsFlush(commitLSN) &&
+				BufferGetLSNAtomic(buffer) < commitLSN)
+			{
+				/* not flushed and no LSN interlock, so don't set hint */
+				return;
+			}
+		}
+	}
+
+	/*
+	 * If we're not operating in batch mode, use BufferSetHintBits16() to set
+	 * the bits and mark the page dirty; that's cheaper than a
+	 * BufferBeginSetHintBits()/BufferFinishSetHintBits() pair. That matters
+	 * for cases where we set a lot of hint bits on a page individually.
+	 */
+	if (!state)
+	{
+		BufferSetHintBits16(&tuple->t_infomask, tuple->t_infomask | infomask, buffer);
+		return;
+	}
+
+	/*
+	 * We're in batched mode and previously did not get permission to set
+	 * hint bits. Don't try again; in all likelihood IO is still going on.
+	 */
+	if (*state == SHB_DISABLED)
+		return;
 
-		if (BufferIsPermanent(buffer) && XLogNeedsFlush(commitLSN) &&
-			BufferGetLSNAtomic(buffer) < commitLSN)
+	if (*state == SHB_INITIAL)
+	{
+		if (!BufferBeginSetHintBits(buffer))
 		{
-			/* not flushed and no LSN interlock, so don't set hint */
+			*state = SHB_DISABLED;
 			return;
 		}
+
+		/* state cannot be NULL here, see the !state check above */
+		*state = SHB_ENABLED;
+
 	}
-
 	tuple->t_infomask |= infomask;
-	MarkBufferDirtyHint(buffer, true);
+}
+
+/*
+ * Simple wrapper around SetHintBitsExt(), used when operating on a single
+ * tuple.
+ */
+static inline void
+SetHintBits(HeapTupleHeader tuple, Buffer buffer,
+			uint16 infomask, TransactionId xid)
+{
+	SetHintBitsExt(tuple, buffer, infomask, xid, NULL);
 }
 
 /*
@@ -864,9 +936,9 @@ HeapTupleSatisfiesDirty(HeapTuple htup, Snapshot snapshot,
  * inserting/deleting transaction was still running --- which was more cycles
  * and more contention on ProcArrayLock.
  */
-static bool
+static inline bool
 HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
-					   Buffer buffer)
+					   Buffer buffer, SetHintBitsState *state)
 {
 	HeapTupleHeader tuple = htup->t_data;
 
@@ -921,8 +993,8 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
 			if (!TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetRawXmax(tuple)))
 			{
 				/* deleting subtransaction must have aborted */
-				SetHintBits(tuple, buffer, HEAP_XMAX_INVALID,
-							InvalidTransactionId);
+				SetHintBitsExt(tuple, buffer, HEAP_XMAX_INVALID,
+							   InvalidTransactionId, state);
 				return true;
 			}
 
@@ -934,13 +1006,13 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
 		else if (XidInMVCCSnapshot(HeapTupleHeaderGetRawXmin(tuple), snapshot))
 			return false;
 		else if (TransactionIdDidCommit(HeapTupleHeaderGetRawXmin(tuple)))
-			SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
-						HeapTupleHeaderGetRawXmin(tuple));
+			SetHintBitsExt(tuple, buffer, HEAP_XMIN_COMMITTED,
+						   HeapTupleHeaderGetRawXmin(tuple), state);
 		else
 		{
 			/* it must have aborted or crashed */
-			SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
-						InvalidTransactionId);
+			SetHintBitsExt(tuple, buffer, HEAP_XMIN_INVALID,
+						   InvalidTransactionId, state);
 			return false;
 		}
 	}
@@ -1003,14 +1075,14 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
 		if (!TransactionIdDidCommit(HeapTupleHeaderGetRawXmax(tuple)))
 		{
 			/* it must have aborted or crashed */
-			SetHintBits(tuple, buffer, HEAP_XMAX_INVALID,
-						InvalidTransactionId);
+			SetHintBitsExt(tuple, buffer, HEAP_XMAX_INVALID,
+						   InvalidTransactionId, state);
 			return true;
 		}
 
 		/* xmax transaction committed */
-		SetHintBits(tuple, buffer, HEAP_XMAX_COMMITTED,
-					HeapTupleHeaderGetRawXmax(tuple));
+		SetHintBitsExt(tuple, buffer, HEAP_XMAX_COMMITTED,
+					   HeapTupleHeaderGetRawXmax(tuple), state);
 	}
 	else
 	{
@@ -1606,6 +1678,7 @@ HeapTupleSatisfiesMVCCBatch(Snapshot snapshot, Buffer buffer,
 							OffsetNumber *vistuples_dense)
 {
 	int			nvis = 0;
+	SetHintBitsState state = SHB_INITIAL;
 #ifdef BATCHMVCC_FEWER_ARGS
 	HeapTupleData *tuples = batchmvcc->tuples;
 	bool	   *visible = batchmvcc->visible;
@@ -1618,7 +1691,7 @@ HeapTupleSatisfiesMVCCBatch(Snapshot snapshot, Buffer buffer,
 		bool		valid;
 		HeapTuple	tup = &tuples[i];
 
-		valid = HeapTupleSatisfiesMVCC(tup, snapshot, buffer);
+		valid = HeapTupleSatisfiesMVCC(tup, snapshot, buffer, &state);
 		visible[i] = valid;
 
 		if (likely(valid))
@@ -1628,6 +1701,9 @@ HeapTupleSatisfiesMVCCBatch(Snapshot snapshot, Buffer buffer,
 		}
 	}
 
+	if (state == SHB_ENABLED)
+		BufferFinishSetHintBits(buffer, true, true);
+
 	return nvis;
 }
 
@@ -1647,7 +1723,7 @@ HeapTupleSatisfiesVisibility(HeapTuple htup, Snapshot snapshot, Buffer buffer)
 	switch (snapshot->snapshot_type)
 	{
 		case SNAPSHOT_MVCC:
-			return HeapTupleSatisfiesMVCC(htup, snapshot, buffer);
+			return HeapTupleSatisfiesMVCC(htup, snapshot, buffer, NULL);
 		case SNAPSHOT_SELF:
 			return HeapTupleSatisfiesSelf(htup, snapshot, buffer);
 		case SNAPSHOT_ANY:
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index 7c113c007e5..545e1d7d9e0 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -680,20 +680,28 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
 				{
 					/*
 					 * The conflicting tuple (or all HOT chains pointed to by
-					 * all posting list TIDs) is dead to everyone, so mark the
-					 * index entry killed.
+					 * all posting list TIDs) is dead to everyone, so try to
+					 * mark the index entry killed. It's OK if we're not
+					 * allowed to; this isn't required for correctness.
 					 */
-					ItemIdMarkDead(curitemid);
-					opaque->btpo_flags |= BTP_HAS_GARBAGE;
+					Buffer		buf;
 
-					/*
-					 * Mark buffer with a dirty hint, since state is not
-					 * crucial. Be sure to mark the proper buffer dirty.
-					 */
+					/* Be sure to operate on the proper buffer */
 					if (nbuf != InvalidBuffer)
-						MarkBufferDirtyHint(nbuf, true);
+						buf = nbuf;
 					else
-						MarkBufferDirtyHint(insertstate->buf, true);
+						buf = insertstate->buf;
+
+					/*
+					 * Can't use BufferSetHintBits16() here as we update two
+					 * different locations.
+					 */
+					if (BufferBeginSetHintBits(buf))
+					{
+						ItemIdMarkDead(curitemid);
+						opaque->btpo_flags |= BTP_HAS_GARBAGE;
+						BufferFinishSetHintBits(buf, true, true);
+					}
 				}
 
 				/*
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index ab0f98b0287..34e548b9930 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -3542,10 +3542,19 @@ _bt_killitems(IndexScanDesc scan)
 			 * it's possible that multiple processes attempt to do this
 			 * simultaneously, leading to multiple full-page images being sent
 			 * to WAL (if wal_log_hints or data checksums are enabled), which
-			 * is undesirable.
+			 * is undesirable.  We need to use the hint bit infrastructure to
+			 * update the page while just holding a share lock.
 			 */
 			if (killtuple && !ItemIdIsDead(iid))
 			{
+				/*
+				 * If we're not able to set hint bits, there's no point
+				 * continuing.
+				 */
+				if (!killedsomething &&
+					!BufferBeginSetHintBits(buf))
+					goto unlock_page;
+
 				/* found the item/all posting list items */
 				ItemIdMarkDead(iid);
 				killedsomething = true;
@@ -3556,8 +3565,6 @@ _bt_killitems(IndexScanDesc scan)
 	}
 
 	/*
-	 * Since this can be redone later if needed, mark as dirty hint.
-	 *
 	 * Whenever we mark anything LP_DEAD, we also set the page's
 	 * BTP_HAS_GARBAGE flag, which is likewise just a hint.  (Note that we
 	 * only rely on the page-level flag in !heapkeyspace indexes.)
@@ -3565,9 +3572,10 @@ _bt_killitems(IndexScanDesc scan)
 	if (killedsomething)
 	{
 		opaque->btpo_flags |= BTP_HAS_GARBAGE;
-		MarkBufferDirtyHint(buf, true);
+		BufferFinishSetHintBits(buf, true, true);
 	}
 
+unlock_page:
 	if (!so->dropPin)
 		_bt_unlockbuf(rel, buf);
 	else
diff --git a/src/backend/storage/buffer/README b/src/backend/storage/buffer/README
index 119f31b5d65..9a4dc101c26 100644
--- a/src/backend/storage/buffer/README
+++ b/src/backend/storage/buffer/README
@@ -25,14 +25,20 @@ that might need to do such a wait is instead handled by waiting to obtain
 the relation-level lock, which is why you'd better hold one first.)  Pins
 may not be held across transaction boundaries, however.
 
-Buffer content locks: there are two kinds of buffer lock, shared and exclusive,
-which act just as you'd expect: multiple backends can hold shared locks on
-the same buffer, but an exclusive lock prevents anyone else from holding
-either shared or exclusive lock.  (These can alternatively be called READ
-and WRITE locks.)  These locks are intended to be short-term: they should not
-be held for long.  Buffer locks are acquired and released by LockBuffer().
-It will *not* work for a single backend to try to acquire multiple locks on
-the same buffer.  One must pin a buffer before trying to lock it.
+Buffer content locks: there are three kinds of buffer lock, shared,
+share-exclusive and exclusive:
+a) multiple backends can hold shared locks on the same buffer
+   (alternatively called a READ lock)
+b) one backend can hold a share-exclusive lock on a buffer while multiple
+   backends hold share locks on it
+c) an exclusive lock prevents anyone else from holding any other lock on
+   the buffer.
+   (alternatively called a WRITE lock)
+
+These locks are intended to be short-term: they should not be held for long.
+Buffer locks are acquired and released by LockBuffer().  It will *not* work
+for a single backend to try to acquire multiple locks on the same buffer.  One
+must pin a buffer before trying to lock it.
 
 Buffer access rules:
 
@@ -55,8 +61,14 @@ one must hold a pin and an exclusive content lock on the containing buffer.
 This ensures that no one else might see a partially-updated state of the
 tuple while they are doing visibility checks.
 
-4. It is considered OK to update tuple commit status bits (ie, OR the
-values HEAP_XMIN_COMMITTED, HEAP_XMIN_INVALID, HEAP_XMAX_COMMITTED, or
+4. Non-critical information on a page ("hint bits") may be modified while
+holding only a share-exclusive lock and pin on the page. To do so in cases
+where only a share lock is already held, use BufferBeginSetHintBits() &
+BufferFinishSetHintBits() (if multiple hint bits are to be set) or
+BufferSetHintBits16() (if a single hint bit is set).
+
+E.g. for heapam, a share-exclusive lock allows updating tuple commit status
+bits (ie, OR the values HEAP_XMIN_COMMITTED, HEAP_XMIN_INVALID, HEAP_XMAX_COMMITTED, or
 HEAP_XMAX_INVALID into t_infomask) while holding only a shared lock and
 pin on a buffer.  This is OK because another backend looking at the tuple
 at about the same time would OR the same bits into the field, so there
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index da83b775d0b..9ed7a368d74 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -2422,9 +2422,8 @@ again:
 	/*
 	 * If the buffer was dirty, try to write it out.  There is a race
 	 * condition here, in that someone might dirty it after we released the
-	 * buffer header lock above, or even while we are writing it out (since
-	 * our share-lock won't prevent hint-bit updates).  We will recheck the
-	 * dirty bit after re-locking the buffer header.
+	 * buffer header lock above.  We will recheck the dirty bit after
+	 * re-locking the buffer header.
 	 */
 	if (buf_state & BM_DIRTY)
 	{
@@ -2432,12 +2431,12 @@ again:
 		Assert(buf_state & BM_VALID);
 
 		/*
-		 * We need a share-lock on the buffer contents to write it out (else
+		 * We need a share-exclusive lock on the buffer contents to write it out (else
 		 * we might write invalid data, eg because someone else is compacting
 		 * the page contents while we write).  We must use a conditional lock
 		 * acquisition here to avoid deadlock.  Even though the buffer was not
 		 * pinned (and therefore surely not locked) when StrategyGetBuffer
-		 * returned it, someone else could have pinned and exclusive-locked it
+		 * returned it, someone else could have pinned and (share-)exclusive-locked it
 		 * by the time we get here. If we try to get the lock unconditionally,
 		 * we'd block waiting for them; if they later block waiting for us,
 		 * deadlock ensues. (This has been observed to happen when two
@@ -2445,7 +2444,7 @@ again:
 		 * one just happens to be trying to split the page the first one got
 		 * from StrategyGetBuffer.)
 		 */
-		if (!BufferLockConditional(buf, buf_hdr, BUFFER_LOCK_SHARE))
+		if (!BufferLockConditional(buf, buf_hdr, BUFFER_LOCK_SHARE_EXCLUSIVE))
 		{
 			/*
 			 * Someone else has locked the buffer, so give it up and loop back
@@ -4014,8 +4013,8 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
 	}
 
 	/*
-	 * Pin it, share-lock it, write it.  (FlushBuffer will do nothing if the
-	 * buffer is clean by the time we've locked it.)
+	 * Pin it, share-exclusive-lock it, write it.  (FlushBuffer will do
+	 * nothing if the buffer is clean by the time we've locked it.)
 	 */
 	PinBuffer_Locked(bufHdr);
 
@@ -4329,11 +4328,8 @@ BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum,
  * However, we will need to force the changes to disk via fsync before
  * we can checkpoint WAL.
  *
- * The caller must hold a pin on the buffer and have share-locked the
- * buffer contents.  (Note: a share-lock does not prevent updates of
- * hint bits in the buffer, so the page could change while the write
- * is in progress, but we assume that that will not invalidate the data
- * written.)
+ * The caller must hold a pin on the buffer and have
+ * (share-)exclusively-locked the buffer contents.
  *
  * If the caller has an smgr reference for the buffer's relation, pass it
  * as the second parameter.  If not, pass NULL.
@@ -4349,6 +4345,9 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	char	   *bufToWrite;
 	uint64		buf_state;
 
+	Assert(BufferLockHeldByMeInMode(buf, BUFFER_LOCK_EXCLUSIVE) ||
+		   BufferLockHeldByMeInMode(buf, BUFFER_LOCK_SHARE_EXCLUSIVE));
+
 	/*
 	 * Try to start an I/O operation.  If StartBufferIO returns false, then
 	 * someone else flushed the buffer before we could, so we need not do
@@ -4481,7 +4480,7 @@ FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln,
 {
 	Buffer		buffer = BufferDescriptorGetBuffer(buf);
 
-	BufferLockAcquire(buffer, buf, BUFFER_LOCK_SHARE);
+	BufferLockAcquire(buffer, buf, BUFFER_LOCK_SHARE_EXCLUSIVE);
 	FlushBuffer(buf, reln, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
 	BufferLockUnlock(buffer, buf);
 }
@@ -5400,8 +5399,8 @@ FlushDatabaseBuffers(Oid dbid)
 }
 
 /*
- * Flush a previously, shared or exclusively, locked and pinned buffer to the
- * OS.
+ * Flush a buffer, previously locked in share-exclusive or exclusive mode and
+ * pinned, to the OS.
  */
 void
 FlushOneBuffer(Buffer buffer)
@@ -5474,39 +5473,23 @@ IncrBufferRefCount(Buffer buffer)
 }
 
 /*
- * MarkBufferDirtyHint
+ * Shared-buffer only helper for MarkBufferDirtyHint() and
+ * BufferSetHintBits16().
  *
- *	Mark a buffer dirty for non-critical changes.
- *
- * This is essentially the same as MarkBufferDirty, except:
- *
- * 1. The caller does not write WAL; so if checksums are enabled, we may need
- *	  to write an XLOG_FPI_FOR_HINT WAL record to protect against torn pages.
- * 2. The caller might have only share-lock instead of exclusive-lock on the
- *	  buffer's content lock.
- * 3. This function does not guarantee that the buffer is always marked dirty
- *	  (due to a race condition), so it cannot be used for important changes.
+ * This is separated out because it turns out that the repeated checks for
+ * local buffers, repeated GetBufferDescriptor() and repeated reading of the
+ * buffer's state noticeably hurt the performance of BufferSetHintBits16().
  */
-void
-MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
+static inline void
+MarkSharedBufferDirtyHint(Buffer buffer, BufferDesc *bufHdr, uint64 lockstate, bool buffer_std)
 {
-	BufferDesc *bufHdr;
 	Page		page = BufferGetPage(buffer);
 
-	if (!BufferIsValid(buffer))
-		elog(ERROR, "bad buffer ID: %d", buffer);
-
-	if (BufferIsLocal(buffer))
-	{
-		MarkLocalBufferDirty(buffer);
-		return;
-	}
-
-	bufHdr = GetBufferDescriptor(buffer - 1);
-
 	Assert(GetPrivateRefCount(buffer) > 0);
-	/* here, either share or exclusive lock is OK */
-	Assert(BufferIsLockedByMe(buffer));
+
+	/* here, either share-exclusive or exclusive lock is OK */
+	Assert(BufferLockHeldByMeInMode(bufHdr, BUFFER_LOCK_EXCLUSIVE) ||
+		   BufferLockHeldByMeInMode(bufHdr, BUFFER_LOCK_SHARE_EXCLUSIVE));
 
 	/*
 	 * This routine might get called many times on the same page, if we are
@@ -5519,8 +5502,8 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 	 * is only intended to be used in cases where failing to write out the
 	 * data would be harmless anyway, it doesn't really matter.
 	 */
-	if ((pg_atomic_read_u64(&bufHdr->state) & (BM_DIRTY | BM_JUST_DIRTIED)) !=
-		(BM_DIRTY | BM_JUST_DIRTIED))
+	if (unlikely((lockstate & (BM_DIRTY | BM_JUST_DIRTIED)) !=
+				 (BM_DIRTY | BM_JUST_DIRTIED)))
 	{
 		XLogRecPtr	lsn = InvalidXLogRecPtr;
 		bool		dirtied = false;
@@ -5589,13 +5572,13 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 			dirtied = true;		/* Means "will be dirtied by this action" */
 
 			/*
-			 * Set the page LSN if we wrote a backup block. We aren't supposed
-			 * to set this when only holding a share lock but as long as we
-			 * serialise it somehow we're OK. We choose to set LSN while
-			 * holding the buffer header lock, which causes any reader of an
-			 * LSN who holds only a share lock to also obtain a buffer header
-			 * lock before using PageGetLSN(), which is enforced in
-			 * BufferGetLSNAtomic().
+			 * Set the page LSN if we wrote a backup block. To allow backends
+			 * that only hold a share lock on the buffer to read the LSN in a
+			 * tear-free manner, we set the page LSN while holding the buffer
+			 * header lock.  Any reader of the LSN that holds only a share
+			 * lock must likewise obtain the buffer header lock before using
+			 * PageGetLSN(), so that it too reads the LSN without tearing.
+			 * This is enforced in BufferGetLSNAtomic().
 			 *
 			 * If checksums are enabled, you might think we should reset the
 			 * checksum here. That will happen when the page is written
@@ -5621,6 +5604,40 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 	}
 }
 
+/*
+ * MarkBufferDirtyHint
+ *
+ *	Mark a buffer dirty for non-critical changes.
+ *
+ * This is essentially the same as MarkBufferDirty, except:
+ *
+ * 1. The caller does not write WAL; so if checksums are enabled, we may need
+ *	  to write an XLOG_FPI_FOR_HINT WAL record to protect against torn pages.
+ * 2. The caller might have only share-exclusive-lock instead of
+ *	  exclusive-lock on the buffer's content lock.
+ * 3. This function does not guarantee that the buffer is always marked dirty
+ *	  (due to a race condition), so it cannot be used for important changes.
+ */
+inline void
+MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
+{
+	BufferDesc *bufHdr;
+
+	if (!BufferIsValid(buffer))
+		elog(ERROR, "bad buffer ID: %d", buffer);
+
+	if (BufferIsLocal(buffer))
+	{
+		MarkLocalBufferDirty(buffer);
+		return;
+	}
+
+	bufHdr = GetBufferDescriptor(buffer - 1);
+
+	MarkSharedBufferDirtyHint(buffer, bufHdr, pg_atomic_read_u64(&bufHdr->state),
+							  buffer_std);
+}
+
 /*
  * Release buffer content locks for shared buffers.
  *
@@ -6673,6 +6690,126 @@ IsBufferCleanupOK(Buffer buffer)
 	return false;
 }
 
+static inline bool
+SharedBufferBeginSetHintBits(Buffer buffer, BufferDesc *buf_hdr, uint64 *lockstate)
+{
+	uint64		old_state;
+	PrivateRefCountEntry *ref;
+	BufferLockMode mode;
+
+	ref = GetPrivateRefCountEntry(buffer, true);
+
+	if (ref == NULL)
+		elog(ERROR, "lock is not held");
+
+	mode = ref->data.lockmode;
+	if (mode == BUFFER_LOCK_UNLOCK)
+		elog(ERROR, "buffer is not locked");
+
+	/*
+	 * We already hold the required lock level.
+	 */
+	if (mode == BUFFER_LOCK_EXCLUSIVE || mode == BUFFER_LOCK_SHARE_EXCLUSIVE)
+	{
+		*lockstate = pg_atomic_read_u64(&buf_hdr->state);
+		return true;
+	}
+
+	/*
+	 * Only holding a share lock right now, try to upgrade to SHARE_EXCLUSIVE.
+	 */
+	Assert(mode == BUFFER_LOCK_SHARE);
+
+	old_state = pg_atomic_read_u64(&buf_hdr->state);
+	while (true)
+	{
+		uint64		desired_state;
+
+		desired_state = old_state;
+
+		/*
+		 * Can't upgrade if somebody else holds the lock in exclusive or
+		 * share-exclusive mode.
+		 */
+		if (unlikely((old_state & (BM_LOCK_VAL_EXCLUSIVE | BM_LOCK_VAL_SHARE_EXCLUSIVE)) != 0))
+		{
+			return false;
+		}
+
+		/* subtract the share lock we currently hold ... */
+		desired_state -= BM_LOCK_VAL_SHARED;
+
+		/* ... and add the share-exclusive lock we are upgrading to */
+		desired_state += BM_LOCK_VAL_SHARE_EXCLUSIVE;
+
+		if (likely(pg_atomic_compare_exchange_u64(&buf_hdr->state,
+												  &old_state, desired_state)))
+		{
+			ref->data.lockmode = BUFFER_LOCK_SHARE_EXCLUSIVE;
+			*lockstate = desired_state;
+
+			return true;
+		}
+	}
+
+}
+
+bool
+BufferSetHintBits16(uint16 *ptr, uint16 val, Buffer buffer)
+{
+	BufferDesc *buf_hdr;
+	uint64		lockstate;
+
+	if (BufferIsLocal(buffer))
+	{
+		*ptr = val;
+
+		MarkLocalBufferDirty(buffer);
+
+		return true;
+	}
+
+	buf_hdr = GetBufferDescriptor(buffer - 1);
+
+	if (SharedBufferBeginSetHintBits(buffer, buf_hdr, &lockstate))
+	{
+		*ptr = val;
+
+		MarkSharedBufferDirtyHint(buffer, buf_hdr, lockstate, true);
+
+		return true;
+	}
+
+	return false;
+}
+
+bool
+BufferBeginSetHintBits(Buffer buffer)
+{
+	BufferDesc *buf_hdr;
+	uint64		lockstate;
+
+	if (BufferIsLocal(buffer))
+	{
+		/*
+		 * TODO: will need to check for write IO once that's done
+		 * asynchronously.
+		 */
+
+		return true;
+	}
+
+	buf_hdr = GetBufferDescriptor(buffer - 1);
+
+	return SharedBufferBeginSetHintBits(buffer, buf_hdr, &lockstate);
+}
+
+void
+BufferFinishSetHintBits(Buffer buffer, bool mark_dirty, bool buffer_std)
+{
+	if (mark_dirty)
+		MarkBufferDirtyHint(buffer, buffer_std);
+}
 
 /*
  *	Functions for buffer I/O handling
diff --git a/src/backend/storage/freespace/freespace.c b/src/backend/storage/freespace/freespace.c
index 4773a9cc65e..6cdfbbeb260 100644
--- a/src/backend/storage/freespace/freespace.c
+++ b/src/backend/storage/freespace/freespace.c
@@ -904,14 +904,22 @@ fsm_vacuum_page(Relation rel, FSMAddress addr,
 	max_avail = fsm_get_max_avail(page);
 
 	/*
-	 * Reset the next slot pointer. This encourages the use of low-numbered
-	 * pages, increasing the chances that a later vacuum can truncate the
-	 * relation.  We don't bother with a lock here, nor with marking the page
-	 * dirty if it wasn't already, since this is just a hint.
+	 * Try to reset the next slot pointer. This encourages the use of
+	 * low-numbered pages, increasing the chances that a later vacuum can
+	 * truncate the relation.  We don't bother with marking the page dirty
+	 * if it wasn't already, since this is just a hint.
+	 *
+	 * To be allowed to update the page while holding only a share lock, we
+	 * have to use the hint bit infrastructure.
 	 */
-	((FSMPage) PageGetContents(page))->fp_next_slot = 0;
+	LockBuffer(buf, BUFFER_LOCK_SHARE);
+	if (BufferBeginSetHintBits(buf))
+	{
+		((FSMPage) PageGetContents(page))->fp_next_slot = 0;
+		BufferFinishSetHintBits(buf, false, false);
+	}
 
-	ReleaseBuffer(buf);
+	UnlockReleaseBuffer(buf);
 
 	return max_avail;
 }
diff --git a/src/backend/storage/freespace/fsmpage.c b/src/backend/storage/freespace/fsmpage.c
index 66a5c80b5a6..a59696b6484 100644
--- a/src/backend/storage/freespace/fsmpage.c
+++ b/src/backend/storage/freespace/fsmpage.c
@@ -298,9 +298,18 @@ restart:
 	 * lock and get a garbled next pointer every now and then, than take the
 	 * concurrency hit of an exclusive lock.
 	 *
+	 * Without an exclusive lock, we need to use the hint bit infrastructure
+	 * to be allowed to modify the page.
+	 *
 	 * Wrap-around is handled at the beginning of this function.
 	 */
-	fsmpage->fp_next_slot = slot + (advancenext ? 1 : 0);
+	if (exclusive_lock_held || BufferBeginSetHintBits(buf))
+	{
+		fsmpage->fp_next_slot = slot + (advancenext ? 1 : 0);
+
+		if (!exclusive_lock_held)
+			BufferFinishSetHintBits(buf, false, true);
+	}
 
 	return slot;
 }
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 9d14239b4c4..66e8f2e9fa6 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2731,6 +2731,7 @@ SetConstraintStateData
 SetConstraintTriggerData
 SetExprState
 SetFunctionReturnMode
+SetHintBitsState
 SetOp
 SetOpCmd
 SetOpPath
-- 
2.48.1.76.g4e746b1a31.dirty

