From 6d16b64e2fad8018999c604731ed3cc239ab89e1 Mon Sep 17 00:00:00 2001
From: Evgeny Voropaev <evorop@gmail.com>
Date: Fri, 20 Mar 2026 17:30:22 +0800
Subject: [PATCH v08 3/3] Use Delta Frame of Reference (DFoR) to compress
 prune/freeze records.

A prune/freeze record contains four sequences of integers representing
frozen, redirected, unused, and dead tuples. Using DFoR algorithms, the
`unused` and `dead` sequences are now compressed. The `frozen`
and `redirected` sequences cannot be compressed because the order of
their elements is significant, and DFoR does not support unsorted
sequences yet. The theoretical compression ratio for dfor_u16 can reach
up to 16.

The new GUC wal_prune_dfor_compression controls (enables or
disables) compression for prune/freeze records.

An integration TAP test, 052_prune_dfor_compression.pl, has been
implemented. It demonstrates an average compression ratio of at least 5
when analyzing prune/freeze records in practice.

Author: Evgeny Voropaev <evgeny.voropaev@tantorlabs.com> <evorop@gmail.com>
Reviewed by: Andrey Borodin <x4mmm@yandex-team.ru>
---
 src/backend/access/heap/heapam_xlog.c         |  12 +-
 src/backend/access/heap/pruneheap.c           | 141 ++++++++-
 src/backend/access/rmgrdesc/Makefile          |   1 +
 .../access/rmgrdesc/heapam_xlog_dfor.c        | 109 +++++++
 src/backend/access/rmgrdesc/heapdesc.c        |  49 ++-
 src/backend/access/rmgrdesc/meson.build       |   1 +
 src/backend/utils/misc/guc_parameters.dat     |   7 +
 src/backend/utils/misc/guc_tables.c           |   1 +
 src/backend/utils/misc/postgresql.conf.sample |   2 +
 src/bin/pg_waldump/.gitignore                 |   9 +
 src/bin/pg_waldump/Makefile                   |  26 +-
 src/bin/pg_waldump/meson.build                |   1 +
 src/include/access/heapam_xlog.h              |   8 +-
 src/include/access/heapam_xlog_dfor.h         | 137 +++++++++
 src/test/dfor/Makefile                        |   2 +
 .../recovery/t/052_prune_dfor_compression.pl  | 283 ++++++++++++++++++
 16 files changed, 755 insertions(+), 34 deletions(-)
 create mode 100644 src/backend/access/rmgrdesc/heapam_xlog_dfor.c
 create mode 100644 src/include/access/heapam_xlog_dfor.h
 create mode 100644 src/test/recovery/t/052_prune_dfor_compression.pl

diff --git a/src/backend/access/heap/heapam_xlog.c b/src/backend/access/heap/heapam_xlog.c
index f3f419d3dc1..2ecbd277c02 100644
--- a/src/backend/access/heap/heapam_xlog.c
+++ b/src/backend/access/heap/heapam_xlog.c
@@ -16,6 +16,7 @@
 
 #include "access/bufmask.h"
 #include "access/heapam.h"
+#include "access/heapam_xlog_dfor.h"
 #include "access/visibilitymap.h"
 #include "access/xlog.h"
 #include "access/xlogutils.h"
@@ -105,11 +106,20 @@ heap_xlog_prune_freeze(XLogReaderState *record)
 		char	   *dataptr = XLogRecGetBlockData(record, 0, &datalen);
 		bool		do_prune;
 
+		/*
+		 * DFoR unpacking needs outer buffers for saving results and for
+		 * allocating containers used during decompression. 2 buffer parts are
+		 * intended for saving sequences of offsets of dead and unused tuples.
+		 * Additional three chunks are needed for internal needs of the
+		 * dfor_unpack function.
+		 */
+		uint8 dfor_buf[5 * DFOR_BUF_PART_SIZE];
+
 		heap_xlog_deserialize_prune_and_freeze(dataptr, xlrec.flags,
 											   &nplans, &plans, &frz_offsets,
 											   &nredirected, &redirected,
 											   &ndead, &nowdead,
-											   &nunused, &nowunused);
+											   &nunused, &nowunused, dfor_buf);
 
 		do_prune = nredirected > 0 || ndead > 0 || nunused > 0;
 
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index 74c355be219..a7de9ea7b05 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -16,6 +16,7 @@
 
 #include "access/heapam.h"
 #include "access/heapam_xlog.h"
+#include "access/heapam_xlog_dfor.h"
 #include "access/htup_details.h"
 #include "access/multixact.h"
 #include "access/transam.h"
@@ -238,7 +239,6 @@ static bool heap_page_will_freeze(bool did_tuple_hint_fpi, bool do_prune, bool d
 static bool heap_page_will_set_vm(PruneState *prstate, PruneReason reason,
 								  bool do_prune, bool do_freeze);
 
-
 /*
  * Optionally prune and repair fragmentation in the specified page.
  *
@@ -2529,6 +2529,24 @@ heap_log_freeze_plan(HeapTupleFreeze *tuples, int ntuples,
 	return nplans;
 }
 
+/*
+ * Comparator for offsets.
+ */
+static int
+offset_cmp(const void *arg1, const void *arg2)
+{
+	const OffsetNumber *offset1 = arg1;
+	const OffsetNumber *offset2 = arg2;
+	return (*offset1 > *offset2) - (*offset1 < *offset2);
+}
+
+#define ST_SORT sort_offsets
+#define ST_ELEMENT_TYPE_VOID
+#define ST_COMPARE(a, b) offset_cmp(a, b)
+#define ST_SCOPE static
+#define ST_DEFINE
+#include "lib/sort_template.h"
+
 /*
  * Write an XLOG_HEAP2_PRUNE* WAL record
  *
@@ -2586,11 +2604,34 @@ log_heap_prune_and_freeze(Relation relation, Buffer buffer,
 	bool		do_set_vm = vmflags & VISIBILITYMAP_VALID_BITS;
 	bool		heap_fpi_allowed = true;
 
+	dfor_meta_t dead_meta = { 0 };
+	dfor_meta_t unused_meta = { 0 };
+
+	uint8 dead_meta_pack[MAX_PACKED_META_SIZE];
+	uint8 unused_meta_pack[MAX_PACKED_META_SIZE];
+
+	/*
+	 * Since this code is run in a critical section we can't use dynamic
+	 * allocation during DFoR packing, but we can use buffers allocated in the
+	 * stack. We need at maximum:
+	 * 1) 2 * DFOR_BUF_PART_SIZE
+	 *        - for 2 packed sequences: dead, unused
+	 * 2) 3 * DFOR_BUF_PART_SIZE
+	 * 		  - for internal needs of the dfor_pack function.
+	 *
+	 * Overall, 5 * DFOR_BUF_PART_SIZE
+	 */
+	uint8 dfor_buf[5 * DFOR_BUF_PART_SIZE];
+
 	Assert((vmflags & VISIBILITYMAP_VALID_BITS) == vmflags);
 
 	xlrec.flags = 0;
 	regbuf_flags_heap = REGBUF_STANDARD;
 
+	/* Heuristically estimated threshold for turning on DFoR compression */
+	if (wal_prune_dfor_compression && (ndead > 9 || nunused > 9))
+		xlrec.flags |= XLHP_DFOR_COMPRESSED;
+
 	/*
 	 * We can avoid an FPI of the heap page if the only modification we are
 	 * making to it is to set PD_ALL_VISIBLE and checksums/wal_log_hints are
@@ -2622,6 +2663,10 @@ log_heap_prune_and_freeze(Relation relation, Buffer buffer,
 	if (do_set_vm)
 		XLogRegisterBuffer(1, vmbuffer, 0);
 
+	/*
+	 * xlhp_freeze_plans is array of structures and is not a sequence
+	 * of integers, that is why we cannot use DFoR compression here.
+	 */
 	if (nfrozen > 0)
 	{
 		int			nplans;
@@ -2650,26 +2695,92 @@ log_heap_prune_and_freeze(Relation relation, Buffer buffer,
 		XLogRegisterBufData(0, redirected,
 							sizeof(OffsetNumber[2]) * nredirected);
 	}
-	if (ndead > 0)
+	if ((xlrec.flags & XLHP_DFOR_COMPRESSED) != 0)
 	{
-		xlrec.flags |= XLHP_HAS_DEAD_ITEMS;
+		int dead_pack_res = 0;
+		int unused_pack_res = 0;
 
-		dead_items.ntargets = ndead;
-		XLogRegisterBufData(0, &dead_items,
-							offsetof(xlhp_prune_items, data));
-		XLogRegisterBufData(0, dead,
-							sizeof(OffsetNumber) * ndead);
+		/*
+		 * Dead tuple offsets are subject to be packed with DFoR.
+		 * After that we have:
+		 * 		dead_meta.pack = dfor_buf + DFOR_BUF_PART_SIZE;
+		 */
+		if (ndead > 0)
+		{
+			sort_offsets(dead, ndead, sizeof(OffsetNumber));
+			dead_pack_res = dfor_u16_pack(ndead, dead, DFOR_EXC_USE, &dead_meta,
+										  4 * DFOR_BUF_PART_SIZE, dfor_buf);
+		}
+
+		/*
+		 * Unused tuple offsets are subject to be packed with DFoR.
+		 * After that we have:
+		 * 		unused_meta.pack = dfor_buf + 2 * DFOR_BUF_PART_SIZE;
+		 */
+		if (nunused > 0)
+		{
+			sort_offsets(unused, nunused, sizeof(OffsetNumber));
+			unused_pack_res = dfor_u16_pack(nunused, unused, DFOR_EXC_USE,
+											&unused_meta,
+											4 * DFOR_BUF_PART_SIZE,
+											dfor_buf + DFOR_BUF_PART_SIZE);
+		}
+
+		if (dead_pack_res == 0 && unused_pack_res == 0)
+		{
+			/* All stages of packing have succeeded. We can save DFoR packets
+			 * into log */
+			size_t meta_pack_sz;
+			if (ndead > 0)
+			{
+				xlrec.flags |= XLHP_HAS_DEAD_ITEMS;
+
+				meta_pack_sz = log_heap_prune_and_freeze_pack_meta(
+					&dead_meta, dead_meta_pack);
+
+				XLogRegisterBufData(0, dead_meta_pack, meta_pack_sz);
+				XLogRegisterBufData(0, dead_meta.pack, dead_meta.nbytes);
+			}
+			if (nunused > 0)
+			{
+				xlrec.flags |= XLHP_HAS_NOW_UNUSED_ITEMS;
+
+				meta_pack_sz = log_heap_prune_and_freeze_pack_meta(
+					&unused_meta, unused_meta_pack);
+
+				XLogRegisterBufData(0, unused_meta_pack, meta_pack_sz);
+				XLogRegisterBufData(0, unused_meta.pack, unused_meta.nbytes);
+			}
+		}
+		else
+		{
+			/* Otherwise, we can't use DFoR compression */
+			xlrec.flags &= ~XLHP_DFOR_COMPRESSED;
+		}
 	}
-	if (nunused > 0)
+
+	if ((xlrec.flags & XLHP_DFOR_COMPRESSED) == 0)
 	{
-		xlrec.flags |= XLHP_HAS_NOW_UNUSED_ITEMS;
+		if (ndead > 0)
+		{
+			xlrec.flags |= XLHP_HAS_DEAD_ITEMS;
 
-		unused_items.ntargets = nunused;
-		XLogRegisterBufData(0, &unused_items,
-							offsetof(xlhp_prune_items, data));
-		XLogRegisterBufData(0, unused,
-							sizeof(OffsetNumber) * nunused);
+			dead_items.ntargets = ndead;
+			XLogRegisterBufData(0, &dead_items,
+								offsetof(xlhp_prune_items, data));
+			XLogRegisterBufData(0, dead, sizeof(OffsetNumber) * ndead);
+		}
+		if (nunused > 0)
+		{
+			xlrec.flags |= XLHP_HAS_NOW_UNUSED_ITEMS;
+
+			unused_items.ntargets = nunused;
+			XLogRegisterBufData(0, &unused_items,
+								offsetof(xlhp_prune_items, data));
+			XLogRegisterBufData(0, unused, sizeof(OffsetNumber) * nunused);
+		}
 	}
+
 	if (nfrozen > 0)
 		XLogRegisterBufData(0, frz_offsets,
 							sizeof(OffsetNumber) * nfrozen);
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index cd95eec37f1..49e9c46145f 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -17,6 +17,7 @@ OBJS = \
 	gindesc.o \
 	gistdesc.o \
 	hashdesc.o \
+	heapam_xlog_dfor.o \
 	heapdesc.o \
 	logicalmsgdesc.o \
 	mxactdesc.o \
diff --git a/src/backend/access/rmgrdesc/heapam_xlog_dfor.c b/src/backend/access/rmgrdesc/heapam_xlog_dfor.c
new file mode 100644
index 00000000000..47fa000e367
--- /dev/null
+++ b/src/backend/access/rmgrdesc/heapam_xlog_dfor.c
@@ -0,0 +1,109 @@
+#include "lib/bitpack_u16.h"
+#include "access/heapam_xlog_dfor.h"
+
+bool wal_prune_dfor_compression = true;
+
+size_t
+log_heap_prune_and_freeze_pack_meta(const dfor_meta_t *meta, uint8 buf[])
+{
+	size_t caret = 0;
+	caret = bitpack_u16_pack(buf, caret, meta->item_cnt,
+							 XLHPF_META_ITEM_COUNT_SZ);
+	caret = bitpack_u16_pack(buf, caret, meta->delta_wid,
+							 XLHPF_META_DELTA_WIDTH_SZ);
+	caret = bitpack_u16_pack(buf, caret, (meta->exc_cnt == 0) ? 0 : 1,
+							 XLHPF_META_EXCEPTION_FLAG_SZ);
+	if (meta->exc_cnt != 0)
+	{
+		caret = bitpack_u16_pack(buf, caret, meta->exc_cnt,
+								 XLHPF_META_EXCEPTION_COUNT_SZ);
+		caret = bitpack_u16_pack(buf, caret, meta->exc_wid,
+								 XLHPF_META_EXCEPTION_WIDTH_SZ);
+		caret = bitpack_u16_pack(buf, caret, meta->exc_pos_wid,
+								 XLHPF_META_EXCEPTION_POSITION_WIDTH_SZ);
+	}
+	Assert(meta->item_cnt != 0);
+	Assert(meta->delta_wid != 0);
+	Assert(meta->nbytes != 0);
+
+#ifdef USE_ASSERT_CHECKING
+	{
+		dfor_meta_t checker;
+		log_heap_prune_and_freeze_unpack_meta(&checker, buf);
+		Assert(meta->item_cnt == checker.item_cnt);
+		Assert(meta->delta_wid == checker.delta_wid);
+		Assert(meta->exc_cnt == checker.exc_cnt);
+		Assert(meta->exc_wid == checker.exc_wid);
+		Assert(meta->exc_pos_wid == checker.exc_pos_wid);
+	}
+#endif
+	return (caret + 7) / 8; /* the length of packed dfor_meta, in bytes */
+}
+
+size_t
+log_heap_prune_and_freeze_unpack_meta(dfor_meta_t *meta,
+									  const uint8 packed_meta[])
+{
+	size_t caret = 0;
+	bool exc;
+
+	meta->item_cnt = bitpack_u16_unpack(packed_meta, &caret,
+										XLHPF_META_ITEM_COUNT_SZ);
+	meta->delta_wid = bitpack_u16_unpack(packed_meta, &caret,
+										 XLHPF_META_DELTA_WIDTH_SZ);
+	exc = bitpack_u16_unpack(packed_meta, &caret, XLHPF_META_EXCEPTION_FLAG_SZ);
+
+	if (exc)
+	{
+		meta->exc_cnt = bitpack_u16_unpack(packed_meta, &caret,
+										 XLHPF_META_EXCEPTION_COUNT_SZ);
+		meta->exc_wid = bitpack_u16_unpack(packed_meta, &caret,
+										 XLHPF_META_EXCEPTION_WIDTH_SZ);
+		meta->exc_pos_wid =
+			bitpack_u16_unpack(packed_meta, &caret,
+							   XLHPF_META_EXCEPTION_POSITION_WIDTH_SZ);
+	}
+	else
+	{
+		meta->exc_cnt = 0;
+		meta->exc_wid = 0;
+		meta->exc_pos_wid = 0;
+	}
+	meta->nbytes = dfor_u16_calc_nbytes(*meta);
+
+	Assert(meta->item_cnt != 0);
+	Assert(meta->delta_wid != 0);
+	Assert(meta->nbytes != 0);
+
+	return (caret + 7) / 8;
+}
+
+void
+heap_xlog_deserialize_dfor(char **cursor, int *nitems,
+						   OffsetNumber **items,
+						   uint8 dfor_buf[])
+{
+	dfor_meta_t dfor = {0};
+	size_t packed_meta_nbytes;
+	uniqsortvect_u16_t vect;
+
+	packed_meta_nbytes =
+		log_heap_prune_and_freeze_unpack_meta(&dfor, (uint8*) *cursor);
+
+	*cursor += packed_meta_nbytes;
+
+	dfor.pack = (uint8 *)*cursor;
+	dfor_u16_unpack(&dfor, &vect, 4 * DFOR_BUF_PART_SIZE,
+					dfor_buf);
+
+	*cursor += dfor.nbytes;
+
+	Assert(dfor.nbytes != 0);
+
+	Assert(vect.cnt != 0);
+	Assert(vect.mem_is_outer == true);
+	Assert((void*)vect.m == (void*)dfor_buf);
+
+	*nitems = vect.cnt;
+	*items = vect.m;
+}
diff --git a/src/backend/access/rmgrdesc/heapdesc.c b/src/backend/access/rmgrdesc/heapdesc.c
index 75ae6f9d375..2446f158720 100644
--- a/src/backend/access/rmgrdesc/heapdesc.c
+++ b/src/backend/access/rmgrdesc/heapdesc.c
@@ -15,6 +15,7 @@
 #include "postgres.h"
 
 #include "access/heapam_xlog.h"
+#include "access/heapam_xlog_dfor.h"
 #include "access/rmgrdesc_utils.h"
 #include "access/visibilitymapdefs.h"
 #include "storage/standbydefs.h"
@@ -108,7 +109,8 @@ heap_xlog_deserialize_prune_and_freeze(char *cursor, uint16 flags,
 									   OffsetNumber **frz_offsets,
 									   int *nredirected, OffsetNumber **redirected,
 									   int *ndead, OffsetNumber **nowdead,
-									   int *nunused, OffsetNumber **nowunused)
+									   int *nunused, OffsetNumber **nowunused,
+									   uint8 dfor_buf[])
 {
 	if (flags & XLHP_HAS_FREEZE_PLANS)
 	{
@@ -146,14 +148,22 @@ heap_xlog_deserialize_prune_and_freeze(char *cursor, uint16 flags,
 
 	if (flags & XLHP_HAS_DEAD_ITEMS)
 	{
-		xlhp_prune_items *subrecord = (xlhp_prune_items *) cursor;
+		if (!(flags & XLHP_DFOR_COMPRESSED))
+		{
+			xlhp_prune_items *subrecord = (xlhp_prune_items *) cursor;
 
-		*ndead = subrecord->ntargets;
-		Assert(*ndead > 0);
-		*nowdead = subrecord->data;
+			*ndead = subrecord->ntargets;
+			Assert(*ndead > 0);
+			*nowdead = subrecord->data;
 
-		cursor += offsetof(xlhp_prune_items, data);
-		cursor += sizeof(OffsetNumber) * *ndead;
+			cursor += offsetof(xlhp_prune_items, data);
+			cursor += sizeof(OffsetNumber) * *ndead;
+		}
+		else
+		{
+			heap_xlog_deserialize_dfor(&cursor, ndead, nowdead,
+									   dfor_buf);
+		}
 	}
 	else
 	{
@@ -163,14 +173,22 @@ heap_xlog_deserialize_prune_and_freeze(char *cursor, uint16 flags,
 
 	if (flags & XLHP_HAS_NOW_UNUSED_ITEMS)
 	{
-		xlhp_prune_items *subrecord = (xlhp_prune_items *) cursor;
+		if (!(flags & XLHP_DFOR_COMPRESSED))
+		{
+			xlhp_prune_items *subrecord = (xlhp_prune_items *) cursor;
 
-		*nunused = subrecord->ntargets;
-		Assert(*nunused > 0);
-		*nowunused = subrecord->data;
+			*nunused = subrecord->ntargets;
+			Assert(*nunused > 0);
+			*nowunused = subrecord->data;
 
-		cursor += offsetof(xlhp_prune_items, data);
-		cursor += sizeof(OffsetNumber) * *nunused;
+			cursor += offsetof(xlhp_prune_items, data);
+			cursor += sizeof(OffsetNumber) * *nunused;
+		}
+		else
+		{
+			heap_xlog_deserialize_dfor(&cursor, nunused, nowunused,
+									   dfor_buf + DFOR_BUF_PART_SIZE);
+		}
 	}
 	else
 	{
@@ -309,13 +327,16 @@ heap2_desc(StringInfo buf, XLogReaderState *record)
 			xlhp_freeze_plan *plans;
 			OffsetNumber *frz_offsets;
 
+			uint8 dfor_buf[5 * DFOR_BUF_PART_SIZE];
+
 			char	   *cursor = XLogRecGetBlockData(record, 0, &datalen);
 
 			heap_xlog_deserialize_prune_and_freeze(cursor, xlrec->flags,
 												   &nplans, &plans, &frz_offsets,
 												   &nredirected, &redirected,
 												   &ndead, &nowdead,
-												   &nunused, &nowunused);
+												   &nunused, &nowunused,
+												   dfor_buf);
 
 			appendStringInfo(buf, ", nplans: %u, nredirected: %u, ndead: %u, nunused: %u",
 							 nplans, nredirected, ndead, nunused);
diff --git a/src/backend/access/rmgrdesc/meson.build b/src/backend/access/rmgrdesc/meson.build
index d9000ccd9fd..6ceea4514ec 100644
--- a/src/backend/access/rmgrdesc/meson.build
+++ b/src/backend/access/rmgrdesc/meson.build
@@ -11,6 +11,7 @@ rmgr_desc_sources = files(
   'gistdesc.c',
   'hashdesc.c',
   'heapdesc.c',
+  'heapam_xlog_dfor.c',
   'logicalmsgdesc.c',
   'mxactdesc.c',
   'nbtdesc.c',
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index 83af594d4af..c53e2921c01 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -3507,6 +3507,13 @@
   boot_val => 'false',
 },
 
+{ name => 'wal_prune_dfor_compression', type => 'bool', context => 'PGC_SUSET', group => 'WAL_SETTINGS',
+  short_desc => 'Compress dead and unused offset arrays at PRUNE/FREEZE WAL records using DFOR.',
+  long_desc => 'Enables compression of dead and unused OffsetNumber arrays stored in heap PRUNE/FREEZE WAL records using customised delta frame-of-reference encoding.',
+  variable => 'wal_prune_dfor_compression',
+  boot_val => 'true'
+},
+
 { name => 'wal_receiver_create_temp_slot', type => 'bool', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY',
   short_desc => 'Sets whether a WAL receiver should create a temporary replication slot if no permanent slot is configured.',
   variable => 'wal_receiver_create_temp_slot',
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 290ccbc543e..bee60b2ebcd 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -31,6 +31,7 @@
 
 #include "access/commit_ts.h"
 #include "access/gin.h"
+#include "access/heapam_xlog_dfor.h"
 #include "access/slru.h"
 #include "access/toast_compression.h"
 #include "access/twophase.h"
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index ac38cddaaf9..4152e789ee1 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -263,6 +263,8 @@
                                         # (change requires restart)
 #wal_compression = off                  # enables compression of full-page writes;
                                         # off, pglz, lz4, zstd, or on
+#wal_prune_dfor_compression = true      # Compress dead and unused offset arrays
+                                        # at PRUNE/FREEZE WAL records using DFOR.
 #wal_init_zero = on                     # zero-fill new WAL files
 #wal_recycle = on                       # recycle WAL files
 #wal_buffers = -1                       # min 32kB, -1 sets based on shared_buffers
diff --git a/src/bin/pg_waldump/.gitignore b/src/bin/pg_waldump/.gitignore
index ec51f41c767..a3c02446b9d 100644
--- a/src/bin/pg_waldump/.gitignore
+++ b/src/bin/pg_waldump/.gitignore
@@ -10,6 +10,7 @@
 /gistdesc.c
 /hashdesc.c
 /heapdesc.c
+/heapam_xlog_dfor.c
 /logicalmsgdesc.c
 /mxactdesc.c
 /nbtdesc.c
@@ -28,5 +29,13 @@
 /xlogreader.c
 /xlogstats.c
 
+# Source files copied from src/backend/lib
+/bitpack_templ.c
+/bitpack_u16.c
+/dfor_templ.c
+/dfor_u16.c
+/vect_templ.c
+/vect_u16.c
+
 # Generated by test suite
 /tmp_check/
diff --git a/src/bin/pg_waldump/Makefile b/src/bin/pg_waldump/Makefile
index aabb87566a2..5e521c1e822 100644
--- a/src/bin/pg_waldump/Makefile
+++ b/src/bin/pg_waldump/Makefile
@@ -8,8 +8,9 @@ export TAR
 
 subdir = src/bin/pg_waldump
 top_builddir = ../../..
-include $(top_builddir)/src/Makefile.global
+dfor_dir := $(top_builddir)/src/backend/lib
 
+include $(top_builddir)/src/Makefile.global
 OBJS = \
 	$(RMGRDESCOBJS) \
 	$(WIN32RES) \
@@ -20,10 +21,13 @@ OBJS = \
 	xlogreader.o \
 	xlogstats.o
 
+include $(dfor_dir)/Makefile.dfor
+OBJS += $(OBJS_DFOR)
+
 override CPPFLAGS := -DFRONTEND -I$(libpq_srcdir) $(CPPFLAGS)
 LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils
 
-RMGRDESCSOURCES = $(sort $(notdir $(wildcard $(top_srcdir)/src/backend/access/rmgrdesc/*desc*.c)))
+RMGRDESCSOURCES = $(sort $(notdir $(wildcard $(top_srcdir)/src/backend/access/rmgrdesc/*desc*.c))) heapam_xlog_dfor.c
 RMGRDESCOBJS = $(patsubst %.c,%.o,$(RMGRDESCSOURCES))
 
 
@@ -32,6 +36,24 @@ all: pg_waldump
 pg_waldump: $(OBJS) | submake-libpgport
 	$(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
 
+bitpack_templ.c: % : $(top_srcdir)/src/backend/lib/%
+	rm -f $@ && $(LN_S) $< .
+
+bitpack_u16.c: % : $(top_srcdir)/src/backend/lib/% bitpack_templ.c
+	rm -f $@ && $(LN_S) $< .
+
+dfor_templ.c: % : $(top_srcdir)/src/backend/lib/%
+	rm -f $@ && $(LN_S) $< .
+
+dfor_u16.c: % : $(top_srcdir)/src/backend/lib/% dfor_templ.c
+	rm -f $@ && $(LN_S) $< .
+
+vect_templ.c: % : $(top_srcdir)/src/backend/lib/%
+	rm -f $@ && $(LN_S) $< .
+
+vect_u16.c: % : $(top_srcdir)/src/backend/lib/% vect_templ.c
+	rm -f $@ && $(LN_S) $< .
+
 xlogreader.c: % : $(top_srcdir)/src/backend/access/transam/%
 	rm -f $@ && $(LN_S) $< .
 
diff --git a/src/bin/pg_waldump/meson.build b/src/bin/pg_waldump/meson.build
index 5296f21b82c..c33be88712c 100644
--- a/src/bin/pg_waldump/meson.build
+++ b/src/bin/pg_waldump/meson.build
@@ -10,6 +10,7 @@ pg_waldump_sources = files(
 pg_waldump_sources += rmgr_desc_sources
 pg_waldump_sources += xlogreader_sources
 pg_waldump_sources += files('../../backend/access/transam/xlogstats.c')
+pg_waldump_sources += dfor_sources
 
 if host_system == 'windows'
   pg_waldump_sources += rc_bin_gen.process(win32ver_rc, extra_args: [
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index fdca7d821c8..5ce885e6324 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -15,6 +15,7 @@
 #define HEAPAM_XLOG_H
 
 #include "access/htup.h"
+#include "access/htup_details.h"
 #include "access/xlogreader.h"
 #include "lib/stringinfo.h"
 #include "storage/buf.h"
@@ -341,6 +342,8 @@ typedef struct xl_heap_prune
 #define		XLHP_VM_ALL_VISIBLE			(1 << 8)
 #define		XLHP_VM_ALL_FROZEN			(1 << 9)
 
+#define		XLHP_DFOR_COMPRESSED		(1 << 10)
+
 /*
  * xlhp_freeze_plan describes how to freeze a group of one or more heap tuples
  * (appears in xl_heap_prune's xlhp_freeze_plans sub-record)
@@ -494,6 +497,7 @@ extern void heap_xlog_deserialize_prune_and_freeze(char *cursor, uint16 flags,
 												   OffsetNumber **frz_offsets,
 												   int *nredirected, OffsetNumber **redirected,
 												   int *ndead, OffsetNumber **nowdead,
-												   int *nunused, OffsetNumber **nowunused);
+												   int *nunused, OffsetNumber **nowunused,
+												   uint8 dfor_buf[]);
 
-#endif							/* HEAPAM_XLOG_H */
+#endif							/* HEAPAM_XLOG_H */
\ No newline at end of file
diff --git a/src/include/access/heapam_xlog_dfor.h b/src/include/access/heapam_xlog_dfor.h
new file mode 100644
index 00000000000..274b14e891e
--- /dev/null
+++ b/src/include/access/heapam_xlog_dfor.h
@@ -0,0 +1,137 @@
+#ifndef HEAPAM_XLOG_DFOR_H
+#define HEAPAM_XLOG_DFOR_H
+
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "lib/dfor_u16.h"
+#include "storage/bufpage.h"
+
+/*
+ * DFoR's meta block for PRUNE/FREEZE record
+ *
+ * A meta block contains parameters required for decompression of the following
+ * DFoR pack. It is densely bit-packed. If the exception flag is zero, fields
+ * pertaining to exceptions are absent, which means that the DFoR pack does not
+ * contain exceptions. Calculation of field widths takes into account
+ * next considerations:
+ *
+ * Max Item Count should be greater than or equal to MaxHeapTuplesPerPage.
+ * Since we can't calculate
+ * MaxHeapTuplesPerPage at the preprocessor stage, we intentionally overestimate it
+ * as:
+ *       Max Item Count > BLCKSZ / Min Tuple Size = BLCKSZ / 24
+ * to provide a margin. In general, depending on BLCKSZ, it should not result in
+ * DFoR meta block overhead.
+ * For instance, for a block size of 32768, we have Max Item Count = 1366, and
+ * it needs 11 bits width field.
+ *
+ * Size of field Item Count:
+ *       ITEM_COUNT_SZ = log2(MaxItemCount).
+ *
+ * Maximum Delta Width is equal to ITEM_COUNT_SZ. So DELTA_WIDTH_SZ in a DFoR
+ * meta block can be calculated as:
+ *      DELTA_WIDTH_SZ >= log2(Max Delta Width) = log2(ITEM_COUNT_SZ)
+ *
+ * Max Exception Count = 0.1 * MaxItemCount, according to DFoR algorithm which
+ * guarantees that not less than 90% of items will be covered without using
+ * exceptions. So:
+ *      EXCEPTION_COUNT_SZ >= log2(0.1 * MaxItemCount).
+ *
+ * An exception is part of a delta, exceeding the chosen delta width. An exception is
+ * saved in separated part of DFoR pack and, since delta width is not less
+ * than 1:
+ *    EXCEPTION_WIDTH_SZ >= log2(Max Delta Width - 1) = log2(ITEM_COUNT_SZ - 1).
+ *
+ * An exception's position shows the position of a delta to which the
+ * exception has to be applied. Values of an exception position must cover the
+ * same value range as an Item Count, so the Max Width of an Exception Position
+ * is equal to width of Delta. Consequently, the size of Exception Position
+ * Width calculated as:
+ *     EXCEPTION_POSITION_WIDTH_SIZE = log2(Max Delta Width) = DELTA_WIDTH_SZ
+ *
+ * For example, Meta for BLCKSZ equal to 32768 has next sizes of field
+ * | sect. | byte | bits      |   param           |  size  | range of values |
+ * |-------|------|-----------|-------------------|--------|-----------------|
+ * |  main | 0, 1 | 0-10      | item count        | 11 bit | 1...1365        |
+ * |  main |    1 | 11-14     | delta width       |  4 bit | 1...11          |
+ * |  main |    1 | 15        | extra sect. flag  |  1 bit | 0...1           |
+ * | extra |    2 | 16-23     | exception count   |  8 bit | 0...137         |
+ * | extra |    3 | 24-27     | exception width   |  4 bit | 0...10          |
+ * | extra | 3, 4 | 28-35     | except pos. width |  4 bit | 1...11          |
+ */
+
+/*
+ * The sizes of fields in the compressed DFoR Meta structure of an
+ * XLOG_HEAP2_PRUNE* record.
+ */
+#if BLCKSZ == 32768
+#define XLHPF_META_ITEM_COUNT_SZ  11
+#define XLHPF_META_DELTA_WIDTH_SZ 4
+#elif BLCKSZ == 16384
+#define XLHPF_META_ITEM_COUNT_SZ  10
+#define XLHPF_META_DELTA_WIDTH_SZ 4
+#elif BLCKSZ == 8192
+#define XLHPF_META_ITEM_COUNT_SZ  9
+#define XLHPF_META_DELTA_WIDTH_SZ 4
+#elif BLCKSZ == 4096
+#define XLHPF_META_ITEM_COUNT_SZ  8
+#define XLHPF_META_DELTA_WIDTH_SZ 4
+#elif BLCKSZ == 2048
+#define XLHPF_META_ITEM_COUNT_SZ  7
+#define XLHPF_META_DELTA_WIDTH_SZ 3
+#elif BLCKSZ == 1024
+#define XLHPF_META_ITEM_COUNT_SZ  6
+#define XLHPF_META_DELTA_WIDTH_SZ 3
+#elif BLCKSZ == 512
+#define XLHPF_META_ITEM_COUNT_SZ  5
+#define XLHPF_META_DELTA_WIDTH_SZ 3
+#elif BLCKSZ == 256
+#define XLHPF_META_ITEM_COUNT_SZ  4
+#define XLHPF_META_DELTA_WIDTH_SZ 3
+#elif BLCKSZ == 128
+#define XLHPF_META_ITEM_COUNT_SZ  3
+#define XLHPF_META_DELTA_WIDTH_SZ 2
+#elif BLCKSZ == 64
+#define XLHPF_META_ITEM_COUNT_SZ  2
+#define XLHPF_META_DELTA_WIDTH_SZ 2
+#else
+#error "Unsupported BLCKSZ in XLog Heap And Prune."
+#endif
+
+#define XLHPF_META_EXCEPTION_FLAG_SZ 1 /* Flag about Extra Section presence */
+
+/* Size of Exception Count field */
+#if XLHPF_META_ITEM_COUNT_SZ > 6
+#define XLHPF_META_EXCEPTION_COUNT_SZ (XLHPF_META_ITEM_COUNT_SZ - 3)
+#else
+#define XLHPF_META_EXCEPTION_COUNT_SZ 3 /* floor width; TODO(review): confirm */
+#endif
+
+#define XLHPF_META_EXCEPTION_WIDTH_SZ \
+	XLHPF_META_DELTA_WIDTH_SZ /* Size of Exception Width field */
+
+#define XLHPF_META_EXCEPTION_POSITION_WIDTH_SZ \
+	XLHPF_META_DELTA_WIDTH_SZ /* Size of Exception Position width field */
+
+/* Maximal size of packed meta */
+#define MAX_PACKED_META_SIZE \
+	((XLHPF_META_ITEM_COUNT_SZ + XLHPF_META_DELTA_WIDTH_SZ +                 \
+	  XLHPF_META_EXCEPTION_FLAG_SZ + XLHPF_META_EXCEPTION_POSITION_WIDTH_SZ + \
+	  XLHPF_META_EXCEPTION_WIDTH_SZ + XLHPF_META_EXCEPTION_COUNT_SZ + 7) / 8)
+
+/* The size of a typical chunk of memory used by dfor_pack */
+#define DFOR_BUF_PART_SIZE (MaxHeapTuplesPerPage * sizeof(uint16))
+
+extern bool wal_prune_dfor_compression; /* GUC */
+
+extern size_t log_heap_prune_and_freeze_pack_meta(const dfor_meta_t *meta,
+												  uint8 buf[]);
+
+extern size_t log_heap_prune_and_freeze_unpack_meta(dfor_meta_t *meta,
+													const uint8 packed_meta[]);
+
+extern void heap_xlog_deserialize_dfor(char **cursor, int *nitems,
+									   OffsetNumber **items, uint8 dfor_buf[]);
+
+#endif							/* HEAPAM_XLOG_DFOR_H */
\ No newline at end of file
diff --git a/src/test/dfor/Makefile b/src/test/dfor/Makefile
index 43f0d51fb70..ee88f8b77cd 100644
--- a/src/test/dfor/Makefile
+++ b/src/test/dfor/Makefile
@@ -47,6 +47,8 @@ check-unit: $(TESTS)
 	cd $(top_builddir)/$(subdir) && \
 	   $(PROVE) $(PROVE_FLAGS) \
 	    -v $(addprefix ./,$(if $(PROVE_TESTS), $(PROVE_TESTS), $(TESTS)))
+# The example of using the check-unit rule:
+#		make check-unit PROVE_TESTS='test_dfor_u16' PROVE_FLAGS='--verbose'
 
 check: check-unit
 
diff --git a/src/test/recovery/t/052_prune_dfor_compression.pl b/src/test/recovery/t/052_prune_dfor_compression.pl
new file mode 100644
index 00000000000..951478fbbd3
--- /dev/null
+++ b/src/test/recovery/t/052_prune_dfor_compression.pl
@@ -0,0 +1,283 @@
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# ------------------------------------------------------------
+# Workload generating dead tuples and PRUNE WAL
+# ------------------------------------------------------------
+# Run one of the named workloads on $node, producing dead tuples and
+# PRUNE WAL records.  Returns the (start, end) flush LSNs bracketing the
+# VACUUM phase, suitable for passing to pg_waldump.
+sub generate_prune_workload
+{
+	my ($node, $workload) = @_;
+
+	my $start_lsn;
+	my $end_lsn;
+
+	if ($workload eq "vacuum_with_index"
+		|| $workload eq "vacuum_no_index")
+	{
+		$node->safe_psql('postgres', q{
+			CREATE TABLE t_prune (
+				id int,
+				val text
+			) WITH (fillfactor = 100, autovacuum_enabled = false);
+		});
+
+		# -------------------------
+		# Phase 1: INSERT
+		# -------------------------
+		$node->safe_psql('postgres', q{
+			INSERT INTO t_prune
+			SELECT g, 'x'
+			FROM generate_series(1,3000000) g;
+		});
+
+		# Optional index
+		if ($workload eq "vacuum_with_index")
+		{
+			$node->safe_psql('postgres', q{
+				CREATE INDEX ON t_prune(id);
+			});
+		}
+		# -------------------------
+		# Phase 2: DELETE + VACUUM
+		# -------------------------
+		$node->safe_psql('postgres', q{
+			DELETE FROM t_prune
+			WHERE id % 500 <> 0;
+		});
+
+		# Force WAL flush and capture LSN
+		$start_lsn = $node->safe_psql('postgres', q{
+			SELECT pg_current_wal_flush_lsn();
+		});
+
+		# VACUUM cycles to trigger PRUNE.  The freeze-age GUCs are set in
+		# the same session as the VACUUM: safe_psql() starts a fresh
+		# backend per call, so a SET issued in a separate call (as the
+		# original did) is silently lost.
+		for my $i (1..3)
+		{
+			$node->safe_psql('postgres', q{
+				SET vacuum_freeze_min_age = 0;
+				SET vacuum_freeze_table_age = 0;
+				VACUUM FREEZE t_prune;
+			});
+		}
+
+		$end_lsn = $node->safe_psql('postgres', q{
+			SELECT pg_current_wal_flush_lsn();
+		});
+	}
+	else
+	{
+		die "Workload is not defined: workload=$workload";
+	}
+
+	chomp($start_lsn);
+	print "Captured start LSN: $start_lsn\n";
+	chomp($end_lsn);
+	print "Captured end LSN: $end_lsn\n";
+
+	return ($start_lsn, $end_lsn);
+}
+
+# ------------------------------------------------------------
+# WAL analyzer
+# ------------------------------------------------------------
+# Scan the node's pg_wal with pg_waldump over [start_lsn, end_lsn) and
+# return a hashref with total/PRUNE record counts and byte totals.
+# Dies when the range yields no records at all.
+sub collect_wal_stats
+{
+	my ($node, $start_lsn, $end_lsn) = @_;
+
+	my $wal_dir = $node->data_dir . "/pg_wal";
+
+	print "wal_dir=$wal_dir\n";
+	print "collect_wal_stats: start_lsn=$start_lsn\n";
+	print "collect_wal_stats: end_lsn=$end_lsn\n";
+
+	# Build the command once instead of duplicating it per branch.
+	# pg_waldump complains on stderr when it runs off the end of WAL
+	# (no -e given), hence the redirect.
+	my $cmd = "pg_waldump -p $wal_dir -s $start_lsn";
+	$cmd .= " -e $end_lsn" if defined $end_lsn && $end_lsn ne '';
+	$cmd .= " 2>/dev/null";
+
+	my @lines = `$cmd`;
+
+	# -------------------------
+	# Counters
+	# -------------------------
+	my $total_records = 0;
+	my $total_bytes   = 0;
+
+	my $prune_records = 0;
+	my $prune_bytes   = 0;
+
+	foreach my $line (@lines)
+	{
+		# Extract total record size
+		if ($line =~ /len \(rec\/tot\):\s*\d+\/\s*(\d+)/)
+		{
+			my $size = $1;
+
+			$total_records++;
+			$total_bytes += $size;
+
+			# PRUNE-specific tracking
+			if ($line =~ /PRUNE_VACUUM_SCAN/)
+			{
+				$prune_records++;
+				$prune_bytes += $size;
+			}
+		}
+	}
+
+	if ($total_records == 0)
+	{
+		# Include the command so a pg_waldump failure (stderr was
+		# discarded above) is diagnosable from the test log.
+		die "No WAL records found in range $start_lsn → $end_lsn (cmd: $cmd)";
+	}
+
+	print "TOTAL: records=$total_records; bytes=$total_bytes\n";
+	print "PRUNE: records=$prune_records; bytes=$prune_bytes\n";
+
+	return {
+		total_records => $total_records,
+		total_bytes   => $total_bytes,
+		prune_records => $prune_records,
+		prune_bytes   => $prune_bytes,
+	};
+}
+
+# ------------------------------------------------------------
+# Run test on a fresh cluster
+# ------------------------------------------------------------
+# Create a fresh cluster named $name with wal_prune_dfor_compression set
+# to $compression ('on'/'off'), run $workload on it, then return the WAL
+# statistics collected by collect_wal_stats().
+sub run_cluster_test
+{
+	my ($name, $compression, $workload) = @_;
+
+	my $node = PostgreSQL::Test::Cluster->new($name);
+
+	$node->init;
+
+	# Generous WAL retention so the whole workload is still on disk for
+	# pg_waldump after the node stops.  (The original had a stray leading
+	# space before wal_keep_size; harmless to the config parser, fixed
+	# for tidiness.)
+	$node->append_conf('postgresql.conf', qq{
+		wal_level = replica
+		autovacuum = off
+		wal_prune_dfor_compression = $compression
+		wal_keep_size = '1GB'
+		max_wal_size = '20GB'
+	});
+
+	$node->start;
+
+	my ($start_lsn, $end_lsn) = generate_prune_workload($node, $workload);
+
+	# Stop before scanning: collect_wal_stats reads pg_wal directly from
+	# the data directory.
+	$node->stop;
+
+	return collect_wal_stats($node, $start_lsn, $end_lsn);
+}
+
+# ------------------------------------------------------------
+# Formatting helpers
+# ------------------------------------------------------------
+
+# Percentage saved going from $before to $after, rendered as "NN%";
+# "N/A" when $before is zero (nothing to reduce).
+sub _pct_reduction
+{
+	my ($before, $after) = @_;
+
+	return "N/A" unless $before;
+
+	my $saved = $before - $after;
+	return sprintf("%d%%", int(100 * $saved / $before + 0.5));
+}
+
+# Compression ratio $before/$after rendered as "N.Nx";
+# "N/A" when $after is zero (division guard).
+sub _ratio
+{
+	my ($before, $after) = @_;
+
+	return "N/A" unless $after;
+
+	return sprintf("%.1fx", $before / $after);
+}
+
+# ------------------------------------------------------------
+# Report: total WAL stats
+# ------------------------------------------------------------
+# Print a small table comparing total WAL size (as a percentage
+# reduction) and PRUNE record size (as a compression ratio) between the
+# compression-off ($off) and compression-on ($on) stat hashes.
+# (Original assigned $b_bytes/$a_bytes and never used them; removed.)
+sub report_wal_diff
+{
+	my ($off, $on) = @_;
+
+	my $row  = "%-20s %17s %17s %11s\n";
+	my @rule = ("-" x 20, "-" x 17, "-" x 17, "-" x 11);
+
+	printf $row, @rule;
+	printf $row, "", "DFOR off, bytes", "DFOR on, bytes", "Reduction";
+	printf $row, @rule;
+
+	# Total WAL: percentage reduction.
+	printf "%-20s %17d %17d %11s\n",
+		"WAL total size",
+		$off->{total_bytes},
+		$on->{total_bytes},
+		_pct_reduction($off->{total_bytes}, $on->{total_bytes});
+
+	# Prune records: compression ratio (e.g. "5.0x").
+	printf "%-20s %17d %17d %11s\n\n",
+		"Prune records size",
+		$off->{prune_bytes},
+		$on->{prune_bytes},
+		_ratio($off->{prune_bytes}, $on->{prune_bytes});
+}
+
+# ------------------------------------------------------------
+# Scenario 1: VACUUM without index
+# ------------------------------------------------------------
+# Each scenario runs two fresh clusters (DFoR off vs. on) over the same
+# workload and compares WAL footprints.
+my $off_noidx = run_cluster_test("prune_off_noidx", "off", "vacuum_no_index");
+my $on_noidx  = run_cluster_test("prune_on_noidx",  "on",  "vacuum_no_index");
+
+cmp_ok(
+	$off_noidx->{prune_bytes},
+	'>',
+	$on_noidx->{prune_bytes},
+	'DFOR reduces the PRUNE WAL size on vacuuming a table having no index.'
+);
+
+cmp_ok(
+	$off_noidx->{total_bytes},
+	'>',
+	$on_noidx->{total_bytes},
+	'DFOR reduces the total WAL size on vacuuming a table having no index.'
+);
+
+print "\n\n=== VACUUM (table with no index) ===\n";
+report_wal_diff($off_noidx, $on_noidx);
+
+# ------------------------------------------------------------
+# Scenario 2: VACUUM with index
+# ------------------------------------------------------------
+my $off_idx = run_cluster_test("prune_off_idx", "off", "vacuum_with_index");
+my $on_idx  = run_cluster_test("prune_on_idx",  "on",  "vacuum_with_index");
+
+cmp_ok(
+	$off_idx->{prune_bytes},
+	'>',
+	$on_idx->{prune_bytes},
+	'DFOR reduces the PRUNE WAL size on vacuuming a table having an index.'
+);
+
+# Index WAL dominates here, so only require "no worse" for the total;
+# the description now matches the '>=' operator (the original claimed
+# "reduces" while permitting equality).
+cmp_ok(
+	$off_idx->{total_bytes},
+	'>=',
+	$on_idx->{total_bytes},
+	'DFOR does not increase the total WAL size on vacuuming a table having an index.'
+);
+
+print "\n\n=== VACUUM (table with index) ===\n";
+report_wal_diff($off_idx, $on_idx);
+
+done_testing();
-- 
2.53.0

