From fb7740a535532673e50b87014a0b77e565c46af8 Mon Sep 17 00:00:00 2001
From: Melanie Plageman <melanieplageman@gmail.com>
Date: Fri, 22 Mar 2024 16:51:40 -0400
Subject: [PATCH v10 17/17] BitmapHeapScan uses streaming read API

Remove all of the prefetching code from BitmapHeapScan and rely on the
streaming read API's prefetching instead. The heap table AM now
implements a streaming read callback which uses the bitmap iterator to
supply the streaming read API with the next valid block that needs to
be fetched.
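
To illustrate the shape of the new control flow, here is a toy model in
plain C. This is not PostgreSQL code: ToyIterator, toy_next_block() and
toy_all_visible() are invented stand-ins for the bitmap iterator,
bitmapheap_pgsr_next() and VM_ALL_VISIBLE(), and the real callback also
honors SERIALIZABLE isolation and the SO_NEED_TUPLE flag.

    #include <stdbool.h>
    #include <stdio.h>

    #define INVALID_BLOCK (-1)

    typedef struct ToyIterator
    {
        int next_candidate;       /* next bitmap entry to hand out */
        int bitmap_entries;       /* total entries in the "bitmap" */
        int nblocks;              /* blocks that exist in the "relation" */
        int empty_tuples_pending; /* like rs_empty_tuples_pending */
    } ToyIterator;

    /* Stand-in for VM_ALL_VISIBLE(): every third block is all-visible. */
    static bool
    toy_all_visible(int blockno)
    {
        return blockno % 3 == 0;
    }

    /*
     * Stand-in for bitmapheap_pgsr_next(): return the next block that
     * actually has to be fetched, or INVALID_BLOCK once the bitmap is
     * exhausted. Blocks past the end of the relation are ignored, and
     * all-visible blocks are skipped but counted so their empty tuples
     * can be emitted later.
     */
    static int
    toy_next_block(ToyIterator *it)
    {
        for (;;)
        {
            int blockno;

            if (it->next_candidate >= it->bitmap_entries)
                return INVALID_BLOCK;   /* no more entries in the bitmap */

            blockno = it->next_candidate++;

            if (blockno >= it->nblocks)
                continue;               /* relation grew after scan start */

            if (toy_all_visible(blockno))
            {
                it->empty_tuples_pending++; /* skip fetch, settle up later */
                continue;
            }

            return blockno;
        }
    }

    int
    main(void)
    {
        ToyIterator it = {0, 10, 8, 0};
        int blockno;

        /* The consumer pulls blocks until the callback reports "done". */
        while ((blockno = toy_next_block(&it)) != INVALID_BLOCK)
            printf("fetch block %d\n", blockno);

        printf("%d skipped blocks still owe empty tuples\n",
               it.empty_tuples_pending);
        return 0;
    }

As in the real patch, skipped blocks never reach the consumer, which is
why their empty tuples must be emitted only after the bitmap is
exhausted.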

---
 src/backend/access/heap/heapam.c          |  94 ++++--
 src/backend/access/heap/heapam_handler.c  | 347 +++-------------------
 src/backend/executor/nodeBitmapHeapscan.c |  43 +--
 src/include/access/heapam.h               |  21 +-
 src/include/access/relscan.h              |   6 -
 src/include/access/tableam.h              |  14 -
 src/include/nodes/execnodes.h             |   9 +-
 7 files changed, 112 insertions(+), 422 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 8de1a11164..5421b552d9 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -108,6 +108,8 @@ static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
 static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
 										bool *copy);
 
+static BlockNumber bitmapheap_pgsr_next(PgStreamingRead *pgsr, void *pgsr_private,
+										void *per_buffer_data);
 
 /*
  * Each tuple lock mode has a corresponding heavyweight lock, and one or two
@@ -330,6 +332,20 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
 	if (key != NULL && scan->rs_base.rs_nkeys > 0)
 		memcpy(scan->rs_base.rs_key, key, scan->rs_base.rs_nkeys * sizeof(ScanKeyData));
 
+	if (scan->rs_base.rs_flags & SO_TYPE_BITMAPSCAN)
+	{
+		if (scan->rs_pgsr)
+			pg_streaming_read_free(scan->rs_pgsr);
+
+		scan->rs_pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_DEFAULT,
+													   scan,
+													   sizeof(TBMIterateResult),
+													   scan->rs_strategy,
+													   BMR_REL(scan->rs_base.rs_rd),
+													   MAIN_FORKNUM,
+													   bitmapheap_pgsr_next);
+	}
+
 	/*
 	 * Currently, we only have a stats counter for sequential heap scans (but
 	 * e.g for bitmap scans the underlying bitmap index scans will be counted,
@@ -950,16 +968,9 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 	scan->rs_base.rs_flags = flags;
 	scan->rs_base.rs_parallel = parallel_scan;
 	scan->rs_strategy = NULL;	/* set in initscan */
-
-	scan->rs_base.blockno = InvalidBlockNumber;
-
+	scan->rs_pgsr = NULL;
 	scan->rs_vmbuffer = InvalidBuffer;
 	scan->rs_empty_tuples_pending = 0;
-	scan->pvmbuffer = InvalidBuffer;
-
-	scan->pfblockno = InvalidBlockNumber;
-	scan->prefetch_target = -1;
-	scan->prefetch_pages = 0;
 
 	/*
 	 * Disable page-at-a-time mode if it's not a MVCC-safe snapshot.
@@ -1042,12 +1053,6 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
 			scan->rs_base.rs_flags &= ~SO_ALLOW_PAGEMODE;
 	}
 
-	scan->rs_base.blockno = InvalidBlockNumber;
-
-	scan->pfblockno = InvalidBlockNumber;
-	scan->prefetch_target = -1;
-	scan->prefetch_pages = 0;
-
 	/*
 	 * unpin scan buffers
 	 */
@@ -1060,12 +1065,6 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
 		scan->rs_vmbuffer = InvalidBuffer;
 	}
 
-	if (BufferIsValid(scan->pvmbuffer))
-	{
-		ReleaseBuffer(scan->pvmbuffer);
-		scan->pvmbuffer = InvalidBuffer;
-	}
-
 	/*
 	 * reinitialize scan descriptor
 	 */
@@ -1091,12 +1090,6 @@ heap_endscan(TableScanDesc sscan)
 		scan->rs_vmbuffer = InvalidBuffer;
 	}
 
-	if (BufferIsValid(scan->pvmbuffer))
-	{
-		ReleaseBuffer(scan->pvmbuffer);
-		scan->pvmbuffer = InvalidBuffer;
-	}
-
 	/*
 	 * decrement relation reference count and free scan descriptor storage
 	 */
@@ -1114,6 +1107,9 @@ heap_endscan(TableScanDesc sscan)
 	if (scan->rs_base.rs_flags & SO_TEMP_SNAPSHOT)
 		UnregisterSnapshot(scan->rs_base.rs_snapshot);
 
+	if (scan->rs_base.rs_flags & SO_TYPE_BITMAPSCAN && scan->rs_pgsr)
+		pg_streaming_read_free(scan->rs_pgsr);
+
 	pfree(scan);
 }
 
@@ -10025,3 +10021,51 @@ HeapCheckForSerializableConflictOut(bool visible, Relation relation,
 
 	CheckForSerializableConflictOut(relation, xid, snapshot);
 }
+
+static BlockNumber
+bitmapheap_pgsr_next(PgStreamingRead *pgsr, void *pgsr_private,
+					 void *per_buffer_data)
+{
+	TBMIterateResult *tbmres = per_buffer_data;
+	HeapScanDesc hdesc = (HeapScanDesc) pgsr_private;
+
+	for (;;)
+	{
+		CHECK_FOR_INTERRUPTS();
+
+		bhs_iterate(hdesc->rs_base.rs_bhs_iterator, tbmres);
+
+		/* no more entries in the bitmap */
+		if (!BlockNumberIsValid(tbmres->blockno))
+			return InvalidBlockNumber;
+
+		/*
+		 * Ignore any claimed entries past what we think is the end of the
+		 * relation. It may have been extended after the start of our scan (we
+		 * only hold an AccessShareLock, and it could be inserts from this
+		 * backend).  We don't take this optimization in SERIALIZABLE
+		 * isolation though, as we need to examine all invisible tuples
+		 * reachable by the index.
+		 */
+		if (!IsolationIsSerializable() && tbmres->blockno >= hdesc->rs_nblocks)
+			continue;
+
+		/*
+		 * We can skip fetching the heap page if we don't need any fields from
+		 * the heap, the bitmap entries don't need rechecking, and all tuples
+		 * on the page are visible to our transaction.
+		 */
+		if (!(hdesc->rs_base.rs_flags & SO_NEED_TUPLE) &&
+			!tbmres->recheck &&
+			VM_ALL_VISIBLE(hdesc->rs_base.rs_rd, tbmres->blockno, &hdesc->rs_vmbuffer))
+		{
+			hdesc->rs_empty_tuples_pending += tbmres->ntuples;
+			continue;
+		}
+
+		return tbmres->blockno;
+	}
+
+	/* not reachable */
+	Assert(false);
+}
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 6af1791faa..fe9ee5976f 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -55,9 +55,6 @@ static bool SampleHeapTupleVisible(TableScanDesc scan, Buffer buffer,
 								   OffsetNumber tupoffset);
 
 static BlockNumber heapam_scan_get_blocks_done(HeapScanDesc hscan);
-static inline void BitmapAdjustPrefetchIterator(HeapScanDesc scan);
-static inline void BitmapAdjustPrefetchTarget(HeapScanDesc scan);
-static inline void BitmapPrefetch(HeapScanDesc scan);
 
 static const TableAmRoutine heapam_methods;
 
@@ -2109,147 +2106,68 @@ heapam_estimate_rel_size(Relation rel, int32 *attr_widths,
 									   HEAP_USABLE_BYTES_PER_PAGE);
 }
 
-/*
- *	BitmapAdjustPrefetchIterator - Adjust the prefetch iterator
- *
- *	We keep track of how far the prefetch iterator is ahead of the main
- *	iterator in prefetch_pages. For each block the main iterator returns, we
- *	decrement prefetch_pages.
- */
-static inline void
-BitmapAdjustPrefetchIterator(HeapScanDesc scan)
-{
-#ifdef USE_PREFETCH
-	BitmapHeapIterator *prefetch_iterator = scan->rs_base.rs_pf_bhs_iterator;
-	ParallelBitmapHeapState *pstate = scan->rs_base.bm_parallel;
-	TBMIterateResult tbmpre;
-
-	if (pstate == NULL)
-	{
-		if (scan->prefetch_pages > 0)
-		{
-			/* The main iterator has closed the distance by one page */
-			scan->prefetch_pages--;
-		}
-		else if (prefetch_iterator)
-		{
-			/* Do not let the prefetch iterator get behind the main one */
-			bhs_iterate(prefetch_iterator, &tbmpre);
-			scan->pfblockno = tbmpre.blockno;
-		}
-		return;
-	}
-
-	/*
-	 * Adjusting the prefetch iterator before invoking
-	 * heapam_bitmap_next_block() keeps prefetch distance higher across the
-	 * parallel workers.
-	 */
-	if (scan->rs_base.prefetch_maximum > 0)
-	{
-		SpinLockAcquire(&pstate->mutex);
-		if (pstate->prefetch_pages > 0)
-		{
-			pstate->prefetch_pages--;
-			SpinLockRelease(&pstate->mutex);
-		}
-		else
-		{
-			/* Release the mutex before iterating */
-			SpinLockRelease(&pstate->mutex);
-
-			/*
-			 * In case of shared mode, we can not ensure that the current
-			 * blockno of the main iterator and that of the prefetch iterator
-			 * are same.  It's possible that whatever blockno we are
-			 * prefetching will be processed by another process.  Therefore,
-			 * we don't validate the blockno here as we do in non-parallel
-			 * case.
-			 */
-			if (prefetch_iterator)
-			{
-				bhs_iterate(prefetch_iterator, &tbmpre);
-				scan->pfblockno = tbmpre.blockno;
-			}
-		}
-	}
-#endif							/* USE_PREFETCH */
-}
 
 static bool
-heapam_scan_bitmap_next_block(TableScanDesc scan,
-							  bool *recheck, BlockNumber *blockno,
+heapam_scan_bitmap_next_block(TableScanDesc scan, bool *recheck,
 							  long *lossy_pages, long *exact_pages)
 {
 	HeapScanDesc hscan = (HeapScanDesc) scan;
+	void	   *io_private;
 	BlockNumber block;
 	Buffer		buffer;
 	Snapshot	snapshot;
 	int			ntup;
-	TBMIterateResult tbmres;
+	TBMIterateResult *tbmres;
+
+	Assert(hscan->rs_pgsr);
 
 	hscan->rs_cindex = 0;
 	hscan->rs_ntuples = 0;
 
-	*blockno = InvalidBlockNumber;
 	*recheck = true;
 
-	BitmapAdjustPrefetchIterator(hscan);
-
-	do
+	/* Release buffer containing previous block. */
+	if (BufferIsValid(hscan->rs_cbuf))
 	{
-		CHECK_FOR_INTERRUPTS();
+		ReleaseBuffer(hscan->rs_cbuf);
+		hscan->rs_cbuf = InvalidBuffer;
+	}
 
-		bhs_iterate(scan->rs_bhs_iterator, &tbmres);
+	hscan->rs_cbuf = pg_streaming_read_buffer_get_next(hscan->rs_pgsr, &io_private);
 
-		if (!BlockNumberIsValid(tbmres.blockno))
+	if (BufferIsInvalid(hscan->rs_cbuf))
+	{
+		if (BufferIsValid(hscan->rs_vmbuffer))
 		{
-			/* no more entries in the bitmap */
-			Assert(hscan->rs_empty_tuples_pending == 0);
-			return false;
+			ReleaseBuffer(hscan->rs_vmbuffer);
+			hscan->rs_vmbuffer = InvalidBuffer;
 		}
 
 		/*
-		 * Ignore any claimed entries past what we think is the end of the
-		 * relation. It may have been extended after the start of our scan (we
-		 * only hold an AccessShareLock, and it could be inserts from this
-		 * backend).  We don't take this optimization in SERIALIZABLE
-		 * isolation though, as we need to examine all invisible tuples
-		 * reachable by the index.
+		 * The bitmap is exhausted, so now is the time to emit any empty
+		 * tuples we accumulated. We emit them all at the end rather than per
+		 * skipped block because the streaming read API only returns
+		 * TBMIterateResults for blocks it actually fetched. Whenever we skip
+		 * fetching a block, we instead count how many empty tuples to emit
+		 * once the bitmap is exhausted. These all-NULL tuples do not need
+		 * rechecking.
 		 */
-	} while (!IsolationIsSerializable() && tbmres.blockno >= hscan->rs_nblocks);
+		*recheck = false;
+		return hscan->rs_empty_tuples_pending > 0;
+	}
 
-	/* Got a valid block */
-	*blockno = tbmres.blockno;
-	*recheck = tbmres.recheck;
+	Assert(io_private);
 
-	/*
-	 * We can skip fetching the heap page if we don't need any fields from the
-	 * heap, the bitmap entries don't need rechecking, and all tuples on the
-	 * page are visible to our transaction.
-	 */
-	if (!(scan->rs_flags & SO_NEED_TUPLE) &&
-		!tbmres.recheck &&
-		VM_ALL_VISIBLE(scan->rs_rd, tbmres.blockno, &hscan->rs_vmbuffer))
-	{
-		/* can't be lossy in the skip_fetch case */
-		Assert(tbmres.ntuples >= 0);
-		Assert(hscan->rs_empty_tuples_pending >= 0);
+	tbmres = io_private;
 
-		hscan->rs_empty_tuples_pending += tbmres.ntuples;
+	Assert(BufferGetBlockNumber(hscan->rs_cbuf) == tbmres->blockno);
 
-		return true;
-	}
+	*recheck = tbmres->recheck;
 
-	block = tbmres.blockno;
+	hscan->rs_cblock = tbmres->blockno;
+	hscan->rs_ntuples = tbmres->ntuples;
 
-	/*
-	 * Acquire pin on the target heap page, trading in any pin we held before.
-	 */
-	hscan->rs_cbuf = ReleaseAndReadBuffer(hscan->rs_cbuf,
-										  scan->rs_rd,
-										  block);
-	hscan->rs_cblock = block;
+	block = tbmres->blockno;
 	buffer = hscan->rs_cbuf;
 	snapshot = scan->rs_snapshot;
 
@@ -2270,7 +2188,7 @@ heapam_scan_bitmap_next_block(TableScanDesc scan,
 	/*
 	 * We need two separate strategies for lossy and non-lossy cases.
 	 */
-	if (tbmres.ntuples >= 0)
+	if (tbmres->ntuples >= 0)
 	{
 		/*
 		 * Bitmap is non-lossy, so we just look through the offsets listed in
@@ -2279,9 +2197,9 @@ heapam_scan_bitmap_next_block(TableScanDesc scan,
 		 */
 		int			curslot;
 
-		for (curslot = 0; curslot < tbmres.ntuples; curslot++)
+		for (curslot = 0; curslot < tbmres->ntuples; curslot++)
 		{
-			OffsetNumber offnum = tbmres.offsets[curslot];
+			OffsetNumber offnum = tbmres->offsets[curslot];
 			ItemPointerData tid;
 			HeapTupleData heapTuple;
 
@@ -2331,23 +2249,11 @@ heapam_scan_bitmap_next_block(TableScanDesc scan,
 	Assert(ntup <= MaxHeapTuplesPerPage);
 	hscan->rs_ntuples = ntup;
 
-	if (tbmres.ntuples < 0)
+	if (tbmres->ntuples < 0)
 		(*lossy_pages)++;
 	else
 		(*exact_pages)++;
 
-	/*
-	 * If serial, we can error out if the the prefetch block doesn't stay
-	 * ahead of the current block.
-	 */
-	if (scan->bm_parallel == NULL &&
-		scan->rs_pf_bhs_iterator &&
-		hscan->pfblockno > hscan->rs_base.blockno)
-		elog(ERROR, "prefetch and main iterators are out of sync");
-
-	/* Adjust the prefetch target */
-	BitmapAdjustPrefetchTarget(hscan);
-
 	/*
 	 * Return true to indicate that a valid block was found and the bitmap is
 	 * not exhausted. If there are no visible tuples on this page,
@@ -2358,153 +2264,6 @@ heapam_scan_bitmap_next_block(TableScanDesc scan,
 	return true;
 }
 
-/*
- * BitmapAdjustPrefetchTarget - Adjust the prefetch target
- *
- * Increase prefetch target if it's not yet at the max.  Note that
- * we will increase it to zero after fetching the very first
- * page/tuple, then to one after the second tuple is fetched, then
- * it doubles as later pages are fetched.
- */
-static inline void
-BitmapAdjustPrefetchTarget(HeapScanDesc scan)
-{
-#ifdef USE_PREFETCH
-	ParallelBitmapHeapState *pstate = scan->rs_base.bm_parallel;
-	int			prefetch_maximum = scan->rs_base.prefetch_maximum;
-
-	if (pstate == NULL)
-	{
-		if (scan->prefetch_target >= prefetch_maximum)
-			 /* don't increase any further */ ;
-		else if (scan->prefetch_target >= prefetch_maximum / 2)
-			scan->prefetch_target = prefetch_maximum;
-		else if (scan->prefetch_target > 0)
-			scan->prefetch_target *= 2;
-		else
-			scan->prefetch_target++;
-		return;
-	}
-
-	/* Do an unlocked check first to save spinlock acquisitions. */
-	if (pstate->prefetch_target < prefetch_maximum)
-	{
-		SpinLockAcquire(&pstate->mutex);
-		if (pstate->prefetch_target >= prefetch_maximum)
-			 /* don't increase any further */ ;
-		else if (pstate->prefetch_target >= prefetch_maximum / 2)
-			pstate->prefetch_target = prefetch_maximum;
-		else if (pstate->prefetch_target > 0)
-			pstate->prefetch_target *= 2;
-		else
-			pstate->prefetch_target++;
-		SpinLockRelease(&pstate->mutex);
-	}
-#endif							/* USE_PREFETCH */
-}
-
-
-/*
- * BitmapPrefetch - Prefetch, if prefetch_pages are behind prefetch_target
- */
-static inline void
-BitmapPrefetch(HeapScanDesc scan)
-{
-#ifdef USE_PREFETCH
-	ParallelBitmapHeapState *pstate = scan->rs_base.bm_parallel;
-	BitmapHeapIterator *prefetch_iterator = scan->rs_base.rs_pf_bhs_iterator;
-
-	if (pstate == NULL)
-	{
-		if (prefetch_iterator)
-		{
-			while (scan->prefetch_pages < scan->prefetch_target)
-			{
-				TBMIterateResult tbmpre;
-				bool		skip_fetch;
-
-				bhs_iterate(prefetch_iterator, &tbmpre);
-
-				if (!BlockNumberIsValid(tbmpre.blockno))
-				{
-					/* No more pages to prefetch */
-					bhs_end_iterate(prefetch_iterator);
-					scan->rs_base.rs_pf_bhs_iterator = NULL;
-					break;
-				}
-				scan->prefetch_pages++;
-				scan->pfblockno = tbmpre.blockno;
-
-				/*
-				 * If we expect not to have to actually read this heap page,
-				 * skip this prefetch call, but continue to run the prefetch
-				 * logic normally.  (Would it be better not to increment
-				 * prefetch_pages?)
-				 */
-				skip_fetch = (!(scan->rs_base.rs_flags & SO_NEED_TUPLE) &&
-							  !tbmpre.recheck &&
-							  VM_ALL_VISIBLE(scan->rs_base.rs_rd,
-											 tbmpre.blockno,
-											 &scan->pvmbuffer));
-
-				if (!skip_fetch)
-					PrefetchBuffer(scan->rs_base.rs_rd, MAIN_FORKNUM, tbmpre.blockno);
-			}
-		}
-
-		return;
-	}
-
-	if (pstate->prefetch_pages < pstate->prefetch_target)
-	{
-		if (prefetch_iterator)
-		{
-			while (1)
-			{
-				TBMIterateResult tbmpre;
-				bool		do_prefetch = false;
-				bool		skip_fetch;
-
-				/*
-				 * Recheck under the mutex. If some other process has already
-				 * done enough prefetching then we need not to do anything.
-				 */
-				SpinLockAcquire(&pstate->mutex);
-				if (pstate->prefetch_pages < pstate->prefetch_target)
-				{
-					pstate->prefetch_pages++;
-					do_prefetch = true;
-				}
-				SpinLockRelease(&pstate->mutex);
-
-				if (!do_prefetch)
-					return;
-
-				bhs_iterate(prefetch_iterator, &tbmpre);
-				if (!BlockNumberIsValid(tbmpre.blockno))
-				{
-					/* No more pages to prefetch */
-					bhs_end_iterate(prefetch_iterator);
-					scan->rs_base.rs_pf_bhs_iterator = NULL;
-					break;
-				}
-
-				scan->pfblockno = tbmpre.blockno;
-
-				/* As above, skip prefetch if we expect not to need page */
-				skip_fetch = (!(scan->rs_base.rs_flags & SO_NEED_TUPLE) &&
-							  !tbmpre.recheck &&
-							  VM_ALL_VISIBLE(scan->rs_base.rs_rd,
-											 tbmpre.blockno,
-											 &scan->pvmbuffer));
-
-				if (!skip_fetch)
-					PrefetchBuffer(scan->rs_base.rs_rd, MAIN_FORKNUM, tbmpre.blockno);
-			}
-		}
-	}
-#endif							/* USE_PREFETCH */
-}
 
 /* ------------------------------------------------------------------------
  * Executor related callbacks for the heap AM
@@ -2539,41 +2298,11 @@ heapam_scan_bitmap_next_tuple(TableScanDesc scan,
 			return true;
 		}
 
-		if (!heapam_scan_bitmap_next_block(scan, recheck, &scan->blockno,
+		if (!heapam_scan_bitmap_next_block(scan, recheck,
 										   lossy_pages, exact_pages))
 			return false;
 	}
 
-#ifdef USE_PREFETCH
-
-	/*
-	 * Try to prefetch at least a few pages even before we get to the second
-	 * page if we don't stop reading after the first tuple.
-	 */
-	if (!scan->bm_parallel)
-	{
-		if (hscan->prefetch_target < scan->prefetch_maximum)
-			hscan->prefetch_target++;
-	}
-	else if (scan->bm_parallel->prefetch_target < scan->prefetch_maximum)
-	{
-		/* take spinlock while updating shared state */
-		SpinLockAcquire(&scan->bm_parallel->mutex);
-		if (scan->bm_parallel->prefetch_target < scan->prefetch_maximum)
-			scan->bm_parallel->prefetch_target++;
-		SpinLockRelease(&scan->bm_parallel->mutex);
-	}
-
-	/*
-	 * We issue prefetch requests *after* fetching the current page to try to
-	 * avoid having prefetching interfere with the main I/O. Also, this should
-	 * happen only when we have determined there is still something to do on
-	 * the current page, else we may uselessly prefetch the same page we are
-	 * just about to request for real.
-	 */
-	BitmapPrefetch(hscan);
-#endif							/* USE_PREFETCH */
-
 	targoffset = hscan->rs_vistuples[hscan->rs_cindex];
 	page = BufferGetPage(hscan->rs_cbuf);
 	lp = PageGetItemId(page, targoffset);
diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c
index 2f9387e51a..f2662ea542 100644
--- a/src/backend/executor/nodeBitmapHeapscan.c
+++ b/src/backend/executor/nodeBitmapHeapscan.c
@@ -131,14 +131,6 @@ BitmapHeapNext(BitmapHeapScanState *node)
 	/*
 	 * If we haven't yet performed the underlying index scan, do it, and begin
 	 * the iteration over the bitmap.
-	 *
-	 * For prefetching, we use *two* iterators, one for the pages we are
-	 * actually scanning and another that runs ahead of the first for
-	 * prefetching.  node->prefetch_pages tracks exactly how many pages ahead
-	 * the prefetch iterator is.  Also, node->prefetch_target tracks the
-	 * desired prefetch distance, which starts small and increases up to the
-	 * scan->prefetch_maximum.  This is to avoid doing a lot of prefetching in
-	 * a scan that stops after a few tuples because of a LIMIT.
 	 */
 	if (!node->initialized)
 	{
@@ -149,15 +141,6 @@ BitmapHeapNext(BitmapHeapScanState *node)
 		bool		init_shared_state = node->pstate ?
 			BitmapShouldInitializeSharedState(node->pstate) : false;
 
-		/*
-		 * Maximum number of prefetches for the tablespace if configured,
-		 * otherwise the current value of the effective_io_concurrency GUC.
-		 */
-		int			pf_maximum = 0;
-#ifdef USE_PREFETCH
-		pf_maximum = get_tablespace_io_concurrency(node->ss.ss_currentRelation->rd_rel->reltablespace);
-#endif
-
 		if (!node->pstate || init_shared_state)
 		{
 			tbm = (TIDBitmap *) MultiExecProcNode(outerPlanState(node));
@@ -174,13 +157,7 @@ BitmapHeapNext(BitmapHeapScanState *node)
 				 * multiple processes to iterate jointly.
 				 */
 				node->pstate->tbmiterator = tbm_prepare_shared_iterate(tbm);
-#ifdef USE_PREFETCH
-				if (pf_maximum > 0)
-				{
-					node->pstate->prefetch_iterator =
-						tbm_prepare_shared_iterate(tbm);
-				}
-#endif
+
 				/* We have initialized the shared state so wake up others. */
 				BitmapDoneInitializingSharedState(node->pstate);
 			}
@@ -213,22 +190,12 @@ BitmapHeapNext(BitmapHeapScanState *node)
 																	extra_flags);
 		}
 
-		scan->prefetch_maximum = pf_maximum;
 		scan->bm_parallel = node->pstate;
 
 		scan->rs_bhs_iterator = bhs_begin_iterate(tbm,
 												  scan->bm_parallel ? scan->bm_parallel->tbmiterator : InvalidDsaPointer,
 												  dsa);
 
-#ifdef USE_PREFETCH
-		if (scan->prefetch_maximum > 0)
-		{
-			scan->rs_pf_bhs_iterator = bhs_begin_iterate(tbm,
-														 scan->bm_parallel ? scan->bm_parallel->prefetch_iterator : InvalidDsaPointer,
-														 dsa);
-		}
-#endif							/* USE_PREFETCH */
-
 		node->initialized = true;
 	}
 
@@ -525,14 +492,10 @@ ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
 		return;
 
 	pstate = shm_toc_allocate(pcxt->toc, sizeof(ParallelBitmapHeapState));
-
 	pstate->tbmiterator = 0;
-	pstate->prefetch_iterator = 0;
 
 	/* Initialize the mutex */
 	SpinLockInit(&pstate->mutex);
-	pstate->prefetch_pages = 0;
-	pstate->prefetch_target = -1;
 	pstate->state = BM_INITIAL;
 
 	ConditionVariableInit(&pstate->cv);
@@ -563,11 +526,7 @@ ExecBitmapHeapReInitializeDSM(BitmapHeapScanState *node,
 	if (DsaPointerIsValid(pstate->tbmiterator))
 		tbm_free_shared_area(dsa, pstate->tbmiterator);
 
-	if (DsaPointerIsValid(pstate->prefetch_iterator))
-		tbm_free_shared_area(dsa, pstate->prefetch_iterator);
-
 	pstate->tbmiterator = InvalidDsaPointer;
-	pstate->prefetch_iterator = InvalidDsaPointer;
 }
 
 /* ----------------------------------------------------------------
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 29fdd55893..1b8ce82c9e 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -26,6 +26,7 @@
 #include "storage/dsm.h"
 #include "storage/lockdefs.h"
 #include "storage/shm_toc.h"
+#include "storage/streaming_read.h"
 #include "utils/relcache.h"
 #include "utils/snapshot.h"
 
@@ -72,6 +73,9 @@ typedef struct HeapScanDescData
 	 */
 	ParallelBlockTableScanWorkerData *rs_parallelworkerdata;
 
+	/* Streaming read control object for scans supporting it */
+	PgStreamingRead *rs_pgsr;
+
 	/*
 	 * These fields are only used for bitmap scans for the "skip fetch"
 	 * optimization. Bitmap scans needing no fields from the heap may skip
@@ -82,23 +86,6 @@ typedef struct HeapScanDescData
 	Buffer		rs_vmbuffer;
 	int			rs_empty_tuples_pending;
 
-	/*
-	 * These fields only used for prefetching in bitmap table scans
-	 */
-
-	/* buffer for visibility-map lookups of prefetched pages */
-	Buffer		pvmbuffer;
-
-	/*
-	 * These fields only used in serial BHS
-	 */
-	/* Current target for prefetch distance */
-	int			prefetch_target;
-	/* # pages prefetch iterator is ahead of current */
-	int			prefetch_pages;
-	/* used to validate prefetch block stays ahead of current block  */
-	BlockNumber pfblockno;
-
 	/* these fields only used in page-at-a-time mode and for bitmap scans */
 	int			rs_cindex;		/* current tuple's index in vistuples */
 	int			rs_ntuples;		/* number of visible tuples on page */
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 7938b741d6..02893bf99b 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -46,13 +46,7 @@ typedef struct TableScanDescData
 
 	/* Only used for Bitmap table scans */
 	struct BitmapHeapIterator *rs_bhs_iterator;
-	struct BitmapHeapIterator *rs_pf_bhs_iterator;
-
-	/* maximum value for prefetch_target */
-	int			prefetch_maximum;
 	struct ParallelBitmapHeapState *bm_parallel;
-	/* used to validate BHS prefetch and current block stay in sync */
-	BlockNumber blockno;
 
 	/*
 	 * Information about type and behaviour of the scan, a bitmask of members
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 44d0885d9e..b1b09bbac2 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -933,8 +933,6 @@ table_beginscan_bm(Relation rel, Snapshot snapshot,
 
 	result = rel->rd_tableam->scan_begin(rel, snapshot, nkeys, key, NULL, flags);
 	result->rs_bhs_iterator = NULL;
-	result->rs_pf_bhs_iterator = NULL;
-	result->prefetch_maximum = 0;
 	result->bm_parallel = NULL;
 	return result;
 }
@@ -1000,12 +998,6 @@ table_endscan(TableScanDesc scan)
 	{
 		bhs_end_iterate(scan->rs_bhs_iterator);
 		scan->rs_bhs_iterator = NULL;
-
-		if (scan->rs_pf_bhs_iterator)
-		{
-			bhs_end_iterate(scan->rs_pf_bhs_iterator);
-			scan->rs_pf_bhs_iterator = NULL;
-		}
 	}
 
 	scan->rs_rd->rd_tableam->scan_end(scan);
@@ -1022,12 +1014,6 @@ table_rescan(TableScanDesc scan,
 	{
 		bhs_end_iterate(scan->rs_bhs_iterator);
 		scan->rs_bhs_iterator = NULL;
-
-		if (scan->rs_pf_bhs_iterator)
-		{
-			bhs_end_iterate(scan->rs_pf_bhs_iterator);
-			scan->rs_pf_bhs_iterator = NULL;
-		}
 	}
 
 	scan->rs_rd->rd_tableam->scan_rescan(scan, key, false, false, false, false);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 60916bf0d0..430668f597 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1758,11 +1758,7 @@ typedef enum
 /* ----------------
  *	 ParallelBitmapHeapState information
  *		tbmiterator				iterator for scanning current pages
- *		prefetch_iterator		iterator for prefetching ahead of current page
- *		mutex					mutual exclusion for the prefetching variable
- *								and state
- *		prefetch_pages			# pages prefetch iterator is ahead of current
- *		prefetch_target			current target prefetch distance
+ *		mutex					mutual exclusion for state
  *		state					current state of the TIDBitmap
  *		cv						conditional wait variable
  * ----------------
@@ -1770,10 +1766,7 @@ typedef enum
 typedef struct ParallelBitmapHeapState
 {
 	dsa_pointer tbmiterator;
-	dsa_pointer prefetch_iterator;
 	slock_t		mutex;
-	int			prefetch_pages;
-	int			prefetch_target;
 	SharedBitmapState state;
 	ConditionVariable cv;
 } ParallelBitmapHeapState;
-- 
2.40.1

