From bdb1101bc5b2fd0dd3efa6484f08fa4a1bb93b0c Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Wed, 16 Mar 2016 17:28:05 -0700
Subject: [PATCH 5/6] WIP: WaitEvent API

---
 src/backend/libpq/be-secure.c     |  24 +--
 src/backend/libpq/pqcomm.c        |  12 ++
 src/backend/port/unix_latch.c     | 317 ++++++++++++++++++++++++++++++++++++++
 src/backend/utils/init/miscinit.c |   8 +
 src/include/libpq/libpq.h         |   3 +
 src/include/storage/latch.h       |  44 ++++++
 6 files changed, 396 insertions(+), 12 deletions(-)

diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c
index ac709d1..c396811 100644
--- a/src/backend/libpq/be-secure.c
+++ b/src/backend/libpq/be-secure.c
@@ -140,13 +140,13 @@ retry:
 	/* In blocking mode, wait until the socket is ready */
 	if (n < 0 && !port->noblock && (errno == EWOULDBLOCK || errno == EAGAIN))
 	{
-		int			w;
+		WaitEvent   event;
 
 		Assert(waitfor);
 
-		w = WaitLatchOrSocket(MyLatch,
-							  WL_LATCH_SET | WL_POSTMASTER_DEATH | waitfor,
-							  port->sock, 0);
+		ModifyWaitEvent(FeBeWaitSet, 0, waitfor, NULL);
+
+		WaitEventSetWait(FeBeWaitSet, 0 /* no timeout */, &event, 1);
 
 		/*
 		 * If the postmaster has died, it's not safe to continue running,
@@ -165,13 +165,13 @@ retry:
 		 * cycles checking for this very rare condition, and this should cause
 		 * us to exit quickly in most cases.)
 		 */
-		if (w & WL_POSTMASTER_DEATH)
+		if (event.events & WL_POSTMASTER_DEATH)
 			ereport(FATAL,
 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
 					errmsg("terminating connection due to unexpected postmaster exit")));
 
 		/* Handle interrupt. */
-		if (w & WL_LATCH_SET)
+		if (event.events & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
 			ProcessClientReadInterrupt(true);
@@ -241,22 +241,22 @@ retry:
 
 	if (n < 0 && !port->noblock && (errno == EWOULDBLOCK || errno == EAGAIN))
 	{
-		int			w;
+		WaitEvent   event;
 
 		Assert(waitfor);
 
-		w = WaitLatchOrSocket(MyLatch,
-							  WL_LATCH_SET | WL_POSTMASTER_DEATH | waitfor,
-							  port->sock, 0);
+		ModifyWaitEvent(FeBeWaitSet, 0, waitfor, NULL);
+
+		WaitEventSetWait(FeBeWaitSet, 0 /* no timeout */, &event, 1);
 
 		/* See comments in secure_read. */
-		if (w & WL_POSTMASTER_DEATH)
+		if (event.events & WL_POSTMASTER_DEATH)
 			ereport(FATAL,
 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
 					errmsg("terminating connection due to unexpected postmaster exit")));
 
 		/* Handle interrupt. */
-		if (w & WL_LATCH_SET)
+		if (event.events & WL_LATCH_SET)
 		{
 			ResetLatch(MyLatch);
 			ProcessClientWriteInterrupt(true);
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 71473db..31d646d 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -201,6 +201,18 @@ pq_init(void)
 				(errmsg("could not set socket to nonblocking mode: %m")));
 #endif
 
+	{
+		MemoryContext oldcontext;
+
+		oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+
+		FeBeWaitSet = CreateWaitEventSet(3);
+		AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock, NULL);
+		AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch);
+		AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, -1, NULL);
+
+		MemoryContextSwitchTo(oldcontext);
+	}
 }
 
 /* --------------------------------
diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c
index f6cb15b..9bcfe14 100644
--- a/src/backend/port/unix_latch.c
+++ b/src/backend/port/unix_latch.c
@@ -72,6 +72,28 @@
 #error "no latch implementation available"
 #endif
 
+#if defined(WAIT_USE_POLL) || defined(WAIT_USE_SELECT)
+/* don't overwrite manual choice */
+#elif defined(HAVE_POLL)
+#define WAIT_USE_POLL
+#elif HAVE_SYS_SELECT_H
+#define WAIT_USE_SELECT
+#else
+#error "no wait set implementation available"
+#endif
+
+typedef struct WaitEventSet
+{
+	int nevents;	/* number of events currently registered in events[] */
+	int nevents_space;	/* capacity of events[], fixed by CreateWaitEventSet() */
+	Latch *latch;	/* latch being waited on, or NULL if none was registered */
+	int latch_pos;	/* index in events[] of the WL_LATCH_SET entry */
+	WaitEvent *events;	/* registered events; entry i has pos == i */
+#if defined(WAIT_USE_POLL)
+	struct pollfd *pollfds;	/* poll() entries, parallel to events[] */
+#endif
+} WaitEventSet;
+
 /* Are we currently in WaitLatch? The signal handler would like to know. */
 static volatile sig_atomic_t waiting = false;
 
@@ -83,6 +105,9 @@ static int	selfpipe_writefd = -1;
 static void sendSelfPipeByte(void);
 static void drainSelfPipe(void);
 
+#if defined(WAIT_USE_POLL)
+static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event);
+#endif
 
 /*
  * Initialize the process-local latch infrastructure.
@@ -636,6 +661,298 @@ ResetLatch(volatile Latch *latch)
 	pg_memory_barrier();
 }
 
+WaitEventSet *
+CreateWaitEventSet(int nevents)
+{
+	WaitEventSet   *set;
+	char		   *data;
+	Size			sz = 0;
+	/* Header, WaitEvent array and (for poll) pollfd array live in one chunk. */
+	sz += sizeof(WaitEventSet);
+	sz += sizeof(WaitEvent) * nevents;
+
+#if defined(WAIT_USE_POLL)		/* must match the guard of the carve-out below */
+	sz += sizeof(struct pollfd) * nevents;
+#endif
+	/* palloc0 zeroes the chunk, so set->nevents starts out as 0. */
+	data = (char *) palloc0(sz);
+
+	set = (WaitEventSet *) data;
+	data += sizeof(WaitEventSet);
+
+	set->events = (WaitEvent *) data;
+	data += sizeof(WaitEvent) * nevents;
+
+#if defined(WAIT_USE_POLL)
+	set->pollfds = (struct pollfd *) data;
+	data += sizeof(struct pollfd) * nevents;
+#endif
+
+	set->latch = NULL;
+	set->nevents_space = nevents;
+
+	return set;
+}
+
+int
+AddWaitEventToSet(WaitEventSet *set, uint32 events, int fd, Latch *latch)
+{
+	WaitEvent *event;
+
+	if (set->nevents_space <= set->nevents)	/* events[] is already full */
+		elog(ERROR, "no space for yet another event");
+
+	if (set->latch && latch)
+		elog(ERROR, "can only wait for one latch");
+	if (!latch && (events & WL_LATCH_SET))
+		elog(ERROR, "cannot wait on latch without latch");
+
+	/* FIXME: validate event mask (only a mask of exactly WL_LATCH_SET or WL_POSTMASTER_DEATH is special-cased below) */
+	/* Take the next free slot; its index is the value returned to the caller. */
+	event = &set->events[set->nevents];
+	event->pos = set->nevents++;
+	event->fd = fd;
+	event->events = events;
+	/* Latch and postmaster-death events replace the caller's fd with an internal one. */
+	if (events == WL_LATCH_SET)
+	{
+		set->latch = latch;
+		set->latch_pos = event->pos;
+		event->fd = selfpipe_readfd;
+	}
+	else if (events == WL_POSTMASTER_DEATH)
+	{
+		event->fd = postmaster_alive_fds[POSTMASTER_FD_WATCH];
+	}
+	/* Keep the parallel pollfd entry in sync with the new event. */
+#if defined(WAIT_USE_POLL)
+	WaitEventAdjustPoll(set, event);
+#endif
+
+	return event->pos;
+}
+
+void
+ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
+{
+	WaitEvent *event;
+
+	Assert(pos < set->nevents);
+	/* pos indexes set->events[], i.e. the value AddWaitEventToSet() returned. */
+	event = &set->events[pos];
+
+	/* unchanged mask on a non-latch event: no checks/modifications needed */
+	if (events == event->events && !(event->events & WL_LATCH_SET))
+		return;
+	/* Latch and postmaster-death entries may not switch to a different mask. */
+	if (event->events & WL_LATCH_SET &&
+		events != event->events)
+	{
+		/* we could allow disabling latch events for a while */
+		elog(ERROR, "cannot modify latch event");
+	}
+	if (event->events & WL_POSTMASTER_DEATH &&
+		events != event->events)
+	{
+		elog(ERROR, "cannot modify postmaster death event");
+	}
+
+	/* FIXME: validate event mask */
+	event->events = events;
+	/* For a latch event the only thing that can change is the latch itself. */
+	if (events == WL_LATCH_SET)
+	{
+		set->latch = latch;
+	}
+
+#if defined(WAIT_USE_POLL)
+	WaitEventAdjustPoll(set, event);	/* refresh the parallel pollfd entry */
+#endif
+}
+
+#if defined(WAIT_USE_POLL)
+static void
+WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
+{
+	struct pollfd *pollfd = &set->pollfds[event->pos];
+	/* Rebuild the pollfd slot that sits at the same index as this event. */
+	pollfd->revents = 0;
+	pollfd->fd = event->fd;
+
+	/* prepare pollfd entry once; WaitEventSetWait() passes the array to poll() unchanged */
+	if (event->events == WL_LATCH_SET)
+	{
+		Assert(set->latch != NULL);
+		pollfd->events = POLLIN;	/* event->fd is the self-pipe read end here */
+	}
+	else if (event->events == WL_POSTMASTER_DEATH)
+	{
+		pollfd->events = POLLIN;	/* event->fd is the postmaster-alive watch fd */
+	}
+	else
+	{
+		Assert(event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE));
+		pollfd->events = 0;
+		if (event->events & WL_SOCKET_READABLE)
+			pollfd->events |= POLLIN;
+		if (event->events & WL_SOCKET_WRITEABLE)
+			pollfd->events |= POLLOUT;
+	}
+
+	Assert(event->fd >= 0);
+}
+#endif
+
+#if defined(WAIT_USE_POLL)
+int
+WaitEventSetWait(WaitEventSet *set, long timeout,
+				 WaitEvent* occurred_events, int nevents)
+{
+	int returned_events = 0;
+	instr_time	start_time,
+				cur_time;
+	long		cur_timeout = -1;
+	WaitEvent *cur_event;
+
+	struct pollfd *pfds = set->pollfds;
+
+	Assert(nevents > 0);
+	/* timeout == 0 means "wait forever": cur_timeout stays -1 for poll(). */
+	if (timeout)
+	{
+		INSTR_TIME_SET_CURRENT(start_time);
+		Assert(timeout >= 0 && timeout <= INT_MAX);
+		cur_timeout = timeout;
+	}
+
+	waiting = true;
+	while (returned_events == 0)
+	{
+		int rc;
+		int pos;
+		struct pollfd *cur_pollfd;
+
+		/* return immediately if latch is set */
+		if (set->latch && set->latch->is_set)
+		{
+			occurred_events->fd = -1;
+			occurred_events->pos = set->latch_pos;
+			occurred_events->events = WL_LATCH_SET;
+			occurred_events++;
+			returned_events++;
+
+			continue;
+		}
+
+		/* Sleep */
+		rc = poll(pfds, set->nevents, (int) cur_timeout);
+
+		/* Check return code */
+		if (rc < 0)
+		{
+			/* EINTR is okay, otherwise complain */
+			if (errno != EINTR)
+			{
+				waiting = false;
+				ereport(ERROR,
+						(errcode_for_socket_access(),
+						 errmsg("poll() failed: %m")));
+			}
+			continue;
+		}
+		else if (rc == 0)
+		{
+			break;				/* poll() timed out: report zero events */
+		}
+
+		for (pos = 0, cur_event = set->events, cur_pollfd = set->pollfds;
+			 pos < set->nevents && returned_events < nevents;
+			 pos++, cur_event++, cur_pollfd++)
+		{
+			if (cur_event->events == WL_LATCH_SET &&
+				(cur_pollfd->revents & (POLLIN | POLLHUP | POLLERR | POLLNVAL)))
+			{
+				/* There's data in the self-pipe, clear it. */
+				drainSelfPipe();
+
+				if (set->latch->is_set)
+				{
+					occurred_events->fd = -1;
+					occurred_events->pos = cur_event->pos;
+					occurred_events->events = WL_LATCH_SET;
+					occurred_events++;
+					returned_events++;	/* count the slot, else it is filled twice */
+				}
+			}
+			else if (cur_event->events == WL_POSTMASTER_DEATH &&
+					 (cur_pollfd->revents & (POLLIN | POLLHUP | POLLERR | POLLNVAL)))
+			{
+				/*
+				 * According to the select(2) man page on Linux, select(2) may
+				 * spuriously return and report a file descriptor as readable,
+				 * when it's not; and presumably so can poll(2).  It's not
+				 * clear that the relevant cases would ever apply to the
+				 * postmaster pipe, but since the consequences of falsely
+				 * returning WL_POSTMASTER_DEATH could be pretty unpleasant,
+				 * we take the trouble to positively verify EOF with
+				 * PostmasterIsAlive().
+				 */
+				if (!PostmasterIsAlive())
+				{
+					occurred_events->fd = -1;
+					occurred_events->pos = cur_event->pos;
+					occurred_events->events = WL_POSTMASTER_DEATH;
+					occurred_events++;
+					returned_events++;
+				}
+			}
+			else if (cur_event->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE))
+			{
+				Assert(cur_event->fd >= 0);
+				/* Pre-fill the slot; it is only consumed below if something fired. */
+				occurred_events->fd = cur_event->fd;
+				occurred_events->pos = cur_event->pos;
+				occurred_events->events = 0;
+
+				if ((cur_event->events & WL_SOCKET_READABLE) &&
+					(cur_pollfd->revents & (POLLIN | POLLHUP | POLLERR | POLLNVAL)))
+				{
+					occurred_events->events |= WL_SOCKET_READABLE;
+				}
+
+				if ((cur_event->events & WL_SOCKET_WRITEABLE) &&
+					(cur_pollfd->revents & (POLLOUT | POLLHUP | POLLERR | POLLNVAL)))
+				{
+					occurred_events->events |= WL_SOCKET_WRITEABLE;
+				}
+
+				/* Advance exactly once per socket, after both directions were checked. */
+				if (occurred_events->events != 0)
+				{
+					occurred_events++;
+					returned_events++;
+				}
+			}
+		}
+
+		/* Woke up without anything to report: shrink the timeout before re-polling. */
+		if (returned_events == 0 && timeout != 0)
+		{
+			INSTR_TIME_SET_CURRENT(cur_time);
+			INSTR_TIME_SUBTRACT(cur_time, start_time);
+			cur_timeout = timeout - (long) INSTR_TIME_GET_MILLISEC(cur_time);
+			if (cur_timeout <= 0)
+				goto out;
+		}
+	}
+out:
+	waiting = false;
+
+	return returned_events;
+}
+#elif defined(WAIT_USE_SELECT)
+#endif
+
 /*
  * SetLatch uses SIGUSR1 to wake up the process waiting on the latch.
  *
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index 18f5e6f..d13355b 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -33,6 +33,7 @@
 
 #include "access/htup_details.h"
 #include "catalog/pg_authid.h"
+#include "libpq/libpq.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "postmaster/autovacuum.h"
@@ -247,6 +248,9 @@ SwitchToSharedLatch(void)
 
 	MyLatch = &MyProc->procLatch;
 
+	if (FeBeWaitSet)
+		ModifyWaitEvent(FeBeWaitSet, 1, WL_LATCH_SET, MyLatch);
+
 	/*
 	 * Set the shared latch as the local one might have been set. This
 	 * shouldn't normally be necessary as code is supposed to check the
@@ -262,6 +266,10 @@ SwitchBackToLocalLatch(void)
 	Assert(MyProc != NULL && MyLatch == &MyProc->procLatch);
 
 	MyLatch = &LocalLatchData;
+
+	if (FeBeWaitSet)
+		ModifyWaitEvent(FeBeWaitSet, 1, WL_LATCH_SET, MyLatch);
+
 	SetLatch(MyLatch);
 }
 
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 0569994..109fdf7 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -19,6 +19,7 @@
 
 #include "lib/stringinfo.h"
 #include "libpq/libpq-be.h"
+#include "storage/latch.h"
 
 
 typedef struct
@@ -95,6 +96,8 @@ extern ssize_t secure_raw_write(Port *port, const void *ptr, size_t len);
 
 extern bool ssl_loaded_verify_locations;
 
+WaitEventSet *FeBeWaitSet;
+
 /* GUCs */
 extern char *SSLCipherSuites;
 extern char *SSLECDHCurve;
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index e77491e..941e2f0 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -102,6 +102,50 @@ typedef struct Latch
 #define WL_TIMEOUT			 (1 << 3)
 #define WL_POSTMASTER_DEATH  (1 << 4)
 
+typedef struct WaitEventSet WaitEventSet;
+
+typedef struct WaitEvent
+{
+	int		pos;		/* position in the event data structure */
+	uint32	events;		/* WL_* mask: registered events in a set, tripped events when returned */
+	int		fd;			/* fd associated with event */
+} WaitEvent;
+
+/*
+ * Create a WaitEventSet with space for nevents different events to wait for.
+ *
+ * The set starts out empty; register events with AddWaitEventToSet().
+ */
+extern WaitEventSet *CreateWaitEventSet(int nevents);
+
+/* ---
+ * Add an event to the set. Possible events are:
+ * - WL_LATCH_SET: Wait for the latch to be set
+ * - WL_POSTMASTER_DEATH: Wait for postmaster to die
+ * - WL_SOCKET_READABLE: Wait for socket to become readable
+ *   can be combined in one event with WL_SOCKET_WRITEABLE
+ * - WL_SOCKET_WRITEABLE: Wait for socket to become writeable
+ *   can be combined with WL_SOCKET_READABLE
+ *
+ * Returns the offset in WaitEventSet->events (starting from 0), which can be
+ * used to modify previously added wait events.
+ */
+extern int AddWaitEventToSet(WaitEventSet *set, uint32 events, int fd, Latch *latch);
+
+/*
+ * Change the event mask and, if applicable, the associated latch of a
+ * WaitEvent.
+ */
+extern void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch);
+
+/*
+ * Wait for events added to the set to happen, or until the timeout is
+ * reached.  At most nevents occurred events are returned.
+ *
+ * Returns the number of events occurred, or 0 if the timeout was reached.
+ */
+extern int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent* occurred_events, int nevents);
+
 /*
  * prototypes for functions in latch.c
  */
-- 
2.7.0.229.g701fa7f

