From 7e4a1c30a70a1c60fcf6cd505bf0c17463e7342b Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sun, 23 Oct 2022 14:29:57 -0700
Subject: [PATCH v1 06/12] bufmgr: Acquire and clean victim buffer separately

Previously we held buffer locks for two buffer mapping partitions at the same
time to change the identity of buffers. Particularly for extending relations
needing to hold the extension lock while acquiring a victim buffer is
painful. By separating out the victim buffer acquisition, future commits will
be able to change relation extensions to scale better.

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/storage/buffer/bufmgr.c   | 570 ++++++++++++++------------
 src/backend/storage/buffer/localbuf.c | 115 +++---
 2 files changed, 381 insertions(+), 304 deletions(-)

diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index a7342388a41..50550a8a70e 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -482,6 +482,7 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr,
 							   BlockNumber blockNum,
 							   BufferAccessStrategy strategy,
 							   bool *foundPtr);
+static Buffer GetVictimBuffer(BufferAccessStrategy strategy);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln);
 static void FindAndDropRelationBuffers(RelFileLocator rlocator,
 									   ForkNumber forkNum,
@@ -1111,14 +1112,11 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 	BufferTag	newTag;			/* identity of requested block */
 	uint32		newHash;		/* hash value for newTag */
 	LWLock	   *newPartitionLock;	/* buffer partition lock for it */
-	BufferTag	oldTag;			/* previous identity of selected buffer */
-	uint32		oldHash;		/* hash value for oldTag */
-	LWLock	   *oldPartitionLock;	/* buffer partition lock for it */
-	uint32		oldFlags;
-	int			buf_id;
-	BufferDesc *buf;
-	bool		valid;
-	uint32		buf_state;
+	int			existing_buf_id;
+
+	Buffer		victim_buffer;
+	BufferDesc *victim_buf_hdr;
+	uint32		victim_buf_state;
 
 	/* create a tag so we can lookup the buffer */
 	InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
@@ -1129,15 +1127,18 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 
 	/* see if the block is in the buffer pool already */
 	LWLockAcquire(newPartitionLock, LW_SHARED);
-	buf_id = BufTableLookup(&newTag, newHash);
-	if (buf_id >= 0)
+	existing_buf_id = BufTableLookup(&newTag, newHash);
+	if (existing_buf_id >= 0)
 	{
+		BufferDesc *buf;
+		bool		valid;
+
 		/*
 		 * Found it.  Now, pin the buffer so no one can steal it from the
 		 * buffer pool, and check to see if the correct data has been loaded
 		 * into the buffer.
 		 */
-		buf = GetBufferDescriptor(buf_id);
+		buf = GetBufferDescriptor(existing_buf_id);
 
 		valid = PinBuffer(buf, strategy);
 
@@ -1174,266 +1175,96 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 	 */
 	LWLockRelease(newPartitionLock);
 
-	/* Loop here in case we have to try another victim buffer */
-	for (;;)
+	/*
+	 * Acquire a victim buffer. Somebody else might try to do the same, we
+	 * don't hold any conflicting locks. If so we'll have to undo our work
+	 * later.
+	 */
+	victim_buffer = GetVictimBuffer(strategy);
+	victim_buf_hdr = GetBufferDescriptor(victim_buffer - 1);
+
+	/*
+	 * Try to make a hashtable entry for the buffer under its new tag. If
+	 * somebody else inserted another buffer for the tag, we'll release the
+	 * victim buffer we acquired and use the already inserted one.
+	 */
+	LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
+	existing_buf_id = BufTableInsert(&newTag, newHash, victim_buf_hdr->buf_id);
+	if (existing_buf_id >= 0)
 	{
-		/*
-		 * Ensure, while the spinlock's not yet held, that there's a free
-		 * refcount entry.
-		 */
-		ReservePrivateRefCountEntry();
+		BufferDesc *existing_buf_hdr;
+		bool		valid;
 
 		/*
-		 * Select a victim buffer.  The buffer is returned with its header
-		 * spinlock still held!
+		 * Got a collision. Someone has already done what we were about to
+		 * do. We'll just handle this as if it were found in the buffer pool
+		 * in the first place.  First, give up the buffer we were planning to
+		 * use.
+		 *
+		 * We could do this after releasing the partition lock, but then we'd
+		 * have to call ResourceOwnerEnlargeBuffers() &
+		 * ReservePrivateRefCountEntry() before acquiring the lock, for the
+		 * rare case of such a collision.
 		 */
-		buf = StrategyGetBuffer(strategy, &buf_state);
+		UnpinBuffer(victim_buf_hdr);
 
-		Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 0);
+		/* FIXME: Should we put the victim buffer onto the freelist? */
 
-		/* Must copy buffer flags while we still hold the spinlock */
-		oldFlags = buf_state & BUF_FLAG_MASK;
+		/* remaining code should match code at top of routine */
 
-		/* Pin the buffer and then release the buffer spinlock */
-		PinBuffer_Locked(buf);
+		existing_buf_hdr = GetBufferDescriptor(existing_buf_id);
 
-		/*
-		 * If the buffer was dirty, try to write it out.  There is a race
-		 * condition here, in that someone might dirty it after we released it
-		 * above, or even while we are writing it out (since our share-lock
-		 * won't prevent hint-bit updates).  We will recheck the dirty bit
-		 * after re-locking the buffer header.
-		 */
-		if (oldFlags & BM_DIRTY)
-		{
-			/*
-			 * We need a share-lock on the buffer contents to write it out
-			 * (else we might write invalid data, eg because someone else is
-			 * compacting the page contents while we write).  We must use a
-			 * conditional lock acquisition here to avoid deadlock.  Even
-			 * though the buffer was not pinned (and therefore surely not
-			 * locked) when StrategyGetBuffer returned it, someone else could
-			 * have pinned and exclusive-locked it by the time we get here. If
-			 * we try to get the lock unconditionally, we'd block waiting for
-			 * them; if they later block waiting for us, deadlock ensues.
-			 * (This has been observed to happen when two backends are both
-			 * trying to split btree index pages, and the second one just
-			 * happens to be trying to split the page the first one got from
-			 * StrategyGetBuffer.)
-			 */
-			if (LWLockConditionalAcquire(BufferDescriptorGetContentLock(buf),
-										 LW_SHARED))
-			{
-				/*
-				 * If using a nondefault strategy, and writing the buffer
-				 * would require a WAL flush, let the strategy decide whether
-				 * to go ahead and write/reuse the buffer or to choose another
-				 * victim.  We need lock to inspect the page LSN, so this
-				 * can't be done inside StrategyGetBuffer.
-				 */
-				if (strategy != NULL)
-				{
-					XLogRecPtr	lsn;
+		valid = PinBuffer(existing_buf_hdr, strategy);
 
-					/* Read the LSN while holding buffer header lock */
-					buf_state = LockBufHdr(buf);
-					lsn = BufferGetLSN(buf);
-					UnlockBufHdr(buf, buf_state);
-
-					if (XLogNeedsFlush(lsn) &&
-						StrategyRejectBuffer(strategy, buf))
-					{
-						/* Drop lock/pin and loop around for another buffer */
-						LWLockRelease(BufferDescriptorGetContentLock(buf));
-						UnpinBuffer(buf);
-						continue;
-					}
-				}
-
-				/* OK, do the I/O */
-				TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_START(forkNum, blockNum,
-														  smgr->smgr_rlocator.locator.spcOid,
-														  smgr->smgr_rlocator.locator.dbOid,
-														  smgr->smgr_rlocator.locator.relNumber);
-
-				FlushBuffer(buf, NULL);
-				LWLockRelease(BufferDescriptorGetContentLock(buf));
-
-				ScheduleBufferTagForWriteback(&BackendWritebackContext,
-											  &buf->tag);
-
-				TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum,
-														 smgr->smgr_rlocator.locator.spcOid,
-														 smgr->smgr_rlocator.locator.dbOid,
-														 smgr->smgr_rlocator.locator.relNumber);
-			}
-			else
-			{
-				/*
-				 * Someone else has locked the buffer, so give it up and loop
-				 * back to get another one.
-				 */
-				UnpinBuffer(buf);
-				continue;
-			}
-		}
-
-		/*
-		 * To change the association of a valid buffer, we'll need to have
-		 * exclusive lock on both the old and new mapping partitions.
-		 */
-		if (oldFlags & BM_TAG_VALID)
-		{
-			/*
-			 * Need to compute the old tag's hashcode and partition lock ID.
-			 * XXX is it worth storing the hashcode in BufferDesc so we need
-			 * not recompute it here?  Probably not.
-			 */
-			oldTag = buf->tag;
-			oldHash = BufTableHashCode(&oldTag);
-			oldPartitionLock = BufMappingPartitionLock(oldHash);
-
-			/*
-			 * Must lock the lower-numbered partition first to avoid
-			 * deadlocks.
-			 */
-			if (oldPartitionLock < newPartitionLock)
-			{
-				LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
-				LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
-			}
-			else if (oldPartitionLock > newPartitionLock)
-			{
-				LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
-				LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
-			}
-			else
-			{
-				/* only one partition, only one lock */
-				LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
-			}
-		}
-		else
-		{
-			/* if it wasn't valid, we need only the new partition */
-			LWLockAcquire(newPartitionLock, LW_EXCLUSIVE);
-			/* remember we have no old-partition lock or tag */
-			oldPartitionLock = NULL;
-			/* keep the compiler quiet about uninitialized variables */
-			oldHash = 0;
-		}
-
-		/*
-		 * Try to make a hashtable entry for the buffer under its new tag.
-		 * This could fail because while we were writing someone else
-		 * allocated another buffer for the same block we want to read in.
-		 * Note that we have not yet removed the hashtable entry for the old
-		 * tag.
-		 */
-		buf_id = BufTableInsert(&newTag, newHash, buf->buf_id);
-
-		if (buf_id >= 0)
-		{
-			/*
-			 * Got a collision. Someone has already done what we were about to
-			 * do. We'll just handle this as if it were found in the buffer
-			 * pool in the first place.  First, give up the buffer we were
-			 * planning to use.
-			 */
-			UnpinBuffer(buf);
-
-			/* Can give up that buffer's mapping partition lock now */
-			if (oldPartitionLock != NULL &&
-				oldPartitionLock != newPartitionLock)
-				LWLockRelease(oldPartitionLock);
-
-			/* remaining code should match code at top of routine */
-
-			buf = GetBufferDescriptor(buf_id);
-
-			valid = PinBuffer(buf, strategy);
-
-			/* Can release the mapping lock as soon as we've pinned it */
-			LWLockRelease(newPartitionLock);
-
-			*foundPtr = true;
-
-			if (!valid)
-			{
-				/*
-				 * We can only get here if (a) someone else is still reading
-				 * in the page, or (b) a previous read attempt failed.  We
-				 * have to wait for any active read attempt to finish, and
-				 * then set up our own read attempt if the page is still not
-				 * BM_VALID.  StartBufferIO does it all.
-				 */
-				if (StartBufferIO(buf, true))
-				{
-					/*
-					 * If we get here, previous attempts to read the buffer
-					 * must have failed ... but we shall bravely try again.
-					 */
-					*foundPtr = false;
-				}
-			}
-
-			return buf;
-		}
-
-		/*
-		 * Need to lock the buffer header too in order to change its tag.
-		 */
-		buf_state = LockBufHdr(buf);
-
-		/*
-		 * Somebody could have pinned or re-dirtied the buffer while we were
-		 * doing the I/O and making the new hashtable entry.  If so, we can't
-		 * recycle this buffer; we must undo everything we've done and start
-		 * over with a new victim buffer.
-		 */
-		oldFlags = buf_state & BUF_FLAG_MASK;
-		if (BUF_STATE_GET_REFCOUNT(buf_state) == 1 && !(oldFlags & BM_DIRTY))
-			break;
-
-		UnlockBufHdr(buf, buf_state);
-		BufTableDelete(&newTag, newHash);
-		if (oldPartitionLock != NULL &&
-			oldPartitionLock != newPartitionLock)
-			LWLockRelease(oldPartitionLock);
+		/* Can release the mapping lock as soon as we've pinned it */
 		LWLockRelease(newPartitionLock);
-		UnpinBuffer(buf);
+
+		*foundPtr = true;
+
+		if (!valid)
+		{
+			/*
+			 * We can only get here if (a) someone else is still reading
+			 * in the page, or (b) a previous read attempt failed.  We
+			 * have to wait for any active read attempt to finish, and
+			 * then set up our own read attempt if the page is still not
+			 * BM_VALID.  StartBufferIO does it all.
+			 */
+			if (StartBufferIO(existing_buf_hdr, true))
+			{
+				/*
+				 * If we get here, previous attempts to read the buffer
+				 * must have failed ... but we shall bravely try again.
+				 */
+				*foundPtr = false;
+			}
+		}
+
+		return existing_buf_hdr;
 	}
 
 	/*
-	 * Okay, it's finally safe to rename the buffer.
-	 *
-	 * Clearing BM_VALID here is necessary, clearing the dirtybits is just
-	 * paranoia.  We also reset the usage_count since any recency of use of
-	 * the old content is no longer relevant.  (The usage_count starts out at
-	 * 1 so that the buffer can survive one clock-sweep pass.)
-	 *
+	 * Need to lock the buffer header too in order to change its tag.
+	 */
+	victim_buf_state = LockBufHdr(victim_buf_hdr);
+
+	/* some sanity checks while we hold the buffer header lock */
+	Assert(BUF_STATE_GET_REFCOUNT(victim_buf_state) == 1);
+	Assert(!(victim_buf_state & (BM_TAG_VALID | BM_VALID | BM_DIRTY | BM_IO_IN_PROGRESS)));
+
+	victim_buf_hdr->tag = newTag;
+
+	/*
 	 * Make sure BM_PERMANENT is set for buffers that must be written at every
 	 * checkpoint.  Unlogged buffers only need to be written at shutdown
 	 * checkpoints, except for their "init" forks, which need to be treated
 	 * just like permanent relations.
 	 */
-	buf->tag = newTag;
-	buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED |
-				   BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT |
-				   BUF_USAGECOUNT_MASK);
+	victim_buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
 	if (relpersistence == RELPERSISTENCE_PERMANENT || forkNum == INIT_FORKNUM)
-		buf_state |= BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE;
-	else
-		buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
+		victim_buf_state |= BM_PERMANENT;
 
-	UnlockBufHdr(buf, buf_state);
-
-	if (oldPartitionLock != NULL)
-	{
-		BufTableDelete(&oldTag, oldHash);
-		if (oldPartitionLock != newPartitionLock)
-			LWLockRelease(oldPartitionLock);
-	}
+	UnlockBufHdr(victim_buf_hdr, victim_buf_state);
 
 	LWLockRelease(newPartitionLock);
 
@@ -1443,12 +1274,12 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 	 * to read it before we did, so there's nothing left for BufferAlloc() to
 	 * do.
 	 */
-	if (StartBufferIO(buf, true))
+	if (StartBufferIO(victim_buf_hdr, true))
 		*foundPtr = false;
 	else
 		*foundPtr = true;
 
-	return buf;
+	return victim_buf_hdr;
 }
 
 /*
@@ -1557,6 +1388,239 @@ retry:
 	StrategyFreeBuffer(buf);
 }
 
+/*
+ * Helper routine for GetVictimBuffer()
+ *
+ * Needs to be called with the buffer pinned, but without the buffer header
+ * spinlock held.
+ *
+ * Returns true if the buffer can be reused, in which case the buffer is only
+ * pinned by this backend and marked as invalid, false otherwise.
+ */
+static bool
+InvalidateVictimBuffer(BufferDesc *buf_hdr)
+{
+	uint32 buf_state = pg_atomic_read_u32(&buf_hdr->state);
+
+	Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
+	Assert(GetPrivateRefCount(BufferDescriptorGetBuffer(buf_hdr)) == 1);
+
+	/* can't change while we're holding the pin */
+	if (buf_state & BM_TAG_VALID)
+	{
+		uint32      hash;
+		LWLock     *partition_lock;
+		BufferTag   tag;
+
+		/* have buffer pinned, so it's safe to read tag without lock */
+		tag = buf_hdr->tag;
+
+		hash = BufTableHashCode(&tag);
+		partition_lock = BufMappingPartitionLock(hash);
+
+		LWLockAcquire(partition_lock, LW_EXCLUSIVE);
+
+		/* lock the buffer header */
+		buf_state = LockBufHdr(buf_hdr);
+
+		Assert(BufferTagsEqual(&buf_hdr->tag, &tag));
+
+		/*
+		 * We have the buffer pinned nobody else should have been able to
+		 * unset this concurrently.
+		 */
+		Assert(buf_state & BM_TAG_VALID);
+		Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
+
+		/*
+		 * If somebody else pinned the buffer since, or even worse, dirtied it,
+		 * give up on this buffer: It's clearly in use.
+		 */
+		if (BUF_STATE_GET_REFCOUNT(buf_state) != 1 || (buf_state & BM_DIRTY))
+		{
+			Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
+
+			UnlockBufHdr(buf_hdr, buf_state);
+			LWLockRelease(partition_lock);
+
+			return false;
+		}
+
+		/*
+		 * Clear out the buffer's tag and flags and usagecount.  We must do
+		 * this to ensure that linear scans of the buffer array don't think
+		 * the buffer is valid.
+		 *
+		 * XXX: This is a pre-existing comment I just moved, but isn't it
+		 * entirely bogus with regard to the tag? We can't do anything with
+		 * the buffer without taking BM_VALID / BM_TAG_VALID into
+		 * account. Likely doesn't matter because we're already dirtying the
+		 * cacheline, but still.
+		 *
+		 */
+		ClearBufferTag(&buf_hdr->tag);
+		buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
+		UnlockBufHdr(buf_hdr, buf_state);
+
+		Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
+
+		BufTableDelete(&tag, hash);
+
+		LWLockRelease(partition_lock);
+	}
+
+	Assert(!(buf_state & (BM_DIRTY | BM_VALID | BM_TAG_VALID)));
+	Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
+	Assert(BUF_STATE_GET_REFCOUNT(pg_atomic_read_u32(&buf_hdr->state)) > 0);
+
+	return true;
+}
+
+static Buffer
+GetVictimBuffer(BufferAccessStrategy strategy)
+{
+	Buffer cur_buf;
+	BufferDesc *cur_buf_hdr = NULL;
+	uint32 cur_buf_state;
+
+	/*
+	 * Ensure, while the spinlock's not yet held, that there's a free
+	 * refcount entry.
+	 */
+	ReservePrivateRefCountEntry();
+	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
+
+again:
+
+	/*
+	 * Select a victim buffer.  The buffer is returned with its header
+	 * spinlock still held!
+	 */
+	cur_buf_hdr = StrategyGetBuffer(strategy, &cur_buf_state);
+	cur_buf = BufferDescriptorGetBuffer(cur_buf_hdr);
+
+	Assert(BUF_STATE_GET_REFCOUNT(cur_buf_state) == 0);
+
+	/* Pin the buffer and then release the buffer spinlock */
+	PinBuffer_Locked(cur_buf_hdr);
+
+	/*
+	 * We shouldn't have any other pins for this buffer.
+	 */
+	BufferCheckOneLocalPin(cur_buf);
+
+	/*
+	 * If the buffer was dirty, try to write it out.  There is a race
+	 * condition here, in that someone might dirty it after we released the
+	 * buffer header lock above, or even while we are writing it out (since
+	 * our share-lock won't prevent hint-bit updates).  We will recheck the
+	 * dirty bit after re-locking the buffer header.
+	 */
+	if (cur_buf_state & BM_DIRTY)
+	{
+		LWLock *content_lock;
+
+		Assert(cur_buf_state & BM_TAG_VALID);
+		Assert(cur_buf_state & BM_VALID);
+
+		/*
+		 * We need a share-lock on the buffer contents to write it out
+		 * (else we might write invalid data, eg because someone else is
+		 * compacting the page contents while we write).  We must use a
+		 * conditional lock acquisition here to avoid deadlock.  Even
+		 * though the buffer was not pinned (and therefore surely not
+		 * locked) when StrategyGetBuffer returned it, someone else could
+		 * have pinned and exclusive-locked it by the time we get here. If
+		 * we try to get the lock unconditionally, we'd block waiting for
+		 * them; if they later block waiting for us, deadlock ensues.
+		 * (This has been observed to happen when two backends are both
+		 * trying to split btree index pages, and the second one just
+		 * happens to be trying to split the page the first one got from
+		 * StrategyGetBuffer.)
+		 */
+		content_lock = BufferDescriptorGetContentLock(cur_buf_hdr);
+		if (!LWLockConditionalAcquire(content_lock, LW_SHARED))
+		{
+			/*
+			 * Someone else has locked the buffer, so give it up and loop
+			 * back to get another one.
+			 */
+			UnpinBuffer(cur_buf_hdr);
+			goto again;
+		}
+
+		/*
+		 * If using a nondefault strategy, and writing the buffer would
+		 * require a WAL flush, let the strategy decide whether to go ahead
+		 * and write/reuse the buffer or to choose another victim.  We need
+		 * lock to inspect the page LSN, so this can't be done inside
+		 * StrategyGetBuffer.
+		 */
+		if (strategy != NULL)
+		{
+			XLogRecPtr	lsn;
+
+			/* Read the LSN while holding buffer header lock */
+			cur_buf_state = LockBufHdr(cur_buf_hdr);
+			lsn = BufferGetLSN(cur_buf_hdr);
+			UnlockBufHdr(cur_buf_hdr, cur_buf_state);
+
+			if (XLogNeedsFlush(lsn)
+				&& StrategyRejectBuffer(strategy, cur_buf_hdr))
+			{
+				LWLockRelease(content_lock);
+				UnpinBuffer(cur_buf_hdr);
+				goto again;
+			}
+		}
+
+		/* OK, do the I/O */
+		/* FIXME: These used the wrong smgr before afaict? */
+		{
+			SMgrRelation smgr = smgropen(BufTagGetRelFileLocator(&cur_buf_hdr->tag),
+										 InvalidBackendId);
+
+			TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_START(cur_buf_hdr->tag.forkNum,
+													  cur_buf_hdr->tag.blockNum,
+													  smgr->smgr_rlocator.locator.spcOid,
+													  smgr->smgr_rlocator.locator.dbOid,
+													  smgr->smgr_rlocator.locator.relNumber);
+
+			FlushBuffer(cur_buf_hdr, smgr);
+			LWLockRelease(content_lock);
+
+			ScheduleBufferTagForWriteback(&BackendWritebackContext,
+										  &cur_buf_hdr->tag);
+
+			TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(cur_buf_hdr->tag.forkNum,
+													 cur_buf_hdr->tag.blockNum,
+													 smgr->smgr_rlocator.locator.spcOid,
+													 smgr->smgr_rlocator.locator.dbOid,
+													 smgr->smgr_rlocator.locator.relNumber);
+		}
+	}
+
+	/*
+	 * If the buffer has an entry in the buffer mapping table, delete it. This
+	 * can fail because another backend could have pinned or dirtied the
+	 * buffer.
+	 */
+	if (!InvalidateVictimBuffer(cur_buf_hdr))
+	{
+		UnpinBuffer(cur_buf_hdr);
+		goto again;
+	}
+
+	/* a final set of sanity checks */
+	cur_buf_state = pg_atomic_read_u32(&cur_buf_hdr->state);
+
+	Assert(BUF_STATE_GET_REFCOUNT(cur_buf_state) == 1);
+	Assert(!(cur_buf_state & (BM_TAG_VALID | BM_VALID | BM_DIRTY)));
+
+	BufferCheckOneLocalPin(cur_buf);
+
+	return cur_buf;
+}
 /*
  * MarkBufferDirty
  *
diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c
index e6349536a16..6a3f325ea6d 100644
--- a/src/backend/storage/buffer/localbuf.c
+++ b/src/backend/storage/buffer/localbuf.c
@@ -44,13 +44,14 @@ BufferDesc *LocalBufferDescriptors = NULL;
 Block	   *LocalBufferBlockPointers = NULL;
 int32	   *LocalRefCount = NULL;
 
-static int	nextFreeLocalBuf = 0;
+static int	nextFreeLocalBufId = 0;
 
 static HTAB *LocalBufHash = NULL;
 
 
 static void InitLocalBuffers(void);
 static Block GetLocalBufferStorage(void);
+static Buffer GetLocalVictimBuffer(void);
 
 
 /*
@@ -112,10 +113,9 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
 	BufferTag	newTag;			/* identity of requested block */
 	LocalBufferLookupEnt *hresult;
 	BufferDesc *bufHdr;
-	int			b;
-	int			trycounter;
+	Buffer		victim_buffer;
+	int			bufid;
 	bool		found;
-	uint32		buf_state;
 
 	InitBufferTag(&newTag, &smgr->smgr_rlocator.locator, forkNum, blockNum);
 
@@ -129,23 +129,51 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
 
 	if (hresult)
 	{
-		b = hresult->id;
-		bufHdr = GetLocalBufferDescriptor(b);
+		bufid = hresult->id;
+		bufHdr = GetLocalBufferDescriptor(bufid);
 		Assert(BufferTagsEqual(&bufHdr->tag, &newTag));
-#ifdef LBDEBUG
-		fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n",
-				smgr->smgr_rlocator.locator.relNumber, forkNum, blockNum, -b - 1);
-#endif
 
 		*foundPtr = PinLocalBuffer(bufHdr, true);
-		return bufHdr;
+	}
+	else
+	{
+		uint32		buf_state;
+
+		victim_buffer = GetLocalVictimBuffer();
+		bufid = -(victim_buffer + 1);
+		bufHdr = GetLocalBufferDescriptor(bufid);
+
+		hresult = (LocalBufferLookupEnt *)
+			hash_search(LocalBufHash, (void *) &newTag, HASH_ENTER, &found);
+		if (found)					/* shouldn't happen */
+			elog(ERROR, "local buffer hash table corrupted");
+		hresult->id = bufid;
+
+		/*
+		 * it's all ours now.
+		 */
+		bufHdr->tag = newTag;
+
+		buf_state = pg_atomic_read_u32(&bufHdr->state);
+		buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
+		buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
+		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+
+		*foundPtr = false;
 	}
 
-#ifdef LBDEBUG
-	fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n",
-			smgr->smgr_rlocator.locator.relNumber, forkNum, blockNum,
-			-nextFreeLocalBuf - 1);
-#endif
+	return bufHdr;
+}
+
+static Buffer
+GetLocalVictimBuffer(void)
+{
+	int			victim_bufid;
+	int			trycounter;
+	uint32		buf_state;
+	BufferDesc *bufHdr;
+
+	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
 
 	/*
 	 * Need to get a new buffer.  We use a clock sweep algorithm (essentially
@@ -154,14 +182,14 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
 	trycounter = NLocBuffer;
 	for (;;)
 	{
-		b = nextFreeLocalBuf;
+		victim_bufid = nextFreeLocalBufId;
 
-		if (++nextFreeLocalBuf >= NLocBuffer)
-			nextFreeLocalBuf = 0;
+		if (++nextFreeLocalBufId >= NLocBuffer)
+			nextFreeLocalBufId = 0;
 
-		bufHdr = GetLocalBufferDescriptor(b);
+		bufHdr = GetLocalBufferDescriptor(victim_bufid);
 
-		if (LocalRefCount[b] == 0)
+		if (LocalRefCount[victim_bufid] == 0)
 		{
 			buf_state = pg_atomic_read_u32(&bufHdr->state);
 
@@ -184,6 +212,15 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
 					 errmsg("no empty local buffer available")));
 	}
 
+	/*
+	 * lazy memory allocation: allocate space on first use of a buffer.
+	 */
+	if (LocalBufHdrGetBlock(bufHdr) == NULL)
+	{
+		/* Set pointer for use by BufferGetBlock() macro */
+		LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
+	}
+
 	/*
 	 * this buffer is not referenced but it might still be dirty. if that's
 	 * the case, write it out before reusing it!
@@ -213,19 +250,12 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
 	}
 
 	/*
-	 * lazy memory allocation: allocate space on first use of a buffer.
-	 */
-	if (LocalBufHdrGetBlock(bufHdr) == NULL)
-	{
-		/* Set pointer for use by BufferGetBlock() macro */
-		LocalBufHdrGetBlock(bufHdr) = GetLocalBufferStorage();
-	}
-
-	/*
-	 * Update the hash table: remove old entry, if any, and make new one.
+	 * Remove the victim buffer from the hashtable and mark as invalid.
 	 */
 	if (buf_state & BM_TAG_VALID)
 	{
+		LocalBufferLookupEnt *hresult;
+
 		hresult = (LocalBufferLookupEnt *)
 			hash_search(LocalBufHash, (void *) &bufHdr->tag,
 						HASH_REMOVE, NULL);
@@ -233,28 +263,11 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum,
 			elog(ERROR, "local buffer hash table corrupted");
 		/* mark buffer invalid just in case hash insert fails */
 		ClearBufferTag(&bufHdr->tag);
-		buf_state &= ~(BM_VALID | BM_TAG_VALID);
+		buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
 		pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
 	}
 
-	hresult = (LocalBufferLookupEnt *)
-		hash_search(LocalBufHash, (void *) &newTag, HASH_ENTER, &found);
-	if (found)					/* shouldn't happen */
-		elog(ERROR, "local buffer hash table corrupted");
-	hresult->id = b;
-
-	/*
-	 * it's all ours now.
-	 */
-	bufHdr->tag = newTag;
-	buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR);
-	buf_state |= BM_TAG_VALID;
-	buf_state &= ~BUF_USAGECOUNT_MASK;
-	buf_state += BUF_USAGECOUNT_ONE;
-	pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
-
-	*foundPtr = false;
-	return bufHdr;
+	return BufferDescriptorGetBuffer(bufHdr);
 }
 
 /*
@@ -423,7 +436,7 @@ InitLocalBuffers(void)
 				(errcode(ERRCODE_OUT_OF_MEMORY),
 				 errmsg("out of memory")));
 
-	nextFreeLocalBuf = 0;
+	nextFreeLocalBufId = 0;
 
 	/* initialize fields that need to start off nonzero */
 	for (i = 0; i < nbufs; i++)
-- 
2.38.0

