From 425ccad723c34ef8eeeb1d6d80400b1705c4ec2a Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@vondra.me>
Date: Fri, 25 Apr 2025 12:34:15 +0200
Subject: [PATCH v20250709 3/6] prefetch for hash indexes

Implements the hash_stream_read_next() callback, returning blocks from
HashScanOpaque.
---
 src/backend/access/hash/hash.c       | 105 +++++++++++++++++++++++++++
 src/backend/access/hash/hashsearch.c |  37 ++++++++++
 src/include/access/hash.h            |   8 ++
 3 files changed, 150 insertions(+)

diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index 2133e454e9b..0884f0e05d9 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -22,12 +22,14 @@
 #include "access/hash_xlog.h"
 #include "access/relscan.h"
 #include "access/stratnum.h"
+#include "access/table.h"
 #include "access/tableam.h"
 #include "access/xloginsert.h"
 #include "commands/progress.h"
 #include "commands/vacuum.h"
 #include "miscadmin.h"
 #include "nodes/execnodes.h"
+#include "optimizer/cost.h"
 #include "optimizer/plancat.h"
 #include "pgstat.h"
 #include "utils/fmgrprotos.h"
@@ -366,6 +368,78 @@ hashgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
 	return ntids;
 }
 
+/*
+ * hash_stream_read_next
+ *		Return the next block to read from the read stream.
+ *
+ * Returns the next block from the current leaf page. The first block is
+ * when accessing the first tuple, after already receiving the TID from the
+ * index (for the item itemIndex points at).
+ *
+ * Returns the block number to get from the read stream. InvalidBlockNumber
+ * means we've ran out of item on the current leaf page - the stream will
+ * end, and we'll need to reset it after reading the next page (or after
+ * changing the scan direction).
+ *
+ * XXX Should skip duplicate blocks (for correlated indexes). But that's
+ * not implemented yet.
+ */
+static BlockNumber
+hash_stream_read_next(ReadStream *stream,
+					  void *callback_private_data,
+					  void *per_buffer_data)
+{
+	IndexScanDesc	scan = (IndexScanDesc) callback_private_data;
+	HashScanOpaque	so = (HashScanOpaque) scan->opaque;
+	BlockNumber		block = InvalidBlockNumber;
+
+	/*
+	 * Is this the first request for the read stream (possibly after a reset)?
+	 * If yes, initialize the stream to the current item (itemIndex).
+	 */
+	if (so->currPos.streamIndex == -1)
+		so->currPos.streamIndex = so->currPos.itemIndex;
+
+	/*
+	 * Find the next block to read. For plain index scans we will return the
+	 * very next item, but we might also skip duplicate blocks (in the future).
+	 */
+	while ((so->currPos.streamIndex >= so->currPos.firstItem) &&
+		   (so->currPos.streamIndex <= so->currPos.lastItem))
+	{
+		ItemPointer		tid;
+		HashScanPosItem  *item;
+
+		item = &so->currPos.items[so->currPos.streamIndex];
+
+		tid = &item->heapTid;
+		block = ItemPointerGetBlockNumber(tid);
+
+		/* advance to the next item, depending on scan direction */
+		if (ScanDirectionIsForward(so->currPos.dir))
+		{
+			so->currPos.streamIndex++;
+		}
+		else
+		{
+			so->currPos.streamIndex--;
+		}
+
+		/* don't return the same block twice (and remember this one) */
+		if (so->lastBlock == block)
+			block = InvalidBlockNumber;
+
+		/* Did we find a valid block? If yes, we're done. */
+		if (block != InvalidBlockNumber)
+			break;
+	}
+
+	/* remember the block we're returning */
+	so->lastBlock = block;
+
+	return block;
+}
+
 
 /*
  *	hashbeginscan() -- start a scan on a hash index
@@ -394,6 +468,25 @@ hashbeginscan(Relation heap, Relation index, int nkeys, int norderbys)
 
 	scan->opaque = so;
 
+	/* nothing returned */
+	so->lastBlock = InvalidBlockNumber;
+
+	/*
+	 * Initialize the read stream, to opt-in into prefetching.
+	 *
+	 * XXX See comments in btbeginscan().
+	 */
+	if (enable_indexscan_prefetch && heap)
+	{
+		scan->xs_rs = read_stream_begin_relation(READ_STREAM_DEFAULT,
+												 NULL,
+												 heap,
+												 MAIN_FORKNUM,
+												 hash_stream_read_next,
+												 scan,
+												 0);
+	}
+
 	return scan;
 }
 
@@ -425,6 +518,14 @@ hashrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
 
 	so->hashso_buc_populated = false;
 	so->hashso_buc_split = false;
+
+	/* reset the stream, to start over */
+	if (scan->xs_rs)
+	{
+		so->currPos.streamIndex = -1;
+		so->lastBlock = InvalidBlockNumber;
+		read_stream_reset(scan->xs_rs);
+	}
 }
 
 /*
@@ -449,6 +550,10 @@ hashendscan(IndexScanDesc scan)
 		pfree(so->killedItems);
 	pfree(so);
 	scan->opaque = NULL;
+
+	/* terminate read stream */
+	if (scan->xs_rs)
+		read_stream_end(scan->xs_rs);
 }
 
 /*
diff --git a/src/backend/access/hash/hashsearch.c b/src/backend/access/hash/hashsearch.c
index 92c15a65be2..d5b045deb8c 100644
--- a/src/backend/access/hash/hashsearch.c
+++ b/src/backend/access/hash/hashsearch.c
@@ -54,6 +54,28 @@ _hash_next(IndexScanDesc scan, ScanDirection dir)
 	Buffer		buf;
 	bool		end_of_scan = false;
 
+	/*
+	 * We need to reset the read stream when the scan direction changes. Hash
+	 * indexes are not ordered, but there's still scrollable cursors, and those
+	 * do have irection. So handle that here, and also remember the direction,
+	 * so that the read_next callback can consider that.
+	 *
+	 * XXX we can't do that in the read_next callback, because we might have
+	 * already hit the end of the stream (returned InvalidBlockNumber), in
+	 * which case the callback won't be called.
+	 */
+	if (so->currPos.dir != dir)
+	{
+		so->currPos.dir = dir;
+
+		if (scan->xs_rs)
+		{
+			so->currPos.streamIndex = -1;
+			so->lastBlock = InvalidBlockNumber;
+			read_stream_reset(scan->xs_rs);
+		}
+	}
+
 	/*
 	 * Advance to the next tuple on the current page; or if done, try to read
 	 * data from the next or previous page based on the scan direction. Before
@@ -592,6 +614,21 @@ _hash_readpage(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
 		so->currPos.buf = InvalidBuffer;
 	}
 
+	/*
+	 * restart the stream for this page
+	 *
+	 * XXX Maybe we should not reset prefetch distance to 0, but start from
+	 * a somewhat higher value. We're merely continuing the same scan as
+	 * before ... maybe reduce it a bit, to not harm LIMIT queries, but not
+	 * reset it all the way to 0.
+	 */
+	if (scan->xs_rs)
+	{
+		so->currPos.streamIndex = - 1;
+		so->lastBlock = InvalidBlockNumber;
+		read_stream_reset(scan->xs_rs);
+	}
+
 	Assert(so->currPos.firstItem <= so->currPos.lastItem);
 	return true;
 }
diff --git a/src/include/access/hash.h b/src/include/access/hash.h
index 6befa3ebf60..3916cf746c6 100644
--- a/src/include/access/hash.h
+++ b/src/include/access/hash.h
@@ -113,6 +113,9 @@ typedef struct HashScanPosData
 	BlockNumber nextPage;		/* next overflow page */
 	BlockNumber prevPage;		/* prev overflow or bucket page */
 
+	/* scan direction for the saved position's call to _hash_next */
+	ScanDirection dir;
+
 	/*
 	 * The items array is always ordered in index order (ie, increasing
 	 * indexoffset).  When scanning backwards it is convenient to fill the
@@ -123,6 +126,7 @@ typedef struct HashScanPosData
 	int			firstItem;		/* first valid index in items[] */
 	int			lastItem;		/* last valid index in items[] */
 	int			itemIndex;		/* current index in items[] */
+	int			streamIndex;	/* position of the read stream */
 
 	HashScanPosItem items[MaxIndexTuplesPerPage];	/* MUST BE LAST */
 } HashScanPosData;
@@ -150,6 +154,7 @@ typedef struct HashScanPosData
 		(scanpos).firstItem = 0; \
 		(scanpos).lastItem = 0; \
 		(scanpos).itemIndex = 0; \
+		(scanpos).dir = NoMovementScanDirection; \
 	} while (0)
 
 /*
@@ -182,6 +187,9 @@ typedef struct HashScanOpaqueData
 	int		   *killedItems;	/* currPos.items indexes of killed items */
 	int			numKilled;		/* number of currently stored items */
 
+	/* last block returned by the read_next stream callback */
+	BlockNumber	lastBlock;
+
 	/*
 	 * Identify all the matching items on a page and save them in
 	 * HashScanPosData
-- 
2.50.0

