Re: Fix for non-blocking connections in libpq

From: Bruce Momjian <pgman(at)candle(dot)pha(dot)pa(dot)us>
To: Bernhard Herzog <bh(at)intevation(dot)de>
Cc: pgsql-patches(at)postgresql(dot)org
Subject: Re: Fix for non-blocking connections in libpq
Date: 2002-03-05 06:07:37
Message-ID: 200203050607.g2567bn21015@candle.pha.pa.us
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-patches


Old version backed out. Newer patch applied. Thanks.

---------------------------------------------------------------------------

Bernhard Herzog wrote:
> Bruce Momjian <pgman(at)candle(dot)pha(dot)pa(dot)us> writes:
>
> > Bernard, just checking. Is this the most recent version of your patch?
>
> In principle, yes. However, I've ported it to the CVS version in the
> meantime. Here's a patch against current CVS HEAD:
>
>
> Index: src/interfaces/libpq/fe-exec.c
> ===================================================================
> RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/fe-exec.c,v
> retrieving revision 1.113
> diff -c -r1.113 fe-exec.c
> *** src/interfaces/libpq/fe-exec.c 2001/10/25 05:50:13 1.113
> --- src/interfaces/libpq/fe-exec.c 2002/02/25 10:21:06
> ***************
> *** 2340,2342 ****
> --- 2340,2350 ----
>
> return (pqFlush(conn));
> }
> +
> + /* try to force data out, really only useful for non-blocking users.
> + * This implementation actually works for non-blocking connections */
> + int
> + PQsendSome(PGconn *conn)
> + {
> + return pqSendSome(conn);
> + }
> Index: src/interfaces/libpq/fe-misc.c
> ===================================================================
> RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/fe-misc.c,v
> retrieving revision 1.65
> diff -c -r1.65 fe-misc.c
> *** src/interfaces/libpq/fe-misc.c 2001/12/03 00:28:24 1.65
> --- src/interfaces/libpq/fe-misc.c 2002/02/25 10:21:06
> ***************
> *** 110,164 ****
> static int
> pqPutBytes(const char *s, size_t nbytes, PGconn *conn)
> {
> ! size_t avail = Max(conn->outBufSize - conn->outCount, 0);
> !
> ! /*
> ! * if we are non-blocking and the send queue is too full to buffer
> ! * this request then try to flush some and return an error
> */
> ! if (pqIsnonblocking(conn) && nbytes > avail && pqFlush(conn))
> {
> ! /*
> ! * even if the flush failed we may still have written some data,
> ! * recalculate the size of the send-queue relative to the amount
> ! * we have to send, we may be able to queue it afterall even
> ! * though it's not sent to the database it's ok, any routines that
> ! * check the data coming from the database better call pqFlush()
> ! * anyway.
> ! */
> ! if (nbytes > Max(conn->outBufSize - conn->outCount, 0))
> ! {
> ! printfPQExpBuffer(&conn->errorMessage,
> ! libpq_gettext("could not flush enough data (space available: %d, space needed %d)\n"),
> ! (int) Max(conn->outBufSize - conn->outCount, 0),
> ! (int) nbytes);
> ! return EOF;
> ! }
> ! /* fixup avail for while loop */
> avail = Max(conn->outBufSize - conn->outCount, 0);
> ! }
>
> ! /*
> ! * is the amount of data to be sent is larger than the size of the
> ! * output buffer then we must flush it to make more room.
> ! *
> ! * the code above will make sure the loop conditional is never true for
> ! * non-blocking connections
> ! */
> ! while (nbytes > avail)
> ! {
> ! memcpy(conn->outBuffer + conn->outCount, s, avail);
> ! conn->outCount += avail;
> ! s += avail;
> ! nbytes -= avail;
> ! if (pqFlush(conn))
> ! return EOF;
> ! avail = conn->outBufSize;
> ! }
>
> ! memcpy(conn->outBuffer + conn->outCount, s, nbytes);
> ! conn->outCount += nbytes;
>
> return 0;
> }
>
> --- 110,184 ----
> static int
> pqPutBytes(const char *s, size_t nbytes, PGconn *conn)
> {
> ! /* Strategy to handle blocking and non-blocking connections: Fill
> ! * the output buffer and flush it repeatedly until either all data
> ! * has been sent or is at least queued in the buffer.
> ! *
> ! * For non-blocking connections, grow the buffer if not all data
> ! * fits into it and the buffer can't be sent because the socket
> ! * would block.
> */
> !
> ! while (nbytes)
> {
> ! size_t avail, remaining;
> !
> ! /* fill the output buffer */
> avail = Max(conn->outBufSize - conn->outCount, 0);
> ! remaining = Min(avail, nbytes);
> ! memcpy(conn->outBuffer + conn->outCount, s, remaining);
> ! conn->outCount += remaining;
> ! s += remaining;
> ! nbytes -= remaining;
> !
> ! /* if the data didn't fit completely into the buffer, try to
> ! * flush the buffer */
> ! if (nbytes)
> ! {
> ! int send_result = pqSendSome(conn);
>
> ! /* if there were errors, report them */
> ! if (send_result < 0)
> ! return EOF;
>
> ! /* if not all data could be sent, increase the output
> ! * buffer, put the rest of s into it and return
> ! * successfully. This case will only happen in a
> ! * non-blocking connection
> ! */
> ! if (send_result > 0)
> ! {
> ! /* try to grow the buffer.
> ! * FIXME: The new size could be chosen more
> ! * intelligently.
> ! */
> ! size_t buflen = conn->outCount + nbytes;
> ! if (buflen > conn->outBufSize)
> ! {
> ! char * newbuf = realloc(conn->outBuffer, buflen);
> ! if (!newbuf)
> ! {
> ! /* realloc failed. Probably out of memory */
> ! printfPQExpBuffer(&conn->errorMessage,
> ! "cannot allocate memory for output buffer\n");
> ! return EOF;
> ! }
> ! conn->outBuffer = newbuf;
> ! conn->outBufSize = buflen;
> ! }
> ! /* put the data into it */
> ! memcpy(conn->outBuffer + conn->outCount, s, nbytes);
> ! conn->outCount += nbytes;
>
> + /* report success. */
> + return 0;
> + }
> + }
> +
> + /* pqSendSome was able to send all data. Continue with the next
> + * chunk of s. */
> + } /* while */
> +
> return 0;
> }
>
> ***************
> *** 604,613 ****
> }
>
> /*
> ! * pqFlush: send any data waiting in the output buffer
> */
> int
> ! pqFlush(PGconn *conn)
> {
> char *ptr = conn->outBuffer;
> int len = conn->outCount;
> --- 624,636 ----
> }
>
> /*
> ! * pqSendSome: send any data waiting in the output buffer.
> ! *
> ! * Return 0 on sucess, -1 on failure and 1 when data remains because the
> ! * socket would block and the connection is non-blocking.
> */
> int
> ! pqSendSome(PGconn *conn)
> {
> char *ptr = conn->outBuffer;
> int len = conn->outCount;
> ***************
> *** 616,622 ****
> {
> printfPQExpBuffer(&conn->errorMessage,
> libpq_gettext("connection not open\n"));
> ! return EOF;
> }
>
> /*
> --- 639,645 ----
> {
> printfPQExpBuffer(&conn->errorMessage,
> libpq_gettext("connection not open\n"));
> ! return -1;
> }
>
> /*
> ***************
> *** 674,680 ****
> printfPQExpBuffer(&conn->errorMessage,
> libpq_gettext(
> "server closed the connection unexpectedly\n"
> ! "\tThis probably means the server terminated abnormally\n"
> "\tbefore or while processing the request.\n"));
>
> /*
> --- 697,703 ----
> printfPQExpBuffer(&conn->errorMessage,
> libpq_gettext(
> "server closed the connection unexpectedly\n"
> ! "\tThis probably means the server terminated abnormally\n"
> "\tbefore or while processing the request.\n"));
>
> /*
> ***************
> *** 685,698 ****
> * the socket open until pqReadData finds no more data
> * can be read.
> */
> ! return EOF;
>
> default:
> printfPQExpBuffer(&conn->errorMessage,
> libpq_gettext("could not send data to server: %s\n"),
> SOCK_STRERROR(SOCK_ERRNO));
> /* We don't assume it's a fatal error... */
> ! return EOF;
> }
> }
> else
> --- 708,721 ----
> * the socket open until pqReadData finds no more data
> * can be read.
> */
> ! return -1;
>
> default:
> printfPQExpBuffer(&conn->errorMessage,
> libpq_gettext("could not send data to server: %s\n"),
> SOCK_STRERROR(SOCK_ERRNO));
> /* We don't assume it's a fatal error... */
> ! return -1;
> }
> }
> else
> ***************
> *** 707,713 ****
>
> /*
> * if the socket is in non-blocking mode we may need to abort
> ! * here
> */
> #ifdef USE_SSL
> /* can't do anything for our SSL users yet */
> --- 730,736 ----
>
> /*
> * if the socket is in non-blocking mode we may need to abort
> ! * here and return 1 to indicate that data is still pending.
> */
> #ifdef USE_SSL
> /* can't do anything for our SSL users yet */
> ***************
> *** 719,732 ****
> /* shift the contents of the buffer */
> memmove(conn->outBuffer, ptr, len);
> conn->outCount = len;
> ! return EOF;
> }
> #ifdef USE_SSL
> }
> #endif
>
> if (pqWait(FALSE, TRUE, conn))
> ! return EOF;
> }
> }
>
> --- 742,755 ----
> /* shift the contents of the buffer */
> memmove(conn->outBuffer, ptr, len);
> conn->outCount = len;
> ! return 1;
> }
> #ifdef USE_SSL
> }
> #endif
>
> if (pqWait(FALSE, TRUE, conn))
> ! return -1;
> }
> }
>
> ***************
> *** 735,740 ****
> --- 758,783 ----
> if (conn->Pfdebug)
> fflush(conn->Pfdebug);
>
> + return 0;
> + }
> +
> +
> +
> + /*
> + * pqFlush: send any data waiting in the output buffer
> + *
> + * Implemented in terms of pqSendSome to recreate the old behavior which
> + * returned 0 if all data was sent or EOF. EOF was sent regardless of
> + * whether an error occurred or not all data was sent on a non-blocking
> + * socket.
> + */
> + int
> + pqFlush(PGconn *conn)
> + {
> + if (pqSendSome(conn))
> + {
> + return EOF;
> + }
> return 0;
> }
>
> Index: src/interfaces/libpq/libpq-fe.h
> ===================================================================
> RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/libpq-fe.h,v
> retrieving revision 1.80
> diff -c -r1.80 libpq-fe.h
> *** src/interfaces/libpq/libpq-fe.h 2001/11/08 20:37:52 1.80
> --- src/interfaces/libpq/libpq-fe.h 2002/02/25 10:21:06
> ***************
> *** 279,284 ****
> --- 279,285 ----
>
> /* Force the write buffer to be written (or at least try) */
> extern int PQflush(PGconn *conn);
> + extern int PQsendSome(PGconn *conn);
>
> /*
> * "Fast path" interface --- not really recommended for application
> Index: src/interfaces/libpq/libpq-int.h
> ===================================================================
> RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/libpq-int.h,v
> retrieving revision 1.44
> diff -c -r1.44 libpq-int.h
> *** src/interfaces/libpq/libpq-int.h 2001/11/05 17:46:38 1.44
> --- src/interfaces/libpq/libpq-int.h 2002/02/25 10:21:06
> ***************
> *** 323,328 ****
> --- 323,329 ----
> extern int pqPutInt(int value, size_t bytes, PGconn *conn);
> extern int pqReadData(PGconn *conn);
> extern int pqFlush(PGconn *conn);
> + extern int pqSendSome(PGconn *conn);
> extern int pqWait(int forRead, int forWrite, PGconn *conn);
> extern int pqReadReady(PGconn *conn);
> extern int pqWriteReady(PGconn *conn);
>
>
>
> Bernhard
>
> --
> Intevation GmbH http://intevation.de/
> Sketch http://sketch.sourceforge.net/
> MapIt! http://mapit.de/
>
> ---------------------------(end of broadcast)---------------------------
> TIP 5: Have you checked our extensive FAQ?
>
> http://www.postgresql.org/users-lounge/docs/faq.html
>

--
Bruce Momjian | http://candle.pha.pa.us
pgman(at)candle(dot)pha(dot)pa(dot)us | (610) 853-3000
+ If your life is a hard drive, | 830 Blythe Avenue
+ Christ can be your backup. | Drexel Hill, Pennsylvania 19026

In response to

Browse pgsql-patches by date

  From Date Subject
Next Message Bruce Momjian 2002-03-05 06:13:16 Re: \du undocumented in psql help
Previous Message Bruce Momjian 2002-03-05 05:54:13 Re: oid2name cleanup