From bfabee80635269c0950864d6cd2cf59d613f2d21 Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Mon, 25 Jun 2018 16:15:03 +0900
Subject: [PATCH 2/2] Add interface to read/write/fsync with transient files

The following set of routines gets added for the manipulation of
transient files:
void WriteTransientFile(int fd, char *buf, Size count, int elevel,
    const char *filename, uint32 wait_event_info);
bool ReadTransientFile(int fd, char *buf, Size count, int elevel,
    const char *filename, uint32 wait_event_info);
void SyncTransientFile(int fd, int elevel, const char *filename,
    uint32 wait_event_info);

This simplifies code related to replication slots, 2PC files, relation
mapper files and snapshot builds:
- Centralize errno handling for transient files with ENOSPC for write(2)
and read count for read(2)
- Wait events have to be defined, so those would unlikely get forgotten
in the future.
- Error handling for CloseTransientFile in code paths is centralized.
---
 src/backend/access/transam/twophase.c       |  25 +----
 src/backend/replication/logical/snapbuild.c | 110 ++------------------
 src/backend/replication/slot.c              |  46 +-------
 src/backend/storage/file/fd.c               |  97 ++++++++++++++++-
 src/backend/utils/cache/relmapper.c         |  40 ++-----
 src/include/storage/fd.h                    |  10 +-
 6 files changed, 128 insertions(+), 200 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 0c99b33664..557261fc31 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1219,7 +1219,6 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
 	uint32		crc_offset;
 	pg_crc32c	calc_crc,
 				file_crc;
-	int			r;
 
 	TwoPhaseFilePath(path, xid);
 
@@ -1275,28 +1274,10 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
 	 */
 	buf = (char *) palloc(stat.st_size);
 
-	pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_READ);
-	r = read(fd, buf, stat.st_size);
-	if (r != stat.st_size)
+	if (!ReadTransientFile(fd, buf, stat.st_size,
+						   give_warnings ? WARNING : DEBUG3, path,
+						   WAIT_EVENT_TWOPHASE_FILE_READ))
 	{
-		int			save_errno = errno;
-
-		pgstat_report_wait_end();
-		CloseTransientFile(fd);
-		if (give_warnings)
-		{
-			if (r < 0)
-			{
-				errno = save_errno;
-				ereport(WARNING,
-						(errcode_for_file_access(),
-						 errmsg("could not read file \"%s\": %m", path)));
-			}
-			else
-				ereport(WARNING,
-						(errmsg("could not read file \"%s\": read %d of %zu",
-								path, r, stat.st_size)));
-		}
 		pfree(buf);
 		return NULL;
 	}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 61bc9e8f14..05b74a61a3 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1609,20 +1609,8 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
 		ereport(ERROR,
 				(errmsg("could not open file \"%s\": %m", path)));
 
-	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
-	if ((write(fd, ondisk, needed_length)) != needed_length)
-	{
-		int			save_errno = errno;
-
-		CloseTransientFile(fd);
-
-		/* if write didn't set errno, assume problem is no disk space */
-		errno = save_errno ? save_errno : ENOSPC;
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not write to file \"%s\": %m", tmppath)));
-	}
-	pgstat_report_wait_end();
+	WriteTransientFile(fd, (char *) ondisk, needed_length, ERROR, tmppath,
+					   WAIT_EVENT_SNAPBUILD_WRITE);
 
 	/*
 	 * fsync the file before renaming so that even if we crash after this we
@@ -1686,7 +1674,6 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 	int			fd;
 	char		path[MAXPGPATH];
 	Size		sz;
-	int			readBytes;
 	pg_crc32c	checksum;
 
 	/* no point in loading a snapshot if we're already there */
@@ -1716,29 +1703,9 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 	fsync_fname(path, false);
 	fsync_fname("pg_logical/snapshots", true);
 
-
 	/* read statically sized portion of snapshot */
-	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
-	readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize);
-	pgstat_report_wait_end();
-	if (readBytes != SnapBuildOnDiskConstantSize)
-	{
-		int			save_errno = errno;
-
-		CloseTransientFile(fd);
-
-		if (readBytes < 0)
-		{
-			errno = save_errno;
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read file \"%s\": %m", path)));
-		}
-		else
-			ereport(ERROR,
-					(errmsg("could not read file \"%s\": read %d of %zu",
-							path, readBytes, SnapBuildOnDiskConstantSize)));
-	}
+	(void) ReadTransientFile(fd, (char *) &ondisk, SnapBuildOnDiskConstantSize,
+					  ERROR, path, WAIT_EVENT_SNAPBUILD_READ);
 
 	if (ondisk.magic != SNAPBUILD_MAGIC)
 		ereport(ERROR,
@@ -1756,80 +1723,23 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 				SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
 
 	/* read SnapBuild */
-	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
-	readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild));
-	pgstat_report_wait_end();
-	if (readBytes != sizeof(SnapBuild))
-	{
-		int			save_errno = errno;
-
-		CloseTransientFile(fd);
-
-		if (readBytes < 0)
-		{
-			errno = save_errno;
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read file \"%s\": %m", path)));
-		}
-		else
-			ereport(ERROR,
-					(errmsg("could not read file \"%s\": read %d of %zu",
-							path, readBytes, sizeof(SnapBuild))));
-	}
+	(void) ReadTransientFile(fd, (char *) &ondisk.builder, sizeof(SnapBuild),
+					  ERROR, path, WAIT_EVENT_SNAPBUILD_READ);
 	COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
 
 	/* restore running xacts (dead, but kept for backward compat) */
 	sz = sizeof(TransactionId) * ondisk.builder.was_running.was_xcnt_space;
 	ondisk.builder.was_running.was_xip =
 		MemoryContextAllocZero(builder->context, sz);
-	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
-	readBytes = read(fd, ondisk.builder.was_running.was_xip, sz);
-	pgstat_report_wait_end();
-	if (readBytes != sz)
-	{
-		int			save_errno = errno;
-
-		CloseTransientFile(fd);
-
-		if (readBytes < 0)
-		{
-			errno = save_errno;
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read file \"%s\": %m", path)));
-		}
-		else
-			ereport(ERROR,
-					(errmsg("could not read file \"%s\": read %d of %zu",
-							path, readBytes, sz)));
-	}
+	(void) ReadTransientFile(fd, (char *) ondisk.builder.was_running.was_xip, sz,
+					  ERROR, path, WAIT_EVENT_SNAPBUILD_READ);
 	COMP_CRC32C(checksum, ondisk.builder.was_running.was_xip, sz);
 
 	/* restore committed xacts information */
 	sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
 	ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
-	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
-	readBytes = read(fd, ondisk.builder.committed.xip, sz);
-	pgstat_report_wait_end();
-	if (readBytes != sz)
-	{
-		int			save_errno = errno;
-
-		CloseTransientFile(fd);
-
-		if (readBytes < 0)
-		{
-			errno = save_errno;
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read file \"%s\": %m", path)));
-		}
-		else
-			ereport(ERROR,
-					(errmsg("could not read file \"%s\": read %d of %zu",
-							path, readBytes, sz)));
-	}
+	(void) ReadTransientFile(fd, (char *) ondisk.builder.committed.xip, sz,
+					  ERROR, path, WAIT_EVENT_SNAPBUILD_READ);
 	COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
 
 	CloseTransientFile(fd);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index afbaf8c80d..818b13e973 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1354,7 +1354,6 @@ RestoreSlotFromDisk(const char *name)
 	char		path[MAXPGPATH + 22];
 	int			fd;
 	bool		restored = false;
-	int			readBytes;
 	pg_crc32c	checksum;
 
 	/* no need to lock here, no concurrent access allowed yet */
@@ -1405,25 +1404,8 @@ RestoreSlotFromDisk(const char *name)
 	END_CRIT_SECTION();
 
 	/* read part of statefile that's guaranteed to be version independent */
-	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
-	readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
-	pgstat_report_wait_end();
-	if (readBytes != ReplicationSlotOnDiskConstantSize)
-	{
-		int			saved_errno = errno;
-
-		CloseTransientFile(fd);
-		errno = saved_errno;
-		if (readBytes < 0)
-			ereport(PANIC,
-					(errcode_for_file_access(),
-					 errmsg("could not read file \"%s\": %m", path)));
-		else
-			ereport(PANIC,
-					(errmsg("could not read file \"%s\": read %d of %zu",
-							path, readBytes,
-							ReplicationSlotOnDiskConstantSize)));
-	}
+	(void) ReadTransientFile(fd, (char *) &cp, ReplicationSlotOnDiskConstantSize,
+							 PANIC, path, WAIT_EVENT_REPLICATION_SLOT_READ);
 
 	/* verify magic */
 	if (cp.magic != SLOT_MAGIC)
@@ -1447,27 +1429,9 @@ RestoreSlotFromDisk(const char *name)
 						path, cp.length)));
 
 	/* Now that we know the size, read the entire file */
-	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
-	readBytes = read(fd,
-					 (char *) &cp + ReplicationSlotOnDiskConstantSize,
-					 cp.length);
-	pgstat_report_wait_end();
-	if (readBytes != cp.length)
-	{
-		int			saved_errno = errno;
-
-		CloseTransientFile(fd);
-		errno = saved_errno;
-		if (readBytes < 0)
-			ereport(PANIC,
-					(errcode_for_file_access(),
-					 errmsg("could not read file \"%s\": %m", path)));
-		else
-			ereport(PANIC,
-					(errmsg("could not read file \"%s\": read %d of %zu",
-							path, readBytes, (Size) cp.length)));
-	}
-
+	(void) ReadTransientFile(fd, (char *) &cp + ReplicationSlotOnDiskConstantSize,
+							 cp.length, PANIC, path,
+							 WAIT_EVENT_REPLICATION_SLOT_READ);
 	CloseTransientFile(fd);
 
 	/* now verify the CRC */
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index 8dd51f1767..fba7774ddc 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -47,8 +47,9 @@
  * ownership mechanism that provides automatic cleanup for shared files when
  * the last of a group of backends detaches.
  *
- * AllocateFile, AllocateDir, OpenPipeStream and OpenTransientFile are
- * wrappers around fopen(3), opendir(3), popen(3) and open(2), respectively.
+ * AllocateFile, AllocateDir, OpenPipeStream, OpenTransientFile,
+ * WriteTransientFile and ReadTransientFile are wrappers around fopen(3),
+ * opendir(3), popen(3), open(2), write(2) and read(2) respectively.
  * They behave like the corresponding native functions, except that the handle
  * is registered with the current subtransaction, and will be automatically
  * closed at abort. These are intended mainly for short operations like
@@ -2480,6 +2481,98 @@ TryAgain:
 	return NULL;
 }
 
+/*
+ * Write to a file which has been opened using OpenTransientFile or
+ * OpenTransientFilePerm.  Equivalent to write(2).
+ */
+void
+WriteTransientFile(int fd, char *buf, Size count, int elevel,
+				   const char *filename, uint32 wait_event_info)
+{
+	int			r;
+
+	pgstat_report_wait_start(wait_event_info);
+	r = write(fd, buf, count);
+	pgstat_report_wait_end();
+
+	if (r != count)
+	{
+		int         save_errno = errno;
+
+		(void) CloseTransientFile(fd);
+
+		/* if write didn't set errno, assume problem is no disk space */
+		errno = save_errno ? save_errno : ENOSPC;
+		ereport(elevel,
+				(errcode_for_file_access(),
+				 errmsg("could not write to file \"%s\": %m", filename)));
+	}
+}
+
+/*
+ * Read from a file which has been opened using OpenTransientFile or
+ * OpenTransientFilePerm.  Equivalent to read(2).  Returns true on
+ * success and false on failure.
+ */
+bool
+ReadTransientFile(int fd, char *buf, Size count, int elevel,
+				  const char *filename, uint32 wait_event_info)
+{
+	int			r;
+
+	pgstat_report_wait_start(wait_event_info);
+	r = read(fd, buf, count);
+	pgstat_report_wait_end();
+
+	if (r != count)
+	{
+		int         save_errno = errno;
+
+		CloseTransientFile(fd);
+
+		if (r < 0)
+		{
+			errno = save_errno;
+			ereport(elevel,
+					(errcode_for_file_access(),
+					 errmsg("could not read file \"%s\": %m", filename)));
+		}
+		else
+			ereport(elevel,
+					(errmsg("could not read file \"%s\": read %d of %zu",
+							filename, r, count)));
+		return false;
+	}
+
+	return true;
+}
+
+/*
+ * Write to a file which has been opened using OpenTransientFile or
+ * OpenTransientFilePerm.  Equivalent to fsync(2).
+ */
+void
+SyncTransientFile(int fd, int elevel, const char *filename,
+				  uint32 wait_event_info)
+{
+	int			status;
+
+	pgstat_report_wait_start(wait_event_info);
+	status = pg_fsync(fd);
+	pgstat_report_wait_end();
+
+	if (status != 0)
+	{
+		int         save_errno = errno;
+
+		(void) CloseTransientFile(fd);
+		errno = save_errno;
+		ereport(elevel,
+				(errcode_for_file_access(),
+				 errmsg("could not fsync file \"%s\": %m", filename)));
+	}
+}
+
 /*
  * Free an AllocateDesc of any type.
  *
diff --git a/src/backend/utils/cache/relmapper.c b/src/backend/utils/cache/relmapper.c
index 2d31f9f912..e6eff58d40 100644
--- a/src/backend/utils/cache/relmapper.c
+++ b/src/backend/utils/cache/relmapper.c
@@ -629,7 +629,6 @@ load_relmap_file(bool shared)
 	char		mapfilename[MAXPGPATH];
 	pg_crc32c	crc;
 	int			fd;
-	int			r;
 
 	if (shared)
 	{
@@ -659,20 +658,8 @@ load_relmap_file(bool shared)
 	 * look, the sinval signaling mechanism will make us re-read it before we
 	 * are able to access any relation that's affected by the change.
 	 */
-	pgstat_report_wait_start(WAIT_EVENT_RELATION_MAP_READ);
-	r = read(fd, map, sizeof(RelMapFile));
-	if (r != sizeof(RelMapFile))
-	{
-		if (r < 0)
-			ereport(FATAL,
-					(errcode_for_file_access(),
-					 errmsg("could not read file \"%s\": %m", mapfilename)));
-		else
-			ereport(FATAL,
-					(errmsg("could not read file \"%s\": read %d of %zu",
-							mapfilename, r, sizeof(RelMapFile))));
-	}
-	pgstat_report_wait_end();
+	(void) ReadTransientFile(fd, (char *) map, sizeof(RelMapFile), FATAL,
+							 mapfilename, WAIT_EVENT_RELATION_MAP_READ);
 
 	CloseTransientFile(fd);
 
@@ -782,18 +769,9 @@ write_relmap_file(bool shared, RelMapFile *newmap,
 	}
 
 	errno = 0;
-	pgstat_report_wait_start(WAIT_EVENT_RELATION_MAP_WRITE);
-	if (write(fd, newmap, sizeof(RelMapFile)) != sizeof(RelMapFile))
-	{
-		/* if write didn't set errno, assume problem is no disk space */
-		if (errno == 0)
-			errno = ENOSPC;
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not write file \"%s\": %m",
-						mapfilename)));
-	}
-	pgstat_report_wait_end();
+
+	WriteTransientFile(fd, (char *) newmap, sizeof(RelMapFile), ERROR,
+					   mapfilename, WAIT_EVENT_RELATION_MAP_WRITE);
 
 	/*
 	 * We choose to fsync the data to disk before considering the task done.
@@ -801,13 +779,7 @@ write_relmap_file(bool shared, RelMapFile *newmap,
 	 * issue, but it would complicate checkpointing --- see notes for
 	 * CheckPointRelationMap.
 	 */
-	pgstat_report_wait_start(WAIT_EVENT_RELATION_MAP_SYNC);
-	if (pg_fsync(fd) != 0)
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not fsync file \"%s\": %m",
-						mapfilename)));
-	pgstat_report_wait_end();
+	SyncTransientFile(fd, ERROR, mapfilename, WAIT_EVENT_RELATION_MAP_SYNC);
 
 	if (CloseTransientFile(fd))
 		ereport(ERROR,
diff --git a/src/include/storage/fd.h b/src/include/storage/fd.h
index 8e7c9728f4..4309d9da95 100644
--- a/src/include/storage/fd.h
+++ b/src/include/storage/fd.h
@@ -34,7 +34,9 @@
  *
  * Likewise, use AllocateDir/FreeDir, not opendir/closedir, to allocate
  * open directories (DIR*), and OpenTransientFile/CloseTransient File for an
- * unbuffered file descriptor.
+ * unbuffered file descriptor.  WriteTransientFile should be used instead
+ * of write(2), ReadTransientFile instead of read(2), and SyncTransientFile
+ * instead of fsync(2).
  */
 #ifndef FD_H
 #define FD_H
@@ -105,6 +107,12 @@ extern int	FreeDir(DIR *dir);
 /* Operations to allow use of a plain kernel FD, with automatic cleanup */
 extern int	OpenTransientFile(const char *fileName, int fileFlags);
 extern int	OpenTransientFilePerm(const char *fileName, int fileFlags, mode_t fileMode);
+extern void WriteTransientFile(int fd, char *buf, Size count, int elevel,
+							   const char *filename, uint32 wait_event_info);
+extern bool ReadTransientFile(int fd, char *buf, Size count, int elevel,
+							  const char *filename, uint32 wait_event_info);
+extern void SyncTransientFile(int fd, int elevel,  const char *filename,
+							  uint32 wait_event_info);
 extern int	CloseTransientFile(int fd);
 
 /* If you've really really gotta have a plain kernel FD, use this */
-- 
2.18.0

