From db429b6dd4a18f3cf5343f44c9f553328e697f13 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@vondra.me>
Date: Fri, 25 Apr 2025 13:22:38 +0200
Subject: [PATCH v20250709 4/6] prefetch for gist indexes

Implements gist_stream_read_next() and gist_ordered_stream_read_next()
callbacks, for different types of scans:

* gist_stream_read_next() is for traditional index scans, returning
  blocks from GISTScanOpaque.

* gist_ordered_stream_read_next() is for scans with results ordered by
  distance, etc.

The ordered scans rely on a pairing heap - the items are fed into it,
but then are read and returned one by one. That would make prefetching
quite useless, so the patch introduces a small queue on top of the
pairing heap, and the items are loaded in batches. This is what
getNextNearestPrefetch() is responsible for.

Note: Right now the batches are always 32 items, which may regress
queries with LIMIT clauses, etc. It should start at 1 and gradually
increase the batch size. Similarly to how prefetch distance grows.

FIXME The memory management of the batches is almost certainly leaky,
needs to be cleaned up.
---
 src/backend/access/gist/gistget.c  | 160 +++++++++++++++++++-
 src/backend/access/gist/gistscan.c | 225 +++++++++++++++++++++++++++++
 src/include/access/gist_private.h  |  16 ++
 3 files changed, 400 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c
index 387d9972345..7e292ebb442 100644
--- a/src/backend/access/gist/gistget.c
+++ b/src/backend/access/gist/gistget.c
@@ -21,7 +21,9 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "storage/predicate.h"
+#include "utils/datum.h"
 #include "utils/float.h"
+#include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
@@ -395,6 +397,7 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem,
 	}
 
 	so->nPageData = so->curPageData = 0;
+	so->streamPageData = -1;
 	scan->xs_hitup = NULL;		/* might point into pageDataCxt */
 	if (so->pageDataCxt)
 		MemoryContextReset(so->pageDataCxt);
@@ -460,6 +463,8 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem,
 			so->pageData[so->nPageData].heapPtr = it->t_tid;
 			so->pageData[so->nPageData].recheck = recheck;
 			so->pageData[so->nPageData].offnum = i;
+			so->pageData[so->nPageData].allVisible = false;
+			so->pageData[so->nPageData].allVisibleSet = false;
 
 			/*
 			 * In an index-only scan, also fetch the data from the tuple.  The
@@ -496,6 +501,8 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem,
 				item->data.heap.heapPtr = it->t_tid;
 				item->data.heap.recheck = recheck;
 				item->data.heap.recheckDistances = recheck_distances;
+				item->data.heap.allVisible = false;
+				item->data.heap.allVisibleSet = false;
 
 				/*
 				 * In an index-only scan, also fetch the data from the tuple.
@@ -589,6 +596,22 @@ getNextNearest(IndexScanDesc scan)
 			/* in an index-only scan, also return the reconstructed tuple. */
 			if (scan->xs_want_itup)
 				scan->xs_hitup = item->data.heap.recontup;
+
+			/*
+			 * If this is index-only scan, determine the VM status, so that
+			 * we can set xs_visible correctly.
+			 */
+			if (scan->xs_want_itup && ! item->data.heap.allVisibleSet)
+			{
+				item->data.heap.allVisibleSet = true;
+				item->data.heap.allVisible
+					= VM_ALL_VISIBLE(scan->heapRelation,
+									 ItemPointerGetBlockNumber(&item->data.heap.heapPtr),
+									 &so->vmBuffer);
+			}
+
+			scan->xs_visible = item->data.heap.allVisible;
+
 			res = true;
 		}
 		else
@@ -605,6 +628,119 @@ getNextNearest(IndexScanDesc scan)
 	return res;
 }
 
+/*
+ * A variant of getNextNearest() that stashes the items into a small buffer, so
+ * that the prefetching can work (getNextNearest returns items one by one).
+ *
+ * XXX Uses a small secondary queue, because getNextNearest() may be modifying
+ * the regular pageData[] buffer.
+ */
+static bool
+getNextNearestPrefetch(IndexScanDesc scan)
+{
+	GISTScanOpaque so = (GISTScanOpaque) scan->opaque;
+	GISTSearchHeapItem *item;
+
+	/* did we use all items from the queue */
+	if (so->queueItem == so->queueUsed)
+	{
+		/* grow the number of items */
+		int		maxitems = Min(Max(1, 2 * so->queueUsed), 32);
+
+		/* FIXME gradually incresse the number of items, not 32 all the time */
+		maxitems = 32;
+
+		so->queueItem = 0;
+		so->queueUsed = 0;
+
+		while (so->queueUsed < maxitems)
+		{
+			if (!getNextNearest(scan))
+				break;
+
+			item = &so->queueItems[so->queueUsed++];
+
+			item->recheck = scan->xs_recheck;
+			item->heapPtr = scan->xs_heaptid;
+			item->recontup = scan->xs_hitup;
+
+			/*
+			 * FIXME free the memory (for tuples and orderbyvals/orderbynulls)
+			 * it's leaking now.
+			 */
+			item->orderbyvals = palloc0(sizeof(Datum) * scan->numberOfOrderBys);
+			item->orderbynulls = palloc0(sizeof(bool) * scan->numberOfOrderBys);
+
+			/*
+			 * copy the distances - might be float8, which may be byref, so use
+			 * datumCopy, otherwise it gets clobbered by other items
+			 */
+			for (int i = 0; i < scan->numberOfOrderBys; i++)
+			{
+				int16   typlen;
+				bool    typbyval;
+
+				/* don't copy NULL values */
+				if (scan->xs_orderbynulls[i])
+					continue;
+
+				get_typlenbyval(so->orderByTypes[i], &typlen, &typbyval);
+
+				item->orderbyvals[i] = datumCopy(scan->xs_orderbyvals[i],
+												 typbyval, typlen);
+			}
+
+			memcpy(item->orderbynulls,
+				   scan->xs_orderbynulls,
+				   sizeof(bool) * scan->numberOfOrderBys);
+
+			/* reset, so that we don't free it accidentally */
+			scan->xs_hitup = NULL;
+		}
+
+		/* found no new items, we're done */
+		if (so->queueUsed == 0)
+			return false;
+
+		/* restart the stream for the new queue */
+		so->queueStream = -1;
+		so->lastBlock = InvalidBlockNumber; /* XXX needed? */
+		read_stream_reset(scan->xs_rs);
+	}
+
+	/* next item to return */
+	item = &so->queueItems[so->queueItem++];
+
+	scan->xs_heaptid = item->heapPtr;
+	scan->xs_recheck = item->recheck;
+
+	/* here it's fine to copy the datum (even if byref pointers) */
+	memcpy(scan->xs_orderbyvals,
+		   item->orderbyvals,
+		   sizeof(Datum) * scan->numberOfOrderBys);
+
+	memcpy(scan->xs_orderbynulls,
+		   item->orderbynulls,
+		   sizeof(bool) * scan->numberOfOrderBys);
+
+	/* in an index-only scan, also return the reconstructed tuple. */
+	if (scan->xs_want_itup)
+		scan->xs_hitup = item->recontup;
+
+	if (scan->xs_want_itup && ! item->allVisibleSet)
+	{
+		item->allVisibleSet = true;
+		item->allVisible
+			= VM_ALL_VISIBLE(scan->heapRelation,
+							 ItemPointerGetBlockNumber(&item->heapPtr),
+							 &so->vmBuffer);
+	}
+
+	scan->xs_visible = item->allVisible;
+
+	return true;
+}
+
 /*
  * gistgettuple() -- Get the next tuple in the scan
  */
@@ -642,7 +778,10 @@ gistgettuple(IndexScanDesc scan, ScanDirection dir)
 	if (scan->numberOfOrderBys > 0)
 	{
 		/* Must fetch tuples in strict distance order */
-		return getNextNearest(scan);
+		if (scan->xs_rs)
+			return getNextNearestPrefetch(scan);
+		else
+			return getNextNearest(scan);
 	}
 	else
 	{
@@ -677,6 +816,18 @@ gistgettuple(IndexScanDesc scan, ScanDirection dir)
 				if (scan->xs_want_itup)
 					scan->xs_hitup = so->pageData[so->curPageData].recontup;
 
+				/* determine VM status, if not done already */
+				if (scan->xs_want_itup && !so->pageData[so->curPageData].allVisibleSet)
+				{
+					so->pageData[so->curPageData].allVisibleSet = true;
+					so->pageData[so->curPageData].allVisible
+						= VM_ALL_VISIBLE(scan->heapRelation,
+										 ItemPointerGetBlockNumber(&scan->xs_heaptid),
+										 &so->vmBuffer);
+				}
+
+				scan->xs_visible = so->pageData[so->curPageData].allVisible;
+
 				so->curPageData++;
 
 				return true;
@@ -734,6 +885,13 @@ gistgettuple(IndexScanDesc scan, ScanDirection dir)
 
 				pfree(item);
 			} while (so->nPageData == 0);
+
+			if (scan->xs_rs)
+			{
+				so->streamPageData = -1;
+				so->lastBlock = InvalidBlockNumber;
+				read_stream_reset(scan->xs_rs);
+			}
 		}
 	}
 }
diff --git a/src/backend/access/gist/gistscan.c b/src/backend/access/gist/gistscan.c
index d8ba7f7eff5..df05f282aa1 100644
--- a/src/backend/access/gist/gistscan.c
+++ b/src/backend/access/gist/gistscan.c
@@ -17,6 +17,8 @@
 #include "access/gist_private.h"
 #include "access/gistscan.h"
 #include "access/relscan.h"
+#include "access/table.h"
+#include "optimizer/cost.h"
 #include "utils/float.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -70,6 +72,176 @@ pairingheap_GISTSearchItem_cmp(const pairingheap_node *a, const pairingheap_node
  * Index AM API functions for scanning GiST indexes
  */
 
+/*
+ * gist_stream_read_next
+ *		Return the next block to read from the read stream.
+ *
+ * Returns the next block from the current leaf page. The first block is
+ * when accessing the first tuple, after already receiving the TID from the
+ * index (for the item itemIndex points at).
+ *
+ * With index-only scans this skips all-visible pages. The visibility info
+ * is stored, so that we can later pass it to the scan (we must not access
+ * the VM again, the bit might have changes, and the read stream would get
+ * out of sync (we'd get different blocks than we expect expect).
+ *
+ * Returns the block number to get from the read stream. InvalidBlockNumber
+ * means we've ran out of item on the current leaf page - the stream will
+ * end, and we'll need to reset it after reading the next page (or after
+ * changing the scan direction).
+ *
+ * XXX Should skip duplicate blocks (for correlated indexes). But that's
+ * not implemented yet.
+ */
+static BlockNumber
+gist_stream_read_next(ReadStream *stream,
+					  void *callback_private_data,
+					  void *per_buffer_data)
+{
+	IndexScanDesc	scan = (IndexScanDesc) callback_private_data;
+	GISTScanOpaque	so = (GISTScanOpaque) scan->opaque;
+	BlockNumber		block = InvalidBlockNumber;
+
+	/*
+	 * Is this the first request for the read stream (possibly after a reset)?
+	 * If yes, initialize the stream to the current item (itemIndex).
+	 */
+	if (so->streamPageData == (OffsetNumber) - 1)
+		so->streamPageData = (so->curPageData - 1);
+
+	/*
+	 * Find the next block to read. For plain index scans we will return the
+	 * very next item, but with index-only scans we skip TIDs from all-visible
+	 * pages (because we won't read those).
+	 */
+	while (so->streamPageData < so->nPageData)
+	{
+		ItemPointer		tid;
+		GISTSearchHeapItem  *item;
+
+		item = &so->pageData[so->streamPageData];
+
+		tid = &item->heapPtr;
+		block = ItemPointerGetBlockNumber(tid);
+
+		/*
+		 * For index-only scans, check the VM and remember the result. If the page
+		 * is all-visible, don't return the block number, try reading the next one.
+		 *
+		 * XXX Maybe this could use the same logic to check for duplicate blocks,
+		 * and reuse the VM result if possible.
+		 */
+		if (scan->xs_want_itup)
+		{
+			if (!item->allVisibleSet)
+			{
+				item->allVisibleSet = true;
+				item->allVisible = VM_ALL_VISIBLE(scan->heapRelation,
+												  ItemPointerGetBlockNumber(tid),
+												  &so->vmBuffer);
+			}
+
+			/* don't prefetch this all-visible block, try the next one */
+			if (item->allVisible)
+				block = InvalidBlockNumber;
+		}
+
+		/* advance to the next item, assuming the current scan direction */
+		so->streamPageData++;
+
+		/* don't return the same block twice (and remember this one) */
+		if (so->lastBlock == block)
+			block = InvalidBlockNumber;
+
+		/* Did we find a valid block? If yes, we're done. */
+		if (block != InvalidBlockNumber)
+			break;
+	}
+
+	/* remember the block we're returning */
+	so->lastBlock = block;
+
+	return block;
+}
+
+/*
+ * gist_ordered_stream_read_next
+ *		Return the next block to read from the read stream.
+ *
+ * A variant of gist_stream_read_next for ordered scans, returning items from
+ * a small secondary queue.
+ */
+static BlockNumber
+gist_ordered_stream_read_next(ReadStream *stream,
+							  void *callback_private_data,
+							  void *per_buffer_data)
+{
+	IndexScanDesc	scan = (IndexScanDesc) callback_private_data;
+	GISTScanOpaque	so = (GISTScanOpaque) scan->opaque;
+	BlockNumber		block = InvalidBlockNumber;
+
+	/*
+	 * Is this the first request for the read stream (possibly after a reset)?
+	 * If yes, initialize the stream to the current item (itemIndex).
+	 */
+	if (so->queueStream == - 1)
+		so->queueStream = (so->queueItem - 1);
+
+	/*
+	 * Find the next block to read. For plain index scans we will return the
+	 * very next item, but with index-only scans we skip TIDs from all-visible
+	 * pages (because we won't read those).
+	 */
+	while (so->queueStream < so->queueUsed)
+	{
+		ItemPointer		tid;
+		GISTSearchHeapItem  *item;
+
+		item = &so->queueItems[so->queueStream];
+
+		tid = &item->heapPtr;
+		block = ItemPointerGetBlockNumber(tid);
+
+		/*
+		 * For index-only scans, check the VM and remember the result. If the page
+		 * is all-visible, don't return the block number, try reading the next one.
+		 *
+		 * XXX Maybe this could use the same logic to check for duplicate blocks,
+		 * and reuse the VM result if possible.
+		 */
+		if (scan->xs_want_itup)
+		{
+			if (!item->allVisibleSet)
+			{
+				item->allVisibleSet = true;
+				item->allVisible = VM_ALL_VISIBLE(scan->heapRelation,
+												  ItemPointerGetBlockNumber(tid),
+												  &so->vmBuffer);
+			}
+
+			/* don't prefetch this all-visible block, try the next one */
+			if (item->allVisible)
+				block = InvalidBlockNumber;
+		}
+
+		/* advance to the next item, assuming the current scan direction */
+		so->queueStream++;
+
+		/* don't return the same block twice (and remember this one) */
+		if (so->lastBlock == block)
+			block = InvalidBlockNumber;
+
+		/* Did we find a valid block? If yes, we're done. */
+		if (block != InvalidBlockNumber)
+			break;
+	}
+
+	/* remember the block we're returning */
+	so->lastBlock = block;
+
+	return block;
+}
+
 IndexScanDesc
 gistbeginscan(Relation heap, Relation index, int nkeys, int norderbys)
 {
@@ -110,6 +282,14 @@ gistbeginscan(Relation heap, Relation index, int nkeys, int norderbys)
 	so->numKilled = 0;
 	so->curBlkno = InvalidBlockNumber;
 	so->curPageLSN = InvalidXLogRecPtr;
+	so->vmBuffer = InvalidBuffer;
+
+	/* initialize small prefetch queue */
+	so->queueUsed = 0;
+	so->queueItem = 0;
+
+	/* nothing returned */
+	so->lastBlock = InvalidBlockNumber;
 
 	scan->opaque = so;
 
@@ -120,6 +300,31 @@ gistbeginscan(Relation heap, Relation index, int nkeys, int norderbys)
 
 	MemoryContextSwitchTo(oldCxt);
 
+	/*
+	 * Initialize the read stream to opt-in into prefetching.
+	 *
+	 * XXX See the comments in btbeginscan().
+	 */
+	if (enable_indexscan_prefetch && heap)
+	{
+		if (scan->numberOfOrderBys == 0)
+			scan->xs_rs = read_stream_begin_relation(READ_STREAM_DEFAULT,
+													 NULL,
+													 heap,
+													 MAIN_FORKNUM,
+													 gist_stream_read_next,
+													 scan,
+													 0);
+		else
+			scan->xs_rs = read_stream_begin_relation(READ_STREAM_DEFAULT,
+													 NULL,
+													 heap,
+													 MAIN_FORKNUM,
+													 gist_ordered_stream_read_next,
+													 scan,
+													 0);
+	}
+
 	return scan;
 }
 
@@ -341,6 +546,15 @@ gistrescan(IndexScanDesc scan, ScanKey key, int nkeys,
 
 	/* any previous xs_hitup will have been pfree'd in context resets above */
 	scan->xs_hitup = NULL;
+
+	/* reset stream */
+	if (scan->xs_rs)
+	{
+		so->streamPageData = -1;
+		so->lastBlock = InvalidBlockNumber;
+		read_stream_reset(scan->xs_rs);
+		so->queueItem = so->queueUsed = so->queueStream = 0;
+	}
 }
 
 void
@@ -348,9 +562,20 @@ gistendscan(IndexScanDesc scan)
 {
 	GISTScanOpaque so = (GISTScanOpaque) scan->opaque;
 
+	/* needs to happen before freeGISTstate */
+	if (so->vmBuffer != InvalidBuffer)
+	{
+		ReleaseBuffer(so->vmBuffer);
+		so->vmBuffer = InvalidBuffer;
+	}
+
 	/*
 	 * freeGISTstate is enough to clean up everything made by gistbeginscan,
 	 * as well as the queueCxt if there is a separate context for it.
 	 */
 	freeGISTstate(so->giststate);
+
+	/* reset stream */
+	if (scan->xs_rs)
+		read_stream_end(scan->xs_rs);
 }
diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h
index 39404ec7cdb..924bbae22e2 100644
--- a/src/include/access/gist_private.h
+++ b/src/include/access/gist_private.h
@@ -17,6 +17,7 @@
 #include "access/amapi.h"
 #include "access/gist.h"
 #include "access/itup.h"
+#include "access/visibilitymap.h"
 #include "lib/pairingheap.h"
 #include "storage/bufmgr.h"
 #include "storage/buffile.h"
@@ -124,6 +125,10 @@ typedef struct GISTSearchHeapItem
 								 * index-only scans */
 	OffsetNumber offnum;		/* track offset in page to mark tuple as
 								 * LP_DEAD */
+	bool		allVisible;
+	bool		allVisibleSet;
+	Datum	   *orderbyvals;
+	bool	   *orderbynulls;
 } GISTSearchHeapItem;
 
 /* Unvisited item, either index page or heap tuple */
@@ -169,13 +174,24 @@ typedef struct GISTScanOpaqueData
 	int			numKilled;		/* number of currently stored items */
 	BlockNumber curBlkno;		/* current number of block */
 	GistNSN		curPageLSN;		/* pos in the WAL stream when page was read */
+	Buffer		vmBuffer;
 
 	/* In a non-ordered search, returnable heap items are stored here: */
 	GISTSearchHeapItem pageData[BLCKSZ / sizeof(IndexTupleData)];
 	OffsetNumber nPageData;		/* number of valid items in array */
 	OffsetNumber curPageData;	/* next item to return */
+	OffsetNumber streamPageData;	/* next item to queue */
 	MemoryContext pageDataCxt;	/* context holding the fetched tuples, for
 								 * index-only scans */
+
+	/* last block returned by the read_next stream callback */
+	BlockNumber	lastBlock;
+
+	/* queue to allow prefetching with ordered scans */
+	GISTSearchHeapItem	queueItems[32];
+	int					queueItem;
+	int					queueStream;
+	int					queueUsed;
 } GISTScanOpaqueData;
 
 typedef GISTScanOpaqueData *GISTScanOpaque;
-- 
2.50.0

