From 13f0bd1fc0bc86addb30fab960853d61d137e4bd Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Wed, 22 Jan 2025 16:06:51 -0500
Subject: [PATCH v2.4 16/29] aio: Implement smgr/md/fd aio methods

---
 src/include/storage/aio.h              |   6 +-
 src/include/storage/aio_types.h        |  12 +-
 src/include/storage/fd.h               |   6 +
 src/include/storage/md.h               |  12 +
 src/include/storage/smgr.h             |  22 ++
 src/backend/storage/aio/aio_callback.c |   4 +
 src/backend/storage/aio/aio_target.c   |   2 +
 src/backend/storage/file/fd.c          |  68 +++++
 src/backend/storage/smgr/md.c          | 362 +++++++++++++++++++++++++
 src/backend/storage/smgr/smgr.c        | 126 +++++++++
 10 files changed, 616 insertions(+), 4 deletions(-)

diff --git a/src/include/storage/aio.h b/src/include/storage/aio.h
index 3c058c84003..4c5fb7bcfce 100644
--- a/src/include/storage/aio.h
+++ b/src/include/storage/aio.h
@@ -108,9 +108,10 @@ typedef enum PgAioTargetID
 {
 	/* intentionally the zero value, to help catch zeroed memory etc */
 	PGAIO_TID_INVALID = 0,
+	PGAIO_TID_SMGR,
 } PgAioTargetID;
 
-#define PGAIO_TID_COUNT (PGAIO_TID_INVALID + 1)
+#define PGAIO_TID_COUNT (PGAIO_TID_SMGR + 1)
 
 
 /*
@@ -174,6 +175,9 @@ typedef struct PgAioTargetInfo
 typedef enum PgAioHandleCallbackID
 {
 	PGAIO_HCB_INVALID,
+
+	PGAIO_HCB_MD_READV,
+	PGAIO_HCB_MD_WRITEV,
 } PgAioHandleCallbackID;
 
 
diff --git a/src/include/storage/aio_types.h b/src/include/storage/aio_types.h
index d2617139a25..762fce3f075 100644
--- a/src/include/storage/aio_types.h
+++ b/src/include/storage/aio_types.h
@@ -58,11 +58,17 @@ typedef struct PgAioWaitRef
  */
 typedef union PgAioTargetData
 {
-	/* just as an example placeholder for later */
 	struct
 	{
-		uint32		queue_id;
-	}			wal;
+		RelFileLocator rlocator;	/* physical relation identifier */
+		BlockNumber blockNum;	/* blknum relative to begin of reln */
+		BlockNumber nblocks;
+		ForkNumber	forkNum:8;	/* don't waste 4 byte for four values */
+		bool		is_temp:1;	/* proc can be inferred by owning AIO */
+		bool		release_lock:1;
+		bool		skip_fsync:1;
+		uint8		mode;
+	}			smgr;
 } PgAioTargetData;
 
 
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index e3067ab6597..e2fd896646e 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -101,6 +101,8 @@ extern PGDLLIMPORT int max_safe_fds;
  * prototypes for functions in fd.c
  */
 
+struct PgAioHandle;
+
 /* Operations on virtual Files --- equivalent to Unix kernel file ops */
 extern File PathNameOpenFile(const char *fileName, int fileFlags);
 extern File PathNameOpenFilePerm(const char *fileName, int fileFlags, mode_t fileMode);
@@ -109,6 +111,10 @@ extern void FileClose(File file);
 extern int	FilePrefetch(File file, off_t offset, off_t amount, uint32 wait_event_info);
 extern ssize_t FileReadV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info);
 extern ssize_t FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info);
+extern ssize_t FileReadV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info);
+extern int	FileStartReadV(struct PgAioHandle *ioh, File file, int iovcnt, off_t offset, uint32 wait_event_info);
+extern ssize_t FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset, uint32 wait_event_info);
+extern int	FileStartWriteV(struct PgAioHandle *ioh, File file, int iovcnt, off_t offset, uint32 wait_event_info);
 extern int	FileSync(File file, uint32 wait_event_info);
 extern int	FileZero(File file, off_t offset, off_t amount, uint32 wait_event_info);
 extern int	FileFallocate(File file, off_t offset, off_t amount, uint32 wait_event_info);
diff --git a/src/include/storage/md.h b/src/include/storage/md.h
index 05bf537066e..7b28c3d482c 100644
--- a/src/include/storage/md.h
+++ b/src/include/storage/md.h
@@ -19,6 +19,10 @@
 #include "storage/smgr.h"
 #include "storage/sync.h"
 
+struct PgAioHandleCallbacks;
+extern const struct PgAioHandleCallbacks aio_md_readv_cb;
+extern const struct PgAioHandleCallbacks aio_md_writev_cb;
+
 /* md storage manager functionality */
 extern void mdinit(void);
 extern void mdopen(SMgrRelation reln);
@@ -36,9 +40,16 @@ extern uint32 mdmaxcombine(SMgrRelation reln, ForkNumber forknum,
 						   BlockNumber blocknum);
 extern void mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 					void **buffers, BlockNumber nblocks);
+extern void mdstartreadv(struct PgAioHandle *ioh,
+						 SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+						 void **buffers, BlockNumber nblocks);
 extern void mdwritev(SMgrRelation reln, ForkNumber forknum,
 					 BlockNumber blocknum,
 					 const void **buffers, BlockNumber nblocks, bool skipFsync);
+extern void mdstartwritev(struct PgAioHandle *ioh,
+						  SMgrRelation reln, ForkNumber forknum,
+						  BlockNumber blocknum,
+						  const void **buffers, BlockNumber nblocks, bool skipFsync);
 extern void mdwriteback(SMgrRelation reln, ForkNumber forknum,
 						BlockNumber blocknum, BlockNumber nblocks);
 extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);
@@ -46,6 +57,7 @@ extern void mdtruncate(SMgrRelation reln, ForkNumber forknum,
 					   BlockNumber old_blocks, BlockNumber nblocks);
 extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum);
 extern void mdregistersync(SMgrRelation reln, ForkNumber forknum);
+extern int	mdfd(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off);
 
 extern void ForgetDatabaseSyncRequests(Oid dbid);
 extern void DropRelationFiles(RelFileLocator *delrels, int ndelrels, bool isRedo);
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index 4016b206ad6..86fa07b110f 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -73,6 +73,11 @@ typedef SMgrRelationData *SMgrRelation;
 #define SmgrIsTemp(smgr) \
 	RelFileLocatorBackendIsTemp((smgr)->smgr_rlocator)
 
+struct PgAioHandle;
+struct PgAioTargetInfo;
+
+extern const struct PgAioTargetInfo aio_smgr_target_info;
+
 extern void smgrinit(void);
 extern SMgrRelation smgropen(RelFileLocator rlocator, ProcNumber backend);
 extern bool smgrexists(SMgrRelation reln, ForkNumber forknum);
@@ -97,10 +102,19 @@ extern uint32 smgrmaxcombine(SMgrRelation reln, ForkNumber forknum,
 extern void smgrreadv(SMgrRelation reln, ForkNumber forknum,
 					  BlockNumber blocknum,
 					  void **buffers, BlockNumber nblocks);
+extern void smgrstartreadv(struct PgAioHandle *ioh,
+						   SMgrRelation reln, ForkNumber forknum,
+						   BlockNumber blocknum,
+						   void **buffers, BlockNumber nblocks);
 extern void smgrwritev(SMgrRelation reln, ForkNumber forknum,
 					   BlockNumber blocknum,
 					   const void **buffers, BlockNumber nblocks,
 					   bool skipFsync);
+extern void smgrstartwritev(struct PgAioHandle *ioh,
+							SMgrRelation reln, ForkNumber forknum,
+							BlockNumber blocknum,
+							const void **buffers, BlockNumber nblocks,
+							bool skipFsync);
 extern void smgrwriteback(SMgrRelation reln, ForkNumber forknum,
 						  BlockNumber blocknum, BlockNumber nblocks);
 extern BlockNumber smgrnblocks(SMgrRelation reln, ForkNumber forknum);
@@ -110,6 +124,7 @@ extern void smgrtruncate(SMgrRelation reln, ForkNumber *forknum, int nforks,
 						 BlockNumber *nblocks);
 extern void smgrimmedsync(SMgrRelation reln, ForkNumber forknum);
 extern void smgrregistersync(SMgrRelation reln, ForkNumber forknum);
+extern int	smgrfd(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off);
 extern void AtEOXact_SMgr(void);
 extern bool ProcessBarrierSmgrRelease(void);
 
@@ -127,4 +142,11 @@ smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	smgrwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
 }
 
+extern void pgaio_io_set_target_smgr(struct PgAioHandle *ioh,
+									 SMgrRelationData *smgr,
+									 ForkNumber forknum,
+									 BlockNumber blocknum,
+									 int nblocks,
+									 bool skip_fsync);
+
 #endif							/* SMGR_H */
diff --git a/src/backend/storage/aio/aio_callback.c b/src/backend/storage/aio/aio_callback.c
index 5629dc4cc94..adb8050eb58 100644
--- a/src/backend/storage/aio/aio_callback.c
+++ b/src/backend/storage/aio/aio_callback.c
@@ -18,6 +18,7 @@
 #include "miscadmin.h"
 #include "storage/aio.h"
 #include "storage/aio_internal.h"
+#include "storage/md.h"
 #include "utils/memutils.h"
 
 
@@ -38,6 +39,9 @@ typedef struct PgAioHandleCallbacksEntry
 static const PgAioHandleCallbacksEntry aio_handle_cbs[] = {
 #define CALLBACK_ENTRY(id, callback)  [id] = {.cb = &callback, .name = #callback}
 	CALLBACK_ENTRY(PGAIO_HCB_INVALID, aio_invalid_cb),
+
+	CALLBACK_ENTRY(PGAIO_HCB_MD_READV, aio_md_readv_cb),
+	CALLBACK_ENTRY(PGAIO_HCB_MD_WRITEV, aio_md_writev_cb),
 #undef CALLBACK_ENTRY
 };
 
diff --git a/src/backend/storage/aio/aio_target.c b/src/backend/storage/aio/aio_target.c
index 15428968e58..a43edd89890 100644
--- a/src/backend/storage/aio/aio_target.c
+++ b/src/backend/storage/aio/aio_target.c
@@ -18,6 +18,7 @@
 
 #include "storage/aio.h"
 #include "storage/aio_internal.h"
+#include "storage/smgr.h"
 
 
 /*
@@ -31,6 +32,7 @@ static const PgAioTargetInfo *pgaio_target_info[] = {
 	[PGAIO_TID_INVALID] = &(PgAioTargetInfo) {
 		.name = "invalid",
 	},
+	[PGAIO_TID_SMGR] = &aio_smgr_target_info,
 };
 
 
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index e454db4c020..a9c90bc4e59 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -94,6 +94,7 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/startup.h"
+#include "storage/aio.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "utils/guc.h"
@@ -1294,6 +1295,8 @@ LruDelete(File file)
 
 	vfdP = &VfdCache[file];
 
+	pgaio_closing_fd(vfdP->fd);
+
 	/*
 	 * Close the file.  We aren't expecting this to fail; if it does, better
 	 * to leak the FD than to mess up our internal state.
@@ -1987,6 +1990,8 @@ FileClose(File file)
 
 	if (!FileIsNotOpen(file))
 	{
+		pgaio_closing_fd(vfdP->fd);
+
 		/* close the file */
 		if (close(vfdP->fd) != 0)
 		{
@@ -2210,6 +2215,32 @@ retry:
 	return returnCode;
 }
 
+int
+FileStartReadV(struct PgAioHandle *ioh, File file,
+			   int iovcnt, off_t offset,
+			   uint32 wait_event_info)
+{
+	int			returnCode;
+	Vfd		   *vfdP;
+
+	Assert(FileIsValid(file));
+
+	DO_DB(elog(LOG, "FileStartReadV: %d (%s) " INT64_FORMAT " %d",
+			   file, VfdCache[file].fileName,
+			   (int64) offset,
+			   iovcnt));
+
+	returnCode = FileAccess(file);
+	if (returnCode < 0)
+		return returnCode;
+
+	vfdP = &VfdCache[file];
+
+	pgaio_io_prep_readv(ioh, vfdP->fd, iovcnt, offset);
+
+	return 0;
+}
+
 ssize_t
 FileWriteV(File file, const struct iovec *iov, int iovcnt, off_t offset,
 		   uint32 wait_event_info)
@@ -2315,6 +2346,34 @@ retry:
 	return returnCode;
 }
 
+int
+FileStartWriteV(struct PgAioHandle *ioh, File file,
+				int iovcnt, off_t offset,
+				uint32 wait_event_info)
+{
+	int			returnCode;
+	Vfd		   *vfdP;
+
+	Assert(FileIsValid(file));
+
+	DO_DB(elog(LOG, "FileStartWriteV: %d (%s) " INT64_FORMAT " %d",
+			   file, VfdCache[file].fileName,
+			   (int64) offset,
+			   iovcnt));
+
+	returnCode = FileAccess(file);
+	if (returnCode < 0)
+		return returnCode;
+
+	vfdP = &VfdCache[file];
+
+	/* FIXME: think about / reimplement  temp_file_limit */
+
+	pgaio_io_prep_writev(ioh, vfdP->fd, iovcnt, offset);
+
+	return 0;
+}
+
 int
 FileSync(File file, uint32 wait_event_info)
 {
@@ -2498,6 +2557,12 @@ FilePathName(File file)
 int
 FileGetRawDesc(File file)
 {
+	int			returnCode;
+
+	returnCode = FileAccess(file);
+	if (returnCode < 0)
+		return returnCode;
+
 	Assert(FileIsValid(file));
 	return VfdCache[file].fd;
 }
@@ -2778,6 +2843,7 @@ FreeDesc(AllocateDesc *desc)
 			result = closedir(desc->desc.dir);
 			break;
 		case AllocateDescRawFD:
+			pgaio_closing_fd(desc->desc.fd);
 			result = close(desc->desc.fd);
 			break;
 		default:
@@ -2846,6 +2912,8 @@ CloseTransientFile(int fd)
 	/* Only get here if someone passes us a file not in allocatedDescs */
 	elog(WARNING, "fd passed to CloseTransientFile was not obtained from OpenTransientFile");
 
+	pgaio_closing_fd(fd);
+
 	return close(fd);
 }
 
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index 7bf0b45e2c3..db508d63573 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -31,6 +31,7 @@
 #include "miscadmin.h"
 #include "pg_trace.h"
 #include "pgstat.h"
+#include "storage/aio.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
 #include "storage/md.h"
@@ -132,6 +133,22 @@ static MdfdVec *_mdfd_getseg(SMgrRelation reln, ForkNumber forknum,
 static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum,
 							  MdfdVec *seg);
 
+static PgAioResult md_readv_complete(PgAioHandle *ioh, PgAioResult prior_result);
+static void md_readv_report(PgAioResult result, const PgAioTargetData *target_data, int elevel);
+static PgAioResult md_writev_complete(PgAioHandle *ioh, PgAioResult prior_result);
+static void md_writev_report(PgAioResult result, const PgAioTargetData *target_data, int elevel);
+
+const struct PgAioHandleCallbacks aio_md_readv_cb = {
+	.complete_shared = md_readv_complete,
+	.report = md_readv_report,
+};
+
+const struct PgAioHandleCallbacks aio_md_writev_cb = {
+	.complete_shared = md_writev_complete,
+	.report = md_writev_report,
+};
+
+
 static inline int
 _mdfd_open_flags(void)
 {
@@ -927,6 +944,53 @@ mdreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	}
 }
 
+void
+mdstartreadv(PgAioHandle *ioh,
+			 SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+			 void **buffers, BlockNumber nblocks)
+{
+	off_t		seekpos;
+	MdfdVec    *v;
+	BlockNumber nblocks_this_segment;
+	struct iovec *iov;
+	int			iovcnt;
+
+	v = _mdfd_getseg(reln, forknum, blocknum, false,
+					 EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
+
+	seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
+
+	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
+
+	nblocks_this_segment =
+		Min(nblocks,
+			RELSEG_SIZE - (blocknum % ((BlockNumber) RELSEG_SIZE)));
+
+	if (nblocks_this_segment != nblocks)
+		elog(ERROR, "read crossing segment boundary");
+
+	iovcnt = pgaio_io_get_iovec(ioh, &iov);
+
+	Assert(nblocks <= iovcnt);
+
+	iovcnt = buffers_to_iovec(iov, buffers, nblocks_this_segment);
+
+	Assert(iovcnt <= nblocks_this_segment);
+
+	if (!(io_direct_flags & IO_DIRECT_DATA))
+		pgaio_io_set_flag(ioh, PGAIO_HF_BUFFERED);
+
+	pgaio_io_set_target_smgr(ioh,
+							 reln,
+							 forknum,
+							 blocknum,
+							 nblocks,
+							 false);
+	pgaio_io_register_callbacks(ioh, PGAIO_HCB_MD_READV);
+
+	FileStartReadV(ioh, v->mdfd_vfd, iovcnt, seekpos, WAIT_EVENT_DATA_FILE_READ);
+}
+
 /*
  * mdwritev() -- Write the supplied blocks at the appropriate location.
  *
@@ -1032,6 +1096,53 @@ mdwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 	}
 }
 
+void
+mdstartwritev(PgAioHandle *ioh,
+			  SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+			  const void **buffers, BlockNumber nblocks, bool skipFsync)
+{
+	off_t		seekpos;
+	MdfdVec    *v;
+	BlockNumber nblocks_this_segment;
+	struct iovec *iov;
+	int			iovcnt;
+
+	v = _mdfd_getseg(reln, forknum, blocknum, false,
+					 EXTENSION_FAIL | EXTENSION_CREATE_RECOVERY);
+
+	seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
+
+	Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
+
+	nblocks_this_segment =
+		Min(nblocks,
+			RELSEG_SIZE - (blocknum % ((BlockNumber) RELSEG_SIZE)));
+
+	if (nblocks_this_segment != nblocks)
+		elog(ERROR, "write crossing segment boundary");
+
+	iovcnt = pgaio_io_get_iovec(ioh, &iov);
+
+	Assert(nblocks <= iovcnt);
+
+	iovcnt = buffers_to_iovec(iov, unconstify(void **, buffers), nblocks_this_segment);
+
+	Assert(iovcnt <= nblocks_this_segment);
+
+	if (!(io_direct_flags & IO_DIRECT_DATA))
+		pgaio_io_set_flag(ioh, PGAIO_HF_BUFFERED);
+
+	pgaio_io_set_target_smgr(ioh,
+							 reln,
+							 forknum,
+							 blocknum,
+							 nblocks,
+							 skipFsync);
+	pgaio_io_register_callbacks(ioh, PGAIO_HCB_MD_WRITEV);
+
+	FileStartWriteV(ioh, v->mdfd_vfd, iovcnt, seekpos, WAIT_EVENT_DATA_FILE_WRITE);
+}
+
 
 /*
  * mdwriteback() -- Tell the kernel to write pages back to storage.
@@ -1355,6 +1466,21 @@ mdimmedsync(SMgrRelation reln, ForkNumber forknum)
 	}
 }
 
+int
+mdfd(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off)
+{
+	MdfdVec    *v = mdopenfork(reln, forknum, EXTENSION_FAIL);
+
+	v = _mdfd_getseg(reln, forknum, blocknum, false,
+					 EXTENSION_FAIL);
+
+	*off = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
+
+	Assert(*off < (off_t) BLCKSZ * RELSEG_SIZE);
+
+	return FileGetRawDesc(v->mdfd_vfd);
+}
+
 /*
  * register_dirty_segment() -- Mark a relation segment as needing fsync
  *
@@ -1405,6 +1531,35 @@ register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg)
 	}
 }
 
+/*
+ * Like register_dirty_segment(), except for use by AIO. In the completion
+ * callback we don't have access to the MdfdVec (the completion callback might
+ * be executed in a different backend than the issuing backend), therefore we
+ * have to implement this slightly differently.
+ */
+static void
+register_dirty_segment_aio(RelFileLocator locator, ForkNumber forknum, uint64 segno)
+{
+	FileTag		tag;
+
+	INIT_MD_FILETAG(tag, locator, forknum, segno);
+
+	if (!RegisterSyncRequest(&tag, SYNC_REQUEST, false /* retryOnError */ ))
+	{
+		char		path[MAXPGPATH];
+
+		ereport(DEBUG1,
+				(errmsg_internal("could not forward fsync request because request queue is full")));
+
+		/* reuse mdsyncfiletag() to avoid duplicating code */
+		if (mdsyncfiletag(&tag, path))
+			ereport(data_sync_elevel(ERROR),
+					(errcode_for_file_access(),
+					 errmsg("could not fsync file \"%s\": %m",
+							path)));
+	}
+}
+
 /*
  * register_unlink_segment() -- Schedule a file to be deleted after next checkpoint
  */
@@ -1838,3 +1993,210 @@ mdfiletagmatches(const FileTag *ftag, const FileTag *candidate)
 	 */
 	return ftag->rlocator.dbOid == candidate->rlocator.dbOid;
 }
+
+/*
+ * AIO completion callback for mdstartreadv().
+ */
+static PgAioResult
+md_readv_complete(PgAioHandle *ioh, PgAioResult prior_result)
+{
+	PgAioTargetData *td = pgaio_io_get_target_data(ioh);
+	PgAioResult result = prior_result;
+
+	if (prior_result.result < 0)
+	{
+		result.status = ARS_ERROR;
+		result.id = PGAIO_HCB_MD_READV;
+		/* For "hard" errors, track the error number in error_data */
+		result.error_data = -prior_result.result;
+		result.result = 0;
+
+		md_readv_report(result, td, LOG);
+
+		return result;
+	}
+
+	/*
+	 * The smgr API operates in blocks, therefore convert the result from
+	 * bytes to blocks.
+	 */
+	result.result /= BLCKSZ;
+
+	if (result.result == 0)
+	{
+		/* consider 0 blocks read a failure */
+		result.status = ARS_ERROR;
+		result.id = PGAIO_HCB_MD_READV;
+		result.error_data = 0;
+
+		md_readv_report(result, td, LOG);
+
+		return result;
+	}
+
+	if (result.status != ARS_ERROR &&
+		result.result < td->smgr.nblocks)
+	{
+		/* partial reads should be retried at upper level */
+		result.status = ARS_PARTIAL;
+		result.id = PGAIO_HCB_MD_READV;
+	}
+
+	return result;
+}
+
+/*
+ * AIO error reporting callback for mdstartreadv().
+ */
+static void
+md_readv_report(PgAioResult result, const PgAioTargetData *td, int elevel)
+{
+	MemoryContext oldContext = CurrentMemoryContext;
+	char	   *path;
+
+	/* AFIXME: */
+	oldContext = MemoryContextSwitchTo(ErrorContext);
+
+	path = relpathbackend(td->smgr.rlocator,
+						  td->smgr.is_temp ? MyProcNumber : INVALID_PROC_NUMBER,
+						  td->smgr.forkNum);
+
+	if (result.error_data != 0)
+	{
+		errno = result.error_data;	/* for errcode_for_file_access() */
+
+		ereport(elevel,
+				errcode_for_file_access(),
+				errmsg("could not read blocks %u..%u in file \"%s\": %m",
+					   td->smgr.blockNum,
+					   td->smgr.blockNum + td->smgr.nblocks,
+					   path
+					   )
+			);
+	}
+	else
+	{
+		/*
+		 * NB: This will typically only be output in debug messages, while
+		 * retrying a partial IO.
+		 */
+		ereport(elevel,
+				errcode(ERRCODE_DATA_CORRUPTED),
+				errmsg("could not read blocks %u..%u in file \"%s\": read only %zu of %zu bytes",
+					   td->smgr.blockNum,
+					   td->smgr.blockNum + td->smgr.nblocks - 1,
+					   path,
+					   result.result * (size_t) BLCKSZ,
+					   td->smgr.nblocks * (size_t) BLCKSZ
+					   )
+			);
+	}
+
+	pfree(path);
+	MemoryContextSwitchTo(oldContext);
+}
+
+/*
+ * AIO completion callback for mdstartwritev().
+ */
+static PgAioResult
+md_writev_complete(PgAioHandle *ioh, PgAioResult prior_result)
+{
+	PgAioTargetData *td = pgaio_io_get_target_data(ioh);
+	PgAioResult result = prior_result;
+
+	if (prior_result.result < 0)
+	{
+		result.status = ARS_ERROR;
+		result.id = PGAIO_HCB_MD_WRITEV;
+		/* For "hard" errors, track the error number in error_data */
+		result.error_data = -prior_result.result;
+		result.result = 0;
+
+		md_writev_report(result, td, LOG);
+
+		return result;
+	}
+
+	/*
+	 * The smgr API operates in blocks, therefore convert the result from
+	 * bytes to blocks.
+	 */
+	result.result /= BLCKSZ;
+
+	if (result.result == 0)
+	{
+		/* consider 0 blocks written a failure */
+		result.status = ARS_ERROR;
+		result.id = PGAIO_HCB_MD_WRITEV;
+		result.error_data = 0;
+
+		md_writev_report(result, td, LOG);
+
+		return result;
+	}
+
+	if (result.status != ARS_ERROR &&
+		result.result < td->smgr.nblocks)
+	{
+		/* partial writes should be retried at upper level */
+		result.status = ARS_PARTIAL;
+		result.id = PGAIO_HCB_MD_WRITEV;
+	}
+
+	if (!td->smgr.skip_fsync)
+		register_dirty_segment_aio(td->smgr.rlocator, td->smgr.forkNum,
+								   td->smgr.blockNum / ((BlockNumber) RELSEG_SIZE));
+
+	return result;
+}
+
+/*
+ * AIO error reporting callback for mdstartwritev().
+ */
+static void
+md_writev_report(PgAioResult result, const PgAioTargetData *td, int elevel)
+{
+	MemoryContext oldContext = CurrentMemoryContext;
+	char	   *path;
+
+	/* AFIXME: */
+	oldContext = MemoryContextSwitchTo(ErrorContext);
+
+	path = relpathbackend(td->smgr.rlocator,
+						  td->smgr.is_temp ? MyProcNumber : INVALID_PROC_NUMBER,
+						  td->smgr.forkNum);
+
+	if (result.error_data != 0)
+	{
+		errno = result.error_data;	/* for errcode_for_file_access() */
+
+		ereport(elevel,
+				errcode_for_file_access(),
+				errmsg("could not write blocks %u..%u in file \"%s\": %m",
+					   td->smgr.blockNum,
+					   td->smgr.blockNum + td->smgr.nblocks,
+					   path)
+			);
+	}
+	else
+	{
+		/*
+		 * NB: This will typically only be output in debug messages, while
+		 * retrying a partial IO.
+		 */
+		ereport(elevel,
+				errcode(ERRCODE_DATA_CORRUPTED),
+				errmsg("could not write blocks %u..%u in file \"%s\": wrote only %zu of %zu bytes",
+					   td->smgr.blockNum,
+					   td->smgr.blockNum + td->smgr.nblocks - 1,
+					   path,
+					   result.result * (size_t) BLCKSZ,
+					   td->smgr.nblocks * (size_t) BLCKSZ
+					   )
+			);
+	}
+
+	pfree(path);
+	MemoryContextSwitchTo(oldContext);
+}
diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c
index ebe35c04de5..fb231e6ad48 100644
--- a/src/backend/storage/smgr/smgr.c
+++ b/src/backend/storage/smgr/smgr.c
@@ -53,6 +53,7 @@
 
 #include "access/xlogutils.h"
 #include "lib/ilist.h"
+#include "storage/aio.h"
 #include "storage/bufmgr.h"
 #include "storage/ipc.h"
 #include "storage/md.h"
@@ -93,10 +94,19 @@ typedef struct f_smgr
 	void		(*smgr_readv) (SMgrRelation reln, ForkNumber forknum,
 							   BlockNumber blocknum,
 							   void **buffers, BlockNumber nblocks);
+	void		(*smgr_startreadv) (struct PgAioHandle *ioh,
+									SMgrRelation reln, ForkNumber forknum,
+									BlockNumber blocknum,
+									void **buffers, BlockNumber nblocks);
 	void		(*smgr_writev) (SMgrRelation reln, ForkNumber forknum,
 								BlockNumber blocknum,
 								const void **buffers, BlockNumber nblocks,
 								bool skipFsync);
+	void		(*smgr_startwritev) (struct PgAioHandle *ioh,
+									 SMgrRelation reln, ForkNumber forknum,
+									 BlockNumber blocknum,
+									 const void **buffers, BlockNumber nblocks,
+									 bool skipFsync);
 	void		(*smgr_writeback) (SMgrRelation reln, ForkNumber forknum,
 								   BlockNumber blocknum, BlockNumber nblocks);
 	BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum);
@@ -104,6 +114,7 @@ typedef struct f_smgr
 								  BlockNumber old_blocks, BlockNumber nblocks);
 	void		(*smgr_immedsync) (SMgrRelation reln, ForkNumber forknum);
 	void		(*smgr_registersync) (SMgrRelation reln, ForkNumber forknum);
+	int			(*smgr_fd) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off);
 } f_smgr;
 
 static const f_smgr smgrsw[] = {
@@ -121,12 +132,15 @@ static const f_smgr smgrsw[] = {
 		.smgr_prefetch = mdprefetch,
 		.smgr_maxcombine = mdmaxcombine,
 		.smgr_readv = mdreadv,
+		.smgr_startreadv = mdstartreadv,
 		.smgr_writev = mdwritev,
+		.smgr_startwritev = mdstartwritev,
 		.smgr_writeback = mdwriteback,
 		.smgr_nblocks = mdnblocks,
 		.smgr_truncate = mdtruncate,
 		.smgr_immedsync = mdimmedsync,
 		.smgr_registersync = mdregistersync,
+		.smgr_fd = mdfd,
 	}
 };
 
@@ -145,6 +159,16 @@ static void smgrshutdown(int code, Datum arg);
 static void smgrdestroy(SMgrRelation reln);
 
 
+static void smgr_aio_reopen(PgAioHandle *ioh);
+static char *smgr_aio_describe_identity(const PgAioTargetData *sd);
+
+const struct PgAioTargetInfo aio_smgr_target_info = {
+	.name = "smgr",
+	.reopen = smgr_aio_reopen,
+	.describe_identity = smgr_aio_describe_identity,
+};
+
+
 /*
  * smgrinit(), smgrshutdown() -- Initialize or shut down storage
  *								 managers.
@@ -623,6 +647,19 @@ smgrreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 										nblocks);
 }
 
+/*
+ * AFIXME: FILL ME IN
+ */
+void
+smgrstartreadv(struct PgAioHandle *ioh,
+			   SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+			   void **buffers, BlockNumber nblocks)
+{
+	smgrsw[reln->smgr_which].smgr_startreadv(ioh,
+											 reln, forknum, blocknum, buffers,
+											 nblocks);
+}
+
 /*
  * smgrwritev() -- Write the supplied buffers out.
  *
@@ -657,6 +694,19 @@ smgrwritev(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
 										 buffers, nblocks, skipFsync);
 }
 
+/*
+ * AFIXME: FILL ME IN
+ */
+void
+smgrstartwritev(struct PgAioHandle *ioh,
+				SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
+				const void **buffers, BlockNumber nblocks, bool skipFsync)
+{
+	smgrsw[reln->smgr_which].smgr_startwritev(ioh,
+											  reln, forknum, blocknum, buffers,
+											  nblocks, skipFsync);
+}
+
 /*
  * smgrwriteback() -- Trigger kernel writeback for the supplied range of
  *					   blocks.
@@ -819,6 +869,12 @@ smgrimmedsync(SMgrRelation reln, ForkNumber forknum)
 	smgrsw[reln->smgr_which].smgr_immedsync(reln, forknum);
 }
 
+int
+smgrfd(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, uint32 *off)
+{
+	return smgrsw[reln->smgr_which].smgr_fd(reln, forknum, blocknum, off);
+}
+
 /*
  * AtEOXact_SMgr
  *
@@ -847,3 +903,73 @@ ProcessBarrierSmgrRelease(void)
 	smgrreleaseall();
 	return true;
 }
+
+void
+pgaio_io_set_target_smgr(PgAioHandle *ioh,
+						 struct SMgrRelationData *smgr,
+						 ForkNumber forknum,
+						 BlockNumber blocknum,
+						 int nblocks,
+						 bool skip_fsync)
+{
+	PgAioTargetData *sd = pgaio_io_get_target_data(ioh);
+
+	pgaio_io_set_target(ioh, PGAIO_TID_SMGR);
+
+	/* backend is implied via IO owner */
+	sd->smgr.rlocator = smgr->smgr_rlocator.locator;
+	sd->smgr.forkNum = forknum;
+	sd->smgr.blockNum = blocknum;
+	sd->smgr.nblocks = nblocks;
+	sd->smgr.is_temp = SmgrIsTemp(smgr);
+	sd->smgr.release_lock = false;
+	/* Temp relations should never be fsync'd */
+	sd->smgr.skip_fsync = skip_fsync && !SmgrIsTemp(smgr);
+	sd->smgr.mode = RBM_NORMAL;
+}
+
+static void
+smgr_aio_reopen(PgAioHandle *ioh)
+{
+	PgAioTargetData *sd = pgaio_io_get_target_data(ioh);
+	PgAioOpData *od = pgaio_io_get_op_data(ioh);
+	SMgrRelation reln;
+	ProcNumber	procno;
+	uint32		off;
+
+	if (sd->smgr.is_temp)
+		procno = pgaio_io_get_owner(ioh);
+	else
+		procno = INVALID_PROC_NUMBER;
+
+	reln = smgropen(sd->smgr.rlocator, procno);
+	od->read.fd = smgrfd(reln, sd->smgr.forkNum, sd->smgr.blockNum, &off);
+	Assert(off == od->read.offset);
+}
+
+static char *
+smgr_aio_describe_identity(const PgAioTargetData *sd)
+{
+	char	   *path;
+	char	   *desc;
+
+	path = relpathbackend(sd->smgr.rlocator,
+						  sd->smgr.is_temp ? MyProcNumber : INVALID_PROC_NUMBER,
+						  sd->smgr.forkNum);
+
+	if (sd->smgr.nblocks == 0)
+		desc = psprintf(_("file \"%s\""), path);
+	else if (sd->smgr.nblocks == 1)
+		desc = psprintf(_("block %u in file \"%s\""),
+						sd->smgr.blockNum,
+						path);
+	else
+		desc = psprintf(_("blocks %u..%u in file \"%s\""),
+						sd->smgr.blockNum,
+						sd->smgr.blockNum + sd->smgr.nblocks - 1,
+						path);
+
+	pfree(path);
+
+	return desc;
+}
-- 
2.48.1.76.g4e746b1a31.dirty

