From 89a943032f0a10fd093c126d15fbf81e5861dbe3 Mon Sep 17 00:00:00 2001 From: Marco Nenciarini Date: Mon, 3 Nov 2014 17:52:27 +0100 Subject: [PATCH] LSN Map This is a WIP. Only heap is supported. No indexes, no sequences. --- src/backend/access/heap/Makefile | 2 +- src/backend/access/heap/heapam.c | 239 ++++++++++++++++++++++-- src/backend/access/heap/hio.c | 11 +- src/backend/access/heap/lsnmap.c | 336 ++++++++++++++++++++++++++++++++++ src/backend/access/heap/pruneheap.c | 10 + src/backend/access/heap/rewriteheap.c | 37 +++- src/backend/catalog/storage.c | 8 + src/backend/commands/tablecmds.c | 5 +- src/backend/commands/vacuumlazy.c | 35 +++- src/backend/storage/smgr/smgr.c | 1 + src/common/relpath.c | 5 +- src/include/access/hio.h | 3 +- src/include/access/lsnmap.h | 28 +++ src/include/common/relpath.h | 5 +- src/include/storage/smgr.h | 1 + 15 files changed, 687 insertions(+), 39 deletions(-) create mode 100644 src/backend/access/heap/lsnmap.c create mode 100644 src/include/access/lsnmap.h diff --git a/src/backend/access/heap/Makefile b/src/backend/access/heap/Makefile index b83d496..776ee7d 100644 *** a/src/backend/access/heap/Makefile --- b/src/backend/access/heap/Makefile *************** subdir = src/backend/access/heap *** 12,17 **** top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global ! OBJS = heapam.o hio.o pruneheap.o rewriteheap.o syncscan.o tuptoaster.o visibilitymap.o include $(top_srcdir)/src/backend/common.mk --- 12,17 ---- top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global ! OBJS = heapam.o hio.o pruneheap.o rewriteheap.o syncscan.o tuptoaster.o visibilitymap.o lsnmap.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 21e9d06..9486562 100644 *** a/src/backend/access/heap/heapam.c --- b/src/backend/access/heap/heapam.c *************** *** 48,53 **** --- 48,54 ---- #include "access/tuptoaster.h" #include "access/valid.h" #include "access/visibilitymap.h" + #include "access/lsnmap.h" #include "access/xact.h" #include "access/xlog.h" #include "access/xloginsert.h" *************** heap_insert(Relation relation, HeapTuple *** 2067,2073 **** TransactionId xid = GetCurrentTransactionId(); HeapTuple heaptup; Buffer buffer; ! Buffer vmbuffer = InvalidBuffer; bool all_visible_cleared = false; /* --- 2068,2075 ---- TransactionId xid = GetCurrentTransactionId(); HeapTuple heaptup; Buffer buffer; ! Buffer vmbuffer = InvalidBuffer, ! lmbuffer = InvalidBuffer; bool all_visible_cleared = false; /* *************** heap_insert(Relation relation, HeapTuple *** 2097,2103 **** */ buffer = RelationGetBufferForTuple(relation, heaptup->t_len, InvalidBuffer, options, bistate, ! &vmbuffer, NULL); /* NO EREPORT(ERROR) from here till changes are logged */ START_CRIT_SECTION(); --- 2099,2106 ---- */ buffer = RelationGetBufferForTuple(relation, heaptup->t_len, InvalidBuffer, options, bistate, ! &vmbuffer, NULL, ! 
&lmbuffer, NULL); /* NO EREPORT(ERROR) from here till changes are logged */ START_CRIT_SECTION(); *************** heap_insert(Relation relation, HeapTuple *** 2192,2197 **** --- 2195,2205 ---- recptr = XLogInsert(RM_HEAP_ID, info); PageSetLSN(page, recptr); + + /* + * Update the LSN map + */ + lsnmap_set(relation, BufferGetBlockNumber(buffer), lmbuffer, recptr); } END_CRIT_SECTION(); *************** heap_insert(Relation relation, HeapTuple *** 2199,2204 **** --- 2207,2214 ---- UnlockReleaseBuffer(buffer); if (vmbuffer != InvalidBuffer) ReleaseBuffer(vmbuffer); + if (lmbuffer != InvalidBuffer) + ReleaseBuffer(lmbuffer); /* * If tuple is cachable, mark it for invalidation from the caches in case *************** heap_multi_insert(Relation relation, Hea *** 2346,2352 **** while (ndone < ntuples) { Buffer buffer; ! Buffer vmbuffer = InvalidBuffer; bool all_visible_cleared = false; int nthispage; --- 2356,2363 ---- while (ndone < ntuples) { Buffer buffer; ! Buffer vmbuffer = InvalidBuffer, ! lmbuffer = InvalidBuffer; bool all_visible_cleared = false; int nthispage; *************** heap_multi_insert(Relation relation, Hea *** 2358,2364 **** */ buffer = RelationGetBufferForTuple(relation, heaptuples[ndone]->t_len, InvalidBuffer, options, bistate, ! &vmbuffer, NULL); page = BufferGetPage(buffer); /* NO EREPORT(ERROR) from here till changes are logged */ --- 2369,2376 ---- */ buffer = RelationGetBufferForTuple(relation, heaptuples[ndone]->t_len, InvalidBuffer, options, bistate, ! &vmbuffer, NULL, ! &lmbuffer, NULL); page = BufferGetPage(buffer); /* NO EREPORT(ERROR) from here till changes are logged */ *************** heap_multi_insert(Relation relation, Hea *** 2502,2507 **** --- 2514,2521 ---- recptr = XLogInsert(RM_HEAP2_ID, info); PageSetLSN(page, recptr); + + lsnmap_set(relation, BufferGetBlockNumber(buffer), lmbuffer, recptr); } END_CRIT_SECTION(); *************** heap_multi_insert(Relation relation, Hea *** 2509,2514 **** --- 2523,2530 ---- UnlockReleaseBuffer(buffer); if (vmbuffer != InvalidBuffer) ReleaseBuffer(vmbuffer); + if (lmbuffer != InvalidBuffer) + ReleaseBuffer(lmbuffer); ndone += nthispage; } *************** heap_delete(Relation relation, ItemPoint *** 2629,2635 **** Page page; BlockNumber block; Buffer buffer; ! Buffer vmbuffer = InvalidBuffer; TransactionId new_xmax; uint16 new_infomask, new_infomask2; --- 2645,2652 ---- Page page; BlockNumber block; Buffer buffer; ! Buffer vmbuffer = InvalidBuffer, ! lmbuffer = InvalidBuffer; TransactionId new_xmax; uint16 new_infomask, new_infomask2; *************** heap_delete(Relation relation, ItemPoint *** 2645,2650 **** --- 2662,2670 ---- buffer = ReadBuffer(relation, block); page = BufferGetPage(buffer); + if (RelationNeedsWAL(relation)) + lsnmap_pin(relation, block, &lmbuffer); + /* * Before locking the buffer, pin the visibility map page if it appears to * be necessary. Since we haven't got the lock yet, someone else might be *************** l1: *** 2797,2802 **** --- 2817,2824 ---- UnlockTupleTuplock(relation, &(tp.t_self), LockTupleExclusive); if (vmbuffer != InvalidBuffer) ReleaseBuffer(vmbuffer); + if (lmbuffer != InvalidBuffer) + ReleaseBuffer(lmbuffer); return result; } *************** l1: *** 2912,2917 **** --- 2934,2941 ---- recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE); PageSetLSN(page, recptr); + + lsnmap_set(relation, block, lmbuffer, recptr); } END_CRIT_SECTION(); *************** l1: *** 2920,2926 **** if (vmbuffer != InvalidBuffer) ReleaseBuffer(vmbuffer); ! 
/* * If the tuple has toasted out-of-line attributes, we need to delete * those items too. We have to do this before releasing the buffer --- 2944,2951 ---- if (vmbuffer != InvalidBuffer) ReleaseBuffer(vmbuffer); ! if (lmbuffer != InvalidBuffer) ! ReleaseBuffer(lmbuffer); /* * If the tuple has toasted out-of-line attributes, we need to delete * those items too. We have to do this before releasing the buffer *************** heap_update(Relation relation, ItemPoint *** 3053,3059 **** Buffer buffer, newbuf, vmbuffer = InvalidBuffer, ! vmbuffer_new = InvalidBuffer; bool need_toast, already_marked; Size newtupsize, --- 3078,3086 ---- Buffer buffer, newbuf, vmbuffer = InvalidBuffer, ! vmbuffer_new = InvalidBuffer, ! lmbuffer = InvalidBuffer, ! lmbuffer_new = InvalidBuffer; bool need_toast, already_marked; Size newtupsize, *************** heap_update(Relation relation, ItemPoint *** 3099,3104 **** --- 3126,3134 ---- buffer = ReadBuffer(relation, block); page = BufferGetPage(buffer); + if (RelationNeedsWAL(relation)) + lsnmap_pin(relation, block, &lmbuffer); + /* * Before locking the buffer, pin the visibility map page if it appears to * be necessary. Since we haven't got the lock yet, someone else might be *************** l2: *** 3390,3395 **** --- 3420,3427 ---- UnlockTupleTuplock(relation, &(oldtup.t_self), *lockmode); if (vmbuffer != InvalidBuffer) ReleaseBuffer(vmbuffer); + if (lmbuffer != InvalidBuffer) + ReleaseBuffer(lmbuffer); bms_free(hot_attrs); bms_free(key_attrs); return result; *************** l2: *** 3570,3576 **** /* Assume there's no chance to put heaptup on same page. */ newbuf = RelationGetBufferForTuple(relation, heaptup->t_len, buffer, 0, NULL, ! &vmbuffer_new, &vmbuffer); } else { --- 3602,3609 ---- /* Assume there's no chance to put heaptup on same page. */ newbuf = RelationGetBufferForTuple(relation, heaptup->t_len, buffer, 0, NULL, ! &vmbuffer_new, &vmbuffer, ! &lmbuffer_new, &lmbuffer); } else { *************** l2: *** 3588,3594 **** LockBuffer(buffer, BUFFER_LOCK_UNLOCK); newbuf = RelationGetBufferForTuple(relation, heaptup->t_len, buffer, 0, NULL, ! &vmbuffer_new, &vmbuffer); } else { --- 3621,3628 ---- LockBuffer(buffer, BUFFER_LOCK_UNLOCK); newbuf = RelationGetBufferForTuple(relation, heaptup->t_len, buffer, 0, NULL, ! &vmbuffer_new, &vmbuffer, ! &lmbuffer_new, &lmbuffer); } else { *************** l2: *** 3740,3747 **** --- 3774,3783 ---- if (newbuf != buffer) { PageSetLSN(BufferGetPage(newbuf), recptr); + lsnmap_set(relation, BufferGetBlockNumber(newbuf), lmbuffer_new, recptr); } PageSetLSN(BufferGetPage(buffer), recptr); + lsnmap_set(relation, BufferGetBlockNumber(buffer), lmbuffer, recptr); } END_CRIT_SECTION(); *************** l2: *** 3768,3774 **** ReleaseBuffer(vmbuffer_new); if (BufferIsValid(vmbuffer)) ReleaseBuffer(vmbuffer); ! /* * Release the lmgr tuple lock, if we had it. */ --- 3804,3813 ---- ReleaseBuffer(vmbuffer_new); if (BufferIsValid(vmbuffer)) ReleaseBuffer(vmbuffer); ! if (BufferIsValid(lmbuffer_new)) ! ReleaseBuffer(lmbuffer_new); ! if (BufferIsValid(lmbuffer)) ! ReleaseBuffer(lmbuffer); /* * Release the lmgr tuple lock, if we had it. 
*/ *************** heap_lock_tuple(Relation relation, HeapT *** 4091,4096 **** --- 4130,4136 ---- HTSU_Result result; ItemPointer tid = &(tuple->t_self); ItemId lp; + Buffer lmbuffer = InvalidBuffer; Page page; TransactionId xid, xmax; *************** failed: *** 4567,4572 **** --- 4607,4615 ---- return HeapTupleMayBeUpdated; } + if (RelationNeedsWAL(relation)) + lsnmap_pin(relation, BufferGetBlockNumber(*buffer), &lmbuffer); + /* * If this is the first possibly-multixact-able operation in the current * transaction, set my per-backend OldestMemberMXactId setting. We can be *************** failed: *** 4647,4652 **** --- 4690,4697 ---- recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK); PageSetLSN(page, recptr); + + lsnmap_set(relation, BufferGetBlockNumber(*buffer), lmbuffer, recptr); } END_CRIT_SECTION(); *************** failed: *** 4658,4663 **** --- 4703,4711 ---- * visibility info. */ + if (lmbuffer != InvalidBuffer) + ReleaseBuffer(lmbuffer); + /* * Now that we have successfully marked the tuple as locked, we can * release the lmgr tuple lock, if we had it. *************** heap_lock_updated_tuple_rec(Relation rel *** 5100,5106 **** { ItemPointerData tupid; HeapTupleData mytup; ! Buffer buf; uint16 new_infomask, new_infomask2, old_infomask, --- 5148,5155 ---- { ItemPointerData tupid; HeapTupleData mytup; ! Buffer buf, ! lmbuffer = InvalidBuffer; uint16 new_infomask, new_infomask2, old_infomask, *************** heap_lock_updated_tuple_rec(Relation rel *** 5129,5134 **** --- 5178,5186 ---- return HeapTupleMayBeUpdated; } + if (RelationNeedsWAL(rel)) + lsnmap_pin(rel, BufferGetBlockNumber(buf), &lmbuffer); + l4: CHECK_FOR_INTERRUPTS(); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); *************** l4: *** 5142,5147 **** --- 5194,5201 ---- priorXmax)) { UnlockReleaseBuffer(buf); + if (lmbuffer != InvalidBuffer) + ReleaseBuffer(lmbuffer); return HeapTupleMayBeUpdated; } *************** l4: *** 5189,5194 **** --- 5243,5250 ---- if (res != HeapTupleMayBeUpdated) { UnlockReleaseBuffer(buf); + if (lmbuffer != InvalidBuffer) + ReleaseBuffer(lmbuffer); pfree(members); return res; } *************** l4: *** 5249,5254 **** --- 5305,5312 ---- if (res != HeapTupleMayBeUpdated) { UnlockReleaseBuffer(buf); + if (lmbuffer != InvalidBuffer) + ReleaseBuffer(lmbuffer); return res; } } *************** l4: *** 5289,5298 **** --- 5347,5361 ---- recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_LOCK_UPDATED); PageSetLSN(page, recptr); + + lsnmap_set(rel, BufferGetBlockNumber(buf), lmbuffer, recptr); } END_CRIT_SECTION(); + if (lmbuffer != InvalidBuffer) + ReleaseBuffer(lmbuffer); + /* if we find the end of update chain, we're done. */ if (mytup.t_data->t_infomask & HEAP_XMAX_INVALID || ItemPointerEquals(&mytup.t_self, &mytup.t_data->t_ctid) || *************** heap_lock_updated_tuple(Relation rel, He *** 5374,5380 **** void heap_inplace_update(Relation relation, HeapTuple tuple) { ! Buffer buffer; Page page; OffsetNumber offnum; ItemId lp = NULL; --- 5437,5444 ---- void heap_inplace_update(Relation relation, HeapTuple tuple) { ! Buffer buffer, ! 
lmbuffer = InvalidBuffer; Page page; OffsetNumber offnum; ItemId lp = NULL; *************** heap_inplace_update(Relation relation, H *** 5383,5388 **** --- 5447,5456 ---- uint32 newlen; buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(&(tuple->t_self))); + + if (RelationNeedsWAL(relation)) + lsnmap_pin(relation, BufferGetBlockNumber(buffer), &lmbuffer); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); page = (Page) BufferGetPage(buffer); *************** heap_inplace_update(Relation relation, H *** 5426,5437 **** --- 5494,5510 ---- recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INPLACE); PageSetLSN(page, recptr); + + lsnmap_set(relation, BufferGetBlockNumber(buffer), lmbuffer, recptr); } END_CRIT_SECTION(); UnlockReleaseBuffer(buffer); + if (lmbuffer != InvalidBuffer) + ReleaseBuffer(lmbuffer); + /* * Send out shared cache inval if necessary. Note that because we only * pass the new version of the tuple, this mustn't be used for any *************** heap_xlog_clean(XLogReaderState *record) *** 7024,7029 **** --- 7097,7115 ---- ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode); /* + * Update the LSN map + */ + { + Relation reln = CreateFakeRelcacheEntry(rnode); + Buffer lmbuffer = InvalidBuffer; + + lsnmap_pin(reln, blkno, &lmbuffer); + lsnmap_set(reln, blkno, lmbuffer, lsn); + ReleaseBuffer(lmbuffer); + FreeFakeRelcacheEntry(reln); + } + + /* * If we have a full-page image, restore it (using a cleanup lock) and * we're done. */ *************** heap_xlog_freeze_page(XLogReaderState *r *** 7208,7225 **** xl_heap_freeze_page *xlrec = (xl_heap_freeze_page *) XLogRecGetData(record); TransactionId cutoff_xid = xlrec->cutoff_xid; Buffer buffer; int ntup; /* * In Hot Standby mode, ensure that there's no queries running which still * consider the frozen xids as running. */ if (InHotStandby) { ! RelFileNode rnode; ! XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL); ! ResolveRecoveryConflictWithSnapshot(cutoff_xid, rnode); } if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) --- 7294,7323 ---- xl_heap_freeze_page *xlrec = (xl_heap_freeze_page *) XLogRecGetData(record); TransactionId cutoff_xid = xlrec->cutoff_xid; Buffer buffer; + RelFileNode rnode; + BlockNumber blkno; int ntup; + XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno); + /* * In Hot Standby mode, ensure that there's no queries running which still * consider the frozen xids as running. */ if (InHotStandby) + ResolveRecoveryConflictWithSnapshot(cutoff_xid, rnode); + + /* + * Update the LSN map + */ { ! Relation reln = CreateFakeRelcacheEntry(rnode); ! Buffer lmbuffer = InvalidBuffer; ! lsnmap_pin(reln, blkno, &lmbuffer); ! lsnmap_set(reln, blkno, lmbuffer, lsn); ! ReleaseBuffer(lmbuffer); ! 
FreeFakeRelcacheEntry(reln); } if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) *************** heap_xlog_delete(XLogReaderState *record *** 7309,7314 **** --- 7407,7425 ---- FreeFakeRelcacheEntry(reln); } + /* + * Update the LSN map + */ + { + Relation reln = CreateFakeRelcacheEntry(target_node); + Buffer lmbuffer = InvalidBuffer; + + lsnmap_pin(reln, blkno, &lmbuffer); + lsnmap_set(reln, blkno, lmbuffer, lsn); + ReleaseBuffer(lmbuffer); + FreeFakeRelcacheEntry(reln); + } + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { page = BufferGetPage(buffer); *************** heap_xlog_insert(XLogReaderState *record *** 7385,7390 **** --- 7496,7514 ---- } /* + * Update the LSN map + */ + { + Relation reln = CreateFakeRelcacheEntry(target_node); + Buffer lmbuffer = InvalidBuffer; + + lsnmap_pin(reln, blkno, &lmbuffer); + lsnmap_set(reln, blkno, lmbuffer, lsn); + ReleaseBuffer(lmbuffer); + FreeFakeRelcacheEntry(reln); + } + + /* * If we inserted the first and only tuple on the page, re-initialize the * page from scratch. */ *************** heap_xlog_multi_insert(XLogReaderState * *** 7504,7509 **** --- 7628,7646 ---- FreeFakeRelcacheEntry(reln); } + /* + * Update the LSN map + */ + { + Relation reln = CreateFakeRelcacheEntry(rnode); + Buffer lmbuffer = InvalidBuffer; + + lsnmap_pin(reln, blkno, &lmbuffer); + lsnmap_set(reln, blkno, lmbuffer, lsn); + ReleaseBuffer(lmbuffer); + FreeFakeRelcacheEntry(reln); + } + if (isinit) { buffer = XLogInitBufferForRedo(record, 0); *************** heap_xlog_update(XLogReaderState *record *** 7660,7665 **** --- 7797,7820 ---- } /* + * Update the LSN map + */ + { + Relation reln = CreateFakeRelcacheEntry(rnode); + Buffer lmbuffer = InvalidBuffer; + + lsnmap_pin(reln, oldblk, &lmbuffer); + lsnmap_set(reln, oldblk, lmbuffer, lsn); + if (oldblk != newblk) + { + lsnmap_pin(reln, newblk, &lmbuffer); + lsnmap_set(reln, newblk, lmbuffer, lsn); + } + ReleaseBuffer(lmbuffer); + FreeFakeRelcacheEntry(reln); + } + + /* * In normal operation, it is important to lock the two pages in * page-number order, to avoid possible deadlocks against other update * operations going the other way. 
However, during WAL replay there can *************** heap_xlog_lock(XLogReaderState *record) *** 7882,7887 **** --- 8037,8060 ---- ItemId lp = NULL; HeapTupleHeader htup; + /* + * Update the LSN map + */ + { + Relation reln; + RelFileNode rnode; + BlockNumber blkno; + Buffer lmbuffer = InvalidBuffer; + + XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno); + reln = CreateFakeRelcacheEntry(rnode); + + lsnmap_pin(reln, blkno, &lmbuffer); + lsnmap_set(reln, blkno, lmbuffer, lsn); + ReleaseBuffer(lmbuffer); + FreeFakeRelcacheEntry(reln); + } + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { page = (Page) BufferGetPage(buffer); *************** heap_xlog_lock_updated(XLogReaderState * *** 7930,7935 **** --- 8103,8126 ---- ItemId lp = NULL; HeapTupleHeader htup; + /* + * Update the LSN map + */ + { + Relation reln; + RelFileNode rnode; + BlockNumber blkno; + Buffer lmbuffer = InvalidBuffer; + + XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno); + reln = CreateFakeRelcacheEntry(rnode); + + lsnmap_pin(reln, blkno, &lmbuffer); + lsnmap_set(reln, blkno, lmbuffer, lsn); + ReleaseBuffer(lmbuffer); + FreeFakeRelcacheEntry(reln); + } + xlrec = (xl_heap_lock_updated *) XLogRecGetData(record); if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) *************** heap_xlog_inplace(XLogReaderState *recor *** 7969,7974 **** --- 8160,8183 ---- uint32 oldlen; Size newlen; + /* + * Update the LSN map + */ + { + Relation reln; + RelFileNode rnode; + BlockNumber blkno; + Buffer lmbuffer = InvalidBuffer; + + XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno); + reln = CreateFakeRelcacheEntry(rnode); + + lsnmap_pin(reln, blkno, &lmbuffer); + lsnmap_set(reln, blkno, lmbuffer, lsn); + ReleaseBuffer(lmbuffer); + FreeFakeRelcacheEntry(reln); + } + if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO) { char *newtup = XLogRecGetBlockData(record, 0, &newlen); diff --git a/src/backend/access/heap/hio.c b/src/backend/access/heap/hio.c index 6d091f6..09e93d0 100644 *** a/src/backend/access/heap/hio.c --- b/src/backend/access/heap/hio.c *************** *** 19,24 **** --- 19,25 ---- #include "access/hio.h" #include "access/htup_details.h" #include "access/visibilitymap.h" + #include "access/lsnmap.h" #include "storage/bufmgr.h" #include "storage/freespace.h" #include "storage/lmgr.h" *************** Buffer *** 215,221 **** RelationGetBufferForTuple(Relation relation, Size len, Buffer otherBuffer, int options, BulkInsertState bistate, ! Buffer *vmbuffer, Buffer *vmbuffer_other) { bool use_fsm = !(options & HEAP_INSERT_SKIP_FSM); Buffer buffer = InvalidBuffer; --- 216,223 ---- RelationGetBufferForTuple(Relation relation, Size len, Buffer otherBuffer, int options, BulkInsertState bistate, ! Buffer *vmbuffer, Buffer *vmbuffer_other, ! 
Buffer *lmbuffer, Buffer *lmbuffer_other) { bool use_fsm = !(options & HEAP_INSERT_SKIP_FSM); Buffer buffer = InvalidBuffer; *************** RelationGetBufferForTuple(Relation relat *** 297,302 **** --- 299,308 ---- while (targetBlock != InvalidBlockNumber) { + + if (RelationNeedsWAL(relation)) + lsnmap_pin(relation, targetBlock, lmbuffer); + /* * Read and exclusive-lock the target block, as well as the other * block if one was given, taking suitable care with lock ordering and *************** RelationGetBufferForTuple(Relation relat *** 438,443 **** --- 444,452 ---- */ buffer = ReadBufferBI(relation, P_NEW, bistate); + if (RelationNeedsWAL(relation)) + lsnmap_pin(relation, BufferGetBlockNumber(buffer), lmbuffer); + /* * We can be certain that locking the otherBuffer first is OK, since it * must have a lower page number. diff --git a/src/backend/access/heap/lsnmap.c b/src/backend/access/heap/lsnmap.c index ...e736ed6 100644 *** a/src/backend/access/heap/lsnmap.c --- b/src/backend/access/heap/lsnmap.c *************** *** 0 **** --- 1,336 ---- + /*------------------------------------------------------------------------- + * + * lsnmap.c + * map for tracking LSN of heap blocks + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/heap/lsnmap.c + * + * INTERFACE ROUTINES + * lsnmap_pin - pin a map page for setting a bit + * lsnmap_set - set a bit in a previously pinned page + * lsnmap_truncate - truncate the LSN map + * + * NOTES + * + * The LSN map contains an LSN per HEAPBLOCKS_PER_ENTRY heap pages. Every entry + * means that no modification have been made to the pages that are part of group + * after that LSN. + * + * The LSN map is not wal logged, but is updated during log reply. + * + *------------------------------------------------------------------------- + */ + #include "postgres.h" + + #include "access/heapam_xlog.h" + #include "access/lsnmap.h" + #include "miscadmin.h" + #include "storage/bufmgr.h" + #include "storage/lmgr.h" + #include "storage/smgr.h" + #include "utils/inval.h" + + + /*#define TRACE_LSNMAP */ + + /* Number of pages per LSN map entry/ */ + #define HEAPBLOCKS_PER_ENTRY 2048 + + /* Size of an LSN map entry */ + #define BYTES_PER_ENTRY (sizeof(XLogRecPtr)) + + /* + * Size of the map on each LSN map page, in bytes. There's no + * extra headers, so the whole page minus the standard page header is + * used for the bitmap. + */ + #define MAPSIZE TYPEALIGN_DOWN(BYTES_PER_ENTRY, BLCKSZ - MAXALIGN(SizeOfPageHeaderData)) + + /* Number of heap blocks we can represent in one LSN map page. */ + #define HEAPBLOCKS_PER_PAGE (MAPSIZE / BYTES_PER_ENTRY * HEAPBLOCKS_PER_ENTRY) + + /* Mapping from heap block number to the right bit in the LSN map */ + #define HEAPBLK_TO_MAPBLOCK(x) ((x) / HEAPBLOCKS_PER_PAGE) + #define HEAPBLK_TO_MAPPOS(x) (((x) % HEAPBLOCKS_PER_PAGE) / HEAPBLOCKS_PER_ENTRY) + + /* prototypes for internal routines */ + static Buffer lm_readbuf(Relation rel, BlockNumber blkno, bool extend); + static void lm_extend(Relation rel, BlockNumber nlmblocks); + + /* + * lsnmap_pin - pin a map page for setting an entry + * + * Setting an entry in the LSN map is a two-phase operation. First, call + * lsnmap_pin, to pin the LSN map page containing the bit for + * the heap page. Because that can require I/O to read the map page, you + * shouldn't hold a lock on the heap page while doing that. 
Then, call + * lsnmap_set to actually set the bit. + * + * On entry, *buf should be InvalidBuffer or a valid buffer returned by + * an earlier call to lsnmap_pin on the same relation. + * On return, *buf is a valid buffer with the map page containing + * the entry for heapBlk. + * + * If the page doesn't exist in the map file yet, it is extended. + */ + void + lsnmap_pin(Relation rel, BlockNumber heapBlk, Buffer *buf) + { + BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk); + + /* Reuse the old pinned buffer if possible */ + if (BufferIsValid(*buf)) + { + if (BufferGetBlockNumber(*buf) == mapBlock) + return; + + ReleaseBuffer(*buf); + } + *buf = lm_readbuf(rel, mapBlock, true); + } + + /* + * lsnmap_set - set an entry on a previously pinned page + * + * You must pass a buffer containing the correct map page to this function. + * Call lsnmap_pin first to pin the right one. This function doesn't do + * any I/O. + */ + void + lsnmap_set(Relation rel, BlockNumber heapBlk, Buffer lmBuf, XLogRecPtr lsn) + { + BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk); + uint32 mapPos = HEAPBLK_TO_MAPPOS(heapBlk); + XLogRecPtr *map; + + #ifdef TRACE_LSNMAP + elog(DEBUG1, "lm_set %s %d", RelationGetRelationName(rel), heapBlk); + #endif + + /* Check that we have the right LM page pinned */ + if (!BufferIsValid(lmBuf) || BufferGetBlockNumber(lmBuf) != mapBlock) + elog(ERROR, "wrong LM buffer passed to lsnmap_set"); + + LockBuffer(lmBuf, BUFFER_LOCK_EXCLUSIVE); + + map = (XLogRecPtr *) PageGetContents(BufferGetPage(lmBuf)); + + if (map[mapPos] < lsn) + { + map[mapPos] = lsn; + MarkBufferDirty(lmBuf); + } + + LockBuffer(lmBuf, BUFFER_LOCK_UNLOCK); + } + + /* + * lsnmap_truncate - truncate the LSN map + * + * The caller must hold AccessExclusiveLock on the relation, to ensure that + * other backends receive the smgr invalidation event that this function sends + * before they access the LM again. + * + * nheapblocks is the new size of the heap. + */ + void + lsnmap_truncate(Relation rel, BlockNumber nheapblocks) + { + BlockNumber newnblocks; + + /* last remaining block, byte, and bit */ + BlockNumber truncBlock = HEAPBLK_TO_MAPBLOCK(nheapblocks); + uint32 truncPos = HEAPBLK_TO_MAPPOS(nheapblocks); + + #ifdef TRACE_LSNMAP + elog(DEBUG1, "lm_truncate %s %d", RelationGetRelationName(rel), nheapblocks); + #endif + + RelationOpenSmgr(rel); + + /* + * If no LSN map has been created yet for this relation, there's + * nothing to truncate. + */ + if (!smgrexists(rel->rd_smgr, LSNMAP_FORKNUM)) + return; + + /* + * Unless the new size is exactly at a LSN map page boundary, the + * tail bits in the last remaining map page, representing truncated heap + * blocks, need to be cleared. This is not only tidy, but also necessary + * because we don't get a chance to clear the bits if the heap is extended + * again. + */ + if (truncPos != 0) + { + Buffer mapBuffer; + Page page; + XLogRecPtr *map; + + newnblocks = truncBlock + 1; + + mapBuffer = lm_readbuf(rel, truncBlock, false); + if (!BufferIsValid(mapBuffer)) + { + /* nothing to do, the file was already smaller */ + return; + } + + page = BufferGetPage(mapBuffer); + map = (XLogRecPtr *) PageGetContents(page); + + LockBuffer(mapBuffer, BUFFER_LOCK_EXCLUSIVE); + + /* Clear out the unwanted bytes. 
*/ + MemSet(&map[truncPos + 1], 0, MAPSIZE - (truncPos * BYTES_PER_ENTRY + 1)); + + MarkBufferDirty(mapBuffer); + UnlockReleaseBuffer(mapBuffer); + } + else + newnblocks = truncBlock; + + if (smgrnblocks(rel->rd_smgr, LSNMAP_FORKNUM) <= newnblocks) + { + /* nothing to do, the file was already smaller than requested size */ + return; + } + + /* Truncate the unused LM pages, and send smgr inval message */ + smgrtruncate(rel->rd_smgr, LSNMAP_FORKNUM, newnblocks); + + /* + * We might as well update the local smgr_lm_nblocks setting. smgrtruncate + * sent an smgr cache inval message, which will cause other backends to + * invalidate their copy of smgr_lm_nblocks, and this one too at the next + * command boundary. But this ensures it isn't outright wrong until then. + */ + if (rel->rd_smgr) + rel->rd_smgr->smgr_lm_nblocks = newnblocks; + } + + /* + * Read a LSN map page. + * + * If the page doesn't exist, InvalidBuffer is returned, or if 'extend' is + * true, the LSN map file is extended. + */ + static Buffer + lm_readbuf(Relation rel, BlockNumber blkno, bool extend) + { + Buffer buf; + + /* + * We might not have opened the relation at the smgr level yet, or we + * might have been forced to close it by a sinval message. The code below + * won't necessarily notice relation extension immediately when extend = + * false, so we rely on sinval messages to ensure that our ideas about the + * size of the map aren't too far out of date. + */ + RelationOpenSmgr(rel); + + /* + * If we haven't cached the size of the LSN map fork yet, check it + * first. + */ + if (rel->rd_smgr->smgr_lm_nblocks == InvalidBlockNumber) + { + if (smgrexists(rel->rd_smgr, LSNMAP_FORKNUM)) + rel->rd_smgr->smgr_lm_nblocks = smgrnblocks(rel->rd_smgr, + LSNMAP_FORKNUM); + else + rel->rd_smgr->smgr_lm_nblocks = 0; + } + + /* Handle requests beyond EOF */ + if (blkno >= rel->rd_smgr->smgr_lm_nblocks) + { + if (extend) + lm_extend(rel, blkno + 1); + else + return InvalidBuffer; + } + + /* + * Use ZERO_ON_ERROR mode, and initialize the page if necessary. It's + * always safe to clear bits, so it's better to clear corrupt pages than + * error out. + */ + buf = ReadBufferExtended(rel, LSNMAP_FORKNUM, blkno, + RBM_ZERO_ON_ERROR, NULL); + if (PageIsNew(BufferGetPage(buf))) + PageInit(BufferGetPage(buf), BLCKSZ, 0); + return buf; + } + + /* + * Ensure that the LSN map fork is at least lm_nblocks long, extending + * it if necessary with zeroed pages. + */ + static void + lm_extend(Relation rel, BlockNumber lm_nblocks) + { + BlockNumber lm_nblocks_now; + Page pg; + + pg = (Page) palloc(BLCKSZ); + PageInit(pg, BLCKSZ, 0); + + /* + * We use the relation extension lock to lock out other backends trying to + * extend the LSN map at the same time. It also locks out extension + * of the main fork, unnecessarily, but extending the LSN map + * happens seldom enough that it doesn't seem worthwhile to have a + * separate lock tag type for it. + * + * Note that another backend might have extended or created the relation + * by the time we get the lock. + */ + LockRelationForExtension(rel, ExclusiveLock); + + /* Might have to re-open if a cache flush happened */ + RelationOpenSmgr(rel); + + /* + * Create the file first if it doesn't exist. If smgr_lm_nblocks is + * positive then it must exist, no need for an smgrexists call. 
+ */ + if ((rel->rd_smgr->smgr_lm_nblocks == 0 || + rel->rd_smgr->smgr_lm_nblocks == InvalidBlockNumber) && + !smgrexists(rel->rd_smgr, LSNMAP_FORKNUM)) + smgrcreate(rel->rd_smgr, LSNMAP_FORKNUM, false); + + lm_nblocks_now = smgrnblocks(rel->rd_smgr, LSNMAP_FORKNUM); + + /* Now extend the file */ + while (lm_nblocks_now < lm_nblocks) + { + PageSetChecksumInplace(pg, lm_nblocks_now); + + smgrextend(rel->rd_smgr, LSNMAP_FORKNUM, lm_nblocks_now, + (char *) pg, false); + lm_nblocks_now++; + } + + /* + * Send a shared-inval message to force other backends to close any smgr + * references they may have for this rel, which we are about to change. + * This is a useful optimization because it means that backends don't have + * to keep checking for creation or extension of the file, which happens + * infrequently. + */ + CacheInvalidateSmgr(rel->rd_smgr->smgr_rnode); + + /* Update local cache with the up-to-date size */ + rel->rd_smgr->smgr_lm_nblocks = lm_nblocks_now; + + UnlockRelationForExtension(rel, ExclusiveLock); + + pfree(pg); + } diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c index 563e5c3..4586ef3 100644 *** a/src/backend/access/heap/pruneheap.c --- b/src/backend/access/heap/pruneheap.c *************** *** 18,23 **** --- 18,24 ---- #include "access/heapam_xlog.h" #include "access/transam.h" #include "access/htup_details.h" + #include "access/lsnmap.h" #include "access/xlog.h" #include "catalog/catalog.h" #include "miscadmin.h" *************** heap_page_prune(Relation relation, Buffe *** 175,184 **** --- 176,189 ---- { int ndeleted = 0; Page page = BufferGetPage(buffer); + Buffer lmbuffer = InvalidBuffer; OffsetNumber offnum, maxoff; PruneState prstate; + if (RelationNeedsWAL(relation)) + lsnmap_pin(relation, BufferGetBlockNumber(buffer), &lmbuffer); + /* * Our strategy is to scan the page and make lists of items to change, * then apply the changes within a critical section. This keeps as much *************** heap_page_prune(Relation relation, Buffe *** 262,267 **** --- 267,274 ---- prstate.latestRemovedXid); PageSetLSN(BufferGetPage(buffer), recptr); + + lsnmap_set(relation, BufferGetBlockNumber(buffer), lmbuffer, recptr); } } else *************** heap_page_prune(Relation relation, Buffe *** 286,291 **** --- 293,301 ---- END_CRIT_SECTION(); + if (lmbuffer != InvalidBuffer) + ReleaseBuffer(lmbuffer); + /* * If requested, report the number of tuples reclaimed to pgstats. This is * ndeleted minus ndead, because we don't want to count a now-DEAD root diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c index af5c158..0e10567 100644 *** a/src/backend/access/heap/rewriteheap.c --- b/src/backend/access/heap/rewriteheap.c *************** *** 109,114 **** --- 109,115 ---- #include "access/heapam.h" #include "access/heapam_xlog.h" + #include "access/lsnmap.h" #include "access/rewriteheap.h" #include "access/transam.h" #include "access/tuptoaster.h" *************** typedef struct RewriteStateData *** 143,148 **** --- 144,150 ---- Page rs_buffer; /* page currently being built */ BlockNumber rs_blockno; /* block where page will go */ bool rs_buffer_valid; /* T if any tuples in buffer */ + Buffer rs_lmbuffer; /* LSN map buffer */ bool rs_use_wal; /* must we WAL-log inserts? 
*/ bool rs_logical_rewrite; /* do we need to do logical rewriting */ TransactionId rs_oldest_xmin; /* oldest xmin used by caller to *************** begin_heap_rewrite(Relation old_heap, Re *** 272,277 **** --- 274,280 ---- /* new_heap needn't be empty, just locked */ state->rs_blockno = RelationGetNumberOfBlocks(new_heap); state->rs_buffer_valid = false; + state->rs_lmbuffer = InvalidBuffer; state->rs_use_wal = use_wal; state->rs_oldest_xmin = oldest_xmin; state->rs_freeze_xid = freeze_xid; *************** end_heap_rewrite(RewriteState state) *** 332,342 **** if (state->rs_buffer_valid) { if (state->rs_use_wal) ! log_newpage(&state->rs_new_rel->rd_node, ! MAIN_FORKNUM, ! state->rs_blockno, ! state->rs_buffer, ! true); RelationOpenSmgr(state->rs_new_rel); PageSetChecksumInplace(state->rs_buffer, state->rs_blockno); --- 335,350 ---- if (state->rs_buffer_valid) { if (state->rs_use_wal) ! { ! XLogRecPtr recptr; ! lsnmap_pin(state->rs_new_rel, state->rs_blockno, &state->rs_lmbuffer); ! recptr = log_newpage(&state->rs_new_rel->rd_node, ! MAIN_FORKNUM, ! state->rs_blockno, ! state->rs_buffer, ! true); ! lsnmap_set(state->rs_new_rel, state->rs_blockno, state->rs_lmbuffer, recptr); ! } RelationOpenSmgr(state->rs_new_rel); PageSetChecksumInplace(state->rs_buffer, state->rs_blockno); *************** end_heap_rewrite(RewriteState state) *** 361,366 **** --- 369,378 ---- logical_end_heap_rewrite(state); + /* release the LSN map buffer*/ + if (state->rs_lmbuffer != InvalidBuffer) + ReleaseBuffer(state->rs_lmbuffer); + /* Deleting the context frees everything */ MemoryContextDelete(state->rs_cxt); } *************** raw_heap_insert(RewriteState state, Heap *** 681,691 **** /* XLOG stuff */ if (state->rs_use_wal) ! log_newpage(&state->rs_new_rel->rd_node, ! MAIN_FORKNUM, ! state->rs_blockno, ! page, ! true); /* * Now write the page. We say isTemp = true even if it's not a --- 693,708 ---- /* XLOG stuff */ if (state->rs_use_wal) ! { ! XLogRecPtr recptr; ! lsnmap_pin(state->rs_new_rel, state->rs_blockno, &state->rs_lmbuffer); ! recptr = log_newpage(&state->rs_new_rel->rd_node, ! MAIN_FORKNUM, ! state->rs_blockno, ! page, ! true); ! lsnmap_set(state->rs_new_rel, state->rs_blockno, state->rs_lmbuffer, recptr); ! } /* * Now write the page. We say isTemp = true even if it's not a diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c index ce398fc..979b649 100644 *** a/src/backend/catalog/storage.c --- b/src/backend/catalog/storage.c *************** *** 20,25 **** --- 20,26 ---- #include "postgres.h" #include "access/visibilitymap.h" + #include "access/lsnmap.h" #include "access/xact.h" #include "access/xlog.h" #include "access/xloginsert.h" *************** RelationTruncate(Relation rel, BlockNumb *** 228,233 **** --- 229,235 ---- { bool fsm; bool vm; + bool lm; /* Open it at the smgr level if not already done */ RelationOpenSmgr(rel); *************** RelationTruncate(Relation rel, BlockNumb *** 238,243 **** --- 240,246 ---- rel->rd_smgr->smgr_targblock = InvalidBlockNumber; rel->rd_smgr->smgr_fsm_nblocks = InvalidBlockNumber; rel->rd_smgr->smgr_vm_nblocks = InvalidBlockNumber; + rel->rd_smgr->smgr_lm_nblocks = InvalidBlockNumber; /* Truncate the FSM first if it exists */ fsm = smgrexists(rel->rd_smgr, FSM_FORKNUM); *************** RelationTruncate(Relation rel, BlockNumb *** 249,254 **** --- 252,262 ---- if (vm) visibilitymap_truncate(rel, nblocks); + /* Truncate the LSN map too if it exists. 
*/ + lm = smgrexists(rel->rd_smgr, LSNMAP_FORKNUM); + if (lm) + lsnmap_truncate(rel, nblocks); + /* * We WAL-log the truncation before actually truncating, which means * trouble if the truncation fails. If we then crash, the WAL replay diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 66d5083..e805324 100644 *** a/src/backend/commands/tablecmds.c --- b/src/backend/commands/tablecmds.c *************** ATExecSetTableSpace(Oid tableOid, Oid ne *** 9299,9305 **** /* copy those extra forks that exist */ for (forkNum = MAIN_FORKNUM + 1; forkNum <= MAX_FORKNUM; forkNum++) { ! if (smgrexists(rel->rd_smgr, forkNum)) { smgrcreate(dstrel, forkNum, false); copy_relation_data(rel->rd_smgr, dstrel, forkNum, --- 9299,9306 ---- /* copy those extra forks that exist */ for (forkNum = MAIN_FORKNUM + 1; forkNum <= MAX_FORKNUM; forkNum++) { ! /* LSN map need to be skipped as it contains invalid data */ ! if (forkNum != LSNMAP_FORKNUM && smgrexists(rel->rd_smgr, forkNum)) { smgrcreate(dstrel, forkNum, false); copy_relation_data(rel->rd_smgr, dstrel, forkNum, *************** ATExecSetTableSpace(Oid tableOid, Oid ne *** 9307,9312 **** --- 9308,9315 ---- } } + /* TODO: build a correct LSN map here */ + /* drop old relation, and close new one */ RelationDropStorage(rel); smgrclose(dstrel); diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c index e653bbd..b0d24d7 100644 *** a/src/backend/commands/vacuumlazy.c --- b/src/backend/commands/vacuumlazy.c *************** *** 41,46 **** --- 41,47 ---- #include "access/heapam.h" #include "access/heapam_xlog.h" #include "access/htup_details.h" + #include "access/lsnmap.h" #include "access/multixact.h" #include "access/transam.h" #include "access/visibilitymap.h" *************** static void lazy_cleanup_index(Relation *** 146,152 **** IndexBulkDeleteResult *stats, LVRelStats *vacrelstats); static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer, ! int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer); static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats); static BlockNumber count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats); --- 147,153 ---- IndexBulkDeleteResult *stats, LVRelStats *vacrelstats); static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer, ! int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer, Buffer *lmbuffer); static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats); static BlockNumber count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats); *************** lazy_scan_heap(Relation onerel, LVRelSta *** 456,462 **** IndexBulkDeleteResult **indstats; int i; PGRUsage ru0; ! Buffer vmbuffer = InvalidBuffer; BlockNumber next_not_all_visible_block; bool skipping_all_visible_blocks; xl_heap_freeze_tuple *frozen; --- 457,464 ---- IndexBulkDeleteResult **indstats; int i; PGRUsage ru0; ! Buffer vmbuffer = InvalidBuffer, ! lmbuffer = InvalidBuffer; BlockNumber next_not_all_visible_block; bool skipping_all_visible_blocks; xl_heap_freeze_tuple *frozen; *************** lazy_scan_heap(Relation onerel, LVRelSta *** 618,623 **** --- 620,628 ---- vacrelstats->num_index_scans++; } + if (RelationNeedsWAL(onerel)) + lsnmap_pin(onerel, blkno, &lmbuffer); + /* * Pin the visibility map page in case we need to mark the page * all-visible. 
In most cases this will be very cheap, because we'll *************** lazy_scan_heap(Relation onerel, LVRelSta *** 966,971 **** --- 971,978 ---- recptr = log_heap_freeze(onerel, buf, FreezeLimit, frozen, nfrozen); PageSetLSN(page, recptr); + + lsnmap_set(onerel, BufferGetBlockNumber(buf), lmbuffer, recptr); } END_CRIT_SECTION(); *************** lazy_scan_heap(Relation onerel, LVRelSta *** 979,985 **** vacrelstats->num_dead_tuples > 0) { /* Remove tuples from heap */ ! lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats, &vmbuffer); has_dead_tuples = false; /* --- 986,992 ---- vacrelstats->num_dead_tuples > 0) { /* Remove tuples from heap */ ! lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats, &vmbuffer, &lmbuffer); has_dead_tuples = false; /* *************** lazy_scan_heap(Relation onerel, LVRelSta *** 1090,1095 **** --- 1097,1107 ---- ReleaseBuffer(vmbuffer); vmbuffer = InvalidBuffer; } + if (BufferIsValid(lmbuffer)) + { + ReleaseBuffer(lmbuffer); + lmbuffer = InvalidBuffer; + } /* If any tuples need to be deleted, perform final vacuum cycle */ /* XXX put a threshold on min number of tuples here? */ *************** lazy_vacuum_heap(Relation onerel, LVRelS *** 1170,1176 **** int tupindex; int npages; PGRUsage ru0; ! Buffer vmbuffer = InvalidBuffer; pg_rusage_init(&ru0); npages = 0; --- 1182,1189 ---- int tupindex; int npages; PGRUsage ru0; ! Buffer vmbuffer = InvalidBuffer, ! lmbuffer = InvalidBuffer; pg_rusage_init(&ru0); npages = 0; *************** lazy_vacuum_heap(Relation onerel, LVRelS *** 1195,1201 **** continue; } tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats, ! &vmbuffer); /* Now that we've compacted the page, record its available space */ page = BufferGetPage(buf); --- 1208,1214 ---- continue; } tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats, ! &vmbuffer, &lmbuffer); /* Now that we've compacted the page, record its available space */ page = BufferGetPage(buf); *************** lazy_vacuum_heap(Relation onerel, LVRelS *** 1211,1216 **** --- 1224,1234 ---- ReleaseBuffer(vmbuffer); vmbuffer = InvalidBuffer; } + if (BufferIsValid(lmbuffer)) + { + ReleaseBuffer(lmbuffer); + lmbuffer = InvalidBuffer; + } ereport(elevel, (errmsg("\"%s\": removed %d row versions in %d pages", *************** lazy_vacuum_heap(Relation onerel, LVRelS *** 1232,1244 **** */ static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer, ! int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer) { Page page = BufferGetPage(buffer); OffsetNumber unused[MaxOffsetNumber]; int uncnt = 0; TransactionId visibility_cutoff_xid; START_CRIT_SECTION(); for (; tupindex < vacrelstats->num_dead_tuples; tupindex++) --- 1250,1265 ---- */ static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer, ! 
int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer, Buffer *lmbuffer) { Page page = BufferGetPage(buffer); OffsetNumber unused[MaxOffsetNumber]; int uncnt = 0; TransactionId visibility_cutoff_xid; + if (RelationNeedsWAL(onerel)) + lsnmap_pin(onerel, blkno, lmbuffer); + START_CRIT_SECTION(); for (; tupindex < vacrelstats->num_dead_tuples; tupindex++) *************** lazy_vacuum_page(Relation onerel, BlockN *** 1273,1278 **** --- 1294,1301 ---- unused, uncnt, vacrelstats->latestRemovedXid); PageSetLSN(page, recptr); + + lsnmap_set(onerel, BufferGetBlockNumber(buffer), *lmbuffer, recptr); } /* diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c index 244b4ea..882dcbe 100644 *** a/src/backend/storage/smgr/smgr.c --- b/src/backend/storage/smgr/smgr.c *************** smgropen(RelFileNode rnode, BackendId ba *** 168,173 **** --- 168,174 ---- reln->smgr_targblock = InvalidBlockNumber; reln->smgr_fsm_nblocks = InvalidBlockNumber; reln->smgr_vm_nblocks = InvalidBlockNumber; + reln->smgr_lm_nblocks = InvalidBlockNumber; reln->smgr_which = 0; /* we only have md.c at present */ /* mark it not open */ diff --git a/src/common/relpath.c b/src/common/relpath.c index 66dfef1..8d52be7 100644 *** a/src/common/relpath.c --- b/src/common/relpath.c *************** const char *const forkNames[] = { *** 35,41 **** "main", /* MAIN_FORKNUM */ "fsm", /* FSM_FORKNUM */ "vm", /* VISIBILITYMAP_FORKNUM */ ! "init" /* INIT_FORKNUM */ }; /* --- 35,42 ---- "main", /* MAIN_FORKNUM */ "fsm", /* FSM_FORKNUM */ "vm", /* VISIBILITYMAP_FORKNUM */ ! "init", /* INIT_FORKNUM */ ! "lm" /* LSNMAP_FORKNUM*/ }; /* *************** forkname_to_number(const char *forkName) *** 58,64 **** (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid fork name"), errhint("Valid fork names are \"main\", \"fsm\", " ! "\"vm\", and \"init\"."))); #endif return InvalidForkNumber; --- 59,65 ---- (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid fork name"), errhint("Valid fork names are \"main\", \"fsm\", " ! "\"vm\", \"init\" and \"lm\"."))); #endif return InvalidForkNumber; diff --git a/src/include/access/hio.h b/src/include/access/hio.h index b014029..1ac5762 100644 *** a/src/include/access/hio.h --- b/src/include/access/hio.h *************** extern void RelationPutHeapTuple(Relatio *** 40,45 **** extern Buffer RelationGetBufferForTuple(Relation relation, Size len, Buffer otherBuffer, int options, BulkInsertState bistate, ! Buffer *vmbuffer, Buffer *vmbuffer_other); #endif /* HIO_H */ --- 40,46 ---- extern Buffer RelationGetBufferForTuple(Relation relation, Size len, Buffer otherBuffer, int options, BulkInsertState bistate, ! Buffer *vmbuffer, Buffer *vmbuffer_other, ! 
Buffer *lmbuffer, Buffer *lmbuffer_other); #endif /* HIO_H */ diff --git a/src/include/access/lsnmap.h b/src/include/access/lsnmap.h index ...e61bbc3 100644 *** a/src/include/access/lsnmap.h --- b/src/include/access/lsnmap.h *************** *** 0 **** --- 1,28 ---- + /*------------------------------------------------------------------------- + * + * lsnmap.h + * lsn map interface + * + * + * Portions Copyright (c) 2007-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/lsnmap.h + * + *------------------------------------------------------------------------- + */ + #ifndef LSNMAP_H + #define LSNMAP_H + + #include "access/xlogdefs.h" + #include "storage/block.h" + #include "storage/buf.h" + #include "utils/relcache.h" + + extern void lsnmap_pin(Relation rel, BlockNumber heapBlk, + Buffer *lmbuf); + extern void lsnmap_set(Relation rel, BlockNumber heapBlk, Buffer lmBuf, + XLogRecPtr lsn); + extern void lsnmap_truncate(Relation rel, BlockNumber nheapblocks); + + #endif /* LSNMAP_H */ diff --git a/src/include/common/relpath.h b/src/include/common/relpath.h index a263779..0c90191 100644 *** a/src/include/common/relpath.h --- b/src/include/common/relpath.h *************** typedef enum ForkNumber *** 27,33 **** MAIN_FORKNUM = 0, FSM_FORKNUM, VISIBILITYMAP_FORKNUM, ! INIT_FORKNUM /* * NOTE: if you add a new fork, change MAX_FORKNUM and possibly --- 27,34 ---- MAIN_FORKNUM = 0, FSM_FORKNUM, VISIBILITYMAP_FORKNUM, ! INIT_FORKNUM, ! LSNMAP_FORKNUM /* * NOTE: if you add a new fork, change MAX_FORKNUM and possibly *************** typedef enum ForkNumber *** 36,42 **** */ } ForkNumber; ! #define MAX_FORKNUM INIT_FORKNUM #define FORKNAMECHARS 4 /* max chars for a fork name */ --- 37,43 ---- */ } ForkNumber; ! #define MAX_FORKNUM LSNMAP_FORKNUM #define FORKNAMECHARS 4 /* max chars for a fork name */ diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h index 69a624f..f40532f 100644 *** a/src/include/storage/smgr.h --- b/src/include/storage/smgr.h *************** typedef struct SMgrRelationData *** 55,60 **** --- 55,61 ---- BlockNumber smgr_targblock; /* current insertion target block */ BlockNumber smgr_fsm_nblocks; /* last known size of fsm fork */ BlockNumber smgr_vm_nblocks; /* last known size of vm fork */ + BlockNumber smgr_lm_nblocks; /* last known size of lm fork */ /* additional public fields may someday exist here */ -- 2.2.0
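
For reference, the block-group arithmetic defined in lsnmap.c (HEAPBLOCKS_PER_ENTRY, BYTES_PER_ENTRY, MAPSIZE, HEAPBLK_TO_MAPBLOCK, HEAPBLK_TO_MAPPOS) works out as follows. This is a minimal standalone sketch, assuming the default 8 kB BLCKSZ and a 24-byte (MAXALIGN'd) page header; the constants are hard-coded here only so the numbers can be checked outside the backend, and are not part of the patch.

/*
 * Standalone illustration of the lsnmap.c mapping arithmetic.
 * With an 8-byte XLogRecPtr and 8 kB pages, one map page holds
 * 1021 entries and therefore covers 1021 * 2048 heap blocks
 * (roughly 16 GB of heap per LSN map page).
 */
#include <stdio.h>
#include <stdint.h>

#define BLCKSZ                 8192                     /* default page size */
#define PAGE_HEADER_SIZE       24                       /* MAXALIGN(SizeOfPageHeaderData) */
#define HEAPBLOCKS_PER_ENTRY   2048                     /* as in lsnmap.c */
#define BYTES_PER_ENTRY        ((int) sizeof(uint64_t)) /* sizeof(XLogRecPtr) */
/* whole page minus header, rounded down to a multiple of the entry size */
#define MAPSIZE                (((BLCKSZ - PAGE_HEADER_SIZE) / BYTES_PER_ENTRY) * BYTES_PER_ENTRY)
#define HEAPBLOCKS_PER_PAGE    (MAPSIZE / BYTES_PER_ENTRY * HEAPBLOCKS_PER_ENTRY)

#define HEAPBLK_TO_MAPBLOCK(x) ((x) / HEAPBLOCKS_PER_PAGE)
#define HEAPBLK_TO_MAPPOS(x)   (((x) % HEAPBLOCKS_PER_PAGE) / HEAPBLOCKS_PER_ENTRY)

int
main(void)
{
	uint32_t	heapblk = 5000000;	/* arbitrary heap block, ~38 GB into the table */

	printf("entries per map page:     %d\n", MAPSIZE / BYTES_PER_ENTRY);
	printf("heap blocks per map page: %d\n", HEAPBLOCKS_PER_PAGE);
	printf("heap block %u -> map block %u, entry %u\n",
		   heapblk,
		   (unsigned) HEAPBLK_TO_MAPBLOCK(heapblk),
		   (unsigned) HEAPBLK_TO_MAPPOS(heapblk));
	return 0;
}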
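
The heapam.c, pruneheap.c and vacuumlazy.c call sites changed by the patch all follow the same two-phase pattern: pin the LSN map page with lsnmap_pin() before taking the content lock on the heap page (the pin may require I/O), call lsnmap_set() inside the WAL critical section immediately after PageSetLSN(), and drop the extra pin once the critical section has ended. Below is a condensed sketch of that pattern (compare heap_delete and heap_page_prune); emit_wal_record() and modify_page_contents() are placeholders for whatever XLogInsert() sequence and page modification the real caller performs, not functions that exist in the tree.

#include "postgres.h"

#include "access/lsnmap.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
#include "utils/rel.h"

extern XLogRecPtr emit_wal_record(Relation rel, Buffer buf);	/* placeholder */
extern void modify_page_contents(Page page);					/* placeholder */

void
modify_heap_block_sketch(Relation rel, Buffer buf)
{
	BlockNumber	blkno = BufferGetBlockNumber(buf);
	Buffer		lmbuffer = InvalidBuffer;

	/* Pin the map page before locking the heap page; this may do I/O. */
	if (RelationNeedsWAL(rel))
		lsnmap_pin(rel, blkno, &lmbuffer);

	LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

	START_CRIT_SECTION();

	modify_page_contents(BufferGetPage(buf));
	MarkBufferDirty(buf);

	if (RelationNeedsWAL(rel))
	{
		XLogRecPtr	recptr = emit_wal_record(rel, buf);

		PageSetLSN(BufferGetPage(buf), recptr);

		/* Record the record's LSN for this block's group; no I/O here. */
		lsnmap_set(rel, blkno, lmbuffer, recptr);
	}

	END_CRIT_SECTION();

	UnlockReleaseBuffer(buf);

	if (lmbuffer != InvalidBuffer)
		ReleaseBuffer(lmbuffer);
}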
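
The NOTES section of lsnmap.c states the invariant a reader can rely on: the entry for a block group is an LSN after which none of the HEAPBLOCKS_PER_ENTRY heap pages in that group have been modified. A hypothetical reader might therefore look like the sketch below; lsnmap_get_entry() is not part of the patch, the group constants simply mirror the private definitions in lsnmap.c, and the sketch assumes the map page for heapBlk already exists (a real reader would need to treat a short or missing lm fork as "nothing recorded"). If the returned LSN is not newer than some cutoff, no page in the group changed after that cutoff.

/*
 * Hypothetical reader (not part of the patch): fetch the LSN recorded
 * for the group of heap blocks containing heapBlk.
 */
#include "postgres.h"

#include "access/lsnmap.h"
#include "common/relpath.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"

/* mirrors the private constants in lsnmap.c */
#define HEAPBLOCKS_PER_ENTRY	2048
#define BYTES_PER_ENTRY			(sizeof(XLogRecPtr))
#define MAPSIZE	TYPEALIGN_DOWN(BYTES_PER_ENTRY, BLCKSZ - MAXALIGN(SizeOfPageHeaderData))
#define HEAPBLOCKS_PER_PAGE		(MAPSIZE / BYTES_PER_ENTRY * HEAPBLOCKS_PER_ENTRY)
#define HEAPBLK_TO_MAPBLOCK(x)	((x) / HEAPBLOCKS_PER_PAGE)
#define HEAPBLK_TO_MAPPOS(x)	(((x) % HEAPBLOCKS_PER_PAGE) / HEAPBLOCKS_PER_ENTRY)

XLogRecPtr
lsnmap_get_entry(Relation rel, BlockNumber heapBlk)
{
	BlockNumber	mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
	uint32		mapPos = HEAPBLK_TO_MAPPOS(heapBlk);
	Buffer		buf;
	XLogRecPtr *map;
	XLogRecPtr	result;

	/* assumes the map block exists; otherwise this raises an error */
	buf = ReadBufferExtended(rel, LSNMAP_FORKNUM, mapBlock, RBM_NORMAL, NULL);
	LockBuffer(buf, BUFFER_LOCK_SHARE);

	map = (XLogRecPtr *) PageGetContents(BufferGetPage(buf));
	result = map[mapPos];

	UnlockReleaseBuffer(buf);

	return result;
}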