diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 6497100a1a..824b2f11a3 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -120,8 +120,8 @@ static List *sock_paths = NIL; static char *PqSendBuffer; static int PqSendBufferSize; /* Size send buffer */ -static int PqSendPointer; /* Next index to store a byte in PqSendBuffer */ -static int PqSendStart; /* Next index to send a byte in PqSendBuffer */ +static size_t PqSendPointer; /* Next index to store a byte in PqSendBuffer */ +static size_t PqSendStart; /* Next index to send a byte in PqSendBuffer */ static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE]; static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */ @@ -133,6 +133,7 @@ static int PqRecvLength; /* End of data available in PqRecvBuffer */ static bool PqCommBusy; /* busy sending data to the client */ static bool PqCommReadingMsg; /* in the middle of reading a message */ +#define internal_flush() internal_flush_buffer(PqSendBuffer, &PqSendStart, &PqSendPointer) /* Internal functions */ static void socket_comm_reset(void); @@ -144,7 +145,8 @@ static bool socket_is_send_pending(void); static int socket_putmessage(char msgtype, const char *s, size_t len); static void socket_putmessage_noblock(char msgtype, const char *s, size_t len); static int internal_putbytes(const char *s, size_t len); -static int internal_flush(void); +static pg_noinline int internal_flush_buffer(const char *s, size_t *start, + size_t *end); static int Lock_AF_UNIX(const char *unixSocketDir, const char *unixSocketPath); static int Setup_AF_UNIX(const char *sock_path); @@ -1282,14 +1284,32 @@ internal_putbytes(const char *s, size_t len) if (internal_flush()) return EOF; } - amount = PqSendBufferSize - PqSendPointer; - if (amount > len) - amount = len; - memcpy(PqSendBuffer + PqSendPointer, s, amount); - PqSendPointer += amount; - s += amount; - len -= amount; + + /* + * If the buffer is empty and data length is larger than the buffer + * size, send it without buffering. Otherwise, put as much data as + * possible into the buffer. + */ + if (len >= PqSendBufferSize && PqSendStart == PqSendPointer) + { + size_t start = 0; + + socket_set_nonblocking(false); + if (internal_flush_buffer(s, &start, &len)) + return EOF; + } + else + { + amount = PqSendBufferSize - PqSendPointer; + if (amount > len) + amount = len; + memcpy(PqSendBuffer + PqSendPointer, s, amount); + PqSendPointer += amount; + s += amount; + len -= amount; + } } + return 0; } @@ -1315,19 +1335,19 @@ socket_flush(void) } /* -------------------------------- - * internal_flush - flush pending output + * internal_flush_buffer - flush the given buffer content * * Returns 0 if OK (meaning everything was sent, or operation would block * and the socket is in non-blocking mode), or EOF if trouble. * -------------------------------- */ -static int -internal_flush(void) +static pg_noinline int +internal_flush_buffer(const char *s, size_t *start, size_t *end) { static int last_reported_send_errno = 0; - char *bufptr = PqSendBuffer + PqSendStart; - char *bufend = PqSendBuffer + PqSendPointer; + char *bufptr = (char*) s + *start; + char *bufend = (char*) s + *end; while (bufptr < bufend) { @@ -1373,7 +1393,7 @@ internal_flush(void) * flag that'll cause the next CHECK_FOR_INTERRUPTS to terminate * the connection. */ - PqSendStart = PqSendPointer = 0; + *start = *end = 0; ClientConnectionLost = 1; InterruptPending = 1; return EOF; @@ -1381,10 +1401,10 @@ internal_flush(void) last_reported_send_errno = 0; /* reset after any successful send */ bufptr += r; - PqSendStart += r; + *start += r; } - PqSendStart = PqSendPointer = 0; + *start = *end = 0; return 0; }