From 0eaeb4009f1b6956d36a89b1139d49ae1f6db2dc Mon Sep 17 00:00:00 2001
From: Tom Lane <tgl@sss.pgh.pa.us>
Date: Sat, 11 Oct 2025 20:26:24 -0400
Subject: [PATCH v3 3/4] Fix serious performance problems in
 LZ4Stream_read_internal.

I was distressed to find that reading an LZ4-compressed toc.dat
file was hundreds of times slower than it ought to be.  On
investigation, the blame mostly affixes to LZ4Stream_read_overflow's
habit of memmove'ing all the remaining buffered data after each read
operation.  Since reading a TOC file tends to involve a lot of small
(even one-byte) decompression calls, that amounts to an O(N^2) cost.

This could have been fixed with a minimal patch, but to my
eyes LZ4Stream_read_internal and LZ4Stream_read_overflow are
badly-written spaghetti code; in particular the eol_flag logic
is inefficient and duplicative.  I chose to throw the code
away and rewrite from scratch.  This version is about sixty
lines shorter as well as not having the performance issue.

Fortunately, AFAICT the only way to get to this problem is to
manually LZ4-compress the toc.dat and/or blobs.toc files within a
directory-style archive.  Few people do that, which likely explains
the lack of field complaints.

Author: Tom Lane <tgl@sss.pgh.pa.us>
Discussion: https://postgr.es/m/3515357.1760128017@sss.pgh.pa.us
---
 src/bin/pg_dump/compress_lz4.c | 242 ++++++++++++---------------------
 1 file changed, 89 insertions(+), 153 deletions(-)

diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
index c9ea895c137..450afd4e2be 100644
--- a/src/bin/pg_dump/compress_lz4.c
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -65,14 +65,12 @@ typedef struct LZ4State
 	char	   *buffer;			/* buffer for compressed data */
 	size_t		buflen;			/* allocated size of buffer */
 	size_t		bufdata;		/* amount of valid data currently in buffer */
-
-	/*
-	 * Used by the Stream API to store already uncompressed data that the
-	 * caller has not consumed.
-	 */
-	size_t		overflowalloclen;
-	size_t		overflowlen;
-	char	   *overflowbuf;
+	/* These fields are used only while decompressing: */
+	size_t		bufnext;		/* next buffer position to decompress */
+	char	   *outbuf;			/* buffer for decompressed data */
+	size_t		outbuflen;		/* allocated size of outbuf */
+	size_t		outbufdata;		/* amount of valid data currently in outbuf */
+	size_t		outbufnext;		/* next outbuf position to return */
 
 	/*
 	 * Used by both APIs to keep track of error codes.
@@ -168,8 +166,8 @@ ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs)
 		pg_fatal("could not create LZ4 decompression context: %s",
 				 LZ4F_getErrorName(status));
 
-	outbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE);
-	readbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE);
+	outbuf = pg_malloc(DEFAULT_IO_BUFFER_SIZE);
+	readbuf = pg_malloc(DEFAULT_IO_BUFFER_SIZE);
 	readbuflen = DEFAULT_IO_BUFFER_SIZE;
 	while ((r = cs->readF(AH, &readbuf, &readbuflen)) > 0)
 	{
@@ -184,7 +182,6 @@ ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs)
 			size_t		out_size = DEFAULT_IO_BUFFER_SIZE;
 			size_t		read_size = readend - readp;
 
-			memset(outbuf, 0, DEFAULT_IO_BUFFER_SIZE);
 			status = LZ4F_decompress(ctx, outbuf, &out_size,
 									 readp, &read_size, &dec_opt);
 			if (LZ4F_isError(status))
@@ -327,15 +324,16 @@ InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compressi
 
 /*
  * LZ4 equivalent to feof() or gzeof().  Return true iff there is no
- * decompressed output in the overflow buffer and the end of the backing file
- * is reached.
+ * more buffered data and the end of the input file has been reached.
  */
 static bool
 LZ4Stream_eof(CompressFileHandle *CFH)
 {
 	LZ4State   *state = (LZ4State *) CFH->private_data;
 
-	return state->overflowlen == 0 && feof(state->fp);
+	return state->outbufnext >= state->outbufdata &&
+		state->bufnext >= state->bufdata &&
+		feof(state->fp);
 }
 
 static const char *
@@ -357,13 +355,15 @@ LZ4Stream_get_error(CompressFileHandle *CFH)
  *
  * Creates the necessary contexts for either compression or decompression. When
  * compressing data (indicated by compressing=true), it additionally writes the
- * LZ4 header in the output stream.
+ * LZ4 header in the output buffer.
+ *
+ * It's expected that a not-yet-initialized LZ4State will be zero-filled.
  *
  * Returns true on success. In case of a failure returns false, and stores the
  * error code in state->errcode.
  */
 static bool
-LZ4Stream_init(LZ4State *state, int size, bool compressing)
+LZ4Stream_init(LZ4State *state, bool compressing)
 {
 	size_t		status;
 
@@ -386,66 +386,22 @@ LZ4Stream_init(LZ4State *state, int size, bool compressing)
 			return false;
 		}
 
-		state->buflen = Max(size, DEFAULT_IO_BUFFER_SIZE);
+		state->buflen = DEFAULT_IO_BUFFER_SIZE;
 		state->buffer = pg_malloc(state->buflen);
-
-		state->overflowalloclen = state->buflen;
-		state->overflowbuf = pg_malloc(state->overflowalloclen);
-		state->overflowlen = 0;
+		state->outbuflen = DEFAULT_IO_BUFFER_SIZE;
+		state->outbuf = pg_malloc(state->outbuflen);
 	}
 
 	state->inited = true;
 	return true;
 }
 
-/*
- * Read already decompressed content from the overflow buffer into 'ptr' up to
- * 'size' bytes, if available. If the eol_flag is set, then stop at the first
- * occurrence of the newline char prior to 'size' bytes.
- *
- * Any unread content in the overflow buffer is moved to the beginning.
- *
- * Returns the number of bytes read from the overflow buffer (and copied into
- * the 'ptr' buffer), or 0 if the overflow buffer is empty.
- */
-static int
-LZ4Stream_read_overflow(LZ4State *state, void *ptr, int size, bool eol_flag)
-{
-	char	   *p;
-	int			readlen = 0;
-
-	if (state->overflowlen == 0)
-		return 0;
-
-	if (state->overflowlen >= size)
-		readlen = size;
-	else
-		readlen = state->overflowlen;
-
-	if (eol_flag && (p = memchr(state->overflowbuf, '\n', readlen)))
-		/* Include the line terminating char */
-		readlen = p - state->overflowbuf + 1;
-
-	memcpy(ptr, state->overflowbuf, readlen);
-	state->overflowlen -= readlen;
-
-	if (state->overflowlen > 0)
-		memmove(state->overflowbuf, state->overflowbuf + readlen, state->overflowlen);
-
-	return readlen;
-}
-
 /*
  * The workhorse for reading decompressed content out of an LZ4 compressed
  * stream.
  *
  * It will read up to 'ptrsize' decompressed content, or up to the new line
- * char if found first when the eol_flag is set. It is possible that the
- * decompressed output generated by reading any compressed input via the
- * LZ4F API, exceeds 'ptrsize'. Any exceeding decompressed content is stored
- * at an overflow buffer within LZ4State. Of course, when the function is
- * called, it will first try to consume any decompressed content already
- * present in the overflow buffer, before decompressing new content.
+ * char if one is found first when the eol_flag is set.
  *
  * Returns the number of bytes of decompressed data copied into the ptr
  * buffer, or -1 in case of error.
@@ -454,62 +410,85 @@ static int
 LZ4Stream_read_internal(LZ4State *state, void *ptr, int ptrsize, bool eol_flag)
 {
 	int			dsize = 0;
-	int			rsize;
-	int			size = ptrsize;
-	bool		eol_found = false;
-
-	void	   *readbuf;
+	int			remaining = ptrsize;
 
 	/* Lazy init */
-	if (!LZ4Stream_init(state, size, false /* decompressing */ ))
+	if (!LZ4Stream_init(state, false /* decompressing */ ))
 	{
 		pg_log_error("unable to initialize LZ4 library: %s",
 					 LZ4F_getErrorName(state->errcode));
 		return -1;
 	}
 
-	/* No work needs to be done for a zero-sized output buffer */
-	if (size <= 0)
-		return 0;
-
-	/* Verify that there is enough space in the outbuf */
-	if (size > state->buflen)
+	/* Loop until postcondition is satisfied */
+	while (remaining > 0)
 	{
-		state->buflen = size;
-		state->buffer = pg_realloc(state->buffer, size);
-	}
-
-	/* use already decompressed content if available */
-	dsize = LZ4Stream_read_overflow(state, ptr, size, eol_flag);
-	if (dsize == size || (eol_flag && memchr(ptr, '\n', dsize)))
-		return dsize;
-
-	readbuf = pg_malloc(size);
+		/*
+		 * If we already have some decompressed data, return that.
+		 */
+		if (state->outbufnext < state->outbufdata)
+		{
+			char	   *outptr = state->outbuf + state->outbufnext;
+			size_t		readlen = state->outbufdata - state->outbufnext;
+			bool		eol_found = false;
+
+			if (readlen > remaining)
+				readlen = remaining;
+			/* If eol_flag is set, don't read beyond a newline */
+			if (eol_flag)
+			{
+				char	   *eolptr = memchr(outptr, '\n', readlen);
 
-	do
-	{
-		char	   *rp;
-		char	   *rend;
+				if (eolptr)
+				{
+					readlen = eolptr - outptr + 1;
+					eol_found = true;
+				}
+			}
+			memcpy(ptr, outptr, readlen);
+			ptr = ((char *) ptr) + readlen;
+			state->outbufnext += readlen;
+			dsize += readlen;
+			remaining -= readlen;
+			if (eol_found || remaining == 0)
+				break;
+			/* We must have emptied outbuf */
+			Assert(state->outbufnext >= state->outbufdata);
+		}
 
-		rsize = fread(readbuf, 1, size, state->fp);
-		if (rsize < size && !feof(state->fp))
+		/*
+		 * If we don't have any pending compressed data, load more into
+		 * state->buffer.
+		 */
+		if (state->bufnext >= state->bufdata)
 		{
-			pg_log_error("could not read from input file: %m");
-			return -1;
-		}
+			size_t		rsize;
 
-		rp = (char *) readbuf;
-		rend = (char *) readbuf + rsize;
+			rsize = fread(state->buffer, 1, state->buflen, state->fp);
+			if (rsize < state->buflen && !feof(state->fp))
+			{
+				pg_log_error("could not read from input file: %m");
+				return -1;
+			}
+			if (rsize == 0)
+				break;			/* must be EOF */
+			state->bufdata = rsize;
+			state->bufnext = 0;
+		}
 
-		while (rp < rend)
+		/*
+		 * Decompress some data into state->outbuf.
+		 */
 		{
 			size_t		status;
-			size_t		outlen = state->buflen;
-			size_t		read_remain = rend - rp;
-
-			memset(state->buffer, 0, outlen);
-			status = LZ4F_decompress(state->dtx, state->buffer, &outlen,
-									 rp, &read_remain, NULL);
+			size_t		outlen = state->outbuflen;
+			size_t		inlen = state->bufdata - state->bufnext;
+
+			status = LZ4F_decompress(state->dtx,
+									 state->outbuf, &outlen,
+									 state->buffer + state->bufnext,
+									 &inlen,
+									 NULL);
 			if (LZ4F_isError(status))
 			{
 				state->errcode = status;
@@ -517,54 +496,11 @@ LZ4Stream_read_internal(LZ4State *state, void *ptr, int ptrsize, bool eol_flag)
 							 LZ4F_getErrorName(state->errcode));
 				return -1;
 			}
-
-			rp += read_remain;
-
-			/*
-			 * fill in what space is available in ptr if the eol flag is set,
-			 * either skip if one already found or fill up to EOL if present
-			 * in the outbuf
-			 */
-			if (outlen > 0 && dsize < size && eol_found == false)
-			{
-				char	   *p;
-				size_t		lib = (!eol_flag) ? size - dsize : size - 1 - dsize;
-				size_t		len = outlen < lib ? outlen : lib;
-
-				if (eol_flag &&
-					(p = memchr(state->buffer, '\n', outlen)) &&
-					(size_t) (p - state->buffer + 1) <= len)
-				{
-					len = p - state->buffer + 1;
-					eol_found = true;
-				}
-
-				memcpy((char *) ptr + dsize, state->buffer, len);
-				dsize += len;
-
-				/* move what did not fit, if any, at the beginning of the buf */
-				if (len < outlen)
-					memmove(state->buffer, state->buffer + len, outlen - len);
-				outlen -= len;
-			}
-
-			/* if there is available output, save it */
-			if (outlen > 0)
-			{
-				while (state->overflowlen + outlen > state->overflowalloclen)
-				{
-					state->overflowalloclen *= 2;
-					state->overflowbuf = pg_realloc(state->overflowbuf,
-													state->overflowalloclen);
-				}
-
-				memcpy(state->overflowbuf + state->overflowlen, state->buffer, outlen);
-				state->overflowlen += outlen;
-			}
+			state->bufnext += inlen;
+			state->outbufdata = outlen;
+			state->outbufnext = 0;
 		}
-	} while (rsize == size && dsize < size && eol_found == false);
-
-	pg_free(readbuf);
+	}
 
 	return dsize;
 }
@@ -579,7 +515,7 @@ LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
 	size_t		remaining = size;
 
 	/* Lazy init */
-	if (!LZ4Stream_init(state, size, true))
+	if (!LZ4Stream_init(state, true))
 		pg_fatal("unable to initialize LZ4 library: %s",
 				 LZ4F_getErrorName(state->errcode));
 
@@ -742,7 +678,7 @@ LZ4Stream_close(CompressFileHandle *CFH)
 			if (LZ4F_isError(status))
 				pg_log_error("could not end decompression: %s",
 							 LZ4F_getErrorName(status));
-			pg_free(state->overflowbuf);
+			pg_free(state->outbuf);
 		}
 
 		pg_free(state->buffer);
-- 
2.43.7

