From 74e4b1a13b0a7e98704ac31c40a3364569e7f260 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Tue, 13 Jan 2026 20:10:32 -0500
Subject: [PATCH v12 2/6] Require share-exclusive lock to set hint bits and to
 flush

At the moment hint bits can be set with just a share lock on a page (and,
until 45f658dacb9, in one case even without any lock). Because of this we need
to copy pages while writing them out, as otherwise the checksum could be
corrupted.

The need to copy the page is problematic for implementing AIO writes:

1) Instead of just needing a single buffer for a copied page, we need one for
   each page that's potentially undergoing I/O
2) To be able to use the "worker" AIO implementation, the copied page needs to
   reside in shared memory

It also causes problems for using unbuffered/direct I/O, independent of AIO:
Some filesystems, RAID implementations, ... do not tolerate the data changing
while it is being written out. E.g. they may compute internal checksums that
can be invalidated by concurrent modifications, leading to filesystem errors
(as is the case with btrfs).

It is also just plain odd to allow modifications of buffers that are only
share-locked.

To address these issues, this commit changes the rules so that modifications
to pages are no longer allowed while holding only a share lock. Instead, the
new share-exclusive lock (introduced in fcb9c977aa5) allows at most one backend
to modify a buffer while other backends hold share locks on the same page. An
existing share lock can be upgraded to a share-exclusive lock if there are no
conflicting locks. For that purpose, BufferBeginSetHintBits(),
BufferFinishSetHintBits() and BufferSetHintBits16() have been introduced.
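
For illustration, the intended usage for a caller that starts out holding
just a pin and a share lock looks roughly like the sketch below. This is not
code from the patch; it is modeled on the _bt_check_unique() change further
down, and "page", "offnum" and "opaque" are assumed to be set up by the
caller.

    /*
     * Sketch only: mark an index item dead while initially holding just a
     * pin and a share lock on "buffer".
     */
    static void
    example_mark_item_dead(Buffer buffer, Page page, OffsetNumber offnum,
                           BTPageOpaque opaque)
    {
        if (BufferBeginSetHintBits(buffer))
        {
            /* now >= share-exclusive; no write-out can be in progress */
            ItemIdMarkDead(PageGetItemId(page, offnum));
            opaque->btpo_flags |= BTP_HAS_GARBAGE;

            /* mark_dirty = true, buffer_std = true */
            BufferFinishSetHintBits(buffer, true, true);
        }
        /* else a conflicting lock is held (e.g. write-out), skip the hint */
    }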

To prevent hint bits from being set while the buffer is being written out,
writing out buffers now requires a share-exclusive lock.

The use of a share-exclusive lock to gate setting hint bits means that from
now on only one backend can set hint bits on a given page at a time. Allowing
multiple backends to set hint bits would require more complicated locking: we
would need to track the number of backends currently setting hint bits, and we
would need another lock level for I/O that conflicts with the lock level used
to set hint bits. Given that the share-exclusive lock for setting hint bits is
only held for a short time, that backends would often just set the same hint
bits, and that the cost of occasionally not setting hint bits in hotly
accessed pages is fairly low, this seems like an acceptable tradeoff.

The biggest change needed to adapt to this is in heapam. To avoid performance
regressions for sequential scans that need to set a lot of hint bits, the cost
of BufferBeginSetHintBits() has to be amortized for cases where hint bits are
set at a high frequency. For that, HeapTupleSatisfiesMVCCBatch() uses the new
SetHintBitsExt(), which defers BufferFinishSetHintBits() until all hint bits
on a page have been set.  Conversely, to avoid regressions in cases where we
cannot set hint bits in bulk (because we're looking only at individual
tuples), BufferSetHintBits16() is used when setting hint bits without batching.
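
Roughly, and leaving out the tuple visibility logic, the two heapam paths look
as follows (a sketch, not code from the patch; "tuple", "tuples", "ntuples"
and "buffer" are assumed to be in scope, and the xmin is assumed to already be
known committed):

    /* single tuple, no batching: one cheap call that also dirties the page */
    BufferSetHintBits16(&tuple->t_infomask,
                        tuple->t_infomask | HEAP_XMIN_COMMITTED, buffer);

    /*
     * Batched: the first SetHintBitsExt() call that needs to set a bit
     * upgrades the lock (SHB_INITIAL -> SHB_ENABLED or SHB_DISABLED),
     * later calls are cheap.
     */
    SetHintBitsState state = SHB_INITIAL;

    for (int i = 0; i < ntuples; i++)
        SetHintBitsExt(tuples[i]->t_data, buffer, HEAP_XMIN_COMMITTED,
                       HeapTupleHeaderGetRawXmin(tuples[i]->t_data), &state);

    if (state == SHB_ENABLED)
        BufferFinishSetHintBits(buffer, true, true);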

Several other places also need to be adapted, but those changes are
comparatively simpler.

After this, we no longer need to copy buffers in order to write them out.
That change is made separately, however.

Discussion: https://postgr.es/m/fvfmkr5kk4nyex56ejgxj3uzi63isfxovp2biecb4bspbjrze7@az2pljabhnff
Discussion: https://postgr.es/m/stj36ea6yyhoxtqkhpieia2z4krnam7qyetc57rfezgk4zgapf%40gcnactj4z56m
---
 src/include/storage/bufmgr.h                |   4 +
 src/backend/access/gist/gistget.c           |  20 +-
 src/backend/access/hash/hashutil.c          |  14 +-
 src/backend/access/heap/heapam_visibility.c | 130 +++++--
 src/backend/access/nbtree/nbtinsert.c       |  31 +-
 src/backend/access/nbtree/nbtutils.c        |  16 +-
 src/backend/access/transam/xloginsert.c     |  11 +-
 src/backend/storage/buffer/README           |  66 ++--
 src/backend/storage/buffer/bufmgr.c         | 389 +++++++++++++++-----
 src/backend/storage/freespace/freespace.c   |  14 +-
 src/backend/storage/freespace/fsmpage.c     |  11 +-
 src/tools/pgindent/typedefs.list            |   1 +
 12 files changed, 525 insertions(+), 182 deletions(-)

diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index a40adf6b2a8..4017896f951 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -314,6 +314,10 @@ extern void BufferGetTag(Buffer buffer, RelFileLocator *rlocator,
 
 extern void MarkBufferDirtyHint(Buffer buffer, bool buffer_std);
 
+extern bool BufferSetHintBits16(uint16 *ptr, uint16 val, Buffer buffer);
+extern bool BufferBeginSetHintBits(Buffer buffer);
+extern void BufferFinishSetHintBits(Buffer buffer, bool mark_dirty, bool buffer_std);
+
 extern void UnlockBuffers(void);
 extern void UnlockBuffer(Buffer buffer);
 extern void LockBufferInternal(Buffer buffer, BufferLockMode mode);
diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c
index 11b214eb99b..606b108a136 100644
--- a/src/backend/access/gist/gistget.c
+++ b/src/backend/access/gist/gistget.c
@@ -64,11 +64,7 @@ gistkillitems(IndexScanDesc scan)
 	 * safe.
 	 */
 	if (BufferGetLSNAtomic(buffer) != so->curPageLSN)
-	{
-		UnlockReleaseBuffer(buffer);
-		so->numKilled = 0;		/* reset counter */
-		return;
-	}
+		goto unlock;
 
 	Assert(GistPageIsLeaf(page));
 
@@ -78,6 +74,17 @@ gistkillitems(IndexScanDesc scan)
 	 */
 	for (i = 0; i < so->numKilled; i++)
 	{
+		if (!killedsomething)
+		{
+			/*
+			 * Use the hint bit infrastructure to check if we can update the
+			 * page while just holding a share lock. If we are not allowed,
+			 * there's no point continuing.
+			 */
+			if (!BufferBeginSetHintBits(buffer))
+				goto unlock;
+		}
+
 		offnum = so->killedItems[i];
 		iid = PageGetItemId(page, offnum);
 		ItemIdMarkDead(iid);
@@ -87,9 +94,10 @@ gistkillitems(IndexScanDesc scan)
 	if (killedsomething)
 	{
 		GistMarkPageHasGarbage(page);
-		MarkBufferDirtyHint(buffer, true);
+		BufferFinishSetHintBits(buffer, true, true);
 	}
 
+unlock:
 	UnlockReleaseBuffer(buffer);
 
 	/*
diff --git a/src/backend/access/hash/hashutil.c b/src/backend/access/hash/hashutil.c
index cf7f0b90176..3e16119d027 100644
--- a/src/backend/access/hash/hashutil.c
+++ b/src/backend/access/hash/hashutil.c
@@ -593,6 +593,17 @@ _hash_kill_items(IndexScanDesc scan)
 
 			if (ItemPointerEquals(&ituple->t_tid, &currItem->heapTid))
 			{
+				if (!killedsomething)
+				{
+					/*
+					 * Use the hint bit infrastructure to check if we can
+					 * update the page while just holding a share lock. If we
+					 * are not allowed, there's no point continuing.
+					 */
+					if (!BufferBeginSetHintBits(so->currPos.buf))
+						goto unlock_page;
+				}
+
 				/* found the item */
 				ItemIdMarkDead(iid);
 				killedsomething = true;
@@ -610,9 +621,10 @@ _hash_kill_items(IndexScanDesc scan)
 	if (killedsomething)
 	{
 		opaque->hasho_flag |= LH_PAGE_HAS_DEAD_TUPLES;
-		MarkBufferDirtyHint(buf, true);
+		BufferFinishSetHintBits(so->currPos.buf, true, true);
 	}
 
+unlock_page:
 	if (so->hashso_bucket_buf == so->currPos.buf ||
 		havePin)
 		LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK);
diff --git a/src/backend/access/heap/heapam_visibility.c b/src/backend/access/heap/heapam_visibility.c
index 75ae268d753..fc64f4343ce 100644
--- a/src/backend/access/heap/heapam_visibility.c
+++ b/src/backend/access/heap/heapam_visibility.c
@@ -80,10 +80,38 @@
 
 
 /*
- * SetHintBits()
+ * To be allowed to set hint bits, SetHintBits() needs to call
+ * BufferBeginSetHintBits(). However, that's not free, and some callsites call
+ * SetHintBits() on many tuples in a row. For those it makes sense to amortize
+ * the cost of BufferBeginSetHintBits(). Additionally it's desirable to defer
+ * the cost of BufferBeginSetHintBits() until a hint bit needs to actually be
+ * set. This enum serves as the necessary state space passed to
+ * SetHintBitsExt().
+ */
+typedef enum SetHintBitsState
+{
+	/* not yet checked if hint bits may be set */
+	SHB_INITIAL,
+	/* failed to get permission to set hint bits, don't check again */
+	SHB_DISABLED,
+	/* allowed to set hint bits */
+	SHB_ENABLED,
+} SetHintBitsState;
+
+/*
+ * SetHintBitsExt()
  *
  * Set commit/abort hint bits on a tuple, if appropriate at this time.
  *
+ * To be allowed to set a hint bit on a tuple, the page must not be undergoing
+ * IO at this time (otherwise we e.g. could corrupt PG's page checksum or even
+ * the filesystem's, as is known to happen with btrfs).
+ *
+ * The right to set a hint bit can be acquired on a page level with
+ * BufferBeginSetHintBits(). Only a single backend gets the right to set hint
+ * bits at a time.  Alternatively, if called with a NULL SetHintBitsState*,
+ * hint bits are set with BufferSetHintBits16().
+ *
  * It is only safe to set a transaction-committed hint bit if we know the
  * transaction's commit record is guaranteed to be flushed to disk before the
  * buffer, or if the table is temporary or unlogged and will be obliterated by
@@ -111,24 +139,67 @@
  * InvalidTransactionId if no check is needed.
  */
 static inline void
-SetHintBits(HeapTupleHeader tuple, Buffer buffer,
-			uint16 infomask, TransactionId xid)
+SetHintBitsExt(HeapTupleHeader tuple, Buffer buffer,
+			   uint16 infomask, TransactionId xid, SetHintBitsState *state)
 {
+	/*
+	 * In batched mode, if we previously did not get permission to set hint
+	 * bits, don't try again - in all likelihood IO is still going on.
+	 */
+	if (state && *state == SHB_DISABLED)
+		return;
+
 	if (TransactionIdIsValid(xid))
 	{
-		/* NB: xid must be known committed here! */
-		XLogRecPtr	commitLSN = TransactionIdGetCommitLSN(xid);
+		if (BufferIsPermanent(buffer))
+		{
+			/* NB: xid must be known committed here! */
+			XLogRecPtr	commitLSN = TransactionIdGetCommitLSN(xid);
+
+			if (XLogNeedsFlush(commitLSN) &&
+				BufferGetLSNAtomic(buffer) < commitLSN)
+			{
+				/* not flushed and no LSN interlock, so don't set hint */
+				return;
+			}
+		}
+	}
+
+	/*
+	 * If we're not operating in batch mode, use BufferSetHintBits16() to set
+	 * the bit and mark the page dirty; that's cheaper than
+	 * BufferBeginSetHintBits()/BufferFinishSetHintBits(). That matters for
+	 * cases where we set a lot of hint bits on a page individually.
+	 */
+	if (!state)
+	{
+		BufferSetHintBits16(&tuple->t_infomask,
+							tuple->t_infomask | infomask, buffer);
+		return;
+	}
 
-		if (BufferIsPermanent(buffer) && XLogNeedsFlush(commitLSN) &&
-			BufferGetLSNAtomic(buffer) < commitLSN)
+	if (*state == SHB_INITIAL)
+	{
+		if (!BufferBeginSetHintBits(buffer))
 		{
-			/* not flushed and no LSN interlock, so don't set hint */
+			*state = SHB_DISABLED;
 			return;
 		}
-	}
 
+		*state = SHB_ENABLED;
+	}
 	tuple->t_infomask |= infomask;
-	MarkBufferDirtyHint(buffer, true);
+}
+
+/*
+ * Simple wrapper around SetHintBitsExt(), for use when operating on a single
+ * tuple.
+ */
+static inline void
+SetHintBits(HeapTupleHeader tuple, Buffer buffer,
+			uint16 infomask, TransactionId xid)
+{
+	SetHintBitsExt(tuple, buffer, infomask, xid, NULL);
 }
 
 /*
@@ -864,9 +935,9 @@ HeapTupleSatisfiesDirty(HeapTuple htup, Snapshot snapshot,
  * inserting/deleting transaction was still running --- which was more cycles
  * and more contention on ProcArrayLock.
  */
-static bool
+static inline bool
 HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
-					   Buffer buffer)
+					   Buffer buffer, SetHintBitsState *state)
 {
 	HeapTupleHeader tuple = htup->t_data;
 
@@ -921,8 +992,8 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
 			if (!TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetRawXmax(tuple)))
 			{
 				/* deleting subtransaction must have aborted */
-				SetHintBits(tuple, buffer, HEAP_XMAX_INVALID,
-							InvalidTransactionId);
+				SetHintBitsExt(tuple, buffer, HEAP_XMAX_INVALID,
+							   InvalidTransactionId, state);
 				return true;
 			}
 
@@ -934,13 +1005,13 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
 		else if (XidInMVCCSnapshot(HeapTupleHeaderGetRawXmin(tuple), snapshot))
 			return false;
 		else if (TransactionIdDidCommit(HeapTupleHeaderGetRawXmin(tuple)))
-			SetHintBits(tuple, buffer, HEAP_XMIN_COMMITTED,
-						HeapTupleHeaderGetRawXmin(tuple));
+			SetHintBitsExt(tuple, buffer, HEAP_XMIN_COMMITTED,
+						   HeapTupleHeaderGetRawXmin(tuple), state);
 		else
 		{
 			/* it must have aborted or crashed */
-			SetHintBits(tuple, buffer, HEAP_XMIN_INVALID,
-						InvalidTransactionId);
+			SetHintBitsExt(tuple, buffer, HEAP_XMIN_INVALID,
+						   InvalidTransactionId, state);
 			return false;
 		}
 	}
@@ -1003,14 +1074,14 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot,
 		if (!TransactionIdDidCommit(HeapTupleHeaderGetRawXmax(tuple)))
 		{
 			/* it must have aborted or crashed */
-			SetHintBits(tuple, buffer, HEAP_XMAX_INVALID,
-						InvalidTransactionId);
+			SetHintBitsExt(tuple, buffer, HEAP_XMAX_INVALID,
+						   InvalidTransactionId, state);
 			return true;
 		}
 
 		/* xmax transaction committed */
-		SetHintBits(tuple, buffer, HEAP_XMAX_COMMITTED,
-					HeapTupleHeaderGetRawXmax(tuple));
+		SetHintBitsExt(tuple, buffer, HEAP_XMAX_COMMITTED,
+					   HeapTupleHeaderGetRawXmax(tuple), state);
 	}
 	else
 	{
@@ -1607,9 +1678,10 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot,
  * ->vistuples_dense is set to contain the offsets of visible tuples.
  *
  * The reason this is more efficient than HeapTupleSatisfiesMVCC() is that it
- * avoids a cross-translation-unit function call for each tuple and allows the
- * compiler to optimize across calls to HeapTupleSatisfiesMVCC. In the future
- * it will also allow more efficient setting of hint bits.
+ * avoids a cross-translation-unit function call for each tuple, allows the
+ * compiler to optimize across calls to HeapTupleSatisfiesMVCC and allows
+ * setting hint bits more efficiently (see the one BufferFinishSetHintBits()
+ * call below).
  *
  * Returns the number of visible tuples.
  */
@@ -1620,6 +1692,7 @@ HeapTupleSatisfiesMVCCBatch(Snapshot snapshot, Buffer buffer,
 							OffsetNumber *vistuples_dense)
 {
 	int			nvis = 0;
+	SetHintBitsState state = SHB_INITIAL;
 
 	Assert(IsMVCCSnapshot(snapshot));
 
@@ -1628,7 +1701,7 @@ HeapTupleSatisfiesMVCCBatch(Snapshot snapshot, Buffer buffer,
 		bool		valid;
 		HeapTuple	tup = &batchmvcc->tuples[i];
 
-		valid = HeapTupleSatisfiesMVCC(tup, snapshot, buffer);
+		valid = HeapTupleSatisfiesMVCC(tup, snapshot, buffer, &state);
 		batchmvcc->visible[i] = valid;
 
 		if (likely(valid))
@@ -1638,6 +1711,9 @@ HeapTupleSatisfiesMVCCBatch(Snapshot snapshot, Buffer buffer,
 		}
 	}
 
+	if (state == SHB_ENABLED)
+		BufferFinishSetHintBits(buffer, true, true);
+
 	return nvis;
 }
 
@@ -1657,7 +1733,7 @@ HeapTupleSatisfiesVisibility(HeapTuple htup, Snapshot snapshot, Buffer buffer)
 	switch (snapshot->snapshot_type)
 	{
 		case SNAPSHOT_MVCC:
-			return HeapTupleSatisfiesMVCC(htup, snapshot, buffer);
+			return HeapTupleSatisfiesMVCC(htup, snapshot, buffer, NULL);
 		case SNAPSHOT_SELF:
 			return HeapTupleSatisfiesSelf(htup, snapshot, buffer);
 		case SNAPSHOT_ANY:
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index d17aaa5aa0f..796e1513ddf 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -681,20 +681,31 @@ _bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
 				{
 					/*
 					 * The conflicting tuple (or all HOT chains pointed to by
-					 * all posting list TIDs) is dead to everyone, so mark the
-					 * index entry killed.
+					 * all posting list TIDs) is dead to everyone, so try to
+					 * mark the index entry killed. It's ok if we're not
+					 * allowed to; this isn't required for correctness.
 					 */
-					ItemIdMarkDead(curitemid);
-					opaque->btpo_flags |= BTP_HAS_GARBAGE;
+					Buffer		buf;
 
-					/*
-					 * Mark buffer with a dirty hint, since state is not
-					 * crucial. Be sure to mark the proper buffer dirty.
-					 */
+					/* Be sure to operate on the proper buffer */
 					if (nbuf != InvalidBuffer)
-						MarkBufferDirtyHint(nbuf, true);
+						buf = nbuf;
 					else
-						MarkBufferDirtyHint(insertstate->buf, true);
+						buf = insertstate->buf;
+
+					/*
+					 * Use the hint bit infrastructure to check if we can
+					 * update the page while just holding a share lock.
+					 *
+					 * Can't use BufferSetHintBits16() here as we update two
+					 * different locations.
+					 */
+					if (BufferBeginSetHintBits(buf))
+					{
+						ItemIdMarkDead(curitemid);
+						opaque->btpo_flags |= BTP_HAS_GARBAGE;
+						BufferFinishSetHintBits(buf, true, true);
+					}
 				}
 
 				/*
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index 5c50f0dd1bd..76e6c6fbf88 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -361,6 +361,17 @@ _bt_killitems(IndexScanDesc scan)
 			 */
 			if (killtuple && !ItemIdIsDead(iid))
 			{
+				if (!killedsomething)
+				{
+					/*
+					 * Use the hint bit infrastructure to check if we can
+					 * update the page while just holding a share lock. If we
+					 * are not allowed, there's no point continuing.
+					 */
+					if (!BufferBeginSetHintBits(buf))
+						goto unlock_page;
+				}
+
 				/* found the item/all posting list items */
 				ItemIdMarkDead(iid);
 				killedsomething = true;
@@ -371,8 +382,6 @@ _bt_killitems(IndexScanDesc scan)
 	}
 
 	/*
-	 * Since this can be redone later if needed, mark as dirty hint.
-	 *
 	 * Whenever we mark anything LP_DEAD, we also set the page's
 	 * BTP_HAS_GARBAGE flag, which is likewise just a hint.  (Note that we
 	 * only rely on the page-level flag in !heapkeyspace indexes.)
@@ -380,9 +389,10 @@ _bt_killitems(IndexScanDesc scan)
 	if (killedsomething)
 	{
 		opaque->btpo_flags |= BTP_HAS_GARBAGE;
-		MarkBufferDirtyHint(buf, true);
+		BufferFinishSetHintBits(buf, true, true);
 	}
 
+unlock_page:
 	if (!so->dropPin)
 		_bt_unlockbuf(rel, buf);
 	else
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index d3acaa636c3..bd6e6f06389 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -1077,11 +1077,6 @@ XLogCheckBufferNeedsBackup(Buffer buffer)
  * We only need to do something if page has not yet been full page written in
  * this checkpoint round. The LSN of the inserted wal record is returned if we
  * had to write, InvalidXLogRecPtr otherwise.
- *
- * It is possible that multiple concurrent backends could attempt to write WAL
- * records. In that case, multiple copies of the same block would be recorded
- * in separate WAL records by different backends, though that is still OK from
- * a correctness perspective.
  */
 XLogRecPtr
 XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
@@ -1102,11 +1097,9 @@ XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
 
 	/*
 	 * We assume page LSN is first data on *every* page that can be passed to
-	 * XLogInsert, whether it has the standard page layout or not. Since we're
-	 * only holding a share-lock on the page, we must take the buffer header
-	 * lock when we look at the LSN.
+	 * XLogInsert, whether it has the standard page layout or not.
 	 */
-	lsn = BufferGetLSNAtomic(buffer);
+	lsn = PageGetLSN(BufferGetPage(buffer));
 
 	if (lsn <= RedoRecPtr)
 	{
diff --git a/src/backend/storage/buffer/README b/src/backend/storage/buffer/README
index 119f31b5d65..b332e002ba1 100644
--- a/src/backend/storage/buffer/README
+++ b/src/backend/storage/buffer/README
@@ -25,21 +25,26 @@ that might need to do such a wait is instead handled by waiting to obtain
 the relation-level lock, which is why you'd better hold one first.)  Pins
 may not be held across transaction boundaries, however.
 
-Buffer content locks: there are two kinds of buffer lock, shared and exclusive,
-which act just as you'd expect: multiple backends can hold shared locks on
-the same buffer, but an exclusive lock prevents anyone else from holding
-either shared or exclusive lock.  (These can alternatively be called READ
-and WRITE locks.)  These locks are intended to be short-term: they should not
-be held for long.  Buffer locks are acquired and released by LockBuffer().
-It will *not* work for a single backend to try to acquire multiple locks on
-the same buffer.  One must pin a buffer before trying to lock it.
+Buffer content locks: there are three kinds of buffer lock, shared,
+share-exclusive and exclusive:
+a) multiple backends can hold share locks (alternatively called READ locks)
+   on the same buffer
+b) one backend can hold a share-exclusive lock on a buffer while multiple
+   backends hold share locks on it
+c) an exclusive lock (alternatively called a WRITE lock) prevents anyone else
+   from holding a share, share-exclusive or exclusive lock on the same
+   buffer.
+
+These locks are intended to be short-term: they should not be held for long.
+Buffer locks are acquired and released by LockBuffer().  It will *not* work
+for a single backend to try to acquire multiple locks on the same buffer.  One
+must pin a buffer before trying to lock it.
 
 Buffer access rules:
 
-1. To scan a page for tuples, one must hold a pin and either shared or
-exclusive content lock.  To examine the commit status (XIDs and status bits)
-of a tuple in a shared buffer, one must likewise hold a pin and either shared
-or exclusive lock.
+1. To scan a page for tuples, one must hold a pin and at least a share lock.
+To examine the commit status (XIDs and status bits) of a tuple in a shared
+buffer, one must likewise hold a pin and at least a share lock.
 
 2. Once one has determined that a tuple is interesting (visible to the
 current transaction) one may drop the content lock, yet continue to access
@@ -55,19 +60,25 @@ one must hold a pin and an exclusive content lock on the containing buffer.
 This ensures that no one else might see a partially-updated state of the
 tuple while they are doing visibility checks.
 
-4. It is considered OK to update tuple commit status bits (ie, OR the
-values HEAP_XMIN_COMMITTED, HEAP_XMIN_INVALID, HEAP_XMAX_COMMITTED, or
-HEAP_XMAX_INVALID into t_infomask) while holding only a shared lock and
-pin on a buffer.  This is OK because another backend looking at the tuple
-at about the same time would OR the same bits into the field, so there
-is little or no risk of conflicting update; what's more, if there did
-manage to be a conflict it would merely mean that one bit-update would
-be lost and need to be done again later.  These four bits are only hints
-(they cache the results of transaction status lookups in pg_xact), so no
-great harm is done if they get reset to zero by conflicting updates.
-Note, however, that a tuple is frozen by setting both HEAP_XMIN_INVALID
-and HEAP_XMIN_COMMITTED; this is a critical update and accordingly requires
-an exclusive buffer lock (and it must also be WAL-logged).
+4. Non-critical information on a page ("hint bits") may be modified while
+holding only a share-exclusive lock and pin on the page. To do so in cases
+where only a share lock is already held, use BufferBeginSetHintBits() &
+BufferFinishSetHintBits() (if multiple hint bits are to be set) or
+BufferSetHintBits16() (if a single hint bit is to be set).
+
+E.g. for heapam, this allows updating tuple commit status bits (ie, ORing the
+values HEAP_XMIN_COMMITTED, HEAP_XMIN_INVALID, HEAP_XMAX_COMMITTED, or
+HEAP_XMAX_INVALID into t_infomask) while holding only a share-exclusive lock
+and pin on a buffer.  This is OK because another
+backend looking at the tuple at about the same time would OR the same bits
+into the field, so there is little or no risk of conflicting update; what's
+more, if there did manage to be a conflict it would merely mean that one
+bit-update would be lost and need to be done again later.  These four bits are
+only hints (they cache the results of transaction status lookups in pg_xact),
+so no great harm is done if they get reset to zero by conflicting updates.
+Note, however, that a tuple is frozen by setting both HEAP_XMIN_INVALID and
+HEAP_XMIN_COMMITTED; this is a critical update and accordingly requires an
+exclusive buffer lock (and it must also be WAL-logged).
 
 5. To physically remove a tuple or compact free space on a page, one
 must hold a pin and an exclusive lock, *and* observe while holding the
@@ -80,7 +91,6 @@ buffer (increment the refcount) while one is performing the cleanup, but
 it won't be able to actually examine the page until it acquires shared
 or exclusive content lock.
 
-
 Obtaining the lock needed under rule #5 is done by the bufmgr routines
 LockBufferForCleanup() or ConditionalLockBufferForCleanup().  They first get
 an exclusive lock and then check to see if the shared pin count is currently
@@ -96,6 +106,10 @@ VACUUM's use, since we don't allow multiple VACUUMs concurrently on a single
 relation anyway.  Anyone wishing to obtain a cleanup lock outside of recovery
 or a VACUUM must use the conditional variant of the function.
 
+6. To write out a buffer, a share-exclusive lock needs to be held. This
+prevents the buffer from being modified while it is written out, which could
+corrupt checksums and cause OS- or device-level problems with direct-IO.
+
 
 Buffer Manager's Internal Locking
 ---------------------------------
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 7241477cac0..3b32b4e0ab1 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -2480,9 +2480,8 @@ again:
 	/*
 	 * If the buffer was dirty, try to write it out.  There is a race
 	 * condition here, in that someone might dirty it after we released the
-	 * buffer header lock above, or even while we are writing it out (since
-	 * our share-lock won't prevent hint-bit updates).  We will recheck the
-	 * dirty bit after re-locking the buffer header.
+	 * buffer header lock above.  We will recheck the dirty bit after
+	 * re-locking the buffer header.
 	 */
 	if (buf_state & BM_DIRTY)
 	{
@@ -2490,20 +2489,20 @@ again:
 		Assert(buf_state & BM_VALID);
 
 		/*
-		 * We need a share-lock on the buffer contents to write it out (else
-		 * we might write invalid data, eg because someone else is compacting
-		 * the page contents while we write).  We must use a conditional lock
-		 * acquisition here to avoid deadlock.  Even though the buffer was not
-		 * pinned (and therefore surely not locked) when StrategyGetBuffer
-		 * returned it, someone else could have pinned and exclusive-locked it
-		 * by the time we get here. If we try to get the lock unconditionally,
-		 * we'd block waiting for them; if they later block waiting for us,
-		 * deadlock ensues. (This has been observed to happen when two
-		 * backends are both trying to split btree index pages, and the second
-		 * one just happens to be trying to split the page the first one got
-		 * from StrategyGetBuffer.)
+		 * We need a share-exclusive lock on the buffer contents to write it
+		 * out (else we might write invalid data, eg because someone else is
+		 * compacting the page contents while we write).  We must use a
+		 * conditional lock acquisition here to avoid deadlock.  Even though
+		 * the buffer was not pinned (and therefore surely not locked) when
+		 * StrategyGetBuffer returned it, someone else could have pinned and
+		 * (share-)exclusive-locked it by the time we get here. If we try to
+		 * get the lock unconditionally, we'd block waiting for them; if they
+		 * later block waiting for us, deadlock ensues. (This has been
+		 * observed to happen when two backends are both trying to split btree
+		 * index pages, and the second one just happens to be trying to split
+		 * the page the first one got from StrategyGetBuffer.)
 		 */
-		if (!BufferLockConditional(buf, buf_hdr, BUFFER_LOCK_SHARE))
+		if (!BufferLockConditional(buf, buf_hdr, BUFFER_LOCK_SHARE_EXCLUSIVE))
 		{
 			/*
 			 * Someone else has locked the buffer, so give it up and loop back
@@ -2516,18 +2515,21 @@ again:
 		/*
 		 * If using a nondefault strategy, and writing the buffer would
 		 * require a WAL flush, let the strategy decide whether to go ahead
-		 * and write/reuse the buffer or to choose another victim.  We need a
-		 * lock to inspect the page LSN, so this can't be done inside
+		 * and write/reuse the buffer or to choose another victim.  We need to
+		 * hold the content lock in at least share-exclusive mode to safely
+		 * inspect the page LSN, so this couldn't have been done inside
 		 * StrategyGetBuffer.
 		 */
 		if (strategy != NULL)
 		{
 			XLogRecPtr	lsn;
 
-			/* Read the LSN while holding buffer header lock */
-			buf_state = LockBufHdr(buf_hdr);
+			/*
+			 * As we now hold at least a share-exclusive lock on the buffer,
+			 * the LSN cannot change while we hold the lock (and thus can't
+			 * be torn).
+			 */
 			lsn = BufferGetLSN(buf_hdr);
-			UnlockBufHdr(buf_hdr);
 
 			if (XLogNeedsFlush(lsn)
 				&& StrategyRejectBuffer(strategy, buf_hdr, from_ring))
@@ -3017,7 +3019,7 @@ BufferIsLockedByMeInMode(Buffer buffer, BufferLockMode mode)
  *
  *		Checks if buffer is already dirty.
  *
- * Buffer must be pinned and exclusive-locked.  (Without an exclusive lock,
+ * Buffer must be pinned and [share-]exclusive-locked.  (Without such a lock,
  * the result may be stale before it's returned.)
  */
 bool
@@ -3037,7 +3039,8 @@ BufferIsDirty(Buffer buffer)
 	else
 	{
 		bufHdr = GetBufferDescriptor(buffer - 1);
-		Assert(BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_EXCLUSIVE));
+		Assert(BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_SHARE_EXCLUSIVE) ||
+			   BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_EXCLUSIVE));
 	}
 
 	return pg_atomic_read_u64(&bufHdr->state) & BM_DIRTY;
@@ -4072,8 +4075,8 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
 	}
 
 	/*
-	 * Pin it, share-lock it, write it.  (FlushBuffer will do nothing if the
-	 * buffer is clean by the time we've locked it.)
+	 * Pin it, share-exclusive-lock it, write it.  (FlushBuffer will do
+	 * nothing if the buffer is clean by the time we've locked it.)
 	 */
 	PinBuffer_Locked(bufHdr);
 
@@ -4403,11 +4406,8 @@ BufferGetTag(Buffer buffer, RelFileLocator *rlocator, ForkNumber *forknum,
  * However, we will need to force the changes to disk via fsync before
  * we can checkpoint WAL.
  *
- * The caller must hold a pin on the buffer and have share-locked the
- * buffer contents.  (Note: a share-lock does not prevent updates of
- * hint bits in the buffer, so the page could change while the write
- * is in progress, but we assume that that will not invalidate the data
- * written.)
+ * The caller must hold a pin on the buffer and have
+ * (share-)exclusively-locked the buffer contents.
  *
  * If the caller has an smgr reference for the buffer's relation, pass it
  * as the second parameter.  If not, pass NULL.
@@ -4423,6 +4423,9 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	char	   *bufToWrite;
 	uint64		buf_state;
 
+	Assert(BufferLockHeldByMeInMode(buf, BUFFER_LOCK_EXCLUSIVE) ||
+		   BufferLockHeldByMeInMode(buf, BUFFER_LOCK_SHARE_EXCLUSIVE));
+
 	/*
 	 * Try to start an I/O operation.  If StartBufferIO returns false, then
 	 * someone else flushed the buffer before we could, so we need not do
@@ -4450,8 +4453,8 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
 	buf_state = LockBufHdr(buf);
 
 	/*
-	 * Run PageGetLSN while holding header lock, since we don't have the
-	 * buffer locked exclusively in all cases.
+	 * As we hold at least a share-exclusive lock on the buffer, the LSN
+	 * cannot change during the flush (and thus can't be torn).
 	 */
 	recptr = BufferGetLSN(buf);
 
@@ -4555,7 +4558,7 @@ FlushUnlockedBuffer(BufferDesc *buf, SMgrRelation reln,
 {
 	Buffer		buffer = BufferDescriptorGetBuffer(buf);
 
-	BufferLockAcquire(buffer, buf, BUFFER_LOCK_SHARE);
+	BufferLockAcquire(buffer, buf, BUFFER_LOCK_SHARE_EXCLUSIVE);
 	FlushBuffer(buf, reln, IOOBJECT_RELATION, IOCONTEXT_NORMAL);
 	BufferLockUnlock(buffer, buf);
 }
@@ -4627,8 +4630,9 @@ BufferIsPermanent(Buffer buffer)
 /*
  * BufferGetLSNAtomic
  *		Retrieves the LSN of the buffer atomically using a buffer header lock.
- *		This is necessary for some callers who may not have an exclusive lock
- *		on the buffer.
+ *		This is necessary for some callers who may only hold a share lock on
+ *		the buffer. A share lock allows a concurrent backend to set hint bits
+ *		on the page, which in turn may require a WAL record to be emitted.
  */
 XLogRecPtr
 BufferGetLSNAtomic(Buffer buffer)
@@ -5474,8 +5478,8 @@ FlushDatabaseBuffers(Oid dbid)
 }
 
 /*
- * Flush a previously, shared or exclusively, locked and pinned buffer to the
- * OS.
+ * Flush a previously pinned and (share-exclusively or exclusively) locked
+ * buffer to the OS.
  */
 void
 FlushOneBuffer(Buffer buffer)
@@ -5548,56 +5552,38 @@ IncrBufferRefCount(Buffer buffer)
 }
 
 /*
- * MarkBufferDirtyHint
+ * Shared-buffer only helper for MarkBufferDirtyHint() and
+ * BufferSetHintBits16().
  *
- *	Mark a buffer dirty for non-critical changes.
- *
- * This is essentially the same as MarkBufferDirty, except:
- *
- * 1. The caller does not write WAL; so if checksums are enabled, we may need
- *	  to write an XLOG_FPI_FOR_HINT WAL record to protect against torn pages.
- * 2. The caller might have only share-lock instead of exclusive-lock on the
- *	  buffer's content lock.
- * 3. This function does not guarantee that the buffer is always marked dirty
- *	  (due to a race condition), so it cannot be used for important changes.
+ * This is separated out because the repeated checks for local buffers,
+ * repeated GetBufferDescriptor() calls and repeated reading of the buffer's
+ * state hurt the performance of BufferSetHintBits16() enough to matter.
  */
-void
-MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
+static inline void
+MarkSharedBufferDirtyHint(Buffer buffer, BufferDesc *bufHdr, uint64 lockstate,
+						  bool buffer_std)
 {
-	BufferDesc *bufHdr;
 	Page		page = BufferGetPage(buffer);
 
-	if (!BufferIsValid(buffer))
-		elog(ERROR, "bad buffer ID: %d", buffer);
-
-	if (BufferIsLocal(buffer))
-	{
-		MarkLocalBufferDirty(buffer);
-		return;
-	}
-
-	bufHdr = GetBufferDescriptor(buffer - 1);
-
 	Assert(GetPrivateRefCount(buffer) > 0);
-	/* here, either share or exclusive lock is OK */
-	Assert(BufferIsLockedByMe(buffer));
+
+	/* here, either share-exclusive or exclusive lock is OK */
+	Assert(BufferLockHeldByMeInMode(bufHdr, BUFFER_LOCK_EXCLUSIVE) ||
+		   BufferLockHeldByMeInMode(bufHdr, BUFFER_LOCK_SHARE_EXCLUSIVE));
 
 	/*
 	 * This routine might get called many times on the same page, if we are
 	 * making the first scan after commit of an xact that added/deleted many
-	 * tuples. So, be as quick as we can if the buffer is already dirty.  We
-	 * do this by not acquiring spinlock if it looks like the status bits are
-	 * already set.  Since we make this test unlocked, there's a chance we
-	 * might fail to notice that the flags have just been cleared, and failed
-	 * to reset them, due to memory-ordering issues.  But since this function
-	 * is only intended to be used in cases where failing to write out the
-	 * data would be harmless anyway, it doesn't really matter.
+	 * tuples. So, be as quick as we can if the buffer is already dirty.
+	 *
+	 * As we are holding (at least) a share-exclusive lock, nobody could have
+	 * cleaned or dirtied the page concurrently, so we can just rely on the
+	 * previously fetched value here without any danger of races.
 	 */
-	if ((pg_atomic_read_u64(&bufHdr->state) & (BM_DIRTY | BM_JUST_DIRTIED)) !=
-		(BM_DIRTY | BM_JUST_DIRTIED))
+	if (unlikely((lockstate & (BM_DIRTY | BM_JUST_DIRTIED)) !=
+				 (BM_DIRTY | BM_JUST_DIRTIED)))
 	{
 		XLogRecPtr	lsn = InvalidXLogRecPtr;
-		bool		dirtied = false;
 		bool		delayChkptFlags = false;
 		uint64		buf_state;
 
@@ -5610,8 +5596,7 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 		 * We don't check full_page_writes here because that logic is included
 		 * when we call XLogInsert() since the value changes dynamically.
 		 */
-		if (XLogHintBitIsNeeded() &&
-			(pg_atomic_read_u64(&bufHdr->state) & BM_PERMANENT))
+		if (XLogHintBitIsNeeded() && (lockstate & BM_PERMANENT))
 		{
 			/*
 			 * If we must not write WAL, due to a relfilelocator-specific
@@ -5656,27 +5641,29 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 
 		buf_state = LockBufHdr(bufHdr);
 
+		/*
+		 * It should not be possible for the buffer to already be dirty; see
+		 * the comment above.
+		 */
+		Assert(!(buf_state & BM_DIRTY));
 		Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
 
-		if (!(buf_state & BM_DIRTY))
+		if (XLogRecPtrIsValid(lsn))
 		{
-			dirtied = true;		/* Means "will be dirtied by this action" */
-
 			/*
-			 * Set the page LSN if we wrote a backup block. We aren't supposed
-			 * to set this when only holding a share lock but as long as we
-			 * serialise it somehow we're OK. We choose to set LSN while
-			 * holding the buffer header lock, which causes any reader of an
-			 * LSN who holds only a share lock to also obtain a buffer header
-			 * lock before using PageGetLSN(), which is enforced in
-			 * BufferGetLSNAtomic().
+			 * Set the page LSN if we wrote a backup block. To allow backends
+			 * that hold only a share lock on the buffer to read the LSN in a
+			 * tear-free manner, we set the page LSN while holding the buffer
+			 * header lock. A reader holding only a share lock must likewise
+			 * take the buffer header lock before using PageGetLSN(), so that
+			 * it cannot observe a torn LSN. This is what BufferGetLSNAtomic()
+			 * does.
 			 *
 			 * If checksums are enabled, you might think we should reset the
 			 * checksum here. That will happen when the page is written
 			 * sometime later in this checkpoint cycle.
 			 */
-			if (XLogRecPtrIsValid(lsn))
-				PageSetLSN(page, lsn);
+			PageSetLSN(page, lsn);
 		}
 
 		UnlockBufHdrExt(bufHdr, buf_state,
@@ -5686,15 +5673,48 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
 		if (delayChkptFlags)
 			MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
 
-		if (dirtied)
-		{
-			pgBufferUsage.shared_blks_dirtied++;
-			if (VacuumCostActive)
-				VacuumCostBalance += VacuumCostPageDirty;
-		}
+		pgBufferUsage.shared_blks_dirtied++;
+		if (VacuumCostActive)
+			VacuumCostBalance += VacuumCostPageDirty;
 	}
 }
 
+/*
+ * MarkBufferDirtyHint
+ *
+ *	Mark a buffer dirty for non-critical changes.
+ *
+ * This is essentially the same as MarkBufferDirty, except:
+ *
+ * 1. The caller does not write WAL; so if checksums are enabled, we may need
+ *	  to write an XLOG_FPI_FOR_HINT WAL record to protect against torn pages.
+ * 2. The caller might hold only a share-exclusive lock instead of an
+ *	  exclusive lock on the buffer's content lock.
+ * 3. This function does not guarantee that the buffer is always marked dirty
+ *	  (e.g. it cannot always do so on a hot standby), so it cannot be used for
+ *	  important changes.
+ */
+inline void
+MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
+{
+	BufferDesc *bufHdr;
+
+	if (!BufferIsValid(buffer))
+		elog(ERROR, "bad buffer ID: %d", buffer);
+
+	if (BufferIsLocal(buffer))
+	{
+		MarkLocalBufferDirty(buffer);
+		return;
+	}
+
+	/* only look up the descriptor once we know this is a shared buffer */
+	bufHdr = GetBufferDescriptor(buffer - 1);
+	MarkSharedBufferDirtyHint(buffer, bufHdr,
+							  pg_atomic_read_u64(&bufHdr->state),
+							  buffer_std);
+}
+
 /*
  * Release buffer content locks for shared buffers.
  *
@@ -6796,6 +6816,187 @@ IsBufferCleanupOK(Buffer buffer)
 	return false;
 }
 
+/*
+ * Helper for BufferBeginSetHintBits() and BufferSetHintBits16().
+ *
+ * This checks if the current lock mode already suffices to allow hint bits
+ * to be set and, if not, whether the current lock can be upgraded.
+ *
+ * Updates *lockstate when returning true.
+ */
+static inline bool
+SharedBufferBeginSetHintBits(Buffer buffer, BufferDesc *buf_hdr, uint64 *lockstate)
+{
+	uint64		old_state;
+	PrivateRefCountEntry *ref;
+	BufferLockMode mode;
+
+	ref = GetPrivateRefCountEntry(buffer, true);
+
+	if (ref == NULL)
+		elog(ERROR, "buffer is not pinned");
+
+	mode = ref->data.lockmode;
+	if (mode == BUFFER_LOCK_UNLOCK)
+		elog(ERROR, "buffer is not locked");
+
+	/* we're done if we are already holding a sufficient lock level */
+	if (mode == BUFFER_LOCK_EXCLUSIVE || mode == BUFFER_LOCK_SHARE_EXCLUSIVE)
+	{
+		*lockstate = pg_atomic_read_u64(&buf_hdr->state);
+		return true;
+	}
+
+	/*
+	 * We are only holding a share lock right now, try to upgrade it to
+	 * SHARE_EXCLUSIVE.
+	 */
+	Assert(mode == BUFFER_LOCK_SHARE);
+
+	old_state = pg_atomic_read_u64(&buf_hdr->state);
+	while (true)
+	{
+		uint64		desired_state;
+
+		desired_state = old_state;
+
+		/*
+		 * Can't upgrade if somebody else holds the lock in exclusive or
+		 * share-exclusive mode.
+		 */
+		if (unlikely((old_state & (BM_LOCK_VAL_EXCLUSIVE | BM_LOCK_VAL_SHARE_EXCLUSIVE)) != 0))
+		{
+			return false;
+		}
+
+		/* currently held lock state */
+		desired_state -= BM_LOCK_VAL_SHARED;
+
+		/* new lock level */
+		desired_state += BM_LOCK_VAL_SHARE_EXCLUSIVE;
+
+		if (likely(pg_atomic_compare_exchange_u64(&buf_hdr->state,
+												  &old_state, desired_state)))
+		{
+			ref->data.lockmode = BUFFER_LOCK_SHARE_EXCLUSIVE;
+			*lockstate = desired_state;
+
+			return true;
+		}
+	}
+}
+
+/*
+ * Try to acquire the right to set hint bits on the buffer.
+ *
+ * To be allowed to set hint bits, this backend needs to hold either a
+ * share-exclusive or an exclusive lock. In case this backend only holds a
+ * share lock, this function will try to upgrade the lock to
+ * share-exclusive. The caller is only allowed to set hint bits if true is
+ * returned.
+ *
+ * Once BufferBeginSetHintBits() has returned true, hint bits may be set
+ * without further calls to BufferBeginSetHintBits(), until the buffer is
+ * unlocked.
+ *
+ *
+ * Requiring a share-exclusive lock to set hint bits prevents setting hint
+ * bits on buffers that are currently being written out, which could corrupt
+ * the checksum on the page. Flushing buffers also requires a share-exclusive
+ * lock.
+ *
+ * Due to a lock >= share-exclusive being required to set hint bits, only one
+ * backend can set hint bits at a time. Allowing multiple backends to set hint
+ * bits would require more complicated locking: For setting hint bits we'd
+ * need to store the count of backends currently setting hint bits, for I/O we
+ * would need another lock-level conflicting with the hint-setting
+ * lock-level. Given that the share-exclusive lock for setting hint bits is
+ * only held for a short time, that backends often would just set the same
+ * hint bits and that the cost of occasionally not setting hint bits in hotly
+ * accessed pages is fairly low, this seems like an acceptable tradeoff.
+ */
+bool
+BufferBeginSetHintBits(Buffer buffer)
+{
+	BufferDesc *buf_hdr;
+	uint64		lockstate;
+
+	if (BufferIsLocal(buffer))
+	{
+		/*
+		 * NB: Will need to check if there is a write in progress, once it is
+		 * possible for writes to be done asynchronously.
+		 */
+		return true;
+	}
+
+	buf_hdr = GetBufferDescriptor(buffer - 1);
+
+	return SharedBufferBeginSetHintBits(buffer, buf_hdr, &lockstate);
+}
+
+/*
+ * End a phase of setting hint bits on this buffer, started with
+ * BufferBeginSetHintBits().
+ *
+ * Strictly speaking this would not be required (i.e. the caller could call
+ * MarkBufferDirtyHint() directly if so desired), but it allows us to perform
+ * some sanity checks.
+ */
+void
+BufferFinishSetHintBits(Buffer buffer, bool mark_dirty, bool buffer_std)
+{
+	if (!BufferIsLocal(buffer))
+		Assert(BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_SHARE_EXCLUSIVE) ||
+			   BufferIsLockedByMeInMode(buffer, BUFFER_LOCK_EXCLUSIVE));
+
+	if (mark_dirty)
+		MarkBufferDirtyHint(buffer, buffer_std);
+}
+
+/*
+ * Try to set a single hint bit in a buffer.
+ *
+ * This is a bit faster than BufferBeginSetHintBits() /
+ * BufferFinishSetHintBits() when setting a single hint bit, but slower than
+ * that pair when setting several hint bits on the same page.
+ */
+bool
+BufferSetHintBits16(uint16 *ptr, uint16 val, Buffer buffer)
+{
+	BufferDesc *buf_hdr;
+	uint64		lockstate;
+#ifdef USE_ASSERT_CHECKING
+	char	   *page;
+
+	/* verify that the address is on the page */
+	page = BufferGetPage(buffer);
+	Assert((char *) ptr >= page && (char *) ptr < (page + BLCKSZ));
+#endif
+
+	if (BufferIsLocal(buffer))
+	{
+		*ptr = val;
+
+		MarkLocalBufferDirty(buffer);
+
+		return true;
+	}
+
+	buf_hdr = GetBufferDescriptor(buffer - 1);
+
+	if (SharedBufferBeginSetHintBits(buffer, buf_hdr, &lockstate))
+	{
+		*ptr = val;
+
+		MarkSharedBufferDirtyHint(buffer, buf_hdr, lockstate, true);
+
+		return true;
+	}
+
+	return false;
+}
+
 
 /*
  *	Functions for buffer I/O handling
diff --git a/src/backend/storage/freespace/freespace.c b/src/backend/storage/freespace/freespace.c
index ad337c00871..b9a8f368a63 100644
--- a/src/backend/storage/freespace/freespace.c
+++ b/src/backend/storage/freespace/freespace.c
@@ -904,13 +904,17 @@ fsm_vacuum_page(Relation rel, FSMAddress addr,
 	max_avail = fsm_get_max_avail(page);
 
 	/*
-	 * Reset the next slot pointer. This encourages the use of low-numbered
-	 * pages, increasing the chances that a later vacuum can truncate the
-	 * relation. We don't bother with marking the page dirty if it wasn't
-	 * already, since this is just a hint.
+	 * Try to reset the next slot pointer. This encourages the use of
+	 * low-numbered pages, increasing the chances that a later vacuum can
+	 * truncate the relation. We don't bother with marking the page dirty if
+	 * it wasn't already, since this is just a hint.
 	 */
 	LockBuffer(buf, BUFFER_LOCK_SHARE);
-	((FSMPage) PageGetContents(page))->fp_next_slot = 0;
+	if (BufferBeginSetHintBits(buf))
+	{
+		((FSMPage) PageGetContents(page))->fp_next_slot = 0;
+		BufferFinishSetHintBits(buf, false, false);
+	}
 	LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 
 	ReleaseBuffer(buf);
diff --git a/src/backend/storage/freespace/fsmpage.c b/src/backend/storage/freespace/fsmpage.c
index 33ee825529c..a2657c4033b 100644
--- a/src/backend/storage/freespace/fsmpage.c
+++ b/src/backend/storage/freespace/fsmpage.c
@@ -298,9 +298,18 @@ restart:
 	 * lock and get a garbled next pointer every now and then, than take the
 	 * concurrency hit of an exclusive lock.
 	 *
+	 * Without an exclusive lock, we need to use the hint bit infrastructure
+	 * to be allowed to modify the page.
+	 *
 	 * Wrap-around is handled at the beginning of this function.
 	 */
-	fsmpage->fp_next_slot = slot + (advancenext ? 1 : 0);
+	if (exclusive_lock_held || BufferBeginSetHintBits(buf))
+	{
+		fsmpage->fp_next_slot = slot + (advancenext ? 1 : 0);
+
+		if (!exclusive_lock_held)
+			BufferFinishSetHintBits(buf, false, false);
+	}
 
 	return slot;
 }
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 9f5ee8fd482..3a42feacdc1 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2757,6 +2757,7 @@ SetConstraintStateData
 SetConstraintTriggerData
 SetExprState
 SetFunctionReturnMode
+SetHintBitsState
 SetOp
 SetOpCmd
 SetOpPath
-- 
2.48.1.76.g4e746b1a31.dirty

