From 592ba861a18bcd936534f5d014d8debcc09abee6 Mon Sep 17 00:00:00 2001
From: Ildus Kurbangaliev <i.kurbangaliev@gmail.com>
Date: Mon, 18 Jun 2018 15:51:07 +0300
Subject: [PATCH 5/8] Add zlib compression method

Signed-off-by: Ildus Kurbangaliev <i.kurbangaliev@gmail.com>
---
 src/backend/Makefile                        |   2 +-
 src/backend/access/compression/Makefile     |   2 +-
 src/backend/access/compression/cm_zlib.c    | 252 ++++++++++++++++++++
 src/include/catalog/pg_am.dat               |   3 +
 src/include/catalog/pg_attr_compression.dat |   1 +
 src/include/catalog/pg_proc.dat             |   4 +
 6 files changed, 262 insertions(+), 2 deletions(-)
 create mode 100644 src/backend/access/compression/cm_zlib.c

diff --git a/src/backend/Makefile b/src/backend/Makefile
index 478a96db9b..bd5009e190 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -45,7 +45,7 @@ OBJS = $(SUBDIROBJS) $(LOCALOBJS) $(top_builddir)/src/port/libpgport_srv.a \
 LIBS := $(filter-out -lpgport -lpgcommon, $(LIBS)) $(LDAP_LIBS_BE) $(ICU_LIBS)
 
 # The backend doesn't need everything that's in LIBS, however
-LIBS := $(filter-out -lz -lreadline -ledit -ltermcap -lncurses -lcurses, $(LIBS))
+LIBS := $(filter-out -lreadline -ledit -ltermcap -lncurses -lcurses, $(LIBS))
 
 ifeq ($(with_systemd),yes)
 LIBS += -lsystemd
diff --git a/src/backend/access/compression/Makefile b/src/backend/access/compression/Makefile
index 14286920d3..7ea5ee2e43 100644
--- a/src/backend/access/compression/Makefile
+++ b/src/backend/access/compression/Makefile
@@ -12,6 +12,6 @@ subdir = src/backend/access/compression
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = cm_pglz.o cmapi.o
+OBJS = cm_pglz.o cm_zlib.o cmapi.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/compression/cm_zlib.c b/src/backend/access/compression/cm_zlib.c
new file mode 100644
index 0000000000..0dcb56ddf3
--- /dev/null
+++ b/src/backend/access/compression/cm_zlib.c
@@ -0,0 +1,252 @@
+/*-------------------------------------------------------------------------
+ *
+ * cm_zlib.c
+ *	  zlib compression method
+ *
+ * Copyright (c) 2015-2018, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/compression/cm_zlib.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+#include "access/cmapi.h"
+#include "commands/defrem.h"
+#include "nodes/parsenodes.h"
+#include "utils/builtins.h"
+
+#ifdef HAVE_LIBZ
+#include <zlib.h>
+
+#define ZLIB_MAX_DICTIONARY_LENGTH		32768
+#define ZLIB_DICTIONARY_DELIM			(" ,")
+
+typedef struct
+{
+	int				level;
+	Bytef			dict[ZLIB_MAX_DICTIONARY_LENGTH];
+	unsigned int	dictlen;
+} zlib_state;
+
+/*
+ * Check options if specified. All validation is located here so
+ * we don't need do it again in cminitstate function.
+ */
+static void
+zlib_cmcheck(Form_pg_attribute att, List *options)
+{
+	ListCell	*lc;
+	foreach(lc, options)
+	{
+		DefElem    *def = (DefElem *) lfirst(lc);
+
+		if (strcmp(def->defname, "level") == 0)
+		{
+			if (strcmp(defGetString(def), "best_speed") != 0 &&
+				strcmp(defGetString(def), "best_compression") != 0 &&
+				strcmp(defGetString(def), "default") != 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("unexpected value for zlib compression level: \"%s\"", defGetString(def))));
+		}
+		else if (strcmp(def->defname, "dict") == 0)
+		{
+			int		ntokens = 0;
+			char   *val,
+				   *tok;
+
+			val = pstrdup(defGetString(def));
+			if (strlen(val) > (ZLIB_MAX_DICTIONARY_LENGTH - 1))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						(errmsg("zlib dictionary length should be less than %d", ZLIB_MAX_DICTIONARY_LENGTH))));
+
+			while ((tok = strtok(val, ZLIB_DICTIONARY_DELIM)) != NULL)
+			{
+				ntokens++;
+				val = NULL;
+			}
+
+			if (ntokens < 2)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						(errmsg("zlib dictionary is too small"))));
+		}
+		else
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_PARAMETER),
+					 errmsg("unexpected parameter for zlib: \"%s\"", def->defname)));
+	}
+}
+
+static void *
+zlib_cminitstate(Oid acoid, List *options)
+{
+	zlib_state		*state = NULL;
+
+	state = palloc0(sizeof(zlib_state));
+	state->level = Z_DEFAULT_COMPRESSION;
+
+	if (list_length(options) > 0)
+	{
+		ListCell	*lc;
+
+		foreach(lc, options)
+		{
+			DefElem    *def = (DefElem *) lfirst(lc);
+
+			if (strcmp(def->defname, "level") == 0)
+			{
+				if (strcmp(defGetString(def), "best_speed") == 0)
+					state->level = Z_BEST_SPEED;
+				else if (strcmp(defGetString(def), "best_compression") == 0)
+					state->level = Z_BEST_COMPRESSION;
+			}
+			else if (strcmp(def->defname, "dict") == 0)
+			{
+				char   *val,
+					   *tok;
+
+				val = pstrdup(defGetString(def));
+
+				/* Fill the zlib dictionary */
+				while ((tok = strtok(val, ZLIB_DICTIONARY_DELIM)) != NULL)
+				{
+					int len = strlen(tok);
+					memcpy((void *) (state->dict + state->dictlen), tok, len);
+					state->dictlen += len + 1;
+					Assert(state->dictlen <= ZLIB_MAX_DICTIONARY_LENGTH);
+
+					/* add space as dictionary delimiter */
+					state->dict[state->dictlen - 1] = ' ';
+					val = NULL;
+				}
+			}
+		}
+	}
+
+	return state;
+}
+
+static struct varlena *
+zlib_cmcompress(CompressionAmOptions *cmoptions, const struct varlena *value)
+{
+	int32			valsize,
+					len;
+	struct varlena *tmp = NULL;
+	z_streamp		zp;
+	int				res;
+	zlib_state	   *state = (zlib_state *) cmoptions->acstate;
+
+	zp = (z_streamp) palloc(sizeof(z_stream));
+	zp->zalloc = Z_NULL;
+	zp->zfree = Z_NULL;
+	zp->opaque = Z_NULL;
+
+	if (deflateInit(zp, state->level) != Z_OK)
+		elog(ERROR, "could not initialize compression library: %s", zp->msg);
+
+	if (state->dictlen > 0)
+	{
+		res = deflateSetDictionary(zp, state->dict, state->dictlen);
+		if (res != Z_OK)
+			elog(ERROR, "could not set dictionary for zlib: %s", zp->msg);
+	}
+
+	valsize = VARSIZE_ANY_EXHDR(DatumGetPointer(value));
+	tmp = (struct varlena *) palloc(valsize + VARHDRSZ_CUSTOM_COMPRESSED);
+	zp->next_in = (void *) VARDATA_ANY(value);
+	zp->avail_in = valsize;
+	zp->avail_out = valsize;
+	zp->next_out = (void *)((char *) tmp + VARHDRSZ_CUSTOM_COMPRESSED);
+
+	do {
+		res = deflate(zp, Z_FINISH);
+		if (res == Z_STREAM_ERROR)
+			elog(ERROR, "could not compress data: %s", zp->msg);
+	} while (zp->avail_in != 0);
+
+	Assert(res == Z_STREAM_END);
+
+	len = valsize - zp->avail_out;
+	if (deflateEnd(zp) != Z_OK)
+		elog(ERROR, "could not close compression stream: %s", zp->msg);
+	pfree(zp);
+
+	if (len > 0)
+	{
+		SET_VARSIZE_COMPRESSED(tmp, len + VARHDRSZ_CUSTOM_COMPRESSED);
+		return tmp;
+	}
+
+	pfree(tmp);
+#endif
+	return NULL;
+}
+
+static struct varlena *
+zlib_cmdecompress(CompressionAmOptions *cmoptions, const struct varlena *value)
+{
+	struct varlena *result;
+	z_streamp		zp;
+	int				res = Z_OK;
+	zlib_state	   *state = (zlib_state *) cmoptions->acstate;
+
+	zp = (z_streamp) palloc(sizeof(z_stream));
+	zp->zalloc = Z_NULL;
+	zp->zfree = Z_NULL;
+	zp->opaque = Z_NULL;
+
+	if (inflateInit(zp) != Z_OK)
+		elog(ERROR, "could not initialize compression library: %s", zp->msg);
+
+	Assert(VARATT_IS_CUSTOM_COMPRESSED(value));
+	zp->next_in = (void *) ((char *) value + VARHDRSZ_CUSTOM_COMPRESSED);
+	zp->avail_in = VARSIZE(value) - VARHDRSZ_CUSTOM_COMPRESSED;
+	zp->avail_out = VARRAWSIZE_4B_C(value);
+
+	result = (struct varlena *) palloc(zp->avail_out + VARHDRSZ);
+	SET_VARSIZE(result, zp->avail_out + VARHDRSZ);
+	zp->next_out = (void *) VARDATA(result);
+
+	while (zp->avail_in > 0)
+	{
+		res = inflate(zp, 0);
+		if (res == Z_NEED_DICT && state->dictlen > 0)
+		{
+			res = inflateSetDictionary(zp, state->dict, state->dictlen);
+			if (res != Z_OK)
+				elog(ERROR, "could not set dictionary for zlib");
+			continue;
+		}
+		if (!(res == Z_OK || res == Z_STREAM_END))
+			elog(ERROR, "could not uncompress data: %s", zp->msg);
+	}
+
+	if (inflateEnd(zp) != Z_OK)
+		elog(ERROR, "could not close compression library: %s", zp->msg);
+
+	pfree(zp);
+	return result;
+}
+
+Datum
+zlibhandler(PG_FUNCTION_ARGS)
+{
+#ifndef HAVE_LIBZ
+	ereport(ERROR,
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+			 errmsg("not built with zlib support")));
+#else
+	CompressionAmRoutine *routine = makeNode(CompressionAmRoutine);
+
+	routine->cmcheck = zlib_cmcheck;
+	routine->cminitstate = zlib_cminitstate;
+	routine->cmcompress = zlib_cmcompress;
+	routine->cmdecompress = zlib_cmdecompress;
+
+	PG_RETURN_POINTER(routine);
+#endif
+}
diff --git a/src/include/catalog/pg_am.dat b/src/include/catalog/pg_am.dat
index 4bf1c49d11..16a098d633 100644
--- a/src/include/catalog/pg_am.dat
+++ b/src/include/catalog/pg_am.dat
@@ -36,5 +36,8 @@
 { oid => '4002', oid_symbol => 'PGLZ_COMPRESSION_AM_OID',
   descr => 'pglz compression access method',
   amname => 'pglz', amhandler => 'pglzhandler', amtype => 'c' },
+{ oid => '4011', oid_symbol => 'ZLIB_COMPRESSION_AM_OID',
+  descr => 'zlib compression access method',
+  amname => 'zlib', amhandler => 'zlibhandler', amtype => 'c' },
 
 ]
diff --git a/src/include/catalog/pg_attr_compression.dat b/src/include/catalog/pg_attr_compression.dat
index 4e72bde16c..a7216fe963 100644
--- a/src/include/catalog/pg_attr_compression.dat
+++ b/src/include/catalog/pg_attr_compression.dat
@@ -19,5 +19,6 @@
 [
 
 { acoid => '4002', acname => 'pglz' },
+{ acoid => '4011', acname => 'zlib' },
 
 ]
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 5ff8e886bd..2dce2c087d 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -914,6 +914,10 @@
   proname => 'pglzhandler', provolatile => 'v',
   prorettype => 'compression_am_handler', proargtypes => 'internal',
   prosrc => 'pglzhandler' },
+{ oid => '4010', descr => 'zlib compression access method handler',
+  proname => 'zlibhandler', provolatile => 'v',
+  prorettype => 'compression_am_handler', proargtypes => 'internal',
+  prosrc => 'zlibhandler' },
 
 { oid => '338', descr => 'validate an operator class',
   proname => 'amvalidate', provolatile => 'v', prorettype => 'bool',
-- 
2.21.0

