Re: non-blocking patches.

From: Bruce Momjian <pgman(at)candle(dot)pha(dot)pa(dot)us>
To: Alfred Perlstein <bright(at)wintelcom(dot)net>
Cc: pgsql-patches(at)postgreSQL(dot)org
Subject: Re: non-blocking patches.
Date: 2000-06-01 18:02:08
Message-ID: 200006011802.OAA19327@candle.pha.pa.us
Lists: pgsql-patches

Just to confirm that these patches appear to have been applied.

>
> These patches to libpq allow a user to toggle the blocking nature of
> the database connection.
>
> They also fix a problem where if the database's pipe was full it could
> busy-loop attempting to flush the send buffer.
>
> It's assumed that callers to PQexec want blocking behavior and the
> mode of the connection will be toggled while the query is being
> executed via PQexec.
>
> A new field has been added to the PGconn structure to allow the database
> to track the non-blocking nature of the connection without polling the
> status of the socket via syscalls.
>
> When in non-blocking mode the library is careful to make sure that
> it can send a complete command/line down the wire before accepting it.
>
> The case of EINTR in pqFlush() is now caught, which avoids making a
> useless select() call.
>
> There is a problem though: some of the code (particularly "fe-exec.c" line 518)
> may now get out of sync because:
>
>     if (pqPutnchar("Q", 1, conn) ||
>         pqPuts(query, conn) ||
>         pqFlush(conn))
>     {
>         handleSendFailure(conn);
>         return 0;
>     }
>
> may send a 'Q' but be unable to send the query. I'm unsure whether
> handleSendFailure() is able to reliably deal with this. I may need
> to work on reservations for the send buffer if not.
>
> Does anyone know? I'll be investigating meanwhile, but I wanted
> people to get a snapshot of what I was working on so I could get
> some feedback on whether I'm going in the right direction.
>
> These patches need review. My apologies for not running them through
> pgindent, but the patches supplied with PostgreSQL don't seem to apply
> cleanly to FreeBSD's indent any longer and my indent was segfaulting.
>
> Hopefully I kept within the guidelines for acceptable changes.
>
> thanks,
> -Alfred Perlstein - [bright(at)rush(dot)net|alfred(at)freebsd(dot)org]
> Wintelcom systems administrator and programmer
> - http://www.wintelcom.net/ [bright(at)wintelcom(dot)net]
>
>
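
For readers following the description above, the core of the toggle can be
sketched outside libpq. This is only a minimal sketch, assuming a POSIX
system; set_fd_nonblocking() and fd_is_nonblocking() are hypothetical names
used for illustration and are not part of libpq or of the patch. It reads the
current descriptor flags with F_GETFL and then sets or clears only
O_NONBLOCK, which appears to be the approach the PQsetnonblocking() hunk
below takes on non-WIN32 builds.

```c
#include <fcntl.h>
#include <unistd.h>

/*
 * Hypothetical helper (not a libpq function): put fd into non-blocking
 * mode when nonblocking is non-zero, or back into blocking mode when it
 * is zero.  All other file status flags are preserved.
 * Returns 0 on success, -1 if either fcntl() call fails.
 */
int
set_fd_nonblocking(int fd, int nonblocking)
{
    int flags = fcntl(fd, F_GETFL, 0);

    if (flags == -1)
        return -1;

    if (nonblocking)
        flags |= O_NONBLOCK;
    else
        flags &= ~O_NONBLOCK;

    return (fcntl(fd, F_SETFL, flags) == -1) ? -1 : 0;
}

/*
 * Hypothetical helper: report 1 if O_NONBLOCK is currently set on fd,
 * 0 otherwise (including when the flags cannot be read).
 */
int
fd_is_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);

    return (flags != -1 && (flags & O_NONBLOCK) != 0) ? 1 : 0;
}
```

Note that the patch caches the mode in conn->nonblocking so that
PQisnonblocking() needs no syscall; the sketch queries the descriptor
instead, purely to keep the example self-contained.
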
> Index: fe-connect.c
> ===================================================================
> RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/fe-connect.c,v
> retrieving revision 1.108
> diff -u -u -r1.108 fe-connect.c
> --- fe-connect.c 1999/12/02 00:26:15 1.108
> +++ fe-connect.c 1999/12/14 09:42:24
> @@ -595,31 +595,6 @@
> return 0;
> }
>
> -
> -/* ----------
> - * connectMakeNonblocking -
> - * Make a connection non-blocking.
> - * Returns 1 if successful, 0 if not.
> - * ----------
> - */
> -static int
> -connectMakeNonblocking(PGconn *conn)
> -{
> -#ifndef WIN32
> - if (fcntl(conn->sock, F_SETFL, O_NONBLOCK) < 0)
> -#else
> - if (ioctlsocket(conn->sock, FIONBIO, &on) != 0)
> -#endif
> - {
> - printfPQExpBuffer(&conn->errorMessage,
> - "connectMakeNonblocking -- fcntl() failed: errno=%d\n%s\n",
> - errno, strerror(errno));
> - return 0;
> - }
> -
> - return 1;
> -}
> -
> /* ----------
> * connectNoDelay -
> * Sets the TCP_NODELAY socket option.
> @@ -792,7 +767,7 @@
> * Ewan Mellor <eem21(at)cam(dot)ac(dot)uk>.
> * ---------- */
> #if (!defined(WIN32) || defined(WIN32_NON_BLOCKING_CONNECTIONS)) && !defined(USE_SSL)
> - if (!connectMakeNonblocking(conn))
> + if (PQsetnonblocking(conn, TRUE) != 0)
> goto connect_errReturn;
> #endif
>
> @@ -904,7 +879,7 @@
> /* This makes the connection non-blocking, for all those cases which forced us
> not to do it above. */
> #if (defined(WIN32) && !defined(WIN32_NON_BLOCKING_CONNECTIONS)) || defined(USE_SSL)
> - if (!connectMakeNonblocking(conn))
> + if (PQsetnonblocking(conn, TRUE) != 0)
> goto connect_errReturn;
> #endif
>
> @@ -1702,6 +1677,7 @@
> conn->inBuffer = (char *) malloc(conn->inBufSize);
> conn->outBufSize = 8 * 1024;
> conn->outBuffer = (char *) malloc(conn->outBufSize);
> + conn->nonblocking = FALSE;
> initPQExpBuffer(&conn->errorMessage);
> initPQExpBuffer(&conn->workBuffer);
> if (conn->inBuffer == NULL ||
> @@ -1811,6 +1787,7 @@
> conn->lobjfuncs = NULL;
> conn->inStart = conn->inCursor = conn->inEnd = 0;
> conn->outCount = 0;
> + conn->nonblocking = FALSE;
>
> }
>
> Index: fe-exec.c
> ===================================================================
> RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/fe-exec.c,v
> retrieving revision 1.86
> diff -u -u -r1.86 fe-exec.c
> --- fe-exec.c 1999/11/11 00:10:14 1.86
> +++ fe-exec.c 1999/12/14 05:55:11
> @@ -13,6 +13,7 @@
> */
> #include <errno.h>
> #include <ctype.h>
> +#include <fcntl.h>
>
> #include "postgres.h"
> #include "libpq-fe.h"
> @@ -24,7 +25,6 @@
> #include <unistd.h>
> #endif
>
> -
> /* keep this in same order as ExecStatusType in libpq-fe.h */
> const char *const pgresStatus[] = {
> "PGRES_EMPTY_QUERY",
> @@ -574,7 +574,15 @@
> * we will NOT block waiting for more input.
> */
> if (pqReadData(conn) < 0)
> + {
> + /*
> + * try to flush the send-queue, otherwise we may never get a
> + * response for something that may not have already been sent
> + * because it's in our write buffer!
> + */
> + pqFlush(conn);
> return 0;
> + }
> /* Parsing of the data waits till later. */
> return 1;
> }
> @@ -1088,8 +1096,17 @@
> {
> PGresult *result;
> PGresult *lastResult;
> + bool savedblocking;
>
> /*
> + * we assume anyone calling PQexec wants blocking behaviour,
> + * we force the blocking status of the connection to blocking
> + * for the duration of this function and restore it on return
> + */
> + savedblocking = PQisnonblocking(conn);
> + PQsetnonblocking(conn, FALSE);
> +
> + /*
> * Silently discard any prior query result that application didn't
> * eat. This is probably poor design, but it's here for backward
> * compatibility.
> @@ -1102,14 +1119,15 @@
> PQclear(result);
> printfPQExpBuffer(&conn->errorMessage,
> "PQexec: you gotta get out of a COPY state yourself.\n");
> - return NULL;
> + /* restore blocking status */
> + goto errout;
> }
> PQclear(result);
> }
>
> /* OK to send the message */
> if (!PQsendQuery(conn, query))
> - return NULL;
> + goto errout;
>
> /*
> * For backwards compatibility, return the last result if there are
> @@ -1142,7 +1160,13 @@
> result->resultStatus == PGRES_COPY_OUT)
> break;
> }
> +
> + PQsetnonblocking(conn, savedblocking);
> return lastResult;
> +
> +errout:
> + PQsetnonblocking(conn, savedblocking);
> + return NULL;
> }
>
>
> @@ -1431,8 +1455,14 @@
> "PQendcopy() -- I don't think there's a copy in progress.\n");
> return 1;
> }
> +
> + /* make sure no data is waiting to be sent */
> + if (pqFlush(conn))
> + return (1);
>
> - (void) pqFlush(conn); /* make sure no data is waiting to be sent */
> + /* non blocking connections may have to abort at this point. */
> + if (PQisnonblocking(conn) && PQisBusy(conn))
> + return (1);
>
> /* Return to active duty */
> conn->asyncStatus = PGASYNC_BUSY;
> @@ -2025,4 +2055,72 @@
> return 1;
> else
> return 0;
> +}
> +
> +/* PQsetnonblocking:
> + sets the PGconn's database connection non-blocking if the arg is TRUE
> + or makes it blocking if the arg is FALSE, this will not protect
> + you from PQexec(), you'll only be safe when using the non-blocking
> + API
> + Needs to be called only on a connected database connection.
> +*/
> +
> +int
> +PQsetnonblocking(PGconn *conn, int arg)
> +{
> + int fcntlarg;
> +
> + arg = (arg == TRUE) ? 1 : 0;
> + if (arg == conn->nonblocking)
> + return (0);
> +
> +#ifdef USE_SSL
> + if (conn->ssl)
> + {
> + printfPQExpBuffer(&conn->errorMessage,
> + "PQsetnonblocking() -- not supported when using SSL\n");
> + return (-1);
> + }
> +#endif /* USE_SSL */
> +
> +#ifndef WIN32
> + fcntlarg = fcntl(conn->sock, F_GETFL, 0);
> + if (fcntlarg == -1)
> + return (-1);
> +
> + if ((arg == TRUE &&
> + fcntl(conn->sock, F_SETFL, fcntlarg | O_NONBLOCK) == -1) ||
> + (arg == FALSE &&
> + fcntl(conn->sock, F_SETFL, fcntlarg & ~O_NONBLOCK) == -1))
> +#else
> + fcntlarg = arg;
> + if (ioctlsocket(conn->sock, FIONBIO, &fcntlarg) != 0)
> +#endif
> + {
> + printfPQExpBuffer(&conn->errorMessage,
> + "PQsetnonblocking() -- unable to set nonblocking status to %s\n",
> + arg == TRUE ? "TRUE" : "FALSE");
> + return (-1);
> + }
> +
> + conn->nonblocking = arg;
> + return (0);
> +}
> +
> +/* return the blocking status of the database connection, TRUE == nonblocking,
> + FALSE == blocking
> +*/
> +int
> +PQisnonblocking(PGconn *conn)
> +{
> +
> + return (conn->nonblocking);
> +}
> +
> +/* try to force data out, really only useful for non-blocking users */
> +int
> +PQflush(PGconn *conn)
> +{
> +
> + return (pqFlush(conn));
> }
> Index: fe-misc.c
> ===================================================================
> RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/fe-misc.c,v
> retrieving revision 1.33
> diff -u -u -r1.33 fe-misc.c
> --- fe-misc.c 1999/11/30 03:08:19 1.33
> +++ fe-misc.c 1999/12/14 08:21:09
> @@ -86,6 +86,34 @@
> {
> size_t avail = Max(conn->outBufSize - conn->outCount, 0);
>
> + /*
> + * if we are non-blocking and the send queue is too full to buffer this
> + * request then try to flush some and return an error
> + */
> + if (PQisnonblocking(conn) && nbytes > avail && pqFlush(conn))
> + {
> + /*
> + * even if the flush failed we may still have written some
> + * data, recalculate the size of the send-queue relative
> + * to the amount we have to send, we may be able to queue it
> + * after all; even though it's not sent to the database it's
> + * ok, any routines that check the data coming from the
> + * database better call pqFlush() anyway.
> + */
> + if (nbytes > Max(conn->outBufSize - conn->outCount, 0))
> + {
> + printfPQExpBuffer(&conn->errorMessage,
> + "pqPutBytes -- pqFlush couldn't flush enough"
> + " data: space available: %d, space needed %d\n",
> + Max(conn->outBufSize - conn->outCount, 0), nbytes);
> + return EOF;
> + }
> + }
> +
> + /*
> + * the non-blocking code above makes sure that this isn't true,
> + * essentially this is no-op
> + */
> while (nbytes > avail)
> {
> memcpy(conn->outBuffer + conn->outCount, s, avail);
> @@ -548,6 +576,14 @@
> return EOF;
> }
>
> + /*
> + * don't try to send zero data, allows us to use this function
> + * without too much worry about overhead
> + */
> + if (len == 0)
> + return (0);
> +
> + /* while there's still data to send */
> while (len > 0)
> {
> /* Prevent being SIGPIPEd if backend has closed the connection. */
> @@ -556,6 +592,7 @@
> #endif
>
> int sent;
> +
> #ifdef USE_SSL
> if (conn->ssl)
> sent = SSL_write(conn->ssl, ptr, len);
> @@ -585,6 +622,8 @@
> case EWOULDBLOCK:
> break;
> #endif
> + case EINTR:
> + continue;
>
> case EPIPE:
> #ifdef ECONNRESET
> @@ -616,13 +655,31 @@
> ptr += sent;
> len -= sent;
> }
> +
> if (len > 0)
> {
> /* We didn't send it all, wait till we can send more */
> +
> + /*
> + * if the socket is in non-blocking mode we may need
> + * to abort here
> + */
> +#ifdef USE_SSL
> + /* can't do anything for our SSL users yet */
> + if (conn->ssl == NULL)
> + {
> +#endif
> + if (PQisnonblocking(conn))
> + {
> + /* shift the contents of the buffer */
> + memmove(conn->outBuffer, ptr, len);
> + conn->outCount = len;
> + return EOF;
> + }
> +#ifdef USE_SSL
> + }
> +#endif
>
> - /* At first glance this looks as though it should block. I think
> - * that it will be OK though, as long as the socket is
> - * non-blocking. */
> if (pqWait(FALSE, TRUE, conn))
> return EOF;
> }
> Index: libpq-fe.h
> ===================================================================
> RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/libpq-fe.h,v
> retrieving revision 1.53
> diff -u -u -r1.53 libpq-fe.h
> --- libpq-fe.h 1999/11/30 03:08:19 1.53
> +++ libpq-fe.h 1999/12/14 01:30:01
> @@ -269,6 +269,13 @@
> extern int PQputnbytes(PGconn *conn, const char *buffer, int nbytes);
> extern int PQendcopy(PGconn *conn);
>
> + /* Set blocking/nonblocking connection to the backend */
> + extern int PQsetnonblocking(PGconn *conn, int arg);
> + extern int PQisnonblocking(PGconn *conn);
> +
> + /* Force the write buffer to be written (or at least try) */
> + extern int PQflush(PGconn *conn);
> +
> /*
> * "Fast path" interface --- not really recommended for application
> * use
> Index: libpq-int.h
> ===================================================================
> RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/libpq-int.h,v
> retrieving revision 1.14
> diff -u -u -r1.14 libpq-int.h
> --- libpq-int.h 1999/11/30 03:08:19 1.14
> +++ libpq-int.h 1999/12/14 01:30:01
> @@ -215,6 +215,9 @@
> int inEnd; /* offset to first position after avail
> * data */
>
> + int nonblocking; /* whether this connection is using a non-blocking
> + * socket to the backend or not */
> +
> /* Buffer for data not yet sent to backend */
> char *outBuffer; /* currently allocated buffer */
> int outBufSize; /* allocated size of buffer */
>
>
> ************
>
>
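
The fe-misc.c hunk quoted above changes what happens when a non-blocking
socket will not accept any more data: the unsent tail is moved to the front
of the output buffer and the call returns instead of waiting. A minimal
sketch of that idea follows, assuming a POSIX system. OutBuf and
outbuf_flush() are hypothetical stand-ins for the outBuffer/outCount fields
and for pqFlush(); they are not libpq code, and the error handling is
simplified compared with the real function.

```c
#include <errno.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical stand-in for the outBuffer/outCount pair in PGconn. */
typedef struct
{
    char    data[8192];
    size_t  count;          /* number of bytes waiting to be sent */
} OutBuf;

/*
 * Try to write everything queued in buf to fd.
 * Returns 0 if the buffer was drained, -1 if data is still queued
 * (the descriptor would block, or a hard error occurred).  Whatever
 * was not sent is shifted to the start of the buffer so a later call
 * can resume from there.
 */
int
outbuf_flush(int fd, OutBuf *buf)
{
    const char *ptr = buf->data;
    size_t      len = buf->count;

    while (len > 0)
    {
        ssize_t sent = write(fd, ptr, len);

        if (sent < 0)
        {
            if (errno == EINTR)
                continue;   /* interrupted: retry without waiting */
            break;          /* EAGAIN/EWOULDBLOCK or a hard error */
        }
        ptr += sent;
        len -= (size_t) sent;
    }

    /* shift the unsent contents to the front of the buffer */
    memmove(buf->data, ptr, len);
    buf->count = len;

    return (len > 0) ? -1 : 0;
}
```

A caller on a non-blocking descriptor would treat -1 as "try again once the
descriptor is writable", which is roughly the role PQflush() plays for
applications in the patch.
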

--
Bruce Momjian | http://www.op.net/~candle
pgman(at)candle(dot)pha(dot)pa(dot)us | (610) 853-3000
+ If your life is a hard drive, | 830 Blythe Avenue
+ Christ can be your backup. | Drexel Hill, Pennsylvania 19026
