From cb57f7985774d87d16401a94e7801a8f09396cbf Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Wed, 10 Feb 2021 16:27:04 +0530
Subject: [PATCH v24 08/10] Create custom compression methods

Provide syntax to create custom compression methods.

Dilip Kumar based on the patches from Ildus Kurbangaliev.
Design input from Tomas Vondra and Robert Haas
Reviewed by Robert Haas, Tomas Vondra, Alexander Korotkov and Justin Pryzby

XXX: this is failing pg_dump tests
---
 doc/src/sgml/ref/alter_table.sgml             |  6 ++-
 doc/src/sgml/ref/create_access_method.sgml    | 12 +++--
 doc/src/sgml/ref/create_table.sgml            |  9 ++--
 src/backend/access/common/detoast.c           | 51 ++++++++++++++++---
 src/backend/access/common/toast_internals.c   | 19 +++++--
 src/backend/access/compression/compress_lz4.c | 21 ++++----
 .../access/compression/compress_pglz.c        | 20 ++++----
 src/backend/access/index/amapi.c              | 51 ++++++++++++++-----
 src/backend/commands/amcmds.c                 | 22 +++++++-
 src/backend/parser/gram.y                     |  1 +
 src/backend/utils/adt/pg_upgrade_support.c    | 10 ++++
 src/bin/pg_dump/pg_dump.c                     |  8 +++
 src/bin/psql/tab-complete.c                   |  6 +++
 src/include/access/amapi.h                    |  1 +
 src/include/access/compressamapi.h            | 45 ++++++++++++++--
 src/include/access/toast_internals.h          | 16 ++++++
 src/include/catalog/binary_upgrade.h          |  2 +
 src/include/catalog/pg_proc.dat               |  4 ++
 src/include/postgres.h                        |  8 +++
 src/test/regress/expected/compression.out     | 40 ++++++++++++++-
 src/test/regress/sql/compression.sql          | 10 ++++
 21 files changed, 304 insertions(+), 58 deletions(-)

diff --git a/doc/src/sgml/ref/alter_table.sgml b/doc/src/sgml/ref/alter_table.sgml
index c9f443a59c..49c43df1c1 100644
--- a/doc/src/sgml/ref/alter_table.sgml
+++ b/doc/src/sgml/ref/alter_table.sgml
@@ -391,8 +391,10 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
     </term>
     <listitem>
      <para>
-      This sets the compression method for a column.  The supported compression
-      methods are <literal>pglz</literal> and <literal>lz4</literal>.
+      This sets the compression method for a column.  A compression method
+      can be created with <xref linkend="sql-create-access-method"/>, and the
+      column can be set to any available compression method.  The supported
+      built-in compression methods are <literal>pglz</literal> and <literal>lz4</literal>.
       <literal>lz4</literal> is available only if <literal>--with-lz4</literal>
       was used when building <productname>PostgreSQL</productname>.
       The <literal>PRESERVE</literal> list contains a list of compression
diff --git a/doc/src/sgml/ref/create_access_method.sgml b/doc/src/sgml/ref/create_access_method.sgml
index dae43dbaed..c5ef8b738d 100644
--- a/doc/src/sgml/ref/create_access_method.sgml
+++ b/doc/src/sgml/ref/create_access_method.sgml
@@ -61,8 +61,8 @@ CREATE ACCESS METHOD <replaceable class="parameter">name</replaceable>
     <listitem>
      <para>
       This clause specifies the type of access method to define.
-      Only <literal>TABLE</literal> and <literal>INDEX</literal>
-      are supported at present.
+      Currently, <literal>TABLE</literal>, <literal>INDEX</literal> and
+      <literal>COMPRESSION</literal> access methods are supported.
      </para>
     </listitem>
    </varlistentry>
@@ -77,11 +77,13 @@ CREATE ACCESS METHOD <replaceable class="parameter">name</replaceable>
       declared to take a single argument of type <type>internal</type>,
       and its return type depends on the type of access method;
       for <literal>TABLE</literal> access methods, it must
-      be <type>table_am_handler</type> and for <literal>INDEX</literal>
-      access methods, it must be <type>index_am_handler</type>.
+      be <type>table_am_handler</type>, for <literal>INDEX</literal>
+      access methods, it must be <type>index_am_handler</type> and
+      for <literal>COMPRESSION</literal> access methods, it must be
+      <type>compression_am_handler</type>.
       The C-level API that the handler function must implement varies
       depending on the type of access method. The table access method API
-      is described in <xref linkend="tableam"/> and the index access method
+      is described in <xref linkend="tableam"/>, and the index access method
       API is described in <xref linkend="indexam"/>.
      </para>
     </listitem>
diff --git a/doc/src/sgml/ref/create_table.sgml b/doc/src/sgml/ref/create_table.sgml
index a4b297340f..196352ef42 100644
--- a/doc/src/sgml/ref/create_table.sgml
+++ b/doc/src/sgml/ref/create_table.sgml
@@ -1004,11 +1004,14 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
       column storage types.) Setting this property for a partitioned table
       has no direct effect, because such tables have no storage of their own,
       but the configured value is inherited by newly-created partitions.
-      The supported compression methods are <literal>pglz</literal> and
+      The supported built-in compression methods are <literal>pglz</literal> and
       <literal>lz4</literal>.  <literal>lz4</literal> is available only if
       <literal>--with-lz4</literal> was used when building
-      <productname>PostgreSQL</productname>. The default
-      is <literal>pglz</literal>.
+      <productname>PostgreSQL</productname>.
+      Compression methods can be created with <xref
+      linkend="sql-create-access-method"/>, and the column can be set to any
+      available compression method.
+      The default is <literal>pglz</literal>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/src/backend/access/common/detoast.c b/src/backend/access/common/detoast.c
index 95d5b1c12a..fd7c42c95e 100644
--- a/src/backend/access/common/detoast.c
+++ b/src/backend/access/common/detoast.c
@@ -462,10 +462,17 @@ toast_fetch_datum_slice(struct varlena *attr, int32 sliceoffset,
  *
  * Returns the Oid of the compression method stored in the compressed data.  If
  * the varlena is not compressed then returns InvalidOid.
+ *
+ * For built-in methods, we store only the built-in compression method id in
+ * the first 2 bits of the rawsize, and that id is mapped directly to the
+ * compression method Oid.  For a custom compression method, we store the Oid
+ * of the compression method in the custom compression header.
  */
 Oid
 toast_get_compression_oid(struct varlena *attr)
 {
+	CompressionId cmid;
+
 	if (VARATT_IS_EXTERNAL_ONDISK(attr))
 	{
 		struct varatt_external toast_pointer;
@@ -485,7 +492,21 @@ toast_get_compression_oid(struct varlena *attr)
 	else if (!VARATT_IS_COMPRESSED(attr))
 		return InvalidOid;
 
-	return CompressionIdToOid(TOAST_COMPRESS_METHOD(attr));
+	/*
+	 * If it is the custom compression id, get the Oid from the custom
+	 * compression header; otherwise, directly translate the built-in
+	 * compression id to the compression method Oid.
+	 */
+	cmid = TOAST_COMPRESS_METHOD(attr);
+	if (IsCustomCompression(cmid))
+	{
+		toast_compress_header_custom	*hdr;
+
+		hdr = (toast_compress_header_custom *) attr;
+		return hdr->cmoid;
+	}
+	else
+		return CompressionIdToOid(cmid);
 }
 
 /* ----------
@@ -494,7 +515,7 @@ toast_get_compression_oid(struct varlena *attr)
  * helper function for toast_decompress_datum and toast_decompress_datum_slice
  */
 static inline const CompressionAmRoutine *
-toast_get_compression_handler(struct varlena *attr)
+toast_get_compression_handler(struct varlena *attr, int32 *header_size)
 {
 	const CompressionAmRoutine *cmroutine;
 	CompressionId cmid;
@@ -507,10 +528,21 @@ toast_get_compression_handler(struct varlena *attr)
 	{
 		case PGLZ_COMPRESSION_ID:
 			cmroutine = &pglz_compress_methods;
+			*header_size = TOAST_COMPRESS_HDRSZ;
 			break;
 		case LZ4_COMPRESSION_ID:
 			cmroutine = &lz4_compress_methods;
+			*header_size = TOAST_COMPRESS_HDRSZ;
 			break;
+		case CUSTOM_COMPRESSION_ID:
+		{
+			toast_compress_header_custom	*hdr;
+
+			hdr = (toast_compress_header_custom *) attr;
+			cmroutine = GetCompressionAmRoutineByAmId(hdr->cmoid);
+			*header_size = TOAST_CUSTOM_COMPRESS_HDRSZ;
+			break;
+		}
 		default:
 			elog(ERROR, "invalid compression method id %d", cmid);
 	}
@@ -526,9 +558,11 @@ toast_get_compression_handler(struct varlena *attr)
 static struct varlena *
 toast_decompress_datum(struct varlena *attr)
 {
-	const CompressionAmRoutine *cmroutine = toast_get_compression_handler(attr);
+	int32	header_size;
+	const CompressionAmRoutine *cmroutine =
+				toast_get_compression_handler(attr, &header_size);
 
-	return cmroutine->datum_decompress(attr);
+	return cmroutine->datum_decompress(attr, header_size);
 }
 
 
@@ -542,16 +576,19 @@ toast_decompress_datum(struct varlena *attr)
 static struct varlena *
 toast_decompress_datum_slice(struct varlena *attr, int32 slicelength)
 {
-	const CompressionAmRoutine *cmroutine = toast_get_compression_handler(attr);
+	int32	header_size;
+	const CompressionAmRoutine *cmroutine =
+				toast_get_compression_handler(attr, &header_size);
 
 	/*
 	 * If the handler supports the slice decompression then decompress the
 	 * slice otherwise decompress complete data.
 	 */
 	if (cmroutine->datum_decompress_slice)
-		return cmroutine->datum_decompress_slice(attr, slicelength);
+		return cmroutine->datum_decompress_slice(attr, header_size,
+												 slicelength);
 	else
-		return cmroutine->datum_decompress(attr);
+		return cmroutine->datum_decompress(attr, header_size);
 }
 
 /* ----------
diff --git a/src/backend/access/common/toast_internals.c b/src/backend/access/common/toast_internals.c
index b04c5a5eb8..a3539065d3 100644
--- a/src/backend/access/common/toast_internals.c
+++ b/src/backend/access/common/toast_internals.c
@@ -48,6 +48,7 @@ toast_compress_datum(Datum value, Oid cmoid)
 {
 	struct varlena *tmp = NULL;
 	int32		valsize;
+	bool		isCustomCompression = false;
 	const CompressionAmRoutine *cmroutine = NULL;
 
 	Assert(!VARATT_IS_EXTERNAL(DatumGetPointer(value)));
@@ -65,11 +66,16 @@ toast_compress_datum(Datum value, Oid cmoid)
 			cmroutine = &lz4_compress_methods;
 			break;
 		default:
-			elog(ERROR, "Invalid compression method oid %u", cmoid);
+			isCustomCompression = true;
+			cmroutine = GetCompressionAmRoutineByAmId(cmoid);
+			break;
 	}
 
 	/* Call the actual compression function */
-	tmp = cmroutine->datum_compress((const struct varlena *) value);
+	tmp = cmroutine->datum_compress((const struct varlena *)value,
+									isCustomCompression ?
+									TOAST_CUSTOM_COMPRESS_HDRSZ :
+									TOAST_COMPRESS_HDRSZ);
 	if (!tmp)
 		return PointerGetDatum(NULL);
 
@@ -88,7 +94,14 @@ toast_compress_datum(Datum value, Oid cmoid)
 	if (VARSIZE(tmp) < valsize - 2)
 	{
 		/* successful compression */
-		TOAST_COMPRESS_SET_SIZE_AND_METHOD(tmp, valsize, CompressionOidToId(cmoid));
+		TOAST_COMPRESS_SET_SIZE_AND_METHOD(tmp, valsize, isCustomCompression ?
+										   CUSTOM_COMPRESSION_ID :
+										   CompressionOidToId(cmoid));
+
+		/* For custom compression, set the oid of the compression method */
+		if (isCustomCompression)
+			TOAST_COMPRESS_SET_CMOID(tmp, cmoid);
+
 		return PointerGetDatum(tmp);
 	}
 	else
diff --git a/src/backend/access/compression/compress_lz4.c b/src/backend/access/compression/compress_lz4.c
index 3079eff7eb..2a3c162836 100644
--- a/src/backend/access/compression/compress_lz4.c
+++ b/src/backend/access/compression/compress_lz4.c
@@ -27,7 +27,7 @@
  * compressed varlena, or NULL if compression fails.
  */
 static struct varlena *
-lz4_cmcompress(const struct varlena *value)
+lz4_cmcompress(const struct varlena *value, int32 header_size)
 {
 #ifndef HAVE_LIBLZ4
 	ereport(ERROR,
@@ -46,10 +46,10 @@ lz4_cmcompress(const struct varlena *value)
 	 * that will be needed for varlena overhead, and allocate that amount.
 	 */
 	max_size = LZ4_compressBound(valsize);
-	tmp = (struct varlena *) palloc(max_size + VARHDRSZ_COMPRESS);
+	tmp = (struct varlena *) palloc(max_size + header_size);
 
 	len = LZ4_compress_default(VARDATA_ANY(value),
-							   (char *) tmp + VARHDRSZ_COMPRESS,
+							   (char *) tmp + header_size,
 							   valsize, max_size);
 	if (len <= 0)
 		elog(ERROR, "could not compress data with lz4");
@@ -61,7 +61,7 @@ lz4_cmcompress(const struct varlena *value)
 		return NULL;
 	}
 
-	SET_VARSIZE_COMPRESSED(tmp, len + VARHDRSZ_COMPRESS);
+	SET_VARSIZE_COMPRESSED(tmp, len + header_size);
 
 	return tmp;
 #endif
@@ -73,7 +73,7 @@ lz4_cmcompress(const struct varlena *value)
  * Returns the decompressed varlena.
  */
 static struct varlena *
-lz4_cmdecompress(const struct varlena *value)
+lz4_cmdecompress(const struct varlena *value, int32 header_size)
 {
 #ifndef HAVE_LIBLZ4
 	ereport(ERROR,
@@ -87,9 +87,9 @@ lz4_cmdecompress(const struct varlena *value)
 	result = (struct varlena *) palloc(VARRAWSIZE_4B_C(value) + VARHDRSZ);
 
 	/* decompress the data */
-	rawsize = LZ4_decompress_safe((char *) value + VARHDRSZ_COMPRESS,
+	rawsize = LZ4_decompress_safe((char *) value + header_size,
 								  VARDATA(result),
-								  VARSIZE(value) - VARHDRSZ_COMPRESS,
+								  VARSIZE(value) - header_size,
 								  VARRAWSIZE_4B_C(value));
 	if (rawsize < 0)
 		ereport(ERROR,
@@ -109,7 +109,8 @@ lz4_cmdecompress(const struct varlena *value)
  * Decompresses part of the data. Returns the decompressed varlena.
  */
 static struct varlena *
-lz4_cmdecompress_slice(const struct varlena *value, int32 slicelength)
+lz4_cmdecompress_slice(const struct varlena *value, int32 header_size,
+					  int32 slicelength)
 {
 #ifndef HAVE_LIBLZ4
 	ereport(ERROR,
@@ -123,9 +124,9 @@ lz4_cmdecompress_slice(const struct varlena *value, int32 slicelength)
 	result = (struct varlena *) palloc(VARRAWSIZE_4B_C(value) + VARHDRSZ);
 
 	/* decompress partial data using lz4 routine */
-	rawsize = LZ4_decompress_safe_partial((char *) value + VARHDRSZ_COMPRESS,
+	rawsize = LZ4_decompress_safe_partial((char *) value + header_size,
 										  VARDATA(result),
-										  VARSIZE(value) - VARHDRSZ_COMPRESS,
+										  VARSIZE(value) - header_size,
 										  slicelength,
 										  VARRAWSIZE_4B_C(value));
 	if (rawsize < 0)
diff --git a/src/backend/access/compression/compress_pglz.c b/src/backend/access/compression/compress_pglz.c
index 8a4bf427cf..7f6e7429fe 100644
--- a/src/backend/access/compression/compress_pglz.c
+++ b/src/backend/access/compression/compress_pglz.c
@@ -26,7 +26,7 @@
  * compressed varlena, or NULL if compression fails.
  */
 static struct varlena *
-pglz_cmcompress(const struct varlena *value)
+pglz_cmcompress(const struct varlena *value, int32 header_size)
 {
 	int32		valsize,
 				len;
@@ -47,11 +47,11 @@ pglz_cmcompress(const struct varlena *value)
 	 * and allocate the memory for holding the compressed data and the header.
 	 */
 	tmp = (struct varlena *) palloc(PGLZ_MAX_OUTPUT(valsize) +
-									VARHDRSZ_COMPRESS);
+									header_size);
 
 	len = pglz_compress(VARDATA_ANY(value),
 						valsize,
-						(char *) tmp + VARHDRSZ_COMPRESS,
+						(char *) tmp + header_size,
 						NULL);
 	if (len < 0)
 	{
@@ -59,7 +59,7 @@ pglz_cmcompress(const struct varlena *value)
 		return NULL;
 	}
 
-	SET_VARSIZE_COMPRESSED(tmp, len + VARHDRSZ_COMPRESS);
+	SET_VARSIZE_COMPRESSED(tmp, len + header_size);
 
 	return tmp;
 }
@@ -70,15 +70,15 @@ pglz_cmcompress(const struct varlena *value)
  * Returns the decompressed varlena.
  */
 static struct varlena *
-pglz_cmdecompress(const struct varlena *value)
+pglz_cmdecompress(const struct varlena *value, int32 header_size)
 {
 	struct varlena *result;
 	int32		rawsize;
 
 	result = (struct varlena *) palloc(VARRAWSIZE_4B_C(value) + VARHDRSZ);
 
-	rawsize = pglz_decompress((char *) value + VARHDRSZ_COMPRESS,
-							  VARSIZE(value) - VARHDRSZ_COMPRESS,
+	rawsize = pglz_decompress((char *) value + header_size,
+							  VARSIZE(value) - header_size,
 							  VARDATA(result),
 							  VARRAWSIZE_4B_C(value), true);
 	if (rawsize < 0)
@@ -97,7 +97,7 @@ pglz_cmdecompress(const struct varlena *value)
  * Decompresses part of the data. Returns the decompressed varlena.
  */
 static struct varlena *
-pglz_cmdecompress_slice(const struct varlena *value,
+pglz_cmdecompress_slice(const struct varlena *value, int32 header_size,
 						int32 slicelength)
 {
 	struct varlena *result;
@@ -105,8 +105,8 @@ pglz_cmdecompress_slice(const struct varlena *value,
 
 	result = (struct varlena *) palloc(slicelength + VARHDRSZ);
 
-	rawsize = pglz_decompress((char *) value + VARHDRSZ_COMPRESS,
-							  VARSIZE(value) - VARHDRSZ_COMPRESS,
+	rawsize = pglz_decompress((char *) value + header_size,
+							  VARSIZE(value) - header_size,
 							  VARDATA(result),
 							  slicelength, false);
 	if (rawsize < 0)
diff --git a/src/backend/access/index/amapi.c b/src/backend/access/index/amapi.c
index d30bc43514..16f8fab359 100644
--- a/src/backend/access/index/amapi.c
+++ b/src/backend/access/index/amapi.c
@@ -14,6 +14,7 @@
 #include "postgres.h"
 
 #include "access/amapi.h"
+#include "access/compressamapi.h"
 #include "access/htup_details.h"
 #include "catalog/pg_am.h"
 #include "catalog/pg_opclass.h"
@@ -46,14 +47,14 @@ GetIndexAmRoutine(Oid amhandler)
 }
 
 /*
- * GetIndexAmRoutineByAmId - look up the handler of the index access method
- * with the given OID, and get its IndexAmRoutine struct.
+ * GetAmHandlerByAmId - look up the access method with the given OID, check
+ * that it is of the given type, and return the OID of its handler function.
  *
- * If the given OID isn't a valid index access method, returns NULL if
- * noerror is true, else throws error.
+ * If the given OID isn't a valid access method of the requested type, returns
+ * InvalidOid if noerror is true, else throws error.
  */
-IndexAmRoutine *
-GetIndexAmRoutineByAmId(Oid amoid, bool noerror)
+regproc
+GetAmHandlerByAmId(Oid amoid, char amtype, bool noerror)
 {
 	HeapTuple	tuple;
 	Form_pg_am	amform;
@@ -64,24 +65,25 @@ GetIndexAmRoutineByAmId(Oid amoid, bool noerror)
 	if (!HeapTupleIsValid(tuple))
 	{
 		if (noerror)
-			return NULL;
+			return InvalidOid;
 		elog(ERROR, "cache lookup failed for access method %u",
 			 amoid);
 	}
 	amform = (Form_pg_am) GETSTRUCT(tuple);
 
 	/* Check if it's an index access method as opposed to some other AM */
-	if (amform->amtype != AMTYPE_INDEX)
+	if (amform->amtype != amtype)
 	{
 		if (noerror)
 		{
 			ReleaseSysCache(tuple);
-			return NULL;
+			return InvalidOid;
 		}
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("access method \"%s\" is not of type %s",
-						NameStr(amform->amname), "INDEX")));
+						NameStr(amform->amname), (amtype == AMTYPE_INDEX ?
+						"INDEX" : "COMPRESSION"))));
 	}
 
 	amhandler = amform->amhandler;
@@ -92,16 +94,41 @@ GetIndexAmRoutineByAmId(Oid amoid, bool noerror)
 		if (noerror)
 		{
 			ReleaseSysCache(tuple);
-			return NULL;
+			return InvalidOid;
 		}
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-				 errmsg("index access method \"%s\" does not have a handler",
+				 errmsg("access method \"%s\" does not have a handler",
 						NameStr(amform->amname))));
 	}
 
 	ReleaseSysCache(tuple);
 
+	return amhandler;
+}
+
+/*
+ * GetIndexAmRoutineByAmId - look up the handler of the index access method
+ * with the given OID, and get its IndexAmRoutine struct.
+ *
+ * If the given OID isn't a valid index access method, returns NULL if
+ * noerror is true, else throws error.
+ */
+IndexAmRoutine *
+GetIndexAmRoutineByAmId(Oid amoid, bool noerror)
+{
+	regproc		amhandler;
+
+	/* Get handler function OID for the access method */
+	amhandler = GetAmHandlerByAmId(amoid, AMTYPE_INDEX, noerror);
+
+	/* An invalid handler OID is only possible when noerror is set */
+	if (!OidIsValid(amhandler))
+	{
+		Assert(noerror);
+		return NULL;
+	}
+
 	/* And finally, call the handler function to get the API struct. */
 	return GetIndexAmRoutine(amhandler);
 }
diff --git a/src/backend/commands/amcmds.c b/src/backend/commands/amcmds.c
index 1682afd2a4..0ac86eee02 100644
--- a/src/backend/commands/amcmds.c
+++ b/src/backend/commands/amcmds.c
@@ -45,6 +45,9 @@ static Oid	default_toast_compression_oid = InvalidOid;
 
 static void AccessMethodCallback(Datum arg, int cacheid, uint32 hashvalue);
 
+/* Set by pg_upgrade_support functions */
+Oid		binary_upgrade_next_pg_am_oid = InvalidOid;
+
 /*
  * CreateAccessMethod
  *		Registers a new access method.
@@ -93,7 +96,19 @@ CreateAccessMethod(CreateAmStmt *stmt)
 	memset(values, 0, sizeof(values));
 	memset(nulls, false, sizeof(nulls));
 
-	amoid = GetNewOidWithIndex(rel, AmOidIndexId, Anum_pg_am_oid);
+	if (IsBinaryUpgrade  && OidIsValid(binary_upgrade_next_pg_am_oid))
+	{
+		/* for a built-in OID (< FirstNormalObjectId), amoid must already match it */
+		if (binary_upgrade_next_pg_am_oid < FirstNormalObjectId &&
+			(!OidIsValid(amoid) || binary_upgrade_next_pg_am_oid != amoid))
+			elog(ERROR, "could not link to built-in attribute compression");
+
+		amoid = binary_upgrade_next_pg_am_oid;
+		binary_upgrade_next_pg_am_oid = InvalidOid;
+	}
+	else
+		amoid = GetNewOidWithIndex(rel, AmOidIndexId, Anum_pg_am_oid);
+
 	values[Anum_pg_am_oid - 1] = ObjectIdGetDatum(amoid);
 	values[Anum_pg_am_amname - 1] =
 		DirectFunctionCall1(namein, CStringGetDatum(stmt->amname));
@@ -237,6 +252,8 @@ get_am_type_string(char amtype)
 			return "INDEX";
 		case AMTYPE_TABLE:
 			return "TABLE";
+		case AMTYPE_COMPRESSION:
+			return "COMPRESSION";
 		default:
 			/* shouldn't happen */
 			elog(ERROR, "invalid access method type '%c'", amtype);
@@ -274,6 +291,9 @@ lookup_am_handler_func(List *handler_name, char amtype)
 		case AMTYPE_TABLE:
 			expectedType = TABLE_AM_HANDLEROID;
 			break;
+		case AMTYPE_COMPRESSION:
+			expectedType = COMPRESSION_AM_HANDLEROID;
+			break;
 		default:
 			elog(ERROR, "unrecognized access method type \"%c\"", amtype);
 	}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 9eb2b04d58..b22ce818cd 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -5305,6 +5305,7 @@ CreateAmStmt: CREATE ACCESS METHOD name TYPE_P am_type HANDLER handler_name
 am_type:
 			INDEX			{ $$ = AMTYPE_INDEX; }
 		|	TABLE			{ $$ = AMTYPE_TABLE; }
+		|	COMPRESSION		{ $$ = AMTYPE_COMPRESSION; }
 		;
 
 /*****************************************************************************
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index a575c95079..f026104c01 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -128,6 +128,16 @@ binary_upgrade_set_next_pg_authid_oid(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+Datum
+binary_upgrade_set_next_pg_am_oid(PG_FUNCTION_ARGS)
+{
+	Oid			amoid = PG_GETARG_OID(0);
+
+	CHECK_IS_BINARY_UPGRADE;
+	binary_upgrade_next_pg_am_oid = amoid;
+	PG_RETURN_VOID();
+}
+
 Datum
 binary_upgrade_create_empty_extension(PG_FUNCTION_ARGS)
 {
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 7bf345a4ac..88fa7e1ed3 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -13136,6 +13136,11 @@ dumpAccessMethod(Archive *fout, const AccessMethodInfo *aminfo)
 
 	qamname = pg_strdup(fmtId(aminfo->dobj.name));
 
+	if (dopt->binary_upgrade && aminfo->amtype == AMTYPE_COMPRESSION)
+		appendPQExpBuffer(q,
+						  "SELECT pg_catalog.binary_upgrade_set_next_pg_am_oid('%u'::pg_catalog.oid);\n",
+						  aminfo->dobj.catId.oid);
+
 	appendPQExpBuffer(q, "CREATE ACCESS METHOD %s ", qamname);
 
 	switch (aminfo->amtype)
@@ -13146,6 +13151,9 @@ dumpAccessMethod(Archive *fout, const AccessMethodInfo *aminfo)
 		case AMTYPE_TABLE:
 			appendPQExpBufferStr(q, "TYPE TABLE ");
 			break;
+		case AMTYPE_COMPRESSION:
+			appendPQExpBufferStr(q, "TYPE COMPRESSION ");
+			break;
 		default:
 			pg_log_warning("invalid type \"%c\" of access method \"%s\"",
 						   aminfo->amtype, qamname);
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 869fd3676d..8ca2b832a6 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -883,6 +883,12 @@ static const SchemaQuery Query_for_list_of_statistics = {
 "  WHERE substring(pg_catalog.quote_ident(amname),1,%d)='%s' AND "\
 "   amtype=" CppAsString2(AMTYPE_TABLE)
 
+#define Query_for_list_of_compression_access_methods \
+" SELECT pg_catalog.quote_ident(amname) "\
+"   FROM pg_catalog.pg_am "\
+"  WHERE substring(pg_catalog.quote_ident(amname),1,%d)='%s' AND "\
+"   amtype=" CppAsString2(AMTYPE_COMPRESSION)
+
 /* the silly-looking length condition is just to eat up the current word */
 #define Query_for_list_of_arguments \
 "SELECT pg_catalog.oidvectortypes(proargtypes)||')' "\
diff --git a/src/include/access/amapi.h b/src/include/access/amapi.h
index 1513cafcf4..bc1e2437f0 100644
--- a/src/include/access/amapi.h
+++ b/src/include/access/amapi.h
@@ -285,6 +285,7 @@ typedef struct IndexAmRoutine
 
 /* Functions in access/index/amapi.c */
 extern IndexAmRoutine *GetIndexAmRoutine(Oid amhandler);
+extern regproc GetAmHandlerByAmId(Oid amoid, char amtype, bool noerror);
 extern IndexAmRoutine *GetIndexAmRoutineByAmId(Oid amoid, bool noerror);
 
 void InitializeAccessMethods(void);
diff --git a/src/include/access/compressamapi.h b/src/include/access/compressamapi.h
index d75a8e9df2..bac7b47cfc 100644
--- a/src/include/access/compressamapi.h
+++ b/src/include/access/compressamapi.h
@@ -15,10 +15,13 @@
 
 #include "postgres.h"
 
+#include "fmgr.h"
+#include "access/amapi.h"
 #include "catalog/pg_am_d.h"
 #include "nodes/nodes.h"
 #include "utils/guc.h"
 
+
 /*
  * Built-in compression method-id.  The toast compression header will store
  * this in the first 2 bits of the raw length.  These built-in compression
@@ -27,7 +30,9 @@
 typedef enum CompressionId
 {
 	PGLZ_COMPRESSION_ID = 0,
-	LZ4_COMPRESSION_ID = 1
+	LZ4_COMPRESSION_ID = 1,
+	/* id 2 is left free for a future built-in method */
+	CUSTOM_COMPRESSION_ID = 3
 } CompressionId;
 
 /* Default compression method if not specified. */
@@ -41,13 +46,19 @@ extern bool check_default_toast_compression(char **newval, void **extra, GucSour
 
 extern Oid GetDefaultToastCompression(void);
 
+#define IsCustomCompression(cmid)     ((cmid) == CUSTOM_COMPRESSION_ID)
+
 #define IsStorageCompressible(storage) ((storage) != TYPSTORAGE_PLAIN && \
 										(storage) != TYPSTORAGE_EXTERNAL)
 /* compression handler routines */
-typedef struct varlena *(*cmcompress_function) (const struct varlena *value);
-typedef struct varlena *(*cmdecompress_function) (const struct varlena *value);
+typedef struct varlena *(*cmcompress_function) (const struct varlena *value,
+												int32 toast_header_size);
+typedef struct varlena *(*cmdecompress_function) (const struct varlena *value,
+												  int32 toast_header_size);
 typedef struct varlena *(*cmdecompress_slice_function)
-			(const struct varlena *value, int32 slicelength);
+												(const struct varlena *value,
+												 int32 toast_header_size,
+												 int32 slicelength);
 
 /*
  * API struct for a compression AM.
@@ -106,4 +117,30 @@ CompressionIdToOid(CompressionId cmid)
 	}
 }
 
+/*
+ * GetCompressionAmRoutineByAmId - look up the handler of the compression access
+ * method with the given OID, and get its CompressionAmRoutine struct.
+ */
+static inline CompressionAmRoutine *
+GetCompressionAmRoutineByAmId(Oid amoid)
+{
+	regproc		amhandler;
+	Datum		datum;
+	CompressionAmRoutine *routine;
+
+	/* Get handler function OID for the access method */
+	amhandler = GetAmHandlerByAmId(amoid, AMTYPE_COMPRESSION, false);
+	Assert(OidIsValid(amhandler));
+
+	/* And finally, call the handler function to get the API struct */
+	datum = OidFunctionCall0(amhandler);
+	routine = (CompressionAmRoutine *) DatumGetPointer(datum);
+
+	if (routine == NULL || !IsA(routine, CompressionAmRoutine))
+		elog(ERROR, "compression access method handler function %u did not return an CompressionAmRoutine struct",
+			 amhandler);
+
+	return routine;
+}
+
 #endif							/* COMPRESSAMAPI_H */
diff --git a/src/include/access/toast_internals.h b/src/include/access/toast_internals.h
index 31ff91a09c..ac28f9ed55 100644
--- a/src/include/access/toast_internals.h
+++ b/src/include/access/toast_internals.h
@@ -27,10 +27,23 @@ typedef struct toast_compress_header
 								 * rawsize */
 } toast_compress_header;
 
+/*
+ * If a custom compression method was used, the header also contains the
+ * Oid of the compression method (from pg_am).
+ */
+typedef struct toast_compress_header_custom
+{
+	int32		vl_len_;		/* varlena header (do not touch directly!) */
+	uint32		info;			/*  2 bits for compression method + rawsize */
+	Oid			cmoid;			/* Oid from pg_am */
+} toast_compress_header_custom;
+
 /*
  * Utilities for manipulation of header information for compressed
  * toast entries.
  */
+#define TOAST_COMPRESS_HDRSZ		((int32) sizeof(toast_compress_header))
+#define TOAST_CUSTOM_COMPRESS_HDRSZ ((int32)sizeof(toast_compress_header_custom))
 #define TOAST_COMPRESS_METHOD(ptr)  (((toast_compress_header *) (ptr))->info >> VARLENA_RAWSIZE_BITS)
 #define TOAST_COMPRESS_SET_SIZE_AND_METHOD(ptr, len, cm_method) \
 	do { \
@@ -38,6 +51,9 @@ typedef struct toast_compress_header
 		((toast_compress_header *) (ptr))->info = ((len) | (cm_method) << VARLENA_RAWSIZE_BITS); \
 	} while (0)
 
+#define TOAST_COMPRESS_SET_CMOID(ptr, oid) \
+	(((toast_compress_header_custom *)(ptr))->cmoid = (oid))
+
 extern Datum toast_compress_datum(Datum value, Oid cmoid);
 extern Oid	toast_get_valid_index(Oid toastoid, LOCKMODE lock);
 
diff --git a/src/include/catalog/binary_upgrade.h b/src/include/catalog/binary_upgrade.h
index f6e82e7ac5..9d65bae238 100644
--- a/src/include/catalog/binary_upgrade.h
+++ b/src/include/catalog/binary_upgrade.h
@@ -26,6 +26,8 @@ extern PGDLLIMPORT Oid binary_upgrade_next_toast_pg_class_oid;
 extern PGDLLIMPORT Oid binary_upgrade_next_pg_enum_oid;
 extern PGDLLIMPORT Oid binary_upgrade_next_pg_authid_oid;
 
+extern PGDLLIMPORT Oid binary_upgrade_next_pg_am_oid;
+
 extern PGDLLIMPORT bool binary_upgrade_record_init_privs;
 
 #endif							/* BINARY_UPGRADE_H */
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 4a13844ce6..64c9c6e538 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10777,6 +10777,10 @@
   proname => 'binary_upgrade_set_next_pg_authid_oid', provolatile => 'v',
   proparallel => 'r', prorettype => 'void', proargtypes => 'oid',
   prosrc => 'binary_upgrade_set_next_pg_authid_oid' },
+{ oid => '2137', descr => 'for use by pg_upgrade',
+  proname => 'binary_upgrade_set_next_pg_am_oid', provolatile => 'v',
+  proparallel => 'r', prorettype => 'void', proargtypes => 'oid',
+  prosrc => 'binary_upgrade_set_next_pg_am_oid' },
 { oid => '3591', descr => 'for use by pg_upgrade',
   proname => 'binary_upgrade_create_empty_extension', proisstrict => 'f',
   provolatile => 'v', proparallel => 'u', prorettype => 'void',
diff --git a/src/include/postgres.h b/src/include/postgres.h
index 667927fd7c..ede3b11ef3 100644
--- a/src/include/postgres.h
+++ b/src/include/postgres.h
@@ -149,6 +149,14 @@ typedef union
 								 * flags */
 		char		va_data[FLEXIBLE_ARRAY_MEMBER]; /* Compressed data */
 	}			va_compressed;
+	struct						/* Compressed-in-line format */
+	{
+		uint32		va_header;
+		uint32		va_info;	/* Original data size (excludes header) and
+								 * flags */
+		Oid			va_cmid;	/* Oid of compression method */
+		char		va_data[FLEXIBLE_ARRAY_MEMBER]; /* Compressed data */
+	}			va_custom_compressed;
 } varattrib_4b;
 
 typedef struct
diff --git a/src/test/regress/expected/compression.out b/src/test/regress/expected/compression.out
index 3ed33b6534..e52e272d39 100644
--- a/src/test/regress/expected/compression.out
+++ b/src/test/regress/expected/compression.out
@@ -260,13 +260,51 @@ SELECT pg_column_compression(f1) FROM cmdata;
  pglz
 (2 rows)
 
+-- create compression method
+CREATE ACCESS METHOD pglz2 TYPE COMPRESSION HANDLER pglzhandler;
+ALTER TABLE cmdata ALTER COLUMN f1 SET COMPRESSION pglz2 PRESERVE ALL;
+INSERT INTO cmdata VALUES (repeat('1234567890',1004));
+\d+ cmdata
+                                        Table "public.cmdata"
+ Column | Type | Collation | Nullable | Default | Storage  | Compression | Stats target | Description 
+--------+------+-----------+----------+---------+----------+-------------+--------------+-------------
+ f1     | text |           |          |         | extended | pglz2       |              | 
+Indexes:
+    "idx" btree (f1)
+
+SELECT pg_column_compression(f1) FROM cmdata;
+ pg_column_compression 
+-----------------------
+ lz4
+ pglz
+ pglz2
+(3 rows)
+
+ALTER TABLE cmdata ALTER COLUMN f1 SET COMPRESSION lz4 PRESERVE (pglz2);
+SELECT pg_column_compression(f1) FROM cmdata;
+ pg_column_compression 
+-----------------------
+ lz4
+ lz4
+ pglz2
+(3 rows)
+
+\d+ cmdata
+                                        Table "public.cmdata"
+ Column | Type | Collation | Nullable | Default | Storage  | Compression | Stats target | Description 
+--------+------+-----------+----------+---------+----------+-------------+--------------+-------------
+ f1     | text |           |          |         | extended | lz4         |              | 
+Indexes:
+    "idx" btree (f1)
+
 -- check data is ok
 SELECT length(f1) FROM cmdata;
  length 
 --------
   10000
   10040
-(2 rows)
+  10040
+(3 rows)
 
 SELECT length(f1) FROM cmdata1;
  length 
diff --git a/src/test/regress/sql/compression.sql b/src/test/regress/sql/compression.sql
index 5774b55d82..9d2e72b784 100644
--- a/src/test/regress/sql/compression.sql
+++ b/src/test/regress/sql/compression.sql
@@ -105,6 +105,16 @@ SELECT pg_column_compression(f1) FROM cmdata;
 ALTER TABLE cmdata ALTER COLUMN f1 SET COMPRESSION lz4 PRESERVE ALL;
 SELECT pg_column_compression(f1) FROM cmdata;
 
+-- create compression method
+CREATE ACCESS METHOD pglz2 TYPE COMPRESSION HANDLER pglzhandler;
+ALTER TABLE cmdata ALTER COLUMN f1 SET COMPRESSION pglz2 PRESERVE ALL;
+INSERT INTO cmdata VALUES (repeat('1234567890',1004));
+\d+ cmdata
+SELECT pg_column_compression(f1) FROM cmdata;
+ALTER TABLE cmdata ALTER COLUMN f1 SET COMPRESSION lz4 PRESERVE (pglz2);
+SELECT pg_column_compression(f1) FROM cmdata;
+\d+ cmdata
+
 -- check data is ok
 SELECT length(f1) FROM cmdata;
 SELECT length(f1) FROM cmdata1;
-- 
2.17.0

