*** minmax.c.heikki	2014-08-20 19:06:27.000000000 -0400
--- src/backend/access/minmax/mmpageops.c	2014-08-20 17:10:55.000000000 -0400
***************
*** 1,8 ****
  /*
   * Update tuple origtup (size origsz), located in offset oldoff of buffer
   * oldbuf, to newtup (size newsz) as summary tuple for the page range starting
   * at heapBlk.  If samepage is true, then attempt to put the new tuple in the same
!  * page, otherwise get a new one.
   *
   * If the update is done, return true; the revmap is updated to point to the
   * new tuple.  If the update is not done for whatever reason, return false.
--- 1,37 ----
  /*
+  * mmpageops.c
+  *		Page-handling routines for Minmax indexes
+  *
+  * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+  * Portions Copyright (c) 1994, Regents of the University of California
+  *
+  * IDENTIFICATION
+  *	  src/backend/access/minmax/mmpageops.c
+  */
+ #include "postgres.h"
+ 
+ #include "access/minmax_pageops.h"
+ #include "access/minmax_page.h"
+ #include "access/minmax_revmap.h"
+ #include "access/minmax_xlog.h"
+ #include "miscadmin.h"
+ #include "storage/bufmgr.h"
+ #include "storage/freespace.h"
+ #include "storage/lmgr.h"
+ #include "storage/smgr.h"
+ #include "utils/rel.h"
+ 
+ 
+ static Buffer mm_getinsertbuffer(Relation irel, Buffer oldbuf, Size itemsz,
+ 				   bool *was_extended);
+ static Size mm_page_get_freespace(Page page);
+ 
+ 
+ /*
   * Update tuple origtup (size origsz), located in offset oldoff of buffer
   * oldbuf, to newtup (size newsz) as summary tuple for the page range starting
   * at heapBlk.  If samepage is true, then attempt to put the new tuple in the same
!  * page, otherwise use some other one.
   *
   * If the update is done, return true; the revmap is updated to point to the
   * new tuple.  If the update is not done for whatever reason, return false.
***************
*** 11,17 ****
   * If the index had to be extended in the course of this operation, *extended
   * is set to true.
   */
! static bool
  mm_doupdate(Relation idxrel, BlockNumber pagesPerRange,
  			mmRevmapAccess *rmAccess, BlockNumber heapBlk,
  			Buffer oldbuf, OffsetNumber oldoff,
--- 40,46 ----
   * If the index had to be extended in the course of this operation, *extended
   * is set to true.
   */
! bool
  mm_doupdate(Relation idxrel, BlockNumber pagesPerRange,
  			mmRevmapAccess *rmAccess, BlockNumber heapBlk,
  			Buffer oldbuf, OffsetNumber oldoff,
***************
*** 59,66 ****
  	oldsz = ItemIdGetLength(origlp);
  	oldtup = (MMTuple *) PageGetItem(oldpage, origlp);
  
! 	/* If both tuples are in fact equal, there is nothing to do */
! 	if (!minmax_tuples_equal(oldtup, oldsz, origtup, origsz))
  	{
  		LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
  		return false;
--- 88,99 ----
  	oldsz = ItemIdGetLength(origlp);
  	oldtup = (MMTuple *) PageGetItem(oldpage, origlp);
  
! 	/*
! 	 * If both tuples are identical there is nothing to do, unless the
! 	 * caller requested moving the tuple to a different page (samepage
! 	 * false); in that case we rewrite it even though it is unchanged.
! 	 */
! 	if (samepage && minmax_tuples_equal(oldtup, oldsz, origtup, origsz))
  	{
  		LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
  		return false;
***************
*** 126,132 ****
  	{
  		/*
  		 * Not enough space, but caller said that there was. Tell them to
! 		 * start over
  		 */
  		LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
  		return false;
--- 159,165 ----
  	{
  		/*
  		 * Not enough space, but caller said that there was. Tell them to
! 		 * start over.
  		 */
  		LockBuffer(oldbuf, BUFFER_LOCK_UNLOCK);
  		return false;
***************
*** 222,231 ****
   * If the relation had to be extended to make room for the new index tuple,
   * *extended is set to true.
   */
! static void
  mm_doinsert(Relation idxrel, BlockNumber pagesPerRange,
! 			mmRevmapAccess *rmAccess, Buffer *buffer,
! 			BlockNumber heapBlk, MMTuple *tup, Size itemsz, bool *extended)
  {
  	Page		page;
  	BlockNumber blk;
--- 255,264 ----
   * If the relation had to be extended to make room for the new index tuple,
   * *extended is set to true.
   */
! void
  mm_doinsert(Relation idxrel, BlockNumber pagesPerRange,
! 			mmRevmapAccess *rmAccess, Buffer *buffer, BlockNumber heapBlk,
! 			MMTuple *tup, Size itemsz, bool *extended)
  {
  	Page		page;
  	BlockNumber blk;
***************
*** 248,273 ****
  	 */
  	if (BufferIsValid(*buffer))
  	{
- 		page = BufferGetPage(*buffer);
- 		LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
- 
  		/*
  		 * It's possible that another backend (or ourselves!) extended the
  		 * revmap over the page we held a pin on, so we cannot assume that
  		 * it's still a regular page.
  		 */
! 		if (mm_page_get_freespace(page) < itemsz)
  		{
  			UnlockReleaseBuffer(*buffer);
  			*buffer = InvalidBuffer;
  		}
  	}
  	if (!BufferIsValid(*buffer))
  	{
  		*buffer = mm_getinsertbuffer(idxrel, InvalidBuffer, itemsz, extended);
  		Assert(BufferIsValid(*buffer));
! 		page = BufferGetPage(*buffer);
! 		Assert(mm_page_get_freespace(page) >= itemsz);
  	}
  
  	page = BufferGetPage(*buffer);
--- 281,304 ----
  	 */
  	if (BufferIsValid(*buffer))
  	{
  		/*
  		 * It's possible that another backend (or ourselves!) extended the
  		 * revmap over the page we held a pin on, so we cannot assume that
  		 * it's still a regular page.
  		 */
! 		LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
! 		if (mm_page_get_freespace(BufferGetPage(*buffer)) < itemsz)
  		{
  			UnlockReleaseBuffer(*buffer);
  			*buffer = InvalidBuffer;
  		}
  	}
+ 
  	if (!BufferIsValid(*buffer))
  	{
  		*buffer = mm_getinsertbuffer(idxrel, InvalidBuffer, itemsz, extended);
  		Assert(BufferIsValid(*buffer));
! 		Assert(mm_page_get_freespace(BufferGetPage(*buffer)) >= itemsz);
  	}
  
  	page = BufferGetPage(*buffer);
***************
*** 327,336 ****
  }
  
  /*
!  * Checks if a regular minmax index page is empty.
   *
!  * If it's not, it's marked for "evacuation", meaning that no new tuples will
!  * be added to it.
   */
  bool
  mm_start_evacuating_page(Relation idxRel, Buffer buf)
--- 358,370 ----
  }
  
  /*
!  * Initiate page evacuation protocol.
   *
!  * The page must be locked in exclusive mode by the caller.
!  *
!  * If the page is not yet initialized or empty, return false without doing
!  * anything; it can be used for revmap without any further changes.  If it
!  * contains tuples, mark it for evacuation and return true.
   */
  bool
  mm_start_evacuating_page(Relation idxRel, Buffer buf)
***************
*** 355,361 ****
  		lp = PageGetItemId(page, off);
  		if (ItemIdIsUsed(lp))
  		{
! 			/* prevent other backends from adding more stuff to this page. */
  			special->flags |= MINMAX_EVACUATE_PAGE;
  			MarkBufferDirtyHint(buf, true);
  
--- 389,395 ----
  		lp = PageGetItemId(page, off);
  		if (ItemIdIsUsed(lp))
  		{
! 			/* prevent other backends from adding more stuff to this page */
  			special->flags |= MINMAX_EVACUATE_PAGE;
  			MarkBufferDirtyHint(buf, true);
  
***************
*** 368,387 ****
  /*
   * Move all tuples out of a page.
   *
!  * The caller must hold an exclusive lock on the page. The lock and pin are
!  * released.
   */
  void
! mm_evacuate_page(Relation idxRel, Buffer buf)
  {
  	OffsetNumber off;
  	OffsetNumber maxoff;
  	MinmaxSpecialSpace *special;
  	Page		page;
! 	mmRevmapAccess *rmAccess;
! 	BlockNumber pagesPerRange;
! 
! 	rmAccess = mmRevmapAccessInit(idxRel, &pagesPerRange);
  
  	page = BufferGetPage(buf);
  	special = (MinmaxSpecialSpace *) PageGetSpecialPointer(page);
--- 402,417 ----
  /*
   * Move all tuples out of a page.
   *
!  * The caller must hold a lock on the page.  The lock and pin are released.
   */
  void
! mm_evacuate_page(Relation idxRel, BlockNumber pagesPerRange, mmRevmapAccess *rmAccess, Buffer buf)
  {
  	OffsetNumber off;
  	OffsetNumber maxoff;
  	MinmaxSpecialSpace *special;
  	Page		page;
! 	bool		extended = false;
  
  	page = BufferGetPage(buf);
  	special = (MinmaxSpecialSpace *) PageGetSpecialPointer(page);
***************
*** 394,407 ****
  		MMTuple	   *tup;
  		Size		sz;
  		ItemId		lp;
! 		bool		extended = false;
  
  		lp = PageGetItemId(page, off);
  		if (ItemIdIsUsed(lp))
  		{
- 			tup = (MMTuple *) PageGetItem(page, lp);
  			sz = ItemIdGetLength(lp);
! 
  			tup = minmax_copy_tuple(tup, sz);
  
  			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
--- 424,437 ----
  		MMTuple	   *tup;
  		Size		sz;
  		ItemId		lp;
! 
! 		CHECK_FOR_INTERRUPTS();
  
  		lp = PageGetItemId(page, off);
  		if (ItemIdIsUsed(lp))
  		{
  			sz = ItemIdGetLength(lp);
! 			tup = (MMTuple *) PageGetItem(page, lp);
  			tup = minmax_copy_tuple(tup, sz);
  
  			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
***************
*** 412,429 ****
  
  			LockBuffer(buf, BUFFER_LOCK_SHARE);
  
- 			if (extended)
- 				IndexFreeSpaceMapVacuum(idxRel);
- 
  			/* It's possible that someone extended the revmap over this page */
  			if (!MINMAX_IS_REGULAR_PAGE(page))
  				break;
  		}
  	}
  
- 	mmRevmapAccessTerminate(rmAccess);
- 
  	UnlockReleaseBuffer(buf);
  }
  
  /*
--- 442,457 ----
  
  			LockBuffer(buf, BUFFER_LOCK_SHARE);
  
  			/* It's possible that someone extended the revmap over this page */
  			if (!MINMAX_IS_REGULAR_PAGE(page))
  				break;
  		}
  	}
  
  	UnlockReleaseBuffer(buf);
+ 
+ 	if (extended)
+ 		FreeSpaceMapVacuum(idxRel);
  }
  
  /*
***************
*** 467,472 ****
--- 495,502 ----
  		Buffer		buf;
  		bool		extensionLockHeld = false;
  
+ 		CHECK_FOR_INTERRUPTS();
+ 
  		if (newblk == InvalidBlockNumber)
  		{
  			/*
***************
*** 498,503 ****
--- 528,539 ----
  			buf = ReadBuffer(irel, newblk);
  		}
  
+ 		/*
+ 		 * We lock buffers in block number order, so lock the old buffer
+ 		 * first when its block is earlier than the new one (this avoids
+ 		 * deadlock against concurrent updaters).  We must also check that it
+ 		 * hasn't been turned into a revmap page concurrently; if we detect
+ 		 */
  		if (BufferIsValid(oldbuf) && oldblk < newblk)
  		{
  			LockBuffer(oldbuf, BUFFER_LOCK_EXCLUSIVE);
***************
*** 520,529 ****
  			mm_page_init(page, MINMAX_PAGETYPE_REGULAR);
  
  		/*
! 		 * We have a new buffer from FSM now, and both pages are locked.
! 		 * Check that the new page has enough free space, and return it if it
! 		 * does; otherwise start over.  Note that we allow for the FSM to be
! 		 * out of date here, and in that case we update it and move on.
  		 *
  		 * (mm_page_get_freespace also checks that the FSM didn't hand us a
  		 * page that has since been repurposed for the revmap.)
--- 556,565 ----
  			mm_page_init(page, MINMAX_PAGETYPE_REGULAR);
  
  		/*
! 		 * We have a new buffer from FSM now.  Check that the new page has
! 		 * enough free space, and return it if it does; otherwise start over.
! 		 * Note that we allow for the FSM to be out of date here, and in that
! 		 * case we update it and move on.
  		 *
  		 * (mm_page_get_freespace also checks that the FSM didn't hand us a
  		 * page that has since been repurposed for the revmap.)
***************
*** 533,543 ****
  		{
  			if (extended)
  				*was_extended = true;
  			RelationSetTargetBlock(irel, BufferGetBlockNumber(buf));
  
! 			/* Lock the old buffer if not locked already */
! 			if (BufferIsValid(oldbuf) && newblk < oldblk)
  				LockBuffer(oldbuf, BUFFER_LOCK_EXCLUSIVE);
  
  			return buf;
  		}
--- 569,588 ----
  		{
  			if (extended)
  				*was_extended = true;
+ 
  			RelationSetTargetBlock(irel, BufferGetBlockNumber(buf));
  
! 			/*
! 			 * Lock the old buffer if not locked already.  Note that in this
! 			 * case we know for sure it's a regular page: it's later than the
! 			 * new page we just got, which is not a revmap page, and revmap
! 			 * pages are always consecutive.
! 			 */
! 			if (BufferIsValid(oldbuf) && oldblk > newblk)
! 			{
  				LockBuffer(oldbuf, BUFFER_LOCK_EXCLUSIVE);
+ 				Assert(MINMAX_IS_REGULAR_PAGE(BufferGetPage(oldbuf)));
+ 			}
  
  			return buf;
  		}
***************
*** 571,573 ****
--- 616,638 ----
  		newblk = RecordAndGetPageWithFreeSpace(irel, newblk, freespace, itemsz);
  	}
  }
+ 
+ /*
+  * Return the amount of free space on a regular minmax index page.
+  *
+  * If the page is not a regular page, or has been marked with the
+  * MINMAX_EVACUATE_PAGE flag, returns 0.
+  */
+ static Size
+ mm_page_get_freespace(Page page)
+ {
+ 	MinmaxSpecialSpace *special;
+ 
+ 	special = (MinmaxSpecialSpace *) PageGetSpecialPointer(page);
+ 	if (!MINMAX_IS_REGULAR_PAGE(page) ||
+ 		(special->flags & MINMAX_EVACUATE_PAGE) != 0)
+ 		return 0;
+ 	else
+ 		return PageGetFreeSpace(page);
+ 
+ }
