From c4f3fe9002ec977f172f9413073353557d9f45f8 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Tue, 18 Mar 2025 14:40:06 -0400
Subject: [PATCH v2.14 20/29] aio: Implement smgr/md/fd write support

TODO:
- Right now the sync.c integration with smgr.c/md.c isn't properly safe to use
  in a critical section

  The only reason it doesn't immediately fail is that it's reasonably rare
  that RegisterSyncRequest() fails *and* either:

  - smgropen()'s hash_search(HASH_ENTER) decides to resize the hash table,
    even though for io_method=worker the lookup itself is guaranteed to
    succeed, or

  - an io_method=uring completion is run in a different backend than the
    issuing one, so smgropen() needs to build a new entry and therefore has
    to allocate memory.

  For a bit I thought this could be worked around easily enough by not doing
  an smgropen() in mdsyncfiletag(), or by adding a "fallible" smgropen() and
  instead just opening the file directly. That does indeed kinda solve the
  problem, but only because the memory allocation in PathNameOpenFile()
  uses malloc(), not palloc(), and thus doesn't trigger the assertion
  forbidding memory allocation in a critical section. The hazardous path is
  sketched below.

- temp_file_limit is not yet enforced for asynchronous writes (see the FIXME
  in FileStartWriteV())
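
  For illustration, the hazardous path from the first item looks roughly
  like this (a sketch of the code paths involved, not of new code):

    /* completion callback, possibly inside a critical section */
    md_writev_complete()
      -> register_dirty_segment_aio()
         -> RegisterSyncRequest() fails    /* request queue full */
            -> mdsyncfiletag()
               -> smgropen()               /* hash_search(HASH_ENTER) */
                  -> palloc()              /* not allowed here */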
---
 src/include/storage/fd.h               |   1 +
 src/include/storage/md.h               |   5 +
 src/include/storage/smgr.h             |   5 +
 src/backend/storage/aio/aio_callback.c |   1 +
 src/backend/storage/file/fd.c          |  28 ++++
 src/backend/storage/smgr/md.c          | 201 ++++++++++++++++++++++++-
 src/backend/storage/smgr/smgr.c        |  29 ++++
 7 files changed, 269 insertions(+), 1 deletion(-)

diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index b77d8e5e30e..2cc7c5a4761 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -112,6 +112,7 @@ extern int	FilePrefetch(File file, off_t offset, off_t amount, uint32 wait_event
 extern ssize_t FileReadV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info);
 extern ssize_t FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info);
 extern int	FileStartReadV(struct PgAioHandle *ioh, File file, int iovcnt, off_t offset, uint32 wait_event_info);
+extern int	FileStartWriteV(struct PgAioHandle *ioh, File file, int iovcnt, off_t offset, uint32 wait_event_info);
 extern int	FileSync(File file, uint32 wait_event_info);
 extern int	FileZero(File file, off_t offset, off_t amount, uint32 wait_event_info);
 extern int	FileFallocate(File file, off_t offset, off_t amount, uint32 wait_event_info);
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index 9d7131eff43..47ae6c36c94 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -21,6 +21,7 @@
 #include "storage/sync.h"
 
 extern const PgAioHandleCallbacks aio_md_readv_cb;
+extern const PgAioHandleCallbacks aio_md_writev_cb;
 
 /* md storage manager functionality */
 extern void mdinit(void);
@@ -45,6 +46,10 @@ extern void mdstartreadv(PgAioHandle *ioh,
 extern void mdwritev(SMgrRelation reln, ForkNumber forknum,
 					 BlockNumber blocknum,
 					 const void **buffers, BlockNumber nblocks, bool skipFsync);
+extern void mdstartwritev(PgAioHandle *ioh,
+						  SMgrRelation reln, ForkNumber forknum,
+						  BlockNumber blocknum,
+						  const void **buffers, BlockNumber nblocks, bool skipFsync);
 extern void mdwriteback(SMgrRelation reln, ForkNumber forknum,
 						BlockNumber blocknum, BlockNumber nblocks);
 extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index 856ebcda350..f00b3763ac9 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -108,6 +108,11 @@ extern void smgrwritev(SMgrRelation reln, ForkNumber forknum,
 					   BlockNumber blocknum,
 					   const void **buffers, BlockNumber nblocks,
 					   bool skipFsync);
+extern void smgrstartwritev(PgAioHandle *ioh,
+							SMgrRelation reln, ForkNumber forknum,
+							BlockNumber blocknum,
+							const void **buffers, BlockNumber nblocks,
+							bool skipFsync);
 extern void smgrwriteback(SMgrRelation reln, ForkNumber forknum,
 						  BlockNumber blocknum, BlockNumber nblocks);
 extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum);
diff --git a/src/backend/storage/aio/aio_callback.c b/src/backend/storage/aio/aio_callback.c
index c0063d6950e..67026f5f6b4 100644
--- a/src/backend/storage/aio/aio_callback.c
+++ b/src/backend/storage/aio/aio_callback.c
@@ -41,6 +41,7 @@ static const PgAioHandleCallbacksEntry aio_handle_cbs[] = {
 	CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb),
 
 	CALLBACK_ENTRY(PGAIO_HCB_MD_READV, aio_md_readv_cb),
+	CALLBACK_ENTRY(PGAIO_HCB_MD_WRITEV, aio_md_writev_cb),
 
 	CALLBACK_ENTRY(PGAIO_HCB_SHARED_BUFFER_READV, aio_shared_buffer_readv_cb),
 
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 0e8299dd556..2138d47dab9 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -2348,6 +2348,34 @@ retry:
 	return returnCode;
 }
 
+int
+FileStartWriteV(PgAioHandle *ioh, File file,
+				int iovcnt, off_t offset,
+				uint32 wait_event_info)
+{
+	int			returnCode;
+	Vfd		   *vfdP;
+
+	Assert(FileIsValid(file));
+
+	DO_DB(elog(LOG, "FileStartWriteV: %d (%s) " INT64_FORMAT " %d",
+			   file, VfdCache[file].fileName,
+			   (int64) offset,
+			   iovcnt));
+
+	returnCode = FileAccess(file);
+	if (returnCode < 0)
+		return returnCode;
+
+	vfdP = &VfdCache[file];
+
+	/* FIXME: think about / reimplement temp_file_limit */
+
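+	/*
+	 * This merely stages the IO; it is executed once the handle is
+	 * submitted, possibly in a different process, depending on io_method.
+	 */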
+	pgaio_io_start_writev(ioh, vfdP->fd, iovcnt, offset);
+
+	return 0;
+}
+
 int
 FileSync(File file, uint32 wait_event_info)
 {
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index eedba1e5794..f6bdc7e37bc 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -155,12 +155,19 @@ static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum,
 
 static PgAioResult md_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data);
 static void md_readv_report(PgAioResult result, const PgAioTargetData *target_data, int elevel);
+static PgAioResult md_writev_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data);
+static void md_writev_report(PgAioResult result, const PgAioTargetData *target_data, int elevel);
 
 const PgAioHandleCallbacks aio_md_readv_cb = {
 	.complete_shared = md_readv_complete,
 	.report = md_readv_report,
 };
 
+const PgAioHandleCallbacks aio_md_writev_cb = {
+	.complete_shared = md_writev_complete,
+	.report = md_writev_report,
+};
+
 
 static inline int
 _mdfd_open_flags(void)
@@ -1115,6 +1122,64 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	}
 }
 
+/*
+ * mdstartwritev() -- Asynchronous version of mdwritev().
+ */
+void
+mdstartwritev(PgAioHandle *ioh,
+			  SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+			  const void **buffers, BlockNumber nblocks, bool skipFsync)
+{
+	off_t		seekpos;
+	MdfdVec    *v;
+	BlockNumber nblocks_this_segment;
+	struct iovec *iov;
+	int			iovcnt;
+	int			ret;
+
+	v = _mdfd_getseg(reln, forknum, blocknum, false,
+					 EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
+
+	seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
+
+	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
+
+	nblocks_this_segment =
+		Min(nblocks,
+			RELSEG_SIZE - (blocknum % ((BlockNumber) RELSEG_SIZE)));
+
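+	/*
+	 * Unlike mdwritev(), which loops over segments, one IO handle targets a
+	 * single file, so the caller must not cross a segment boundary.
+	 */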
+	if (nblocks_this_segment != nblocks)
+		elog(ERROR, "write crossing segment boundary");
+
+	iovcnt = pgaio_io_get_iovec(ioh, &iov);
+
+	Assert(nblocks <= iovcnt);
+
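+	/*
+	 * buffers_to_iovec() merges buffers that are adjacent in memory into a
+	 * single iovec entry, so iovcnt may end up smaller than the number of
+	 * blocks.
+	 */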
+	iovcnt = buffers_to_iovec(iov, unconstify(void **, buffers), nblocks_this_segment);
+
+	Assert(iovcnt <= nblocks_this_segment);
+
+	if (!(io_direct_flags & IO_DIRECT_DATA))
+		pgaio_io_set_flag(ioh, PGAIO_HF_BUFFERED);
+
+	pgaio_io_set_target_smgr(ioh,
+							 reln,
+							 forknum,
+							 blocknum,
+							 nblocks,
+							 skipFsync);
+	pgaio_io_register_callbacks(ioh, PGAIO_HCB_MD_WRITEV, 0);
+
+	ret = FileStartWriteV(ioh, v->mdfd_vfd, iovcnt, seekpos, WAIT_EVENT_DATA_FILE_WRITE);
+	if (ret != 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not start writing blocks %u..%u in file \"%s\": %m",
+						blocknum,
+						blocknum + nblocks_this_segment - 1,
+						FilePathName(v->mdfd_vfd))));
+}
+
 
 /*
  * mdwriteback() -- Tell the kernel to write pages back to storage.
@@ -1503,6 +1568,40 @@ register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg)
 	}
 }
 
+/*
+ * Like register_dirty_segment(), except for use by AIO. In the completion
+ * callback we don't have access to the MdfdVec (the completion callback might
+ * be executed in a different backend than the issuing backend), therefore we
+ * have to implement this slightly differently.
+ */
+static void
+register_dirty_segment_aio(RelFileLocator locator, ForkNumber forknum, uint64 segno)
+{
+	FileTag		tag;
+
+	INIT_MD_FILETAG(tag, locator, forknum, segno);
+
+	/*
+	 * Can't block here waiting for checkpointer to accept our sync request,
+	 * as checkpointer might be waiting for this AIO to finish if offloaded to
+	 * a worker.
+	 */
+	if (!RegisterSyncRequest(&tag, SYNC_REQUEST, false /* retryOnError */ ))
+	{
+		char		path[MAXPGPATH];
+
+		ereport(DEBUG1,
+				(errmsg_internal("could not forward fsync request because request queue is full")));
+
+		/* reuse mdsyncfiletag() to avoid duplicating code */
+		if (mdsyncfiletag(&tag, path))
+			ereport(data_sync_elevel(ERROR),
+					(errcode_for_file_access(),
+					 errmsg("could not fsync file \"%s\": %m",
+							path)));
+	}
+}
+
 /*
  * register_unlink_segment() -- Schedule a file to be deleted after next checkpoint
  */
@@ -1997,7 +2096,7 @@ md_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data)
  * AIO error reporting callback for mdstartreadv().
  *
  * Errors are encoded as follows:
- * - PgAioResult.error_data != 0 encodes IO that failed with errno != 0
+ * - PgAioResult.error_data != 0 encodes IO that failed with that errno
  * - PgAioResult.error_data == 0 encodes IO that didn't read all data
  */
 static void
@@ -2037,3 +2136,103 @@ md_readv_report(PgAioResult result, const PgAioTargetData *td, int elevel)
 					   td->smgr.nblocks * (size_t) BLCKSZ));
 	}
 }
+
+/*
+ * AIO completion callback for mdstartwritev().
+ */
+static PgAioResult
+md_writev_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data)
+{
+	PgAioTargetData *td = pgaio_io_get_target_data(ioh);
+	PgAioResult result = prior_result;
+
+	if (prior_result.result < 0)
+	{
+		result.status = PGAIO_RS_ERROR;
+		result.id = PGAIO_HCB_MD_WRITEV;
+		/* For "hard" errors, track the error number in error_data */
+		result.error_data = -prior_result.result;
+		result.result = 0;
+
+		pgaio_result_report(result, td, LOG);
+
+		return result;
+	}
+
+	/*
+	 * As explained above smgrstartwritev(), the smgr API operates on the
+	 * level of blocks, rather than bytes. Convert.
+	 */
+	result.result /= BLCKSZ;
+
+	Assert(result.result <= td->smgr.nblocks);
+
+	if (result.result == 0)
+	{
+		/* consider 0 blocks written a failure */
+		result.status = PGAIO_RS_ERROR;
+		result.id = PGAIO_HCB_MD_WRITEV;
+		result.error_data = 0;
+
+		pgaio_result_report(result, td, LOG);
+
+		return result;
+	}
+
+	if (result.status != PGAIO_RS_ERROR &&
+		result.result < td->smgr.nblocks)
+	{
+		/* partial writes should be retried at upper level */
+		result.status = PGAIO_RS_PARTIAL;
+		result.id = PGAIO_HCB_MD_WRITEV;
+	}
+
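+	/*
+	 * Register the dirty segment even for a partial write: at least one
+	 * block was written out and must be covered by the next checkpoint's
+	 * fsync.
+	 */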
+	if (!td->smgr.skip_fsync)
+		register_dirty_segment_aio(td->smgr.rlocator, td->smgr.forkNum,
+								   td->smgr.blockNum / ((BlockNumber) RELSEG_SIZE));
+
+	return result;
+}
+
+/*
+ * AIO error reporting callback for mdstartwritev().
+ */
+static void
+md_writev_report(PgAioResult result, const PgAioTargetData *td, int elevel)
+{
+	RelPathStr	path;
+
+	path = relpathbackend(td->smgr.rlocator,
+						  td->smgr.is_temp ? MyProcNumber : INVALID_PROC_NUMBER,
+						  td->smgr.forkNum);
+
+	if (result.error_data != 0)
+	{
+		errno = result.error_data;	/* for errcode_for_file_access() */
+
+		ereport(elevel,
+				errcode_for_file_access(),
+				errmsg("could not write blocks %u..%u in file \"%s\": %m",
+					   td->smgr.blockNum,
+					   td->smgr.blockNum + td->smgr.nblocks - 1,
+					   path.str)
+			);
+	}
+	else
+	{
+		/*
+		 * NB: This will typically only be output in debug messages, while
+		 * retrying a partial IO.
+		 */
+		ereport(elevel,
+				errcode(ERRCODE_DATA_CORRUPTED),
+				errmsg("could not write blocks %u..%u in file \"%s\": wrote only %zu of %zu bytes",
+					   td->smgr.blockNum,
+					   td->smgr.blockNum + td->smgr.nblocks - 1,
+					   path.str,
+					   result.result * (size_t) BLCKSZ,
+					   td->smgr.nblocks * (size_t) BLCKSZ
+					   )
+			);
+	}
+}
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index bf8f57b410a..e56d6a2597b 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -115,6 +115,11 @@ typedef struct f_smgr
 								BlockNumber blocknum,
 								const void **buffers, BlockNumber nblocks,
 								bool skipFsync);
+	void		(*smgr_startwritev) (PgAioHandle *ioh,
+									 SMgrRelation reln, ForkNumber forknum,
+									 BlockNumber blocknum,
+									 const void **buffers, BlockNumber nblocks,
+									 bool skipFsync);
 	void		(*smgr_writeback) (SMgrRelation reln, ForkNumber forknum,
 								   BlockNumber blocknum, BlockNumber nblocks);
 	BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum);
@@ -142,6 +147,7 @@ static const f_smgr smgrsw[] = {
 		.smgr_readv = mdreadv,
 		.smgr_startreadv = mdstartreadv,
 		.smgr_writev = mdwritev,
+		.smgr_startwritev = mdstartwritev,
 		.smgr_writeback = mdwriteback,
 		.smgr_nblocks = mdnblocks,
 		.smgr_truncate = mdtruncate,
@@ -787,6 +793,29 @@ smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	RESUME_INTERRUPTS();
 }
 
+/*
+ * smgrstartwritev() -- asynchronous version of smgrwritev()
+ *
+ * This starts an asynchronous writev IO using the IO handle `ioh`. Other than
+ * `ioh` all parameters are the same as smgrwritev().
+ *
+ * Completion callbacks above smgr will be passed the result as the number of
+ * successfully written blocks if the write [partially] succeeds. This
+ * maintains the abstraction that smgr operates on the level of blocks, rather
+ * than bytes.
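+ *
+ * Note that this only starts the IO; the caller is responsible for
+ * submitting it and, if necessary, waiting for its completion.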
+ */
+void
+smgrstartwritev(PgAioHandle *ioh,
+				SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+				const void **buffers, BlockNumber nblocks, bool skipFsync)
+{
+	HOLD_INTERRUPTS();
+	smgrsw[reln->smgr_which].smgr_startwritev(ioh,
+											  reln, forknum, blocknum, buffers,
+											  nblocks, skipFsync);
+	RESUME_INTERRUPTS();
+}
+
 /*
  * smgrwriteback() -- Trigger kernel writeback for the supplied range of
  *					   blocks.
-- 
2.48.1.76.g4e746b1a31.dirty

