From 523335a6ef6b959d4aa1a666d1a76ba133b8b3c2 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Mon, 24 Oct 2022 16:44:16 -0700
Subject: [PATCH v2 06/14] bufmgr: Support multiple in-progress IOs by using
 resowner

---
 src/include/storage/bufmgr.h          |  2 +-
 src/include/utils/resowner_private.h  |  5 ++
 src/backend/access/transam/xact.c     |  4 +-
 src/backend/postmaster/autovacuum.c   |  1 -
 src/backend/postmaster/bgwriter.c     |  1 -
 src/backend/postmaster/checkpointer.c |  1 -
 src/backend/postmaster/walwriter.c    |  1 -
 src/backend/storage/buffer/bufmgr.c   | 86 ++++++++++++---------------
 src/backend/utils/resowner/resowner.c | 60 +++++++++++++++++++
 9 files changed, 105 insertions(+), 56 deletions(-)

diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index 3becf32a3c0..2e1d7540fd0 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -176,7 +176,7 @@ extern bool ConditionalLockBufferForCleanup(Buffer buffer);
 extern bool IsBufferCleanupOK(Buffer buffer);
 extern bool HoldingBufferPinThatDelaysRecovery(void);
 
-extern void AbortBufferIO(void);
+extern void AbortBufferIO(Buffer buffer);
 
 extern void BufmgrCommit(void);
 extern bool BgBufferSync(struct WritebackContext *wb_context);
diff --git a/src/include/utils/resowner_private.h b/src/include/utils/resowner_private.h
index 1b1f3181b54..ae58438ec76 100644
--- a/src/include/utils/resowner_private.h
+++ b/src/include/utils/resowner_private.h
@@ -30,6 +30,11 @@ extern void ResourceOwnerEnlargeBuffers(ResourceOwner owner);
 extern void ResourceOwnerRememberBuffer(ResourceOwner owner, Buffer buffer);
 extern void ResourceOwnerForgetBuffer(ResourceOwner owner, Buffer buffer);
 
+/* support for IO-in-progress management */
+extern void ResourceOwnerEnlargeBufferIOs(ResourceOwner owner);
+extern void ResourceOwnerRememberBufferIO(ResourceOwner owner, Buffer buffer);
+extern void ResourceOwnerForgetBufferIO(ResourceOwner owner, Buffer buffer);
+
 /* support for local lock management */
 extern void ResourceOwnerRememberLock(ResourceOwner owner, LOCALLOCK *locallock);
 extern void ResourceOwnerForgetLock(ResourceOwner owner, LOCALLOCK *locallock);
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index d85e3139082..2a91ed64a7a 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2725,8 +2725,7 @@ AbortTransaction(void)
 	pgstat_report_wait_end();
 	pgstat_progress_end_command();
 
-	/* Clean up buffer I/O and buffer context locks, too */
-	AbortBufferIO();
+	/* Clean up buffer context locks, too */
 	UnlockBuffers();
 
 	/* Reset WAL record construction state */
@@ -5086,7 +5085,6 @@ AbortSubTransaction(void)
 
 	pgstat_report_wait_end();
 	pgstat_progress_end_command();
-	AbortBufferIO();
 	UnlockBuffers();
 
 	/* Reset WAL record construction state */
diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c
index f5ea381c53e..2d5d1855439 100644
--- a/src/backend/postmaster/autovacuum.c
+++ b/src/backend/postmaster/autovacuum.c
@@ -526,7 +526,6 @@ AutoVacLauncherMain(int argc, char *argv[])
 		 */
 		LWLockReleaseAll();
 		pgstat_report_wait_end();
-		AbortBufferIO();
 		UnlockBuffers();
 		/* this is probably dead code, but let's be safe: */
 		if (AuxProcessResourceOwner)
diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index 69667f0eb4b..1c4244b6ec5 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -167,7 +167,6 @@ BackgroundWriterMain(void)
 		 */
 		LWLockReleaseAll();
 		ConditionVariableCancelSleep();
-		AbortBufferIO();
 		UnlockBuffers();
 		ReleaseAuxProcessResources(false);
 		AtEOXact_Buffers(false);
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index de0bbbfa791..c1ed6737df5 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -271,7 +271,6 @@ CheckpointerMain(void)
 		LWLockReleaseAll();
 		ConditionVariableCancelSleep();
 		pgstat_report_wait_end();
-		AbortBufferIO();
 		UnlockBuffers();
 		ReleaseAuxProcessResources(false);
 		AtEOXact_Buffers(false);
diff --git a/src/backend/postmaster/walwriter.c b/src/backend/postmaster/walwriter.c
index 3113e8fbdd5..20e0807c60d 100644
--- a/src/backend/postmaster/walwriter.c
+++ b/src/backend/postmaster/walwriter.c
@@ -163,7 +163,6 @@ WalWriterMain(void)
 		LWLockReleaseAll();
 		ConditionVariableCancelSleep();
 		pgstat_report_wait_end();
-		AbortBufferIO();
 		UnlockBuffers();
 		ReleaseAuxProcessResources(false);
 		AtEOXact_Buffers(false);
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index b9af8a05989..0cdeb644e6e 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -159,10 +159,6 @@ int			checkpoint_flush_after = DEFAULT_CHECKPOINT_FLUSH_AFTER;
 int			bgwriter_flush_after = DEFAULT_BGWRITER_FLUSH_AFTER;
 int			backend_flush_after = DEFAULT_BACKEND_FLUSH_AFTER;
 
-/* local state for StartBufferIO and related functions */
-static BufferDesc *InProgressBuf = NULL;
-static bool IsForInput;
-
 /* local state for LockBufferForCleanup */
 static BufferDesc *PinCountWaitBuf = NULL;
 
@@ -2689,7 +2685,6 @@ InitBufferPoolAccess(void)
 static void
 AtProcExit_Buffers(int code, Datum arg)
 {
-	AbortBufferIO();
 	UnlockBuffers();
 
 	CheckForBufferLeaks();
@@ -4618,7 +4613,7 @@ StartBufferIO(BufferDesc *buf, bool forInput)
 {
 	uint32		buf_state;
 
-	Assert(!InProgressBuf);
+	ResourceOwnerEnlargeBufferIOs(CurrentResourceOwner);
 
 	for (;;)
 	{
@@ -4642,8 +4637,8 @@ StartBufferIO(BufferDesc *buf, bool forInput)
 	buf_state |= BM_IO_IN_PROGRESS;
 	UnlockBufHdr(buf, buf_state);
 
-	InProgressBuf = buf;
-	IsForInput = forInput;
+	ResourceOwnerRememberBufferIO(CurrentResourceOwner,
+								  BufferDescriptorGetBuffer(buf));
 
 	return true;
 }
@@ -4669,8 +4664,6 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits)
 {
 	uint32		buf_state;
 
-	Assert(buf == InProgressBuf);
-
 	buf_state = LockBufHdr(buf);
 
 	Assert(buf_state & BM_IO_IN_PROGRESS);
@@ -4682,13 +4675,14 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits)
 	buf_state |= set_flag_bits;
 	UnlockBufHdr(buf, buf_state);
 
-	InProgressBuf = NULL;
+	ResourceOwnerForgetBufferIO(CurrentResourceOwner,
+								BufferDescriptorGetBuffer(buf));
 
 	ConditionVariableBroadcast(BufferDescriptorGetIOCV(buf));
 }
 
 /*
- * AbortBufferIO: Clean up any active buffer I/O after an error.
+ * AbortBufferIO: Clean up active buffer I/O after an error.
  *
  *	All LWLocks we might have held have been released,
  *	but we haven't yet released buffer pins, so the buffer is still pinned.
@@ -4697,46 +4691,42 @@ TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits)
  *	possible the error condition wasn't related to the I/O.
  */
 void
-AbortBufferIO(void)
+AbortBufferIO(Buffer buf)
 {
-	BufferDesc *buf = InProgressBuf;
+	BufferDesc *buf_hdr = GetBufferDescriptor(buf - 1);
+	uint32		buf_state;
 
-	if (buf)
+	buf_state = LockBufHdr(buf_hdr);
+	Assert(buf_state & (BM_IO_IN_PROGRESS | BM_TAG_VALID));
+
+	if (!(buf_state & BM_VALID))
 	{
-		uint32		buf_state;
-
-		buf_state = LockBufHdr(buf);
-		Assert(buf_state & BM_IO_IN_PROGRESS);
-		if (IsForInput)
-		{
-			Assert(!(buf_state & BM_DIRTY));
-
-			/* We'd better not think buffer is valid yet */
-			Assert(!(buf_state & BM_VALID));
-			UnlockBufHdr(buf, buf_state);
-		}
-		else
-		{
-			Assert(buf_state & BM_DIRTY);
-			UnlockBufHdr(buf, buf_state);
-			/* Issue notice if this is not the first failure... */
-			if (buf_state & BM_IO_ERROR)
-			{
-				/* Buffer is pinned, so we can read tag without spinlock */
-				char	   *path;
-
-				path = relpathperm(BufTagGetRelFileLocator(&buf->tag),
-								   BufTagGetForkNum(&buf->tag));
-				ereport(WARNING,
-						(errcode(ERRCODE_IO_ERROR),
-						 errmsg("could not write block %u of %s",
-								buf->tag.blockNum, path),
-						 errdetail("Multiple failures --- write error might be permanent.")));
-				pfree(path);
-			}
-		}
-		TerminateBufferIO(buf, false, BM_IO_ERROR);
+		Assert(!(buf_state & BM_DIRTY));
+		UnlockBufHdr(buf_hdr, buf_state);
 	}
+	else
+	{
+		Assert(!(buf_state & BM_DIRTY));
+		UnlockBufHdr(buf_hdr, buf_state);
+
+		/* Issue notice if this is not the first failure... */
+		if (buf_state & BM_IO_ERROR)
+		{
+			/* Buffer is pinned, so we can read tag without spinlock */
+			char	   *path;
+
+			path = relpathperm(BufTagGetRelFileLocator(&buf_hdr->tag),
+							   BufTagGetForkNum(&buf_hdr->tag));
+			ereport(WARNING,
+					(errcode(ERRCODE_IO_ERROR),
+					 errmsg("could not write block %u of %s",
+							buf_hdr->tag.blockNum, path),
+					 errdetail("Multiple failures --- write error might be permanent.")));
+			pfree(path);
+		}
+	}
+
+	TerminateBufferIO(buf_hdr, false, BM_IO_ERROR);
 }
 
 /*
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index 19b6241e45d..fccc59b39dd 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -121,6 +121,7 @@ typedef struct ResourceOwnerData
 
 	/* We have built-in support for remembering: */
 	ResourceArray bufferarr;	/* owned buffers */
+	ResourceArray bufferioarr;	/* in-progress buffer IO */
 	ResourceArray catrefarr;	/* catcache references */
 	ResourceArray catlistrefarr;	/* catcache-list pins */
 	ResourceArray relrefarr;	/* relcache references */
@@ -441,6 +442,7 @@ ResourceOwnerCreate(ResourceOwner parent, const char *name)
 	}
 
 	ResourceArrayInit(&(owner->bufferarr), BufferGetDatum(InvalidBuffer));
+	ResourceArrayInit(&(owner->bufferioarr), BufferGetDatum(InvalidBuffer));
 	ResourceArrayInit(&(owner->catrefarr), PointerGetDatum(NULL));
 	ResourceArrayInit(&(owner->catlistrefarr), PointerGetDatum(NULL));
 	ResourceArrayInit(&(owner->relrefarr), PointerGetDatum(NULL));
@@ -517,6 +519,24 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
 
 	if (phase == RESOURCE_RELEASE_BEFORE_LOCKS)
 	{
+		/*
+		 * Abort failed buffer IO. AbortBufferIO()->TerminateBufferIO() calls
+		 * ResourceOwnerForgetBufferIOs(), so we just have to iterate till
+		 * there are none.
+		 *
+		 * Needs to be before we release buffer pins.
+		 *
+		 * During a commit, there shouldn't be any in-progress IO.
+		 */
+		while (ResourceArrayGetAny(&(owner->bufferioarr), &foundres))
+		{
+			Buffer		res = DatumGetBuffer(foundres);
+
+			if (isCommit)
+				elog(PANIC, "lost track of buffer IO on buffer %u", res);
+			AbortBufferIO(res);
+		}
+
 		/*
 		 * Release buffer pins.  Note that ReleaseBuffer will remove the
 		 * buffer entry from our array, so we just have to iterate till there
@@ -746,6 +766,7 @@ ResourceOwnerDelete(ResourceOwner owner)
 
 	/* And it better not own any resources, either */
 	Assert(owner->bufferarr.nitems == 0);
+	Assert(owner->bufferioarr.nitems == 0);
 	Assert(owner->catrefarr.nitems == 0);
 	Assert(owner->catlistrefarr.nitems == 0);
 	Assert(owner->relrefarr.nitems == 0);
@@ -775,6 +796,7 @@ ResourceOwnerDelete(ResourceOwner owner)
 
 	/* And free the object. */
 	ResourceArrayFree(&(owner->bufferarr));
+	ResourceArrayFree(&(owner->bufferioarr));
 	ResourceArrayFree(&(owner->catrefarr));
 	ResourceArrayFree(&(owner->catlistrefarr));
 	ResourceArrayFree(&(owner->relrefarr));
@@ -976,6 +998,44 @@ ResourceOwnerForgetBuffer(ResourceOwner owner, Buffer buffer)
 			 buffer, owner->name);
 }
 
+
+/*
+ * Make sure there is room for at least one more entry in a ResourceOwner's
+ * buffer array.
+ *
+ * This is separate from actually inserting an entry because if we run out
+ * of memory, it's critical to do so *before* acquiring the resource.
+ */
+void
+ResourceOwnerEnlargeBufferIOs(ResourceOwner owner)
+{
+	/* We used to allow pinning buffers without a resowner, but no more */
+	Assert(owner != NULL);
+	ResourceArrayEnlarge(&(owner->bufferioarr));
+}
+
+/*
+ * Remember that a buffer IO is owned by a ResourceOwner
+ *
+ * Caller must have previously done ResourceOwnerEnlargeBufferIOs()
+ */
+void
+ResourceOwnerRememberBufferIO(ResourceOwner owner, Buffer buffer)
+{
+	ResourceArrayAdd(&(owner->bufferioarr), BufferGetDatum(buffer));
+}
+
+/*
+ * Forget that a buffer IO is owned by a ResourceOwner
+ */
+void
+ResourceOwnerForgetBufferIO(ResourceOwner owner, Buffer buffer)
+{
+	if (!ResourceArrayRemove(&(owner->bufferioarr), BufferGetDatum(buffer)))
+		elog(PANIC, "buffer IO %d is not owned by resource owner %s",
+			 buffer, owner->name);
+}
+
 /*
  * Remember that a Local Lock is owned by a ResourceOwner
  *
-- 
2.38.0

