diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index d708117a40..339dfc0763 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -411,51 +411,187 @@ PageRestoreTempPage(Page tempPage, Page oldPage)
 }
 
 /*
- * sorting support for PageRepairFragmentation and PageIndexMultiDelete
+ * Item pruning support for PageRepairFragmentation and PageIndexMultiDelete
  */
-typedef struct itemIdSortData
+typedef struct itemIdCompactData
 {
     uint16      offsetindex;    /* linp array index */
     int16       itemoff;        /* page offset of item data */
     uint16      alignedlen;     /* MAXALIGN(item data len) */
-} itemIdSortData;
-typedef itemIdSortData *itemIdSort;
-
-static int
-itemoffcompare(const void *itemidp1, const void *itemidp2)
-{
-    /* Sort in decreasing itemoff order */
-    return ((itemIdSort) itemidp2)->itemoff -
-        ((itemIdSort) itemidp1)->itemoff;
-}
+} itemIdCompactData;
+typedef itemIdCompactData *itemIdCompact;
 
 /*
  * After removing or marking some line pointers unused, move the tuples to
  * remove the gaps caused by the removed items.
+ *
+ * Callers may pass 'presorted' as true if the itemidbase array is sorted in
+ * ascending order of itemoff.  This allows a slightly more optimal code path
+ * to be taken.
  */
 static void
-compactify_tuples(itemIdSort itemidbase, int nitems, Page page)
+compactify_tuples(itemIdCompact itemidbase, int nitems, Page page, bool presorted)
 {
     PageHeader  phdr = (PageHeader) page;
     Offset      upper;
     int         i;
 
-    /* sort itemIdSortData array into decreasing itemoff order */
-    qsort((char *) itemidbase, nitems, sizeof(itemIdSortData),
-          itemoffcompare);
-
-    upper = phdr->pd_special;
-    for (i = 0; i < nitems; i++)
+    if (presorted)
     {
-        itemIdSort  itemidptr = &itemidbase[i];
-        ItemId      lp;
+        Offset      copy_tail;
+        Offset      copy_head;
+        itemIdCompact itemidptr;
+
+        /*
+         * The line item offsets are already in the optimal order, i.e.,
+         * lower item pointers have a lower offset.  This allows us to move
+         * the tuples up to the end of the heap in reverse order without
+         * having to worry about overwriting the memory of other tuples
+         * during the move operation.
+         */
+
+        /*
+         * Double-check that the caller didn't mess up when setting the
+         * presorted flag to true.
+         */
+#ifdef USE_ASSERT_CHECKING
+        {
+            Offset      lastoff = 0;
+
+            for (i = 0; i < nitems; i++)
+            {
+                itemidptr = &itemidbase[i];
+                if (lastoff > itemidptr->itemoff)
+                    Assert(false);
+                lastoff = itemidptr->itemoff;
+            }
+        }
+#endif                          /* USE_ASSERT_CHECKING */
 
-        lp = PageGetItemId(page, itemidptr->offsetindex + 1);
-        upper -= itemidptr->alignedlen;
+        /*
+         * Do the tuple compactification.  Collapse memmove calls for
+         * adjacent tuples.
+         */
+        upper = phdr->pd_special;
+        copy_tail = copy_head = itemidbase[nitems - 1].itemoff + itemidbase[nitems - 1].alignedlen;
+        for (i = nitems - 1; i >= 0; i--)
+        {
+            ItemId      lp;
+
+            itemidptr = &itemidbase[i];
+            lp = PageGetItemId(page, itemidptr->offsetindex + 1);
+
+            if (copy_head != itemidptr->itemoff + itemidptr->alignedlen)
+            {
+                memmove((char *) page + upper,
+                        page + copy_head,
+                        copy_tail - copy_head);
+                copy_tail = itemidptr->itemoff + itemidptr->alignedlen;
+            }
+            upper -= itemidptr->alignedlen;
+            copy_head = itemidptr->itemoff;
+
+            lp->lp_off = upper;
+
+        }
+
+        /* move the remaining chunk */
         memmove((char *) page + upper,
-                (char *) page + itemidptr->itemoff,
-                itemidptr->alignedlen);
-        lp->lp_off = upper;
+                page + copy_head,
+                copy_tail - copy_head);
+    }
+    else
+    {
+        Offset      copy_tail;
+        Offset      copy_head;
+        itemIdCompact itemidptr;
+        PGAlignedBlock scratch;
+        char       *scratchptr = scratch.data;
+
+        /*
+         * The tuples in the itemidbase array may be in any order, so to move
+         * them to the end of the page we must first make a temp copy of each
+         * tuple before copying it back into the page at its new position.
+         *
+         * If a large number of the tuples have been pruned (>75%) then we
+         * copy them into the temp buffer tuple by tuple; otherwise we just
+         * copy the entire tuple section of the page in a single memcpy().
+         * This saves us from doing large copies when we're pruning most of
+         * the tuples.
+         */
+        if (nitems < PageGetMaxOffsetNumber(page) / 4)
+        {
+            for (i = 0; i < nitems; i++)
+            {
+                itemIdCompact itemidptr = &itemidbase[i];
+                memcpy(scratchptr + itemidptr->itemoff, page + itemidptr->itemoff,
+                       itemidptr->alignedlen);
+            }
+        }
+        else
+        {
+#ifdef NOTUSED
+            /*
+             * Try to do the minimal amount of copying to get the required
+             * tuples into the temp buffer.
+             */
+            copy_tail = copy_head = itemidbase[0].itemoff + itemidbase[0].alignedlen;
+            for (i = 0; i < nitems; i++)
+            {
+                itemidptr = &itemidbase[i];
+
+                if (copy_head != itemidptr->itemoff + itemidptr->alignedlen)
+                {
+                    memcpy((char *) scratchptr + copy_head,
+                           page + copy_head,
+                           copy_tail - copy_head);
+                    copy_tail = itemidptr->itemoff + itemidptr->alignedlen;
+                }
+                copy_head = itemidptr->itemoff;
+            }
+
+            /* Copy the remaining chunk */
+            memcpy((char *) scratchptr + copy_head,
+                   page + copy_head,
+                   copy_tail - copy_head);
+#else
+            /* Copy the entire tuple space */
+            memcpy(scratchptr + phdr->pd_upper,
+                   page + phdr->pd_upper,
+                   phdr->pd_special - phdr->pd_upper);
+#endif
+        }
+
+        /*
+         * Do the tuple compactification.  Collapse memcpy calls for
+         * adjacent tuples.
+         */
+        upper = phdr->pd_special;
+        copy_tail = copy_head = itemidbase[nitems - 1].itemoff + itemidbase[nitems - 1].alignedlen;
+        for (i = nitems - 1; i >= 0; i--)
+        {
+            ItemId      lp;
+
+            itemidptr = &itemidbase[i];
+            lp = PageGetItemId(page, itemidptr->offsetindex + 1);
+
+            if (copy_head != itemidptr->itemoff + itemidptr->alignedlen)
+            {
+                memcpy((char *) page + upper,
+                       scratchptr + copy_head,
+                       copy_tail - copy_head);
+                copy_tail = itemidptr->itemoff + itemidptr->alignedlen;
+            }
+            upper -= itemidptr->alignedlen;
+            copy_head = itemidptr->itemoff;
+
+            lp->lp_off = upper;
+
+        }
+
+        /* Copy the remaining chunk */
+        memcpy((char *) page + upper,
+               scratchptr + copy_head,
+               copy_tail - copy_head);
     }
 
     phdr->pd_upper = upper;
@@ -477,14 +613,16 @@ PageRepairFragmentation(Page page)
     Offset      pd_lower = ((PageHeader) page)->pd_lower;
     Offset      pd_upper = ((PageHeader) page)->pd_upper;
     Offset      pd_special = ((PageHeader) page)->pd_special;
-    itemIdSortData itemidbase[MaxHeapTuplesPerPage];
-    itemIdSort  itemidptr;
+    Offset      last_offset;
+    itemIdCompactData itemidbase[MaxHeapTuplesPerPage];
+    itemIdCompact itemidptr;
     ItemId      lp;
     int         nline,
                 nstorage,
                 nunused;
     int         i;
     Size        totallen;
+    bool        presorted = true;   /* For now */
 
     /*
      * It's worth the trouble to be more paranoid here than in most places,
@@ -509,6 +647,7 @@ PageRepairFragmentation(Page page)
     nline = PageGetMaxOffsetNumber(page);
     itemidptr = itemidbase;
     nunused = totallen = 0;
+    last_offset = 0;
     for (i = FirstOffsetNumber; i <= nline; i++)
     {
         lp = PageGetItemId(page, i);
@@ -518,6 +657,12 @@ PageRepairFragmentation(Page page)
         {
             itemidptr->offsetindex = i - 1;
             itemidptr->itemoff = ItemIdGetOffset(lp);
+
+            if (last_offset > itemidptr->itemoff)
+                presorted = false;
+            else
+                last_offset = itemidptr->itemoff;
+
             if (unlikely(itemidptr->itemoff < (int) pd_upper ||
                          itemidptr->itemoff >= (int) pd_special))
                 ereport(ERROR,
@@ -552,7 +697,7 @@ PageRepairFragmentation(Page page)
                          errmsg("corrupted item lengths: total %u, available space %u",
                                 (unsigned int) totallen, pd_special - pd_lower)));
 
-        compactify_tuples(itemidbase, nstorage, page);
+        compactify_tuples(itemidbase, nstorage, page, presorted);
     }
 
     /* Set hint bit for PageAddItem */
@@ -831,9 +976,9 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
     Offset      pd_lower = phdr->pd_lower;
     Offset      pd_upper = phdr->pd_upper;
     Offset      pd_special = phdr->pd_special;
-    itemIdSortData itemidbase[MaxIndexTuplesPerPage];
+    itemIdCompactData itemidbase[MaxIndexTuplesPerPage];
     ItemIdData  newitemids[MaxIndexTuplesPerPage];
-    itemIdSort  itemidptr;
+    itemIdCompact itemidptr;
     ItemId      lp;
     int         nline,
                 nused;
@@ -932,7 +1077,7 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
     phdr->pd_lower = SizeOfPageHeaderData + nused * sizeof(ItemIdData);
 
     /* and compactify the tuple data */
-    compactify_tuples(itemidbase, nused, page);
+    compactify_tuples(itemidbase, nused, page, false);
 }
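
For reviewers who want to poke at the idea outside the tree, here is a minimal standalone sketch of the "collapse adjacent moves" technique used in the presorted branch above. This is not PostgreSQL code: the Item struct, compact_presorted(), the 64-byte buffer and the demo values in main() are all invented for illustration, and the sketch assumes the items are already sorted by ascending offset, mirroring the presorted == true path.

#include <assert.h>
#include <stdio.h>
#include <string.h>

#define BUF_SIZE 64

typedef struct
{
    size_t      offset;         /* current offset of the item within buf */
    size_t      len;            /* length of the item, in bytes */
} Item;

/*
 * Slide the items, which must be sorted by ascending offset, to the end of
 * buf, batching runs of adjacent items into a single memmove() per run.
 */
static void
compact_presorted(char *buf, size_t buf_size, Item *items, int nitems)
{
    size_t      upper = buf_size;
    size_t      copy_head;
    size_t      copy_tail;
    int         i;

    if (nitems == 0)
        return;

    /* the first run starts at the end of the highest-offset item */
    copy_head = copy_tail = items[nitems - 1].offset + items[nitems - 1].len;

    for (i = nitems - 1; i >= 0; i--)
    {
        Item       *it = &items[i];

        /* caller promised ascending offset order */
        assert(i == 0 || items[i - 1].offset <= it->offset);

        /* a gap before this item ends the current run, so flush the run */
        if (copy_head != it->offset + it->len)
        {
            memmove(buf + upper, buf + copy_head, copy_tail - copy_head);
            copy_tail = it->offset + it->len;
        }
        upper -= it->len;
        copy_head = it->offset;
        it->offset = upper;     /* remember where the item ends up */
    }

    /* flush the final (lowest-offset) run */
    memmove(buf + upper, buf + copy_head, copy_tail - copy_head);
}

int
main(void)
{
    char        buf[BUF_SIZE] = {0};

    /* the second and third items are adjacent, so they move together */
    Item        items[] = {{10, 4}, {20, 4}, {24, 4}};

    memcpy(buf + 10, "aaa", 4);
    memcpy(buf + 20, "bbb", 4);
    memcpy(buf + 24, "ccc", 4);

    compact_presorted(buf, BUF_SIZE, items, 3);

    /* the items now sit gap-free at the top of the buffer */
    printf("%zu %zu %zu\n", items[0].offset, items[1].offset, items[2].offset);
    printf("%s %s %s\n", buf + items[0].offset, buf + items[1].offset,
           buf + items[2].offset);
    return 0;
}

As in the patch, items whose old locations are contiguous are accumulated into one run and moved with a single memmove() when the next gap is found, so the call count is one per contiguous chunk rather than one per item; that is what makes the presorted path cheap when the page is only lightly fragmented.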
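
The non-presorted branch can be modelled the same way. The sketch below, again with invented names (scratch_compact(), Item, old_nitems, the 64-byte buffer), shows only the two-phase idea: copy the surviving items into a scratch buffer, per item when most items are gone or in one bulk copy otherwise, then lay them back down gap-free from the top. It deliberately omits the run-collapsing done by the patch's second loop, which the previous sketch already covers.

#include <stdio.h>
#include <string.h>

#define BUF_SIZE 64

typedef struct
{
    size_t      offset;         /* current offset of the item within buf */
    size_t      len;            /* length of the item, in bytes */
} Item;

/*
 * Compact items that may be listed in any order.  Surviving data is first
 * copied into a scratch buffer, either item by item (when most of the old
 * items are gone) or as one bulk copy, and then written back gap-free at
 * the end of buf.  Copying back from scratch avoids any overlap concerns.
 */
static void
scratch_compact(char *buf, size_t buf_size, size_t data_start,
                Item *items, int nitems, int old_nitems)
{
    char        scratch[BUF_SIZE];
    size_t      upper = buf_size;
    int         i;

    if (nitems < old_nitems / 4)
    {
        /* most items were pruned: copy only what survives */
        for (i = 0; i < nitems; i++)
            memcpy(scratch + items[i].offset, buf + items[i].offset,
                   items[i].len);
    }
    else
    {
        /* most items survive: one big copy of the whole data area */
        memcpy(scratch + data_start, buf + data_start, buf_size - data_start);
    }

    /* lay the items back down from the top of the buffer, in given order */
    for (i = nitems - 1; i >= 0; i--)
    {
        upper -= items[i].len;
        memcpy(buf + upper, scratch + items[i].offset, items[i].len);
        items[i].offset = upper;
    }
}

int
main(void)
{
    char        buf[BUF_SIZE] = {0};

    /* items listed out of offset order, as after an arbitrary prune */
    Item        items[] = {{24, 4}, {10, 4}, {20, 4}};

    memcpy(buf + 10, "aaa", 4);
    memcpy(buf + 20, "bbb", 4);
    memcpy(buf + 24, "ccc", 4);

    /* 3 of 8 original items survive; the data area starts at offset 10 */
    scratch_compact(buf, BUF_SIZE, 10, items, 3, 8);

    printf("%s %s %s\n", buf + items[0].offset, buf + items[1].offset,
           buf + items[2].offset);  /* prints: ccc aaa bbb */
    return 0;
}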