From 1c99e98386a763de70326e96fb9b7cfa72373e5f Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Fri, 18 May 2018 13:05:42 -0700
Subject: [PATCH v1 7/7] Heavily-WIP: Send file descriptors to checkpointer for
 fsyncing.

This addresses the issue that, at least on Linux, fsync() only reliably
sees errors that occurred after the file was opened.  To that end, backends
now pass the file descriptor they wrote through to the checkpointer, over a
Unix domain socket, so the checkpointer fsyncs the very same open files.

Author:
Reviewed-By:
Discussion: https://postgr.es/m/
Backpatch:
---
 src/backend/access/transam/xlog.c     |   7 +-
 src/backend/postmaster/checkpointer.c | 358 +++++++----------
 src/backend/postmaster/postmaster.c   |  38 ++
 src/backend/storage/smgr/md.c         | 545 ++++++++++++++++----------
 src/include/postmaster/bgwriter.h     |   8 +-
 src/include/postmaster/postmaster.h   |   5 +
 src/include/storage/smgr.h            |   3 +-
 7 files changed, 542 insertions(+), 422 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index adbd6a21264..427774152eb 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -8634,8 +8634,10 @@ CreateCheckPoint(int flags)
 	 * Note: because it is possible for log_checkpoints to change while a
 	 * checkpoint proceeds, we always accumulate stats, even if
 	 * log_checkpoints is currently off.
+	 *
+	 * Note #2: this is reset at the end of the checkpoint, not here, because
+	 * we might have to fsync before getting here (see mdsync()).
 	 */
-	MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));
 	CheckpointStats.ckpt_start_t = GetCurrentTimestamp();
 
 	/*
@@ -8999,6 +9001,9 @@ CreateCheckPoint(int flags)
 									 CheckpointStats.ckpt_segs_recycled);
 
 	LWLockRelease(CheckpointLock);
+
+	/* reset stats */
+	MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));
 }
 
 /*
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 333eb91c9de..1bce610336a 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -48,6 +48,7 @@
 #include "pgstat.h"
 #include "port/atomics.h"
 #include "postmaster/bgwriter.h"
+#include "postmaster/postmaster.h"
 #include "replication/syncrep.h"
 #include "storage/bufmgr.h"
 #include "storage/condition_variable.h"
@@ -102,19 +103,21 @@
  *
  * The requests array holds fsync requests sent by backends and not yet
  * absorbed by the checkpointer.
- *
- * Unlike the checkpoint fields, num_backend_writes, num_backend_fsync, and
- * the requests fields are protected by CheckpointerCommLock.
  *----------
  */
 typedef struct
 {
+	uint32		type;			/* CKPT_REQUEST_* */
 	RelFileNode rnode;
 	ForkNumber	forknum;
 	BlockNumber segno;			/* see md.c for special values */
+	bool		contains_fd;	/* is a file descriptor passed along? */
-	/* might add a real request-type field later; not needed yet */
 } CheckpointerRequest;
 
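+/*
+ * Request types: CKPT_REQUEST_RNODE carries an fsync (or forget/unlink)
+ * request for a relation segment; CKPT_REQUEST_SYN is a marker the
+ * checkpointer sends to itself to detect when all requests queued before a
+ * certain point have been absorbed.
+ */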
+#define CKPT_REQUEST_RNODE			1
+#define CKPT_REQUEST_SYN			2
+
 typedef struct
 {
 	pid_t		checkpointer_pid;	/* PID (0 if not started) */
@@ -131,8 +134,6 @@ typedef struct
 	pg_atomic_uint32 num_backend_fsync;	/* counts user backend fsync calls */
 	pg_atomic_uint32 ckpt_cycle; /* cycle */
 
-	int			num_requests;	/* current # of requests */
-	int			max_requests;	/* allocated array size */
 	CheckpointerRequest requests[FLEXIBLE_ARRAY_MEMBER];
 } CheckpointerShmemStruct;
 
@@ -168,13 +169,17 @@ static double ckpt_cached_elapsed;
 static pg_time_t last_checkpoint_time;
 static pg_time_t last_xlog_switch_time;
 
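+/*
+ * Sequence numbers for CKPT_REQUEST_SYN markers: next_syn_rqst is the last
+ * marker enqueued, received_syn_rqst the last one read back off the socket.
+ */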
+static BlockNumber next_syn_rqst;
+static BlockNumber received_syn_rqst;
+
 /* Prototypes for private functions */
 
 static void CheckArchiveTimeout(void);
 static bool IsCheckpointOnSchedule(double progress);
 static bool ImmediateCheckpointRequested(void);
-static bool CompactCheckpointerRequestQueue(void);
 static void UpdateSharedMemoryConfig(void);
+static void SendFsyncRequest(CheckpointerRequest *request, int fd);
+static bool AbsorbFsyncRequest(void);
 
 /* Signal handlers */
 
@@ -557,10 +562,11 @@ CheckpointerMain(void)
 			cur_timeout = Min(cur_timeout, XLogArchiveTimeout - elapsed_secs);
 		}
 
-		rc = WaitLatch(MyLatch,
-					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
-					   cur_timeout * 1000L /* convert to ms */ ,
-					   WAIT_EVENT_CHECKPOINTER_MAIN);
+		rc = WaitLatchOrSocket(MyLatch,
+							   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH | WL_SOCKET_READABLE,
+							   fsync_fds[FSYNC_FD_PROCESS],
+							   cur_timeout * 1000L /* convert to ms */ ,
+							   WAIT_EVENT_CHECKPOINTER_MAIN);
 
 		/*
 		 * Emergency bailout if postmaster has died.  This is to avoid the
@@ -910,12 +916,7 @@ CheckpointerShmemSize(void)
 {
 	Size		size;
 
-	/*
-	 * Currently, the size of the requests[] array is arbitrarily set equal to
-	 * NBuffers.  This may prove too large or small ...
-	 */
 	size = offsetof(CheckpointerShmemStruct, requests);
-	size = add_size(size, mul_size(NBuffers, sizeof(CheckpointerRequest)));
 
 	return size;
 }
@@ -938,13 +939,10 @@ CheckpointerShmemInit(void)
 	if (!found)
 	{
 		/*
-		 * First time through, so initialize.  Note that we zero the whole
-		 * requests array; this is so that CompactCheckpointerRequestQueue can
-		 * assume that any pad bytes in the request structs are zeroes.
+		 * First time through, so initialize.
 		 */
 		MemSet(CheckpointerShmem, 0, size);
 		SpinLockInit(&CheckpointerShmem->ckpt_lck);
-		CheckpointerShmem->max_requests = NBuffers;
 		pg_atomic_init_u32(&CheckpointerShmem->ckpt_cycle, 0);
 		pg_atomic_init_u32(&CheckpointerShmem->num_backend_writes, 0);
 		pg_atomic_init_u32(&CheckpointerShmem->num_backend_fsync, 0);
@@ -1124,176 +1122,61 @@ RequestCheckpoint(int flags)
- * the queue is full and contains no duplicate entries.  In that case, we
- * let the backend know by returning false.
+ * the queue is full), we block until the checkpointer has absorbed enough
+ * requests to make room.
  */
-bool
-ForwardFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
+void
+ForwardFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno,
+					File file)
 {
-	CheckpointerRequest *request;
-	bool		too_full;
+	CheckpointerRequest request = {0};
 
 	if (!IsUnderPostmaster)
-		return false;			/* probably shouldn't even get here */
+		elog(ERROR, "ForwardFsyncRequest must not be called in single user mode");
 
 	if (AmCheckpointerProcess())
 		elog(ERROR, "ForwardFsyncRequest must not be called in checkpointer");
 
-	LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
+	request.type = CKPT_REQUEST_RNODE;
+	request.rnode = rnode;
+	request.forknum = forknum;
+	request.segno = segno;
+	request.contains_fd = file != -1;
 
-	/*
-	 * If the checkpointer isn't running or the request queue is full, the
-	 * backend will have to perform its own fsync request.  But before forcing
-	 * that to happen, we can try to compact the request queue.
-	 */
-	if (CheckpointerShmem->checkpointer_pid == 0 ||
-		(CheckpointerShmem->num_requests >= CheckpointerShmem->max_requests &&
-		 !CompactCheckpointerRequestQueue()))
-	{
-		/*
-		 * Count the subset of writes where backends have to do their own
-		 * fsync
-		 */
-		if (!AmBackgroundWriterProcess())
-			pg_atomic_fetch_add_u32(&CheckpointerShmem->num_backend_fsync, 1);
-		LWLockRelease(CheckpointerCommLock);
-		return false;
-	}
-
-	/* OK, insert request */
-	request = &CheckpointerShmem->requests[CheckpointerShmem->num_requests++];
-	request->rnode = rnode;
-	request->forknum = forknum;
-	request->segno = segno;
-
-	/* If queue is more than half full, nudge the checkpointer to empty it */
-	too_full = (CheckpointerShmem->num_requests >=
-				CheckpointerShmem->max_requests / 2);
-
-	LWLockRelease(CheckpointerCommLock);
-
-	/* ... but not till after we release the lock */
-	if (too_full && ProcGlobal->checkpointerLatch)
-		SetLatch(ProcGlobal->checkpointerLatch);
-
-	return true;
-}
-
-/*
- * CompactCheckpointerRequestQueue
- *		Remove duplicates from the request queue to avoid backend fsyncs.
- *		Returns "true" if any entries were removed.
- *
- * Although a full fsync request queue is not common, it can lead to severe
- * performance problems when it does happen.  So far, this situation has
- * only been observed to occur when the system is under heavy write load,
- * and especially during the "sync" phase of a checkpoint.  Without this
- * logic, each backend begins doing an fsync for every block written, which
- * gets very expensive and can slow down the whole system.
- *
- * Trying to do this every time the queue is full could lose if there
- * aren't any removable entries.  But that should be vanishingly rare in
- * practice: there's one queue entry per shared buffer.
- */
-static bool
-CompactCheckpointerRequestQueue(void)
-{
-	struct CheckpointerSlotMapping
-	{
-		CheckpointerRequest request;
-		int			slot;
-	};
-
-	int			n,
-				preserve_count;
-	int			num_skipped = 0;
-	HASHCTL		ctl;
-	HTAB	   *htab;
-	bool	   *skip_slot;
-
-	/* must hold CheckpointerCommLock in exclusive mode */
-	Assert(LWLockHeldByMe(CheckpointerCommLock));
-
-	/* Initialize skip_slot array */
-	skip_slot = palloc0(sizeof(bool) * CheckpointerShmem->num_requests);
-
-	/* Initialize temporary hash table */
-	MemSet(&ctl, 0, sizeof(ctl));
-	ctl.keysize = sizeof(CheckpointerRequest);
-	ctl.entrysize = sizeof(struct CheckpointerSlotMapping);
-	ctl.hcxt = CurrentMemoryContext;
-
-	htab = hash_create("CompactCheckpointerRequestQueue",
-					   CheckpointerShmem->num_requests,
-					   &ctl,
-					   HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
-
-	/*
-	 * The basic idea here is that a request can be skipped if it's followed
-	 * by a later, identical request.  It might seem more sensible to work
-	 * backwards from the end of the queue and check whether a request is
-	 * *preceded* by an earlier, identical request, in the hopes of doing less
-	 * copying.  But that might change the semantics, if there's an
-	 * intervening FORGET_RELATION_FSYNC or FORGET_DATABASE_FSYNC request, so
-	 * we do it this way.  It would be possible to be even smarter if we made
-	 * the code below understand the specific semantics of such requests (it
-	 * could blow away preceding entries that would end up being canceled
-	 * anyhow), but it's not clear that the extra complexity would buy us
-	 * anything.
-	 */
-	for (n = 0; n < CheckpointerShmem->num_requests; n++)
-	{
-		CheckpointerRequest *request;
-		struct CheckpointerSlotMapping *slotmap;
-		bool		found;
-
-		/*
-		 * We use the request struct directly as a hashtable key.  This
-		 * assumes that any padding bytes in the structs are consistently the
-		 * same, which should be okay because we zeroed them in
-		 * CheckpointerShmemInit.  Note also that RelFileNode had better
-		 * contain no pad bytes.
-		 */
-		request = &CheckpointerShmem->requests[n];
-		slotmap = hash_search(htab, request, HASH_ENTER, &found);
-		if (found)
-		{
-			/* Duplicate, so mark the previous occurrence as skippable */
-			skip_slot[slotmap->slot] = true;
-			num_skipped++;
-		}
-		/* Remember slot containing latest occurrence of this request value */
-		slotmap->slot = n;
-	}
-
-	/* Done with the hash table. */
-	hash_destroy(htab);
-
-	/* If no duplicates, we're out of luck. */
-	if (!num_skipped)
-	{
-		pfree(skip_slot);
-		return false;
-	}
-
-	/* We found some duplicates; remove them. */
-	preserve_count = 0;
-	for (n = 0; n < CheckpointerShmem->num_requests; n++)
-	{
-		if (skip_slot[n])
-			continue;
-		CheckpointerShmem->requests[preserve_count++] = CheckpointerShmem->requests[n];
-	}
-	ereport(DEBUG1,
-			(errmsg("compacted fsync request queue from %d entries to %d entries",
-					CheckpointerShmem->num_requests, preserve_count)));
-	CheckpointerShmem->num_requests = preserve_count;
-
-	/* Cleanup. */
-	pfree(skip_slot);
-	return true;
+	SendFsyncRequest(&request, request.contains_fd ? FileGetRawDesc(file) : -1);
 }
 
 /*
  * AbsorbFsyncRequests
- *		Retrieve queued fsync requests and pass them to local smgr.
+ *		Retrieve queued fsync requests and pass them to local smgr. Stop when
+ *		resources would be exhausted by absorbing more.
+ *
+ * This is exported because we want to continue accepting requests during
+ * mdsync().
+ */
+void
+AbsorbFsyncRequests(void)
+{
+	if (!AmCheckpointerProcess())
+		return;
+
+	/* Transfer stats counts into pending pgstats message */
+	BgWriterStats.m_buf_written_backend +=
+		pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_writes, 0);
+	BgWriterStats.m_buf_fsync_backend +=
+		pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_fsync, 0);
+
+	while (true)
+	{
+		if (!FlushFsyncRequestQueueIfNecessary())
+			break;
+
+		if (!AbsorbFsyncRequest())
+			break;
+	}
+}
+
+/*
+ * AbsorbAllFsyncRequests
+ *		Retrieve all fsync requests that were pending at the time of the
+ *		call and pass them to local smgr.
  *
  * This is exported because it must be called during CreateCheckPoint;
  * we have to be sure we have accepted all pending requests just before
@@ -1301,17 +1184,13 @@ CompactCheckpointerRequestQueue(void)
  * non-checkpointer processes, do nothing if not checkpointer.
  */
 void
-AbsorbFsyncRequests(void)
+AbsorbAllFsyncRequests(void)
 {
-	CheckpointerRequest *requests = NULL;
-	CheckpointerRequest *request;
-	int			n;
+	CheckpointerRequest request = {0};
 
 	if (!AmCheckpointerProcess())
 		return;
 
-	LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
-
 	/* Transfer stats counts into pending pgstats message */
 	BgWriterStats.m_buf_written_backend +=
 		pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_writes, 0);
@@ -1319,35 +1198,61 @@ AbsorbFsyncRequests(void)
 		pg_atomic_exchange_u32(&CheckpointerShmem->num_backend_fsync, 0);
 
 	/*
-	 * We try to avoid holding the lock for a long time by copying the request
-	 * array, and processing the requests after releasing the lock.
-	 *
-	 * Once we have cleared the requests from shared memory, we have to PANIC
-	 * if we then fail to absorb them (eg, because our hashtable runs out of
-	 * memory).  This is because the system cannot run safely if we are unable
-	 * to fsync what we have been told to fsync.  Fortunately, the hashtable
-	 * is so small that the problem is quite unlikely to arise in practice.
+	 * For mdsync()'s guarantees to work, all pending fsync requests need to
+	 * be executed.  But we don't want to absorb requests until the queue is
+	 * empty, as that could take a long while.  So instead we enqueue a sync
+	 * marker and absorb requests until that marker is received back.
 	 */
-	n = CheckpointerShmem->num_requests;
-	if (n > 0)
+	request.type = CKPT_REQUEST_SYN;
+	request.segno = ++next_syn_rqst;
+	SendFsyncRequest(&request, -1);
+
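+	/*
+	 * Initialize received_syn_rqst to a value that cannot match the marker
+	 * just sent, so the loop below keeps absorbing until
+	 * AbsorbFsyncRequest() sees our marker come back.
+	 */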
+	received_syn_rqst = next_syn_rqst + 1;
+	while (received_syn_rqst != request.segno)
 	{
-		requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
-		memcpy(requests, CheckpointerShmem->requests, n * sizeof(CheckpointerRequest));
+		if (!FlushFsyncRequestQueueIfNecessary())
+			elog(FATAL, "unexpectedly failed to flush fsync request queue");
+
+		if (!AbsorbFsyncRequest())
+			break;
+	}
+}
+
+/*
+ * AbsorbFsyncRequest
+ *		Retrieve one queued fsync request and pass it to local smgr.
+ *		Returns false if no request was pending, true otherwise.
+ */
+static bool
+AbsorbFsyncRequest(void)
+{
+	CheckpointerRequest req;
+	int fd;
+	int ret;
+
+	/* FIXME, this should be a critical section */
+	ReserveTransientFile();
+
+	ret = pg_uds_recv_with_fd(fsync_fds[FSYNC_FD_PROCESS], &req, sizeof(req), &fd);
+	if (ret < 0 && (errno == EWOULDBLOCK || errno == EAGAIN))
+		return false;
+	else if (ret < 0)
+		elog(FATAL, "recvmsg failed: %m");
+
+	if (req.contains_fd != (fd != -1))
+		elog(FATAL, "received fd does not match the message's contains_fd flag");
 
-	START_CRIT_SECTION();
+	if (req.type == CKPT_REQUEST_SYN)
+	{
+		received_syn_rqst = req.segno;
+		Assert(fd == -1);
+	}
+	else
+	{
+		RememberFsyncRequest(req.rnode, req.forknum, req.segno, fd);
+	}
 
-	CheckpointerShmem->num_requests = 0;
-
-	LWLockRelease(CheckpointerCommLock);
-
-	for (request = requests; n > 0; request++, n--)
-		RememberFsyncRequest(request->rnode, request->forknum, request->segno);
-
-	END_CRIT_SECTION();
-
-	if (requests)
-		pfree(requests);
+	return true;
 }
 
 /*
@@ -1402,3 +1307,42 @@ IncCheckpointSyncCycle(void)
 {
 	return pg_atomic_fetch_add_u32(&CheckpointerShmem->ckpt_cycle, 1);
 }
+
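+/*
+ * Count a write performed by a backend itself, for statistics purposes.
+ */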
+void
+CountBackendWrite(void)
+{
+	pg_atomic_fetch_add_u32(&CheckpointerShmem->num_backend_writes, 1);
+}
+
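+/*
+ * Send one request over the checkpointer's fsync socket, retrying until the
+ * whole message has been submitted.  If the request says so, fd is passed
+ * along with the message.
+ */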
+static void
+SendFsyncRequest(CheckpointerRequest *request, int fd)
+{
+	ssize_t ret;
+
+	while (true)
+	{
+		ret = pg_uds_send_with_fd(fsync_fds[FSYNC_FD_SUBMIT], request, sizeof(*request),
+								  request->contains_fd ? fd : -1);
+
+		if (ret >= 0)
+		{
+			/*
+			 * Short sends shouldn't happen for messages this small, but
+			 * better make sure that's true...
+			 */
+			if (ret != sizeof(*request))
+				elog(FATAL, "unexpected short send of fsync request");
+			break;
+		}
+		else if (errno == EWOULDBLOCK || errno == EAGAIN)
+		{
+			/* blocked on write - wait for socket to become readable */
+			/* FIXME: postmaster death? Other interrupts? */
+			WaitLatchOrSocket(NULL, WL_SOCKET_WRITEABLE, fsync_fds[FSYNC_FD_SUBMIT], -1, 0);
+		}
+		else
+		{
+			ereport(FATAL, (errmsg("could not send fsync request: %m")));
+		}
+	}
+}
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index a4b53b33cdd..135aa29bfeb 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -70,6 +70,7 @@
 #include <time.h>
 #include <sys/wait.h>
 #include <ctype.h>
+#include <sys/types.h>
 #include <sys/stat.h>
 #include <sys/socket.h>
 #include <fcntl.h>
@@ -434,6 +435,7 @@ static pid_t StartChildProcess(AuxProcType type);
 static void StartAutovacuumWorker(void);
 static void MaybeStartWalReceiver(void);
 static void InitPostmasterDeathWatchHandle(void);
+static void InitFsyncFdSocketPair(void);
 
 /*
  * Archiver is allowed to start up at the current postmaster state?
@@ -568,6 +570,8 @@ int			postmaster_alive_fds[2] = {-1, -1};
 HANDLE		PostmasterHandle;
 #endif
 
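+/* socketpair for forwarding fsync requests to the checkpointer */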
+int			fsync_fds[2] = {-1, -1};
+
 /*
  * Postmaster main entry point
  */
@@ -1195,6 +1199,11 @@ PostmasterMain(int argc, char *argv[])
 	 */
 	InitPostmasterDeathWatchHandle();
 
+	/*
+	 * Initialize socket pair used to transport file descriptors over.
+	 */
+	InitFsyncFdSocketPair();
+
 #ifdef WIN32
 
 	/*
@@ -6443,3 +6452,32 @@ InitPostmasterDeathWatchHandle(void)
 								 GetLastError())));
 #endif							/* WIN32 */
 }
+
+/* Create socket used for requesting fsyncs by checkpointer */
+static void
+InitFsyncFdSocketPair(void)
+{
+	Assert(MyProcPid == PostmasterPid);
+	if (socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0, fsync_fds) < 0)
+		ereport(FATAL,
+				(errcode_for_file_access(),
+				 errmsg_internal("could not create fsync sockets: %m")));
+
+	/*
+	 * Set O_NONBLOCK on both fds.
+	 */
+	if (fcntl(fsync_fds[FSYNC_FD_PROCESS], F_SETFL, O_NONBLOCK) == -1)
+		ereport(FATAL,
+				(errcode_for_socket_access(),
+				 errmsg_internal("could not set fsync process socket to nonblocking mode: %m")));
+
+	if (fcntl(fsync_fds[FSYNC_FD_SUBMIT], F_SETFL, O_NONBLOCK) == -1)
+		ereport(FATAL,
+				(errcode_for_socket_access(),
+				 errmsg_internal("could not set fsync submit socket to nonblocking mode: %m")));
+
+	/*
+	 * FIXME: do the DuplicateHandle dance for Windows - can that work
+	 * trivially?
+	 */
+}
diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c
index 555774320b5..e24b0e9ec39 100644
--- a/src/backend/storage/smgr/md.c
+++ b/src/backend/storage/smgr/md.c
@@ -142,8 +142,8 @@ typedef struct
 	CycleCtr	cycle_ctr;		/* sync cycle of oldest request */
 	/* requests[f] has bit n set if we need to fsync segment n of fork f */
 	Bitmapset  *requests[MAX_FORKNUM + 1];
-	/* canceled[f] is true if we canceled fsyncs for fork "recently" */
-	bool		canceled[MAX_FORKNUM + 1];
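+	/*
+	 * syncfds[f][n] is the fd to fsync for segment n of fork f, or -1;
+	 * syncfd_len[f] is the allocated length of syncfds[f].
+	 */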
+	int		   *syncfds[MAX_FORKNUM + 1];
+	int			syncfd_len[MAX_FORKNUM + 1];
 } PendingOperationEntry;
 
 typedef struct
@@ -152,6 +152,8 @@ typedef struct
 	CycleCtr	cycle_ctr;		/* mdckpt_cycle_ctr when request was made */
 } PendingUnlinkEntry;
 
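+/*
+ * open_fsync_queue_files tracks how many fds are held open for pending
+ * fsync requests; mdsync_in_progress guards against recursively starting
+ * an mdsyncpass() while one is underway.
+ */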
+static uint32 open_fsync_queue_files = 0;
+static bool mdsync_in_progress = false;
 static HTAB *pendingOpsTable = NULL;
 static List *pendingUnlinks = NIL;
 static MemoryContext pendingOpsCxt; /* context for the above  */
@@ -196,6 +198,8 @@ static MdfdVec *_mdfd_getseg(SMgrRelation reln, ForkNumber forkno,
 			 BlockNumber blkno, bool skipFsync, int behavior);
 static BlockNumber _mdnblocks(SMgrRelation reln, ForkNumber forknum,
 		   MdfdVec *seg);
+static char *mdpath(RelFileNode rnode, ForkNumber forknum, BlockNumber segno);
+static void mdsyncpass(bool include_current);
 
 
 /*
@@ -1049,43 +1053,28 @@ mdimmedsync(SMgrRelation reln, ForkNumber forknum)
 }
 
 /*
- *	mdsync() -- Sync previous writes to stable storage.
+ * Do one pass over the fsync request hashtable and perform the necessary
+ * fsyncs.  Increments the mdsync cycle counter.
+ *
+ * If include_current is true, perform all fsyncs (this is done if too many
+ * files are open); otherwise only perform the fsyncs belonging to the cycle
+ * valid at call time.
  */
-void
-mdsync(void)
+static void
+mdsyncpass(bool include_current)
 {
-	static bool mdsync_in_progress = false;
-
 	HASH_SEQ_STATUS hstat;
 	PendingOperationEntry *entry;
 	int			absorb_counter;
 
 	/* Statistics on sync times */
-	int			processed = 0;
 	instr_time	sync_start,
 				sync_end,
 				sync_diff;
 	uint64		elapsed;
-	uint64		longest = 0;
-	uint64		total_elapsed = 0;
-
-	/*
-	 * This is only called during checkpoints, and checkpoints should only
-	 * occur in processes that have created a pendingOpsTable.
-	 */
-	if (!pendingOpsTable)
-		elog(ERROR, "cannot sync without a pendingOpsTable");
-
-	/*
-	 * If we are in the checkpointer, the sync had better include all fsync
-	 * requests that were queued by backends up to this point.  The tightest
-	 * race condition that could occur is that a buffer that must be written
-	 * and fsync'd for the checkpoint could have been dumped by a backend just
-	 * before it was visited by BufferSync().  We know the backend will have
-	 * queued an fsync request before clearing the buffer's dirtybit, so we
-	 * are safe as long as we do an Absorb after completing BufferSync().
-	 */
-	AbsorbFsyncRequests();
+	int			processed = CheckpointStats.ckpt_sync_rels;
+	uint64		longest = CheckpointStats.ckpt_longest_sync;
+	uint64		total_elapsed = CheckpointStats.ckpt_agg_sync_time;
 
 	/*
 	 * To avoid excess fsync'ing (in the worst case, maybe a never-terminating
@@ -1133,17 +1122,27 @@ mdsync(void)
 	while ((entry = (PendingOperationEntry *) hash_seq_search(&hstat)) != NULL)
 	{
 		ForkNumber	forknum;
+		bool has_remaining;
 
 		/*
-		 * If the entry is new then don't process it this time; it might
-		 * contain multiple fsync-request bits, but they are all new.  Note
-		 * "continue" bypasses the hash-remove call at the bottom of the loop.
+		 * If we are processing fsync requests because too many file handles
+		 * are open, close files regardless of their cycle: otherwise we
+		 * might not find anything to close, and we want to make room as
+		 * quickly as possible so more requests can be absorbed.
 		 */
-		if (entry->cycle_ctr == GetCheckpointSyncCycle())
-			continue;
+		if (!include_current)
+		{
+			/*
+			 * If the entry is new then don't process it this time; it might
+			 * contain multiple fsync-request bits, but they are all new.  Note
+			 * "continue" bypasses the hash-remove call at the bottom of the loop.
+			 */
+			if (entry->cycle_ctr == GetCheckpointSyncCycle())
+				continue;
 
-		/* Else assert we haven't missed it */
-		Assert((CycleCtr) (entry->cycle_ctr + 1) == GetCheckpointSyncCycle());
+			/* Else assert we haven't missed it */
+			Assert((CycleCtr) (entry->cycle_ctr + 1) == GetCheckpointSyncCycle());
+		}
 
 		/*
 		 * Scan over the forks and segments represented by the entry.
@@ -1158,158 +1157,151 @@ mdsync(void)
 		 */
 		for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
 		{
-			Bitmapset  *requests = entry->requests[forknum];
 			int			segno;
 
-			entry->requests[forknum] = NULL;
-			entry->canceled[forknum] = false;
-
-			while ((segno = bms_first_member(requests)) >= 0)
+			segno = -1;
+			while ((segno = bms_next_member(entry->requests[forknum], segno)) >= 0)
 			{
-				int			failures;
+				char	   *path;
+				int			returnCode;
+
+				/*
+				 * Temporarily mark the segment as processed.  Have to do so
+				 * before absorbing further requests, otherwise we might
+				 * delete a new request made in a new cycle.
+				 */
+				bms_del_member(entry->requests[forknum], segno);
+
+				if (entry->syncfd_len[forknum] <= segno ||
+					entry->syncfds[forknum][segno] == -1)
+				{
+					/*
+					 * This is where we could open the file ourselves, if we
+					 * ever want to support not transporting fds.
+					 */
+					elog(FATAL, "no file descriptor for segment to be fsynced");
+				}
 
 				/*
 				 * If fsync is off then we don't have to bother opening the
 				 * file at all.  (We delay checking until this point so that
 				 * changing fsync on the fly behaves sensibly.)
+				 *
+				 * XXX: Why is that an important goal? Doesn't give any
+				 * interesting guarantees afaict?
 				 */
-				if (!enableFsync)
-					continue;
-
-				/*
-				 * If in checkpointer, we want to absorb pending requests
-				 * every so often to prevent overflow of the fsync request
-				 * queue.  It is unspecified whether newly-added entries will
-				 * be visited by hash_seq_search, but we don't care since we
-				 * don't need to process them anyway.
-				 */
-				if (--absorb_counter <= 0)
+				if (enableFsync)
 				{
-					AbsorbFsyncRequests();
-					absorb_counter = FSYNCS_PER_ABSORB;
-				}
-
-				/*
-				 * The fsync table could contain requests to fsync segments
-				 * that have been deleted (unlinked) by the time we get to
-				 * them. Rather than just hoping an ENOENT (or EACCES on
-				 * Windows) error can be ignored, what we do on error is
-				 * absorb pending requests and then retry.  Since mdunlink()
-				 * queues a "cancel" message before actually unlinking, the
-				 * fsync request is guaranteed to be marked canceled after the
-				 * absorb if it really was this case. DROP DATABASE likewise
-				 * has to tell us to forget fsync requests before it starts
-				 * deletions.
-				 */
-				for (failures = 0;; failures++) /* loop exits at "break" */
-				{
-					SMgrRelation reln;
-					MdfdVec    *seg;
-					char	   *path;
-					int			save_errno;
-
 					/*
-					 * Find or create an smgr hash entry for this relation.
-					 * This may seem a bit unclean -- md calling smgr?	But
-					 * it's really the best solution.  It ensures that the
-					 * open file reference isn't permanently leaked if we get
-					 * an error here. (You may say "but an unreferenced
-					 * SMgrRelation is still a leak!" Not really, because the
-					 * only case in which a checkpoint is done by a process
-					 * that isn't about to shut down is in the checkpointer,
-					 * and it will periodically do smgrcloseall(). This fact
-					 * justifies our not closing the reln in the success path
-					 * either, which is a good thing since in non-checkpointer
-					 * cases we couldn't safely do that.)
+					 * The fsync table could contain requests to fsync
+					 * segments that have been deleted (unlinked) by the time
+					 * we get to them.  That used to be problematic, but now
+					 * we have a file handle to the deleted file.  That means we
+					 * might fsync an empty file superfluously, in a
+					 * relatively tight window, which is acceptable.
 					 */
-					reln = smgropen(entry->rnode, InvalidBackendId);
 
-					/* Attempt to open and fsync the target segment */
-					seg = _mdfd_getseg(reln, forknum,
-									   (BlockNumber) segno * (BlockNumber) RELSEG_SIZE,
-									   false,
-									   EXTENSION_RETURN_NULL
-									   | EXTENSION_DONT_CHECK_SIZE);
+					path = mdpath(entry->rnode, forknum, segno);
 
 					INSTR_TIME_SET_CURRENT(sync_start);
 
-					if (seg != NULL &&
-						FileSync(seg->mdfd_vfd, WAIT_EVENT_DATA_FILE_SYNC) >= 0)
+					pgstat_report_wait_start(WAIT_EVENT_DATA_FILE_SYNC);
+					returnCode = pg_fsync(entry->syncfds[forknum][segno]);
+					pgstat_report_wait_end();
+
+					if (returnCode < 0)
 					{
-						/* Success; update statistics about sync timing */
-						INSTR_TIME_SET_CURRENT(sync_end);
-						sync_diff = sync_end;
-						INSTR_TIME_SUBTRACT(sync_diff, sync_start);
-						elapsed = INSTR_TIME_GET_MICROSEC(sync_diff);
-						if (elapsed > longest)
-							longest = elapsed;
-						total_elapsed += elapsed;
-						processed++;
-						if (log_checkpoints)
-							elog(DEBUG1, "checkpoint sync: number=%d file=%s time=%.3f msec",
-								 processed,
-								 FilePathName(seg->mdfd_vfd),
-								 (double) elapsed / 1000);
+						/* XXX: decide on policy */
+						bms_add_member(entry->requests[forknum], segno);
 
-						break;	/* out of retry loop */
-					}
-
-					/* Compute file name for use in message */
-					save_errno = errno;
-					path = _mdfd_segpath(reln, forknum, (BlockNumber) segno);
-					errno = save_errno;
-
-					/*
-					 * It is possible that the relation has been dropped or
-					 * truncated since the fsync request was entered.
-					 * Therefore, allow ENOENT, but only if we didn't fail
-					 * already on this file.  This applies both for
-					 * _mdfd_getseg() and for FileSync, since fd.c might have
-					 * closed the file behind our back.
-					 *
-					 * XXX is there any point in allowing more than one retry?
-					 * Don't see one at the moment, but easy to change the
-					 * test here if so.
-					 */
-					if (!FILE_POSSIBLY_DELETED(errno) ||
-						failures > 0)
 						ereport(ERROR,
 								(errcode_for_file_access(),
 								 errmsg("could not fsync file \"%s\": %m",
 										path)));
-					else
+					}
+
+					/* Success; update statistics about sync timing */
+					INSTR_TIME_SET_CURRENT(sync_end);
+					sync_diff = sync_end;
+					INSTR_TIME_SUBTRACT(sync_diff, sync_start);
+					elapsed = INSTR_TIME_GET_MICROSEC(sync_diff);
+					if (elapsed > longest)
+						longest = elapsed;
+					total_elapsed += elapsed;
+					processed++;
+					if (log_checkpoints)
 						ereport(DEBUG1,
-								(errcode_for_file_access(),
-								 errmsg("could not fsync file \"%s\" but retrying: %m",
-										path)));
+								(errmsg("checkpoint sync: number=%d file=%s time=%.3f msec",
+										processed,
+										path,
+										(double) elapsed / 1000),
+								 errhidestmt(true),
+								 errhidecontext(true)));
+
 					pfree(path);
+				}
 
+				/*
+				 * It shouldn't be possible for a new request to arrive during
+				 * the fsync (on error this will not be reached).
+				 */
+				Assert(!bms_is_member(segno, entry->requests[forknum]));
+
+				/*
+				 * Close file.  XXX: centralize code.
+				 */
+				{
+					open_fsync_queue_files--;
+					CloseTransientFile(entry->syncfds[forknum][segno]);
+					entry->syncfds[forknum][segno] = -1;
+				}
+
+				/*
+				 * If in checkpointer, we want to absorb pending requests
+				 * every so often to prevent overflow of the fsync request
+				 * queue.  It is unspecified whether newly-added entries will
+				 * be visited by hash_seq_search, but we don't care since we
+				 * don't need to process them anyway.
+				 */
+				if (absorb_counter-- <= 0)
+				{
 					/*
-					 * Absorb incoming requests and check to see if a cancel
-					 * arrived for this relation fork.
+					 * Don't absorb if too many files are open. This pass will
+					 * soon close some, so check again later.
 					 */
-					AbsorbFsyncRequests();
-					absorb_counter = FSYNCS_PER_ABSORB; /* might as well... */
-
-					if (entry->canceled[forknum])
-						break;
-				}				/* end retry loop */
+					if (open_fsync_queue_files < ((MaxTransientFiles() * 8) / 10))
+						AbsorbFsyncRequests();
+					absorb_counter = FSYNCS_PER_ABSORB;
+				}
 			}
-			bms_free(requests);
 		}
 
 		/*
-		 * We've finished everything that was requested before we started to
-		 * scan the entry.  If no new requests have been inserted meanwhile,
-		 * remove the entry.  Otherwise, update its cycle counter, as all the
-		 * requests now in it must have arrived during this cycle.
+		 * We've finished everything for the file that was requested before we
+		 * started to scan the entry.  If no new requests have been inserted
+		 * meanwhile, remove the entry.  Otherwise, update its cycle counter,
+		 * as all the requests now in it must have arrived during this cycle.
+		 *
+		 * This needs to be checked separately from the above for-each-fork
+		 * loop, as new requests for this relation could have been absorbed.
 		 */
+		has_remaining = false;
 		for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
 		{
-			if (entry->requests[forknum] != NULL)
-				break;
+			if (bms_is_empty(entry->requests[forknum]))
+			{
+				if (entry->syncfds[forknum])
+				{
+					pfree(entry->syncfds[forknum]);
+					entry->syncfds[forknum] = NULL;
+				}
+				bms_free(entry->requests[forknum]);
+				entry->requests[forknum] = NULL;
+			}
+			else
+				has_remaining = true;
 		}
-		if (forknum <= MAX_FORKNUM)
+		if (has_remaining)
 			entry->cycle_ctr = GetCheckpointSyncCycle();
 		else
 		{
@@ -1320,13 +1312,66 @@ mdsync(void)
 		}
 	}							/* end loop over hashtable entries */
 
-	/* Return sync performance metrics for report at checkpoint end */
+	/* Flag successful completion of mdsync */
+	mdsync_in_progress = false;
+
+	/* Maintain sync performance metrics for report at checkpoint end */
 	CheckpointStats.ckpt_sync_rels = processed;
 	CheckpointStats.ckpt_longest_sync = longest;
 	CheckpointStats.ckpt_agg_sync_time = total_elapsed;
+}
 
-	/* Flag successful completion of mdsync */
-	mdsync_in_progress = false;
+/*
+ *	mdsync() -- Sync previous writes to stable storage.
+ */
+void
+mdsync(void)
+{
+	/*
+	 * This is only called during checkpoints, and checkpoints should only
+	 * occur in processes that have created a pendingOpsTable.
+	 */
+	if (!pendingOpsTable)
+		elog(ERROR, "cannot sync without a pendingOpsTable");
+
+	/*
+	 * If we are in the checkpointer, the sync had better include all fsync
+	 * requests that were queued by backends up to this point.  The tightest
+	 * race condition that could occur is that a buffer that must be written
+	 * and fsync'd for the checkpoint could have been dumped by a backend just
+	 * before it was visited by BufferSync().  We know the backend will have
+	 * queued an fsync request before clearing the buffer's dirtybit, so we
+	 * are safe as long as we do an Absorb after completing BufferSync().
+	 */
+	AbsorbAllFsyncRequests();
+
+	mdsyncpass(false);
+}
+
+/*
+ * Flush the fsync request queue enough to make sure there's room for at
+ * least one more entry.  Returns false, without flushing, if an mdsync pass
+ * is already in progress; returns true otherwise.
+ */
+bool
+FlushFsyncRequestQueueIfNecessary(void)
+{
+	if (mdsync_in_progress)
+		return false;
+
+	while (true)
+	{
+		if (open_fsync_queue_files >= ((MaxTransientFiles() * 8) / 10))
+		{
+			elog(DEBUG1,
+				 "flush fsync request queue due to %u open files",
+				 open_fsync_queue_files);
+			mdsyncpass(true);
+		}
+		else
+			break;
+	}
+
+	return true;
 }
 
 /*
@@ -1411,12 +1456,38 @@ mdpostckpt(void)
 		 */
 		if (--absorb_counter <= 0)
 		{
-			AbsorbFsyncRequests();
+			/* XXX: Centralize this condition */
+			if (open_fsync_queue_files < ((MaxTransientFiles() * 8) / 10))
+				AbsorbFsyncRequests();
 			absorb_counter = UNLINKS_PER_ABSORB;
 		}
 	}
 }
 
+
+/*
+ * Return the filename for the specified segment of the relation. The
+ * returned string is palloc'd.
+ */
+static char *
+mdpath(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
+{
+	char	   *path,
+			   *fullpath;
+
+	path = relpathperm(rnode, forknum);
+
+	if (segno > 0)
+	{
+		fullpath = psprintf("%s.%u", path, segno);
+		pfree(path);
+	}
+	else
+		fullpath = path;
+
+	return fullpath;
+}
+
 /*
  * register_dirty_segment() -- Mark a relation segment as needing fsync
  *
@@ -1437,6 +1508,13 @@ register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg)
 	pg_memory_barrier();
 	cycle = GetCheckpointSyncCycle();
 
+	/*
+	 * For historical reasons the checkpointer keeps track of the number of
+	 * times backends perform writes themselves.
+	 */
+	if (!AmBackgroundWriterProcess())
+		CountBackendWrite();
+
 	/*
 	 * Don't repeatedly register the same segment as dirty.
 	 *
@@ -1449,27 +1527,23 @@ register_dirty_segment(SMgrRelation reln, ForkNumber forknum, MdfdVec *seg)
 
 	if (pendingOpsTable)
 	{
-		/* push it into local pending-ops table */
-		RememberFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno);
-		seg->mdfd_dirtied_cycle = cycle;
+		int fd;
+
+		/*
+		 * Push it into local pending-ops table.
+		 *
+		 * We have to duplicate the fd - we can't have fd.c close it behind
+		 * our back, as that would lead to losing error reporting guarantees
+		 * on Linux.  RememberFsyncRequest() will manage the fd's lifetime.
+		 */
+		ReserveTransientFile();
+		fd = dup(FileGetRawDesc(seg->mdfd_vfd));
+		if (fd < 0)
+			elog(ERROR, "couldn't dup: %m");
+		RememberFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno, fd);
 	}
 	else
-	{
-		if (ForwardFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno))
-		{
-			seg->mdfd_dirtied_cycle = cycle;
-			return;				/* passed it off successfully */
-		}
-
-		ereport(DEBUG1,
-				(errmsg("could not forward fsync request because request queue is full")));
-
-		if (FileSync(seg->mdfd_vfd, WAIT_EVENT_DATA_FILE_SYNC) < 0)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not fsync file \"%s\": %m",
-							FilePathName(seg->mdfd_vfd))));
-	}
+		ForwardFsyncRequest(reln->smgr_rnode.node, forknum, seg->mdfd_segno, seg->mdfd_vfd);
 }
 
 /*
@@ -1491,21 +1565,14 @@ register_unlink(RelFileNodeBackend rnode)
 	{
 		/* push it into local pending-ops table */
 		RememberFsyncRequest(rnode.node, MAIN_FORKNUM,
-							 UNLINK_RELATION_REQUEST);
+							 UNLINK_RELATION_REQUEST,
+							 -1);
 	}
 	else
 	{
-		/*
-		 * Notify the checkpointer about it.  If we fail to queue the request
-		 * message, we have to sleep and try again, because we can't simply
-		 * delete the file now.  Ugly, but hopefully won't happen often.
-		 *
-		 * XXX should we just leave the file orphaned instead?
-		 */
+		/* Notify the checkpointer about it. */
 		Assert(IsUnderPostmaster);
-		while (!ForwardFsyncRequest(rnode.node, MAIN_FORKNUM,
-									UNLINK_RELATION_REQUEST))
-			pg_usleep(10000L);	/* 10 msec seems a good number */
+		ForwardFsyncRequest(rnode.node, MAIN_FORKNUM, UNLINK_RELATION_REQUEST, -1);
 	}
 }
 
@@ -1531,7 +1598,7 @@ register_unlink(RelFileNodeBackend rnode)
  * heavyweight operation anyhow, so we'll live with it.)
  */
 void
-RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
+RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno, int fd)
 {
 	Assert(pendingOpsTable);
 
@@ -1549,18 +1616,28 @@ RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
 			/*
 			 * We can't just delete the entry since mdsync could have an
 			 * active hashtable scan.  Instead we delete the bitmapsets; this
-			 * is safe because of the way mdsync is coded.  We also set the
-			 * "canceled" flags so that mdsync can tell that a cancel arrived
-			 * for the fork(s).
+			 * is safe because of the way mdsync is coded.
 			 */
 			if (forknum == InvalidForkNumber)
 			{
 				/* remove requests for all forks */
 				for (forknum = 0; forknum <= MAX_FORKNUM; forknum++)
 				{
+					int segno;
+
 					bms_free(entry->requests[forknum]);
 					entry->requests[forknum] = NULL;
-					entry->canceled[forknum] = true;
+
+					for (segno = 0; segno < entry->syncfd_len[forknum]; segno++)
+					{
+						if (entry->syncfds[forknum][segno] != -1)
+						{
+							open_fsync_queue_files--;
+							CloseTransientFile(entry->syncfds[forknum][segno]);
+							entry->syncfds[forknum][segno] = -1;
+						}
+					}
+
 				}
 			}
 			else
@@ -1568,7 +1645,16 @@ RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
 				/* remove requests for single fork */
 				bms_free(entry->requests[forknum]);
 				entry->requests[forknum] = NULL;
-				entry->canceled[forknum] = true;
+
+				for (segno = 0; segno < entry->syncfd_len[forknum]; segno++)
+				{
+					if (entry->syncfds[forknum][segno] != -1)
+					{
+						open_fsync_queue_files--;
+						CloseTransientFile(entry->syncfds[forknum][segno]);
+						entry->syncfds[forknum][segno] = -1;
+					}
+				}
 			}
 		}
 	}
@@ -1592,7 +1678,6 @@ RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
 				{
 					bms_free(entry->requests[forknum]);
 					entry->requests[forknum] = NULL;
-					entry->canceled[forknum] = true;
 				}
 			}
 		}
@@ -1646,7 +1731,8 @@ RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
 		{
 			entry->cycle_ctr = GetCheckpointSyncCycle();
 			MemSet(entry->requests, 0, sizeof(entry->requests));
-			MemSet(entry->canceled, 0, sizeof(entry->canceled));
+			MemSet(entry->syncfds, 0, sizeof(entry->syncfds));
+			MemSet(entry->syncfd_len, 0, sizeof(entry->syncfd_len));
 		}
 
 		/*
@@ -1658,6 +1744,55 @@ RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum, BlockNumber segno)
 		entry->requests[forknum] = bms_add_member(entry->requests[forknum],
 												  (int) segno);
 
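+		/*
+		 * If a file descriptor was passed along, remember it, growing the
+		 * per-fork fd array as needed, so the eventual fsync can be issued
+		 * against the same open file the writes went through.
+		 */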
+		if (fd >= 0)
+		{
+			/* make space for entry */
+			if (entry->syncfds[forknum] == NULL)
+			{
+				int i;
+
+				entry->syncfds[forknum] = palloc(sizeof(int) * (segno + 1));
+				entry->syncfd_len[forknum] = segno + 1;
+
+				for (i = 0; i <= segno; i++)
+					entry->syncfds[forknum][i] = -1;
+			}
+			else if (entry->syncfd_len[forknum] <= segno)
+			{
+				int i;
+
+				entry->syncfds[forknum] = repalloc(entry->syncfds[forknum],
+												   sizeof(int) * (segno + 1));
+
+				/* initialize newly created entries */
+				for (i = entry->syncfd_len[forknum]; i <= segno; i++)
+					entry->syncfds[forknum][i] = -1;
+
+				entry->syncfd_len[forknum] = segno + 1;
+			}
+
+			if (entry->syncfds[forknum][segno] == -1)
+			{
+				open_fsync_queue_files++;
+				/* caller must have reserved entry */
+				RegisterTransientFile(fd);
+				entry->syncfds[forknum][segno] = fd;
+			}
+			else
+			{
+				/*
+				 * File is already open.  Have to keep the older fd, as
+				 * errors might only be reported to it; thus close the one
+				 * we just received.
+				 *
+				 * XXX: check for errors.
+				 */
+				close(fd);
+			}
+
+			FlushFsyncRequestQueueIfNecessary();
+		}
+
 		MemoryContextSwitchTo(oldcxt);
 	}
 }
@@ -1674,22 +1809,12 @@ ForgetRelationFsyncRequests(RelFileNode rnode, ForkNumber forknum)
 	if (pendingOpsTable)
 	{
 		/* standalone backend or startup process: fsync state is local */
-		RememberFsyncRequest(rnode, forknum, FORGET_RELATION_FSYNC);
+		RememberFsyncRequest(rnode, forknum, FORGET_RELATION_FSYNC, -1);
 	}
 	else if (IsUnderPostmaster)
 	{
-		/*
-		 * Notify the checkpointer about it.  If we fail to queue the cancel
-		 * message, we have to sleep and try again ... ugly, but hopefully
-		 * won't happen often.
-		 *
-		 * XXX should we CHECK_FOR_INTERRUPTS in this loop?  Escaping with an
-		 * error would leave the no-longer-used file still present on disk,
-		 * which would be bad, so I'm inclined to assume that the checkpointer
-		 * will always empty the queue soon.
-		 */
-		while (!ForwardFsyncRequest(rnode, forknum, FORGET_RELATION_FSYNC))
-			pg_usleep(10000L);	/* 10 msec seems a good number */
+		/* Notify the checkpointer about it. */
+		ForwardFsyncRequest(rnode, forknum, FORGET_RELATION_FSYNC, -1);
 
 		/*
 		 * Note we don't wait for the checkpointer to actually absorb the
@@ -1713,14 +1838,12 @@ ForgetDatabaseFsyncRequests(Oid dbid)
 	if (pendingOpsTable)
 	{
 		/* standalone backend or startup process: fsync state is local */
-		RememberFsyncRequest(rnode, InvalidForkNumber, FORGET_DATABASE_FSYNC);
+		RememberFsyncRequest(rnode, InvalidForkNumber, FORGET_DATABASE_FSYNC, -1);
 	}
 	else if (IsUnderPostmaster)
 	{
 		/* see notes in ForgetRelationFsyncRequests */
-		while (!ForwardFsyncRequest(rnode, InvalidForkNumber,
-									FORGET_DATABASE_FSYNC))
-			pg_usleep(10000L);	/* 10 msec seems a good number */
+		ForwardFsyncRequest(rnode, InvalidForkNumber, FORGET_DATABASE_FSYNC, -1);
 	}
 }
 
diff --git a/src/include/postmaster/bgwriter.h b/src/include/postmaster/bgwriter.h
index 87a5cfad415..58ba671a907 100644
--- a/src/include/postmaster/bgwriter.h
+++ b/src/include/postmaster/bgwriter.h
@@ -16,6 +16,7 @@
 #define _BGWRITER_H
 
 #include "storage/block.h"
+#include "storage/fd.h"
 #include "storage/relfilenode.h"
 
 
@@ -31,9 +32,10 @@ extern void CheckpointerMain(void) pg_attribute_noreturn();
 extern void RequestCheckpoint(int flags);
 extern void CheckpointWriteDelay(int flags, double progress);
 
-extern bool ForwardFsyncRequest(RelFileNode rnode, ForkNumber forknum,
-					BlockNumber segno);
+extern void ForwardFsyncRequest(RelFileNode rnode, ForkNumber forknum,
+								BlockNumber segno, File file);
 extern void AbsorbFsyncRequests(void);
+extern void AbsorbAllFsyncRequests(void);
 
 extern Size CheckpointerShmemSize(void);
 extern void CheckpointerShmemInit(void);
@@ -43,4 +45,6 @@ extern uint32 IncCheckpointSyncCycle(void);
 
 extern bool FirstCallSinceLastCheckpoint(void);
 
+extern void CountBackendWrite(void);
+
 #endif							/* _BGWRITER_H */
diff --git a/src/include/postmaster/postmaster.h b/src/include/postmaster/postmaster.h
index 1877eef2391..e2ba64e8984 100644
--- a/src/include/postmaster/postmaster.h
+++ b/src/include/postmaster/postmaster.h
@@ -44,6 +44,11 @@ extern int	postmaster_alive_fds[2];
 #define POSTMASTER_FD_OWN		1	/* kept open by postmaster only */
 #endif
 
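+/* indexes into fsync_fds[] */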
+#define FSYNC_FD_SUBMIT			0
+#define FSYNC_FD_PROCESS		1
+
+extern int	fsync_fds[2];
+
 extern PGDLLIMPORT const char *progname;
 
 extern void PostmasterMain(int argc, char *argv[]) pg_attribute_noreturn();
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h
index 558e4d8518b..798a9652927 100644
--- a/src/include/storage/smgr.h
+++ b/src/include/storage/smgr.h
@@ -140,7 +140,8 @@ extern void mdpostckpt(void);
 
 extern void SetForwardFsyncRequests(void);
 extern void RememberFsyncRequest(RelFileNode rnode, ForkNumber forknum,
-					 BlockNumber segno);
+					 BlockNumber segno, int fd);
+extern bool FlushFsyncRequestQueueIfNecessary(void);
 extern void ForgetRelationFsyncRequests(RelFileNode rnode, ForkNumber forknum);
 extern void ForgetDatabaseFsyncRequests(Oid dbid);
 
-- 
2.17.0.rc1.dirty

