Re: vacuum.c refactoring

From: Bruce Momjian <pgman(at)candle(dot)pha(dot)pa(dot)us>
To: Manfred Koizar <mkoi-pg(at)aon(dot)at>
Cc: pgsql-patches(at)postgresql(dot)org
Subject: Re: vacuum.c refactoring
Date: 2004-06-08 13:59:41
Message-ID: 200406081359.i58Dxf701152@candle.pha.pa.us
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-patches


Patch applied. Thanks.

---------------------------------------------------------------------------

Manfred Koizar wrote:
> . rename variables
>   . cur_buffer -> dst_buffer
>   . ToPage -> dst_page
>   . cur_page -> dst_vacpage
> . move variable declarations into block where variable is used
> . various Asserts instead of elog(ERROR, ...)
> . extract functionality from repair_frag() into new routines
>   . move_chain_tuple()
>   . move_plain_tuple()
>   . update_hint_bits()
> . create type ExecContext
> . add comments
>
> This patch does not intend to change any behaviour. It passes make
> check, make installcheck and some manual tests. It might be hard to
> review, because some lines are affected by more than one change. If
> it's too much to swallow at once, I can provide it in smaller chunks ...
>
> Servus
> Manfred

> diff -Ncr ../base/src/backend/commands/vacuum.c src/backend/commands/vacuum.c
> *** ../base/src/backend/commands/vacuum.c Mon May 31 21:24:05 2004
> --- src/backend/commands/vacuum.c Wed Jun 2 21:46:59 2004
> ***************
> *** 99,104 ****
> --- 99,162 ----
> VTupleLink vtlinks;
> } VRelStats;
>
> + /*----------------------------------------------------------------------
> + * ExecContext:
> + *
> + * As these variables always appear together, we put them into one struct
> + * and pull initialization and cleanup into separate routines.
> + * ExecContext is used by repair_frag() and move_xxx_tuple(). More
> + * accurately: It is *used* only in move_xxx_tuple(), but because this
> + * routine is called many times, we initialize the struct just once in
> + * repair_frag() and pass it on to move_xxx_tuple().
> + */
> + typedef struct ExecContextData
> + {
> + ResultRelInfo *resultRelInfo;
> + EState *estate;
> + TupleTable tupleTable;
> + TupleTableSlot *slot;
> + } ExecContextData;
> + typedef ExecContextData *ExecContext;
> +
> + static void
> + ExecContext_Init(ExecContext ec, Relation rel)
> + {
> + TupleDesc tupdesc = RelationGetDescr(rel);
> +
> + /*
> + * We need a ResultRelInfo and an EState so we can use the regular
> + * executor's index-entry-making machinery.
> + */
> + ec->estate = CreateExecutorState();
> +
> + ec->resultRelInfo = makeNode(ResultRelInfo);
> + ec->resultRelInfo->ri_RangeTableIndex = 1; /* dummy */
> + ec->resultRelInfo->ri_RelationDesc = rel;
> + ec->resultRelInfo->ri_TrigDesc = NULL; /* we don't fire triggers */
> +
> + ExecOpenIndices(ec->resultRelInfo);
> +
> + ec->estate->es_result_relations = ec->resultRelInfo;
> + ec->estate->es_num_result_relations = 1;
> + ec->estate->es_result_relation_info = ec->resultRelInfo;
> +
> + /* Set up a dummy tuple table too */
> + ec->tupleTable = ExecCreateTupleTable(1);
> + ec->slot = ExecAllocTableSlot(ec->tupleTable);
> + ExecSetSlotDescriptor(ec->slot, tupdesc, false);
> + }
> +
> + static void
> + ExecContext_Finish(ExecContext ec)
> + {
> + ExecDropTupleTable(ec->tupleTable, true);
> + ExecCloseIndices(ec->resultRelInfo);
> + FreeExecutorState(ec->estate);
> + }
> + /*
> + * End of ExecContext Implementation
> + *----------------------------------------------------------------------
> + */
>
> static MemoryContext vac_context = NULL;
>
> ***************
> *** 122,127 ****
> --- 180,196 ----
> static void repair_frag(VRelStats *vacrelstats, Relation onerel,
> VacPageList vacuum_pages, VacPageList fraged_pages,
> int nindexes, Relation *Irel);
> + static void move_chain_tuple(Relation rel,
> + Buffer old_buf, Page old_page, HeapTuple old_tup,
> + Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
> + ExecContext ec, ItemPointer ctid, bool cleanVpd);
> + static void move_plain_tuple(Relation rel,
> + Buffer old_buf, Page old_page, HeapTuple old_tup,
> + Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
> + ExecContext ec);
> + static void update_hint_bits(Relation rel, VacPageList fraged_pages,
> + int num_fraged_pages, BlockNumber last_move_dest_block,
> + int num_moved);
> static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
> VacPageList vacpagelist);
> static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
> ***************
> *** 675,681 ****
> static void
> vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
> {
> ! TransactionId myXID;
> Relation relation;
> HeapScanDesc scan;
> HeapTuple tuple;
> --- 744,750 ----
> static void
> vac_truncate_clog(TransactionId vacuumXID, TransactionId frozenXID)
> {
> ! TransactionId myXID = GetCurrentTransactionId();
> Relation relation;
> HeapScanDesc scan;
> HeapTuple tuple;
> ***************
> *** 683,689 ****
> bool vacuumAlreadyWrapped = false;
> bool frozenAlreadyWrapped = false;
>
> - myXID = GetCurrentTransactionId();
>
> relation = heap_openr(DatabaseRelationName, AccessShareLock);
>
> --- 752,757 ----
> ***************
> *** 1059,1075 ****
> {
> BlockNumber nblocks,
> blkno;
> - ItemId itemid;
> - Buffer buf;
> HeapTupleData tuple;
> - OffsetNumber offnum,
> - maxoff;
> - bool pgchanged,
> - tupgone,
> - notup;
> char *relname;
> ! VacPage vacpage,
> ! vacpagecopy;
> BlockNumber empty_pages,
> empty_end_pages;
> double num_tuples,
> --- 1127,1135 ----
> {
> BlockNumber nblocks,
> blkno;
> HeapTupleData tuple;
> char *relname;
> ! VacPage vacpage;
> BlockNumber empty_pages,
> empty_end_pages;
> double num_tuples,
> ***************
> *** 1080,1086 ****
> usable_free_space;
> Size min_tlen = MaxTupleSize;
> Size max_tlen = 0;
> - int i;
> bool do_shrinking = true;
> VTupleLink vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
> int num_vtlinks = 0;
> --- 1140,1145 ----
> ***************
> *** 1113,1118 ****
> --- 1172,1182 ----
> tempPage = NULL;
> bool do_reap,
> do_frag;
> + Buffer buf;
> + OffsetNumber offnum,
> + maxoff;
> + bool pgchanged,
> + notup;
>
> vacuum_delay_point();
>
> ***************
> *** 1125,1130 ****
> --- 1189,1196 ----
>
> if (PageIsNew(page))
> {
> + VacPage vacpagecopy;
> +
> ereport(WARNING,
> (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
> relname, blkno)));
> ***************
> *** 1142,1147 ****
> --- 1208,1215 ----
>
> if (PageIsEmpty(page))
> {
> + VacPage vacpagecopy;
> +
> vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
> free_space += vacpage->free;
> empty_pages++;
> ***************
> *** 1161,1168 ****
> offnum = OffsetNumberNext(offnum))
> {
> uint16 sv_infomask;
> !
> ! itemid = PageGetItemId(page, offnum);
>
> /*
> * Collect un-used items too - it's possible to have indexes
> --- 1229,1236 ----
> offnum = OffsetNumberNext(offnum))
> {
> uint16 sv_infomask;
> ! ItemId itemid = PageGetItemId(page, offnum);
> ! bool tupgone = false;
>
> /*
> * Collect un-used items too - it's possible to have indexes
> ***************
> *** 1180,1186 ****
> tuple.t_len = ItemIdGetLength(itemid);
> ItemPointerSet(&(tuple.t_self), blkno, offnum);
>
> - tupgone = false;
> sv_infomask = tuple.t_data->t_infomask;
>
> switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin))
> --- 1248,1253 ----
> ***************
> *** 1269,1275 ****
> do_shrinking = false;
> break;
> default:
> ! elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
> break;
> }
>
> --- 1336,1343 ----
> do_shrinking = false;
> break;
> default:
> ! /* unexpected HeapTupleSatisfiesVacuum result */
> ! Assert(false);
> break;
> }
>
> ***************
> *** 1344,1350 ****
>
> if (do_reap || do_frag)
> {
> ! vacpagecopy = copy_vac_page(vacpage);
> if (do_reap)
> vpage_insert(vacuum_pages, vacpagecopy);
> if (do_frag)
> --- 1412,1418 ----
>
> if (do_reap || do_frag)
> {
> ! VacPage vacpagecopy = copy_vac_page(vacpage);
> if (do_reap)
> vpage_insert(vacuum_pages, vacpagecopy);
> if (do_frag)
> ***************
> *** 1390,1395 ****
> --- 1458,1465 ----
> */
> if (do_shrinking)
> {
> + int i;
> +
> Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
> fraged_pages->num_pages -= empty_end_pages;
> usable_free_space = 0;
> ***************
> *** 1453,1528 ****
> VacPageList vacuum_pages, VacPageList fraged_pages,
> int nindexes, Relation *Irel)
> {
> ! TransactionId myXID;
> ! CommandId myCID;
> ! Buffer buf,
> ! cur_buffer;
> BlockNumber nblocks,
> blkno;
> BlockNumber last_move_dest_block = 0,
> last_vacuum_block;
> ! Page page,
> ! ToPage = NULL;
> ! OffsetNumber offnum,
> ! maxoff,
> ! newoff,
> ! max_offset;
> ! ItemId itemid,
> ! newitemid;
> ! HeapTupleData tuple,
> ! newtup;
> ! TupleDesc tupdesc;
> ! ResultRelInfo *resultRelInfo;
> ! EState *estate;
> ! TupleTable tupleTable;
> ! TupleTableSlot *slot;
> VacPageListData Nvacpagelist;
> ! VacPage cur_page = NULL,
> last_vacuum_page,
> vacpage,
> *curpage;
> - int cur_item = 0;
> int i;
> ! Size tuple_len;
> ! int num_moved,
> num_fraged_pages,
> vacuumed_pages;
> ! int checked_moved,
> ! num_tuples,
> ! keep_tuples = 0;
> ! bool isempty,
> ! dowrite,
> ! chain_tuple_moved;
> VacRUsage ru0;
>
> vac_init_rusage(&ru0);
>
> ! myXID = GetCurrentTransactionId();
> ! myCID = GetCurrentCommandId();
> !
> ! tupdesc = RelationGetDescr(onerel);
> !
> ! /*
> ! * We need a ResultRelInfo and an EState so we can use the regular
> ! * executor's index-entry-making machinery.
> ! */
> ! estate = CreateExecutorState();
> !
> ! resultRelInfo = makeNode(ResultRelInfo);
> ! resultRelInfo->ri_RangeTableIndex = 1; /* dummy */
> ! resultRelInfo->ri_RelationDesc = onerel;
> ! resultRelInfo->ri_TrigDesc = NULL; /* we don't fire triggers */
> !
> ! ExecOpenIndices(resultRelInfo);
> !
> ! estate->es_result_relations = resultRelInfo;
> ! estate->es_num_result_relations = 1;
> ! estate->es_result_relation_info = resultRelInfo;
> !
> ! /* Set up a dummy tuple table too */
> ! tupleTable = ExecCreateTupleTable(1);
> ! slot = ExecAllocTableSlot(tupleTable);
> ! ExecSetSlotDescriptor(slot, tupdesc, false);
>
> Nvacpagelist.num_pages = 0;
> num_fraged_pages = fraged_pages->num_pages;
> --- 1523,1551 ----
> VacPageList vacuum_pages, VacPageList fraged_pages,
> int nindexes, Relation *Irel)
> {
> ! TransactionId myXID = GetCurrentTransactionId();
> ! Buffer dst_buffer = InvalidBuffer;
> BlockNumber nblocks,
> blkno;
> BlockNumber last_move_dest_block = 0,
> last_vacuum_block;
> ! Page dst_page = NULL;
> ! ExecContextData ec;
> VacPageListData Nvacpagelist;
> ! VacPage dst_vacpage = NULL,
> last_vacuum_page,
> vacpage,
> *curpage;
> int i;
> ! int num_moved = 0,
> num_fraged_pages,
> vacuumed_pages;
> ! int keep_tuples = 0;
> VacRUsage ru0;
>
> vac_init_rusage(&ru0);
>
> ! ExecContext_Init(&ec, onerel);
>
> Nvacpagelist.num_pages = 0;
> num_fraged_pages = fraged_pages->num_pages;
> ***************
> *** 1539,1546 ****
> last_vacuum_page = NULL;
> last_vacuum_block = InvalidBlockNumber;
> }
> - cur_buffer = InvalidBuffer;
> - num_moved = 0;
>
> vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
> vacpage->offsets_used = vacpage->offsets_free = 0;
> --- 1562,1567 ----
> ***************
> *** 1560,1565 ****
> --- 1581,1594 ----
> blkno > last_move_dest_block;
> blkno--)
> {
> + Buffer buf;
> + Page page;
> + OffsetNumber offnum,
> + maxoff;
> + bool isempty,
> + dowrite,
> + chain_tuple_moved;
> +
> vacuum_delay_point();
>
> /*
> ***************
> *** 1635,1641 ****
> offnum <= maxoff;
> offnum = OffsetNumberNext(offnum))
> {
> ! itemid = PageGetItemId(page, offnum);
>
> if (!ItemIdIsUsed(itemid))
> continue;
> --- 1664,1672 ----
> offnum <= maxoff;
> offnum = OffsetNumberNext(offnum))
> {
> ! Size tuple_len;
> ! HeapTupleData tuple;
> ! ItemId itemid = PageGetItemId(page, offnum);
>
> if (!ItemIdIsUsed(itemid))
> continue;
> ***************
> *** 1645,1689 ****
> tuple_len = tuple.t_len = ItemIdGetLength(itemid);
> ItemPointerSet(&(tuple.t_self), blkno, offnum);
>
> if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
> {
> ! if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
> ! elog(ERROR, "HEAP_MOVED_IN was not expected");
>
> /*
> * If this (chain) tuple is moved by me already then I
> * have to check is it in vacpage or not - i.e. is it
> * moved while cleaning this page or some previous one.
> */
> ! if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
> ! {
> ! if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
> ! elog(ERROR, "invalid XVAC in tuple header");
> ! if (keep_tuples == 0)
> ! continue;
> ! if (chain_tuple_moved) /* some chains was moved
> ! * while */
> ! { /* cleaning this page */
> ! Assert(vacpage->offsets_free > 0);
> ! for (i = 0; i < vacpage->offsets_free; i++)
> ! {
> ! if (vacpage->offsets[i] == offnum)
> ! break;
> ! }
> ! if (i >= vacpage->offsets_free) /* not found */
> ! {
> ! vacpage->offsets[vacpage->offsets_free++] = offnum;
> ! keep_tuples--;
> ! }
> }
> ! else
> {
> vacpage->offsets[vacpage->offsets_free++] = offnum;
> keep_tuples--;
> }
> - continue;
> }
> ! elog(ERROR, "HEAP_MOVED_OFF was expected");
> }
>
> /*
> --- 1676,1746 ----
> tuple_len = tuple.t_len = ItemIdGetLength(itemid);
> ItemPointerSet(&(tuple.t_self), blkno, offnum);
>
> + /*
> + * VACUUM FULL has an exclusive lock on the relation. So
> + * normally no other transaction can have pending INSERTs or
> + * DELETEs in this relation. A tuple is either
> + * (a) a tuple in a system catalog, inserted or deleted by
> + * a not yet committed transaction or
> + * (b) dead (XMIN_INVALID or XMAX_COMMITTED) or
> + * (c) inserted by a committed xact (XMIN_COMMITTED) or
> + * (d) moved by the currently running VACUUM.
> + * In case (a) we wouldn't be in repair_frag() at all.
> + * In case (b) we cannot be here, because scan_heap() has
> + * already marked the item as unused, see continue above.
> + * Case (c) is what normally is to be expected.
> + * Case (d) is only possible, if a whole tuple chain has been
> + * moved while processing this or a higher numbered block.
> + */
> if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
> {
> ! /*
> ! * There cannot be another concurrently running VACUUM. If
> ! * the tuple had been moved in by a previous VACUUM, the
> ! * visibility check would have set XMIN_COMMITTED. If the
> ! * tuple had been moved in by the currently running VACUUM,
> ! * the loop would have been terminated. We had
> ! * elog(ERROR, ...) here, but as we are testing for a
> ! * can't-happen condition, Assert() seems more appropriate.
> ! */
> ! Assert(!(tuple.t_data->t_infomask & HEAP_MOVED_IN));
>
> /*
> * If this (chain) tuple is moved by me already then I
> * have to check is it in vacpage or not - i.e. is it
> * moved while cleaning this page or some previous one.
> */
> ! Assert(tuple.t_data->t_infomask & HEAP_MOVED_OFF);
> ! /*
> ! * MOVED_OFF by another VACUUM would have caused the
> ! * visibility check to set XMIN_COMMITTED or XMIN_INVALID.
> ! */
> ! Assert(HeapTupleHeaderGetXvac(tuple.t_data) == myXID);
> !
> ! /* Can't we Assert(keep_tuples > 0) here? */
> ! if (keep_tuples == 0)
> ! continue;
> ! if (chain_tuple_moved) /* some chains was moved
> ! * while */
> ! { /* cleaning this page */
> ! Assert(vacpage->offsets_free > 0);
> ! for (i = 0; i < vacpage->offsets_free; i++)
> ! {
> ! if (vacpage->offsets[i] == offnum)
> ! break;
> }
> ! if (i >= vacpage->offsets_free) /* not found */
> {
> vacpage->offsets[vacpage->offsets_free++] = offnum;
> keep_tuples--;
> }
> }
> ! else
> ! {
> ! vacpage->offsets[vacpage->offsets_free++] = offnum;
> ! keep_tuples--;
> ! }
> ! continue;
> }
>
> /*
> ***************
> *** 1716,1723 ****
> Buffer Cbuf = buf;
> bool freeCbuf = false;
> bool chain_move_failed = false;
> - Page Cpage;
> - ItemId Citemid;
> ItemPointerData Ctid;
> HeapTupleData tp = tuple;
> Size tlen = tuple_len;
> --- 1773,1778 ----
> ***************
> *** 1728,1737 ****
> int to_item = 0;
> int ti;
>
> ! if (cur_buffer != InvalidBuffer)
> {
> ! WriteBuffer(cur_buffer);
> ! cur_buffer = InvalidBuffer;
> }
>
> /* Quick exit if we have no vtlinks to search in */
> --- 1783,1792 ----
> int to_item = 0;
> int ti;
>
> ! if (dst_buffer != InvalidBuffer)
> {
> ! WriteBuffer(dst_buffer);
> ! dst_buffer = InvalidBuffer;
> }
>
> /* Quick exit if we have no vtlinks to search in */
> ***************
> *** 1754,1759 ****
> --- 1809,1818 ----
> !(ItemPointerEquals(&(tp.t_self),
> &(tp.t_data->t_ctid))))
> {
> + Page Cpage;
> + ItemId Citemid;
> + ItemPointerData Ctid;
> +
> Ctid = tp.t_data->t_ctid;
> if (freeCbuf)
> ReleaseBuffer(Cbuf);
> ***************
> *** 1929,1940 ****
> }
>
> /*
> ! * Okay, move the whle tuple chain
> */
> ItemPointerSetInvalid(&Ctid);
> for (ti = 0; ti < num_vtmove; ti++)
> {
> VacPage destvacpage = vtmove[ti].vacpage;
>
> /* Get page to move from */
> tuple.t_self = vtmove[ti].tid;
> --- 1988,2001 ----
> }
>
> /*
> ! * Okay, move the whole tuple chain
> */
> ItemPointerSetInvalid(&Ctid);
> for (ti = 0; ti < num_vtmove; ti++)
> {
> VacPage destvacpage = vtmove[ti].vacpage;
> + Page Cpage;
> + ItemId Citemid;
>
> /* Get page to move from */
> tuple.t_self = vtmove[ti].tid;
> ***************
> *** 1942,1954 ****
> ItemPointerGetBlockNumber(&(tuple.t_self)));
>
> /* Get page to move to */
> ! cur_buffer = ReadBuffer(onerel, destvacpage->blkno);
>
> ! LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
> ! if (cur_buffer != Cbuf)
> LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
>
> ! ToPage = BufferGetPage(cur_buffer);
> Cpage = BufferGetPage(Cbuf);
>
> Citemid = PageGetItemId(Cpage,
> --- 2003,2015 ----
> ItemPointerGetBlockNumber(&(tuple.t_self)));
>
> /* Get page to move to */
> ! dst_buffer = ReadBuffer(onerel, destvacpage->blkno);
>
> ! LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
> ! if (dst_buffer != Cbuf)
> LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);
>
> ! dst_page = BufferGetPage(dst_buffer);
> Cpage = BufferGetPage(Cbuf);
>
> Citemid = PageGetItemId(Cpage,
> ***************
> *** 1961,2081 ****
> * make a copy of the source tuple, and then mark the
> * source tuple MOVED_OFF.
> */
> ! heap_copytuple_with_tuple(&tuple, &newtup);
> !
> ! /*
> ! * register invalidation of source tuple in catcaches.
> ! */
> ! CacheInvalidateHeapTuple(onerel, &tuple);
> !
> ! /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
> ! START_CRIT_SECTION();
> !
> ! tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
> ! HEAP_XMIN_INVALID |
> ! HEAP_MOVED_IN);
> ! tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
> ! HeapTupleHeaderSetXvac(tuple.t_data, myXID);
> !
> ! /*
> ! * If this page was not used before - clean it.
> ! *
> ! * NOTE: a nasty bug used to lurk here. It is possible
> ! * for the source and destination pages to be the same
> ! * (since this tuple-chain member can be on a page
> ! * lower than the one we're currently processing in
> ! * the outer loop). If that's true, then after
> ! * vacuum_page() the source tuple will have been
> ! * moved, and tuple.t_data will be pointing at
> ! * garbage. Therefore we must do everything that uses
> ! * tuple.t_data BEFORE this step!!
> ! *
> ! * This path is different from the other callers of
> ! * vacuum_page, because we have already incremented
> ! * the vacpage's offsets_used field to account for the
> ! * tuple(s) we expect to move onto the page. Therefore
> ! * vacuum_page's check for offsets_used == 0 is wrong.
> ! * But since that's a good debugging check for all
> ! * other callers, we work around it here rather than
> ! * remove it.
> ! */
> ! if (!PageIsEmpty(ToPage) && vtmove[ti].cleanVpd)
> ! {
> ! int sv_offsets_used = destvacpage->offsets_used;
> !
> ! destvacpage->offsets_used = 0;
> ! vacuum_page(onerel, cur_buffer, destvacpage);
> ! destvacpage->offsets_used = sv_offsets_used;
> ! }
> !
> ! /*
> ! * Update the state of the copied tuple, and store it
> ! * on the destination page.
> ! */
> ! newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
> ! HEAP_XMIN_INVALID |
> ! HEAP_MOVED_OFF);
> ! newtup.t_data->t_infomask |= HEAP_MOVED_IN;
> ! HeapTupleHeaderSetXvac(newtup.t_data, myXID);
> ! newoff = PageAddItem(ToPage,
> ! (Item) newtup.t_data,
> ! tuple_len,
> ! InvalidOffsetNumber,
> ! LP_USED);
> ! if (newoff == InvalidOffsetNumber)
> ! {
> ! elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
> ! (unsigned long) tuple_len, destvacpage->blkno);
> ! }
> ! newitemid = PageGetItemId(ToPage, newoff);
> ! pfree(newtup.t_data);
> ! newtup.t_datamcxt = NULL;
> ! newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
> ! ItemPointerSet(&(newtup.t_self), destvacpage->blkno, newoff);
> !
> ! /* XLOG stuff */
> ! if (!onerel->rd_istemp)
> ! {
> ! XLogRecPtr recptr =
> ! log_heap_move(onerel, Cbuf, tuple.t_self,
> ! cur_buffer, &newtup);
> !
> ! if (Cbuf != cur_buffer)
> ! {
> ! PageSetLSN(Cpage, recptr);
> ! PageSetSUI(Cpage, ThisStartUpID);
> ! }
> ! PageSetLSN(ToPage, recptr);
> ! PageSetSUI(ToPage, ThisStartUpID);
> ! }
> ! else
> ! {
> ! /*
> ! * No XLOG record, but still need to flag that XID
> ! * exists on disk
> ! */
> ! MyXactMadeTempRelUpdate = true;
> ! }
> !
> ! END_CRIT_SECTION();
>
> if (destvacpage->blkno > last_move_dest_block)
> last_move_dest_block = destvacpage->blkno;
>
> /*
> - * Set new tuple's t_ctid pointing to itself for last
> - * tuple in chain, and to next tuple in chain
> - * otherwise.
> - */
> - if (!ItemPointerIsValid(&Ctid))
> - newtup.t_data->t_ctid = newtup.t_self;
> - else
> - newtup.t_data->t_ctid = Ctid;
> - Ctid = newtup.t_self;
> -
> - num_moved++;
> -
> - /*
> * Remember that we moved tuple from the current page
> * (corresponding index tuple will be cleaned).
> */
> --- 2022,2036 ----
> * make a copy of the source tuple, and then mark the
> * source tuple MOVED_OFF.
> */
> ! move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
> ! dst_buffer, dst_page, destvacpage,
> ! &ec, &Ctid, vtmove[ti].cleanVpd);
>
> + num_moved++;
> if (destvacpage->blkno > last_move_dest_block)
> last_move_dest_block = destvacpage->blkno;
>
> /*
> * Remember that we moved tuple from the current page
> * (corresponding index tuple will be cleaned).
> */
> ***************
> *** 2085,2107 ****
> else
> keep_tuples++;
>
> ! LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
> ! if (cur_buffer != Cbuf)
> ! LockBuffer(Cbuf, BUFFER_LOCK_UNLOCK);
> !
> ! /* Create index entries for the moved tuple */
> ! if (resultRelInfo->ri_NumIndices > 0)
> ! {
> ! ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
> ! ExecInsertIndexTuples(slot, &(newtup.t_self),
> ! estate, true);
> ! }
> !
> ! WriteBuffer(cur_buffer);
> WriteBuffer(Cbuf);
> } /* end of move-the-tuple-chain loop */
>
> ! cur_buffer = InvalidBuffer;
> pfree(vtmove);
> chain_tuple_moved = true;
>
> --- 2040,2050 ----
> else
> keep_tuples++;
>
> ! WriteBuffer(dst_buffer);
> WriteBuffer(Cbuf);
> } /* end of move-the-tuple-chain loop */
>
> ! dst_buffer = InvalidBuffer;
> pfree(vtmove);
> chain_tuple_moved = true;
>
> ***************
> *** 2110,2122 ****
> } /* end of is-tuple-in-chain test */
>
> /* try to find new page for this tuple */
> ! if (cur_buffer == InvalidBuffer ||
> ! !enough_space(cur_page, tuple_len))
> {
> ! if (cur_buffer != InvalidBuffer)
> {
> ! WriteBuffer(cur_buffer);
> ! cur_buffer = InvalidBuffer;
> }
> for (i = 0; i < num_fraged_pages; i++)
> {
> --- 2053,2065 ----
> } /* end of is-tuple-in-chain test */
>
> /* try to find new page for this tuple */
> ! if (dst_buffer == InvalidBuffer ||
> ! !enough_space(dst_vacpage, tuple_len))
> {
> ! if (dst_buffer != InvalidBuffer)
> {
> ! WriteBuffer(dst_buffer);
> ! dst_buffer = InvalidBuffer;
> }
> for (i = 0; i < num_fraged_pages; i++)
> {
> ***************
> *** 2125,2234 ****
> }
> if (i == num_fraged_pages)
> break; /* can't move item anywhere */
> ! cur_item = i;
> ! cur_page = fraged_pages->pagedesc[cur_item];
> ! cur_buffer = ReadBuffer(onerel, cur_page->blkno);
> ! LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
> ! ToPage = BufferGetPage(cur_buffer);
> /* if this page was not used before - clean it */
> ! if (!PageIsEmpty(ToPage) && cur_page->offsets_used == 0)
> ! vacuum_page(onerel, cur_buffer, cur_page);
> }
> else
> ! LockBuffer(cur_buffer, BUFFER_LOCK_EXCLUSIVE);
>
> LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
>
> ! /* copy tuple */
> ! heap_copytuple_with_tuple(&tuple, &newtup);
> !
> ! /*
> ! * register invalidation of source tuple in catcaches.
> ! *
> ! * (Note: we do not need to register the copied tuple, because we
> ! * are not changing the tuple contents and so there cannot be
> ! * any need to flush negative catcache entries.)
> ! */
> ! CacheInvalidateHeapTuple(onerel, &tuple);
>
> - /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
> - START_CRIT_SECTION();
>
> ! /*
> ! * Mark new tuple as MOVED_IN by me.
> ! */
> ! newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
> ! HEAP_XMIN_INVALID |
> ! HEAP_MOVED_OFF);
> ! newtup.t_data->t_infomask |= HEAP_MOVED_IN;
> ! HeapTupleHeaderSetXvac(newtup.t_data, myXID);
> !
> ! /* add tuple to the page */
> ! newoff = PageAddItem(ToPage, (Item) newtup.t_data, tuple_len,
> ! InvalidOffsetNumber, LP_USED);
> ! if (newoff == InvalidOffsetNumber)
> ! {
> ! elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
> ! (unsigned long) tuple_len,
> ! cur_page->blkno, (unsigned long) cur_page->free,
> ! cur_page->offsets_used, cur_page->offsets_free);
> ! }
> ! newitemid = PageGetItemId(ToPage, newoff);
> ! pfree(newtup.t_data);
> ! newtup.t_datamcxt = NULL;
> ! newtup.t_data = (HeapTupleHeader) PageGetItem(ToPage, newitemid);
> ! ItemPointerSet(&(newtup.t_data->t_ctid), cur_page->blkno, newoff);
> ! newtup.t_self = newtup.t_data->t_ctid;
>
> /*
> ! * Mark old tuple as MOVED_OFF by me.
> */
> - tuple.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
> - HEAP_XMIN_INVALID |
> - HEAP_MOVED_IN);
> - tuple.t_data->t_infomask |= HEAP_MOVED_OFF;
> - HeapTupleHeaderSetXvac(tuple.t_data, myXID);
> -
> - /* XLOG stuff */
> - if (!onerel->rd_istemp)
> - {
> - XLogRecPtr recptr =
> - log_heap_move(onerel, buf, tuple.t_self,
> - cur_buffer, &newtup);
> -
> - PageSetLSN(page, recptr);
> - PageSetSUI(page, ThisStartUpID);
> - PageSetLSN(ToPage, recptr);
> - PageSetSUI(ToPage, ThisStartUpID);
> - }
> - else
> - {
> - /*
> - * No XLOG record, but still need to flag that XID exists
> - * on disk
> - */
> - MyXactMadeTempRelUpdate = true;
> - }
> -
> - END_CRIT_SECTION();
> -
> - cur_page->offsets_used++;
> - num_moved++;
> - cur_page->free = ((PageHeader) ToPage)->pd_upper - ((PageHeader) ToPage)->pd_lower;
> - if (cur_page->blkno > last_move_dest_block)
> - last_move_dest_block = cur_page->blkno;
> -
> vacpage->offsets[vacpage->offsets_free++] = offnum;
> -
> - LockBuffer(cur_buffer, BUFFER_LOCK_UNLOCK);
> - LockBuffer(buf, BUFFER_LOCK_UNLOCK);
> -
> - /* insert index' tuples if needed */
> - if (resultRelInfo->ri_NumIndices > 0)
> - {
> - ExecStoreTuple(&newtup, slot, InvalidBuffer, false);
> - ExecInsertIndexTuples(slot, &(newtup.t_self), estate, true);
> - }
> } /* walk along page */
>
> /*
> --- 2068,2099 ----
> }
> if (i == num_fraged_pages)
> break; /* can't move item anywhere */
> ! dst_vacpage = fraged_pages->pagedesc[i];
> ! dst_buffer = ReadBuffer(onerel, dst_vacpage->blkno);
> ! LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
> ! dst_page = BufferGetPage(dst_buffer);
> /* if this page was not used before - clean it */
> ! if (!PageIsEmpty(dst_page) && dst_vacpage->offsets_used == 0)
> ! vacuum_page(onerel, dst_buffer, dst_vacpage);
> }
> else
> ! LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
>
> LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
>
> ! move_plain_tuple(onerel, buf, page, &tuple,
> ! dst_buffer, dst_page, dst_vacpage, &ec);
>
>
> ! num_moved++;
> ! if (dst_vacpage->blkno > last_move_dest_block)
> ! last_move_dest_block = dst_vacpage->blkno;
>
> /*
> ! * Remember that we moved tuple from the current page
> ! * (corresponding index tuple will be cleaned).
> */
> vacpage->offsets[vacpage->offsets_free++] = offnum;
> } /* walk along page */
>
> /*
> ***************
> *** 2249,2284 ****
> off <= maxoff;
> off = OffsetNumberNext(off))
> {
> ! itemid = PageGetItemId(page, off);
> if (!ItemIdIsUsed(itemid))
> continue;
> ! tuple.t_datamcxt = NULL;
> ! tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
> ! if (tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)
> continue;
> ! if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
> ! elog(ERROR, "HEAP_MOVED_IN was not expected");
> ! if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
> {
> ! if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
> ! elog(ERROR, "invalid XVAC in tuple header");
> ! /* some chains was moved while */
> ! if (chain_tuple_moved)
> ! { /* cleaning this page */
> ! Assert(vacpage->offsets_free > 0);
> ! for (i = 0; i < vacpage->offsets_free; i++)
> ! {
> ! if (vacpage->offsets[i] == off)
> ! break;
> ! }
> ! if (i >= vacpage->offsets_free) /* not found */
> ! {
> ! vacpage->offsets[vacpage->offsets_free++] = off;
> ! Assert(keep_tuples > 0);
> ! keep_tuples--;
> ! }
> }
> ! else
> {
> vacpage->offsets[vacpage->offsets_free++] = off;
> Assert(keep_tuples > 0);
> --- 2114,2144 ----
> off <= maxoff;
> off = OffsetNumberNext(off))
> {
> ! ItemId itemid = PageGetItemId(page, off);
> ! HeapTupleHeader htup;
> !
> if (!ItemIdIsUsed(itemid))
> continue;
> ! htup = (HeapTupleHeader) PageGetItem(page, itemid);
> ! if (htup->t_infomask & HEAP_XMIN_COMMITTED)
> continue;
> ! /*
> ! ** See comments in the walk-along-page loop above, why we
> ! ** have Asserts here instead of if (...) elog(ERROR).
> ! */
> ! Assert(!(htup->t_infomask & HEAP_MOVED_IN));
> ! Assert(htup->t_infomask & HEAP_MOVED_OFF);
> ! Assert(HeapTupleHeaderGetXvac(htup) == myXID);
> ! if (chain_tuple_moved)
> {
> ! /* some chains was moved while cleaning this page */
> ! Assert(vacpage->offsets_free > 0);
> ! for (i = 0; i < vacpage->offsets_free; i++)
> ! {
> ! if (vacpage->offsets[i] == off)
> ! break;
> }
> ! if (i >= vacpage->offsets_free) /* not found */
> {
> vacpage->offsets[vacpage->offsets_free++] = off;
> Assert(keep_tuples > 0);
> ***************
> *** 2286,2292 ****
> }
> }
> else
> ! elog(ERROR, "HEAP_MOVED_OFF was expected");
> }
> }
>
> --- 2146,2156 ----
> }
> }
> else
> ! {
> ! vacpage->offsets[vacpage->offsets_free++] = off;
> ! Assert(keep_tuples > 0);
> ! keep_tuples--;
> ! }
> }
> }
>
> ***************
> *** 2312,2321 ****
>
> blkno++; /* new number of blocks */
>
> ! if (cur_buffer != InvalidBuffer)
> {
> Assert(num_moved > 0);
> ! WriteBuffer(cur_buffer);
> }
>
> if (num_moved > 0)
> --- 2176,2185 ----
>
> blkno++; /* new number of blocks */
>
> ! if (dst_buffer != InvalidBuffer)
> {
> Assert(num_moved > 0);
> ! WriteBuffer(dst_buffer);
> }
>
> if (num_moved > 0)
> ***************
> *** 2348,2353 ****
> --- 2212,2220 ----
> Assert((*curpage)->blkno < blkno);
> if ((*curpage)->offsets_used == 0)
> {
> + Buffer buf;
> + Page page;
> +
> /* this page was not used as a move target, so must clean it */
> buf = ReadBuffer(onerel, (*curpage)->blkno);
> LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
> ***************
> *** 2363,2424 ****
> * Now scan all the pages that we moved tuples onto and update tuple
> * status bits. This is not really necessary, but will save time for
> * future transactions examining these tuples.
> - *
> - * XXX NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
> - * pages that were move source pages but not move dest pages. One
> - * also wonders whether it wouldn't be better to skip this step and
> - * let the tuple status updates happen someplace that's not holding an
> - * exclusive lock on the relation.
> */
> ! checked_moved = 0;
> ! for (i = 0, curpage = fraged_pages->pagedesc;
> ! i < num_fraged_pages;
> ! i++, curpage++)
> ! {
> ! vacuum_delay_point();
> !
> ! Assert((*curpage)->blkno < blkno);
> ! if ((*curpage)->blkno > last_move_dest_block)
> ! break; /* no need to scan any further */
> ! if ((*curpage)->offsets_used == 0)
> ! continue; /* this page was never used as a move dest */
> ! buf = ReadBuffer(onerel, (*curpage)->blkno);
> ! LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
> ! page = BufferGetPage(buf);
> ! num_tuples = 0;
> ! max_offset = PageGetMaxOffsetNumber(page);
> ! for (newoff = FirstOffsetNumber;
> ! newoff <= max_offset;
> ! newoff = OffsetNumberNext(newoff))
> ! {
> ! itemid = PageGetItemId(page, newoff);
> ! if (!ItemIdIsUsed(itemid))
> ! continue;
> ! tuple.t_datamcxt = NULL;
> ! tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
> ! if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
> ! {
> ! if (!(tuple.t_data->t_infomask & HEAP_MOVED))
> ! elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
> ! if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
> ! elog(ERROR, "invalid XVAC in tuple header");
> ! if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
> ! {
> ! tuple.t_data->t_infomask |= HEAP_XMIN_COMMITTED;
> ! tuple.t_data->t_infomask &= ~HEAP_MOVED;
> ! num_tuples++;
> ! }
> ! else
> ! tuple.t_data->t_infomask |= HEAP_XMIN_INVALID;
> ! }
> ! }
> ! LockBuffer(buf, BUFFER_LOCK_UNLOCK);
> ! WriteBuffer(buf);
> ! Assert((*curpage)->offsets_used == num_tuples);
> ! checked_moved += num_tuples;
> ! }
> ! Assert(num_moved == checked_moved);
> !
> /*
> * It'd be cleaner to make this report at the bottom of this routine,
> * but then the rusage would double-count the second pass of index
> --- 2230,2239 ----
> * Now scan all the pages that we moved tuples onto and update tuple
> * status bits. This is not really necessary, but will save time for
> * future transactions examining these tuples.
> */
> ! update_hint_bits(onerel, fraged_pages, num_fraged_pages,
> ! last_move_dest_block, num_moved);
> !
> /*
> * It'd be cleaner to make this report at the bottom of this routine,
> * but then the rusage would double-count the second pass of index
> ***************
> *** 2455,2460 ****
> --- 2270,2282 ----
> *vpleft = *vpright;
> *vpright = vpsave;
> }
> + /*
> + * keep_tuples is the number of tuples that have been moved
> + * off a page during chain moves but not been scanned over
> + * subsequently. The tuple ids of these tuples are not
> + * recorded as free offsets for any VacPage, so they will not
> + * be cleared from the indexes.
> + */
> Assert(keep_tuples >= 0);
> for (i = 0; i < nindexes; i++)
> vacuum_index(&Nvacpagelist, Irel[i],
> ***************
> *** 2465,2500 ****
> if (vacpage->blkno == (blkno - 1) &&
> vacpage->offsets_free > 0)
> {
> ! OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
> ! int uncnt;
>
> buf = ReadBuffer(onerel, vacpage->blkno);
> LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
> page = BufferGetPage(buf);
> - num_tuples = 0;
> maxoff = PageGetMaxOffsetNumber(page);
> for (offnum = FirstOffsetNumber;
> offnum <= maxoff;
> offnum = OffsetNumberNext(offnum))
> {
> ! itemid = PageGetItemId(page, offnum);
> if (!ItemIdIsUsed(itemid))
> continue;
> ! tuple.t_datamcxt = NULL;
> ! tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
>
> ! if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
> ! {
> ! if (tuple.t_data->t_infomask & HEAP_MOVED_OFF)
> ! {
> ! if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
> ! elog(ERROR, "invalid XVAC in tuple header");
> ! itemid->lp_flags &= ~LP_USED;
> ! num_tuples++;
> ! }
> ! else
> ! elog(ERROR, "HEAP_MOVED_OFF was expected");
> ! }
>
> }
> Assert(vacpage->offsets_free == num_tuples);
> --- 2287,2327 ----
> if (vacpage->blkno == (blkno - 1) &&
> vacpage->offsets_free > 0)
> {
> ! Buffer buf;
> ! Page page;
> ! OffsetNumber unused[BLCKSZ / sizeof(OffsetNumber)];
> ! OffsetNumber offnum,
> ! maxoff;
> ! int uncnt;
> ! int num_tuples = 0;
>
> buf = ReadBuffer(onerel, vacpage->blkno);
> LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
> page = BufferGetPage(buf);
> maxoff = PageGetMaxOffsetNumber(page);
> for (offnum = FirstOffsetNumber;
> offnum <= maxoff;
> offnum = OffsetNumberNext(offnum))
> {
> ! ItemId itemid = PageGetItemId(page, offnum);
> ! HeapTupleHeader htup;
> !
> if (!ItemIdIsUsed(itemid))
> continue;
> ! htup = (HeapTupleHeader) PageGetItem(page, itemid);
> ! if (htup->t_infomask & HEAP_XMIN_COMMITTED)
> ! continue;
>
> ! /*
> ! * See the comments in the walk-along-page loop above for why we
> ! * have Asserts here instead of if (...) elog(ERROR).
> ! */
> ! Assert(!(htup->t_infomask & HEAP_MOVED_IN));
> ! Assert(htup->t_infomask & HEAP_MOVED_OFF);
> ! Assert(HeapTupleHeaderGetXvac(htup) == myXID);
> !
> ! itemid->lp_flags &= ~LP_USED;
> ! num_tuples++;
>
> }
> Assert(vacpage->offsets_free == num_tuples);
> ***************
> *** 2554,2564 ****
> if (vacrelstats->vtlinks != NULL)
> pfree(vacrelstats->vtlinks);
>
> ! ExecDropTupleTable(tupleTable, true);
>
> ! ExecCloseIndices(resultRelInfo);
>
> ! FreeExecutorState(estate);
> }
>
> /*
> --- 2381,2717 ----
> if (vacrelstats->vtlinks != NULL)
> pfree(vacrelstats->vtlinks);
>
> ! ExecContext_Finish(&ec);
> ! }
> !
> ! /*
> ! * move_chain_tuple() -- move one tuple that is part of a tuple chain
> ! *
> ! * This routine moves old_tup from old_page to dst_page.
> ! * old_page and dst_page might be the same page.
> ! * On entry old_buf and dst_buf are locked exclusively, both locks (or
> ! * the single lock, if this is an intra-page move) are released before
> ! * exit.
> ! *
> ! * Yes, a routine with ten parameters is ugly, but it's still better
> ! * than having these 120 lines of code in repair_frag() which is
> ! * already too long and almost unreadable.
> ! */
> ! static void
> ! move_chain_tuple(Relation rel,
> ! Buffer old_buf, Page old_page, HeapTuple old_tup,
> ! Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
> ! ExecContext ec, ItemPointer ctid, bool cleanVpd)
> ! {
> ! TransactionId myXID = GetCurrentTransactionId();
> ! HeapTupleData newtup;
> ! OffsetNumber newoff;
> ! ItemId newitemid;
> ! Size tuple_len = old_tup->t_len;
> !
> ! heap_copytuple_with_tuple(old_tup, &newtup);
> !
> ! /*
> ! * register invalidation of source tuple in catcaches.
> ! */
> ! CacheInvalidateHeapTuple(rel, old_tup);
> !
> ! /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
> ! START_CRIT_SECTION();
> !
> ! old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
> ! HEAP_XMIN_INVALID |
> ! HEAP_MOVED_IN);
> ! old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
> ! HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
> !
> ! /*
> ! * If this page was not used before - clean it.
> ! *
> ! * NOTE: a nasty bug used to lurk here. It is possible
> ! * for the source and destination pages to be the same
> ! * (since this tuple-chain member can be on a page
> ! * lower than the one we're currently processing in
> ! * the outer loop). If that's true, then after
> ! * vacuum_page() the source tuple will have been
> ! * moved, and old_tup->t_data will be pointing at
> ! * garbage. Therefore we must do everything that uses
> ! * old_tup->t_data BEFORE this step!!
> ! *
> ! * This path is different from the other callers of
> ! * vacuum_page, because we have already incremented
> ! * the vacpage's offsets_used field to account for the
> ! * tuple(s) we expect to move onto the page. Therefore
> ! * vacuum_page's check for offsets_used == 0 is wrong.
> ! * But since that's a good debugging check for all
> ! * other callers, we work around it here rather than
> ! * remove it.
> ! */
> ! if (!PageIsEmpty(dst_page) && cleanVpd)
> ! {
> ! int sv_offsets_used = dst_vacpage->offsets_used;
> !
> ! dst_vacpage->offsets_used = 0;
> ! vacuum_page(rel, dst_buf, dst_vacpage);
> ! dst_vacpage->offsets_used = sv_offsets_used;
> ! }
>
> ! /*
> ! * Update the state of the copied tuple, and store it
> ! * on the destination page.
> ! */
> ! newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
> ! HEAP_XMIN_INVALID |
> ! HEAP_MOVED_OFF);
> ! newtup.t_data->t_infomask |= HEAP_MOVED_IN;
> ! HeapTupleHeaderSetXvac(newtup.t_data, myXID);
> ! newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
> ! InvalidOffsetNumber, LP_USED);
> ! if (newoff == InvalidOffsetNumber)
> ! {
> ! elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
> ! (unsigned long) tuple_len, dst_vacpage->blkno);
> ! }
> ! newitemid = PageGetItemId(dst_page, newoff);
> ! pfree(newtup.t_data);
> ! newtup.t_datamcxt = NULL;
> ! newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
> ! ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff);
> !
> ! /* XLOG stuff */
> ! if (!rel->rd_istemp)
> ! {
> ! XLogRecPtr recptr = log_heap_move(rel, old_buf, old_tup->t_self,
> ! dst_buf, &newtup);
> !
> ! if (old_buf != dst_buf)
> ! {
> ! PageSetLSN(old_page, recptr);
> ! PageSetSUI(old_page, ThisStartUpID);
> ! }
> ! PageSetLSN(dst_page, recptr);
> ! PageSetSUI(dst_page, ThisStartUpID);
> ! }
> ! else
> ! {
> ! /*
> ! * No XLOG record, but still need to flag that XID
> ! * exists on disk
> ! */
> ! MyXactMadeTempRelUpdate = true;
> ! }
> !
> ! END_CRIT_SECTION();
> !
> ! /*
> ! * Set new tuple's t_ctid pointing to itself for last
> ! * tuple in chain, and to next tuple in chain
> ! * otherwise.
> ! */
> ! /* Is this ok after log_heap_move() and END_CRIT_SECTION()? */
> ! if (!ItemPointerIsValid(ctid))
> ! newtup.t_data->t_ctid = newtup.t_self;
> ! else
> ! newtup.t_data->t_ctid = *ctid;
> ! *ctid = newtup.t_self;
>
> ! LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
> ! if (dst_buf != old_buf)
> ! LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
> !
> ! /* Create index entries for the moved tuple */
> ! if (ec->resultRelInfo->ri_NumIndices > 0)
> ! {
> ! ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
> ! ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
> ! }
> ! }
> !
> ! /*
> ! * move_plain_tuple() -- move one tuple that is not part of a chain
> ! *
> ! * This routine moves old_tup from old_page to dst_page.
> ! * On entry old_buf and dst_buf are locked exclusively, both locks are
> ! * released before exit.
> ! *
> ! * Yes, a routine with eight parameters is ugly, but it's still better
> ! * than having these 90 lines of code in repair_frag() which is already
> ! * too long and almost unreadable.
> ! */
> ! static void
> ! move_plain_tuple(Relation rel,
> ! Buffer old_buf, Page old_page, HeapTuple old_tup,
> ! Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
> ! ExecContext ec)
> ! {
> ! TransactionId myXID = GetCurrentTransactionId();
> ! HeapTupleData newtup;
> ! OffsetNumber newoff;
> ! ItemId newitemid;
> ! Size tuple_len = old_tup->t_len;
> !
> ! /* copy tuple */
> ! heap_copytuple_with_tuple(old_tup, &newtup);
> !
> ! /*
> ! * register invalidation of source tuple in catcaches.
> ! *
> ! * (Note: we do not need to register the copied tuple, because we
> ! * are not changing the tuple contents and so there cannot be
> ! * any need to flush negative catcache entries.)
> ! */
> ! CacheInvalidateHeapTuple(rel, old_tup);
> !
> ! /* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
> ! START_CRIT_SECTION();
> !
> ! /*
> ! * Mark new tuple as MOVED_IN by me.
> ! */
> ! newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
> ! HEAP_XMIN_INVALID |
> ! HEAP_MOVED_OFF);
> ! newtup.t_data->t_infomask |= HEAP_MOVED_IN;
> ! HeapTupleHeaderSetXvac(newtup.t_data, myXID);
> !
> ! /* add tuple to the page */
> ! newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
> ! InvalidOffsetNumber, LP_USED);
> ! if (newoff == InvalidOffsetNumber)
> ! {
> ! elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
> ! (unsigned long) tuple_len,
> ! dst_vacpage->blkno, (unsigned long) dst_vacpage->free,
> ! dst_vacpage->offsets_used, dst_vacpage->offsets_free);
> ! }
> ! newitemid = PageGetItemId(dst_page, newoff);
> ! pfree(newtup.t_data);
> ! newtup.t_datamcxt = NULL;
> ! newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
> ! ItemPointerSet(&(newtup.t_data->t_ctid), dst_vacpage->blkno, newoff);
> ! newtup.t_self = newtup.t_data->t_ctid;
> !
> ! /*
> ! * Mark old tuple as MOVED_OFF by me.
> ! */
> ! old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
> ! HEAP_XMIN_INVALID |
> ! HEAP_MOVED_IN);
> ! old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
> ! HeapTupleHeaderSetXvac(old_tup->t_data, myXID);
> !
> ! /* XLOG stuff */
> ! if (!rel->rd_istemp)
> ! {
> ! XLogRecPtr recptr = log_heap_move(rel, old_buf, old_tup->t_self,
> ! dst_buf, &newtup);
> !
> ! PageSetLSN(old_page, recptr);
> ! PageSetSUI(old_page, ThisStartUpID);
> ! PageSetLSN(dst_page, recptr);
> ! PageSetSUI(dst_page, ThisStartUpID);
> ! }
> ! else
> ! {
> ! /*
> ! * No XLOG record, but still need to flag that XID exists
> ! * on disk
> ! */
> ! MyXactMadeTempRelUpdate = true;
> ! }
> !
> ! END_CRIT_SECTION();
> !
> ! dst_vacpage->free = ((PageHeader) dst_page)->pd_upper -
> ! ((PageHeader) dst_page)->pd_lower;
> ! LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
> ! LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
> !
> ! dst_vacpage->offsets_used++;
> !
> ! /* insert index tuples if needed */
> ! if (ec->resultRelInfo->ri_NumIndices > 0)
> ! {
> ! ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
> ! ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
> ! }
> ! }
> !
> ! /*
> ! * update_hint_bits() -- update hint bits in destination pages
> ! *
> ! * Scan all the pages that we moved tuples onto and update tuple
> ! * status bits. This is not really necessary, but will save time for
> ! * future transactions examining these tuples.
> ! *
> ! * XXX NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
> ! * pages that were move source pages but not move dest pages. One
> ! * also wonders whether it wouldn't be better to skip this step and
> ! * let the tuple status updates happen someplace that's not holding an
> ! * exclusive lock on the relation.
> ! */
> ! static void
> ! update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages,
> ! BlockNumber last_move_dest_block, int num_moved)
> ! {
> ! int checked_moved = 0;
> ! int i;
> ! VacPage *curpage;
> !
> ! for (i = 0, curpage = fraged_pages->pagedesc;
> ! i < num_fraged_pages;
> ! i++, curpage++)
> ! {
> ! Buffer buf;
> ! Page page;
> ! OffsetNumber max_offset;
> ! OffsetNumber off;
> ! int num_tuples = 0;
> !
> ! vacuum_delay_point();
> !
> ! if ((*curpage)->blkno > last_move_dest_block)
> ! break; /* no need to scan any further */
> ! if ((*curpage)->offsets_used == 0)
> ! continue; /* this page was never used as a move dest */
> ! buf = ReadBuffer(rel, (*curpage)->blkno);
> ! LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
> ! page = BufferGetPage(buf);
> ! max_offset = PageGetMaxOffsetNumber(page);
> ! for (off = FirstOffsetNumber;
> ! off <= max_offset;
> ! off = OffsetNumberNext(off))
> ! {
> ! ItemId itemid = PageGetItemId(page, off);
> ! HeapTupleHeader htup;
> !
> ! if (!ItemIdIsUsed(itemid))
> ! continue;
> ! htup = (HeapTupleHeader) PageGetItem(page, itemid);
> ! if (htup->t_infomask & HEAP_XMIN_COMMITTED)
> ! continue;
> ! /*
> ! * See comments in the walk-along-page loop above, why we
> ! * have Asserts here instead of if (...) elog(ERROR). The
> ! * difference here is that we may see MOVED_IN.
> ! */
> ! Assert(htup->t_infomask & HEAP_MOVED);
> ! Assert(HeapTupleHeaderGetXvac(htup) == GetCurrentTransactionId());
> ! if (htup->t_infomask & HEAP_MOVED_IN)
> ! {
> ! htup->t_infomask |= HEAP_XMIN_COMMITTED;
> ! htup->t_infomask &= ~HEAP_MOVED;
> ! num_tuples++;
> ! }
> ! else
> ! htup->t_infomask |= HEAP_XMIN_INVALID;
> ! }
> ! LockBuffer(buf, BUFFER_LOCK_UNLOCK);
> ! WriteBuffer(buf);
> ! Assert((*curpage)->offsets_used == num_tuples);
> ! checked_moved += num_tuples;
> ! }
> ! Assert(num_moved == checked_moved);
> }
>
> /*

>
> ---------------------------(end of broadcast)---------------------------
> TIP 6: Have you searched our list archives?
>
> http://archives.postgresql.org

--
Bruce Momjian | http://candle.pha.pa.us
pgman(at)candle(dot)pha(dot)pa(dot)us | (610) 359-1001
+ If your life is a hard drive, | 13 Roberts Road
+ Christ can be your backup. | Newtown Square, Pennsylvania 19073

In response to

Browse pgsql-patches by date

  From Date Subject
Next Message Fabien COELHO 2004-06-08 14:00:07 fix schema ownership for database owner on first connection
Previous Message Bruce Momjian 2004-06-08 13:49:22 Re: Relocatable locale