From 339b39d7f2cc5fc4bd6aa3429a12e6f3a4f9d2db Mon Sep 17 00:00:00 2001
From: Melanie Plageman
Date: Fri, 19 Jan 2024 16:10:30 -0500
Subject: [PATCH v3 2/2] use streaming reads in index scan

ci-os-only:
---
 src/backend/access/heap/heapam_handler.c | 28 ++++++++--
 src/backend/access/index/indexam.c       | 60 ++++++++++++++++++++++
 src/backend/executor/nodeIndexonlyscan.c | 26 +++++++++-
 src/backend/executor/nodeIndexscan.c     | 65 +++++++++++++++++++-----
 src/backend/storage/aio/streaming_read.c | 15 +++++-
 src/include/access/relscan.h             |  7 +++
 src/include/executor/nodeIndexscan.h     |  9 ++++
 src/include/storage/streaming_read.h     |  2 +
 8 files changed, 194 insertions(+), 18 deletions(-)

diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index d15a02b2be..e5e13e92d8 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -127,9 +127,31 @@ heapam_index_fetch_tuple(struct IndexFetchTableData *scan,
 		/* Switch to correct buffer if we don't have it already */
 		Buffer		prev_buf = hscan->xs_cbuf;
 
-		hscan->xs_cbuf = ReleaseAndReadBuffer(hscan->xs_cbuf,
-											  hscan->xs_base.rel,
-											  ItemPointerGetBlockNumber(tid));
+		if (scan->pgsr && scan->do_pgsr)
+		{
+			hscan->xs_cbuf = pg_streaming_read_buffer_get_next(scan->pgsr,
+															   (void **) &tid);
+
+			/*
+			 * Unlike ReleaseAndReadBuffer(), the stream knows nothing about
+			 * our previous buffer, so drop that pin ourselves -- also when
+			 * the stream has no more buffers to return.
+			 */
+			if (BufferIsValid(prev_buf))
+				ReleaseBuffer(prev_buf);
+
+			if (!BufferIsValid(hscan->xs_cbuf))
+				return false;
+		}
+		else
+		{
+			ItemPointerSet(&scan->tid_queue, InvalidBlockNumber, InvalidOffsetNumber);
+			if (!ItemPointerIsValid(tid))
+				return false;
+			hscan->xs_cbuf = ReleaseAndReadBuffer(hscan->xs_cbuf,
+												  hscan->xs_base.rel,
+												  ItemPointerGetBlockNumber(tid));
+		}
 
 		/*
 		 * Prune page, but only if we weren't already on this page
diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index 63dff101e2..f247a1d2d3 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -211,6 +211,41 @@ index_insert_cleanup(Relation indexRelation,
 		indexRelation->rd_indam->aminsertcleanup(indexInfo);
 }
 
+/*
+ * Remove and return the TID held in the single-entry queue *tid_queue,
+ * leaving the queue empty (an invalid ItemPointer).
+ */
+static ItemPointerData
+index_tid_dequeue(ItemPointer tid_queue)
+{
+	ItemPointerData result = *tid_queue;
+
+	ItemPointerSet(tid_queue, InvalidBlockNumber, InvalidOffsetNumber);
+	return result;
+}
+
+/*
+ * Streaming read callback for index scans.
+ *
+ * Dequeues the next heap TID and copies it into per_buffer_data, so that the
+ * table AM gets it back along with the buffer.  Returns InvalidBlockNumber
+ * when the queue is empty; the stream is resumable, so that does not end it.
+ */
+static BlockNumber
+index_pgsr_next_single(PgStreamingRead *pgsr, void *pgsr_private,
+					   void *per_buffer_data)
+{
+	IndexFetchTableData *scan = (IndexFetchTableData *) pgsr_private;
+	ItemPointerData data = index_tid_dequeue(&scan->tid_queue);
+	ItemPointer dest = per_buffer_data;
+
+	*dest = data;
+
+	if (!ItemPointerIsValid(&data))
+		return InvalidBlockNumber;
+	return ItemPointerGetBlockNumber(&data);
+}
+
 /*
  * index_beginscan - start a scan of an index with amgettuple
  *
@@ -236,7 +271,27 @@ index_beginscan(Relation heapRelation,
 	scan->xs_snapshot = snapshot;
 
 	/* prepare to fetch index matches from table */
+	scan->index_done = false;
 	scan->xs_heapfetch = table_index_fetch_begin(heapRelation);
+	ItemPointerSet(&scan->xs_heapfetch->tid_queue, InvalidBlockNumber,
+				   InvalidOffsetNumber);
+
+	/*
+	 * XXX: Setting up the streaming read here is not table AM agnostic, and
+	 * it is done even for callers that never set do_pgsr.  Nothing calls
+	 * pg_streaming_read_free() on it yet either; presumably that belongs in
+	 * index_endscan().
+	 */
+	scan->xs_heapfetch->pgsr = pg_streaming_read_buffer_alloc(PGSR_FLAG_DEFAULT,
+															  scan->xs_heapfetch,
+															  sizeof(ItemPointerData),
+															  NULL,
+															  BMR_REL(scan->heapRelation),
+															  MAIN_FORKNUM,
+															  index_pgsr_next_single);
+
+	pg_streaming_read_set_resumable(scan->xs_heapfetch->pgsr);
+	scan->xs_heapfetch->do_pgsr = false;
 
 	return scan;
 }
@@ -525,6 +580,10 @@ index_beginscan_parallel(Relation heaprel, Relation indexrel, int nkeys,
 
 	/* prepare to fetch index matches from table */
+	scan->index_done = false;
 	scan->xs_heapfetch = table_index_fetch_begin(heaprel);
+	ItemPointerSet(&scan->xs_heapfetch->tid_queue, InvalidBlockNumber,
+				   InvalidOffsetNumber);
+	scan->xs_heapfetch->do_pgsr = false;
 
 	return scan;
 }
@@ -566,6 +625,7 @@ index_getnext_tid(IndexScanDesc scan, ScanDirection direction)
 		if (scan->xs_heapfetch)
 			table_index_fetch_reset(scan->xs_heapfetch);
 
+		ItemPointerSet(&scan->xs_heaptid, InvalidBlockNumber, InvalidOffsetNumber);
 		return NULL;
 	}
 	Assert(ItemPointerIsValid(&scan->xs_heaptid));
diff --git a/src/backend/executor/nodeIndexonlyscan.c b/src/backend/executor/nodeIndexonlyscan.c
index 2c2c9c10b5..7979ecf1e4 100644
--- a/src/backend/executor/nodeIndexonlyscan.c
+++ b/src/backend/executor/nodeIndexonlyscan.c
@@ -111,17 +111,36 @@ IndexOnlyNext(IndexOnlyScanState *node)
 						 node->ioss_NumScanKeys,
 						 node->ioss_OrderByKeys,
 						 node->ioss_NumOrderByKeys);
+
+		scandesc->xs_heapfetch->do_pgsr = true;
 	}
 
 	/*
 	 * OK, now that we have what we need, fetch the next tuple.
 	 */
-	while ((tid = index_getnext_tid(scandesc, direction)) != NULL)
+	while (true)
 	{
 		bool		tuple_from_heap = false;
 
 		CHECK_FOR_INTERRUPTS();
 
+		Assert(!TID_QUEUE_FULL(&scandesc->xs_heapfetch->tid_queue));
+		do
+		{
+			tid = index_getnext_tid(scandesc, direction);
+
+			if (!tid)
+			{
+				scandesc->index_done = true;
+				break;
+			}
+
+			index_tid_enqueue(tid, &scandesc->xs_heapfetch->tid_queue);
+		} while (!TID_QUEUE_FULL(&scandesc->xs_heapfetch->tid_queue));
+
+		if (!tid && TID_QUEUE_EMPTY(&scandesc->xs_heapfetch->tid_queue))
+			break;
+
 		/*
 		 * We can skip the heap fetch if the TID references a heap page on
 		 * which all tuples are known visible to everybody.  In any case,
@@ -156,7 +175,7 @@ IndexOnlyNext(IndexOnlyScanState *node)
 		 * It's worth going through this complexity to avoid needing to lock
 		 * the VM buffer, which could cause significant contention.
 		 */
-		if (!VM_ALL_VISIBLE(scandesc->heapRelation,
+		if (!tid || !VM_ALL_VISIBLE(scandesc->heapRelation,
 							ItemPointerGetBlockNumber(tid),
 							&node->ioss_VMBuffer))
 		{
@@ -187,6 +206,9 @@ IndexOnlyNext(IndexOnlyScanState *node)
 
 			tuple_from_heap = true;
 		}
+		else
+			ItemPointerSet(&scandesc->xs_heapfetch->tid_queue,
+						   InvalidBlockNumber, InvalidOffsetNumber);
 
 		/*
 		 * Fill the scan tuple slot with data from the index.  This might be
diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c
index 03142b4a94..be91854436 100644
--- a/src/backend/executor/nodeIndexscan.c
+++ b/src/backend/executor/nodeIndexscan.c
@@ -77,6 +77,16 @@ static HeapTuple reorderqueue_pop(IndexScanState *node);
  *		using the index specified in the IndexScanState information.
  * ----------------------------------------------------------------
  */
+
+void
+index_tid_enqueue(ItemPointer tid, ItemPointer tid_queue)
+{
+	Assert(!ItemPointerIsValid(tid_queue));
+
+	ItemPointerSet(tid_queue, ItemPointerGetBlockNumber(tid),
+				   ItemPointerGetOffsetNumber(tid));
+}
+
 static TupleTableSlot *
 IndexNext(IndexScanState *node)
 {
@@ -123,31 +133,62 @@ IndexNext(IndexScanState *node)
 			index_rescan(scandesc,
 						 node->iss_ScanKeys, node->iss_NumScanKeys,
 						 node->iss_OrderByKeys, node->iss_NumOrderByKeys);
+
+		scandesc->xs_heapfetch->do_pgsr = true;
 	}
 
 	/*
 	 * ok, now that we have what we need, fetch the next tuple.
 	 */
-	while (index_getnext_slot(scandesc, direction, slot))
+	while (true)
 	{
+		bool		index_exhausted = false;
+
 		CHECK_FOR_INTERRUPTS();
 
-		/*
-		 * If the index was lossy, we have to recheck the index quals using
-		 * the fetched tuple.
-		 */
-		if (scandesc->xs_recheck)
+		Assert(!TID_QUEUE_FULL(&scandesc->xs_heapfetch->tid_queue));
+		do
 		{
-			econtext->ecxt_scantuple = slot;
-			if (!ExecQualAndReset(node->indexqualorig, econtext))
+			ItemPointer tid = index_getnext_tid(scandesc, direction);
+
+			if (!tid)
 			{
-				/* Fails recheck, so drop it and loop back for another */
-				InstrCountFiltered2(node, 1);
-				continue;
+				scandesc->index_done = true;
+				index_exhausted = true;
+				break;
 			}
+
+			index_tid_enqueue(tid, &scandesc->xs_heapfetch->tid_queue);
+		} while (!TID_QUEUE_FULL(&scandesc->xs_heapfetch->tid_queue));
+
+		if (index_fetch_heap(scandesc, slot))
+		{
+			/*
+			 * If the index was lossy, we have to recheck the index quals
+			 * using the fetched tuple.
+			 */
+			if (scandesc->xs_recheck)
+			{
+				econtext->ecxt_scantuple = slot;
+				if (!ExecQualAndReset(node->indexqualorig, econtext))
+				{
+					/* Fails recheck, so drop it and loop back for another */
+					InstrCountFiltered2(node, 1);
+					continue;
+				}
+			}
+
+			return slot;
 		}
 
-		return slot;
+		/*
+		 * Stop only if the index ran out of TIDs in this iteration.  Don't
+		 * test scandesc->index_done: nothing resets it on rescan, so a stale
+		 * value would end a rescanned scan at the first TID whose heap
+		 * tuple cannot be fetched.
+		 */
+		if (index_exhausted)
+			break;
 	}
 
 	/*
diff --git a/src/backend/storage/aio/streaming_read.c b/src/backend/storage/aio/streaming_read.c
index 19605090fe..6465963f83 100644
--- a/src/backend/storage/aio/streaming_read.c
+++ b/src/backend/storage/aio/streaming_read.c
@@ -34,6 +34,7 @@ struct PgStreamingRead
 	int			pinned_buffers_trigger;
 	int			next_tail_buffer;
 	bool		finished;
+	bool		resumable;
 	void	   *pgsr_private;
 	PgStreamingReadBufferCB callback;
 	BufferAccessStrategy strategy;
@@ -292,7 +293,8 @@ pg_streaming_read_look_ahead(PgStreamingRead *pgsr)
 		blocknum = pgsr->callback(pgsr, pgsr->pgsr_private, per_buffer_data);
 		if (blocknum == InvalidBlockNumber)
 		{
-			pgsr->finished = true;
+			if (!pgsr->resumable)
+				pgsr->finished = true;
 			break;
 		}
 		bmr = pgsr->bmr;
@@ -433,3 +435,14 @@ pg_streaming_read_free(PgStreamingRead *pgsr)
 	pfree(pgsr->per_buffer_data);
 	pfree(pgsr);
 }
+
+/*
+ * Mark the stream as resumable.  For a resumable stream, a callback returning
+ * InvalidBlockNumber does not mark the stream as finished, which lets the
+ * callback supply more blocks on later calls.
+ */
+void
+pg_streaming_read_set_resumable(PgStreamingRead *pgsr)
+{
+	pgsr->resumable = true;
+}
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 521043304a..ade7f59946 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -18,6 +18,7 @@
 #include "access/itup.h"
 #include "port/atomics.h"
 #include "storage/buf.h"
 #include "storage/spin.h"
+#include "storage/streaming_read.h"
 #include "utils/relcache.h"
 
@@ -104,6 +105,9 @@ typedef struct ParallelBlockTableScanWorkerData *ParallelBlockTableScanWorker;
 typedef struct IndexFetchTableData
 {
 	Relation	rel;
+	PgStreamingRead *pgsr;		/* streaming read of heap blocks, if any */
+	ItemPointerData tid_queue;	/* pending TID; invalid when empty */
+	bool		do_pgsr;		/* read heap blocks through pgsr? */
 } IndexFetchTableData;
 
 /*
@@ -162,6 +166,9 @@ typedef struct IndexScanDescData
 	bool	   *xs_orderbynulls;
 	bool		xs_recheckorderby;
 
+	/* set by executor nodes when index_getnext_tid() returns NULL */
+	bool		index_done;
+
 	/* parallel index scan information, in shared memory */
 	struct ParallelIndexScanDescData *parallel_scan;
 } IndexScanDescData;
diff --git a/src/include/executor/nodeIndexscan.h b/src/include/executor/nodeIndexscan.h
index 3cddece67c..7dbff789e9 100644
--- a/src/include/executor/nodeIndexscan.h
+++ b/src/include/executor/nodeIndexscan.h
@@ -44,4 +44,13 @@ extern bool ExecIndexEvalArrayKeys(ExprContext *econtext,
 								   IndexArrayKeyInfo *arrayKeys, int numArrayKeys);
 extern bool ExecIndexAdvanceArrayKeys(IndexArrayKeyInfo *arrayKeys, int numArrayKeys);
 
+/*
+ * The heap TID queue holds at most one TID.  It is an ItemPointerData that is
+ * invalid when the queue is empty, so "full" is simply "not empty".
+ */
+#define TID_QUEUE_FULL(tid_queue) (ItemPointerIsValid(tid_queue))
+#define TID_QUEUE_EMPTY(tid_queue) (!ItemPointerIsValid(tid_queue))
+
+extern void index_tid_enqueue(ItemPointer tid, ItemPointer tid_queue);
+
 #endif							/* NODEINDEXSCAN_H */
diff --git a/src/include/storage/streaming_read.h b/src/include/storage/streaming_read.h
index 40c3408c54..2288b7b5eb 100644
--- a/src/include/storage/streaming_read.h
+++ b/src/include/storage/streaming_read.h
@@ -42,4 +42,6 @@ extern void pg_streaming_read_prefetch(PgStreamingRead *pgsr);
 extern Buffer pg_streaming_read_buffer_get_next(PgStreamingRead *pgsr, void **per_buffer_private);
 extern void pg_streaming_read_free(PgStreamingRead *pgsr);
 
+extern void pg_streaming_read_set_resumable(PgStreamingRead *pgsr);
+
 #endif
-- 
2.37.2