From 3feafa5ecd6666aacbaf3ceda466044476dda63a Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Wed, 16 Mar 2016 18:14:18 -0700
Subject: [PATCH 6/6] WIP: Use epoll for Wait Event API if available.

---
 configure                     |   2 +-
 configure.in                  |   2 +-
 src/backend/port/unix_latch.c | 238 ++++++++++++++++++++++++++++++++++++++++--
 src/include/pg_config.h.in    |   3 +
 4 files changed, 235 insertions(+), 10 deletions(-)

diff --git a/configure b/configure
index a45be67..da897ae 100755
--- a/configure
+++ b/configure
@@ -10193,7 +10193,7 @@ fi
 ## Header files
 ##
 
-for ac_header in atomic.h crypt.h dld.h fp_class.h getopt.h ieeefp.h ifaddrs.h langinfo.h mbarrier.h poll.h pwd.h sys/ioctl.h sys/ipc.h sys/poll.h sys/pstat.h sys/resource.h sys/select.h sys/sem.h sys/shm.h sys/socket.h sys/sockio.h sys/tas.h sys/time.h sys/un.h termios.h ucred.h utime.h wchar.h wctype.h
+for ac_header in atomic.h crypt.h dld.h fp_class.h getopt.h ieeefp.h ifaddrs.h langinfo.h mbarrier.h poll.h pwd.h sys/epoll.h sys/ioctl.h sys/ipc.h sys/poll.h sys/pstat.h sys/resource.h sys/select.h sys/sem.h sys/shm.h sys/socket.h sys/sockio.h sys/tas.h sys/time.h sys/un.h termios.h ucred.h utime.h wchar.h wctype.h
 do :
   as_ac_Header=`$as_echo "ac_cv_header_$ac_header" | $as_tr_sh`
 ac_fn_c_check_header_mongrel "$LINENO" "$ac_header" "$as_ac_Header" "$ac_includes_default"
diff --git a/configure.in b/configure.in
index c298926..dee3c45 100644
--- a/configure.in
+++ b/configure.in
@@ -1183,7 +1183,7 @@ AC_SUBST(UUID_LIBS)
 ##
 
 dnl sys/socket.h is required by AC_FUNC_ACCEPT_ARGTYPES
-AC_CHECK_HEADERS([atomic.h crypt.h dld.h fp_class.h getopt.h ieeefp.h ifaddrs.h langinfo.h mbarrier.h poll.h pwd.h sys/ioctl.h sys/ipc.h sys/poll.h sys/pstat.h sys/resource.h sys/select.h sys/sem.h sys/shm.h sys/socket.h sys/sockio.h sys/tas.h sys/time.h sys/un.h termios.h ucred.h utime.h wchar.h wctype.h])
+AC_CHECK_HEADERS([atomic.h crypt.h dld.h fp_class.h getopt.h ieeefp.h ifaddrs.h langinfo.h mbarrier.h poll.h pwd.h sys/epoll.h sys/ioctl.h sys/ipc.h sys/poll.h sys/pstat.h sys/resource.h sys/select.h sys/sem.h sys/shm.h sys/socket.h sys/sockio.h sys/tas.h sys/time.h sys/un.h termios.h ucred.h utime.h wchar.h wctype.h])
 
 # On BSD, test for net/if.h will fail unless sys/socket.h
 # is included first.
diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c
index 9bcfe14..c233d68 100644
--- a/src/backend/port/unix_latch.c
+++ b/src/backend/port/unix_latch.c
@@ -38,6 +38,9 @@
 #include <unistd.h>
 #include <sys/time.h>
 #include <sys/types.h>
+#ifdef HAVE_SYS_EPOLL_H
+#include <sys/epoll.h>
+#endif
 #ifdef HAVE_POLL_H
 #include <poll.h>
 #endif
@@ -72,8 +75,10 @@
 #error "no latch implementation available"
 #endif
 
-#if defined(WAIT_USE_POLL) || defined(WAIT_USE_SELECT)
+#if defined(WAIT_USE_EPOLL) || defined(WAIT_USE_POLL) || defined(WAIT_USE_SELECT)
 /* don't overwrite manual choice */
+#elif defined(HAVE_SYS_EPOLL_H)
+#define WAIT_USE_EPOLL
 #elif defined(HAVE_POLL)
 #define WAIT_USE_POLL
 #elif HAVE_SYS_SELECT_H
@@ -89,7 +94,10 @@ typedef struct WaitEventSet
 	Latch *latch;
 	int latch_pos;
 	WaitEvent *events;
-#if defined(WAIT_USE_POLL)
+#if defined(WAIT_USE_EPOLL)
+	struct epoll_event *epoll_ret_events;
+	int epoll_fd;
+#elif defined(WAIT_USE_POLL)
 	struct pollfd *pollfds;
 #endif
 } WaitEventSet;
@@ -105,7 +113,9 @@ static int	selfpipe_writefd = -1;
 static void sendSelfPipeByte(void);
 static void drainSelfPipe(void);
 
-#if defined(WAIT_USE_POLL)
+#if defined(WAIT_USE_EPOLL)
+static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action);
+#elif defined(WAIT_USE_POLL)
 static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event);
 #endif
 
@@ -671,7 +681,9 @@ CreateWaitEventSet(int nevents)
 	sz += sizeof(WaitEventSet);
 	sz += sizeof(WaitEvent) * nevents;
 
-#if defined(LATCH_USE_POLL)
+#if defined(WAIT_USE_EPOLL)
+	sz += sizeof(struct epoll_event) * nevents;
+#elif defined(WAIT_USE_POLL)
 	sz += sizeof(struct pollfd) * nevents;
 #endif
 
@@ -683,7 +695,10 @@ CreateWaitEventSet(int nevents)
 	set->events = (WaitEvent *) data;
 	data += sizeof(WaitEvent) * nevents;
 
-#if defined(WAIT_USE_POLL)
+#if defined(WAIT_USE_EPOLL)
+	set->epoll_ret_events = (struct epoll_event *) data;
+	data += sizeof(struct epoll_event) * nevents;
+#elif defined(WAIT_USE_POLL)
 	set->pollfds = (struct pollfd *) data;
 	data += sizeof(struct pollfd) * nevents;
 #endif
@@ -691,6 +706,12 @@ CreateWaitEventSet(int nevents)
 	set->latch = NULL;
 	set->nevents_space = nevents;
 
+#if defined(WAIT_USE_EPOLL)
+	set->epoll_fd = epoll_create(nevents);
+	if (set->epoll_fd < 0)
+		elog(ERROR, "epoll_create failed: %m");
+#endif
+
 	return set;
 }
 
@@ -725,7 +746,9 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, int fd, Latch *latch)
 		event->fd = postmaster_alive_fds[POSTMASTER_FD_WATCH];
 	}
 
-#if defined(WAIT_USE_POLL)
+#if defined(WAIT_USE_EPOLL)
+	WaitEventAdjustEpoll(set, event, EPOLL_CTL_ADD);
+#elif defined(WAIT_USE_POLL)
 	WaitEventAdjustPoll(set, event);
 #endif
 
@@ -765,11 +788,59 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
 		set->latch = latch;
 	}
 
-#if defined(WAIT_USE_POLL)
+#if defined(WAIT_USE_EPOLL)
+	WaitEventAdjustEpoll(set, event, EPOLL_CTL_MOD);
+#elif defined(WAIT_USE_POLL)
 	WaitEventAdjustPoll(set, event);
 #endif
 }
 
+#if defined(WAIT_USE_EPOLL)
+static void
+WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
+{
+	struct epoll_event epoll_ev;
+	int rc;
+
+	epoll_ev.events = EPOLLERR | EPOLLHUP;
+	/* pointer to our event, returned by epoll_wait */
+	epoll_ev.data.ptr = event;
+
+	/* prepare pollfd entry once */
+	if (event->events == WL_LATCH_SET)
+	{
+		Assert(set->latch != NULL);
+		epoll_ev.events |= EPOLLIN;
+	}
+	else if (event->events == WL_POSTMASTER_DEATH)
+	{
+		epoll_ev.events |= EPOLLIN;
+	}
+	else
+	{
+		Assert(event->fd >= 0);
+		Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE));
+
+		if (event->events & WL_SOCKET_READABLE)
+			epoll_ev.events |= EPOLLIN;
+		if (event->events & WL_SOCKET_WRITEABLE)
+			epoll_ev.events |= EPOLLOUT;
+	}
+
+	/*
+	 * Even though unused, we also poss epoll_ev as the data argument for
+	 * EPOLL_CTL_DELETE.  There used to be an epoll bug requiring that, and it
+	 * makes the code simpler...
+	 */
+	rc = epoll_ctl(set->epoll_fd, action, event->fd, &epoll_ev);
+
+	if (rc < 0)
+		ereport(ERROR,
+				(errcode_for_socket_access(),
+				 errmsg("epoll_ctl() failed: %m")));
+}
+#endif
+
 #if defined(WAIT_USE_POLL)
 static void
 WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
@@ -803,7 +874,158 @@ WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
 }
 #endif
 
-#if defined(WAIT_USE_POLL)
+#if defined(WAIT_USE_EPOLL)
+int
+WaitEventSetWait(WaitEventSet *set, long timeout,
+				 WaitEvent* occurred_events, int nevents)
+{
+	int returned_events = 0;
+	instr_time	start_time,
+				cur_time;
+	long		cur_timeout = -1;
+	WaitEvent *cur_event;
+
+	Assert(nevents > 0);
+
+	if (timeout)
+	{
+		INSTR_TIME_SET_CURRENT(start_time);
+		Assert(timeout >= 0 && timeout <= INT_MAX);
+		cur_timeout = timeout;
+	}
+
+	waiting = true;
+	while (returned_events == 0)
+	{
+		int rc;
+		int pos;
+		struct epoll_event *cur_epoll_event;
+
+		/* return immediately if latch is set */
+		if (set->latch && set->latch->is_set)
+		{
+			occurred_events->fd = -1;
+			occurred_events->pos = set->latch_pos;
+			occurred_events->events = WL_LATCH_SET;
+			occurred_events++;
+			returned_events++;
+
+			continue;
+		}
+
+		/* Sleep */
+		rc = epoll_wait(set->epoll_fd, set->epoll_ret_events,
+						nevents, cur_timeout);
+
+		/* Check return code */
+		if (rc < 0)
+		{
+			/* EINTR is okay, otherwise complain */
+			if (errno != EINTR)
+			{
+				waiting = false;
+				ereport(ERROR,
+						(errcode_for_socket_access(),
+						 errmsg("poll() failed: %m")));
+			}
+			continue;
+		}
+		else if (rc == 0)
+		{
+			break;
+		}
+
+		/* iterate over the returned epoll events */
+		for (pos = 0, cur_epoll_event = set->epoll_ret_events;
+			 pos < rc && returned_events < nevents;
+			 pos++, cur_epoll_event++)
+		{
+			cur_event = (WaitEvent *) cur_epoll_event->data.ptr;
+
+			if (cur_event->events == WL_LATCH_SET &&
+				cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP))
+			{
+				/* There's data in the self-pipe, clear it. */
+				drainSelfPipe();
+
+				if (set->latch->is_set)
+				{
+					occurred_events->fd = -1;
+					occurred_events->pos = cur_event->pos;
+					occurred_events->events = WL_LATCH_SET;
+					occurred_events++;
+				}
+			}
+			else if (cur_event->events == WL_POSTMASTER_DEATH &&
+					 cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP))
+			{
+				/*
+				 * FIXME:
+				 * According to the select(2) man page on Linux, select(2) may
+				 * spuriously return and report a file descriptor as readable,
+				 * when it's not; and presumably so can poll(2).  It's not
+				 * clear that the relevant cases would ever apply to the
+				 * postmaster pipe, but since the consequences of falsely
+				 * returning WL_POSTMASTER_DEATH could be pretty unpleasant,
+				 * we take the trouble to positively verify EOF with
+				 * PostmasterIsAlive().
+				 */
+				if (!PostmasterIsAlive())
+				{
+					occurred_events->fd = -1;
+					occurred_events->pos = cur_event->pos;
+					occurred_events->events = WL_POSTMASTER_DEATH;
+					occurred_events++;
+					returned_events++;
+				}
+			}
+			else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+			{
+				Assert(cur_event->fd);
+
+				occurred_events->fd = cur_event->fd;
+				occurred_events->pos = cur_event->pos;
+				occurred_events->events = 0;
+
+				if ((cur_event->events & WL_SOCKET_READABLE) &&
+					(cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP)))
+				{
+					occurred_events->events |= WL_SOCKET_READABLE;
+				}
+
+				if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
+					(cur_epoll_event->events & (EPOLLOUT | EPOLLERR | EPOLLHUP)))
+				{
+					occurred_events->events |= WL_SOCKET_WRITEABLE;
+					occurred_events++;
+					returned_events++;
+				}
+
+				if (occurred_events->events != 0)
+				{
+					occurred_events++;
+					returned_events++;
+				}
+			}
+		}
+
+
+		if (occurred_events == 0 && timeout != 0)
+		{
+			INSTR_TIME_SET_CURRENT(cur_time);
+			INSTR_TIME_SUBTRACT(cur_time, start_time);
+			cur_timeout = timeout - (long) INSTR_TIME_GET_MILLISEC(cur_time);
+			if (cur_timeout <= 0)
+				goto out;
+		}
+	}
+out:
+	waiting = false;
+
+	return returned_events;
+}
+
+#elif defined(WAIT_USE_POLL)
 int
 WaitEventSetWait(WaitEventSet *set, long timeout,
 				 WaitEvent* occurred_events, int nevents)
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index 3813226..c72635c 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -530,6 +530,9 @@
 /* Define to 1 if you have the syslog interface. */
 #undef HAVE_SYSLOG
 
+/* Define to 1 if you have the <sys/epoll.h> header file. */
+#undef HAVE_SYS_EPOLL_H
+
 /* Define to 1 if you have the <sys/ioctl.h> header file. */
 #undef HAVE_SYS_IOCTL_H
 
-- 
2.7.0.229.g701fa7f

