From 050b6c3fa73c9153aeef58fcd306533c1008802e Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Fri, 26 Apr 2024 08:32:44 +1200
Subject: [PATCH v11 2/3] Refactor tidstore.c memory management.

Previously, TidStoreIterateNext() would expand the set of offsets for
each block into a buffer that it overwrote each time.  In order to be
able to collect the offsets for multiple blocks before working with
them, change the contract.  Now, the offsets are obtained by a separate
call to TidStoreGetBlockOffsets(), which can be called at a later time,
and TidStoreIteratorResult objects are safe to copy and store in a
queue.

This will be used by a later patch, to avoid the need for expensive
extra copies of offset array and associated memory management.
---
 src/backend/access/common/tidstore.c          | 68 +++++++++----------
 src/backend/access/heap/vacuumlazy.c          |  9 ++-
 src/include/access/tidstore.h                 | 12 ++--
 .../modules/test_tidstore/test_tidstore.c     |  9 ++-
 4 files changed, 53 insertions(+), 45 deletions(-)

diff --git a/src/backend/access/common/tidstore.c b/src/backend/access/common/tidstore.c
index fb3949d69f6..c3c1987204b 100644
--- a/src/backend/access/common/tidstore.c
+++ b/src/backend/access/common/tidstore.c
@@ -147,9 +147,6 @@ struct TidStoreIter
 	TidStoreIterResult output;
 };
 
-static void tidstore_iter_extract_tids(TidStoreIter *iter, BlockNumber blkno,
-									   BlocktableEntry *page);
-
 /*
  * Create a TidStore. The TidStore will live in the memory context that is
  * CurrentMemoryContext at the time of this call. The TID storage, backed
@@ -486,13 +483,6 @@ TidStoreBeginIterate(TidStore *ts)
 	iter = palloc0(sizeof(TidStoreIter));
 	iter->ts = ts;
 
-	/*
-	 * We start with an array large enough to contain at least the offsets
-	 * from one completely full bitmap element.
-	 */
-	iter->output.max_offset = 2 * BITS_PER_BITMAPWORD;
-	iter->output.offsets = palloc(sizeof(OffsetNumber) * iter->output.max_offset);
-
 	if (TidStoreIsShared(ts))
 		iter->tree_iter.shared = shared_ts_begin_iterate(ts->tree.shared);
 	else
@@ -503,9 +493,9 @@ TidStoreBeginIterate(TidStore *ts)
 
 
 /*
- * Scan the TidStore and return the TIDs of the next block. The offsets in
- * each iteration result are ordered, as are the block numbers over all
- * iterations.
+ * Return a result that contains the next block number and that can be used to
+ * obtain the set of offsets by calling TidStoreGetBlockOffsets().  The result
+ * is copyable.
  */
 TidStoreIterResult *
 TidStoreIterateNext(TidStoreIter *iter)
@@ -521,10 +511,10 @@ TidStoreIterateNext(TidStoreIter *iter)
 	if (page == NULL)
 		return NULL;
 
-	/* Collect TIDs from the key-value pair */
-	tidstore_iter_extract_tids(iter, (BlockNumber) key, page);
+	iter->output.blkno = key;
+	iter->output.internal_page = page;
 
-	return &(iter->output);
+	return &iter->output;
 }
 
 /*
@@ -540,7 +530,6 @@ TidStoreEndIterate(TidStoreIter *iter)
 	else
 		local_ts_end_iterate(iter->tree_iter.local);
 
-	pfree(iter->output.offsets);
 	pfree(iter);
 }
 
@@ -575,16 +564,19 @@ TidStoreGetHandle(TidStore *ts)
 	return (dsa_pointer) shared_ts_get_handle(ts->tree.shared);
 }
 
-/* Extract TIDs from the given key-value pair */
-static void
-tidstore_iter_extract_tids(TidStoreIter *iter, BlockNumber blkno,
-						   BlocktableEntry *page)
+/*
+ * Given a TidStoreIterResult returned by TidStoreIterateNext(), extract the
+ * offset numbers.  Returns the number of offsets filled in, if <=
+ * max_offsets.  Otherwise, fills in as much as it can in the given space, and
+ * returns the size of the buffer that would be needed.
+ */
+int
+TidStoreGetBlockOffsets(TidStoreIterResult *result,
+						OffsetNumber *offsets,
+						int max_offsets)
 {
-	TidStoreIterResult *result = (&iter->output);
-	int			wordnum;
-
-	result->num_offsets = 0;
-	result->blkno = blkno;
+	BlocktableEntry *page = result->internal_page;
+	int			num_offsets = 0;
 
 	if (page->header.nwords == 0)
 	{
@@ -592,31 +584,33 @@ tidstore_iter_extract_tids(TidStoreIter *iter, BlockNumber blkno,
 		for (int i = 0; i < NUM_FULL_OFFSETS; i++)
 		{
 			if (page->header.full_offsets[i] != InvalidOffsetNumber)
-				result->offsets[result->num_offsets++] = page->header.full_offsets[i];
+			{
+				if (num_offsets < max_offsets)
+					offsets[num_offsets] = page->header.full_offsets[i];
+				num_offsets++;
+			}
 		}
 	}
 	else
 	{
-		for (wordnum = 0; wordnum < page->header.nwords; wordnum++)
+		for (int wordnum = 0; wordnum < page->header.nwords; wordnum++)
 		{
 			bitmapword	w = page->words[wordnum];
 			int			off = wordnum * BITS_PER_BITMAPWORD;
 
-			/* Make sure there is enough space to add offsets */
-			if ((result->num_offsets + BITS_PER_BITMAPWORD) > result->max_offset)
-			{
-				result->max_offset *= 2;
-				result->offsets = repalloc(result->offsets,
-										   sizeof(OffsetNumber) * result->max_offset);
-			}
-
 			while (w != 0)
 			{
 				if (w & 1)
-					result->offsets[result->num_offsets++] = (OffsetNumber) off;
+				{
+					if (num_offsets < max_offsets)
+						offsets[num_offsets] = (OffsetNumber) off;
+					num_offsets++;
+				}
 				off++;
 				w >>= 1;
 			}
 		}
 	}
+
+	return num_offsets;
 }
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index f76ef2e7c63..19c13671666 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -2144,12 +2144,17 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 		Buffer		buf;
 		Page		page;
 		Size		freespace;
+		OffsetNumber offsets[MaxOffsetNumber];
+		int			num_offsets;
 
 		vacuum_delay_point();
 
 		blkno = iter_result->blkno;
 		vacrel->blkno = blkno;
 
+		num_offsets = TidStoreGetBlockOffsets(iter_result, offsets, lengthof(offsets));
+		Assert(num_offsets <= lengthof(offsets));
+
 		/*
 		 * Pin the visibility map page in case we need to mark the page
 		 * all-visible.  In most cases this will be very cheap, because we'll
@@ -2161,8 +2166,8 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
 		buf = ReadBufferExtended(vacrel->rel, MAIN_FORKNUM, blkno, RBM_NORMAL,
 								 vacrel->bstrategy);
 		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
-		lazy_vacuum_heap_page(vacrel, blkno, buf, iter_result->offsets,
-							  iter_result->num_offsets, vmbuffer);
+		lazy_vacuum_heap_page(vacrel, blkno, buf, offsets,
+							  num_offsets, vmbuffer);
 
 		/* Now that we've vacuumed the page, record its available space */
 		page = BufferGetPage(buf);
diff --git a/src/include/access/tidstore.h b/src/include/access/tidstore.h
index 32aa9995193..d95cabd7b5e 100644
--- a/src/include/access/tidstore.h
+++ b/src/include/access/tidstore.h
@@ -20,13 +20,14 @@
 typedef struct TidStore TidStore;
 typedef struct TidStoreIter TidStoreIter;
 
-/* Result struct for TidStoreIterateNext */
+/*
+ * Result struct for TidStoreIterateNext.  This is copyable, but should be
+ * treated as opaque.  Call TidStoreGetOffsets() to obtain the offsets.
+ */
 typedef struct TidStoreIterResult
 {
 	BlockNumber blkno;
-	int			max_offset;
-	int			num_offsets;
-	OffsetNumber *offsets;
+	void	   *internal_page;
 } TidStoreIterResult;
 
 extern TidStore *TidStoreCreateLocal(size_t max_bytes, bool insert_only);
@@ -42,6 +43,9 @@ extern void TidStoreSetBlockOffsets(TidStore *ts, BlockNumber blkno, OffsetNumbe
 extern bool TidStoreIsMember(TidStore *ts, ItemPointer tid);
 extern TidStoreIter *TidStoreBeginIterate(TidStore *ts);
 extern TidStoreIterResult *TidStoreIterateNext(TidStoreIter *iter);
+extern int	TidStoreGetBlockOffsets(TidStoreIterResult *result,
+									OffsetNumber *offsets,
+									int max_offsets);
 extern void TidStoreEndIterate(TidStoreIter *iter);
 extern size_t TidStoreMemoryUsage(TidStore *ts);
 extern dsa_pointer TidStoreGetHandle(TidStore *ts);
diff --git a/src/test/modules/test_tidstore/test_tidstore.c b/src/test/modules/test_tidstore/test_tidstore.c
index 3f6a11bf21c..94ddcf1de82 100644
--- a/src/test/modules/test_tidstore/test_tidstore.c
+++ b/src/test/modules/test_tidstore/test_tidstore.c
@@ -267,9 +267,14 @@ check_set_block_offsets(PG_FUNCTION_ARGS)
 	iter = TidStoreBeginIterate(tidstore);
 	while ((iter_result = TidStoreIterateNext(iter)) != NULL)
 	{
-		for (int i = 0; i < iter_result->num_offsets; i++)
+		OffsetNumber offsets[MaxOffsetNumber];
+		int			num_offsets;
+
+		num_offsets = TidStoreGetBlockOffsets(iter_result, offsets, lengthof(offsets));
+		Assert(num_offsets <= lengthof(offsets));
+		for (int i = 0; i < num_offsets; i++)
 			ItemPointerSet(&(items.iter_tids[num_iter_tids++]), iter_result->blkno,
-						   iter_result->offsets[i]);
+						   offsets[i]);
 	}
 	TidStoreEndIterate(iter);
 	TidStoreUnlock(tidstore);
-- 
2.34.1

