Skip site navigation (1) Skip section navigation (2)

Re: Fix for non-blocking connections in libpq

From: Bruce Momjian <pgman(at)candle(dot)pha(dot)pa(dot)us>
To: Bernhard Herzog <bh(at)intevation(dot)de>
Cc: pgsql-patches(at)postgresql(dot)org
Subject: Re: Fix for non-blocking connections in libpq
Date: 2002-01-25 02:27:41
Message-ID: 200201250227.g0P2Rfx11500@candle.pha.pa.us (view raw or flat)
Thread:
Lists: pgsql-patches
This has been saved for the 7.3 release:

	http://candle.pha.pa.us/cgi-bin/pgpatches2

---------------------------------------------------------------------------

Bernhard Herzog wrote:
> 
> Here's a patch against 7.1.3 that fixes a problem with sending larger
> queries over non-blocking connections with libpq. "Larger" here
> basically means that it doesn't fit into the output buffer.
> 
> The basic strategy is to fix pqFlush and pqPutBytes.
> 
> The problem with pqFlush as it stands now is that it returns EOF when an
> error occurs or when not all data could be sent. The latter case is
> clearly not an error for a non-blocking connection but the caller can't
> distringuish it from an error very well.
> 
> The first part of the fix is therefore to fix pqFlush. This is done by
> to renaming it to pqSendSome which only differs from pqFlush in its
> return values to allow the caller to make the above distinction and a
> new pqFlush which is implemented in terms of pqSendSome and behaves
> exactly like the old pqFlush.
> 
> The second part of the fix modifies pqPutBytes to use pqSendSome instead
> of pqFlush and to either send all the data or if not all data can be
> sent on a non-blocking connection to at least put all data into the
> output buffer, enlarging it if necessary. The callers of pqPutBytes
> don't have to be changed because from their point of view pqPutBytes
> behaves like before. It either succeeds in queueing all output data or
> fails with an error.
> 
> I've also added a new API function PQsendSome which analogously to
> PQflush just calls pqSendSome. Programs using non-blocking queries
> should use this new function. The main difference is that this function
> will have to be called repeatedly (calling select() properly in between)
> until all data has been written.
> 
> AFAICT, the code in CVS HEAD hasn't changed with respect to non-blocking
> queries and this fix should work there, too, but I haven't tested that
> yet.
> 
> 
>    Bernhard
> 
> diff -rc -x *.o postgresql-7.1.3-orig/src/interfaces/libpq/fe-exec.c postgresql-7.1.3/src/interfaces/libpq/fe-exec.c
> *** postgresql-7.1.3-orig/src/interfaces/libpq/fe-exec.c	Sat Feb 10 03:31:30 2001
> --- postgresql-7.1.3/src/interfaces/libpq/fe-exec.c	Tue Jan 22 12:12:58 2002
> ***************
> *** 2214,2216 ****
> --- 2214,2224 ----
>   
>   	return (pqFlush(conn));
>   }
> + 
> + /* try to force data out, really only useful for non-blocking users.
> +  * This implementation actually works for non-blocking connections */
> + int
> + PQsendSome(PGconn *conn)
> + {
> + 	return pqSendSome(conn);
> + }
> diff -rc -x *.o postgresql-7.1.3-orig/src/interfaces/libpq/fe-misc.c postgresql-7.1.3/src/interfaces/libpq/fe-misc.c
> *** postgresql-7.1.3-orig/src/interfaces/libpq/fe-misc.c	Sun Apr  1 01:13:30 2001
> --- postgresql-7.1.3/src/interfaces/libpq/fe-misc.c	Tue Jan 22 12:07:25 2002
> ***************
> *** 90,144 ****
>   static int
>   pqPutBytes(const char *s, size_t nbytes, PGconn *conn)
>   {
> ! 	size_t		avail = Max(conn->outBufSize - conn->outCount, 0);
> ! 
> ! 	/*
> ! 	 * if we are non-blocking and the send queue is too full to buffer
> ! 	 * this request then try to flush some and return an error
>   	 */
> - 	if (pqIsnonblocking(conn) && nbytes > avail && pqFlush(conn))
> - 	{
>   
> ! 		/*
> ! 		 * even if the flush failed we may still have written some data,
> ! 		 * recalculate the size of the send-queue relative to the amount
> ! 		 * we have to send, we may be able to queue it afterall even
> ! 		 * though it's not sent to the database it's ok, any routines that
> ! 		 * check the data coming from the database better call pqFlush()
> ! 		 * anyway.
> ! 		 */
> ! 		if (nbytes > Max(conn->outBufSize - conn->outCount, 0))
> ! 		{
> ! 			printfPQExpBuffer(&conn->errorMessage,
> ! 						   "pqPutBytes --  pqFlush couldn't flush enough"
> ! 						 " data: space available: %d, space needed %d\n",
> ! 					  Max(conn->outBufSize - conn->outCount, 0), nbytes);
> ! 			return EOF;
> ! 		}
> ! 		/* fixup avail for while loop */
>   		avail = Max(conn->outBufSize - conn->outCount, 0);
> ! 	}
>   
> ! 	/*
> ! 	 * is the amount of data to be sent is larger than the size of the
> ! 	 * output buffer then we must flush it to make more room.
> ! 	 *
> ! 	 * the code above will make sure the loop conditional is never true for
> ! 	 * non-blocking connections
> ! 	 */
> ! 	while (nbytes > avail)
> ! 	{
> ! 		memcpy(conn->outBuffer + conn->outCount, s, avail);
> ! 		conn->outCount += avail;
> ! 		s += avail;
> ! 		nbytes -= avail;
> ! 		if (pqFlush(conn))
> ! 			return EOF;
> ! 		avail = conn->outBufSize;
> ! 	}
>   
> ! 	memcpy(conn->outBuffer + conn->outCount, s, nbytes);
> ! 	conn->outCount += nbytes;
>   
>   	return 0;
>   }
> --- 90,163 ----
>   static int
>   pqPutBytes(const char *s, size_t nbytes, PGconn *conn)
>   {
> ! 	/* Strategy to handle blocking and non-blocking connections: Fill
> ! 	 * the output buffer and flush it repeatedly until either all data
> ! 	 * has been sent or is at least queued in the buffer.
> ! 	 *
> ! 	 * For non-blocking connections, grow the buffer if not all data
> ! 	 * fits into it and the buffer can't be sent because the socket
> ! 	 * would block.
>   	 */
>   
> ! 	while (nbytes)
> ! 	{
> ! 		size_t avail, remaining;
> ! 		
> ! 		/* fill the output buffer */
>   		avail = Max(conn->outBufSize - conn->outCount, 0);
> ! 		remaining = Min(avail, nbytes);
> ! 		memcpy(conn->outBuffer + conn->outCount, s, remaining);
> ! 		conn->outCount += remaining;
> ! 		s += remaining;
> ! 		nbytes -= remaining;
> ! 
> ! 		/* if the data didn't fit completely into the buffer, try to
> ! 		 * flush the buffer */
> ! 		if (nbytes)
> ! 		{
> ! 			int send_result = pqSendSome(conn);
>   
> ! 			/* if there were errors, report them */
> ! 			if (send_result < 0)
> ! 				return EOF;
>   
> ! 			/* if not all data could be sent, increase the output
> ! 			 * buffer, put the rest of s into it and return
> ! 			 * successfully. This case will only happen in a
> ! 			 * non-blocking connection
> ! 			 */
> ! 			if (send_result > 0)
> ! 			{
> ! 				/* try to grow the buffer.
> ! 				 * FIXME: The new size could be chosen more
> ! 				 * intelligently.
> ! 				 */
> ! 				size_t buflen = conn->outCount + nbytes;
> ! 				if (buflen > conn->outBufSize)
> ! 				{
> ! 					char * newbuf = realloc(conn->outBuffer, buflen);
> ! 					if (!newbuf)
> ! 					{
> ! 						/* realloc failed. Probably out of memory */
> ! 						printfPQExpBuffer(&conn->errorMessage,
> ! 								"cannot allocate memory for output buffer\n");
> ! 						return EOF;
> ! 					}
> ! 					conn->outBuffer = newbuf;
> ! 					conn->outBufSize = buflen;
> ! 				}
> ! 				/* put the data into it */
> ! 				memcpy(conn->outBuffer + conn->outCount, s, nbytes);
> ! 				conn->outCount += nbytes;
> ! 
> ! 				/* report success. */
> ! 				return 0;
> ! 			}
> ! 		}
> ! 
> ! 		/* pqSendSome was able to send all data. Continue with the next
> ! 		 * chunk of s. */
> ! 	} /* while */
>   
>   	return 0;
>   }
> ***************
> *** 575,584 ****
>   }
>   
>   /* --------------------------------------------------------------------- */
> ! /* pqFlush: send any data waiting in the output buffer
>    */
>   int
> ! pqFlush(PGconn *conn)
>   {
>   	char	   *ptr = conn->outBuffer;
>   	int			len = conn->outCount;
> --- 594,615 ----
>   }
>   
>   /* --------------------------------------------------------------------- */
> ! /* pqSendSome: send any data waiting in the output buffer and return 0
> !  * if all data was sent, -1 if an error occurred or 1 if not all data
> !  * could be written because the socket would have blocked.
> !  *
> !  * For a blocking connection all data will be sent unless an error
> !  * occurrs. -1 will only be returned if the connection is non-blocking.
> !  *
> !  * Internally, the case of data remaining in the buffer after pqSendSome
> !  * could be determined by looking at outCount, but this function also
> !  * serves as the implementation of the new API function PQsendsome.
> !  *
> !  * FIXME: perhaps it would be more useful to return the number of bytes
> !  * remaining?
>    */
>   int
> ! pqSendSome(PGconn *conn)
>   {
>   	char	   *ptr = conn->outBuffer;
>   	int			len = conn->outCount;
> ***************
> *** 586,593 ****
>   	if (conn->sock < 0)
>   	{
>   		printfPQExpBuffer(&conn->errorMessage,
> ! 						  "pqFlush() -- connection not open\n");
> ! 		return EOF;
>   	}
>   
>   	/*
> --- 617,624 ----
>   	if (conn->sock < 0)
>   	{
>   		printfPQExpBuffer(&conn->errorMessage,
> ! 						  "pqSendSome() -- connection not open\n");
> ! 		return -1;
>   	}
>   
>   	/*
> ***************
> *** 645,651 ****
>   				case ECONNRESET:
>   #endif
>   					printfPQExpBuffer(&conn->errorMessage,
> ! 									  "pqFlush() -- backend closed the channel unexpectedly.\n"
>   									  "\tThis probably means the backend terminated abnormally"
>   						   " before or while processing the request.\n");
>   
> --- 676,682 ----
>   				case ECONNRESET:
>   #endif
>   					printfPQExpBuffer(&conn->errorMessage,
> ! 									  "pqSendSome() -- backend closed the channel unexpectedly.\n"
>   									  "\tThis probably means the backend terminated abnormally"
>   						   " before or while processing the request.\n");
>   
> ***************
> *** 657,670 ****
>   					 * the socket open until pqReadData finds no more data
>   					 * can be read.
>   					 */
> ! 					return EOF;
>   
>   				default:
>   					printfPQExpBuffer(&conn->errorMessage,
> ! 					  "pqFlush() --  couldn't send data: errno=%d\n%s\n",
>   									  errno, strerror(errno));
>   					/* We don't assume it's a fatal error... */
> ! 					return EOF;
>   			}
>   		}
>   		else
> --- 688,701 ----
>   					 * the socket open until pqReadData finds no more data
>   					 * can be read.
>   					 */
> ! 					return -1;
>   
>   				default:
>   					printfPQExpBuffer(&conn->errorMessage,
> ! 					  "pqSendSome() --  couldn't send data: errno=%d\n%s\n",
>   									  errno, strerror(errno));
>   					/* We don't assume it's a fatal error... */
> ! 					return -1;
>   			}
>   		}
>   		else
> ***************
> *** 679,685 ****
>   
>   			/*
>   			 * if the socket is in non-blocking mode we may need to abort
> ! 			 * here
>   			 */
>   #ifdef USE_SSL
>   			/* can't do anything for our SSL users yet */
> --- 710,716 ----
>   
>   			/*
>   			 * if the socket is in non-blocking mode we may need to abort
> ! 			 * here and return 1 to indicate that data is still pending.
>   			 */
>   #ifdef USE_SSL
>   			/* can't do anything for our SSL users yet */
> ***************
> *** 691,704 ****
>   					/* shift the contents of the buffer */
>   					memmove(conn->outBuffer, ptr, len);
>   					conn->outCount = len;
> ! 					return EOF;
>   				}
>   #ifdef USE_SSL
>   			}
>   #endif
>   
>   			if (pqWait(FALSE, TRUE, conn))
> ! 				return EOF;
>   		}
>   	}
>   
> --- 722,735 ----
>   					/* shift the contents of the buffer */
>   					memmove(conn->outBuffer, ptr, len);
>   					conn->outCount = len;
> ! 					return 1;
>   				}
>   #ifdef USE_SSL
>   			}
>   #endif
>   
>   			if (pqWait(FALSE, TRUE, conn))
> ! 				return -1;
>   		}
>   	}
>   
> ***************
> *** 707,712 ****
> --- 738,762 ----
>   	if (conn->Pfdebug)
>   		fflush(conn->Pfdebug);
>   
> + 	return 0;
> + }
> + 
> + 
> + /* --------------------------------------------------------------------- */
> + /* pqFlush: send any data waiting in the output buffer
> +  *
> +  * Implemented in terms of pqSendSome to recreate the old behavior which
> +  * returned 0 if all data was sent or EOF. EOF was sent regardless of
> +  * whether an error occurred or not all data was sent on a non-blocking
> +  * socket.
> +  */
> + int
> + pqFlush(PGconn *conn)
> + {
> + 	if (pqSendSome(conn))
> + 	{
> + 		return EOF;
> + 	}
>   	return 0;
>   }
>   
> diff -rc -x *.o postgresql-7.1.3-orig/src/interfaces/libpq/libpq-fe.h postgresql-7.1.3/src/interfaces/libpq/libpq-fe.h
> *** postgresql-7.1.3-orig/src/interfaces/libpq/libpq-fe.h	Thu Mar 22 05:01:27 2001
> --- postgresql-7.1.3/src/interfaces/libpq/libpq-fe.h	Tue Jan 22 12:13:17 2002
> ***************
> *** 265,270 ****
> --- 265,273 ----
>   
>   	/* Force the write buffer to be written (or at least try) */
>   	extern int	PQflush(PGconn *conn);
> + 	/* Force the write buffer to be written (or at least try)
> +            (better than PQflush) */
> + 	extern int	PQsendSome(PGconn *conn);
>   
>   	/*
>   	 * "Fast path" interface --- not really recommended for application
> diff -rc -x *.o postgresql-7.1.3-orig/src/interfaces/libpq/libpq-int.h postgresql-7.1.3/src/interfaces/libpq/libpq-int.h
> *** postgresql-7.1.3-orig/src/interfaces/libpq/libpq-int.h	Thu Mar 22 05:01:27 2001
> --- postgresql-7.1.3/src/interfaces/libpq/libpq-int.h	Tue Jan 22 12:13:17 2002
> ***************
> *** 321,326 ****
> --- 321,327 ----
>   extern int	pqPutInt(int value, size_t bytes, PGconn *conn);
>   extern int	pqReadData(PGconn *conn);
>   extern int	pqFlush(PGconn *conn);
> + extern int	pqSendSome(PGconn *conn);
>   extern int	pqWait(int forRead, int forWrite, PGconn *conn);
>   extern int	pqReadReady(PGconn *conn);
>   extern int	pqWriteReady(PGconn *conn);
> 
> 
> 
> -- 
> Intevation GmbH                                 http://intevation.de/
> Sketch                                 http://sketch.sourceforge.net/
> MapIt!                                               http://mapit.de/
> 
> 
> 
> ---------------------------(end of broadcast)---------------------------
> TIP 5: Have you checked our extensive FAQ?
> 
> http://www.postgresql.org/users-lounge/docs/faq.html
> 

-- 
  Bruce Momjian                        |  http://candle.pha.pa.us
  pgman(at)candle(dot)pha(dot)pa(dot)us               |  (610) 853-3000
  +  If your life is a hard drive,     |  830 Blythe Avenue
  +  Christ can be your backup.        |  Drexel Hill, Pennsylvania 19026

In response to

pgsql-patches by date

Next:From: Serguei MokhovDate: 2002-01-25 05:31:41
Subject: Russian NLS Update: pg_dump
Previous:From: Bruce MomjianDate: 2002-01-25 02:22:16
Subject: Re: enable debugging in jdbc

Privacy Policy | About PostgreSQL
Copyright © 1996-2014 The PostgreSQL Global Development Group