From 2f8d219e4524e95bcc4744695182b930f37de90b Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Sun, 7 Jan 2024 17:55:31 -0500
Subject: [PATCH v2 12/17] Merge prune and freeze records

When a page has both tuples to prune and tuples to freeze, emit a
single, combined prune record containing the offsets for pruning as
well as the freeze plans and offsets for freezing. This reduces the
number of WAL records emitted.
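
The block data of the combined record carries, in order: the freeze
plans, the redirected offset pairs, the now-dead offsets, the
now-unused offsets, and finally the offsets of the tuples frozen by
each plan.

For illustration only, here is a minimal standalone sketch of that
layout and of the replay-side pointer walk. The type and variable
names (FreezePlanSketch, blockdata, and so on) are simplified
stand-ins invented for this sketch rather than PostgreSQL code; only
the serialization order and the pointer arithmetic mirrored from
heap_xlog_prune are meant to match the patch.

#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint16_t OffsetNumber;	/* stand-in for PostgreSQL's OffsetNumber */

typedef struct
{
	uint32_t	xmax;		/* simplified; the real plan has more fields */
	uint16_t	ntuples;	/* number of frz_offsets covered by this plan */
} FreezePlanSketch;			/* stand-in for xl_heap_freeze_plan */

int
main(void)
{
	/* counts that the fixed part of xl_heap_prune would carry */
	int			nplans = 1;
	int			nredirected = 1;
	int			ndead = 1;
	int			nunused = 1;

	_Alignas(FreezePlanSketch) char blockdata[256];
	char	   *p = blockdata;

	/* serialize in the order the record registers the block data */
	FreezePlanSketch plan = {.xmax = 1234, .ntuples = 2};
	OffsetNumber redirected[] = {3, 7};		/* item 3 redirects to item 7 */
	OffsetNumber nowdead[] = {5};
	OffsetNumber nowunused[] = {9};
	OffsetNumber frz_offsets[] = {2, 4};	/* tuples frozen by plan 0 */

	memcpy(p, &plan, sizeof(plan));
	p += sizeof(plan);
	memcpy(p, redirected, sizeof(redirected));
	p += sizeof(redirected);
	memcpy(p, nowdead, sizeof(nowdead));
	p += sizeof(nowdead);
	memcpy(p, nowunused, sizeof(nowunused));
	p += sizeof(nowunused);
	memcpy(p, frz_offsets, sizeof(frz_offsets));

	/* replay-side pointer arithmetic, mirroring heap_xlog_prune */
	FreezePlanSketch *plans = (FreezePlanSketch *) blockdata;
	OffsetNumber *redir = (OffsetNumber *) &plans[nplans];
	OffsetNumber *dead = redir + (nredirected * 2);
	OffsetNumber *unused = dead + ndead;
	OffsetNumber *frz = unused + nunused;

	printf("redirect %u -> %u, dead %u, unused %u, frozen %u %u\n",
		   redir[0], redir[1], dead[0], unused[0], frz[0], frz[1]);
	return 0;
}

Building the sketch with any C11 compiler (for example, cc -std=c11
sketch.c) prints the parsed values back out.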
---
 src/backend/access/heap/heapam.c    | 42 ++++++++++++--
 src/backend/access/heap/pruneheap.c | 85 +++++++++++++----------------
 src/include/access/heapam_xlog.h    | 20 +++++--
 3 files changed, 90 insertions(+), 57 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index a3691584c55..a8f35eba3c9 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8803,24 +8803,28 @@ heap_xlog_prune(XLogReaderState *record)
 	if (action == BLK_NEEDS_REDO)
 	{
 		Page		page = (Page) BufferGetPage(buffer);
-		OffsetNumber *end;
 		OffsetNumber *redirected;
 		OffsetNumber *nowdead;
 		OffsetNumber *nowunused;
 		int			nredirected;
 		int			ndead;
 		int			nunused;
+		int			nplans;
 		Size		datalen;
+		xl_heap_freeze_plan *plans;
+		OffsetNumber *frz_offsets;
+		int			curoff = 0;
 
-		redirected = (OffsetNumber *) XLogRecGetBlockData(record, 0, &datalen);
-
+		nplans = xlrec->nplans;
 		nredirected = xlrec->nredirected;
 		ndead = xlrec->ndead;
-		end = (OffsetNumber *) ((char *) redirected + datalen);
+		nunused = xlrec->nunused;
+
+		plans = (xl_heap_freeze_plan *) XLogRecGetBlockData(record, 0, &datalen);
+		redirected = (OffsetNumber *) &plans[nplans];
 		nowdead = redirected + (nredirected * 2);
 		nowunused = nowdead + ndead;
-		nunused = (end - nowunused);
-		Assert(nunused >= 0);
+		frz_offsets = nowunused + nunused;
 
 		/* Update all line pointers per the record, and repair fragmentation */
 		heap_page_prune_execute(buffer,
@@ -8828,6 +8832,32 @@ heap_xlog_prune(XLogReaderState *record)
 								nowdead, ndead,
 								nowunused, nunused);
 
+		for (int p = 0; p < nplans; p++)
+		{
+			HeapTupleFreeze frz;
+
+			/*
+			 * Convert freeze plan representation from WAL record into
+			 * per-tuple format used by heap_execute_freeze_tuple
+			 */
+			frz.xmax = plans[p].xmax;
+			frz.t_infomask2 = plans[p].t_infomask2;
+			frz.t_infomask = plans[p].t_infomask;
+			frz.frzflags = plans[p].frzflags;
+			frz.offset = InvalidOffsetNumber;	/* unused, but be tidy */
+
+			for (int i = 0; i < plans[p].ntuples; i++)
+			{
+				OffsetNumber offset = frz_offsets[curoff++];
+				ItemId		lp;
+				HeapTupleHeader tuple;
+
+				lp = PageGetItemId(page, offset);
+				tuple = (HeapTupleHeader) PageGetItem(page, lp);
+				heap_execute_freeze_tuple(tuple, &frz);
+			}
+		}
+
 		/*
 		 * Note: we don't worry about updating the page's prunability hints.
 		 * At worst this will cause an extra prune cycle to occur soon.
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index ca64c45d8a3..70d35a21e98 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -605,6 +605,9 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 	 */
 	PageClearFull(page);
 
+	if (do_freeze)
+		heap_freeze_prepared_tuples(buffer, frozen, presult->nfrozen);
+
 	MarkBufferDirty(buffer);
 
 	/*
@@ -615,10 +618,37 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 		xl_heap_prune xlrec;
 		XLogRecPtr	recptr;
 
+		xl_heap_freeze_plan plans[MaxHeapTuplesPerPage];
+		OffsetNumber offsets[MaxHeapTuplesPerPage];
+
 		xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(relation);
-		xlrec.snapshotConflictHorizon = prstate.snapshotConflictHorizon;
 		xlrec.nredirected = prstate.nredirected;
 		xlrec.ndead = prstate.ndead;
+		xlrec.nunused = prstate.nunused;
+		xlrec.nplans = 0;
+
+		/*
+		 * The snapshotConflictHorizon for the whole record should be the most
+		 * conservative of all the horizons calculated for any of the possible
+		 * modifications. If this record prunes tuples, any transactions on
+		 * the standby older than the youngest xmax among the removed tuples
+		 * will conflict. If this record freezes tuples, any transactions on
+		 * the standby with xids older than the xid of the youngest tuple it
+		 * freezes will conflict.
+		 */
+		if (do_freeze)
+			xlrec.snapshotConflictHorizon = Max(prstate.snapshotConflictHorizon,
+												frz_conflict_horizon);
+		else
+			xlrec.snapshotConflictHorizon = prstate.snapshotConflictHorizon;
+
+		/*
+		 * Prepare deduplicated representation for use in the WAL record.
+		 * This destructively sorts the frozen tuples array in place.
+		 */
+		if (do_freeze)
+			xlrec.nplans = heap_log_freeze_plan(frozen,
+												presult->nfrozen, plans, offsets);
 
 		XLogBeginInsert();
 		XLogRegisterData((char *) &xlrec, SizeOfHeapPrune);
@@ -630,6 +660,10 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 		 * pretend that they are.  When XLogInsert stores the whole buffer,
 		 * the offset arrays need not be stored too.
 		 */
+		if (xlrec.nplans > 0)
+			XLogRegisterBufData(0, (char *) plans,
+								xlrec.nplans * sizeof(xl_heap_freeze_plan));
+
 		if (prstate.nredirected > 0)
 			XLogRegisterBufData(0, (char *) prstate.redirected,
 								prstate.nredirected *
@@ -643,56 +677,13 @@ heap_page_prune_and_freeze(Relation relation, Buffer buffer,
 			XLogRegisterBufData(0, (char *) prstate.nowunused,
 								prstate.nunused * sizeof(OffsetNumber));
 
-		recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_PRUNE);
-
-		PageSetLSN(BufferGetPage(buffer), recptr);
-	}
-
-	if (do_freeze)
-	{
-		Assert(presult->nfrozen > 0);
-
-		heap_freeze_prepared_tuples(buffer, frozen, presult->nfrozen);
-
-		MarkBufferDirty(buffer);
-
-		/* Now WAL-log freezing if necessary */
-		if (RelationNeedsWAL(relation))
-		{
-			xl_heap_freeze_plan plans[MaxHeapTuplesPerPage];
-			OffsetNumber offsets[MaxHeapTuplesPerPage];
-			int			nplans;
-			xl_heap_freeze_page xlrec;
-			XLogRecPtr	recptr;
-
-			/*
-			 * Prepare deduplicated representation for use in WAL record
-			 * Destructively sorts tuples array in-place.
-			 */
-			nplans = heap_log_freeze_plan(frozen, presult->nfrozen, plans, offsets);
-
-			xlrec.snapshotConflictHorizon = frz_conflict_horizon;
-			xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(relation);
-			xlrec.nplans = nplans;
-
-			XLogBeginInsert();
-			XLogRegisterData((char *) &xlrec, SizeOfHeapFreezePage);
-
-			/*
-			 * The freeze plan array and offset array are not actually in the
-			 * buffer, but pretend that they are.  When XLogInsert stores the
-			 * whole buffer, the arrays need not be stored too.
-			 */
-			XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
-			XLogRegisterBufData(0, (char *) plans,
-								nplans * sizeof(xl_heap_freeze_plan));
+		if (xlrec.nplans > 0)
 			XLogRegisterBufData(0, (char *) offsets,
 								presult->nfrozen * sizeof(OffsetNumber));
 
-			recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_FREEZE_PAGE);
+		recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_PRUNE);
 
-			PageSetLSN(page, recptr);
-		}
+		PageSetLSN(BufferGetPage(buffer), recptr);
 	}
 
 	END_CRIT_SECTION();
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 6488dad5e64..22f236bb52a 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -231,23 +231,35 @@ typedef struct xl_heap_update
  * during opportunistic pruning)
  *
  * The array of OffsetNumbers following the fixed part of the record contains:
+ *	* for each freeze plan: the freeze plan
  *	* for each redirected item: the item offset, then the offset redirected to
  *	* for each now-dead item: the item offset
  *	* for each now-unused item: the item offset
- * The total number of OffsetNumbers is therefore 2*nredirected+ndead+nunused.
- * Note that nunused is not explicitly stored, but may be found by reference
- * to the total record length.
+ *	* for each tuple frozen by the freeze plans: the offset of its item
+ * The total number of OffsetNumbers is therefore
+ * (2*nredirected) + ndead + nunused + (sum[plan.ntuples for plan in plans])
  *
  * Acquires a full cleanup lock.
  */
 typedef struct xl_heap_prune
 {
 	TransactionId snapshotConflictHorizon;
+	uint16		nplans;
 	uint16		nredirected;
 	uint16		ndead;
+	uint16		nunused;
 	bool		isCatalogRel;	/* to handle recovery conflict during logical
 								 * decoding on standby */
-	/* OFFSET NUMBERS are in the block reference 0 */
+	/*
+	 * OFFSET NUMBERS and freeze plans are in block reference 0 in the
+	 * following order:
+	 *
+	 *		* xl_heap_freeze_plan plans[nplans];
+	 *		* OffsetNumber redirected[2 * nredirected];
+	 *		* OffsetNumber nowdead[ndead];
+	 *		* OffsetNumber nowunused[nunused];
+	 *		* OffsetNumber frz_offsets[...];
+	 */
 } xl_heap_prune;
 
 #define SizeOfHeapPrune (offsetof(xl_heap_prune, isCatalogRel) + sizeof(bool))
-- 
2.40.1

