From 31a0b829b3aca31542dc3236b408f1e86133aea7 Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Fri, 19 Jan 2024 16:10:30 -0500 Subject: [PATCH v1 2/2] use streaming reads in index scan --- src/backend/access/heap/heapam_handler.c | 14 +++- src/backend/access/index/indexam.c | 2 + src/backend/executor/nodeIndexscan.c | 83 ++++++++++++++++++++---- src/backend/storage/aio/streaming_read.c | 10 ++- src/include/access/relscan.h | 6 ++ src/include/storage/streaming_read.h | 2 + 6 files changed, 101 insertions(+), 16 deletions(-) diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index d15a02b2be7..0ef5f824546 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -127,9 +127,17 @@ heapam_index_fetch_tuple(struct IndexFetchTableData *scan, /* Switch to correct buffer if we don't have it already */ Buffer prev_buf = hscan->xs_cbuf; - hscan->xs_cbuf = ReleaseAndReadBuffer(hscan->xs_cbuf, - hscan->xs_base.rel, - ItemPointerGetBlockNumber(tid)); + if (scan->pgsr) + { + hscan->xs_cbuf = pg_streaming_read_buffer_get_next(scan->pgsr, NULL); + if (!BufferIsValid(hscan->xs_cbuf)) + return false; + } + else + hscan->xs_cbuf = ReleaseAndReadBuffer(hscan->xs_cbuf, + hscan->xs_base.rel, + ItemPointerGetBlockNumber(tid)); + /* * Prune page, but only if we weren't already on this page diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c index 63dff101e29..c118cc3861f 100644 --- a/src/backend/access/index/indexam.c +++ b/src/backend/access/index/indexam.c @@ -237,6 +237,8 @@ index_beginscan(Relation heapRelation, /* prepare to fetch index matches from table */ scan->xs_heapfetch = table_index_fetch_begin(heapRelation); + scan->index_done = false; + scan->xs_heapfetch->blk_queue = InvalidBlockNumber; return scan; } diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c index 03142b4a946..41437faff06 100644 --- 
a/src/backend/executor/nodeIndexscan.c +++ b/src/backend/executor/nodeIndexscan.c @@ -77,6 +77,33 @@ static HeapTuple reorderqueue_pop(IndexScanState *node); * using the index specified in the IndexScanState information. * ---------------------------------------------------------------- */ + +#define QUEUE_FULL(q) ((q) != InvalidBlockNumber) + +static void +blk_enqueue(BlockNumber blkno, BlockNumber *blk_queue) +{ + Assert(*blk_queue == InvalidBlockNumber); + *blk_queue = blkno; +} + +static BlockNumber +blk_dequeue(BlockNumber *blk_queue) +{ + BlockNumber result = *blk_queue; + *blk_queue = InvalidBlockNumber; + return result; +} + + +static BlockNumber +index_pgsr_next_single(PgStreamingRead *pgsr, void *pgsr_private, void *per_buffer_data) +{ + IndexFetchTableData *scan = (IndexFetchTableData *) pgsr_private; + return blk_dequeue(&scan->blk_queue); +} + + static TupleTableSlot * IndexNext(IndexScanState *node) { @@ -123,31 +150,63 @@ IndexNext(IndexScanState *node) index_rescan(scandesc, node->iss_ScanKeys, node->iss_NumScanKeys, node->iss_OrderByKeys, node->iss_NumOrderByKeys); + + /* TODO: the streaming read setup below cannot stay here because it is not AM-agnostic */ + scandesc->xs_heapfetch->pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_DEFAULT, + scandesc->xs_heapfetch, + 0, + NULL, + BMR_REL(scandesc->heapRelation), + MAIN_FORKNUM, + index_pgsr_next_single); + + pg_streaming_read_set_resumable(scandesc->xs_heapfetch->pgsr); } /* * ok, now that we have what we need, fetch the next tuple. */ - while (index_getnext_slot(scandesc, direction, slot)) + + while (true) { CHECK_FOR_INTERRUPTS(); - /* - * If the index was lossy, we have to recheck the index quals using - * the fetched tuple. - */ - if (scandesc->xs_recheck) + if (index_fetch_heap(scandesc, slot)) { - econtext->ecxt_scantuple = slot; - if (!ExecQualAndReset(node->indexqualorig, econtext)) + /* + * If the index was lossy, we have to recheck the index quals using + * the fetched tuple. 
+ */ + if (scandesc->xs_recheck) { - /* Fails recheck, so drop it and loop back for another */ - InstrCountFiltered2(node, 1); - continue; + econtext->ecxt_scantuple = slot; + if (!ExecQualAndReset(node->indexqualorig, econtext)) + { + /* Fails recheck, so drop it and loop back for another */ + InstrCountFiltered2(node, 1); + continue; + } } + + return slot; } - return slot; + if (scandesc->index_done) + break; + + Assert(!QUEUE_FULL(scandesc->xs_heapfetch->blk_queue)); + do + { + ItemPointer tid = index_getnext_tid(scandesc, direction); + + if (!tid) + { + scandesc->index_done = true; + break; + } + + blk_enqueue(ItemPointerGetBlockNumber(tid), &scandesc->xs_heapfetch->blk_queue); + } while (!QUEUE_FULL(scandesc->xs_heapfetch->blk_queue)); } /* diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c index 19605090fea..6465963f837 100644 --- a/src/backend/storage/aio/streaming_read.c +++ b/src/backend/storage/aio/streaming_read.c @@ -34,6 +34,7 @@ struct PgStreamingRead int pinned_buffers_trigger; int next_tail_buffer; bool finished; + bool resumable; void *pgsr_private; PgStreamingReadBufferCB callback; BufferAccessStrategy strategy; @@ -292,7 +293,8 @@ pg_streaming_read_look_ahead(PgStreamingRead *pgsr) blocknum = pgsr->callback(pgsr, pgsr->pgsr_private, per_buffer_data); if (blocknum == InvalidBlockNumber) { - pgsr->finished = true; + if (!pgsr->resumable) + pgsr->finished = true; break; } bmr = pgsr->bmr; @@ -433,3 +435,9 @@ pg_streaming_read_free(PgStreamingRead *pgsr) pfree(pgsr->per_buffer_data); pfree(pgsr); } + +void +pg_streaming_read_set_resumable(PgStreamingRead *pgsr) +{ + pgsr->resumable = true; +} diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h index 521043304ab..d476cb206d5 100644 --- a/src/include/access/relscan.h +++ b/src/include/access/relscan.h @@ -18,6 +18,7 @@ #include "access/itup.h" #include "port/atomics.h" #include "storage/buf.h" +#include "storage/streaming_read.h" 
#include "storage/spin.h" #include "utils/relcache.h" @@ -104,6 +105,8 @@ typedef struct ParallelBlockTableScanWorkerData *ParallelBlockTableScanWorker; typedef struct IndexFetchTableData { Relation rel; + PgStreamingRead *pgsr; + BlockNumber blk_queue; } IndexFetchTableData; /* @@ -162,6 +165,9 @@ typedef struct IndexScanDescData bool *xs_orderbynulls; bool xs_recheckorderby; + bool index_done; + + /* parallel index scan information, in shared memory */ struct ParallelIndexScanDescData *parallel_scan; } IndexScanDescData; diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h index 40c3408c541..2288b7b5eb0 100644 --- a/src/include/storage/streaming_read.h +++ b/src/include/storage/streaming_read.h @@ -42,4 +42,6 @@ extern void pg_streaming_read_prefetch(PgStreamingRead *pgsr); extern Buffer pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_buffer_private); extern void pg_streaming_read_free(PgStreamingRead *pgsr); +extern void pg_streaming_read_set_resumable(PgStreamingRead *pgsr); + #endif -- 2.37.2