From 8905fc32399617b7a655a6e8f3f05aecb62a83bc Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sun, 9 Mar 2025 19:35:04 -0400
Subject: [PATCH v2.6 26/34] aio: Implement smgr/md/fd write support

TODO:
- register_dirty_segment_aio() can error out in edge cases
---
 src/include/storage/fd.h               |   1 +
 src/include/storage/md.h               |   5 +
 src/include/storage/smgr.h             |   5 +
 src/backend/storage/aio/aio_callback.c |   1 +
 src/backend/storage/file/fd.c          |  28 ++++
 src/backend/storage/smgr/md.c          | 189 +++++++++++++++++++++++++
 src/backend/storage/smgr/smgr.c        |  22 +++
 7 files changed, 251 insertions(+)

diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index b77d8e5e30e..2cc7c5a4761 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -112,6 +112,7 @@ extern int	FilePrefetch(File file, off_t offset, off_t amount, uint32 wait_event
 extern ssize_t FileReadV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info);
 extern ssize_t FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info);
 extern int	FileStartReadV(struct PgAioHandle *ioh, File file, int iovcnt, off_t offset, uint32 wait_event_info);
+extern int	FileStartWriteV(struct PgAioHandle *ioh, File file, int iovcnt, off_t offset, uint32 wait_event_info);
 extern int	FileSync(File file, uint32 wait_event_info);
 extern int	FileZero(File file, off_t offset, off_t amount, uint32 wait_event_info);
 extern int	FileFallocate(File file, off_t offset, off_t amount, uint32 wait_event_info);
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index 9d7131eff43..47ae6c36c94 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -21,6 +21,7 @@
 #include "storage/sync.h"
 
 extern const PgAioHandleCallbacks aio_md_readv_cb;
+extern const PgAioHandleCallbacks aio_md_writev_cb;
 
 /* md storage manager functionality */
 extern void mdinit(void);
@@ -45,6 +46,10 @@ extern void mdstartreadv(PgAioHandle *ioh,
 extern void mdwritev(SMgrRelation reln, ForkNumber forknum,
 					 BlockNumber blocknum,
 					 const void **buffers, BlockNumber nblocks, bool skipFsync);
+extern void mdstartwritev(PgAioHandle *ioh,
+						  SMgrRelation reln, ForkNumber forknum,
+						  BlockNumber blocknum,
+						  const void **buffers, BlockNumber nblocks, bool skipFsync);
 extern void mdwriteback(SMgrRelation reln, ForkNumber forknum,
 						BlockNumber blocknum, BlockNumber nblocks);
 extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index 856ebcda350..f00b3763ac9 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -108,6 +108,11 @@ extern void smgrwritev(SMgrRelation reln, ForkNumber forknum,
 					   BlockNumber blocknum,
 					   const void **buffers, BlockNumber nblocks,
 					   bool skipFsync);
+extern void smgrstartwritev(PgAioHandle *ioh,
+							SMgrRelation reln, ForkNumber forknum,
+							BlockNumber blocknum,
+							const void **buffers, BlockNumber nblocks,
+							bool skipFsync);
 extern void smgrwriteback(SMgrRelation reln, ForkNumber forknum,
 						  BlockNumber blocknum, BlockNumber nblocks);
 extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum);
diff --git a/src/backend/storage/aio/aio_callback.c b/src/backend/storage/aio/aio_callback.c
index f76f74ba166..fb6ac058a09 100644
--- a/src/backend/storage/aio/aio_callback.c
+++ b/src/backend/storage/aio/aio_callback.c
@@ -41,6 +41,7 @@ static const PgAioHandleCallbacksEntry aio_handle_cbs[] = {
 	CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb),
 
 	CALLBACK_ENTRY(PGAIO_HCB_MD_READV, aio_md_readv_cb),
+	CALLBACK_ENTRY(PGAIO_HCB_MD_WRITEV, aio_md_writev_cb),
 
 	CALLBACK_ENTRY(PGAIO_HCB_SHARED_BUFFER_READV, aio_shared_buffer_readv_cb),
 
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index be8c4c2d60d..65abebefbfb 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -2346,6 +2346,34 @@ retry:
 	return returnCode;
 }
 
+int
+FileStartWriteV(PgAioHandle *ioh, File file,
+				int iovcnt, off_t offset,
+				uint32 wait_event_info)
+{
+	int			returnCode;
+	Vfd		   *vfdP;
+
+	Assert(FileIsValid(file));
+
+	DO_DB(elog(LOG, "FileStartWriteV: %d (%s) " INT64_FORMAT " %d",
+			   file, VfdCache[file].fileName,
+			   (int64) offset,
+			   iovcnt));
+
+	returnCode = FileAccess(file);
+	if (returnCode < 0)
+		return returnCode;
+
+	vfdP = &VfdCache[file];
+
+	/* FIXME: think about / reimplement  temp_file_limit */
+
+	pgaio_io_prep_writev(ioh, vfdP->fd, iovcnt, offset);
+
+	return 0;
+}
+
 int
 FileSync(File file, uint32 wait_event_info)
 {
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index 11d4d5a7aea..a61fc14805e 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -155,12 +155,19 @@ static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum,
 
 static PgAioResult md_readv_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data);
 static void md_readv_report(PgAioResult result, const PgAioTargetData *target_data, int elevel);
+static PgAioResult md_writev_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data);
+static void md_writev_report(PgAioResult result, const PgAioTargetData *target_data, int elevel);
 
 const PgAioHandleCallbacks aio_md_readv_cb = {
 	.complete_shared = md_readv_complete,
 	.report = md_readv_report,
 };
 
+const PgAioHandleCallbacks aio_md_writev_cb = {
+	.complete_shared = md_writev_complete,
+	.report = md_writev_report,
+};
+
 
 static inline int
 _mdfd_open_flags(void)
@@ -1107,6 +1114,56 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	}
 }
 
+/*
+ * mdstartwritev() -- Asynchronous version of mdrwritev().
+ */
+void
+mdstartwritev(PgAioHandle *ioh,
+			  SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+			  const void **buffers, BlockNumber nblocks, bool skipFsync)
+{
+	off_t		seekpos;
+	MdfdVec    *v;
+	BlockNumber nblocks_this_segment;
+	struct iovec *iov;
+	int			iovcnt;
+
+	v = _mdfd_getseg(reln, forknum, blocknum, false,
+					 EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
+
+	seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
+
+	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
+
+	nblocks_this_segment =
+		Min(nblocks,
+			RELSEG_SIZE - (blocknum % ((BlockNumber) RELSEG_SIZE)));
+
+	if (nblocks_this_segment != nblocks)
+		elog(ERROR, "write crossing segment boundary");
+
+	iovcnt = pgaio_io_get_iovec(ioh, &iov);
+
+	Assert(nblocks <= iovcnt);
+
+	iovcnt = buffers_to_iovec(iov, unconstify(void **, buffers), nblocks_this_segment);
+
+	Assert(iovcnt <= nblocks_this_segment);
+
+	if (!(io_direct_flags & IO_DIRECT_DATA))
+		pgaio_io_set_flag(ioh, PGAIO_HF_BUFFERED);
+
+	pgaio_io_set_target_smgr(ioh,
+							 reln,
+							 forknum,
+							 blocknum,
+							 nblocks,
+							 skipFsync);
+	pgaio_io_register_callbacks(ioh, PGAIO_HCB_MD_WRITEV, 0);
+
+	FileStartWriteV(ioh, v->mdfd_vfd, iovcnt, seekpos, WAIT_EVENT_DATA_FILE_WRITE);
+}
+
 
 /*
  * mdwriteback() -- Tell the kernel to write pages back to storage.
@@ -1495,6 +1552,40 @@ register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg)
 	}
 }
 
+/*
+ * Like register_dirty_segment(), except for use by AIO. In the completion
+ * callback we don't have access to the MdfdVec (the completion callback might
+ * be executed in a different backend than the issuing backend), therefore we
+ * have to implement this slightly differently.
+ */
+static void
+register_dirty_segment_aio(RelFileLocator locator, ForkNumber forknum, uint64 segno)
+{
+	FileTag		tag;
+
+	INIT_MD_FILETAG(tag, locator, forknum, segno);
+
+	/*
+	 * Can't block here waiting for checkpointer to accept our sync request,
+	 * as checkpointer might be waiting for this AIO to finish if offloaded to
+	 * a worker.
+	 */
+	if (!RegisterSyncRequest(&tag, SYNC_REQUEST, false /* retryOnError */ ))
+	{
+		char		path[MAXPGPATH];
+
+		ereport(DEBUG1,
+				(errmsg_internal("could not forward fsync request because request queue is full")));
+
+		/* reuse mdsyncfiletag() to avoid duplicating code */
+		if (mdsyncfiletag(&tag, path))
+			ereport(data_sync_elevel(ERROR),
+					(errcode_for_file_access(),
+					 errmsg("could not fsync file \"%s\": %m",
+							path)));
+	}
+}
+
 /*
  * register_unlink_segment() -- Schedule a file to be deleted after next checkpoint
  */
@@ -2017,3 +2108,101 @@ md_readv_report(PgAioResult result, const PgAioTargetData *td, int elevel)
 					   td->smgr.nblocks * (size_t) BLCKSZ));
 	}
 }
+
+/*
+ * AIO completion callback for mdstartwritev().
+ */
+static PgAioResult
+md_writev_complete(PgAioHandle *ioh, PgAioResult prior_result, uint8 cb_data)
+{
+	PgAioTargetData *td = pgaio_io_get_target_data(ioh);
+	PgAioResult result = prior_result;
+
+	if (prior_result.result < 0)
+	{
+		result.status = ARS_ERROR;
+		result.id = PGAIO_HCB_MD_WRITEV;
+		/* For "hard" errors, track the error number in error_data */
+		result.error_data = -prior_result.result;
+		result.result = 0;
+
+		pgaio_result_report(result, td, LOG);
+
+		return result;
+	}
+
+	/*
+	 * The smgr API operates in blocks, therefore convert the result from
+	 * bytes to blocks.
+	 */
+	result.result /= BLCKSZ;
+
+	if (result.result == 0)
+	{
+		/* consider 0 blocks written a failure */
+		result.status = ARS_ERROR;
+		result.id = PGAIO_HCB_MD_WRITEV;
+		result.error_data = 0;
+
+		pgaio_result_report(result, td, LOG);
+
+		return result;
+	}
+
+	if (result.status != ARS_ERROR &&
+		result.result < td->smgr.nblocks)
+	{
+		/* partial writes should be retried at upper level */
+		result.status = ARS_PARTIAL;
+		result.id = PGAIO_HCB_MD_WRITEV;
+	}
+
+	if (!td->smgr.skip_fsync)
+		register_dirty_segment_aio(td->smgr.rlocator, td->smgr.forkNum,
+								   td->smgr.blockNum / ((BlockNumber) RELSEG_SIZE));
+
+	return result;
+}
+
+/*
+ * AIO error reporting callback for mdstartwritev().
+ */
+static void
+md_writev_report(PgAioResult result, const PgAioTargetData *td, int elevel)
+{
+	RelPathStr	path;
+
+	path = relpathbackend(td->smgr.rlocator,
+						  td->smgr.is_temp ? MyProcNumber : INVALID_PROC_NUMBER,
+						  td->smgr.forkNum);
+
+	if (result.error_data != 0)
+	{
+		errno = result.error_data;	/* for errcode_for_file_access() */
+
+		ereport(elevel,
+				errcode_for_file_access(),
+				errmsg("could not write blocks %u..%u in file \"%s\": %m",
+					   td->smgr.blockNum,
+					   td->smgr.blockNum + td->smgr.nblocks,
+					   path.str)
+			);
+	}
+	else
+	{
+		/*
+		 * NB: This will typically only be output in debug messages, while
+		 * retrying a partial IO.
+		 */
+		ereport(elevel,
+				errcode(ERRCODE_DATA_CORRUPTED),
+				errmsg("could not write blocks %u..%u in file \"%s\": wrote only %zu of %zu bytes",
+					   td->smgr.blockNum,
+					   td->smgr.blockNum + td->smgr.nblocks - 1,
+					   path.str,
+					   result.result * (size_t) BLCKSZ,
+					   td->smgr.nblocks * (size_t) BLCKSZ
+					   )
+			);
+	}
+}
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index 607c14ee173..088b189543b 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -102,6 +102,11 @@ typedef struct f_smgr
 								BlockNumber blocknum,
 								const void **buffers, BlockNumber nblocks,
 								bool skipFsync);
+	void		(*smgr_startwritev) (PgAioHandle *ioh,
+									 SMgrRelation reln, ForkNumber forknum,
+									 BlockNumber blocknum,
+									 const void **buffers, BlockNumber nblocks,
+									 bool skipFsync);
 	void		(*smgr_writeback) (SMgrRelation reln, ForkNumber forknum,
 								   BlockNumber blocknum, BlockNumber nblocks);
 	BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum);
@@ -129,6 +134,7 @@ static const f_smgr smgrsw[] = {
 		.smgr_readv = mdreadv,
 		.smgr_startreadv = mdstartreadv,
 		.smgr_writev = mdwritev,
+		.smgr_startwritev = mdstartwritev,
 		.smgr_writeback = mdwriteback,
 		.smgr_nblocks = mdnblocks,
 		.smgr_truncate = mdtruncate,
@@ -691,6 +697,22 @@ smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 										 buffers, nblocks, skipFsync);
 }
 
+/*
+ * smgrstartwritev() -- asynchronous version of smgrwritev()
+ *
+ * This starts an asynchronous writev IO with the IO handle `ioh`. Other than
+ * `ioh` all parameters are the same as smgrwritev().
+ */
+void
+smgrstartwritev(PgAioHandle *ioh,
+				SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+				const void **buffers, BlockNumber nblocks, bool skipFsync)
+{
+	smgrsw[reln->smgr_which].smgr_startwritev(ioh,
+											  reln, forknum, blocknum, buffers,
+											  nblocks, skipFsync);
+}
+
 /*
  * smgrwriteback() -- Trigger kernel writeback for the supplied range of
  *					   blocks.
-- 
2.48.1.76.g4e746b1a31.dirty

