/*-------------------------------------------------------------------------
 *
 * unix_latch.c
 *	  Routines for inter-process latches
 *
 * A latch is a boolean variable, with operations that let you sleep
 * until it is set. A latch can be set from another process, or a signal
 * handler within the same process.
 *
 * The latch interface is a reliable replacement for the common pattern of
 * using pg_usleep() or select() to wait until a signal arrives, where the
 * signal handler sets a global variable. Because on some platforms an
 * incoming signal doesn't interrupt sleep, and even on platforms where it
 * does there is a race condition if the signal arrives just before
 * entering the sleep, the common pattern must periodically wake up and
 * poll the global variable. The pselect() system call was invented to
 * solve this problem, but it is not portable enough. Latches are designed
 * to overcome these limitations, allowing you to sleep without polling and
 * ensuring a quick response to signals from other processes.
 *
 * There are two kinds of latches: local and shared. A local latch is
 * initialized by InitLatch, and can only be set from the same process.
 * A local latch can be used to wait for a signal to arrive, by calling
 * SetLatch in the signal handler. A shared latch resides in shared memory,
 * and must be initialized at postmaster startup by InitSharedLatch. Before
 * a shared latch can be waited on, it must be associated with a process
 * with OwnLatch. Only the process owning the latch can wait on it, but any
 * process can set it.
 *
 * There are three basic operations on a latch:
 *
 * SetLatch		- Sets the latch
 * ResetLatch	- Clears the latch, allowing it to be set again
 * WaitLatch	- Waits for the latch to become set
 *
 * The correct pattern to wait for an event is:
 *
 * for (;;)
 * {
 *	   ResetLatch();
 *	   if (work to do)
 *		   Do Stuff();
 *
 *	   WaitLatch();
 * }
 *
 * It's important to reset the latch *before* checking if there's work to
 * do.
Otherwise, if someone sets the latch between the check and the * ResetLatch call, you will miss it and Wait will block. * * To wake up the waiter, you must first set a global flag or something * else that the main loop tests in the "if (work to do)" part, and call * SetLatch *after* that. SetLatch is designed to return quickly if the * latch is already set. * * * Implementation * -------------- * * The Unix implementation uses the so-called self-pipe trick to overcome * the race condition involved with select() and setting a global flag * in the signal handler. When a latch is set and the current process * is waiting for it, the signal handler wakes up the select() in * WaitLatch by writing a byte to a pipe. A signal by itself doesn't * interrupt select() on all platforms, and even on platforms where it * does, a signal that arrives just before the select() call does not * prevent the select() from entering sleep. An incoming byte on a pipe * however reliably interrupts the sleep, and makes select() to return * immediately if the signal arrives just before select() begins. * * When SetLatch is called from the same process that owns the latch, * SetLatch writes the byte directly to the pipe. If it's owned by another * process, SIGUSR1 is sent and the signal handler in the waiting process * writes the byte to the pipe on behalf of the signaling process. * * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION * $PostgreSQL$ * *------------------------------------------------------------------------- */ #include #include #include #include #include #include #include #include #include "latch.h" /* Are we currently in WaitLatch? The signal handler would like to know. 
*/ static volatile sig_atomic_t waiting = false; /* Read and write end of the self-pipe */ static int selfpipe_readfd = -1; static int selfpipe_writefd = -1; /* private function prototypes */ static void initSelfPipe(void); static void drainSelfPipe(void); static void sendSelfPipeByte(void); #ifdef PIPE static void drainLatchPipe(volatile Latch *latch); #endif /* * Initialize a backend-local latch. */ void InitLatch(volatile Latch *latch) { #if defined(LINUX_SEM) #elif defined(PIPE) #else /* Initialize the self pipe if this is our first latch in the process */ if (selfpipe_readfd == -1) initSelfPipe(); latch->is_set = false; #endif DEBUG("SetLatch %p\n", latch); latch->owner_pid = MyProcPid; latch->is_shared = false; } /* * Initialize a shared latch that can be set from other processes. The latch * is initially owned by no-one, use OwnLatch to associate it with the * current process. * * NB: When you introduce a new shared latch, you must increase the shared * latch count in NumSharedLatches in win32_latch.c! */ void InitSharedLatch(volatile Latch *latch) { latch->owner_pid = 0; latch->is_shared = true; #if defined(LINUX_SEM) latch->semId = -1; #elif defined(PIPE) { int ret = pipe((int *)(latch->pipefds)); Assert(!ret); if (fcntl(latch->pipefds[0], F_SETFL, O_NONBLOCK) < 0) elog(FATAL, "fcntl() failed on read-end of self-pipe: %m"); if (fcntl(latch->pipefds[1], F_SETFL, O_NONBLOCK) < 0) elog(FATAL, "fcntl() failed on write-end of self-pipe: %m"); } #else latch->is_set = false; #endif } /* * Associate a shared latch with the current process, allowing it to * wait on it. * * Make sure that latch_sigusr1_handler() is called from the SIGUSR1 signal * handler, as shared latches use SIGUSR1 to for inter-process communication. 
*/ void OwnLatch(volatile Latch *latch) { int i; int s; int ret; union semun semun; Assert(latch->is_shared); /* Initialize the self pipe if this is our first latch in the process */ #if defined(LINUX_SEM) semun.val = 0; if (latch->semId == -1) { for (i = 0; i < 1000; i++) { s = semget(7500 + i, 1, IPC_CREAT | IPC_EXCL|0x1ff); if (s < 0) { DEBUG("semget error %d %d\n", s, errno); if (i == 999) { printf("Can't get sem, aborting\n"); exit (1); } } else { break; } } ret = semctl(s, 0, SETVAL, semun); Assert(!ret); latch->semId = s; } DEBUG("latch %p latch->semId %d\n", latch, latch->semId); #elif defined (PIPE) /* Nothing to do: we allocated it in initlatch. */ #else if (selfpipe_readfd == -1) initSelfPipe(); #endif if (latch->owner_pid != 0) elog(ERROR, "latch already owned"); latch->owner_pid = MyProcPid; } /* * Disown a shared latch currently owned by the current process. */ void DisownLatch(volatile Latch *latch) { Assert(latch->is_shared); Assert(latch->owner_pid == MyProcPid); latch->owner_pid = 0; } /* * Wait for given latch to be set or until timeout is exceeded. * If the latch is already set, the function returns immediately. * * The 'timeout' is given in microseconds, and -1 means wait forever. * On some platforms, signals cause the timeout to be restarted, so beware * that the function can sleep for several times longer than the specified * timeout. * * The latch must be owned by the current process, ie. it must be a * backend-local latch initialized with InitLatch, or a shared latch * associated with the current process by calling OwnLatch. * * Returns 'true' if the latch was set, or 'false' if timeout was reached. 
*/ bool WaitLatch(volatile Latch *latch, long timeout) { #if defined(LINUX_SEM) struct timespec t; struct sembuf sops; t.tv_sec = timeout * 1000000; t.tv_nsec = 0; sops.sem_op = -1; sops.sem_flg = 0; sops.sem_num = 0; DEBUG("wait on latch %p latch->semId %d\n", latch, latch->semId); do { int errStatus = semtimedop(latch->semId, &sops, 1, &t); if (errStatus == 0) { return true; } else if (errStatus == -1) { Assert(errno == EAGAIN || errno == EINTR); if (errno == EAGAIN) { return false; } else { continue; } } } while (true); #else return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0; #endif } /* * Like WaitLatch, but will also return when there's data available in * 'sock' for reading. Returns 0 if timeout was reached, 1 if the latch * was set, or 2 if the scoket became readable. */ int WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout) { #if defined(LINUX_SEM) /* Nothing to do. */ #elif defined(PIPE) struct timeval tv, *tvp = NULL; int result = 0; struct pollfd pfd[2]; int nfds = 1; int rc = 0; if (latch->owner_pid != MyProcPid) elog(ERROR, "cannot wait on a latch owned by another process"); if (timeout > 0) { timeout = timeout * 1000; //millis } pfd[0].fd = latch->pipefds[0]; pfd[0].events = POLLIN; pfd[0].revents = 0x0; if (sock != PGINVALID_SOCKET) { pfd[1].fd = sock; pfd[1].events = POLLIN; pfd[1].revents = 0x0; nfds = 2; } for (;;) { int hifd; rc = poll(pfd, nfds, timeout); if (rc < 0) { if (errno == EINTR) continue; elog(ERROR, "select() failed: %m"); } if (rc == 0 && timeout != -1) { result = 0; break; } if (sock != PGINVALID_SOCKET && (pfd[1].revents & POLLIN) != 0) { result = 2; break; } else { Assert((pfd[0].revents & POLLIN) != 0); result = 1; break; } } return result; #else struct timeval tv, *tvp = NULL; fd_set input_mask; int rc; int result = 0; if (latch->owner_pid != MyProcPid) elog(ERROR, "cannot wait on a latch owned by another process") ; /* Initialize timeout */ if (timeout >= 0) { tv.tv_sec = timeout / 1000000L; 
tv.tv_usec = timeout % 1000000L; tvp = &tv; } waiting = true; for (;;) { int hifd; /* * Clear the pipe, and check if the latch is set already. If s omeone * sets the latch between this and the select() below, the set ter * will write a byte to the pipe (or signal us and the signal handler * will do that), and the select() will return immediately. */ drainSelfPipe(); if (latch->is_set) { result = 1; break; } FD_ZERO(&input_mask); FD_SET(selfpipe_readfd, &input_mask); hifd = selfpipe_readfd; if (sock != PGINVALID_SOCKET) { FD_SET(sock, &input_mask); if (sock > hifd) hifd = sock; } rc = select(hifd + 1, &input_mask, NULL, NULL, tvp); if (rc < 0) { if (errno == EINTR) continue; elog(ERROR, "select() failed: %m"); } if (rc == 0) { /* timeout exceeded */ result = 0; break; } if (sock != PGINVALID_SOCKET && FD_ISSET(sock, &input_mask)) { result = 2; break; /* data available in socket */ } } waiting = false; return result; #endif } /* * Sets a latch and wakes up anyone waiting on it. Returns quickly if the * latch is already set. */ void SetLatch(volatile Latch *latch) { #if defined(LINUX_SEM) int ret; union semun semun; struct sembuf sops; semun.val = 1; DEBUG("SetLatch %p\n", latch); ret = semctl(latch->semId, 0, SETVAL, semun); Assert(!ret); #elif defined(PIPE) char c; int ret; DEBUG("SetLatch %p\n", latch); ret = write(latch->pipefds[1], &c, 1); Assert(ret == 1); #else pid_t owner_pid; /* Quick exit if already set */ if (latch->is_set) return; DEBUG("SetLatch %p\n", latch); latch->is_set = true; /* * See if anyone's waiting for the latch. It can be the current process * if we're in a signal handler. We use the self-pipe to wake up the * select() in that case. If it's another process, send a signal. * * Fetch owner_pid only once, in case the owner simultaneously disowns * the latch and clears owner_pid. XXX: This assumes that pid_t is * atomic, which isn't guaranteed to be true! 
In practice, the effecti ve * range of pid_t fits in a 32 bit integer, and so should be atomic. I n * the worst case, we might end up signaling wrong process if the righ t * one disowns the latch just as we fetch owner_pid. Even then, you're * very unlucky if a process with that bogus pid exists. */ owner_pid = latch->owner_pid; if (owner_pid == 0) return; else if (owner_pid == MyProcPid) sendSelfPipeByte(); else kill(owner_pid, SIGUSR1); #endif } /* * Clear the latch. Calling WaitLatch after this will sleep, unless * the latch is set again before the WaitLatch call. */ void ResetLatch(volatile Latch *latch) { #if defined(LINUX_SEM) #elif defined (PIPE) drainLatchPipe(latch); #else /* Only the owner should reset the latch */ Assert(latch->owner_pid == MyProcPid); latch->is_set = false; #endif } /* * SetLatch uses SIGUSR1 to wake up the process waiting on the latch. Wake * up WaitLatch. */ void latch_sigusr1_handler(void) { #ifdef PIPE Assert(false); #endif if (waiting) sendSelfPipeByte(); } /* initialize the self-pipe */ static void initSelfPipe(void) { int pipefd[2]; /* * Set up the self-pipe that allows a signal handler to wake up the * select() in WaitLatch. Make the write-end non-blocking, so that * SetLatch won't block if the event has already been set many times * filling the kernel buffer. Make the read-end non-blocking too, so * that we can easily clear the pipe by reading until EAGAIN or * EWOULDBLOCK. 
*/ if (pipe(pipefd) < 0) elog(FATAL, "pipe() failed: %m"); if (fcntl(pipefd[0], F_SETFL, O_NONBLOCK) < 0) elog(FATAL, "fcntl() failed on read-end of self-pipe: %m"); if (fcntl(pipefd[1], F_SETFL, O_NONBLOCK) < 0) elog(FATAL, "fcntl() failed on write-end of self-pipe: %m"); DEBUG("InitSelfPipe %d\n", getpid()); selfpipe_readfd = pipefd[0]; selfpipe_writefd = pipefd[1]; } /* Send one byte to the self-pipe, to wake up WaitLatch */ static void sendSelfPipeByte(void) { int rc; char dummy = 0; retry: rc = write(selfpipe_writefd, &dummy, 1); if (rc < 0) { /* If interrupted by signal, just retry */ if (errno == EINTR) goto retry; /* * If the pipe is full, we don't need to retry, the data that' s * there already is enough to wake up WaitLatch. */ if (errno == EAGAIN || errno == EWOULDBLOCK) return; /* * Oops, the write() failed for some other reason. We might be in * a signal handler, so it's not safe to elog(). We have no ch oice * but silently ignore the error. */ return; } } /* Read all available data from the self-pipe */ static void drainSelfPipe(void) { /* * There shouldn't normally be more than one byte in the pipe, or mayb e * a few more if multiple processes run SetLatch at the same instant. */ char buf[16]; int rc; for (;;) { rc = read(selfpipe_readfd, buf, sizeof(buf)); if (rc < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) break; /* the pipe is empty */ else if (errno == EINTR) continue; /* retry */ else elog(ERROR, "read() on self-pipe failed: %m"); } else if (rc == 0) elog(ERROR, "unexpected EOF on self-pipe"); } } #ifdef PIPE /* Read all available data from the self-pipe */ static void drainLatchPipe(volatile Latch *latch) { /* * There shouldn't normally be more than one byte in the pipe, or mayb e * a few more if multiple processes run SetLatch at the same instant. 
*/ char buf[16]; int rc; for (;;) { rc = read(latch->pipefds[0], buf, sizeof(buf)); if (rc < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) break; /* the pipe is empty */ else if (errno == EINTR) continue; /* retry */ else elog(ERROR, "read() on self-pipe failed: %m"); } else if (rc == 0) elog(ERROR, "unexpected EOF on latch-pipe"); } } #endif