From bb5ab738ba2e4e12dc0739de3da51fe244f6e483 Mon Sep 17 00:00:00 2001
From: Filip Janus <fjanus@redhat.com>
Date: Thu, 31 Jul 2025 14:02:16 +0200
Subject: [PATCH v20251001 01/25] Add transparent compression for temporary
 files

This commit implements transparent compression for temporary files in PostgreSQL,
specifically designed for hash join operations that spill to disk.

Features:
- Support for LZ4 and PGLZ compression algorithms
- GUC parameter 'temp_file_compression' to control compression
- Transparent compression/decompression in BufFile layer
- Shared compression buffer to minimize memory allocation
- Hash join integration using BufFileCreateCompressTemp()

The compression is applied automatically when temp_file_compression is enabled,
with no changes required to calling code. Only hash joins use compression
currently, with seeking limited to rewinding to start.

Configuration options:
- temp_file_compression = 'no' (default)
- temp_file_compression = 'pglz'
- temp_file_compression = 'lz4' (requires --with-lz4)

Fix GUC tables structure for compression support
---
 src/Makefile.global.in                        |   1 +
 src/backend/access/gist/gistbuildbuffers.c    |   2 +-
 src/backend/backup/backup_manifest.c          |   2 +-
 src/backend/executor/nodeHashjoin.c           |   2 +-
 src/backend/storage/file/buffile.c            | 317 +++++++++++++++++-
 src/backend/utils/misc/guc_parameters.dat     |   7 +
 src/backend/utils/misc/guc_tables.c           |  13 +
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/backend/utils/sort/logtape.c              |   2 +-
 src/backend/utils/sort/tuplestore.c           |   2 +-
 src/include/storage/buffile.h                 |  12 +-
 11 files changed, 338 insertions(+), 23 deletions(-)

diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index 0aa389bc710..3a8b277a9ae 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -201,6 +201,7 @@ with_liburing	= @with_liburing@
 with_libxml	= @with_libxml@
 with_libxslt	= @with_libxslt@
 with_llvm	= @with_llvm@
+with_lz4	= @with_lz4@
 with_system_tzdata = @with_system_tzdata@
 with_uuid	= @with_uuid@
 with_zlib	= @with_zlib@
diff --git a/src/backend/access/gist/gistbuildbuffers.c b/src/backend/access/gist/gistbuildbuffers.c
index 0707254d18e..9cc371f47fe 100644
--- a/src/backend/access/gist/gistbuildbuffers.c
+++ b/src/backend/access/gist/gistbuildbuffers.c
@@ -54,7 +54,7 @@ gistInitBuildBuffers(int pagesPerBuffer, int levelStep, int maxLevel)
 	 * Create a temporary file to hold buffer pages that are swapped out of
 	 * memory.
 	 */
-	gfbb->pfile = BufFileCreateTemp(false);
+	gfbb->pfile = BufFileCreateTemp(false, false);
 	gfbb->nFileBlocks = 0;
 
 	/* Initialize free page management. */
diff --git a/src/backend/backup/backup_manifest.c b/src/backend/backup/backup_manifest.c
index d05252f383c..35d088db0f3 100644
--- a/src/backend/backup/backup_manifest.c
+++ b/src/backend/backup/backup_manifest.c
@@ -65,7 +65,7 @@ InitializeBackupManifest(backup_manifest_info *manifest,
 		manifest->buffile = NULL;
 	else
 	{
-		manifest->buffile = BufFileCreateTemp(false);
+		manifest->buffile = BufFileCreateTemp(false, false);
 		manifest->manifest_ctx = pg_cryptohash_create(PG_SHA256);
 		if (pg_cryptohash_init(manifest->manifest_ctx) < 0)
 			elog(ERROR, "failed to initialize checksum of backup manifest: %s",
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 5661ad76830..384265ca74a 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -1434,7 +1434,7 @@ ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
 	{
 		MemoryContext oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
 
-		file = BufFileCreateTemp(false);
+		file = BufFileCreateCompressTemp(false);
 		*fileptr = file;
 
 		MemoryContextSwitchTo(oldctx);
diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c
index 366d70d38a1..3cb3b4fcbb7 100644
--- a/src/backend/storage/file/buffile.c
+++ b/src/backend/storage/file/buffile.c
@@ -53,6 +53,17 @@
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
 #include "utils/resowner.h"
+#include "utils/memutils.h"
+#include "common/pg_lzcompress.h"
+
+#ifdef USE_LZ4
+#include <lz4.h>
+#endif
+
+/*
+ * Compression type constants (TEMP_NONE_COMPRESSION, etc.) come from
+ * the TempCompression enum declared in storage/buffile.h.
+ */
 
 /*
  * We break BufFiles into gigabyte-sized segments, regardless of RELSEG_SIZE.
@@ -62,6 +73,8 @@
 #define MAX_PHYSICAL_FILESIZE	0x40000000
 #define BUFFILE_SEG_SIZE		(MAX_PHYSICAL_FILESIZE / BLCKSZ)
 
+int temp_file_compression = TEMP_NONE_COMPRESSION;
+
 /*
  * This data structure represents a buffered file that consists of one or
  * more physical files (each accessed through a virtual file descriptor
@@ -101,6 +114,10 @@ struct BufFile
 	 * wasting per-file alignment padding when some users create many files.
 	 */
 	PGAlignedBlock buffer;
+
+	bool		compress_tempfile; /* XXX appears unused: never set true or read */
+	bool		compress; /* whether data is transparently compressed on disk */
+	char		*cBuffer; /* scratch buffer for compression/decompression */
 };
 
 static BufFile *makeBufFileCommon(int nfiles);
@@ -127,6 +144,9 @@ makeBufFileCommon(int nfiles)
 	file->curOffset = 0;
 	file->pos = 0;
 	file->nbytes = 0;
+	file->compress_tempfile = false;
+	file->compress = false;
+	file->cBuffer = NULL;
 
 	return file;
 }
@@ -188,9 +208,16 @@ extendBufFile(BufFile *file)
  * Note: if interXact is true, the caller had better be calling us in a
  * memory context, and with a resource owner, that will survive across
  * transaction boundaries.
+ *
+ * If compress is true, the temporary file's contents are compressed before
+ * being written to disk.
+ *
+ * Note: Compression does not support random access.  Only hash joins use
+ * it for now; any seek other than a rewind to the beginning of the buffile
+ * will corrupt the stored data offsets.
  */
 BufFile *
-BufFileCreateTemp(bool interXact)
+BufFileCreateTemp(bool interXact, bool compress)
 {
 	BufFile    *file;
 	File		pfile;
@@ -212,9 +239,68 @@ BufFileCreateTemp(bool interXact)
 	file = makeBufFile(pfile);
 	file->isInterXact = interXact;
 
+	if (temp_file_compression != TEMP_NONE_COMPRESSION)
+	{
+		file->compress = compress;
+	}
+
 	return file;
 }
 
+/*
+ * Wrapper for BufFileCreateTemp
+ * We want to limit the number of memory allocations for the compression buffer,
+ * only one buffer for all compression operations is enough
+ */
+BufFile *
+BufFileCreateCompressTemp(bool interXact)
+{
+	static char *buff = NULL;
+	static int allocated_for_compression = TEMP_NONE_COMPRESSION;
+	static int allocated_size = 0;
+	BufFile    *tmpBufFile = BufFileCreateTemp(interXact, true);
+
+	if (temp_file_compression != TEMP_NONE_COMPRESSION)
+	{
+		int			size = 0;
+
+		switch (temp_file_compression)
+		{
+			case TEMP_LZ4_COMPRESSION:
+#ifdef USE_LZ4
+				size = LZ4_compressBound(BLCKSZ) + sizeof(int);
+#endif
+				break;
+			case TEMP_PGLZ_COMPRESSION:
+				size = pglz_maximum_compressed_size(BLCKSZ, BLCKSZ) + 2 * sizeof(int);
+				break;
+		}
+
+		/*
+		 * Allocate or reallocate buffer if needed:
+		 * - Buffer is NULL (first time)
+		 * - Compression type changed
+		 * - Current buffer is too small
+		 */
+		if (buff == NULL || 
+			allocated_for_compression != temp_file_compression ||
+			allocated_size < size)
+		{
+			if (buff != NULL)
+				pfree(buff);
+			
+			/*
+			 * Persistent buffer for all temporary file compressions
+			 */
+			buff = MemoryContextAlloc(TopMemoryContext, size);
+			allocated_for_compression = temp_file_compression;
+			allocated_size = size;
+		}
+	}
+	tmpBufFile->cBuffer = buff;
+	return tmpBufFile;
+}
+
 /*
  * Build the name for a given segment of a given BufFile.
  */
@@ -454,21 +540,133 @@ BufFileLoadBuffer(BufFile *file)
 	else
 		INSTR_TIME_SET_ZERO(io_start);
 
+	if (!file->compress)
+	{
+
+		/*
+		* Read whatever we can get, up to a full bufferload.
+		*/
+		file->nbytes = FileRead(thisfile,
+								file->buffer.data,
+								sizeof(file->buffer),
+								file->curOffset,
+								WAIT_EVENT_BUFFILE_READ);
+		if (file->nbytes < 0)
+		{
+			file->nbytes = 0;
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not read file \"%s\": %m",
+							FilePathName(thisfile))));
+		}
 	/*
-	 * Read whatever we can get, up to a full bufferload.
+	 * Read and decompress data from the temporary file
+	 * The first reading loads size of the compressed block
+	 * Second reading loads compressed data
 	 */
-	file->nbytes = FileRead(thisfile,
-							file->buffer.data,
-							sizeof(file->buffer.data),
+	} else {
+		int nread;
+		int nbytes;
+
+		nread = FileRead(thisfile,
+							&nbytes,
+							sizeof(nbytes),
 							file->curOffset,
 							WAIT_EVENT_BUFFILE_READ);
-	if (file->nbytes < 0)
-	{
-		file->nbytes = 0;
-		ereport(ERROR,
-				(errcode_for_file_access(),
-				 errmsg("could not read file \"%s\": %m",
-						FilePathName(thisfile))));
+
+		/* Fail on I/O error or a torn read of the block-size header */
+		if (nread < 0 || (nread > 0 && nread != sizeof(nbytes)))
+		{
+			ereport(ERROR,
+					(errcode(ERRCODE_DATA_CORRUPTED),
+					 errmsg_internal("could not read compressed block header")));
+		}
+
+		/* if not EOF let's continue */
+		if (nread > 0)
+		{
+			/* A long life buffer limits number of memory allocations */
+			char * buff = file->cBuffer;
+			int original_size = 0;
+			int header_advance = sizeof(nbytes);
+
+			Assert(file->cBuffer != NULL);
+
+			/* For PGLZ, read additional original size */
+			if (temp_file_compression == TEMP_PGLZ_COMPRESSION) {
+				int nread_orig = FileRead(thisfile,
+							&original_size,
+							sizeof(original_size),
+							file->curOffset + sizeof(nbytes),
+							WAIT_EVENT_BUFFILE_READ);
+
+				/* Check if second read succeeded */
+				if (nread_orig != sizeof(original_size) && nread_orig > 0) {
+					ereport(ERROR,
+							(errcode(ERRCODE_DATA_CORRUPTED),
+							 errmsg_internal("second read is corrupt: expected %d bytes, got %d bytes", 
+							 				 (int)sizeof(original_size), nread_orig)));
+				}
+
+				if (nread_orig <= 0) {
+					file->nbytes = 0;
+					return;
+				}
+
+				/* Check if data is uncompressed (marker = -1) */
+				if (original_size == -1) {
+
+					int			nread_data = 0;
+					/* Uncompressed data: read directly into buffer */
+					file->curOffset += 2 * sizeof(int);  /* Skip both header fields */
+					nread_data = FileRead(thisfile,
+											file->buffer.data,
+											nbytes,  /* nbytes contains original size */
+											file->curOffset,
+											WAIT_EVENT_BUFFILE_READ);
+					file->nbytes = nread_data;
+					file->curOffset += nread_data;
+					return;
+				}
+
+				header_advance = 2 * sizeof(int);
+			}
+
+			/*
+			 * Read the compressed payload.  curOffset tracks on-disk
+			 * (compressed) bytes while pos tracks decompressed bytes, so
+			 * curOffset must be advanced here by the compressed size read.
+			 */
+			file->curOffset += header_advance;
+
+			nread = FileRead(thisfile,
+							buff,
+							nbytes,
+							file->curOffset,
+							WAIT_EVENT_BUFFILE_READ);
+
+			switch (temp_file_compression)
+			{
+				case TEMP_LZ4_COMPRESSION:
+#ifdef USE_LZ4
+					file->nbytes = LZ4_decompress_safe(buff,
+						file->buffer.data,nbytes,sizeof(file->buffer));
+#endif
+					break;
+
+				case TEMP_PGLZ_COMPRESSION:
+					file->nbytes = pglz_decompress(buff, nbytes,
+						file->buffer.data, original_size, false);
+					break;
+			}
+			file->curOffset += nread;
+
+			if (file->nbytes < 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_DATA_CORRUPTED),
+						 errmsg_internal("compressed temporary file data is corrupt")));
+		}
+
 	}
 
 	if (track_io_timing)
@@ -494,8 +692,79 @@ static void
 BufFileDumpBuffer(BufFile *file)
 {
 	int			wpos = 0;
-	int			bytestowrite;
+	int			bytestowrite = 0;
 	File		thisfile;
+	char	   *DataToWrite = file->buffer.data;
+	int			nbytesOriginal = file->nbytes;
+
+	/*
+	 * Compression logic: compress the buffer data if compression is enabled
+	 */
+	if (file->compress)
+	{
+		char	   *cData;
+		int			cSize = 0;
+
+		Assert(file->cBuffer != NULL);
+		cData = file->cBuffer;
+
+		switch (temp_file_compression)
+		{
+			case TEMP_LZ4_COMPRESSION:
+				{
+#ifdef USE_LZ4
+					int			cBufferSize = LZ4_compressBound(file->nbytes);
+
+					/*
+					 * Using stream compression would lead to the slight
+					 * improvement in compression ratio
+					 */
+					cSize = LZ4_compress_default(file->buffer.data,
+												 cData + sizeof(int), file->nbytes, cBufferSize);
+#endif
+					break;
+				}
+			case TEMP_PGLZ_COMPRESSION:
+				cSize = pglz_compress(file->buffer.data, file->nbytes,
+									  cData + 2 * sizeof(int), PGLZ_strategy_always);
+				break;
+		}
+
+		/* Check if compression was successful */
+		if (cSize <= 0) {
+			if (temp_file_compression == TEMP_PGLZ_COMPRESSION) {
+
+				int			marker;
+				/* PGLZ compression failed, store uncompressed data with -1 marker */
+				memcpy(cData, &nbytesOriginal, sizeof(int));  /* First field: original size */
+				marker = -1;  /* Second field: -1 = uncompressed marker */
+				memcpy(cData + sizeof(int), &marker, sizeof(int));
+				memcpy(cData + 2 * sizeof(int), file->buffer.data, nbytesOriginal);
+				file->nbytes = nbytesOriginal + 2 * sizeof(int);
+				DataToWrite = cData;
+			} else {
+				/* LZ4 compression failed, report error */
+				ereport(ERROR,
+						(errcode(ERRCODE_DATA_CORRUPTED),
+						 errmsg_internal("LZ4 compression failed: compressed size %d, original size %d", 
+						 				 cSize, nbytesOriginal)));
+			}
+		} else {
+			/*
+			 * Write header in front of compressed data
+			 * LZ4 format: [compressed_size:int][compressed_data]
+			 * PGLZ format: [compressed_size:int][original_size:int][compressed_data]
+			 */
+			memcpy(cData, &cSize, sizeof(int));
+			if (temp_file_compression == TEMP_PGLZ_COMPRESSION) {
+				memcpy(cData + sizeof(int), &nbytesOriginal, sizeof(int));
+				file->nbytes = cSize + 2 * sizeof(int);
+			} else {
+				file->nbytes = cSize + sizeof(int);
+			}
+			DataToWrite = cData;
+		}
+	}
 
 	/*
 	 * Unlike BufFileLoadBuffer, we must dump the whole buffer even if it
@@ -535,7 +804,7 @@ BufFileDumpBuffer(BufFile *file)
 			INSTR_TIME_SET_ZERO(io_start);
 
 		bytestowrite = FileWrite(thisfile,
-								 file->buffer.data + wpos,
+								 DataToWrite + wpos,
 								 bytestowrite,
 								 file->curOffset,
 								 WAIT_EVENT_BUFFILE_WRITE);
@@ -564,7 +833,15 @@ BufFileDumpBuffer(BufFile *file)
 	 * logical file position, ie, original value + pos, in case that is less
 	 * (as could happen due to a small backwards seek in a dirty buffer!)
 	 */
-	file->curOffset -= (file->nbytes - file->pos);
+	if (!file->compress)
+		file->curOffset -= (file->nbytes - file->pos);
+	else if (nbytesOriginal - file->pos != 0)
+		/*
+		 * curOffset must be corrected also if compression is enabled, nbytes
+		 * was changed by compression but we have to use the original value of
+		 * nbytes
+		 */
+		file->curOffset -= bytestowrite;
 	if (file->curOffset < 0)	/* handle possible segment crossing */
 	{
 		file->curFile--;
@@ -602,8 +879,14 @@ BufFileReadCommon(BufFile *file, void *ptr, size_t size, bool exact, bool eofOK)
 	{
 		if (file->pos >= file->nbytes)
 		{
-			/* Try to load more data into buffer. */
-			file->curOffset += file->pos;
+			/* Try to load more data into buffer.
+			 *
+			 * curOffset is moved within BufFileLoadBuffer
+			 * because stored data size differs from loaded/
+			 * decompressed size
+			 */
+			if (!file->compress)
+				file->curOffset += file->pos;
 			file->pos = 0;
 			file->nbytes = 0;
 			BufFileLoadBuffer(file);
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index 6bc6be13d2a..399cf903fff 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -3214,6 +3214,13 @@
   options => 'default_toast_compression_options',
 },
 
+{ name => 'temp_file_compression', type => 'enum', context => 'PGC_USERSET', group => 'CLIENT_CONN_STATEMENT',
+  short_desc => 'Sets the default compression method for temporary files.',
+  variable => 'temp_file_compression',
+  boot_val => 'TEMP_NONE_COMPRESSION',
+  options => 'temp_file_compression_options',
+},
+
 { name => 'default_transaction_isolation', type => 'enum', context => 'PGC_USERSET', group => 'CLIENT_CONN_STATEMENT',
   short_desc => 'Sets the transaction isolation level of each new transaction.',
   variable => 'DefaultXactIsoLevel',
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 00c8376cf4d..2fb3891b730 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -78,6 +78,7 @@
 #include "replication/syncrep.h"
 #include "storage/aio.h"
 #include "storage/bufmgr.h"
+#include "storage/buffile.h"
 #include "storage/bufpage.h"
 #include "storage/copydir.h"
 #include "storage/io_worker.h"
@@ -464,6 +465,18 @@ static const struct config_enum_entry default_toast_compression_options[] = {
 	{NULL, 0, false}
 };
 
+/*
+ * zstd support could be added as a future enhancement.
+ */
+static const struct config_enum_entry temp_file_compression_options[] = {
+	{"no", TEMP_NONE_COMPRESSION, false},
+	{"pglz", TEMP_PGLZ_COMPRESSION, false},
+#ifdef  USE_LZ4
+	{"lz4", TEMP_LZ4_COMPRESSION, false},
+#endif
+	{NULL, 0, false}
+};
+
 static const struct config_enum_entry wal_compression_options[] = {
 	{"pglz", WAL_COMPRESSION_PGLZ, false},
 #ifdef USE_LZ4
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index c36fcb9ab61..f380983d2f2 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -182,6 +182,7 @@
 
 #max_notify_queue_pages = 1048576	# limits the number of SLRU pages allocated
 					# for NOTIFY / LISTEN queue
+#temp_file_compression = 'no'		# compression method for temporary files
 
 # - Kernel Resources -
 
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index e529ceb8260..d862e22ef18 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -592,7 +592,7 @@ LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
 		lts->pfile = BufFileCreateFileSet(&fileset->fs, filename);
 	}
 	else
-		lts->pfile = BufFileCreateTemp(false);
+		lts->pfile = BufFileCreateTemp(false, false);
 
 	return lts;
 }
diff --git a/src/backend/utils/sort/tuplestore.c b/src/backend/utils/sort/tuplestore.c
index c9aecab8d66..ef85924cd21 100644
--- a/src/backend/utils/sort/tuplestore.c
+++ b/src/backend/utils/sort/tuplestore.c
@@ -860,7 +860,7 @@ tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
 			 */
 			oldcxt = MemoryContextSwitchTo(state->context->parent);
 
-			state->myfile = BufFileCreateTemp(state->interXact);
+			state->myfile = BufFileCreateTemp(state->interXact, false);
 
 			MemoryContextSwitchTo(oldcxt);
 
diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h
index a2f4821f240..57908dd5462 100644
--- a/src/include/storage/buffile.h
+++ b/src/include/storage/buffile.h
@@ -32,11 +32,21 @@
 
 typedef struct BufFile BufFile;
 
+typedef enum
+{
+	TEMP_NONE_COMPRESSION,
+	TEMP_PGLZ_COMPRESSION,
+	TEMP_LZ4_COMPRESSION
+} TempCompression;
+
+extern PGDLLIMPORT int temp_file_compression;
+
 /*
  * prototypes for functions in buffile.c
  */
 
-extern BufFile *BufFileCreateTemp(bool interXact);
+extern BufFile *BufFileCreateTemp(bool interXact, bool compress);
+extern BufFile *BufFileCreateCompressTemp(bool interXact);
 extern void BufFileClose(BufFile *file);
 pg_nodiscard extern size_t BufFileRead(BufFile *file, void *ptr, size_t size);
 extern void BufFileReadExact(BufFile *file, void *ptr, size_t size);
-- 
2.51.0

