From ead994a29826be6033c3acc8fca90a6f6cc58a30 Mon Sep 17 00:00:00 2001
From: Nazir Bilal Yavuz <byavuz81@gmail.com>
Date: Mon, 19 Feb 2024 14:30:47 +0300
Subject: [PATCH v1 2/2] Use streaming read API in ANALYZE

ANALYZE command gets random tuples using BlockSampler algorithm. Use
streaming reads to get these tuples by using BlockSampler algorithm in
streaming read API prefetch logic.
---
 src/include/access/tableam.h             | 16 ++--
 src/backend/access/heap/heapam_handler.c | 11 +--
 src/backend/commands/analyze.c           | 97 ++++++++----------------
 3 files changed, 45 insertions(+), 79 deletions(-)

diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 5f8474871d2..7e6e99ba71d 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -21,6 +21,7 @@
 #include "access/sdir.h"
 #include "access/xact.h"
 #include "executor/tuptable.h"
+#include "storage/streaming_read.h"
 #include "utils/rel.h"
 #include "utils/snapshot.h"
 
@@ -648,9 +649,9 @@ typedef struct TableAmRoutine
 									BufferAccessStrategy bstrategy);
 
 	/*
-	 * Prepare to analyze block `blockno` of `scan`. The scan has been started
-	 * with table_beginscan_analyze().  See also
-	 * table_scan_analyze_next_block().
+	 * Prepare to analyze next block of `scan`. Next block is decided by
+	 * callback function of `pgsr`. The scan has been started with
+	 * table_beginscan_analyze(). See also table_scan_analyze_next_block().
 	 *
 	 * The callback may acquire resources like locks that are held until
 	 * table_scan_analyze_next_tuple() returns false. It e.g. can make sense
@@ -665,8 +666,7 @@ typedef struct TableAmRoutine
 	 * isn't one yet.
 	 */
 	bool		(*scan_analyze_next_block) (TableScanDesc scan,
-											BlockNumber blockno,
-											BufferAccessStrategy bstrategy);
+											PgStreamingRead *pgsr);
 
 	/*
 	 * See table_scan_analyze_next_tuple().
@@ -1714,11 +1714,9 @@ table_relation_vacuum(Relation rel, struct VacuumParams *params,
  * Returns false if block is unsuitable for sampling, true otherwise.
  */
 static inline bool
-table_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno,
-							  BufferAccessStrategy bstrategy)
+table_scan_analyze_next_block(TableScanDesc scan, PgStreamingRead *pgsr)
 {
-	return scan->rs_rd->rd_tableam->scan_analyze_next_block(scan, blockno,
-															bstrategy);
+	return scan->rs_rd->rd_tableam->scan_analyze_next_block(scan, pgsr);
 }
 
 /*
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 680a50bf8b1..7ffaf8ac402 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -993,10 +993,10 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
 }
 
 static bool
-heapam_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno,
-							   BufferAccessStrategy bstrategy)
+heapam_scan_analyze_next_block(TableScanDesc scan, PgStreamingRead *pgsr)
 {
 	HeapScanDesc hscan = (HeapScanDesc) scan;
+	BlockNumber *current_block;
 
 	/*
 	 * We must maintain a pin on the target page's buffer to ensure that
@@ -1007,10 +1007,11 @@ heapam_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno,
 	 * doing much work per tuple, the extra lock traffic is probably better
 	 * avoided.
 	 */
-	hscan->rs_cblock = blockno;
+	hscan->rs_cbuf = pg_streaming_read_buffer_get_next(pgsr, (void **) &current_block);
+	hscan->rs_cblock = *current_block;
 	hscan->rs_cindex = FirstOffsetNumber;
-	hscan->rs_cbuf = ReadBufferExtended(scan->rs_rd, MAIN_FORKNUM,
-										blockno, RBM_NORMAL, bstrategy);
+
+	Assert(BufferIsValid(hscan->rs_cbuf));
 	LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_SHARE);
 
 	/* in heap all blocks can contain tuples, so always return true */
diff --git a/src/backend/commands/analyze.c b/src/backend/commands/analyze.c
index a03495d6c95..7576afcf655 100644
--- a/src/backend/commands/analyze.c
+++ b/src/backend/commands/analyze.c
@@ -1112,6 +1112,26 @@ examine_attribute(Relation onerel, int attnum, Node *index_expr)
 	return stats;
 }
 
+/*
+ * Prefetch callback function to get next block number while using
+ * BlockSampling algorithm
+ */
+static BlockNumber
+pg_block_sampling_streaming_read_next(PgStreamingRead *pgsr,
+									  void *pgsr_private,
+									  void *per_io_data)
+{
+	BlockSamplerData *bs = pgsr_private;
+	BlockNumber *current_block = per_io_data;
+
+	if (BlockSampler_HasMore(bs))
+		*current_block = BlockSampler_Next(bs);
+	else
+		*current_block = InvalidBlockNumber;
+
+	return *current_block;
+}
+
 /*
  * acquire_sample_rows -- acquire a random sample of rows from the table
  *
@@ -1164,10 +1184,7 @@ acquire_sample_rows(Relation onerel, int elevel,
 	TableScanDesc scan;
 	BlockNumber nblocks;
 	BlockNumber blksdone = 0;
-#ifdef USE_PREFETCH
-	int			prefetch_maximum = 0;	/* blocks to prefetch if enabled */
-	BlockSamplerData prefetch_bs;
-#endif
+	PgStreamingRead *pgsr = NULL;
 
 	Assert(targrows > 0);
 
@@ -1180,13 +1197,6 @@ acquire_sample_rows(Relation onerel, int elevel,
 	randseed = pg_prng_uint32(&pg_global_prng_state);
 	nblocks = BlockSampler_Init(&bs, totalblocks, targrows, randseed);
 
-#ifdef USE_PREFETCH
-	prefetch_maximum = get_tablespace_maintenance_io_concurrency(onerel->rd_rel->reltablespace);
-	/* Create another BlockSampler, using the same seed, for prefetching */
-	if (prefetch_maximum)
-		(void) BlockSampler_Init(&prefetch_bs, totalblocks, targrows, randseed);
-#endif
-
 	/* Report sampling block numbers */
 	pgstat_progress_update_param(PROGRESS_ANALYZE_BLOCKS_TOTAL,
 								 nblocks);
@@ -1197,68 +1207,23 @@ acquire_sample_rows(Relation onerel, int elevel,
 	scan = table_beginscan_analyze(onerel);
 	slot = table_slot_create(onerel, NULL);
 
-#ifdef USE_PREFETCH
-
-	/*
-	 * If we are doing prefetching, then go ahead and tell the kernel about
-	 * the first set of pages we are going to want.  This also moves our
-	 * iterator out ahead of the main one being used, where we will keep it so
-	 * that we're always pre-fetching out prefetch_maximum number of blocks
-	 * ahead.
-	 */
-	if (prefetch_maximum)
-	{
-		for (int i = 0; i < prefetch_maximum; i++)
-		{
-			BlockNumber prefetch_block;
-
-			if (!BlockSampler_HasMore(&prefetch_bs))
-				break;
-
-			prefetch_block = BlockSampler_Next(&prefetch_bs);
-			PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, prefetch_block);
-		}
-	}
-#endif
+	pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_DEFAULT,
+										  &bs,
+										  sizeof(BlockSamplerData),
+										  vac_strategy,
+										  BMR_REL(scan->rs_rd),
+										  MAIN_FORKNUM,
+										  pg_block_sampling_streaming_read_next);
 
 	/* Outer loop over blocks to sample */
-	while (BlockSampler_HasMore(&bs))
+	while (nblocks)
 	{
 		bool		block_accepted;
-		BlockNumber targblock = BlockSampler_Next(&bs);
-#ifdef USE_PREFETCH
-		BlockNumber prefetch_targblock = InvalidBlockNumber;
-
-		/*
-		 * Make sure that every time the main BlockSampler is moved forward
-		 * that our prefetch BlockSampler also gets moved forward, so that we
-		 * always stay out ahead.
-		 */
-		if (prefetch_maximum && BlockSampler_HasMore(&prefetch_bs))
-			prefetch_targblock = BlockSampler_Next(&prefetch_bs);
-#endif
 
 		vacuum_delay_point();
 
-		block_accepted = table_scan_analyze_next_block(scan, targblock, vac_strategy);
+		block_accepted = table_scan_analyze_next_block(scan, pgsr);
 
-#ifdef USE_PREFETCH
-
-		/*
-		 * When pre-fetching, after we get a block, tell the kernel about the
-		 * next one we will want, if there's any left.
-		 *
-		 * We want to do this even if the table_scan_analyze_next_block() call
-		 * above decides against analyzing the block it picked.
-		 */
-		if (prefetch_maximum && prefetch_targblock != InvalidBlockNumber)
-			PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, prefetch_targblock);
-#endif
-
-		/*
-		 * Don't analyze if table_scan_analyze_next_block() indicated this
-		 * block is unsuitable for analyzing.
-		 */
 		if (!block_accepted)
 			continue;
 
@@ -1309,7 +1274,9 @@ acquire_sample_rows(Relation onerel, int elevel,
 
 		pgstat_progress_update_param(PROGRESS_ANALYZE_BLOCKS_DONE,
 									 ++blksdone);
+		nblocks--;
 	}
+	pg_streaming_read_free(pgsr);
 
 	ExecDropSingleTupleTableSlot(slot);
 	table_endscan(scan);
-- 
2.43.0

