Index: src/backend/access/nbtree/README =================================================================== RCS file: /projects/cvsroot/pgsql/src/backend/access/nbtree/README,v retrieving revision 1.10 diff -c -r1.10 README *** src/backend/access/nbtree/README 25 Apr 2006 22:46:05 -0000 1.10 --- src/backend/access/nbtree/README 1 May 2006 18:34:27 -0000 *************** *** 68,79 **** below.) Read locks on a page are held for as long as a scan is examining a page. ! But nbtree.c arranges to drop the read lock, but not the buffer pin, ! on the current page of a scan before control leaves nbtree. When we ! come back to resume the scan, we have to re-grab the read lock and ! then move right if the current item moved (see _bt_restscan()). Keeping ! the pin ensures that the current item cannot move left or be deleted ! (see btbulkdelete). In most cases we release our lock and pin on a page before attempting to acquire pin and lock on the page we are moving to. In a few places --- 68,84 ---- below.) Read locks on a page are held for as long as a scan is examining a page. ! The index page is scanned for all matching items in one pass before ! dropping the read lock. However, the buffer pin is kept. Logically, ! the scan always stops on a page boundary. This eliminates the problem ! that the current item is moved by a page split and then deleted, so ! that we can't find it when we come back to continue the scan. The page ! boundary is a safe point to stop, because items are never moved from one ! page to another existing page. A page split could move items to a ! completely new page, however. For that reason, a forward scan remembers ! the right sibling pointer before dropping the lock: if the ! current page splits, its right sibling pointer would then point to the new page, ! and the scan would see the items that were moved to the new page twice. In most cases we release our lock and pin on a page before attempting to acquire pin and lock on the page we are moving to. 
In a few places Index: src/backend/access/nbtree/nbtree.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v retrieving revision 1.145 diff -c -r1.145 nbtree.c *** src/backend/access/nbtree/nbtree.c 25 Apr 2006 22:46:05 -0000 1.145 --- src/backend/access/nbtree/nbtree.c 1 May 2006 18:34:27 -0000 *************** *** 48,54 **** } BTBuildState; - static void _bt_restscan(IndexScanDesc scan); static void btbuildCallback(Relation index, HeapTuple htup, Datum *values, --- 48,53 ---- *************** *** 211,282 **** /* * btgettuple() -- Get the next tuple in the scan. */ Datum btgettuple(PG_FUNCTION_ARGS) { IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1); - BTScanOpaque so = (BTScanOpaque) scan->opaque; - Page page; - OffsetNumber offnum; bool res; /* * If we've already initialized this scan, we can just advance it in the * appropriate direction. If we haven't done so yet, we call a routine to * get the first item in the scan. */ ! if (ItemPointerIsValid(&(scan->currentItemData))) { ! /* ! * Restore scan position using heap TID returned by previous call to ! * btgettuple(). _bt_restscan() re-grabs the read lock on the buffer, ! * too. ! */ ! _bt_restscan(scan); ! /* * Check to see if we should kill the previously-fetched tuple. */ if (scan->kill_prior_tuple) { ! /* ! * Yes, so mark it by setting the LP_DELETE bit in the item flags. ! */ ! offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData)); ! page = BufferGetPage(so->btso_curbuf); ! PageGetItemId(page, offnum)->lp_flags |= LP_DELETE; ! ! /* ! * Since this can be redone later if needed, it's treated the same ! * as a commit-hint-bit status update for heap tuples: we mark the ! * buffer dirty but don't make a WAL log entry. ! */ ! SetBufferCommitInfoNeedsSave(so->btso_curbuf); } ! ! /* ! * Now continue the scan. ! */ ! res = _bt_next(scan, dir); } else ! 
res = _bt_first(scan, dir); /* ! * Save heap TID to use it in _bt_restscan. Then release the read lock on ! * the buffer so that we aren't blocking other backends. * * NOTE: we do keep the pin on the buffer! This is essential to ensure * that someone else doesn't delete the index entry we are stopped on. */ if (res) ! { ! ((BTScanOpaque) scan->opaque)->curHeapIptr = scan->xs_ctup.t_self; ! LockBuffer(((BTScanOpaque) scan->opaque)->btso_curbuf, ! BUFFER_LOCK_UNLOCK); ! } PG_RETURN_BOOL(res); } --- 210,296 ---- /* * btgettuple() -- Get the next tuple in the scan. + * + * The index scan code in nbtsearch.c works one page at a time, + * to avoid problems with concurrent page splits. btgettuple + * implements tuple at time behaviour on top of that. */ Datum btgettuple(PG_FUNCTION_ARGS) { IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1); bool res; + BTScanOpaque so = (BTScanOpaque) scan->opaque; + BTScanPos currpos = &so->currPos; /* * If we've already initialized this scan, we can just advance it in the * appropriate direction. If we haven't done so yet, we call a routine to * get the first item in the scan. */ ! if (!so->scanInitialized) { ! so->scanInitialized = true; ! currpos->morePages = _bt_firstpage(scan, dir); ! currpos->itemIndex = 0; ! } else { /* * Check to see if we should kill the previously-fetched tuple. */ if (scan->kill_prior_tuple) { ! so->deadHeapPtrs[so->numDeadItems] = currpos->heapPtrs[currpos->itemIndex]; ! so->deadOffsets[so->numDeadItems] = currpos->offsets[currpos->itemIndex]; ! so->numDeadItems++; } ! currpos->itemIndex++; } + /* + * If we have items left in the buffer that we haven't returned to + * the caller yet, return next one from there. + */ + if(currpos->itemIndex < currpos->numItems) + res = true; else ! { ! /* No more items left in buffer. We have to fetch next page. ! * Loop until we find a page with matching items on it or ! * we reach end of scan. ! */ ! for(;;) ! 
{ ! if(!currpos->morePages) ! { ! res = false; ! break; ! } ! ! currpos->morePages = _bt_nextpage(scan, dir); ! currpos->itemIndex = 0; ! ! if(currpos->numItems > 0) ! { ! res = true; ! break; ! } ! } ! } /* ! * Return the heap TID to the caller. * * NOTE: we do keep the pin on the buffer! This is essential to ensure * that someone else doesn't delete the index entry we are stopped on. + * + * XXX: Is the above true anymore? */ if (res) ! scan->xs_ctup.t_self = currpos->heapPtrs[currpos->itemIndex]; ! ! #ifdef NBTREE_DEBUG ! elog(LOG, "btgettuple: returns res = %d", res); ! #endif PG_RETURN_BOOL(res); } *************** *** 284,292 **** /* * btgetmulti() -- get multiple tuples at once * ! * This is a somewhat generic implementation: it avoids the _bt_restscan ! * overhead, but there's no smarts about picking especially good stopping ! * points such as index page boundaries. */ Datum btgetmulti(PG_FUNCTION_ARGS) --- 298,304 ---- /* * btgetmulti() -- get multiple tuples at once * ! * This returns tuples one index page at a time. */ Datum btgetmulti(PG_FUNCTION_ARGS) *************** *** 296,338 **** int32 max_tids = PG_GETARG_INT32(2); int32 *returned_tids = (int32 *) PG_GETARG_POINTER(3); BTScanOpaque so = (BTScanOpaque) scan->opaque; ! bool res = true; ! int32 ntids = 0; ! /* ! * Restore prior state if we were already called at least once. */ ! if (ItemPointerIsValid(&(scan->currentItemData))) ! _bt_restscan(scan); ! while (ntids < max_tids) ! { ! /* ! * Start scan, or advance to next tuple. ! */ ! if (ItemPointerIsValid(&(scan->currentItemData))) ! res = _bt_next(scan, ForwardScanDirection); ! else ! res = _bt_first(scan, ForwardScanDirection); ! if (!res) ! break; ! /* Save tuple ID, and continue scanning */ ! tids[ntids] = scan->xs_ctup.t_self; ! ntids++; ! } ! ! /* ! * Save heap TID to use it in _bt_restscan. Then release the read lock on ! * the buffer so that we aren't blocking other backends. */ ! if (res) { ! so->curHeapIptr = scan->xs_ctup.t_self; ! 
LockBuffer(so->btso_curbuf, BUFFER_LOCK_UNLOCK); } ! *returned_tids = ntids; ! PG_RETURN_BOOL(res); } /* --- 308,348 ---- int32 max_tids = PG_GETARG_INT32(2); int32 *returned_tids = (int32 *) PG_GETARG_POINTER(3); BTScanOpaque so = (BTScanOpaque) scan->opaque; ! ItemPointer old_heapPtrs; ! /* We assume that the caller supplied us an array big enough. ! * That's true for the ATM only caller of getmulti, the bitmap ! * index scan code. ! * ! * The getmulti API needs some rethinking. It's silly to ! * allocate the heapPtrs array in btrescan, and then not use ! * it since we have to fill the caller supplied array. */ ! Assert(max_tids >= 1024); ! /* This is a bit hackish: ! * We temporarily substitute the heapPtrs array we ! * allocated in btrescan with the caller-supplied array. ! * _bt_nextpage fills it in, and then we restore the ! * original array. */ ! old_heapPtrs = so->currPos.heapPtrs; ! so->currPos.heapPtrs = tids; ! ! if(!so->scanInitialized) { ! so->currPos.morePages = _bt_firstpage(scan, ForwardScanDirection); ! so->scanInitialized = true; ! } else { ! so->currPos.morePages = _bt_nextpage(scan, ForwardScanDirection); } + + /* restore the original array */ + so->currPos.heapPtrs = old_heapPtrs; ! *returned_tids = so->currPos.numItems; ! ! 
PG_RETURN_BOOL(so->currPos.morePages); } /* *************** *** 346,351 **** --- 356,365 ---- ScanKey scankey = (ScanKey) PG_GETARG_POINTER(2); IndexScanDesc scan; + #ifdef NBTREE_DEBUG + elog(LOG, "btbeginscan rel = %s", RelationGetRelationName(rel)); + #endif + /* get the scan */ scan = RelationGetIndexScan(rel, keysz, scankey); *************** *** 360,400 **** { IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); ScanKey scankey = (ScanKey) PG_GETARG_POINTER(1); - ItemPointer iptr; BTScanOpaque so; so = (BTScanOpaque) scan->opaque; if (so == NULL) /* if called from btbeginscan */ { so = (BTScanOpaque) palloc(sizeof(BTScanOpaqueData)); - so->btso_curbuf = so->btso_mrkbuf = InvalidBuffer; - ItemPointerSetInvalid(&(so->curHeapIptr)); - ItemPointerSetInvalid(&(so->mrkHeapIptr)); if (scan->numberOfKeys > 0) so->keyData = (ScanKey) palloc(scan->numberOfKeys * sizeof(ScanKeyData)); else so->keyData = NULL; scan->opaque = so; - } ! /* we aren't holding any read locks, but gotta drop the pins */ ! if (ItemPointerIsValid(iptr = &(scan->currentItemData))) ! { ! ReleaseBuffer(so->btso_curbuf); ! so->btso_curbuf = InvalidBuffer; ! ItemPointerSetInvalid(&(so->curHeapIptr)); ! ItemPointerSetInvalid(iptr); ! } ! if (ItemPointerIsValid(iptr = &(scan->currentMarkData))) ! { ! ReleaseBuffer(so->btso_mrkbuf); ! so->btso_mrkbuf = InvalidBuffer; ! ItemPointerSetInvalid(&(so->mrkHeapIptr)); ! ItemPointerSetInvalid(iptr); } /* * Reset the scan keys. Note that keys ordering stuff moved to _bt_first. 
* - vadim 05/05/97 --- 374,413 ---- { IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); ScanKey scankey = (ScanKey) PG_GETARG_POINTER(1); BTScanOpaque so; + + #ifdef NBTREE_DEBUG + elog(LOG, "btrescan"); + #endif + so = (BTScanOpaque) scan->opaque; if (so == NULL) /* if called from btbeginscan */ { so = (BTScanOpaque) palloc(sizeof(BTScanOpaqueData)); if (scan->numberOfKeys > 0) so->keyData = (ScanKey) palloc(scan->numberOfKeys * sizeof(ScanKeyData)); else so->keyData = NULL; scan->opaque = so; ! _bt_initscanpos(&so->currPos); ! _bt_initscanpos(&so->markPos); ! so->deadOffsets = palloc(sizeof(OffsetNumber) * MAX_TIDS); ! so->deadHeapPtrs = palloc(sizeof(ItemPointerData) * MAX_TIDS); ! so->numDeadItems = 0; } + so->scanInitialized = false; + + if(ScanPosIsValid(&so->currPos)) + _bt_releasescanpos(&so->currPos); + + if(ScanPosIsValid(&so->markPos)) + _bt_releasescanpos(&so->markPos); + /* * Reset the scan keys. Note that keys ordering stuff moved to _bt_first. * - vadim 05/05/97 *************** *** 415,441 **** btendscan(PG_FUNCTION_ARGS) { IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); - ItemPointer iptr; BTScanOpaque so; so = (BTScanOpaque) scan->opaque; /* we aren't holding any read locks, but gotta drop the pins */ ! if (ItemPointerIsValid(iptr = &(scan->currentItemData))) { ! if (BufferIsValid(so->btso_curbuf)) ! ReleaseBuffer(so->btso_curbuf); ! so->btso_curbuf = InvalidBuffer; ! ItemPointerSetInvalid(iptr); } ! if (ItemPointerIsValid(iptr = &(scan->currentMarkData))) ! { ! if (BufferIsValid(so->btso_mrkbuf)) ! ReleaseBuffer(so->btso_mrkbuf); ! so->btso_mrkbuf = InvalidBuffer; ! ItemPointerSetInvalid(iptr); ! } if (so->keyData != NULL) pfree(so->keyData); --- 428,460 ---- btendscan(PG_FUNCTION_ARGS) { IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); BTScanOpaque so; + #ifdef NBTREE_DEBUG + elog(LOG, "btendscan"); + #endif + so = (BTScanOpaque) scan->opaque; /* we aren't holding any read locks, but gotta drop the pins */ ! 
if(ScanPosIsValid(&so->currPos)) { ! /* kill any items we now know to be dead on the last page, ! * before closing down */ ! if(so->numDeadItems > 0) ! { ! LockBuffer(so->currPos.buf, BT_READ); ! _bt_killitems(so->currPos.buf, so->deadOffsets, so->deadHeapPtrs, so->numDeadItems); ! LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK); ! } ! _bt_releasescanpos(&so->currPos); } ! if(ScanPosIsValid(&so->markPos)) ! _bt_releasescanpos(&so->markPos); ! ! _bt_destroyscanpos(&so->currPos); ! _bt_destroyscanpos(&so->markPos); if (so->keyData != NULL) pfree(so->keyData); *************** *** 451,477 **** btmarkpos(PG_FUNCTION_ARGS) { IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); - ItemPointer iptr; BTScanOpaque so; so = (BTScanOpaque) scan->opaque; ! /* we aren't holding any read locks, but gotta drop the pin */ ! if (ItemPointerIsValid(iptr = &(scan->currentMarkData))) ! { ! ReleaseBuffer(so->btso_mrkbuf); ! so->btso_mrkbuf = InvalidBuffer; ! ItemPointerSetInvalid(iptr); ! } ! /* bump pin on current buffer for assignment to mark buffer */ ! if (ItemPointerIsValid(&(scan->currentItemData))) ! { ! IncrBufferRefCount(so->btso_curbuf); ! so->btso_mrkbuf = so->btso_curbuf; ! scan->currentMarkData = scan->currentItemData; ! so->mrkHeapIptr = so->curHeapIptr; ! } PG_RETURN_VOID(); } --- 470,489 ---- btmarkpos(PG_FUNCTION_ARGS) { IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); BTScanOpaque so; + #ifdef NBTREE_DEBUG + elog(LOG, "btmarkpos"); + #endif + so = (BTScanOpaque) scan->opaque; ! /* Drop the pin on previously marked position */ ! if(ScanPosIsValid(&so->markPos)) ! _bt_releasescanpos(&so->markPos); ! if(ScanPosIsValid(&so->currPos)) ! _bt_copyscanpos(&so->currPos, &so->markPos); PG_RETURN_VOID(); } *************** *** 483,508 **** btrestrpos(PG_FUNCTION_ARGS) { IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); - ItemPointer iptr; BTScanOpaque so; so = (BTScanOpaque) scan->opaque; ! /* we aren't holding any read locks, but gotta drop the pin */ ! 
if (ItemPointerIsValid(iptr = &(scan->currentItemData))) ! { ! ReleaseBuffer(so->btso_curbuf); ! so->btso_curbuf = InvalidBuffer; ! ItemPointerSetInvalid(iptr); ! } /* bump pin on marked buffer */ ! if (ItemPointerIsValid(&(scan->currentMarkData))) { ! IncrBufferRefCount(so->btso_mrkbuf); ! so->btso_curbuf = so->btso_mrkbuf; ! scan->currentItemData = scan->currentMarkData; ! so->curHeapIptr = so->mrkHeapIptr; } PG_RETURN_VOID(); --- 495,516 ---- btrestrpos(PG_FUNCTION_ARGS) { IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0); BTScanOpaque so; + #ifdef NBTREE_DEBUG + elog(LOG, "btrestrpos"); + #endif + so = (BTScanOpaque) scan->opaque; ! if(ScanPosIsValid(&so->currPos)) ! _bt_releasescanpos(&so->currPos); /* bump pin on marked buffer */ ! if (ScanPosIsValid(&so->markPos)) { ! _bt_copyscanpos(&so->markPos, &so->currPos); ! so->numDeadItems = 0; } PG_RETURN_VOID(); *************** *** 521,654 **** Relation rel = (Relation) PG_GETARG_POINTER(0); IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(1); void *callback_state = (void *) PG_GETARG_POINTER(2); - IndexBulkDeleteResult *result; - double tuples_removed = 0; - OffsetNumber deletable[MaxOffsetNumber]; - int ndeletable; - Buffer buf; - BlockNumber num_pages; - - /* - * The outer loop iterates over index leaf pages, the inner over items on - * a leaf page. We issue just one _bt_delitems() call per page, so as to - * minimize WAL traffic. - * - * Note that we exclusive-lock every leaf page containing data items, in - * sequence left to right. It sounds attractive to only exclusive-lock - * those containing items we need to delete, but unfortunately that is not - * safe: we could then pass a stopped indexscan, which could in rare cases - * lead to deleting the item it needs to find when it resumes. 
(See - * _bt_restscan --- this could only happen if an indexscan stops on a - * deletable item and then a page split moves that item into a page - * further to its right, which the indexscan will have no pin on.) We can - * skip obtaining exclusive lock on empty pages though, since no indexscan - * could be stopped on those. - * - * We can skip the scan entirely if there's nothing to delete (indicated - * by callback_state == NULL). - */ - if (callback_state) - buf = _bt_get_endpoint(rel, 0, false); - else - buf = InvalidBuffer; ! if (BufferIsValid(buf)) /* check for empty index */ ! { ! for (;;) ! { ! Page page; ! BTPageOpaque opaque; ! OffsetNumber offnum, ! minoff, ! maxoff; ! BlockNumber nextpage; ! ! ndeletable = 0; ! page = BufferGetPage(buf); ! opaque = (BTPageOpaque) PageGetSpecialPointer(page); ! minoff = P_FIRSTDATAKEY(opaque); ! maxoff = PageGetMaxOffsetNumber(page); ! /* We probably cannot see deleted pages, but skip 'em if so */ ! if (minoff <= maxoff && !P_ISDELETED(opaque)) ! { ! /* ! * Trade in the initial read lock for a super-exclusive write ! * lock on this page. ! */ ! LockBuffer(buf, BUFFER_LOCK_UNLOCK); ! LockBufferForCleanup(buf); ! ! /* ! * Recompute minoff/maxoff, both of which could have changed ! * while we weren't holding the lock. ! */ ! minoff = P_FIRSTDATAKEY(opaque); ! maxoff = PageGetMaxOffsetNumber(page); ! ! /* ! * Scan over all items to see which ones need deleted ! * according to the callback function. ! */ ! for (offnum = minoff; ! offnum <= maxoff; ! offnum = OffsetNumberNext(offnum)) ! { ! IndexTuple itup; ! ItemPointer htup; ! ! itup = (IndexTuple) ! PageGetItem(page, PageGetItemId(page, offnum)); ! htup = &(itup->t_tid); ! if (callback(htup, callback_state)) ! { ! deletable[ndeletable++] = offnum; ! tuples_removed += 1; ! } ! } ! } ! ! /* Apply any needed deletes */ ! if (ndeletable > 0) ! _bt_delitems(rel, buf, deletable, ndeletable); ! ! /* Fetch nextpage link before releasing the buffer */ ! 
nextpage = opaque->btpo_next; ! _bt_relbuf(rel, buf); ! ! /* call vacuum_delay_point while not holding any buffer lock */ ! vacuum_delay_point(); ! ! /* And advance to next page, if any */ ! if (nextpage == P_NONE) ! break; ! buf = _bt_getbuf(rel, nextpage, BT_READ); ! } ! } ! ! /* return statistics */ ! num_pages = RelationGetNumberOfBlocks(rel); ! ! result = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult)); ! result->num_pages = num_pages; ! /* btvacuumcleanup will fill in num_index_tuples */ ! result->tuples_removed = tuples_removed; ! ! PG_RETURN_POINTER(result); ! } ! ! /* ! * Post-VACUUM cleanup. ! * ! * Here, we scan looking for pages we can delete or return to the freelist. ! * ! * Result: a palloc'd struct containing statistical info for VACUUM displays. ! */ ! Datum ! btvacuumcleanup(PG_FUNCTION_ARGS) ! { ! Relation rel = (Relation) PG_GETARG_POINTER(0); ! IndexVacuumCleanupInfo *info = (IndexVacuumCleanupInfo *) PG_GETARG_POINTER(1); ! IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(2); BlockNumber num_pages; BlockNumber blkno; BlockNumber *freePages; --- 529,536 ---- Relation rel = (Relation) PG_GETARG_POINTER(0); IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(1); void *callback_state = (void *) PG_GETARG_POINTER(2); ! IndexBulkDeleteResult *stats; BlockNumber num_pages; BlockNumber blkno; BlockNumber *freePages; *************** *** 660,666 **** MemoryContext oldcontext; bool needLock; ! Assert(stats != NULL); /* * First find out the number of pages in the index. We must acquire the --- 542,553 ---- MemoryContext oldcontext; bool needLock; ! OffsetNumber deletable[MaxOffsetNumber]; ! int ndeletable; ! double tuples_removed = 0; ! bool vacuum_full = false; /* XXX: How to figure this out in bulkdelete? */ ! ! stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult)); /* * First find out the number of pages in the index. 
We must acquire the *************** *** 711,716 **** --- 598,605 ---- Buffer buf; Page page; BTPageOpaque opaque; + OffsetNumber minoff, + maxoff; vacuum_delay_point(); *************** *** 723,728 **** --- 612,620 ---- LockBuffer(buf, BT_READ); page = BufferGetPage(buf); opaque = (BTPageOpaque) PageGetSpecialPointer(page); + minoff = P_FIRSTDATAKEY(opaque); + maxoff = PageGetMaxOffsetNumber(page); + if (!PageIsNew(page)) _bt_checkpage(rel, buf); if (_bt_page_recyclable(page)) *************** *** 738,744 **** pages_deleted++; } else if ((opaque->btpo_flags & BTP_HALF_DEAD) || ! P_FIRSTDATAKEY(opaque) > PageGetMaxOffsetNumber(page)) { /* Empty, try to delete */ int ndel; --- 630,636 ---- pages_deleted++; } else if ((opaque->btpo_flags & BTP_HALF_DEAD) || ! minoff > maxoff) { /* Empty, try to delete */ int ndel; *************** *** 747,753 **** MemoryContextReset(mycontext); oldcontext = MemoryContextSwitchTo(mycontext); ! ndel = _bt_pagedel(rel, buf, info->vacuum_full); /* count only this page, else may double-count parent */ if (ndel) --- 639,645 ---- MemoryContextReset(mycontext); oldcontext = MemoryContextSwitchTo(mycontext); ! ndel = _bt_pagedel(rel, buf, vacuum_full); /* count only this page, else may double-count parent */ if (ndel) *************** *** 762,768 **** * otherwise would mean we'd have to sort the list of recyclable * pages we're building.) */ ! if (ndel && info->vacuum_full) { if (nFreePages < maxFreePages) freePages[nFreePages++] = blkno; --- 654,660 ---- * otherwise would mean we'd have to sort the list of recyclable * pages we're building.) */ ! if (ndel && vacuum_full) { if (nFreePages < maxFreePages) freePages[nFreePages++] = blkno; *************** *** 773,778 **** --- 665,714 ---- } else if (P_ISLEAF(opaque)) { + OffsetNumber offnum; + + ndeletable = 0; + + /* + * Trade in the initial read lock for a super-exclusive write + * lock on this page. 
+ */ + LockBuffer(buf, BUFFER_LOCK_UNLOCK); + LockBufferForCleanup(buf); + + /* + * Recompute minoff/maxoff, both of which could have changed + * while we weren't holding the lock. + */ + minoff = P_FIRSTDATAKEY(opaque); + maxoff = PageGetMaxOffsetNumber(page); + + /* + * Scan over all items to see which ones need deleted + * according to the callback function. + */ + for (offnum = minoff; + offnum <= maxoff; + offnum = OffsetNumberNext(offnum)) + { + IndexTuple itup; + ItemPointer htup; + + itup = (IndexTuple) + PageGetItem(page, PageGetItemId(page, offnum)); + htup = &(itup->t_tid); + if (callback(htup, callback_state)) + deletable[ndeletable++] = offnum; + } + + /* Apply any needed deletes */ + if (ndeletable > 0) { + _bt_delitems(rel, buf, deletable, ndeletable); + tuples_removed += ndeletable; + } + + /****************/ + /* Count the index entries of live leaf pages */ num_index_tuples += PageGetMaxOffsetNumber(page) + 1 - P_FIRSTDATAKEY(opaque); *************** *** 786,792 **** * acquiring exclusive lock on the index and then rechecking all the * pages; doesn't seem worth it. */ ! if (info->vacuum_full && nFreePages > 0) { BlockNumber new_pages = num_pages; --- 722,728 ---- * acquiring exclusive lock on the index and then rechecking all the * pages; doesn't seem worth it. */ ! if (vacuum_full && nFreePages > 0) { BlockNumber new_pages = num_pages; *************** *** 826,936 **** stats->num_index_tuples = num_index_tuples; stats->pages_deleted = pages_deleted; stats->pages_free = nFreePages; PG_RETURN_POINTER(stats); } /* ! * Restore scan position when btgettuple is called to continue a scan. * ! * This is nontrivial because concurrent insertions might have moved the ! * index tuple we stopped on. We assume the tuple can only have moved to ! * the right from our stop point, because we kept a pin on the buffer, ! * and so no deletion can have occurred on that page. * ! * On entry, we have a pin but no read lock on the buffer that contained ! 
* the index tuple we stopped the scan on. On exit, we have pin and read ! * lock on the buffer that now contains that index tuple, and the scandesc's ! * current position is updated to point at it. */ ! static void ! _bt_restscan(IndexScanDesc scan) { ! Relation rel = scan->indexRelation; ! BTScanOpaque so = (BTScanOpaque) scan->opaque; ! Buffer buf = so->btso_curbuf; ! Page page; ! ItemPointer current = &(scan->currentItemData); ! OffsetNumber offnum = ItemPointerGetOffsetNumber(current), ! maxoff; ! BTPageOpaque opaque; ! Buffer nextbuf; ! ItemPointer target = &(so->curHeapIptr); ! IndexTuple itup; ! BlockNumber blkno; ! ! /* ! * Reacquire read lock on the buffer. (We should still have a ! * reference-count pin on it, so need not get that.) ! */ ! LockBuffer(buf, BT_READ); ! ! page = BufferGetPage(buf); ! maxoff = PageGetMaxOffsetNumber(page); ! opaque = (BTPageOpaque) PageGetSpecialPointer(page); ! ! /* ! * We use this as flag when first index tuple on page is deleted but we do ! * not move left (this would slowdown vacuum) - so we set ! * current->ip_posid before first index tuple on the current page ! * (_bt_step will move it right)... XXX still needed? ! */ ! if (!ItemPointerIsValid(target)) ! { ! ItemPointerSetOffsetNumber(current, ! OffsetNumberPrev(P_FIRSTDATAKEY(opaque))); ! return; ! } ! ! /* ! * The item we were on may have moved right due to insertions. Find it ! * again. We use the heap TID to identify the item uniquely. ! */ ! for (;;) ! { ! /* Check for item on this page */ ! for (; ! offnum <= maxoff; ! offnum = OffsetNumberNext(offnum)) ! { ! itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum)); ! if (BTTidSame(itup->t_tid, *target)) ! { ! /* Found it */ ! current->ip_posid = offnum; ! return; ! } ! } ! /* ! * The item we're looking for moved right at least one page, so move ! * right. We are careful here to pin and read-lock the next non-dead ! * page before releasing the current one. This ensures that a ! 
* concurrent btbulkdelete scan cannot pass our position --- if it ! * did, it might be able to reach and delete our target item before we ! * can find it again. ! */ ! if (P_RIGHTMOST(opaque)) ! elog(ERROR, "failed to re-find previous key in \"%s\"", ! RelationGetRelationName(rel)); ! /* Advance to next non-dead page --- there must be one */ ! nextbuf = InvalidBuffer; ! for (;;) ! { ! blkno = opaque->btpo_next; ! nextbuf = _bt_relandgetbuf(rel, nextbuf, blkno, BT_READ); ! page = BufferGetPage(nextbuf); ! opaque = (BTPageOpaque) PageGetSpecialPointer(page); ! if (!P_IGNORE(opaque)) ! break; ! if (P_RIGHTMOST(opaque)) ! elog(ERROR, "fell off the end of \"%s\"", ! RelationGetRelationName(rel)); ! } ! _bt_relbuf(rel, buf); ! so->btso_curbuf = buf = nextbuf; ! maxoff = PageGetMaxOffsetNumber(page); ! offnum = P_FIRSTDATAKEY(opaque); ! ItemPointerSet(current, blkno, offnum); ! } } --- 762,790 ---- stats->num_index_tuples = num_index_tuples; stats->pages_deleted = pages_deleted; stats->pages_free = nFreePages; + stats->tuples_removed = tuples_removed; PG_RETURN_POINTER(stats); } /* ! * Post-VACUUM cleanup. ! * ! * Here, we scan looking for pages we can delete or return to the freelist. * ! * Result: a palloc'd struct containing statistical info for VACUUM displays. * ! * This is a no-op now. btvacuumcleanup should be removed completely ! * before applying the patch. I've left it for now so to avoid catalog ! * changes, to make it possible to test patched and original versions ! * with the same data directory. */ ! Datum ! btvacuumcleanup(PG_FUNCTION_ARGS) { ! Relation rel = (Relation) PG_GETARG_POINTER(0); ! IndexVacuumCleanupInfo *info = (IndexVacuumCleanupInfo *) PG_GETARG_POINTER(1); ! IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(2); ! 
PG_RETURN_POINTER(stats); } Index: src/backend/access/nbtree/nbtsearch.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v retrieving revision 1.104 diff -c -r1.104 nbtsearch.c *** src/backend/access/nbtree/nbtsearch.c 5 Mar 2006 15:58:21 -0000 1.104 --- src/backend/access/nbtree/nbtsearch.c 1 May 2006 18:34:28 -0000 *************** *** 22,28 **** static Buffer _bt_walk_left(Relation rel, Buffer buf); ! static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir); /* --- 22,30 ---- static Buffer _bt_walk_left(Relation rel, Buffer buf); ! ! static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir); ! static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber firstOffset); /* *************** *** 411,472 **** } /* ! * _bt_next() -- Get the next item in a scan. ! * ! * On entry, we have a valid currentItemData in the scan, and a ! * read lock and pin count on the page that contains that item. ! * We return the next item in the scan, or false if no more. ! * On successful exit, the page containing the new item is locked ! * and pinned; on failure exit, no lock or pin is held. ! */ ! bool ! _bt_next(IndexScanDesc scan, ScanDirection dir) ! { ! Relation rel; ! Buffer buf; ! Page page; ! OffsetNumber offnum; ! ItemPointer current; ! BTScanOpaque so; ! bool continuescan; ! ! rel = scan->indexRelation; ! so = (BTScanOpaque) scan->opaque; ! current = &(scan->currentItemData); ! ! /* we still have the buffer pinned and locked */ ! buf = so->btso_curbuf; ! Assert(BufferIsValid(buf)); ! ! do ! { ! /* step one tuple in the appropriate direction */ ! if (!_bt_step(scan, &buf, dir)) ! return false; ! ! /* current is the next candidate tuple to return */ ! offnum = ItemPointerGetOffsetNumber(current); ! page = BufferGetPage(buf); ! ! if (_bt_checkkeys(scan, page, offnum, dir, &continuescan)) ! { ! /* tuple passes all scan key conditions, so return it */ ! 
return true; ! } ! ! /* This tuple doesn't pass, but there might be more that do */ ! } while (continuescan); ! ! /* No more items, so close down the current-item info */ ! ItemPointerSetInvalid(current); ! so->btso_curbuf = InvalidBuffer; ! _bt_relbuf(rel, buf); ! ! return false; ! } ! ! /* ! * _bt_first() -- Find the first item in a scan. * * We need to be clever about the direction of scan, the search * conditions, and the tree ordering. We find the first item (or, --- 413,419 ---- } /* ! * _bt_firstpage() -- Read the first set of items in a scan. * * We need to be clever about the direction of scan, the search * conditions, and the tree ordering. We find the first item (or, *************** *** 481,487 **** * in locating the scan start position. */ bool ! _bt_first(IndexScanDesc scan, ScanDirection dir) { Relation rel = scan->indexRelation; BTScanOpaque so = (BTScanOpaque) scan->opaque; --- 428,434 ---- * in locating the scan start position. */ bool ! _bt_firstpage(IndexScanDesc scan, ScanDirection dir) { Relation rel = scan->indexRelation; BTScanOpaque so = (BTScanOpaque) scan->opaque; *************** *** 489,506 **** Page page; BTStack stack; OffsetNumber offnum; - ItemPointer current; BlockNumber blkno; StrategyNumber strat; bool res; bool nextkey; bool goback; - bool continuescan; ScanKey startKeys[INDEX_MAX_KEYS]; ScanKeyData scankeys[INDEX_MAX_KEYS]; int keysCount = 0; int i; StrategyNumber strat_total; pgstat_count_index_scan(&scan->xs_pgstat_info); --- 436,456 ---- Page page; BTStack stack; OffsetNumber offnum; BlockNumber blkno; StrategyNumber strat; bool res; bool nextkey; bool goback; ScanKey startKeys[INDEX_MAX_KEYS]; ScanKeyData scankeys[INDEX_MAX_KEYS]; int keysCount = 0; int i; StrategyNumber strat_total; + BTPageOpaque opaque; + + #ifdef NBTREE_DEBUG + elog(LOG, "_bt_firstpage: dir = %d", dir); + #endif pgstat_count_index_scan(&scan->xs_pgstat_info); *************** *** 643,650 **** * the tree. 
Walk down that edge to the first or last key, and scan from * there. */ ! if (keysCount == 0) ! return _bt_endpoint(scan, dir); /* * We want to start the scan somewhere within the index. Set up an --- 593,614 ---- * the tree. Walk down that edge to the first or last key, and scan from * there. */ ! if (keysCount == 0) { ! so->currPos.buf = _bt_get_endpoint(rel, 0, ScanDirectionIsBackward(dir)); ! if(!BufferIsValid(so->currPos.buf)) ! return false; /* The index was completely empty */ ! ! if(ScanDirectionIsForward(dir)) { ! opaque = (BTPageOpaque) PageGetSpecialPointer(BufferGetPage(so->currPos.buf)); ! so->currPos.nextPage = opaque->btpo_next; ! ! res = _bt_readpage(scan, ForwardScanDirection, InvalidOffsetNumber); ! } else ! res = _bt_readpage(scan, BackwardScanDirection, InvalidOffsetNumber); ! ! LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK); ! return res; ! } /* * We want to start the scan somewhere within the index. Set up an *************** *** 847,871 **** /* don't need to keep the stack around... */ _bt_freestack(stack); - current = &(scan->currentItemData); - if (!BufferIsValid(buf)) { /* Only get here if index is completely empty */ ! ItemPointerSetInvalid(current); ! so->btso_curbuf = InvalidBuffer; return false; } /* remember which buffer we have pinned */ ! so->btso_curbuf = buf; /* position to the precise item on the page */ offnum = _bt_binsrch(rel, buf, keysCount, scankeys, nextkey); page = BufferGetPage(buf); blkno = BufferGetBlockNumber(buf); ! ItemPointerSet(current, blkno, offnum); /* * If nextkey = false, we are positioned at the first item >= scan key, or --- 811,835 ---- /* don't need to keep the stack around... */ _bt_freestack(stack); if (!BufferIsValid(buf)) { /* Only get here if index is completely empty */ ! so->currPos.buf = InvalidBuffer; return false; } /* remember which buffer we have pinned */ ! 
so->currPos.buf = buf; /* position to the precise item on the page */ offnum = _bt_binsrch(rel, buf, keysCount, scankeys, nextkey); page = BufferGetPage(buf); blkno = BufferGetBlockNumber(buf); ! ! #ifdef NBTREE_DEBUG ! elog(LOG, "_bt_firstpage: goback = %d, nextkey = %d", goback, nextkey); ! #endif /* * If nextkey = false, we are positioned at the first item >= scan key, or *************** *** 880,1053 **** * * The actually desired starting point is either this item or the prior * one, or in the end-of-page case it's the first item on the next page or ! * the last item on this page. We apply _bt_step if needed to get to the ! * right place. * ! * If _bt_step fails (meaning we fell off the end of the index in one * direction or the other), then there are no matches so we just return * false. */ ! if (goback) ! { ! /* _bt_step will do the right thing if we are at end-of-page */ ! if (!_bt_step(scan, &buf, BackwardScanDirection)) ! return false; ! } ! else { ! /* If we're at end-of-page, must step forward to next page */ ! if (offnum > PageGetMaxOffsetNumber(page)) { ! if (!_bt_step(scan, &buf, ForwardScanDirection)) return false; } ! } ! /* okay, current item pointer for the scan is right */ ! offnum = ItemPointerGetOffsetNumber(current); ! page = BufferGetPage(buf); ! /* is the first item actually acceptable? */ ! if (_bt_checkkeys(scan, page, offnum, dir, &continuescan)) ! { ! /* yes, return it */ ! res = true; ! } ! else if (continuescan) { ! /* no, but there might be another one that is */ ! res = _bt_next(scan, dir); } else - { - /* no tuples in the index match this scan key */ - ItemPointerSetInvalid(current); - so->btso_curbuf = InvalidBuffer; - _bt_relbuf(rel, buf); res = false; - } return res; } /* ! * _bt_step() -- Step one item in the requested direction in a scan on * the tree. * ! * *bufP is the current buffer (read-locked and pinned). If we change ! * pages, it's updated appropriately. * * If successful, update scan's currentItemData and return true. 
* If no adjacent record exists in the requested direction, * release buffer pin/locks and return false. */ ! bool ! _bt_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir) { - ItemPointer current = &(scan->currentItemData); BTScanOpaque so = (BTScanOpaque) scan->opaque; Relation rel; Page page; BTPageOpaque opaque; OffsetNumber offnum, maxoff; - BlockNumber blkno; ! /* ! * Don't use ItemPointerGetOffsetNumber or you risk to get assertion due ! * to ability of ip_posid to be equal 0. ! */ ! offnum = current->ip_posid; ! ! page = BufferGetPage(*bufP); ! maxoff = PageGetMaxOffsetNumber(page); if (ScanDirectionIsForward(dir)) { ! if (offnum < maxoff) ! offnum = OffsetNumberNext(offnum); ! else { ! /* Walk right to the next page with data */ ! rel = scan->indexRelation; ! opaque = (BTPageOpaque) PageGetSpecialPointer(page); ! for (;;) { ! /* if we're at end of scan, release the buffer and return */ ! if (P_RIGHTMOST(opaque)) ! { ! _bt_relbuf(rel, *bufP); ! ItemPointerSetInvalid(current); ! *bufP = so->btso_curbuf = InvalidBuffer; ! return false; ! } ! /* step right one page */ ! blkno = opaque->btpo_next; ! *bufP = _bt_relandgetbuf(rel, *bufP, blkno, BT_READ); ! page = BufferGetPage(*bufP); ! opaque = (BTPageOpaque) PageGetSpecialPointer(page); ! if (!P_IGNORE(opaque)) ! { ! /* done if it's not empty */ ! maxoff = PageGetMaxOffsetNumber(page); ! offnum = P_FIRSTDATAKEY(opaque); ! if (offnum <= maxoff) ! break; ! } } } } else { /* backwards scan */ ! opaque = (BTPageOpaque) PageGetSpecialPointer(page); ! if (offnum > P_FIRSTDATAKEY(opaque)) ! offnum = OffsetNumberPrev(offnum); ! else { /* ! * Walk left to the next page with data. This is much more ! * complex than the walk-right case because of the possibility ! * that the page to our left splits while we are in flight to it, ! * plus the possibility that the page we were on gets deleted ! * after we leave it. See nbtree/README for details. */ ! rel = scan->indexRelation; ! for (;;) { ! 
*bufP = _bt_walk_left(rel, *bufP); ! ! /* if we're at end of scan, return failure */ ! if (*bufP == InvalidBuffer) ! { ! ItemPointerSetInvalid(current); ! so->btso_curbuf = InvalidBuffer; ! return false; ! } ! page = BufferGetPage(*bufP); ! opaque = (BTPageOpaque) PageGetSpecialPointer(page); ! ! /* ! * Okay, we managed to move left to a non-deleted page. Done ! * if it's not half-dead and not empty. Else loop back and do ! * it all again. ! */ ! if (!P_IGNORE(opaque)) ! { ! maxoff = PageGetMaxOffsetNumber(page); ! offnum = maxoff; ! if (maxoff >= P_FIRSTDATAKEY(opaque)) ! break; ! } } } } - - /* Update scan state */ - so->btso_curbuf = *bufP; - blkno = BufferGetBlockNumber(*bufP); - ItemPointerSet(current, blkno, offnum); - return true; } --- 844,1143 ---- * * The actually desired starting point is either this item or the prior * one, or in the end-of-page case it's the first item on the next page or ! * the last item on this page. We apply _bt_steppage if needed to get to ! * the right page. * ! * If _bt_steppage fails (meaning we fell off the end of the index in one * direction or the other), then there are no matches so we just return * false. */ ! ! if(goback) ! offnum = OffsetNumberPrev(offnum); ! ! opaque = (BTPageOpaque) PageGetSpecialPointer(page); ! ! so->currPos.nextPage = ! P_RIGHTMOST(opaque) ? InvalidBlockNumber : opaque->btpo_next; ! ! if(offnum > PageGetMaxOffsetNumber(page)) { ! if(ScanDirectionIsForward(dir)) { ! /* The desired starting point is the first item on the next page */ ! if(!_bt_steppage(scan, ForwardScanDirection)) return false; + + res = _bt_readpage(scan, ForwardScanDirection, InvalidOffsetNumber); + } else { + /* The desired starting point is the last item on this page */ + res = _bt_readpage(scan, BackwardScanDirection, InvalidOffsetNumber); } ! } else ! if(offnum < P_FIRSTDATAKEY(opaque)) ! { ! /* The desired starting point is the last item on the previous page ! * (regardless of the direction) */ ! 
if(!_bt_steppage(scan, BackwardScanDirection)) ! return false; ! page = BufferGetPage(so->currPos.buf); ! res = _bt_readpage(scan, dir, PageGetMaxOffsetNumber(page)); ! } else ! res = _bt_readpage(scan, dir, offnum); ! ! LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK); ! return res; ! } ! ! ! /* ! * _bt_nextpage -- Move to the next page, and read all matching items on it. ! * ! * Caller should have so->currPos.buf pinned. On return, so->currPos.buf points ! * to a new pinned buffer. The old one is unpinned. ! * ! * Returns false if we have reached end of scan. ! */ ! bool ! _bt_nextpage(IndexScanDesc scan, ScanDirection dir) ! { ! bool res; ! BTScanOpaque so; ! so = (BTScanOpaque) scan->opaque; ! ! /* we still have the buffer pinned */ ! Assert(BufferIsValid(so->currPos.buf)); ! ! LockBuffer(so->currPos.buf, BT_READ); ! ! /* kill any items we now know to be dead, before moving to the next page */ ! if(so->numDeadItems > 0) ! _bt_killitems(so->currPos.buf, so->deadOffsets, so->deadHeapPtrs, so->numDeadItems); ! ! so->currPos.numItems = 0; ! so->numDeadItems = 0; ! ! if(_bt_steppage(scan, dir)) { ! res = _bt_readpage(scan, dir, InvalidOffsetNumber); ! ! /* Unlock page, but keep pin. */ ! LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK); } else res = false; return res; } /* ! * _bt_readpage -- Read all matching items on current page ! * ! * Caller should have so->currPos.buf pinned and locked. ! * ! * Returns false if we have reached end of scan. ! */ ! static bool ! _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum) ! { ! Relation rel; ! Page page; ! OffsetNumber maxoff; ! BTScanOpaque so; ! bool continuescan; ! BTPageOpaque opaque; ! bool morepages = true; ! ! #ifdef NBTREE_DEBUG ! elog(LOG, "_bt_readpage: dir = %d offnum = %d", dir, offnum); ! #endif ! ! rel = scan->indexRelation; ! so = (BTScanOpaque) scan->opaque; ! ! /* we still have the buffer pinned and locked */ ! Assert(BufferIsValid(so->currPos.buf)); ! ! so->currPos.numItems = 0; ! ! 
page = BufferGetPage(so->currPos.buf); ! opaque = (BTPageOpaque) PageGetSpecialPointer(page); ! maxoff = PageGetMaxOffsetNumber(page); ! ! if(ScanDirectionIsForward(dir)) ! { ! if(offnum == InvalidOffsetNumber) ! offnum = P_FIRSTDATAKEY(opaque); ! ! while(offnum <= maxoff) ! { ! if (_bt_checkkeys(scan, page, offnum, dir, &continuescan)) ! { ! /* tuple passes all scan key conditions, so return it */ ! ItemId iid = PageGetItemId(page, offnum); ! IndexTuple tuple = (IndexTuple) PageGetItem(page, iid); ! so->currPos.offsets[so->currPos.numItems] = offnum; ! so->currPos.heapPtrs[so->currPos.numItems] = tuple->t_tid; ! so->currPos.numItems++; ! } ! if(!continuescan) { ! morepages = false; ! break; ! } ! ! offnum = OffsetNumberNext(offnum); ! } ! ! /* on the rightmost page there is nothing further to the right to read */ ! if (P_RIGHTMOST(opaque)) ! { ! morepages = false; ! } ! } else { ! if(offnum == InvalidOffsetNumber) ! offnum = maxoff; ! ! while(offnum >= P_FIRSTDATAKEY(opaque)) ! { ! if (_bt_checkkeys(scan, page, offnum, dir, &continuescan)) ! { ! /* tuple passes all scan key conditions, so return it */ ! ItemId iid = PageGetItemId(page, offnum); ! IndexTuple tuple = (IndexTuple) PageGetItem(page, iid); ! so->currPos.offsets[so->currPos.numItems] = offnum; ! so->currPos.heapPtrs[so->currPos.numItems] = tuple->t_tid; ! so->currPos.numItems++; ! } ! if(!continuescan) { ! morepages = false; ! break; ! } ! ! offnum = OffsetNumberPrev(offnum); ! } ! } ! /* We're done with this page, but there may be more pages left to read. ! * Or if this was a backward scan, there might actually not be any more ! * pages left, but we'll find that out on next call to _bt_steppage. ! */ ! ! #ifdef NBTREE_DEBUG ! elog(LOG, "_bt_readpage: returns, morepages = %d, numItems = %d", morepages, so->currPos.numItems); ! #endif ! ! return morepages; ! } ! ! /* ! * _bt_steppage() -- Step one page in the requested direction in a scan on * the tree. * ! 
* so->currPos.buf is the current buffer (read-locked and pinned). ! * If we change pages, it's updated appropriately. * * If successful, update scan's currentItemData and return true. * If no adjacent record exists in the requested direction, * release buffer pin/locks and return false. */ ! static bool ! _bt_steppage(IndexScanDesc scan, ScanDirection dir) { BTScanOpaque so = (BTScanOpaque) scan->opaque; Relation rel; Page page; BTPageOpaque opaque; OffsetNumber offnum, maxoff; ! page = BufferGetPage(so->currPos.buf); ! rel = scan->indexRelation; ! opaque = (BTPageOpaque) PageGetSpecialPointer(page); if (ScanDirectionIsForward(dir)) { ! /* Walk right to the next page with data */ ! ! for (;;) { ! /* if we're at end of scan, release the buffer and return */ ! if(so->currPos.nextPage == InvalidBlockNumber) { ! _bt_relbuf(rel, so->currPos.buf); ! so->currPos.buf = InvalidBuffer; ! return false; } + + /* step right one page */ + so->currPos.buf = _bt_relandgetbuf(rel, so->currPos.buf, so->currPos.nextPage, BT_READ); + + page = BufferGetPage(so->currPos.buf); + opaque = (BTPageOpaque) PageGetSpecialPointer(page); + + /* If the current page is split after _bt_steppage + * and _bt_readpage, the next time we step to the next page, + * nextPage points to the new split page. We don't want + * to step to the new split page, since it only contains + * items that we've already seen. Therefore we capture + * nextPage at this point and use that on next _bt_steppage + * call. + * + * Backward scans don't have this problem, since splits only + * move items to the right. In fact, capturing btpo_prev + * like this would be wrong. + */ + if (P_RIGHTMOST(opaque)) + so->currPos.nextPage = InvalidBlockNumber; + else + so->currPos.nextPage = opaque->btpo_next; + + if (!P_IGNORE(opaque)) + { + /* done if it's not empty */ + maxoff = PageGetMaxOffsetNumber(page); + offnum = P_FIRSTDATAKEY(opaque); + if (offnum <= maxoff) + break; + } } } else { /* backwards scan */ ! ! /* ! 
* Walk left to the next page with data. This is much more ! * complex than the walk-right case because of the possibility ! * that the page to our left splits while we are in flight to it, ! * plus the possibility that the page we were on gets deleted ! * after we leave it. See nbtree/README for details. ! */ ! for (;;) { + so->currPos.buf = _bt_walk_left(rel, so->currPos.buf); + + /* if we're at end of scan, return failure */ + if (so->currPos.buf == InvalidBuffer) + return false; + + page = BufferGetPage(so->currPos.buf); + opaque = (BTPageOpaque) PageGetSpecialPointer(page); + /* ! * Okay, we managed to move left to a non-deleted page. Done ! * if it's not half-dead and not empty. Else loop back and do ! * it all again. */ ! if (!P_IGNORE(opaque)) { ! maxoff = PageGetMaxOffsetNumber(page); ! offnum = maxoff; ! if (maxoff >= P_FIRSTDATAKEY(opaque)) ! break; } } } return true; } *************** *** 1170,1175 **** --- 1260,1306 ---- } /* + * _bt_killitems - sets LP_DELETE bit for items + * + * buf - index page, pinned and read locked. (a read lock is enough for setting hint bits) + * deadOffsets - an array of offsets on the index page that are to be marked + * deadHeapPtrs - an array of heap pointers, corresponding deadOffsets. + * numDeadItems - number of elements in the above arrays + * + * If there's no item on an offset, or it doesn't point to the heap + * tuple indicated in deadHeapPtrs, the item is ignored. + */ + void + _bt_killitems(Buffer buf, OffsetNumber *deadOffsets, ItemPointer deadHeapPtrs, int numDeadItems) + { + int i; + bool killedsomething = false; + + /* + * Yes, so mark it by setting the LP_DELETE bit in the item flags. + */ + Page page = BufferGetPage(buf); + for(i=0; i < numDeadItems; i++) { + OffsetNumber offnum = deadOffsets[i]; + ItemId iid = PageGetItemId(page, offnum); + IndexTuple ituple = (IndexTuple) PageGetItem(page, iid); + + /* Is the tuple still there? 
It might not be, if the page was split */ + if(ItemPointerEquals(&ituple->t_tid, &deadHeapPtrs[i])) { + PageGetItemId(page, offnum)->lp_flags |= LP_DELETE; + killedsomething = true; + } + } + /* + * Since this can be redone later if needed, it's treated the same + * as a commit-hint-bit status update for heap tuples: we mark the + * buffer dirty but don't make a WAL log entry. + */ + if(killedsomething) + SetBufferCommitInfoNeedsSave(buf); + } + + /* * _bt_get_endpoint() -- Find the first or last page on a given tree level * * If the index is empty, we will return InvalidBuffer; any other failure *************** *** 1248,1361 **** return buf; } - - /* - * _bt_endpoint() -- Find the first or last key in the index, and scan - * from there to the first key satisfying all the quals. - * - * This is used by _bt_first() to set up a scan when we've determined - * that the scan must start at the beginning or end of the index (for - * a forward or backward scan respectively). - */ - static bool - _bt_endpoint(IndexScanDesc scan, ScanDirection dir) - { - Relation rel; - Buffer buf; - Page page; - BTPageOpaque opaque; - ItemPointer current; - OffsetNumber maxoff; - OffsetNumber start; - BlockNumber blkno; - BTScanOpaque so; - bool res; - bool continuescan; - - rel = scan->indexRelation; - current = &(scan->currentItemData); - so = (BTScanOpaque) scan->opaque; - - /* - * Scan down to the leftmost or rightmost leaf page. This is a simplified - * version of _bt_search(). We don't maintain a stack since we know we - * won't need it. - */ - buf = _bt_get_endpoint(rel, 0, ScanDirectionIsBackward(dir)); - - if (!BufferIsValid(buf)) - { - /* empty index... 
*/ - ItemPointerSetInvalid(current); - so->btso_curbuf = InvalidBuffer; - return false; - } - - blkno = BufferGetBlockNumber(buf); - page = BufferGetPage(buf); - opaque = (BTPageOpaque) PageGetSpecialPointer(page); - Assert(P_ISLEAF(opaque)); - - maxoff = PageGetMaxOffsetNumber(page); - - if (ScanDirectionIsForward(dir)) - { - /* There could be dead pages to the left, so not this: */ - /* Assert(P_LEFTMOST(opaque)); */ - - start = P_FIRSTDATAKEY(opaque); - } - else if (ScanDirectionIsBackward(dir)) - { - Assert(P_RIGHTMOST(opaque)); - - start = PageGetMaxOffsetNumber(page); - if (start < P_FIRSTDATAKEY(opaque)) /* watch out for empty page */ - start = P_FIRSTDATAKEY(opaque); - } - else - { - elog(ERROR, "invalid scan direction: %d", (int) dir); - start = 0; /* keep compiler quiet */ - } - - ItemPointerSet(current, blkno, start); - /* remember which buffer we have pinned */ - so->btso_curbuf = buf; - - /* - * Left/rightmost page could be empty due to deletions, if so step till we - * find a nonempty page. - */ - if (start > maxoff) - { - if (!_bt_step(scan, &buf, dir)) - return false; - start = ItemPointerGetOffsetNumber(current); - page = BufferGetPage(buf); - } - - /* - * Okay, we are on the first or last tuple. Does it pass all the quals? 
- */ - if (_bt_checkkeys(scan, page, start, dir, &continuescan)) - { - /* yes, return it */ - res = true; - } - else if (continuescan) - { - /* no, but there might be another one that does */ - res = _bt_next(scan, dir); - } - else - { - /* no tuples in the index match this scan key */ - ItemPointerSetInvalid(current); - so->btso_curbuf = InvalidBuffer; - _bt_relbuf(rel, buf); - res = false; - } - - return res; - } --- 1379,1381 ---- Index: src/backend/access/nbtree/nbtutils.c =================================================================== RCS file: /projects/cvsroot/pgsql/src/backend/access/nbtree/nbtutils.c,v retrieving revision 1.72 diff -c -r1.72 nbtutils.c *** src/backend/access/nbtree/nbtutils.c 5 Mar 2006 15:58:21 -0000 1.72 --- src/backend/access/nbtree/nbtutils.c 1 May 2006 18:34:28 -0000 *************** *** 826,828 **** --- 826,887 ---- return result; } + + + /** BTScanPos support routines **/ + + void + _bt_initscanpos(BTScanPos pos) + { + pos->buf = InvalidBuffer; + pos->offsets = palloc(sizeof(OffsetNumber) * MAX_TIDS); + pos->heapPtrs = palloc(sizeof(ItemPointerData) * MAX_TIDS); + pos->numItems = 0; + pos->itemIndex = 0; + pos->nextPage = InvalidBlockNumber; + } + + void + _bt_destroyscanpos(BTScanPos pos) + { + if(pos->offsets != NULL) { + pfree(pos->offsets); + pos->offsets = NULL; + } + + if(pos->heapPtrs != NULL) { + pfree(pos->heapPtrs); + pos->heapPtrs = NULL; + } + } + + void + _bt_releasescanpos(BTScanPos pos) + { + Assert(BufferIsValid(pos->buf)); + ReleaseBuffer(pos->buf); + pos->buf = InvalidBuffer; + pos->numItems = 0; + } + + void + _bt_copyscanpos(BTScanPos src, BTScanPos dst) + { + int i; + /* bump pin on current buffer for assignment to mark buffer */ + IncrBufferRefCount(src->buf); + + dst->buf = src->buf; + + /* Copy the arrays. We can skip the items from 0 to itemIndex, + * since the scan is already past those. 
*/ + for(i = src->itemIndex; i < src->numItems; i++) + { + dst->offsets[i] = src->offsets[i]; + dst->heapPtrs[i] = src->heapPtrs[i]; + } + dst->numItems = src->numItems; + dst->itemIndex = src->itemIndex; + dst->nextPage = src->nextPage; + dst->morePages = src->morePages; + } Index: src/include/access/nbtree.h =================================================================== RCS file: /projects/cvsroot/pgsql/src/include/access/nbtree.h,v retrieving revision 1.96 diff -c -r1.96 nbtree.h *** src/include/access/nbtree.h 13 Apr 2006 03:53:05 -0000 1.96 --- src/include/access/nbtree.h 1 May 2006 18:34:30 -0000 *************** *** 14,24 **** --- 14,32 ---- #ifndef NBTREE_H #define NBTREE_H + /* #define NBTREE_DEBUG */ + #include "access/itup.h" #include "access/relscan.h" #include "access/sdir.h" #include "access/xlogutils.h" + /* Maximum number of items on an index page */ + /* TODO: This is a very conservative estimate. + * There's no way a real index page can hold this many entries. + */ + #define MAX_TIDS 1024 + /* * BTPageOpaqueData -- At the end of every page, we store a pointer * to both siblings in the tree. This is used to do forward/backward *************** *** 335,340 **** --- 343,375 ---- typedef BTStackData *BTStack; + + /* BTScanPos contains the state needed to stop on a tuple, and + * continue the scan later. It's handy to have that in a single struct, + * for markpos/restrpos support. + * + * Whenever buf is valid, the buffer is pinned. + */ + typedef struct BTScanPosData { + Buffer buf; + + /* offsets and heapPtrs are a pair of arrays, in scan order (low key -> high key + * for forward scan, high key -> low key for backward scan). + * offsets indicate the offset of the index tuple on the index page, + * and the corresponding entries in heapPtrs are pointers to the heap. 
+ */ + OffsetNumber *offsets; + ItemPointer heapPtrs; + int numItems; + int itemIndex; + BlockNumber nextPage; /* Only used in forward scans */ + bool morePages; + } BTScanPosData; + + typedef BTScanPosData *BTScanPos; + + #define ScanPosIsValid(scanpos) BufferIsValid((scanpos)->buf) + /* * BTScanOpaqueData is used to remember which buffers we're currently * examining in an indexscan. Between calls to btgettuple or btgetmulti, *************** *** 352,365 **** typedef struct BTScanOpaqueData { - Buffer btso_curbuf; - Buffer btso_mrkbuf; - ItemPointerData curHeapIptr; - ItemPointerData mrkHeapIptr; /* these fields are set by _bt_preprocess_keys(): */ bool qual_ok; /* false if qual can never be satisfied */ int numberOfKeys; /* number of preprocessed scan keys */ ScanKey keyData; /* array of preprocessed scan keys */ } BTScanOpaqueData; typedef BTScanOpaqueData *BTScanOpaque; --- 387,405 ---- typedef struct BTScanOpaqueData { /* these fields are set by _bt_preprocess_keys(): */ bool qual_ok; /* false if qual can never be satisfied */ int numberOfKeys; /* number of preprocessed scan keys */ ScanKey keyData; /* array of preprocessed scan keys */ + + BTScanPosData currPos; + BTScanPosData markPos; + + OffsetNumber *deadOffsets; /* These two arrays contain index ptr heap ptr pairs */ + ItemPointer deadHeapPtrs; + int numDeadItems; + + bool scanInitialized; } BTScanOpaqueData; typedef BTScanOpaqueData *BTScanOpaque; *************** *** 424,434 **** ScanKey scankey, bool nextkey); extern int32 _bt_compare(Relation rel, int keysz, ScanKey scankey, Page page, OffsetNumber offnum); - extern bool _bt_next(IndexScanDesc scan, ScanDirection dir); - extern bool _bt_first(IndexScanDesc scan, ScanDirection dir); - extern bool _bt_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir); extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost); /* * prototypes for functions in nbtutils.c */ --- 464,476 ---- ScanKey scankey, bool nextkey); extern int32 
_bt_compare(Relation rel, int keysz, ScanKey scankey, Page page, OffsetNumber offnum); extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost); + extern bool _bt_nextpage(IndexScanDesc scan, ScanDirection dir); + extern bool _bt_firstpage(IndexScanDesc scan, ScanDirection dir); + + extern void _bt_killitems(Buffer buf, OffsetNumber *deadOffsets, ItemPointer deadHeapPtrs, int numDeadItems); + /* * prototypes for functions in nbtutils.c */ *************** *** 441,446 **** --- 483,493 ---- Page page, OffsetNumber offnum, ScanDirection dir, bool *continuescan); + extern void _bt_initscanpos(BTScanPos pos); + extern void _bt_releasescanpos(BTScanPos pos); + extern void _bt_destroyscanpos(BTScanPos pos); + extern void _bt_copyscanpos(BTScanPos src, BTScanPos dst); + /* * prototypes for functions in nbtsort.c */