*** src/backend/access/heap/Makefile
--- src/backend/access/heap/Makefile
***************
*** 12,17 **** subdir = src/backend/access/heap
  top_builddir = ../../../..
  include $(top_builddir)/src/Makefile.global
  
! OBJS = heapam.o hio.o pruneheap.o rewriteheap.o syncscan.o tuptoaster.o
  
  include $(top_srcdir)/src/backend/common.mk
--- 12,17 ----
  top_builddir = ../../../..
  include $(top_builddir)/src/Makefile.global
  
! OBJS = heapam.o hio.o pruneheap.o rewriteheap.o syncscan.o tuptoaster.o visibilitymap.o
  
  include $(top_srcdir)/src/backend/common.mk
*** src/backend/access/heap/heapam.c
--- src/backend/access/heap/heapam.c
***************
*** 47,52 ****
--- 47,53 ----
  #include "access/transam.h"
  #include "access/tuptoaster.h"
  #include "access/valid.h"
+ #include "access/visibilitymap.h"
  #include "access/xact.h"
  #include "access/xlogutils.h"
  #include "catalog/catalog.h"
***************
*** 194,199 **** heapgetpage(HeapScanDesc scan, BlockNumber page)
--- 195,201 ----
  	int			ntup;
  	OffsetNumber lineoff;
  	ItemId		lpp;
+ 	bool		all_visible;
  
  	Assert(page < scan->rs_nblocks);
  
***************
*** 233,252 **** heapgetpage(HeapScanDesc scan, BlockNumber page)
  	lines = PageGetMaxOffsetNumber(dp);
  	ntup = 0;
  
  	for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(dp, lineoff);
  		 lineoff <= lines;
  		 lineoff++, lpp++)
  	{
  		if (ItemIdIsNormal(lpp))
  		{
- 			HeapTupleData loctup;
  			bool		valid;
  
! 			loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
! 			loctup.t_len = ItemIdGetLength(lpp);
! 			ItemPointerSet(&(loctup.t_self), page, lineoff);
  
! 			valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
  			if (valid)
  				scan->rs_vistuples[ntup++] = lineoff;
  		}
--- 235,266 ----
  	lines = PageGetMaxOffsetNumber(dp);
  	ntup = 0;
  
+ 	/*
+ 	 * If the all-visible flag indicates that all tuples on the page are
+ 	 * visible to everyone, we can skip the per-tuple visibility tests.
+ 	 */
+ 	all_visible = PageIsAllVisible(dp);
+ 
  	for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(dp, lineoff);
  		 lineoff <= lines;
  		 lineoff++, lpp++)
  	{
  		if (ItemIdIsNormal(lpp))
  		{
  			bool		valid;
  
! 			if (all_visible)
! 				valid = true;
! 			else
! 			{
! 				HeapTupleData loctup;
! 
! 				loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
! 				loctup.t_len = ItemIdGetLength(lpp);
! 				ItemPointerSet(&(loctup.t_self), page, lineoff);
  
! 				valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
! 			}
  			if (valid)
  				scan->rs_vistuples[ntup++] = lineoff;
  		}
***************
*** 1914,1919 **** heap_insert(Relation relation, HeapTuple tup, CommandId cid,
--- 1928,1934 ----
  		Page		page = BufferGetPage(buffer);
  		uint8		info = XLOG_HEAP_INSERT;
  
+ 		xlrec.all_visible_cleared = PageIsAllVisible(page);
  		xlrec.target.node = relation->rd_node;
  		xlrec.target.tid = heaptup->t_self;
  		rdata[0].data = (char *) &xlrec;
***************
*** 1961,1966 **** heap_insert(Relation relation, HeapTuple tup, CommandId cid,
--- 1976,1991 ----
  		PageSetTLI(page, ThisTimeLineID);
  	}
  
+ 	if (PageIsAllVisible(BufferGetPage(buffer)))
+ 	{
+ 		/*
+ 		 * The bit in the visibility map was already cleared by
+ 		 * RelationGetBufferForTuple
+ 		 */
+ 		/* visibilitymap_clear(relation, BufferGetBlockNumber(buffer)); */
+ 		PageClearAllVisible(BufferGetPage(buffer));
+ 	}
+ 
  	END_CRIT_SECTION();
  
  	UnlockReleaseBuffer(buffer);
***************
*** 2045,2050 **** heap_delete(Relation relation, ItemPointer tid,
--- 2070,2080 ----
  	Assert(ItemPointerIsValid(tid));
  
  	buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
+ 
+ 	/* Clear the bit in the visibility map if necessary */
+ 	if (PageIsAllVisible(BufferGetPage(buffer)))
+ 		visibilitymap_clear(relation, BufferGetBlockNumber(buffer));
+ 
  	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
  
  	page = BufferGetPage(buffer);
***************
*** 2208,2213 **** l1:
--- 2238,2244 ----
  		XLogRecPtr	recptr;
  		XLogRecData rdata[2];
  
+ 		xlrec.all_visible_cleared = PageIsAllVisible(page);
  		xlrec.target.node = relation->rd_node;
  		xlrec.target.tid = tp.t_self;
  		rdata[0].data = (char *) &xlrec;
***************
*** 2229,2234 **** l1:
--- 2260,2268 ----
  
  	END_CRIT_SECTION();
  
+ 	if (PageIsAllVisible(page))
+ 		PageClearAllVisible(page);
+ 
  	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
  
  	/*
***************
*** 2627,2632 **** l2:
--- 2661,2670 ----
  		}
  		else
  		{
+ 			/* Clear bit in visibility map */
+ 			if (PageIsAllVisible(page))
+ 				visibilitymap_clear(relation, BufferGetBlockNumber(buffer));
+ 
  			/* Re-acquire the lock on the old tuple's page. */
  			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
  			/* Re-check using the up-to-date free space */
***************
*** 2750,2755 **** l2:
--- 2788,2799 ----
  		PageSetTLI(BufferGetPage(buffer), ThisTimeLineID);
  	}
  
+ 	/* The bits in the visibility map were already cleared */
+ 	if (PageIsAllVisible(BufferGetPage(buffer)))
+ 		PageClearAllVisible(BufferGetPage(buffer));
+ 	if (newbuf != buffer && PageIsAllVisible(BufferGetPage(newbuf)))
+ 		PageClearAllVisible(BufferGetPage(newbuf));
+ 
  	END_CRIT_SECTION();
  
  	if (newbuf != buffer)
***************
*** 3381,3386 **** l3:
--- 3425,3436 ----
  
  	END_CRIT_SECTION();
  
+ 	/*
+ 	 * Don't update the visibility map here. Locking a tuple doesn't
+ 	 * change visibility info.
+ 	 */
+ 	/* visibilitymap_clear(relation, tuple->t_self); */
+ 
  	LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
  
  	/*
***************
*** 3727,3733 **** log_heap_clean(Relation reln, Buffer buffer,
  			   OffsetNumber *redirected, int nredirected,
  			   OffsetNumber *nowdead, int ndead,
  			   OffsetNumber *nowunused, int nunused,
! 			   bool redirect_move)
  {
  	xl_heap_clean xlrec;
  	uint8		info;
--- 3777,3783 ----
  			   OffsetNumber *redirected, int nredirected,
  			   OffsetNumber *nowdead, int ndead,
  			   OffsetNumber *nowunused, int nunused,
! 			   bool redirect_move, bool all_visible_set)
  {
  	xl_heap_clean xlrec;
  	uint8		info;
***************
*** 3741,3746 **** log_heap_clean(Relation reln, Buffer buffer,
--- 3791,3797 ----
  	xlrec.block = BufferGetBlockNumber(buffer);
  	xlrec.nredirected = nredirected;
  	xlrec.ndead = ndead;
+ 	xlrec.all_visible_set = all_visible_set;
  
  	rdata[0].data = (char *) &xlrec;
  	rdata[0].len = SizeOfHeapClean;
***************
*** 3892,3900 **** log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
--- 3943,3953 ----
  	else
  		info = XLOG_HEAP_UPDATE;
  
+ 	xlrec.all_visible_cleared = PageIsAllVisible(BufferGetPage(oldbuf));
  	xlrec.target.node = reln->rd_node;
  	xlrec.target.tid = from;
  	xlrec.newtid = newtup->t_self;
+ 	xlrec.new_all_visible_cleared = PageIsAllVisible(BufferGetPage(newbuf));
  
  	rdata[0].data = (char *) &xlrec;
  	rdata[0].len = SizeOfHeapUpdate;
***************
*** 4029,4034 **** heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
--- 4082,4088 ----
  	int			nredirected;
  	int			ndead;
  	int			nunused;
+ 	bool		all_visible_set;
  
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
***************
*** 4046,4051 **** heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
--- 4100,4106 ----
  
  	nredirected = xlrec->nredirected;
  	ndead = xlrec->ndead;
+ 	all_visible_set = xlrec->all_visible_set;
  	end = (OffsetNumber *) ((char *) xlrec + record->xl_len);
  	redirected = (OffsetNumber *) ((char *) xlrec + SizeOfHeapClean);
  	nowdead = redirected + (nredirected * 2);
***************
*** 4058,4064 **** heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record, bool clean_move)
  							redirected, nredirected,
  							nowdead, ndead,
  							nowunused, nunused,
! 							clean_move);
  
  	/*
  	 * Note: we don't worry about updating the page's prunability hints.
--- 4113,4119 ----
  							redirected, nredirected,
  							nowdead, ndead,
  							nowunused, nunused,
! 							clean_move, all_visible_set);
  
  	/*
  	 * Note: we don't worry about updating the page's prunability hints.
***************
*** 4152,4157 **** heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
--- 4207,4224 ----
  	ItemId		lp = NULL;
  	HeapTupleHeader htup;
  
+ 	/*
+ 	 * The visibility map always needs to be updated, even if the heap page
+ 	 * is already up-to-date.
+ 	 */
+ 	if (xlrec->all_visible_cleared)
+ 	{
+ 		Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
+ 
+ 		visibilitymap_clear(reln, ItemPointerGetBlockNumber(&(xlrec->target.tid)));
+ 		FreeFakeRelcacheEntry(reln);
+ 	}
+ 
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
***************
*** 4189,4194 **** heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
--- 4256,4264 ----
  	/* Mark the page as a candidate for pruning */
  	PageSetPrunable(page, record->xl_xid);
  
+ 	if (xlrec->all_visible_cleared)
+ 		PageClearAllVisible(page);
+ 
  	/* Make sure there is no forward chain link in t_ctid */
  	htup->t_ctid = xlrec->target.tid;
  	PageSetLSN(page, lsn);
***************
*** 4213,4218 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
--- 4283,4299 ----
  	xl_heap_header xlhdr;
  	uint32		newlen;
  
+ 	/*
+ 	 * The visibility map always needs to be updated, even if the heap page
+ 	 * is already up-to-date.
+ 	 */
+ 	if (xlrec->all_visible_cleared)
+ 	{
+ 		Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
+ 		visibilitymap_clear(reln, ItemPointerGetBlockNumber(&xlrec->target.tid));
+ 		FreeFakeRelcacheEntry(reln);
+ 	}
+ 
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  		return;
  
***************
*** 4270,4275 **** heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
--- 4351,4360 ----
  		elog(PANIC, "heap_insert_redo: failed to add tuple");
  	PageSetLSN(page, lsn);
  	PageSetTLI(page, ThisTimeLineID);
+ 
+ 	if (xlrec->all_visible_cleared)
+ 		PageClearAllVisible(page);
+ 
  	MarkBufferDirty(buffer);
  	UnlockReleaseBuffer(buffer);
  }
***************
*** 4297,4302 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
--- 4382,4398 ----
  	int			hsize;
  	uint32		newlen;
  
+ 	/*
+ 	 * The visibility map always needs to be updated, even if the heap page
+ 	 * is already up-to-date.
+ 	 */
+ 	if (xlrec->all_visible_cleared)
+ 	{
+ 		Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
+ 		visibilitymap_clear(reln, ItemPointerGetBlockNumber(&xlrec->target.tid));
+ 		FreeFakeRelcacheEntry(reln);
+ 	}
+ 
  	if (record->xl_info & XLR_BKP_BLOCK_1)
  	{
  		if (samepage)
***************
*** 4361,4366 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
--- 4457,4465 ----
  	/* Mark the page as a candidate for pruning */
  	PageSetPrunable(page, record->xl_xid);
  
+ 	if (xlrec->all_visible_cleared)
+ 		PageClearAllVisible(page);
+ 
  	/*
  	 * this test is ugly, but necessary to avoid thinking that insert change
  	 * is already applied
***************
*** 4376,4381 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool move, bool hot_update)
--- 4475,4491 ----
  
  newt:;
  
+ 	/*
+ 	 * The visibility map always needs to be updated, even if the heap page
+ 	 * is already up-to-date.
+ 	 */
+ 	if (xlrec->new_all_visible_cleared)
+ 	{
+ 		Relation reln = CreateFakeRelcacheEntry(xlrec->target.node);
+ 		visibilitymap_clear(reln, ItemPointerGetBlockNumber(&xlrec->newtid));
+ 		FreeFakeRelcacheEntry(reln);
+ 	}
+ 
  	if (record->xl_info & XLR_BKP_BLOCK_2)
  		return;
  
***************
*** 4453,4458 **** newsame:;
--- 4563,4572 ----
  	offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
  	if (offnum == InvalidOffsetNumber)
  		elog(PANIC, "heap_update_redo: failed to add tuple");
+ 
+ 	if (xlrec->new_all_visible_cleared)
+ 		PageClearAllVisible(page);
+ 
  	PageSetLSN(page, lsn);
  	PageSetTLI(page, ThisTimeLineID);
  	MarkBufferDirty(buffer);
*** src/backend/access/heap/hio.c
--- src/backend/access/heap/hio.c
***************
*** 16,21 ****
--- 16,22 ----
  #include "postgres.h"
  
  #include "access/hio.h"
+ #include "access/visibilitymap.h"
  #include "storage/bufmgr.h"
  #include "storage/freespace.h"
  #include "storage/lmgr.h"
***************
*** 221,229 **** RelationGetBufferForTuple(Relation relation, Size len,
  		pageFreeSpace = PageGetHeapFreeSpace(page);
  		if (len + saveFreeSpace <= pageFreeSpace)
  		{
! 			/* use this page as future insert target, too */
! 			relation->rd_targblock = targetBlock;
! 			return buffer;
  		}
  
  		/*
--- 222,278 ----
  		pageFreeSpace = PageGetHeapFreeSpace(page);
  		if (len + saveFreeSpace <= pageFreeSpace)
  		{
! 			if (PageIsAllVisible(page))
! 			{
! 				/*
! 				 * Need to update the visibility map first. Let's drop the
! 				 * locks while we do that.
! 				 */
! 				LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
! 				if (otherBlock != targetBlock && BufferIsValid(otherBuffer))
! 					LockBuffer(otherBuffer, BUFFER_LOCK_UNLOCK);
! 
! 				visibilitymap_clear(relation, BufferGetBlockNumber(buffer));
! 
! 				/* relock */
! 				if (otherBuffer == InvalidBuffer)
! 				{
! 					/* easy case */
! 					LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
! 				}
! 				else if (otherBlock == targetBlock)
! 				{
! 					/* also easy case */
! 					LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
! 				}
! 				else if (otherBlock < targetBlock)
! 				{
! 					/* lock other buffer first */
! 					LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
! 					LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
! 				}
! 				else
! 				{
! 					/* lock target buffer first */
! 					LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
! 					LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
! 				}
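! 
! 				/*
! 				 * Taking the buffer locks in block-number order mirrors the
! 				 * convention used elsewhere in the heap code and avoids
! 				 * deadlocking against another backend doing the same thing.
! 				 */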
! 				
! 				/* Check if it still has enough space */
! 				pageFreeSpace = PageGetHeapFreeSpace(page);
! 				if (len + saveFreeSpace <= pageFreeSpace)
! 				{
! 					/* use this page as future insert target, too */
! 					relation->rd_targblock = targetBlock;
! 					return buffer;
! 				}
! 			}
! 			else
! 			{
! 				/* use this page as future insert target, too */
! 				relation->rd_targblock = targetBlock;
! 				return buffer;
! 			}
  		}
  
  		/*
***************
*** 276,281 **** RelationGetBufferForTuple(Relation relation, Size len,
--- 325,332 ----
  	 */
  	buffer = ReadBuffer(relation, P_NEW);
  
+ 	visibilitymap_extend(relation, BufferGetBlockNumber(buffer) + 1);
+ 
  	/*
  	 * We can be certain that locking the otherBuffer first is OK, since it
  	 * must have a lower page number.
*** src/backend/access/heap/pruneheap.c
--- src/backend/access/heap/pruneheap.c
***************
*** 17,22 ****
--- 17,24 ----
  #include "access/heapam.h"
  #include "access/htup.h"
  #include "access/transam.h"
+ #include "access/visibilitymap.h"
+ #include "access/xlogdefs.h"
  #include "miscadmin.h"
  #include "pgstat.h"
  #include "storage/bufmgr.h"
***************
*** 37,42 **** typedef struct
--- 39,45 ----
  	OffsetNumber redirected[MaxHeapTuplesPerPage * 2];
  	OffsetNumber nowdead[MaxHeapTuplesPerPage];
  	OffsetNumber nowunused[MaxHeapTuplesPerPage];
+ 	bool		all_visible_set;
  	/* marked[i] is TRUE if item i is entered in one of the above arrays */
  	bool		marked[MaxHeapTuplesPerPage + 1];
  } PruneState;
***************
*** 156,161 **** heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
--- 159,166 ----
  	OffsetNumber offnum,
  				maxoff;
  	PruneState	prstate;
+ 	bool		all_visible, all_visible_in_future;
+ 	TransactionId newest_xid;
  
  	/*
  	 * Our strategy is to scan the page and make lists of items to change,
***************
*** 177,182 **** heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
--- 182,188 ----
  	 */
  	prstate.new_prune_xid = InvalidTransactionId;
  	prstate.nredirected = prstate.ndead = prstate.nunused = 0;
+ 	prstate.all_visible_set = false;
  	memset(prstate.marked, 0, sizeof(prstate.marked));
  
  	/* Scan the page */
***************
*** 215,220 **** heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
--- 221,317 ----
  	if (redirect_move)
  		EndNonTransactionalInvalidation();
  
+ 	/* Check whether all remaining tuples on the page are visible to everyone */
+ 	all_visible = true;
+ 	all_visible_in_future = true;
+ 	newest_xid = InvalidTransactionId;
+ 	maxoff = PageGetMaxOffsetNumber(page);
+ 	for (offnum = FirstOffsetNumber;
+ 		 offnum <= maxoff;
+ 		 offnum = OffsetNumberNext(offnum))
+ 	{
+ 		ItemId itemid = PageGetItemId(page, offnum);
+ 		HeapTupleHeader htup;
+ 		HTSV_Result status;
+ 
+ 		if (!ItemIdIsUsed(itemid) || ItemIdIsRedirected(itemid))
+ 			continue;
+ 
+ 		if (ItemIdIsDead(itemid))
+ 		{
+ 			all_visible = false;
+ 			all_visible_in_future = false;
+ 			break;
+ 		}
+ 
+ 		htup = (HeapTupleHeader) PageGetItem(page, itemid);
+ 		status = HeapTupleSatisfiesVacuum(htup, OldestXmin, buffer);
+ 		switch(status)
+ 		{
+ 			case HEAPTUPLE_DEAD:
+ 				/*
+ 				 * There shouldn't be any dead tuples left on the page, since
+ 				 * we just pruned. They should've been truncated to just dead
+ 				 * line pointers.
+ 				 */
+ 				Assert(false);
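+ 				/* in assert-disabled builds, fall through and treat it as recently dead */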
+ 			case HEAPTUPLE_RECENTLY_DEAD:
+ 				/*
+ 				 * This tuple is not visible to all, and it won't become
+ 				 * so in the future
+ 				 */
+ 				all_visible = false;
+ 				all_visible_in_future = false;
+ 				break;
+ 			case HEAPTUPLE_INSERT_IN_PROGRESS:
+ 				/*
+ 				 * This tuple is not visible to all. But it might become
+ 				 * so in the future, if the inserter commits.
+ 				 */
+ 				all_visible = false;
+ 				if (TransactionIdFollows(HeapTupleHeaderGetXmin(htup), newest_xid))
+ 					newest_xid = HeapTupleHeaderGetXmin(htup);
+ 				break;
+ 			case HEAPTUPLE_DELETE_IN_PROGRESS:
+ 				/*
+ 				 * This tuple is not visible to all. But it might become
+ 				 * so in the future, if the deleter aborts.
+ 				 */
+ 				all_visible = false;
+ 				if (TransactionIdFollows(HeapTupleHeaderGetXmax(htup), newest_xid))
+ 					newest_xid = HeapTupleHeaderGetXmax(htup);
+ 				break;
+ 			case HEAPTUPLE_LIVE:
+ 				/*
+ 				 * Check if the inserter is old enough that this tuple is
+ 				 * visible to all
+ 				 */
+ 				if (!TransactionIdPrecedes(HeapTupleHeaderGetXmin(htup), OldestXmin))
+ 				{
+ 					/*
+ 					 * Nope. But as OldestXmin advances beyond xmin, this
+ 					 * will become visible to all
+ 					 */
+ 					all_visible = false;
+ 					if (TransactionIdFollows(HeapTupleHeaderGetXmin(htup), newest_xid))
+ 						newest_xid = HeapTupleHeaderGetXmin(htup);
+ 				}
+ 		}
+ 	}
+ 	if (all_visible)
+ 	{
+ 		if (!PageIsAllVisible(page))
+ 			prstate.all_visible_set = true;
+ 	}
+ 	else if (all_visible_in_future && TransactionIdIsValid(newest_xid))
+ 	{
+ 		/*
+ 		 * We still have hope that all tuples will become visible
+ 		 * in the future
+ 		 */
+ 		heap_prune_record_prunable(&prstate, newest_xid);
+ 	}
+ 
  	/* Any error while applying the changes is critical */
  	START_CRIT_SECTION();
  
***************
*** 230,236 **** heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
  								prstate.redirected, prstate.nredirected,
  								prstate.nowdead, prstate.ndead,
  								prstate.nowunused, prstate.nunused,
! 								redirect_move);
  
  		/*
  		 * Update the page's pd_prune_xid field to either zero, or the lowest
--- 327,333 ----
  								prstate.redirected, prstate.nredirected,
  								prstate.nowdead, prstate.ndead,
  								prstate.nowunused, prstate.nunused,
! 								redirect_move, prstate.all_visible_set);
  
  		/*
  		 * Update the page's pd_prune_xid field to either zero, or the lowest
***************
*** 253,264 **** heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
  		if (!relation->rd_istemp)
  		{
  			XLogRecPtr	recptr;
- 
  			recptr = log_heap_clean(relation, buffer,
  									prstate.redirected, prstate.nredirected,
  									prstate.nowdead, prstate.ndead,
  									prstate.nowunused, prstate.nunused,
! 									redirect_move);
  
  			PageSetLSN(BufferGetPage(buffer), recptr);
  			PageSetTLI(BufferGetPage(buffer), ThisTimeLineID);
--- 350,360 ----
  		if (!relation->rd_istemp)
  		{
  			XLogRecPtr	recptr;
  			recptr = log_heap_clean(relation, buffer,
  									prstate.redirected, prstate.nredirected,
  									prstate.nowdead, prstate.ndead,
  									prstate.nowunused, prstate.nunused,
! 									redirect_move, prstate.all_visible_set);
  
  			PageSetLSN(BufferGetPage(buffer), recptr);
  			PageSetTLI(BufferGetPage(buffer), ThisTimeLineID);
***************
*** 701,707 **** heap_page_prune_execute(Buffer buffer,
  						OffsetNumber *redirected, int nredirected,
  						OffsetNumber *nowdead, int ndead,
  						OffsetNumber *nowunused, int nunused,
! 						bool redirect_move)
  {
  	Page		page = (Page) BufferGetPage(buffer);
  	OffsetNumber *offnum;
--- 797,803 ----
  						OffsetNumber *redirected, int nredirected,
  						OffsetNumber *nowdead, int ndead,
  						OffsetNumber *nowunused, int nunused,
! 						bool redirect_move, bool all_visible)
  {
  	Page		page = (Page) BufferGetPage(buffer);
  	OffsetNumber *offnum;
***************
*** 766,771 **** heap_page_prune_execute(Buffer buffer,
--- 862,875 ----
  	 * whether it has free pointers.
  	 */
  	PageRepairFragmentation(page);
+ 
+ 	/*
+ 	 * We don't want to poke the visibility map from here, as that might mean
+ 	 * physical I/O; just set the flag on the heap page. The caller can
+ 	 * update the visibility map afterwards if it wants to.
+ 	 */
+ 	if (all_visible)
+ 		PageSetAllVisible(page);
  }
  
  
*** /dev/null
--- src/backend/access/heap/visibilitymap.c
***************
*** 0 ****
--- 1,312 ----
+ /*-------------------------------------------------------------------------
+  *
+  * visibilitymap.c
+  *	  Visibility map
+  *
+  * Portions Copyright (c) 2008, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  *
+  * IDENTIFICATION
+  *	  $PostgreSQL$
+  *
+  * NOTES
+  *
+  * The visibility map is a bitmap with one bit per heap page. A set bit means
+  * that all tuples on the page are visible to all transactions. The
+  * map is conservative in the sense that we make sure that whenever a bit is
+  * set, we know the condition is true, but if a bit is not set, it might
+  * or might not be true.
+  *
+  * From that it follows that when a bit is set, we need to update the LSN
+  * of the page to make sure that it doesn't get written to disk before the
+  * WAL record of the changes that made it possible to set the bit is flushed.
+  * But when a bit is cleared, we don't have to do that because if the page is
+  * flushed early, it's ok.
+  *
+  * There's no explicit WAL logging in the functions in this file. The callers
+  * must make sure that whenever a bit is cleared, the bit is cleared on WAL
+  * replay of the updating operation as well. XXX: the WAL-logging of setting
+  * a bit needs more thought.
+  *
+  * LOCKING
+  *
+  * To clear a bit for a heap page, the caller must hold an exclusive lock
+  * on the heap page. To set a bit, a cleanup lock on the heap page is
+  * needed.
+  *
+  *-------------------------------------------------------------------------
+  */
+ #include "postgres.h"
+ 
+ #include "access/visibilitymap.h"
+ #include "storage/bufmgr.h"
+ #include "storage/bufpage.h"
+ #include "storage/smgr.h"
+ 
+ /* #define TRACE_VISIBILITYMAP */
+ 
+ /* Number of bits allocated for each heap block. */
+ #define BITS_PER_HEAPBLOCK 1
+ 
+ /* Number of heap blocks we can represent in one byte. */
+ #define HEAPBLOCKS_PER_BYTE 8
+ 
+ /* Number of heap blocks we can represent in one visibility map page */
+ #define HEAPBLOCKS_PER_PAGE ((BLCKSZ - SizeOfPageHeaderData) * HEAPBLOCKS_PER_BYTE )
+ 
+ /* Mapping from heap block number to the right bit in the visibility map */
+ #define HEAPBLK_TO_MAPBLOCK(x) ((x) / HEAPBLOCKS_PER_PAGE)
+ #define HEAPBLK_TO_MAPBYTE(x) (((x) % HEAPBLOCKS_PER_PAGE) / HEAPBLOCKS_PER_BYTE)
+ #define HEAPBLK_TO_MAPBIT(x) ((x) % HEAPBLOCKS_PER_BYTE)
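+ 
+ /*
+  * For example (assuming the default 8 kB BLCKSZ and a 24-byte page header):
+  * HEAPBLOCKS_PER_PAGE = (8192 - 24) * 8 = 65344, so heap block 70000 maps
+  * to map block 1, byte 582, bit 0.
+  */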
+ 
+ static Buffer ReadVMBuffer(Relation rel, BlockNumber blkno);
+ static Buffer ReleaseAndReadVMBuffer(Relation rel, BlockNumber blkno, Buffer oldbuf);
+ 
+ static Buffer
+ ReadVMBuffer(Relation rel, BlockNumber blkno)
+ {
+ 	if (blkno == P_NEW)
+ 		return ReadBufferWithFork(rel, VISIBILITYMAP_FORKNUM, P_NEW);
+ 
+ 	if (rel->rd_vm_nblocks_cache == InvalidBlockNumber ||
+ 		rel->rd_vm_nblocks_cache <= blkno)
+ 		rel->rd_vm_nblocks_cache = smgrnblocks(rel->rd_smgr, VISIBILITYMAP_FORKNUM);
+ 
+ 	if (blkno >= rel->rd_vm_nblocks_cache)
+ 		return InvalidBuffer;
+ 	else
+ 		return ReadBufferWithFork(rel, VISIBILITYMAP_FORKNUM, blkno);
+ }
+ 
+ static Buffer
+ ReleaseAndReadVMBuffer(Relation rel, BlockNumber blkno, Buffer oldbuf)
+ {
+ 	if (BufferIsValid(oldbuf))
+ 	{
+ 		if (BufferGetBlockNumber(oldbuf) == blkno)
+ 			return oldbuf;
+ 		else
+ 			ReleaseBuffer(oldbuf);
+ 	}
+ 
+ 	return ReadVMBuffer(rel, blkno);
+ }
+ 
+ void
+ visibilitymap_truncate(Relation rel, BlockNumber nheapblocks)
+ {
+ 	BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(nheapblocks);
+ 	uint32		mapByte  = HEAPBLK_TO_MAPBYTE(nheapblocks);
+ 	uint8		mapBit   = HEAPBLK_TO_MAPBIT(nheapblocks);
+ 
+ #ifdef TRACE_VISIBILITYMAP
+ 	elog(LOG, "vm_truncate %s %d", RelationGetRelationName(rel), nheapblocks);
+ #endif
+ 
+ 	/* Truncate away map pages that are no longer needed */
+ 	if (mapByte == 0 && mapBit == 0)
+ 		smgrtruncate(rel->rd_smgr, VISIBILITYMAP_FORKNUM, mapBlock,
+ 					 rel->rd_istemp);
+ 	else
+ 	{
+ 		Buffer mapBuffer;
+ 		Page page;
+ 		char *mappage;
+ 		int len;
+ 
+ 		smgrtruncate(rel->rd_smgr, VISIBILITYMAP_FORKNUM, mapBlock + 1,
+ 					 rel->rd_istemp);
+ 
+ 		/*
+ 		 * Clear all bits in the last map page that represent the truncated
+ 		 * heap blocks. This is not only tidy, but also necessary because
+ 		 * we don't clear the bits on extension.
+ 		 */
+ 		mapBuffer = ReadVMBuffer(rel, mapBlock);
+ 		if (BufferIsValid(mapBuffer))
+ 		{
+ 			page = BufferGetPage(mapBuffer);
+ 			mappage = PageGetContents(page);
+ 
+ 			LockBuffer(mapBuffer, BUFFER_LOCK_EXCLUSIVE);
+ 
+ 			/*
+ 			 * Clear out the unwanted bytes.
+ 			 */
+ 			len = HEAPBLOCKS_PER_PAGE/HEAPBLOCKS_PER_BYTE - (mapByte + 1);
+ 			MemSet(&mappage[mapByte + 1], 0, len);
+ 
+ 			/*
+ 			 * Mask out the unwanted bits of the last remaining byte
+ 			 *
+ 			 * ((1 << 0) - 1) = 00000000
+ 			 * ((1 << 1) - 1) = 00000001
+ 			 * ...
+ 			 * ((1 << 6) - 1) = 00111111
+ 			 * ((1 << 7) - 1) = 01111111
+ 			 */
+ 			mappage[mapByte] &= (1 << mapBit) - 1;
+ 
+ 			/*
+ 			 * This needs to be WAL-logged. Although the now-unused bits
+ 			 * shouldn't be accessed anymore, they had better be zero if we
+ 			 * extend the heap again.
+ 			 */
+ 
+ 			MarkBufferDirty(mapBuffer);
+ 			UnlockReleaseBuffer(mapBuffer);
+ 		}
+ 	}
+ }
+ 
+ void
+ visibilitymap_extend(Relation rel, BlockNumber nheapblocks)
+ {
+ 	BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(nheapblocks);
+ 	BlockNumber size;
+ 
+ #ifdef TRACE_VISIBILITYMAP
+ 	elog(LOG, "vm_extend %s %d", RelationGetRelationName(rel), nheapblocks);
+ #endif
+ 
+ 	Assert(nheapblocks > 0);
+ 
+ 	/* Open it at the smgr level if not already done */
+ 	RelationOpenSmgr(rel);
+ 
+ 	size = smgrnblocks(rel->rd_smgr, VISIBILITYMAP_FORKNUM);
+ 	for(; size < mapBlock + 1; size++)
+ 	{
+ 		Buffer mapBuffer = ReadVMBuffer(rel, P_NEW);
+ 
+ 		LockBuffer(mapBuffer, BUFFER_LOCK_EXCLUSIVE);
+ 		PageInit(BufferGetPage(mapBuffer), BLCKSZ, 0);
+ 		MarkBufferDirty(mapBuffer);
+ 		UnlockReleaseBuffer(mapBuffer);
+ 	}
+ }
+ 
+ /*
+  * Marks that all tuples on a heap page are visible to all.
+  *
+  * *buf is a buffer previously returned by visibilitymap_test(). This is
+  * an opportunistic function; if *buf doesn't contain the bit for heapBlk,
+  * we do nothing. We don't want to do any I/O, because the caller is holding
+  * a cleanup lock on the heap page.
+  */
+ void
+ visibilitymap_set_opt(Relation rel, BlockNumber heapBlk, XLogRecPtr recptr,
+ 					  Buffer *buf)
+ {
+ 	BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
+ 	uint32		mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
+ 	uint8		mapBit = HEAPBLK_TO_MAPBIT(heapBlk);
+ 	Page		page;
+ 	char	   *mappage;
+ 
+ #ifdef TRACE_VISIBILITYMAP
+ 	elog(WARNING, "vm_set %s %d", RelationGetRelationName(rel), heapBlk);
+ #endif
+ 
+ 	if (!BufferIsValid(*buf) || BufferGetBlockNumber(*buf) != mapBlock)
+ 		return;
+ 
+ 	page = BufferGetPage(*buf);
+ 	mappage = PageGetContents(page);
+ 	LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);
+ 
+ 	if (!(mappage[mapByte] & (1 << mapBit)))
+ 	{
+ 		mappage[mapByte] |= (1 << mapBit);
+ 
+ 		if (XLByteLT(PageGetLSN(page), recptr))
+ 			PageSetLSN(page, recptr);
+ 		PageSetTLI(page, ThisTimeLineID);
+ 		MarkBufferDirty(*buf);
+ 	}
+ 
+ 	LockBuffer(*buf, BUFFER_LOCK_UNLOCK);
+ }
+ 
+ /*
+  * Are all tuples on the heap page visible to all?
+  *
+  * On return, the map page containing the bit for the heap block is kept
+  * pinned, and *buf is set to that buffer. If *buf is valid on entry, it
+  * must be a buffer previously returned by this function for the same
+  * relation; it is released unless the new heap block falls on the same
+  * map page.
+  */
+ bool
+ visibilitymap_test(Relation rel, BlockNumber heapBlk, Buffer *buf)
+ {
+ 	BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
+ 	uint32		mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
+ 	uint8		mapBit = HEAPBLK_TO_MAPBIT(heapBlk);
+ 	bool		val;
+ 	char	   *mappage;
+ 
+ #ifdef TRACE_VISIBILITYMAP
+ 	elog(WARNING, "vm_test %s %d", RelationGetRelationName(rel), heapBlk);
+ #endif
+ 
+ 	*buf = ReleaseAndReadVMBuffer(rel, mapBlock, *buf);
+ 	if (!BufferIsValid(*buf))
+ 		return false;
+ 
+ 	/* XXX: Can we get away without locking? */
+ 	LockBuffer(*buf, BUFFER_LOCK_SHARE);
+ 
+ 	mappage = PageGetContents(BufferGetPage(*buf));
+ 
+ 	val = (mappage[mapByte] & (1 << mapBit)) ? true : false;
+ 
+ 	LockBuffer(*buf, BUFFER_LOCK_UNLOCK);
+ 
+ 	return val;
+ }
+ 
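+ /*
+  * A typical caller pattern, sketched from the way lazy_scan_heap uses these
+  * functions: keep one pinned map buffer across many heap blocks and release
+  * it at the end.
+  *
+  *		Buffer	vmbuffer = InvalidBuffer;
+  *
+  *		for (blkno = 0; blkno < nblocks; blkno++)
+  *		{
+  *			if (visibilitymap_test(rel, blkno, &vmbuffer))
+  *				continue;		-- page is all-visible, skip it
+  *			... work on the heap page, perhaps visibilitymap_set_opt() ...
+  *		}
+  *		if (BufferIsValid(vmbuffer))
+  *			ReleaseBuffer(vmbuffer);
+  */
+ 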
+ /*
+  * Clear a bit in the visibility map, marking that not all tuples on the
+  * heap page are visible to all.
+  */
+ void
+ visibilitymap_clear(Relation rel, BlockNumber heapBlk)
+ {
+ 	BlockNumber mapBlock = HEAPBLK_TO_MAPBLOCK(heapBlk);
+ 	uint32		mapByte = HEAPBLK_TO_MAPBYTE(heapBlk);
+ 	uint8		mapBit = HEAPBLK_TO_MAPBIT(heapBlk);
+ 	Buffer		mapBuffer;
+ 	char	   *mappage;
+ 
+ #ifdef TRACE_VISIBILITYMAP
+ 	elog(WARNING, "vm_clear %s %d", RelationGetRelationName(rel), heapBlk);
+ #endif
+ 
+ 	mapBuffer = ReadVMBuffer(rel, mapBlock);
+ 	if (!BufferIsValid(mapBuffer))
+ 		return; /* nothing to do */
+ 
+ 	/* XXX: Can we get away without locking here?
+ 	 *
+ 	 * We mustn't re-set a bit that was just cleared, so it doesn't seem
+ 	 * safe. Clearing the bit is really "load; and; store", so without
+ 	 * the lock we might store back a bit that's just being cleared
+ 	 * by a concurrent updater.
+ 	 *
+ 	 * We could use the buffer header spinlock here, but the API to do
+ 	 * that is intended to be internal to buffer manager. We'd still need
+ 	 * to get a shared lock to mark the buffer as dirty, though.
+ 	 */
+ 	LockBuffer(mapBuffer, BUFFER_LOCK_EXCLUSIVE);
+ 
+ 	mappage = PageGetContents(BufferGetPage(mapBuffer));
+ 
+ 	if (mappage[mapByte] & (1 << mapBit))
+ 	{
+ 		mappage[mapByte] &= ~(1 << mapBit);
+ 
+ 		MarkBufferDirty(mapBuffer);
+ 	}
+ 
+ 	LockBuffer(mapBuffer, BUFFER_LOCK_UNLOCK);
+ 	ReleaseBuffer(mapBuffer);
+ }
*** src/backend/access/transam/xlogutils.c
--- src/backend/access/transam/xlogutils.c
***************
*** 360,365 **** CreateFakeRelcacheEntry(RelFileNode rnode)
--- 360,366 ----
  
  	rel->rd_targblock = InvalidBlockNumber;
  	rel->rd_fsm_nblocks_cache = InvalidBlockNumber;
+ 	rel->rd_vm_nblocks_cache = InvalidBlockNumber;
  	rel->rd_smgr = NULL;
  
  	return rel;
*** src/backend/catalog/heap.c
--- src/backend/catalog/heap.c
***************
*** 33,38 ****
--- 33,39 ----
  #include "access/heapam.h"
  #include "access/sysattr.h"
  #include "access/transam.h"
+ #include "access/visibilitymap.h"
  #include "access/xact.h"
  #include "catalog/catalog.h"
  #include "catalog/dependency.h"
***************
*** 306,316 **** heap_create(const char *relname,
  		smgrcreate(rel->rd_smgr, MAIN_FORKNUM, rel->rd_istemp, false);
  
  		/*
! 		 * For a real heap, create FSM fork as well. Indexams are
! 		 * responsible for creating any extra forks themselves.
  		 */
  		if (relkind == RELKIND_RELATION || relkind == RELKIND_TOASTVALUE)
  			smgrcreate(rel->rd_smgr, FSM_FORKNUM, rel->rd_istemp, false);
  	}
  
  	return rel;
--- 307,320 ----
  		smgrcreate(rel->rd_smgr, MAIN_FORKNUM, rel->rd_istemp, false);
  
  		/*
! 		 * For a real heap, create FSM and visibility map as well. Indexams
! 		 * are responsible for creating any extra forks themselves.
  		 */
  		if (relkind == RELKIND_RELATION || relkind == RELKIND_TOASTVALUE)
+ 		{
  			smgrcreate(rel->rd_smgr, FSM_FORKNUM, rel->rd_istemp, false);
+ 			smgrcreate(rel->rd_smgr, VISIBILITYMAP_FORKNUM, rel->rd_istemp, false);
+ 		}
  	}
  
  	return rel;
***************
*** 2324,2329 **** heap_truncate(List *relids)
--- 2328,2334 ----
  
  		/* Truncate the FSM and actual file (and discard buffers) */
  		FreeSpaceMapTruncateRel(rel, 0);
+ 		visibilitymap_truncate(rel, 0);
  		RelationTruncate(rel, 0);
  
  		/* If this relation has indexes, truncate the indexes too */
*** src/backend/catalog/index.c
--- src/backend/catalog/index.c
***************
*** 1343,1354 **** setNewRelfilenode(Relation relation, TransactionId freezeXid)
  	smgrcreate(srel, MAIN_FORKNUM, relation->rd_istemp, false);
  
  	/*
! 	 * For a heap, create FSM fork as well. Indexams are responsible for
! 	 * creating any extra forks themselves.
  	 */
  	if (relation->rd_rel->relkind == RELKIND_RELATION ||
  		relation->rd_rel->relkind == RELKIND_TOASTVALUE)
  		smgrcreate(srel, FSM_FORKNUM, relation->rd_istemp, false);
  
  	/* schedule unlinking old files */
  	for (i = 0; i <= MAX_FORKNUM; i++)
--- 1343,1357 ----
  	smgrcreate(srel, MAIN_FORKNUM, relation->rd_istemp, false);
  
  	/*
! 	 * For a heap, create FSM and visibility map as well. Indexams are
! 	 * responsible for creating any extra forks themselves.
  	 */
  	if (relation->rd_rel->relkind == RELKIND_RELATION ||
  		relation->rd_rel->relkind == RELKIND_TOASTVALUE)
+ 	{
  		smgrcreate(srel, FSM_FORKNUM, relation->rd_istemp, false);
+ 		smgrcreate(srel, VISIBILITYMAP_FORKNUM, relation->rd_istemp, false);
+ 	}
  
  	/* schedule unlinking old files */
  	for (i = 0; i <= MAX_FORKNUM; i++)
*** src/backend/commands/vacuum.c
--- src/backend/commands/vacuum.c
***************
*** 26,31 ****
--- 26,32 ----
  #include "access/genam.h"
  #include "access/heapam.h"
  #include "access/transam.h"
+ #include "access/visibilitymap.h"
  #include "access/xact.h"
  #include "access/xlog.h"
  #include "catalog/namespace.h"
***************
*** 1327,1332 **** scan_heap(VRelStats *vacrelstats, Relation onerel,
--- 1328,1336 ----
  
  	nblocks = RelationGetNumberOfBlocks(onerel);
  
+ 	if (nblocks > 0)
+ 		visibilitymap_extend(onerel, nblocks);
+ 
  	/*
  	 * We initially create each VacPage item in a maximal-sized workspace,
  	 * then copy the workspace into a just-large-enough copy.
***************
*** 2822,2828 **** repair_frag(VRelStats *vacrelstats, Relation onerel,
  				recptr = log_heap_clean(onerel, buf,
  										NULL, 0, NULL, 0,
  										unused, uncnt,
! 										false);
  				PageSetLSN(page, recptr);
  				PageSetTLI(page, ThisTimeLineID);
  			}
--- 2826,2832 ----
  				recptr = log_heap_clean(onerel, buf,
  										NULL, 0, NULL, 0,
  										unused, uncnt,
! 										false, false);
  				PageSetLSN(page, recptr);
  				PageSetTLI(page, ThisTimeLineID);
  			}
***************
*** 2843,2848 **** repair_frag(VRelStats *vacrelstats, Relation onerel,
--- 2847,2853 ----
  	if (blkno < nblocks)
  	{
  		FreeSpaceMapTruncateRel(onerel, blkno);
+ 		visibilitymap_truncate(onerel, blkno);
  		RelationTruncate(onerel, blkno);
  		vacrelstats->rel_pages = blkno; /* set new number of blocks */
  	}
***************
*** 2881,2886 **** move_chain_tuple(Relation rel,
--- 2886,2899 ----
  	Size		tuple_len = old_tup->t_len;
  
  	/*
+ 	 * we don't need to bother with the usual locking protocol for updating
+ 	 * the visibility map, since we're holding an AccessExclusiveLock on the
+ 	 * relation anyway.
+ 	 */
+ 	visibilitymap_clear(rel, BufferGetBlockNumber(old_buf));
+ 	visibilitymap_clear(rel, BufferGetBlockNumber(dst_buf));
+ 
+ 	/*
  	 * make a modifiable copy of the source tuple.
  	 */
  	heap_copytuple_with_tuple(old_tup, &newtup);
***************
*** 3020,3025 **** move_plain_tuple(Relation rel,
--- 3033,3046 ----
  	ItemId		newitemid;
  	Size		tuple_len = old_tup->t_len;
  
+ 	/*
+ 	 * we don't need to bother with the usual locking protocol for updating
+ 	 * the visibility map, since we're holding an AccessExclusiveLock on the
+ 	 * relation anyway.
+ 	 */
+ 	visibilitymap_clear(rel, BufferGetBlockNumber(old_buf));
+ 	visibilitymap_clear(rel, BufferGetBlockNumber(dst_buf));
+ 
  	/* copy tuple */
  	heap_copytuple_with_tuple(old_tup, &newtup);
  
***************
*** 3238,3243 **** vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
--- 3259,3265 ----
  						RelationGetRelationName(onerel),
  						vacrelstats->rel_pages, relblocks)));
  		FreeSpaceMapTruncateRel(onerel, relblocks);
+ 		visibilitymap_truncate(onerel, relblocks);
  		RelationTruncate(onerel, relblocks);
  		vacrelstats->rel_pages = relblocks;		/* set new number of blocks */
  	}
***************
*** 3279,3285 **** vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
  		recptr = log_heap_clean(onerel, buffer,
  								NULL, 0, NULL, 0,
  								vacpage->offsets, vacpage->offsets_free,
! 								false);
  		PageSetLSN(page, recptr);
  		PageSetTLI(page, ThisTimeLineID);
  	}
--- 3301,3307 ----
  		recptr = log_heap_clean(onerel, buffer,
  								NULL, 0, NULL, 0,
  								vacpage->offsets, vacpage->offsets_free,
! 								false, false);
  		PageSetLSN(page, recptr);
  		PageSetTLI(page, ThisTimeLineID);
  	}
*** src/backend/commands/vacuumlazy.c
--- src/backend/commands/vacuumlazy.c
***************
*** 40,45 ****
--- 40,46 ----
  #include "access/genam.h"
  #include "access/heapam.h"
  #include "access/transam.h"
+ #include "access/visibilitymap.h"
  #include "commands/dbcommands.h"
  #include "commands/vacuum.h"
  #include "miscadmin.h"
***************
*** 87,92 **** typedef struct LVRelStats
--- 88,94 ----
  	int			max_dead_tuples;	/* # slots allocated in array */
  	ItemPointer dead_tuples;	/* array of ItemPointerData */
  	int			num_index_scans;
+ 	bool		scanned_all;	/* have we scanned all pages (this far) in the rel? */
  } LVRelStats;
  
  
***************
*** 101,111 **** static BufferAccessStrategy vac_strategy;
  
  /* non-export function prototypes */
  static void lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
! 			   Relation *Irel, int nindexes);
  static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
  static void lazy_vacuum_index(Relation indrel,
  				  IndexBulkDeleteResult **stats,
  				  LVRelStats *vacrelstats);
  static void lazy_cleanup_index(Relation indrel,
  				   IndexBulkDeleteResult *stats,
  				   LVRelStats *vacrelstats);
--- 103,114 ----
  
  /* non-export function prototypes */
  static void lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
! 			   Relation *Irel, int nindexes, bool scan_all);
  static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
  static void lazy_vacuum_index(Relation indrel,
  				  IndexBulkDeleteResult **stats,
  				  LVRelStats *vacrelstats);
+ 
  static void lazy_cleanup_index(Relation indrel,
  				   IndexBulkDeleteResult *stats,
  				   LVRelStats *vacrelstats);
***************
*** 140,145 **** lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
--- 143,149 ----
  	BlockNumber possibly_freeable;
  	PGRUsage	ru0;
  	TimestampTz starttime = 0;
+ 	bool		scan_all;
  
  	pg_rusage_init(&ru0);
  
***************
*** 165,172 **** lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
  	vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
  	vacrelstats->hasindex = (nindexes > 0);
  
  	/* Do the vacuuming */
! 	lazy_scan_heap(onerel, vacrelstats, Irel, nindexes);
  
  	/* Done with indexes */
  	vac_close_indexes(nindexes, Irel, NoLock);
--- 169,187 ----
  	vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
  	vacrelstats->hasindex = (nindexes > 0);
  
+ 	/*
+ 	 * Should we use the visibility map or scan all pages?  A freezing
+ 	 * vacuum has to visit every page, since relfrozenxid can only be
+ 	 * advanced if no pages were skipped.
+ 	 */
+ 	if (vacstmt->freeze_min_age != -1)
+ 		scan_all = true;
+ 	else if (vacstmt->analyze)
+ 		scan_all = true;
+ 	else
+ 		scan_all = false;
+  
+ 	/* initialize this variable */
+ 	vacrelstats->scanned_all = true;
+  
  	/* Do the vacuuming */
! 	lazy_scan_heap(onerel, vacrelstats, Irel, nindexes, scan_all);
  
  	/* Done with indexes */
  	vac_close_indexes(nindexes, Irel, NoLock);
***************
*** 231,237 **** lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt,
   */
  static void
  lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
! 			   Relation *Irel, int nindexes)
  {
  	BlockNumber nblocks,
  				blkno;
--- 246,252 ----
   */
  static void
  lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
! 			   Relation *Irel, int nindexes, bool scan_all)
  {
  	BlockNumber nblocks,
  				blkno;
***************
*** 246,251 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 261,267 ----
  	IndexBulkDeleteResult **indstats;
  	int			i;
  	PGRUsage	ru0;
+ 	Buffer		vmbuffer = InvalidBuffer;
  
  	pg_rusage_init(&ru0);
  
***************
*** 267,272 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 283,291 ----
  
  	lazy_space_alloc(vacrelstats, nblocks);
  
+ 	if (nblocks > 0)
+ 		visibilitymap_extend(onerel, nblocks);
+ 
  	for (blkno = 0; blkno < nblocks; blkno++)
  	{
  		Buffer		buf;
***************
*** 279,284 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 298,320 ----
  		OffsetNumber frozen[MaxOffsetNumber];
  		int			nfrozen;
  		Size		freespace;
+ 		bool		all_visible_according_to_vm;
+ 
+ 		/*
+ 		 * If all tuples on the page are visible to everyone, there's no
+ 		 * need to visit that page.
+ 		 *
+ 		 * Note that we test the visibility map even if we're scanning all
+ 		 * pages, to pin the visibility map page. We might set the bit there,
+ 		 * and we don't want to do the I/O while we're holding the heap page
+ 		 * locked.
+ 		 */
+ 		all_visible_according_to_vm = visibilitymap_test(onerel, blkno, &vmbuffer);
+ 		if (!scan_all && all_visible_according_to_vm)
+ 		{
+ 			vacrelstats->scanned_all = false;
+ 			continue;
+ 		}
  
  		vacuum_delay_point();
  
***************
*** 525,530 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 561,570 ----
  
  		freespace = PageGetHeapFreeSpace(page);
  
+ 		/* Update the visibility map */
+ 		if (PageIsAllVisible(page))
+ 			visibilitymap_set_opt(onerel, blkno, PageGetLSN(page), &vmbuffer);
+ 
  		/* Remember the location of the last page with nonremovable tuples */
  		if (hastup)
  			vacrelstats->nonempty_pages = blkno + 1;
***************
*** 560,565 **** lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
--- 600,611 ----
  		vacrelstats->num_index_scans++;
  	}
  
+ 	if (BufferIsValid(vmbuffer))
+ 	{
+ 		ReleaseBuffer(vmbuffer);
+ 		vmbuffer = InvalidBuffer;
+ 	}
+ 
  	/* Do post-vacuum cleanup and statistics update for each index */
  	for (i = 0; i < nindexes; i++)
  		lazy_cleanup_index(Irel[i], indstats[i], vacrelstats);
***************
*** 622,627 **** lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
--- 668,682 ----
  		LockBufferForCleanup(buf);
  		tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats);
  
+ 		/*
+ 		 * Before we let the page go, prune it. The primary reason is to
+ 		 * update the visibility map in the common special case that we just
+ 		 * vacuumed away the last tuple on the page that wasn't visible to
+ 		 * everyone.
+ 		 */
+ 		vacrelstats->tuples_deleted +=
+ 			heap_page_prune(onerel, buf, OldestXmin, false, false);
+ 
  		/* Now that we've compacted the page, record its available space */
  		page = BufferGetPage(buf);
  		freespace = PageGetHeapFreeSpace(page);
***************
*** 686,692 **** lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
  		recptr = log_heap_clean(onerel, buffer,
  								NULL, 0, NULL, 0,
  								unused, uncnt,
! 								false);
  		PageSetLSN(page, recptr);
  		PageSetTLI(page, ThisTimeLineID);
  	}
--- 741,747 ----
  		recptr = log_heap_clean(onerel, buffer,
  								NULL, 0, NULL, 0,
  								unused, uncnt,
! 								false, false);
  		PageSetLSN(page, recptr);
  		PageSetTLI(page, ThisTimeLineID);
  	}
***************
*** 829,834 **** lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats)
--- 884,890 ----
  	 * Okay to truncate.
  	 */
  	FreeSpaceMapTruncateRel(onerel, new_rel_pages);
+ 	visibilitymap_truncate(onerel, new_rel_pages);
  	RelationTruncate(onerel, new_rel_pages);
  
  	/*
*** src/backend/utils/cache/relcache.c
--- src/backend/utils/cache/relcache.c
***************
*** 305,310 **** AllocateRelationDesc(Relation relation, Form_pg_class relp)
--- 305,311 ----
  	MemSet(relation, 0, sizeof(RelationData));
  	relation->rd_targblock = InvalidBlockNumber;
  	relation->rd_fsm_nblocks_cache = InvalidBlockNumber;
+ 	relation->rd_vm_nblocks_cache = InvalidBlockNumber;
  
  	/* make sure relation is marked as having no open file yet */
  	relation->rd_smgr = NULL;
***************
*** 1366,1371 **** formrdesc(const char *relationName, Oid relationReltype,
--- 1367,1373 ----
  	relation = (Relation) palloc0(sizeof(RelationData));
  	relation->rd_targblock = InvalidBlockNumber;
  	relation->rd_fsm_nblocks_cache = InvalidBlockNumber;
+ 	relation->rd_vm_nblocks_cache = InvalidBlockNumber;
  
  	/* make sure relation is marked as having no open file yet */
  	relation->rd_smgr = NULL;
***************
*** 1654,1662 **** RelationReloadIndexInfo(Relation relation)
  	heap_freetuple(pg_class_tuple);
  	/* We must recalculate physical address in case it changed */
  	RelationInitPhysicalAddr(relation);
! 	/* Must reset targblock and fsm_nblocks_cache in case rel was truncated */
  	relation->rd_targblock = InvalidBlockNumber;
  	relation->rd_fsm_nblocks_cache = InvalidBlockNumber;
  	/* Must free any AM cached data, too */
  	if (relation->rd_amcache)
  		pfree(relation->rd_amcache);
--- 1656,1665 ----
  	heap_freetuple(pg_class_tuple);
  	/* We must recalculate physical address in case it changed */
  	RelationInitPhysicalAddr(relation);
! 	/* Must reset targblock, fsm_nblocks_cache and vm_nblocks_cache in case rel was truncated */
  	relation->rd_targblock = InvalidBlockNumber;
  	relation->rd_fsm_nblocks_cache = InvalidBlockNumber;
+ 	relation->rd_vm_nblocks_cache = InvalidBlockNumber;
  	/* Must free any AM cached data, too */
  	if (relation->rd_amcache)
  		pfree(relation->rd_amcache);
***************
*** 1740,1745 **** RelationClearRelation(Relation relation, bool rebuild)
--- 1743,1749 ----
  	{
  		relation->rd_targblock = InvalidBlockNumber;
  		relation->rd_fsm_nblocks_cache = InvalidBlockNumber;
+ 		relation->rd_vm_nblocks_cache = InvalidBlockNumber;
  		if (relation->rd_rel->relkind == RELKIND_INDEX)
  		{
  			relation->rd_isvalid = false;		/* needs to be revalidated */
***************
*** 2335,2340 **** RelationBuildLocalRelation(const char *relname,
--- 2339,2345 ----
  
  	rel->rd_targblock = InvalidBlockNumber;
  	rel->rd_fsm_nblocks_cache = InvalidBlockNumber;
+ 	rel->rd_vm_nblocks_cache = InvalidBlockNumber;
  
  	/* make sure relation is marked as having no open file yet */
  	rel->rd_smgr = NULL;
***************
*** 3592,3597 **** load_relcache_init_file(void)
--- 3597,3603 ----
  		rel->rd_smgr = NULL;
  		rel->rd_targblock = InvalidBlockNumber;
  		rel->rd_fsm_nblocks_cache = InvalidBlockNumber;
+ 		rel->rd_vm_nblocks_cache = InvalidBlockNumber;
  		if (rel->rd_isnailed)
  			rel->rd_refcnt = 1;
  		else
*** src/include/access/heapam.h
--- src/include/access/heapam.h
***************
*** 125,131 **** extern XLogRecPtr log_heap_clean(Relation reln, Buffer buffer,
  			   OffsetNumber *redirected, int nredirected,
  			   OffsetNumber *nowdead, int ndead,
  			   OffsetNumber *nowunused, int nunused,
! 			   bool redirect_move);
  extern XLogRecPtr log_heap_freeze(Relation reln, Buffer buffer,
  				TransactionId cutoff_xid,
  				OffsetNumber *offsets, int offcnt);
--- 125,131 ----
  			   OffsetNumber *redirected, int nredirected,
  			   OffsetNumber *nowdead, int ndead,
  			   OffsetNumber *nowunused, int nunused,
! 			   bool redirect_move, bool all_visible);
  extern XLogRecPtr log_heap_freeze(Relation reln, Buffer buffer,
  				TransactionId cutoff_xid,
  				OffsetNumber *offsets, int offcnt);
***************
*** 142,148 **** extern void heap_page_prune_execute(Buffer buffer,
  						OffsetNumber *redirected, int nredirected,
  						OffsetNumber *nowdead, int ndead,
  						OffsetNumber *nowunused, int nunused,
! 						bool redirect_move);
  extern void heap_get_root_tuples(Page page, OffsetNumber *root_offsets);
  
  /* in heap/syncscan.c */
--- 142,148 ----
  						OffsetNumber *redirected, int nredirected,
  						OffsetNumber *nowdead, int ndead,
  						OffsetNumber *nowunused, int nunused,
! 						bool redirect_move, bool all_visible);
  extern void heap_get_root_tuples(Page page, OffsetNumber *root_offsets);
  
  /* in heap/syncscan.c */
*** src/include/access/htup.h
--- src/include/access/htup.h
***************
*** 595,600 **** typedef struct xl_heaptid
  typedef struct xl_heap_delete
  {
  	xl_heaptid	target;			/* deleted tuple id */
  } xl_heap_delete;
  
! #define SizeOfHeapDelete	(offsetof(xl_heap_delete, target) + SizeOfHeapTid)
--- 595,601 ----
  typedef struct xl_heap_delete
  {
  	xl_heaptid	target;			/* deleted tuple id */
+ 	bool all_visible_cleared;	/* PD_ALL_VISIBLE was cleared */
  } xl_heap_delete;
  
! #define SizeOfHeapDelete	(offsetof(xl_heap_delete, all_visible_cleared) + sizeof(bool))
***************
*** 620,635 **** typedef struct xl_heap_header
  typedef struct xl_heap_insert
  {
  	xl_heaptid	target;			/* inserted tuple id */
  	/* xl_heap_header & TUPLE DATA FOLLOWS AT END OF STRUCT */
  } xl_heap_insert;
  
! #define SizeOfHeapInsert	(offsetof(xl_heap_insert, target) + SizeOfHeapTid)
  
  /* This is what we need to know about update|move|hot_update */
  typedef struct xl_heap_update
  {
  	xl_heaptid	target;			/* deleted tuple id */
  	ItemPointerData newtid;		/* new inserted tuple id */
  	/* NEW TUPLE xl_heap_header (PLUS xmax & xmin IF MOVE OP) */
  	/* and TUPLE DATA FOLLOWS AT END OF STRUCT */
  } xl_heap_update;
--- 621,639 ----
  typedef struct xl_heap_insert
  {
  	xl_heaptid	target;			/* inserted tuple id */
+ 	bool all_visible_cleared;	/* PD_ALL_VISIBLE was cleared */
  	/* xl_heap_header & TUPLE DATA FOLLOWS AT END OF STRUCT */
  } xl_heap_insert;
  
! #define SizeOfHeapInsert	(offsetof(xl_heap_insert, all_visible_cleared) + sizeof(bool))
  
  /* This is what we need to know about update|move|hot_update */
  typedef struct xl_heap_update
  {
  	xl_heaptid	target;			/* deleted tuple id */
  	ItemPointerData newtid;		/* new inserted tuple id */
+ 	bool all_visible_cleared;	/* PD_ALL_VISIBLE was cleared */
+ 	bool new_all_visible_cleared; /* same for the page of newtid */
  	/* NEW TUPLE xl_heap_header (PLUS xmax & xmin IF MOVE OP) */
  	/* and TUPLE DATA FOLLOWS AT END OF STRUCT */
  } xl_heap_update;
***************
*** 660,665 **** typedef struct xl_heap_clean
--- 664,670 ----
  	BlockNumber block;
  	uint16		nredirected;
  	uint16		ndead;
+ 	bool		all_visible_set;
  	/* OFFSET NUMBERS FOLLOW */
  } xl_heap_clean;
  
*** /dev/null
--- src/include/access/visibilitymap.h
***************
*** 0 ****
--- 1,28 ----
+ /*-------------------------------------------------------------------------
+  *
+  * visibilitymap.h
+  *      visibility map interface
+  *
+  *
+  * Portions Copyright (c) 2007, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  * $PostgreSQL$
+  *
+  *-------------------------------------------------------------------------
+  */
+ #ifndef VISIBILITYMAP_H
+ #define VISIBILITYMAP_H
+ 
+ #include "utils/rel.h"
+ #include "storage/buf.h"
+ #include "storage/itemptr.h"
+ #include "access/xlogdefs.h"
+ 
+ extern void visibilitymap_set_opt(Relation rel, BlockNumber heapBlk, XLogRecPtr recptr, Buffer *vmbuf);
+ extern void visibilitymap_clear(Relation rel, BlockNumber heapBlk);
+ extern bool visibilitymap_test(Relation rel, BlockNumber heapBlk, Buffer *vmbuf);
+ extern void visibilitymap_extend(Relation rel, BlockNumber heapblk);
+ extern void visibilitymap_truncate(Relation rel, BlockNumber heapblk);
+ 
+ #endif   /* VISIBILITYMAP_H */
*** src/include/storage/bufpage.h
--- src/include/storage/bufpage.h
***************
*** 152,159 **** typedef PageHeaderData *PageHeader;
  #define PD_HAS_FREE_LINES	0x0001		/* are there any unused line pointers? */
  #define PD_PAGE_FULL		0x0002		/* not enough free space for new
  										 * tuple? */
  
! #define PD_VALID_FLAG_BITS	0x0003		/* OR of all valid pd_flags bits */
  
  /*
   * Page layout version number 0 is for pre-7.3 Postgres releases.
--- 152,161 ----
  #define PD_HAS_FREE_LINES	0x0001		/* are there any unused line pointers? */
  #define PD_PAGE_FULL		0x0002		/* not enough free space for new
  										 * tuple? */
+ #define PD_ALL_VISIBLE		0x0004		/* all tuples on page are visible to
+ 										 * everyone */
  
! #define PD_VALID_FLAG_BITS	0x0007		/* OR of all valid pd_flags bits */
  
  /*
   * Page layout version number 0 is for pre-7.3 Postgres releases.
***************
*** 336,341 **** typedef PageHeaderData *PageHeader;
--- 338,350 ----
  #define PageClearFull(page) \
  	(((PageHeader) (page))->pd_flags &= ~PD_PAGE_FULL)
  
+ #define PageIsAllVisible(page) \
+ 	(((PageHeader) (page))->pd_flags & PD_ALL_VISIBLE)
+ #define PageSetAllVisible(page) \
+ 	(((PageHeader) (page))->pd_flags |= PD_ALL_VISIBLE)
+ #define PageClearAllVisible(page) \
+ 	(((PageHeader) (page))->pd_flags &= ~PD_ALL_VISIBLE)
+ 
  #define PageIsPrunable(page, oldestxmin) \
  ( \
  	AssertMacro(TransactionIdIsNormal(oldestxmin)), \
*** src/include/storage/relfilenode.h
--- src/include/storage/relfilenode.h
***************
*** 24,37 **** typedef enum ForkNumber
  {
  	InvalidForkNumber = -1,
  	MAIN_FORKNUM = 0,
! 	FSM_FORKNUM
  	/*
  	 * NOTE: if you add a new fork, change MAX_FORKNUM below and update the
  	 * forkNames array in catalog.c
  	 */
  } ForkNumber;
  
! #define MAX_FORKNUM		FSM_FORKNUM
  
  /*
   * RelFileNode must provide all that we need to know to physically access
--- 24,38 ----
  {
  	InvalidForkNumber = -1,
  	MAIN_FORKNUM = 0,
! 	FSM_FORKNUM,
  	/*
  	 * NOTE: if you add a new fork, change MAX_FORKNUM below and update the
  	 * forkNames array in catalog.c
  	 */
+ 	VISIBILITYMAP_FORKNUM
  } ForkNumber;
  
! #define MAX_FORKNUM		VISIBILITYMAP_FORKNUM
  
  /*
   * RelFileNode must provide all that we need to know to physically access
*** src/include/utils/rel.h
--- src/include/utils/rel.h
***************
*** 195,202 **** typedef struct RelationData
  	List	   *rd_indpred;		/* index predicate tree, if any */
  	void	   *rd_amcache;		/* available for use by index AM */
  
! 	/* Cached last-seen size of the FSM */
  	BlockNumber	rd_fsm_nblocks_cache;
  
  	/* use "struct" here to avoid needing to include pgstat.h: */
  	struct PgStat_TableStatus *pgstat_info;		/* statistics collection area */
--- 195,203 ----
  	List	   *rd_indpred;		/* index predicate tree, if any */
  	void	   *rd_amcache;		/* available for use by index AM */
  
! 	/* Cached last-seen size of the FSM and visibility map */
  	BlockNumber	rd_fsm_nblocks_cache;
+ 	BlockNumber	rd_vm_nblocks_cache;
  
  	/* use "struct" here to avoid needing to include pgstat.h: */
  	struct PgStat_TableStatus *pgstat_info;		/* statistics collection area */
