Index: src/backend/postmaster/pgstat.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/postmaster/pgstat.c,v
retrieving revision 1.118
diff -c -c -r1.118 pgstat.c
*** src/backend/postmaster/pgstat.c	3 Jan 2006 19:54:08 -0000	1.118
--- src/backend/postmaster/pgstat.c	4 Jan 2006 23:06:26 -0000
***************
*** 109,117 ****
   * ----------
   */
  NON_EXEC_STATIC int pgStatSock = -1;
- NON_EXEC_STATIC int pgStatPipe[2] = {-1, -1};
  static struct sockaddr_storage pgStatAddr;
- static pid_t pgStatCollectorPid = 0;
  
  static time_t last_pgstat_start_time;
  
--- 109,115 ----
***************
*** 166,172 ****
  NON_EXEC_STATIC void PgstatBufferMain(int argc, char *argv[]);
  NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]);
  static void force_statwrite(SIGNAL_ARGS);
- static void pgstat_recvbuffer(void);
  static void pgstat_exit(SIGNAL_ARGS);
  static void pgstat_die(SIGNAL_ARGS);
  static void pgstat_beshutdown_hook(int code, Datum arg);
--- 164,169 ----
***************
*** 1491,1536 ****
  	pgstat_parseArgs(argc, argv);
  #endif
  
- 	/*
- 	 * Start a buffering process to read from the socket, so we have a little
- 	 * more time to process incoming messages.
- 	 *
- 	 * NOTE: the process structure is: postmaster is parent of buffer process
- 	 * is parent of collector process.	This way, the buffer can detect
- 	 * collector failure via SIGCHLD, whereas otherwise it wouldn't notice
- 	 * collector failure until it tried to write on the pipe.  That would mean
- 	 * that after the postmaster started a new collector, we'd have two buffer
- 	 * processes competing to read from the UDP socket --- not good.
- 	 */
- 	if (pgpipe(pgStatPipe) < 0)
- 		ereport(ERROR,
- 				(errcode_for_socket_access(),
- 				 errmsg("could not create pipe for statistics buffer: %m")));
- 
  	/* child becomes collector process */
! #ifdef EXEC_BACKEND
! 	pgStatCollectorPid = pgstat_forkexec(STAT_PROC_COLLECTOR);
! #else
! 	pgStatCollectorPid = fork();
! #endif
! 	switch (pgStatCollectorPid)
! 	{
! 		case -1:
! 			ereport(ERROR,
! 					(errmsg("could not fork statistics collector: %m")));
! 
! #ifndef EXEC_BACKEND
! 		case 0:
! 			/* child becomes collector process */
! 			PgstatCollectorMain(0, NULL);
! 			break;
! #endif
! 
! 		default:
! 			/* parent becomes buffer process */
! 			closesocket(pgStatPipe[0]);
! 			pgstat_recvbuffer();
! 	}
  	exit(0);
  }
  
--- 1488,1495 ----
  	pgstat_parseArgs(argc, argv);
  #endif
  
  	/* child becomes collector process */
! 	PgstatCollectorMain(0, NULL);
  	exit(0);
  }
  
***************
*** 1548,1559 ****
  PgstatCollectorMain(int argc, char *argv[])
  {
  	PgStat_Msg	msg;
- 	fd_set		rfds;
- 	int			readPipe;
- 	int			len = 0;
- 	struct itimerval timeval;
  	HASHCTL		hash_ctl;
  	bool		need_timer = false;
  
  	MyProcPid = getpid();		/* reset MyProcPid */
  
--- 1507,1516 ----
  PgstatCollectorMain(int argc, char *argv[])
  {
  	PgStat_Msg	msg;
  	HASHCTL		hash_ctl;
  	bool		need_timer = false;
+ 	struct itimerval timeval;
+ 	bool		is_block_mode = false;
  
  	MyProcPid = getpid();		/* reset MyProcPid */
  
***************
*** 1587,1596 ****
  	pgstat_parseArgs(argc, argv);
  #endif
  
- 	/* Close unwanted files */
- 	closesocket(pgStatPipe[1]);
- 	closesocket(pgStatSock);
- 
  	/*
  	 * Identify myself via ps
  	 */
--- 1544,1549 ----
***************
*** 1626,1791 ****
  	pgStatBeTable = (PgStat_StatBeEntry *)
  		palloc0(sizeof(PgStat_StatBeEntry) * MaxBackends);
  
- 	readPipe = pgStatPipe[0];
- 
  	/*
  	 * Process incoming messages and handle all the reporting stuff until
  	 * there are no more messages.
  	 */
  	for (;;)
  	{
  		if (need_statwrite)
  		{
  			pgstat_write_statsfile();
  			need_statwrite = false;
  			need_timer = true;
  		}
  
! 		/*
! 		 * Setup the descriptor set for select(2)
! 		 */
! 		FD_ZERO(&rfds);
! 		FD_SET(readPipe, &rfds);
! 
! 		/*
! 		 * Now wait for something to do.
! 		 */
! 		if (select(readPipe + 1, &rfds, NULL, NULL, NULL) < 0)
  		{
! 			if (errno == EINTR)
! 				continue;
! 			ereport(ERROR,
! 					(errcode_for_socket_access(),
! 					 errmsg("select() failed in statistics collector: %m")));
  		}
  
! 		/*
! 		 * Check if there is a new statistics message to collect.
! 		 */
! 		if (FD_ISSET(readPipe, &rfds))
! 		{
! 			/*
! 			 * We may need to issue multiple read calls in case the buffer
! 			 * process didn't write the message in a single write, which is
! 			 * possible since it dumps its buffer bytewise. In any case, we'd
! 			 * need two reads since we don't know the message length
! 			 * initially.
! 			 */
! 			int			nread = 0;
! 			int			targetlen = sizeof(PgStat_MsgHdr);		/* initial */
! 			bool		pipeEOF = false;
  
! 			while (nread < targetlen)
  			{
! 				len = piperead(readPipe, ((char *) &msg) + nread,
! 							   targetlen - nread);
! 				if (len < 0)
! 				{
! 					if (errno == EINTR)
! 						continue;
! 					ereport(ERROR,
! 							(errcode_for_socket_access(),
! 							 errmsg("could not read from statistics collector pipe: %m")));
! 				}
! 				if (len == 0)	/* EOF on the pipe! */
  				{
! 					pipeEOF = true;
! 					break;
! 				}
! 				nread += len;
! 				if (nread == sizeof(PgStat_MsgHdr))
! 				{
! 					/* we have the header, compute actual msg length */
! 					targetlen = msg.msg_hdr.m_size;
! 					if (targetlen < (int) sizeof(PgStat_MsgHdr) ||
! 						targetlen > (int) sizeof(msg))
! 					{
! 						/*
! 						 * Bogus message length implies that we got out of
! 						 * sync with the buffer process somehow. Abort so that
! 						 * we can restart both processes.
! 						 */
! 						ereport(ERROR,
! 							  (errmsg("invalid statistics message length")));
! 					}
  				}
  			}
! 
! 			/*
! 			 * EOF on the pipe implies that the buffer process exited. Fall
! 			 * out of outer loop.
! 			 */
! 			if (pipeEOF)
! 				break;
! 
! 			/*
! 			 * Distribute the message to the specific function handling it.
! 			 */
! 			switch (msg.msg_hdr.m_type)
  			{
! 				case PGSTAT_MTYPE_DUMMY:
! 					break;
  
! 				case PGSTAT_MTYPE_BESTART:
! 					pgstat_recv_bestart((PgStat_MsgBestart *) &msg, nread);
! 					break;
  
! 				case PGSTAT_MTYPE_BETERM:
! 					pgstat_recv_beterm((PgStat_MsgBeterm *) &msg, nread);
! 					break;
  
! 				case PGSTAT_MTYPE_TABSTAT:
! 					pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, nread);
! 					break;
  
! 				case PGSTAT_MTYPE_TABPURGE:
! 					pgstat_recv_tabpurge((PgStat_MsgTabpurge *) &msg, nread);
! 					break;
  
! 				case PGSTAT_MTYPE_ACTIVITY:
! 					pgstat_recv_activity((PgStat_MsgActivity *) &msg, nread);
! 					break;
  
! 				case PGSTAT_MTYPE_DROPDB:
! 					pgstat_recv_dropdb((PgStat_MsgDropdb *) &msg, nread);
! 					break;
  
! 				case PGSTAT_MTYPE_RESETCOUNTER:
! 					pgstat_recv_resetcounter((PgStat_MsgResetcounter *) &msg,
! 											 nread);
! 					break;
  
! 				case PGSTAT_MTYPE_AUTOVAC_START:
! 					pgstat_recv_autovac((PgStat_MsgAutovacStart *) &msg, nread);
! 					break;
  
! 				case PGSTAT_MTYPE_VACUUM:
! 					pgstat_recv_vacuum((PgStat_MsgVacuum *) &msg, nread);
! 					break;
  
! 				case PGSTAT_MTYPE_ANALYZE:
! 					pgstat_recv_analyze((PgStat_MsgAnalyze *) &msg, nread);
! 					break;
  
! 				default:
! 					break;
! 			}
  
! 			/*
! 			 * Globally count messages.
! 			 */
! 			pgStatNumMessages++;
  
! 			if (need_timer)
! 			{
! 				if (setitimer(ITIMER_REAL, &timeval, NULL))
! 					ereport(ERROR,
! 					  (errmsg("unable to set statistics collector timer: %m")));
! 				need_timer = false;
! 			}
  		}
  
  		/*
  		 * Note that we do NOT check for postmaster exit inside the loop; only
  		 * EOF on the buffer pipe causes us to fall out.  This ensures we
  		 * don't exit prematurely if there are still a few messages in the
--- 1579,1713 ----
  	pgStatBeTable = (PgStat_StatBeEntry *)
  		palloc0(sizeof(PgStat_StatBeEntry) * MaxBackends);
  
  	/*
  	 * Process incoming messages and handle all the reporting stuff until
  	 * there are no more messages.
  	 */
  	for (;;)
  	{
+ 		int			nread;
+ 
  		if (need_statwrite)
  		{
  			pgstat_write_statsfile();
  			need_statwrite = false;
  			need_timer = true;
  		}
  
! 		if (need_timer)
  		{
! 			if (setitimer(ITIMER_REAL, &timeval, NULL))
! 				ereport(ERROR,
! 					  (errmsg("unable to set statistics collector timer: %m")));
! 			need_timer = false;
  		}
  
! 		/*
! 		 * Read one message straight from the UDP socket.  The socket is
! 		 * switched to blocking mode when no data is pending and back to
! 		 * nonblocking mode as soon as a message arrives.
! 		 */
! 		nread = recv(pgStatSock, (char *) &msg,
! 					 sizeof(PgStat_Msg), 0);
  
! 		if (nread > 0 && is_block_mode)		/* got data */
! 		{
! 			pg_set_noblock(pgStatSock);
! 			is_block_mode = false;
! 		}
! 		else if (nread < 0)
! 		{
! 			if (errno == EAGAIN || errno == EWOULDBLOCK)
  			{
! 				if (!is_block_mode)
  				{
! 					/* no data, block mode */
! 					pg_set_block(pgStatSock);
! 					is_block_mode = true;
  				}
+ 				continue;
  			}
! 			else if (errno == EINTR)
  			{
! 				/* interrupted by a signal; quit if the postmaster is gone */
! 				if (!PostmasterIsAlive(true))
! 					exit(0);
! 				continue;
! 			}
! 			else
! 				ereport(ERROR,
! 						(errcode_for_socket_access(),
! 						 errmsg("could not read statistics message: %m")));
! 		}
  
! 		/*
! 		 * Ignore datagrams that are shorter than the common header or whose
! 		 * header length does not match the number of bytes received.
! 		 */
! 		if (nread < (int) sizeof(PgStat_MsgHdr) ||
! 			msg.msg_hdr.m_size != nread)
! 			continue;
  
! 		/*
! 		 * Distribute the message to the specific function handling it.
! 		 */
! 		switch (msg.msg_hdr.m_type)
! 		{
! 			case PGSTAT_MTYPE_DUMMY:
! 				break;
  
! 			case PGSTAT_MTYPE_BESTART:
! 				pgstat_recv_bestart((PgStat_MsgBestart *) &msg, nread);
! 				break;
  
! 			case PGSTAT_MTYPE_BETERM:
! 				pgstat_recv_beterm((PgStat_MsgBeterm *) &msg, nread);
! 				break;
  
! 			case PGSTAT_MTYPE_TABSTAT:
! 				pgstat_recv_tabstat((PgStat_MsgTabstat *) &msg, nread);
! 				break;
  
! 			case PGSTAT_MTYPE_TABPURGE:
! 				pgstat_recv_tabpurge((PgStat_MsgTabpurge *) &msg, nread);
! 				break;
  
! 			case PGSTAT_MTYPE_ACTIVITY:
! 				pgstat_recv_activity((PgStat_MsgActivity *) &msg, nread);
! 				break;
  
! 			case PGSTAT_MTYPE_DROPDB:
! 				pgstat_recv_dropdb((PgStat_MsgDropdb *) &msg, nread);
! 				break;
  
! 			case PGSTAT_MTYPE_RESETCOUNTER:
! 				pgstat_recv_resetcounter((PgStat_MsgResetcounter *) &msg,
! 										 nread);
! 				break;
  
! 			case PGSTAT_MTYPE_AUTOVAC_START:
! 				pgstat_recv_autovac((PgStat_MsgAutovacStart *) &msg, nread);
! 				break;
  
! 			case PGSTAT_MTYPE_VACUUM:
! 				pgstat_recv_vacuum((PgStat_MsgVacuum *) &msg, nread);
! 				break;
  
! 			case PGSTAT_MTYPE_ANALYZE:
! 				pgstat_recv_analyze((PgStat_MsgAnalyze *) &msg, nread);
! 				break;
  
! 			default:
! 				break;
  		}
  
  		/*
+ 		 * Globally count messages.
+ 		 */
+ 		pgStatNumMessages++;
+ 
+ 
+ 		/*
  		 * Note that we do NOT check for postmaster exit inside the loop; only
  		 * EOF on the buffer pipe causes us to fall out.  This ensures we
  		 * don't exit prematurely if there are still a few messages in the
***************
*** 1813,2032 ****
  }
  
  
- /* ----------
-  * pgstat_recvbuffer() -
-  *
-  *	This is the body of the separate buffering process. Its only
-  *	purpose is to receive messages from the UDP socket as fast as
-  *	possible and forward them over a pipe into the collector itself.
-  *	If the collector is slow to absorb messages, they are buffered here.
-  * ----------
-  */
- static void
- pgstat_recvbuffer(void)
- {
- 	fd_set		rfds;
- 	fd_set		wfds;
- 	struct timeval timeout;
- 	int			writePipe = pgStatPipe[1];
- 	int			maxfd;
- 	int			len;
- 	int			xfr;
- 	int			frm;
- 	PgStat_Msg	input_buffer;
- 	char	   *msgbuffer;
- 	int			msg_send = 0;	/* next send index in buffer */
- 	int			msg_recv = 0;	/* next receive index */
- 	int			msg_have = 0;	/* number of bytes stored */
- 	bool		overflow = false;
- 
- 	/*
- 	 * Identify myself via ps
- 	 */
- 	init_ps_display("stats buffer process", "", "");
- 	set_ps_display("");
- 
- 	/*
- 	 * We want to die if our child collector process does.	There are two ways
- 	 * we might notice that it has died: receive SIGCHLD, or get a write
- 	 * failure on the pipe leading to the child.  We can set SIGPIPE to kill
- 	 * us here.  Our SIGCHLD handler was already set up before we forked (must
- 	 * do it that way, else it's a race condition).
- 	 */
- 	pqsignal(SIGPIPE, SIG_DFL);
- 	PG_SETMASK(&UnBlockSig);
- 
- 	/*
- 	 * Set the write pipe to nonblock mode, so that we cannot block when the
- 	 * collector falls behind.
- 	 */
- 	if (!pg_set_noblock(writePipe))
- 		ereport(ERROR,
- 				(errcode_for_socket_access(),
- 				 errmsg("could not set statistics collector pipe to nonblocking mode: %m")));
- 
- 	/*
- 	 * Allocate the message buffer
- 	 */
- 	msgbuffer = (char *) palloc(PGSTAT_RECVBUFFERSZ);
- 
- 	/*
- 	 * Loop forever
- 	 */
- 	for (;;)
- 	{
- 		FD_ZERO(&rfds);
- 		FD_ZERO(&wfds);
- 		maxfd = -1;
- 
- 		/*
- 		 * As long as we have buffer space we add the socket to the read
- 		 * descriptor set.
- 		 */
- 		if (msg_have <= (int) (PGSTAT_RECVBUFFERSZ - sizeof(PgStat_Msg)))
- 		{
- 			FD_SET(pgStatSock, &rfds);
- 			maxfd = pgStatSock;
- 			overflow = false;
- 		}
- 		else
- 		{
- 			if (!overflow)
- 			{
- 				ereport(LOG,
- 						(errmsg("statistics buffer is full")));
- 				overflow = true;
- 			}
- 		}
- 
- 		/*
- 		 * If we have messages to write out, we add the pipe to the write
- 		 * descriptor set.
- 		 */
- 		if (msg_have > 0)
- 		{
- 			FD_SET(writePipe, &wfds);
- 			if (writePipe > maxfd)
- 				maxfd = writePipe;
- 		}
- 
- 		/*
- 		 * Wait for some work to do; but not for more than 10 seconds. (This
- 		 * determines how quickly we will shut down after an ungraceful
- 		 * postmaster termination; so it needn't be very fast.)  struct timeout
- 		 * is modified by some operating systems.
- 		 */
- 		timeout.tv_sec = 10;
- 		timeout.tv_usec = 0;
- 
- 		if (select(maxfd + 1, &rfds, &wfds, NULL, &timeout) < 0)
- 		{
- 			if (errno == EINTR)
- 				continue;
- 			ereport(ERROR,
- 					(errcode_for_socket_access(),
- 					 errmsg("select() failed in statistics buffer: %m")));
- 		}
- 
- 		/*
- 		 * If there is a message on the socket, read it and check for
- 		 * validity.
- 		 */
- 		if (FD_ISSET(pgStatSock, &rfds))
- 		{
- 			len = recv(pgStatSock, (char *) &input_buffer,
- 					   sizeof(PgStat_Msg), 0);
- 			if (len < 0)
- 				ereport(ERROR,
- 						(errcode_for_socket_access(),
- 						 errmsg("could not read statistics message: %m")));
- 
- 			/*
- 			 * We ignore messages that are smaller than our common header
- 			 */
- 			if (len < sizeof(PgStat_MsgHdr))
- 				continue;
- 
- 			/*
- 			 * The received length must match the length in the header
- 			 */
- 			if (input_buffer.msg_hdr.m_size != len)
- 				continue;
- 
- 			/*
- 			 * O.K. - we accept this message.  Copy it to the circular
- 			 * msgbuffer.
- 			 */
- 			frm = 0;
- 			while (len > 0)
- 			{
- 				xfr = PGSTAT_RECVBUFFERSZ - msg_recv;
- 				if (xfr > len)
- 					xfr = len;
- 				Assert(xfr > 0);
- 				memcpy(msgbuffer + msg_recv,
- 					   ((char *) &input_buffer) + frm,
- 					   xfr);
- 				msg_recv += xfr;
- 				if (msg_recv == PGSTAT_RECVBUFFERSZ)
- 					msg_recv = 0;
- 				msg_have += xfr;
- 				frm += xfr;
- 				len -= xfr;
- 			}
- 		}
- 
- 		/*
- 		 * If the collector is ready to receive, write some data into his
- 		 * pipe.  We may or may not be able to write all that we have.
- 		 *
- 		 * NOTE: if what we have is less than PIPE_BUF bytes but more than the
- 		 * space available in the pipe buffer, most kernels will refuse to
- 		 * write any of it, and will return EAGAIN.  This means we will
- 		 * busy-loop until the situation changes (either because the collector
- 		 * caught up, or because more data arrives so that we have more than
- 		 * PIPE_BUF bytes buffered).  This is not good, but is there any way
- 		 * around it?  We have no way to tell when the collector has caught
- 		 * up...
- 		 */
- 		if (FD_ISSET(writePipe, &wfds))
- 		{
- 			xfr = PGSTAT_RECVBUFFERSZ - msg_send;
- 			if (xfr > msg_have)
- 				xfr = msg_have;
- 			Assert(xfr > 0);
- 			len = pipewrite(writePipe, msgbuffer + msg_send, xfr);
- 			if (len < 0)
- 			{
- 				if (errno == EINTR || errno == EAGAIN)
- 					continue;	/* not enough space in pipe */
- 				ereport(ERROR,
- 						(errcode_for_socket_access(),
- 						 errmsg("could not write to statistics collector pipe: %m")));
- 			}
- 			/* NB: len < xfr is okay */
- 			msg_send += len;
- 			if (msg_send == PGSTAT_RECVBUFFERSZ)
- 				msg_send = 0;
- 			msg_have -= len;
- 		}
- 
- 		/*
- 		 * Make sure we forwarded all messages before we check for postmaster
- 		 * termination.
- 		 */
- 		if (msg_have != 0 || FD_ISSET(pgStatSock, &rfds))
- 			continue;
- 
- 		/*
- 		 * If the postmaster has terminated, we die too.  (This is no longer
- 		 * the normal exit path, however.)
- 		 */
- 		if (!PostmasterIsAlive(true))
- 			exit(0);
- 	}
- }
- 
  /* SIGQUIT signal handler for buffer process */
  static void
  pgstat_exit(SIGNAL_ARGS)
--- 1735,1740 ----
***************
*** 2049,2054 ****
--- 1757,1763 ----
  	exit(0);
  }
  
+ /* SIGCHLD signal handler for buffer process */
  static void
  pgstat_die(SIGNAL_ARGS)