*** a/src/backend/access/nbtree/README
--- b/src/backend/access/nbtree/README
***************
*** 168,173 **** parent item does still exist and can't have been deleted.  Also, because
--- 168,203 ----
  we are matching downlink page numbers and not data keys, we don't have any
  problem with possibly misidentifying the parent item.
  
+ VACUUM needs to do a linear scan of an index to search for deleted pages
+ that can be reclaimed because they are older than all open transactions.
+ For efficiency's sake, we'd like to use the same linear scan to search for
+ deletable tuples.  Before Postgres 8.2, btbulkdelete scanned the leaf pages
+ in index order, but it is possible to visit them in physical order instead.
+ The tricky part of this is to avoid missing any deletable tuples in the
+ presence of concurrent page splits: a page split could easily move some
+ tuples from a page not yet passed over by the sequential scan to a
+ lower-numbered page already passed over.  (This wasn't a concern for the
+ index-order scan, because splits always split right.)  To implement this,
+ we provide a "vacuum cycle ID" mechanism that makes it possible to
+ determine whether a page has been split since the current btbulkdelete
+ cycle started.  If btbulkdelete finds a page that has been split since
+ it started, and has a right-link pointing to a lower page number, then
+ it temporarily suspends its sequential scan and visits that page instead.
+ It must continue to follow right-links and vacuum dead tuples until
+ reaching a page that either hasn't been split since btbulkdelete started,
+ or is above the location of the outer sequential scan.  Then it can resume
+ the sequential scan.  This ensures that all tuples are visited.  It may be
+ that some tuples are visited twice, but that has no worse effect than an
+ inaccurate index tuple count (and we can't guarantee an accurate count
+ anyway in the face of concurrent activity).  Note that this still works
+ if the has-been-recently-split test has a small probability of false
+ positives, so long as it never gives a false negative.  This makes it
+ possible to implement the test with a small counter value stored on each
+ index page.
+ 
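+ The has-been-recently-split test above can be sketched in C. This is an
+ illustrative stand-in only: `FakePageOpaque` and `must_backtrack` are
+ invented names, not nbtree's real structures, and it assumes a split
+ stamps the current vacuum cycle ID on the page.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Illustrative sketch only: a simplified stand-in for nbtree's page
 * opaque data.  Field and type names here are hypothetical.
 */
typedef uint32_t BlockNumber;

typedef struct
{
	BlockNumber btpo_next;		/* right-link to right sibling */
	uint16_t	btpo_cycleid;	/* vacuum cycle ID stamped when the page
								 * was last split (0 if not recently split) */
} FakePageOpaque;

/*
 * Must btbulkdelete suspend its physical-order scan and follow this
 * page's right-link?  Yes if the page was split during the current
 * vacuum cycle AND the split moved tuples to a lower-numbered page
 * than the outer sequential scan has already passed over.
 */
static bool
must_backtrack(const FakePageOpaque *opaque, uint16_t cur_cycleid,
			   BlockNumber scan_pos)
{
	return opaque->btpo_cycleid == cur_cycleid &&
		opaque->btpo_next < scan_pos;
}
```

+ Note that a false positive here (following a right-link unnecessarily)
+ only causes some tuples to be visited twice, which is harmless, matching
+ the tolerance described above.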
+ Page Deletion Algorithm
+ -----------------------
+ 
  We consider deleting an entire page from the btree only when it's become
  completely empty of items.  (Merging partly-full pages would allow better
  space reuse, but it seems impractical to move existing data items left or
***************
*** 216,229 **** The notion of a half-dead page means that the key space relationship between
  the half-dead page's level and its parent's level may be a little out of
  whack: key space that appears to belong to the half-dead page's parent on the
  parent level may really belong to its right sibling.  To prevent any possible
! problems, we hold lock on the deleted child page until we have finished
! deleting any now-half-dead parent page(s).  This prevents any insertions into
! the transferred keyspace until the operation is complete.  The reason for
! doing this is that a sufficiently large number of insertions into the
! transferred keyspace, resulting in multiple page splits, could propagate keys
! from that keyspace into the parent level, resulting in transiently
! out-of-order keys in that level.  It is thought that that wouldn't cause any
! serious problem, but it seems too risky to allow.
  
  A deleted page cannot be reclaimed immediately, since there may be other
  processes waiting to reference it (ie, search processes that just left the
--- 246,262 ----
  the half-dead page's level and its parent's level may be a little out of
  whack: key space that appears to belong to the half-dead page's parent on the
  parent level may really belong to its right sibling.  To prevent any possible
! problems, we mark the half-dead page's right sibling, which now owns the
! keyspace of the deleted page, with a flag indicating that the left sibling
! is half-dead.  Any insertion into that page must first finish deleting the
! half-dead page, similarly to how any insertion into an incompletely-split
! page must first finish the split.
! 
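! The rule that an insertion must first complete any pending action on the
! target page can be sketched as follows.  The flag names mirror the ones
! this patch introduces, but the struct, bit values, and helper function
! are invented for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Flag bits mirroring the ones this patch introduces; the bit values
 * here are made up for this sketch. */
#define BTP_INCOMPLETE_SPLIT	(1 << 7)	/* right sibling's downlink missing */
#define BTP_LEFT_HALF_DEAD		(1 << 8)	/* left sibling is half-dead */

typedef struct
{
	uint16_t	btpo_flags;
} FakePageOpaque;

/*
 * Before inserting on a page, the inserter checks whether some earlier
 * action (a page split, or deletion of the left sibling) was left
 * unfinished and must be completed first.
 */
static bool
needs_fixup(const FakePageOpaque *opaque)
{
	return (opaque->btpo_flags &
			(BTP_INCOMPLETE_SPLIT | BTP_LEFT_HALF_DEAD)) != 0;
}
```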
! The reason for doing this is that a sufficiently large number of insertions
! into the transferred keyspace, resulting in multiple page splits, could
! propagate keys from that keyspace into the parent level, resulting in
! transiently out-of-order keys in that level.  It is thought that that wouldn't
! cause any serious problem, but it seems too risky to allow.
  
  A deleted page cannot be reclaimed immediately, since there may be other
  processes waiting to reference it (ie, search processes that just left the
***************
*** 298,330 **** as part of the atomic update for the delete (either way, the metapage has
  to be the last page locked in the update to avoid deadlock risks).  This
  avoids race conditions if two such operations are executing concurrently.
  
- VACUUM needs to do a linear scan of an index to search for deleted pages
- that can be reclaimed because they are older than all open transactions.
- For efficiency's sake, we'd like to use the same linear scan to search for
- deletable tuples.  Before Postgres 8.2, btbulkdelete scanned the leaf pages
- in index order, but it is possible to visit them in physical order instead.
- The tricky part of this is to avoid missing any deletable tuples in the
- presence of concurrent page splits: a page split could easily move some
- tuples from a page not yet passed over by the sequential scan to a
- lower-numbered page already passed over.  (This wasn't a concern for the
- index-order scan, because splits always split right.)  To implement this,
- we provide a "vacuum cycle ID" mechanism that makes it possible to
- determine whether a page has been split since the current btbulkdelete
- cycle started.  If btbulkdelete finds a page that has been split since
- it started, and has a right-link pointing to a lower page number, then
- it temporarily suspends its sequential scan and visits that page instead.
- It must continue to follow right-links and vacuum dead tuples until
- reaching a page that either hasn't been split since btbulkdelete started,
- or is above the location of the outer sequential scan.  Then it can resume
- the sequential scan.  This ensures that all tuples are visited.  It may be
- that some tuples are visited twice, but that has no worse effect than an
- inaccurate index tuple count (and we can't guarantee an accurate count
- anyway in the face of concurrent activity).  Note that this still works
- if the has-been-recently-split test has a small probability of false
- positives, so long as it never gives a false negative.  This makes it
- possible to implement the test with a small counter value stored on each
- index page.
- 
  On-the-Fly Deletion Of Index Tuples
  -----------------------------------
  
--- 331,336 ----
***************
*** 384,395 **** an additional insertion above that, etc).
  For a root split, the followon WAL entry is a "new root" entry rather than
  an "insertion" entry, but details are otherwise much the same.
  
! Because insertion involves multiple atomic actions, the WAL replay logic
! has to detect the case where a page split isn't followed by a matching
! insertion on the parent level, and then do that insertion on its own (and
! recursively for any subsequent parent insertion, of course).  This is
! feasible because the WAL entry for the split contains enough info to know
! what must be inserted in the parent level.
  
  When splitting a non-root page that is alone on its level, the required
  metapage update (of the "fast root" link) is performed and logged as part
--- 390,430 ----
  For a root split, the followon WAL entry is a "new root" entry rather than
  an "insertion" entry, but details are otherwise much the same.
  
! Because splitting involves multiple atomic actions, it's possible that the
! system crashes between splitting a page and inserting the downlink for the
! new half to the parent. After recovery, the downlink for the new page will
! be missing. The search algorithm works correctly, as the page will be found
! by following the right-link from its left sibling, although if a lot of
! downlinks in the tree are missing, performance will suffer. A more serious
! consequence is that if the page without a downlink gets split again, the
! insertion algorithm will fail to find the location in the parent level to
! insert the downlink.
! 
! Our approach is to create any missing downlinks on-the-fly, when
! descending the tree for a new insertion.  It could be done during plain
! searches, too, but it seems best not to put any extra updates in what
! would otherwise be a read-only operation (updating is not possible in hot
! standby mode anyway).  To identify missing downlinks, when a page is
! split, the left page is flagged to indicate that the split is not yet
! complete (INCOMPLETE_SPLIT).  When the downlink is inserted into the
! parent, the flag is cleared atomically with the insertion.  The child page
! is kept locked until the insertion into the parent is finished and the
! flag in the child cleared, but can be released immediately after that,
! before recursing up the tree if the parent also needs to be split.  This
! ensures that incompletely split pages are not seen under normal
! circumstances; one is seen only if insertion into the parent failed for
! some reason.
! 
! We flag the left page, even though it's the right page that's missing the
! downlink, because it's more convenient: when following the right-link from
! the left page to the right page, we then already know that the right page
! will need to have its downlink inserted into the parent.
! 
! We used to keep track of incomplete splits during recovery and finish them
! immediately at end of recovery, instead of doing it lazily at the next
! insertion. However, that made the recovery much more complicated, and only
! fixed the problem when crash recovery was performed. An incomplete split can
! also occur if an otherwise recoverable error, like out-of-memory or
! out-of-disk-space, happens while inserting the downlink to the parent.
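! 
! The flag lifecycle described above (set on the left half at split time,
! cleared when the downlink reaches the parent) can be sketched like this;
! the types, bit value, and helper names are hypothetical simplifications,
! not the real nbtree code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define BTP_INCOMPLETE_SPLIT	(1 << 7)	/* made-up bit value for this sketch */

typedef struct
{
	uint16_t	btpo_flags;
} FakePageOpaque;

/* Step 1: splitting a page flags its left half, because the new right
 * half has no downlink in the parent yet. */
static void
split_page(FakePageOpaque *left)
{
	left->btpo_flags |= BTP_INCOMPLETE_SPLIT;
}

/* Step 2: once the right half's downlink is inserted into the parent,
 * the flag on the left half is cleared.  (In the real code this happens
 * atomically with the parent insertion; here just in sequence.) */
static void
downlink_inserted(FakePageOpaque *left)
{
	left->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
}

/* If a crash lands between the two steps, the flag survives and tells
 * the next inserter to finish the split. */
static bool
split_is_incomplete(const FakePageOpaque *left)
{
	return (left->btpo_flags & BTP_INCOMPLETE_SPLIT) != 0;
}
```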
  
  When splitting a non-root page that is alone on its level, the required
  metapage update (of the "fast root" link) is performed and logged as part
*** a/src/backend/access/nbtree/nbtinsert.c
--- b/src/backend/access/nbtree/nbtinsert.c
***************
*** 58,72 **** static void _bt_findinsertloc(Relation rel,
  				  int keysz,
  				  ScanKey scankey,
  				  IndexTuple newtup,
  				  Relation heapRel);
! static void _bt_insertonpg(Relation rel, Buffer buf,
  			   BTStack stack,
  			   IndexTuple itup,
  			   OffsetNumber newitemoff,
  			   bool split_only_page);
! static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
! 		  OffsetNumber newitemoff, Size newitemsz,
  		  IndexTuple newitem, bool newitemonleft);
  static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
  				 OffsetNumber newitemoff,
  				 Size newitemsz,
--- 58,76 ----
  				  int keysz,
  				  ScanKey scankey,
  				  IndexTuple newtup,
+ 				  BTStack stack,
  				  Relation heapRel);
! static void _bt_insertonpg(Relation rel, Buffer buf, Buffer cbuf,
  			   BTStack stack,
  			   IndexTuple itup,
  			   OffsetNumber newitemoff,
  			   bool split_only_page);
! static Buffer _bt_split(Relation rel, Buffer buf, Buffer cbuf,
! 		  OffsetNumber firstright, OffsetNumber newitemoff, Size newitemsz,
  		  IndexTuple newitem, bool newitemonleft);
+ static void _bt_insert_parent(Relation rel, Buffer buf, Buffer rbuf,
+ 				  BTStack stack, bool is_root, bool is_only);
+ static void _bt_finish_split(Relation rel, Buffer lbuf, BTStack stack);
  static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
  				 OffsetNumber newitemoff,
  				 Size newitemsz,
***************
*** 130,136 **** top:
  	 * move right in the tree.	See Lehman and Yao for an excruciatingly
  	 * precise description.
  	 */
! 	buf = _bt_moveright(rel, buf, natts, itup_scankey, false, BT_WRITE);
  
  	/*
  	 * If we're not allowing duplicates, make sure the key isn't already in
--- 134,141 ----
  	 * move right in the tree.	See Lehman and Yao for an excruciatingly
  	 * precise description.
  	 */
! 	buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
! 						true, stack, BT_WRITE);
  
  	/*
  	 * If we're not allowing duplicates, make sure the key isn't already in
***************
*** 183,190 **** top:
  		 */
  		CheckForSerializableConflictIn(rel, NULL, buf);
  		/* do the insertion */
! 		_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup, heapRel);
! 		_bt_insertonpg(rel, buf, stack, itup, offset, false);
  	}
  	else
  	{
--- 188,196 ----
  		 */
  		CheckForSerializableConflictIn(rel, NULL, buf);
  		/* do the insertion */
! 		_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
! 						  stack, heapRel);
! 		_bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
  	}
  	else
  	{
***************
*** 508,517 **** _bt_findinsertloc(Relation rel,
  				  int keysz,
  				  ScanKey scankey,
  				  IndexTuple newtup,
  				  Relation heapRel)
  {
  	Buffer		buf = *bufptr;
! 	Page		page = BufferGetPage(buf);
  	Size		itemsz;
  	BTPageOpaque lpageop;
  	bool		movedright,
--- 514,524 ----
  				  int keysz,
  				  ScanKey scankey,
  				  IndexTuple newtup,
+ 				  BTStack stack,
  				  Relation heapRel)
  {
  	Buffer		buf = *bufptr;
! 	Page		page;
  	Size		itemsz;
  	BTPageOpaque lpageop;
  	bool		movedright,
***************
*** 519,524 **** _bt_findinsertloc(Relation rel,
--- 526,532 ----
  	OffsetNumber newitemoff;
  	OffsetNumber firstlegaloff = *offsetptr;
  
+ 	page = BufferGetPage(buf);
  	lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
  
  	itemsz = IndexTupleDSize(*newtup);
***************
*** 570,575 **** _bt_findinsertloc(Relation rel,
--- 578,584 ----
  	while (PageGetFreeSpace(page) < itemsz)
  	{
  		Buffer		rbuf;
+ 		BlockNumber	rblkno;
  
  		/*
  		 * before considering moving right, see if we can obtain enough space
***************
*** 607,624 **** _bt_findinsertloc(Relation rel,
  		 */
  		rbuf = InvalidBuffer;
  
  		for (;;)
  		{
- 			BlockNumber rblkno = lpageop->btpo_next;
- 
  			rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
  			page = BufferGetPage(rbuf);
  			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
  			if (!P_IGNORE(lpageop))
  				break;
  			if (P_RIGHTMOST(lpageop))
  				elog(ERROR, "fell off the end of index \"%s\"",
  					 RelationGetRelationName(rel));
  		}
  		_bt_relbuf(rel, buf);
  		buf = rbuf;
--- 616,648 ----
  		 */
  		rbuf = InvalidBuffer;
  
+ 		rblkno = lpageop->btpo_next;
  		for (;;)
  		{
  			rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
  			page = BufferGetPage(rbuf);
  			lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
+ 
+ 			/*
+ 			 * If this page was incompletely split, finish the split now.  We
+ 			 * do this while holding a lock on the left sibling, which is not
+ 			 * good because finishing the split could be a fairly lengthy
+ 			 * operation.  But this should happen very seldom.
+ 			 */
+ 			if (P_NEEDS_FIXUP(lpageop))
+ 			{
+ 				_bt_fixup(rel, rbuf, stack);
+ 				rbuf = InvalidBuffer;
+ 				continue;
+ 			}
+ 
  			if (!P_IGNORE(lpageop))
  				break;
  			if (P_RIGHTMOST(lpageop))
  				elog(ERROR, "fell off the end of index \"%s\"",
  					 RelationGetRelationName(rel));
+ 
+ 			rblkno = lpageop->btpo_next;
  		}
  		_bt_relbuf(rel, buf);
  		buf = rbuf;
***************
*** 664,669 **** _bt_findinsertloc(Relation rel,
--- 688,697 ----
   *		insertion, and the buffer must be pinned and write-locked.	On return,
   *		we will have dropped both the pin and the lock on the buffer.
   *
+  *		When inserting to a non-leaf page, 'cbuf' is the left-sibling of the
+  *		page we're inserting the downlink for. This function will clear the
+  *		INCOMPLETE_SPLIT flag on it, and release the buffer.
+  *
   *		The locking interactions in this code are critical.  You should
   *		grok Lehman and Yao's paper before making any changes.  In addition,
   *		you need to understand how we disambiguate duplicate keys in this
***************
*** 677,682 **** _bt_findinsertloc(Relation rel,
--- 705,711 ----
  static void
  _bt_insertonpg(Relation rel,
  			   Buffer buf,
+ 			   Buffer cbuf,
  			   BTStack stack,
  			   IndexTuple itup,
  			   OffsetNumber newitemoff,
***************
*** 690,695 **** _bt_insertonpg(Relation rel,
--- 719,735 ----
  	page = BufferGetPage(buf);
  	lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
  
+ 	/*
+ 	 * The caller should've finished any incomplete splits and page deletions
+ 	 * already.
+ 	 */
+ 	if (P_INCOMPLETE_SPLIT(lpageop))
+ 		elog(ERROR, "cannot insert to incompletely-split page %u",
+ 			 BufferGetBlockNumber(buf));
+ 	if (P_LEFT_HALF_DEAD(lpageop))
+ 		elog(ERROR, "cannot insert to page %u with half-dead left sibling",
+ 			 BufferGetBlockNumber(buf));
+ 
  	itemsz = IndexTupleDSize(*itup);
  	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
  								 * need to be consistent */
***************
*** 714,720 **** _bt_insertonpg(Relation rel,
  									  &newitemonleft);
  
  		/* split the buffer into left and right halves */
! 		rbuf = _bt_split(rel, buf, firstright,
  						 newitemoff, itemsz, itup, newitemonleft);
  		PredicateLockPageSplit(rel,
  							   BufferGetBlockNumber(buf),
--- 754,760 ----
  									  &newitemonleft);
  
  		/* split the buffer into left and right halves */
! 		rbuf = _bt_split(rel, buf, cbuf, firstright,
  						 newitemoff, itemsz, itup, newitemonleft);
  		PredicateLockPageSplit(rel,
  							   BufferGetBlockNumber(buf),
***************
*** 788,798 **** _bt_insertonpg(Relation rel,
  			MarkBufferDirty(metabuf);
  		}
  
  		/* XLOG stuff */
  		if (RelationNeedsWAL(rel))
  		{
  			xl_btree_insert xlrec;
! 			BlockNumber xldownlink;
  			xl_btree_metadata xlmeta;
  			uint8		xlinfo;
  			XLogRecPtr	recptr;
--- 828,848 ----
  			MarkBufferDirty(metabuf);
  		}
  
+ 		/* clear INCOMPLETE_SPLIT flag on child if this finishes a split */
+ 		if (!P_ISLEAF(lpageop))
+ 		{
+ 			Page		cpage = BufferGetPage(cbuf);
+ 			BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
+ 			Assert(P_INCOMPLETE_SPLIT(cpageop));
+ 			cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
+ 			MarkBufferDirty(cbuf);
+ 		}
+ 
  		/* XLOG stuff */
  		if (RelationNeedsWAL(rel))
  		{
  			xl_btree_insert xlrec;
! 			BlockNumber xlleftchild;
  			xl_btree_metadata xlmeta;
  			uint8		xlinfo;
  			XLogRecPtr	recptr;
***************
*** 812,823 **** _bt_insertonpg(Relation rel,
  				xlinfo = XLOG_BTREE_INSERT_LEAF;
  			else
  			{
! 				xldownlink = ItemPointerGetBlockNumber(&(itup->t_tid));
! 				Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
! 
! 				nextrdata->data = (char *) &xldownlink;
  				nextrdata->len = sizeof(BlockNumber);
! 				nextrdata->buffer = InvalidBuffer;
  				nextrdata->next = nextrdata + 1;
  				nextrdata++;
  
--- 862,876 ----
  				xlinfo = XLOG_BTREE_INSERT_LEAF;
  			else
  			{
! 				/*
! 				 * Include the block number of the left child, whose
! 				 * INCOMPLETE_SPLIT flag is cleared.
! 				 */
! 				xlleftchild = BufferGetBlockNumber(cbuf);
! 				nextrdata->data = (char *) &xlleftchild;
  				nextrdata->len = sizeof(BlockNumber);
! 				nextrdata->buffer = cbuf;
! 				nextrdata->buffer_std = true;
  				nextrdata->next = nextrdata + 1;
  				nextrdata++;
  
***************
*** 870,875 **** _bt_insertonpg(Relation rel,
--- 923,930 ----
  		END_CRIT_SECTION();
  
  		/* release buffers; send out relcache inval if metapage changed */
+ 		if (!P_ISLEAF(lpageop))
+ 			_bt_relbuf(rel, cbuf);
  		if (BufferIsValid(metabuf))
  		{
  			if (!InRecovery)
***************
*** 889,899 **** _bt_insertonpg(Relation rel,
   *		new right page.  newitemoff etc. tell us about the new item that
   *		must be inserted along with the data from the old page.
   *
   *		Returns the new right sibling of buf, pinned and write-locked.
   *		The pin and lock on buf are maintained.
   */
  static Buffer
! _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
  		  OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem,
  		  bool newitemonleft)
  {
--- 944,958 ----
   *		new right page.  newitemoff etc. tell us about the new item that
   *		must be inserted along with the data from the old page.
   *
+  *		When splitting a non-leaf page, 'cbuf' is the left-sibling of the
+  *		page we're inserting the downlink for. This function will clear the
+  *		INCOMPLETE_SPLIT flag on it, and release the buffer.
+  *
   *		Returns the new right sibling of buf, pinned and write-locked.
   *		The pin and lock on buf are maintained.
   */
  static Buffer
! _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
  		  OffsetNumber newitemoff, Size newitemsz, IndexTuple newitem,
  		  bool newitemonleft)
  {
***************
*** 961,966 **** _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
--- 1020,1027 ----
  	lopaque->btpo_flags = oopaque->btpo_flags;
  	lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
  	ropaque->btpo_flags = lopaque->btpo_flags;
+ 	/* set flag in left page indicating that the right page has no downlink */
+ 	lopaque->btpo_flags |= BTP_INCOMPLETE_SPLIT;
  	lopaque->btpo_prev = oopaque->btpo_prev;
  	lopaque->btpo_next = rightpagenumber;
  	ropaque->btpo_prev = origpagenumber;
***************
*** 1184,1189 **** _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
--- 1245,1262 ----
  		MarkBufferDirty(sbuf);
  	}
  
+ 	/*
+ 	 * Clear INCOMPLETE_SPLIT flag on child if inserting the new item finishes
+ 	 * a split.
+ 	 */
+ 	if (ropaque->btpo.level > 0)
+ 	{
+ 		Page		cpage = BufferGetPage(cbuf);
+ 		BTPageOpaque cpageop = (BTPageOpaque) PageGetSpecialPointer(cpage);
+ 		cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
+ 		MarkBufferDirty(cbuf);
+ 	}
+ 
  	/* XLOG stuff */
  	if (RelationNeedsWAL(rel))
  	{
***************
*** 1206,1239 **** _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
  
  		lastrdata = &rdata[0];
  
- 		if (ropaque->btpo.level > 0)
- 		{
- 			/* Log downlink on non-leaf pages */
- 			lastrdata->next = lastrdata + 1;
- 			lastrdata++;
- 
- 			lastrdata->data = (char *) &newitem->t_tid.ip_blkid;
- 			lastrdata->len = sizeof(BlockIdData);
- 			lastrdata->buffer = InvalidBuffer;
- 
- 			/*
- 			 * We must also log the left page's high key, because the right
- 			 * page's leftmost key is suppressed on non-leaf levels.  Show it
- 			 * as belonging to the left page buffer, so that it is not stored
- 			 * if XLogInsert decides it needs a full-page image of the left
- 			 * page.
- 			 */
- 			lastrdata->next = lastrdata + 1;
- 			lastrdata++;
- 
- 			itemid = PageGetItemId(origpage, P_HIKEY);
- 			item = (IndexTuple) PageGetItem(origpage, itemid);
- 			lastrdata->data = (char *) item;
- 			lastrdata->len = MAXALIGN(IndexTupleSize(item));
- 			lastrdata->buffer = buf;	/* backup block 1 */
- 			lastrdata->buffer_std = true;
- 		}
- 
  		/*
  		 * Log the new item and its offset, if it was inserted on the left
  		 * page. (If it was put on the right page, we don't need to explicitly
--- 1279,1284 ----
***************
*** 1260,1276 **** _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
  			lastrdata->buffer = buf;	/* backup block 1 */
  			lastrdata->buffer_std = true;
  		}
! 		else if (ropaque->btpo.level == 0)
  		{
  			/*
! 			 * Although we don't need to WAL-log the new item, we still need
! 			 * XLogInsert to consider storing a full-page image of the left
! 			 * page, so make an empty entry referencing that buffer. This also
! 			 * ensures that the left page is always backup block 1.
  			 */
  			lastrdata->next = lastrdata + 1;
  			lastrdata++;
  
  			lastrdata->data = NULL;
  			lastrdata->len = 0;
  			lastrdata->buffer = buf;	/* backup block 1 */
--- 1305,1344 ----
  			lastrdata->buffer = buf;	/* backup block 1 */
  			lastrdata->buffer_std = true;
  		}
! 
! 		/* Log left page */
! 		if (ropaque->btpo.level > 0)
  		{
+ 			lastrdata->next = lastrdata + 1;
+ 			lastrdata++;
+ 
  			/*
! 			 * We must also log the left page's high key, because the right
! 			 * page's leftmost key is suppressed on non-leaf levels.  Show it
! 			 * as belonging to the left page buffer, so that it is not stored
! 			 * if XLogInsert decides it needs a full-page image of the left
! 			 * page.
  			 */
+ 			itemid = PageGetItemId(origpage, P_HIKEY);
+ 			item = (IndexTuple) PageGetItem(origpage, itemid);
+ 			lastrdata->data = (char *) item;
+ 			lastrdata->len = MAXALIGN(IndexTupleSize(item));
+ 			lastrdata->buffer = buf;	/* backup block 1 */
+ 			lastrdata->buffer_std = true;
+ 		}
+ 
+ 		if (ropaque->btpo.level == 0 && !newitemonleft)
+ 		{
  			lastrdata->next = lastrdata + 1;
  			lastrdata++;
  
+ 			/*
+ 			 * Although we don't need to WAL-log anything on the left page,
+ 			 * we still need XLogInsert to consider storing a full-page image
+ 			 * of it, so make an empty entry referencing that buffer.  This
+ 			 * also ensures that the left page is always backup block 1.
+ 			 */
  			lastrdata->data = NULL;
  			lastrdata->len = 0;
  			lastrdata->buffer = buf;	/* backup block 1 */
***************
*** 1278,1283 **** _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
--- 1346,1367 ----
  		}
  
  		/*
+ 		 * Log block number of left child, whose INCOMPLETE_SPLIT flag this
+ 		 * insertion clears.
+ 		 */
+ 		if (ropaque->btpo.level > 0)
+ 		{
+ 			BlockNumber cblkno = BufferGetBlockNumber(cbuf);
+ 			lastrdata->next = lastrdata + 1;
+ 			lastrdata++;
+ 
+ 			lastrdata->data = (char *) &cblkno;
+ 			lastrdata->len = sizeof(BlockNumber);
+ 			lastrdata->buffer = cbuf;	/* backup block 2 */
+ 			lastrdata->buffer_std = true;
+ 		}
+ 
+ 		/*
  		 * Log the contents of the right page in the format understood by
  		 * _bt_restore_page(). We set lastrdata->buffer to InvalidBuffer,
  		 * because we're going to recreate the whole page anyway, so it should
***************
*** 1306,1312 **** _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
  
  			lastrdata->data = NULL;
  			lastrdata->len = 0;
! 			lastrdata->buffer = sbuf;	/* backup block 2 */
  			lastrdata->buffer_std = true;
  		}
  
--- 1390,1396 ----
  
  			lastrdata->data = NULL;
  			lastrdata->len = 0;
! 			lastrdata->buffer = sbuf;	/* bkp block 2 (leaf) or 3 (non-leaf) */
  			lastrdata->buffer_std = true;
  		}
  
***************
*** 1333,1338 **** _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
--- 1417,1426 ----
  	if (!P_RIGHTMOST(ropaque))
  		_bt_relbuf(rel, sbuf);
  
+ 	/* release the child */
+ 	if (ropaque->btpo.level > 0)
+ 		_bt_relbuf(rel, cbuf);
+ 
  	/* split's done */
  	return rbuf;
  }
***************
*** 1603,1612 **** _bt_checksplitloc(FindSplitData *state,
   *			have to be efficient (concurrent ROOT split, WAL recovery)
   * is_root - we split the true root
   * is_only - we split a page alone on its level (might have been fast root)
-  *
-  * This is exported so it can be called by nbtxlog.c.
   */
! void
  _bt_insert_parent(Relation rel,
  				  Buffer buf,
  				  Buffer rbuf,
--- 1691,1698 ----
   *			have to be efficient (concurrent ROOT split, WAL recovery)
   * is_root - we split the true root
   * is_only - we split a page alone on its level (might have been fast root)
   */
! static void
  _bt_insert_parent(Relation rel,
  				  Buffer buf,
  				  Buffer rbuf,
***************
*** 1685,1696 **** _bt_insert_parent(Relation rel,
  		 * 05/27/97
  		 */
  		ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);
- 
  		pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
  
! 		/* Now we can unlock the children */
  		_bt_relbuf(rel, rbuf);
- 		_bt_relbuf(rel, buf);
  
  		/* Check for error only after writing children */
  		if (pbuf == InvalidBuffer)
--- 1771,1783 ----
  		 * 05/27/97
  		 */
  		ItemPointerSet(&(stack->bts_btentry.t_tid), bknum, P_HIKEY);
  		pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
  
! 		/*
! 		 * Now we can unlock the right child. The left child will be unlocked
! 		 * by _bt_insertonpg().
! 		 */
  		_bt_relbuf(rel, rbuf);
  
  		/* Check for error only after writing children */
  		if (pbuf == InvalidBuffer)
***************
*** 1698,1704 **** _bt_insert_parent(Relation rel,
  				 RelationGetRelationName(rel), bknum, rbknum);
  
  		/* Recursively update the parent */
! 		_bt_insertonpg(rel, pbuf, stack->bts_parent,
  					   new_item, stack->bts_offset + 1,
  					   is_only);
  
--- 1785,1791 ----
  				 RelationGetRelationName(rel), bknum, rbknum);
  
  		/* Recursively update the parent */
! 		_bt_insertonpg(rel, pbuf, buf, stack->bts_parent,
  					   new_item, stack->bts_offset + 1,
  					   is_only);
  
***************
*** 1708,1713 **** _bt_insert_parent(Relation rel,
--- 1795,1895 ----
  }
  
  /*
+  * _bt_fixup() -- Finish incomplete actions on a page.
+  *
+  * A crash or other failure can leave a split or the deletion of a half-dead
+  * page incomplete.  The insertion routines won't insert on a page that is
+  * incompletely split, or whose left sibling is half-dead (in which case the
+  * key of the page's downlink is too high, and needs to be replaced with the
+  * half-dead left sibling's downlink key).  Before inserting on such a page,
+  * call _bt_fixup() to finish the incomplete action.
+  */
+ void
+ _bt_fixup(Relation rel, Buffer buf, BTStack stack)
+ {
+ 	Page		page = BufferGetPage(buf);
+ 	BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+ 
+ 	if (P_ISHALFDEAD(opaque))
+ 	{
+ 		elog(LOG, "finishing deletion of half-dead page %u",
+ 			 BufferGetBlockNumber(buf));
+ 		(void) _bt_pagedel(rel, buf, stack);
+ 	}
+ 	else if (P_LEFT_HALF_DEAD(opaque))
+ 	{
+ 		buf = _bt_walk_left(rel, buf);
+ 
+ 		elog(LOG, "finishing deletion of half-dead page %u",
+ 			 BufferGetBlockNumber(buf));
+ 		(void) _bt_pagedel(rel, buf, NULL);
+ 	}
+ 	else if (P_INCOMPLETE_SPLIT(opaque))
+ 	{
+ 		/* upgrade the lock */
+ 		buf = _bt_relandgetbuf(rel, buf, BufferGetBlockNumber(buf), BT_WRITE);
+ 		page = BufferGetPage(buf);
+ 		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+ 		if (!P_INCOMPLETE_SPLIT(opaque))
+ 			_bt_relbuf(rel, buf);
+ 		else
+ 			_bt_finish_split(rel, buf, stack);
+ 	}
+ 	else
+ 		_bt_relbuf(rel, buf);
+ }
+ 
+ /*
+  * _bt_finish_split() -- Finish an incomplete split
+  *
+  * On entry, we hold a write lock on the left half of the split ('lbuf');
+  * the lock is released on exit.
+  */
+ static void
+ _bt_finish_split(Relation rel, Buffer lbuf, BTStack stack)
+ {
+ 	Page		lpage = BufferGetPage(lbuf);
+ 	BTPageOpaque lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
+ 	Buffer		rbuf;
+ 	Page		rpage;
+ 	BTPageOpaque rpageop;
+ 	bool		was_root;
+ 	bool		was_only;
+ 
+ 	/* Lock right sibling, the one missing the downlink */
+ 	rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
+ 	rpage = BufferGetPage(rbuf);
+ 	rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
+ 
+ 	/* Could this be a root split? */
+ 	if (!stack)
+ 	{
+ 		Buffer		metabuf;
+ 		Page		metapg;
+ 		BTMetaPageData *metad;
+ 
+ 		/* acquire lock on the metapage */
+ 		metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
+ 		metapg = BufferGetPage(metabuf);
+ 		metad = BTPageGetMeta(metapg);
+ 
+ 		was_root = (metad->btm_root == BufferGetBlockNumber(lbuf));
+ 
+ 		_bt_relbuf(rel, metabuf);
+ 	}
+ 	else
+ 		was_root = false;
+ 
+ 	/* Was this the only page on the level before split? */
+ 	was_only = (P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop));
+ 
+ 	elog(DEBUG1, "finishing incomplete split of %u/%u",
+ 		 BufferGetBlockNumber(lbuf), BufferGetBlockNumber(rbuf));
+ 
+ 	_bt_insert_parent(rel, lbuf, rbuf, stack, was_root, was_only);
+ }
+ 
+ /*
   *	_bt_getstackbuf() -- Walk back up the tree one step, and find the item
   *						 we last looked at in the parent.
   *
***************
*** 1739,1744 **** _bt_getstackbuf(Relation rel, BTStack stack, int access)
--- 1921,1932 ----
  		page = BufferGetPage(buf);
  		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  
+ 		if (P_NEEDS_FIXUP(opaque))
+ 		{
+ 			_bt_fixup(rel, buf, stack);
+ 			continue;
+ 		}
+ 
  		if (!P_IGNORE(opaque))
  		{
  			OffsetNumber offnum,
***************
*** 1843,1848 **** _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
--- 2031,2037 ----
  				rbkno;
  	BlockNumber rootblknum;
  	BTPageOpaque rootopaque;
+ 	BTPageOpaque lopaque;
  	ItemId		itemid;
  	IndexTuple	item;
  	Size		itemsz;
***************
*** 1854,1859 **** _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
--- 2043,2049 ----
  	lbkno = BufferGetBlockNumber(lbuf);
  	rbkno = BufferGetBlockNumber(rbuf);
  	lpage = BufferGetPage(lbuf);
+ 	lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage);
  
  	/* get a new root page */
  	rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
***************
*** 1927,1932 **** _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
--- 2117,2127 ----
  			 BufferGetBlockNumber(lbuf), RelationGetRelationName(rel));
  	pfree(new_item);
  
+ 	/* Clear the incomplete-split flag in the left child */
+ 	Assert(P_INCOMPLETE_SPLIT(lopaque));
+ 	lopaque->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
+ 	MarkBufferDirty(lbuf);
+ 
  	MarkBufferDirty(rootbuf);
  	MarkBufferDirty(metabuf);
  
***************
*** 1935,1941 **** _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
  	{
  		xl_btree_newroot xlrec;
  		XLogRecPtr	recptr;
! 		XLogRecData rdata[2];
  
  		xlrec.node = rel->rd_node;
  		xlrec.rootblk = rootblknum;
--- 2130,2136 ----
  	{
  		xl_btree_newroot xlrec;
  		XLogRecPtr	recptr;
! 		XLogRecData rdata[3];
  
  		xlrec.node = rel->rd_node;
  		xlrec.rootblk = rootblknum;
***************
*** 1954,1960 **** _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
  		rdata[1].len = ((PageHeader) rootpage)->pd_special -
  			((PageHeader) rootpage)->pd_upper;
  		rdata[1].buffer = InvalidBuffer;
! 		rdata[1].next = NULL;
  
  		recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);
  
--- 2149,2161 ----
  		rdata[1].len = ((PageHeader) rootpage)->pd_special -
  			((PageHeader) rootpage)->pd_upper;
  		rdata[1].buffer = InvalidBuffer;
! 		rdata[1].next = &(rdata[2]);
! 
! 		/* Make a full-page image of the left child if needed */
! 		rdata[2].data = NULL;
! 		rdata[2].len = 0;
! 		rdata[2].buffer = lbuf;
! 		rdata[2].next = NULL;
  
  		recptr = XLogInsert(RM_BTREE_ID, XLOG_BTREE_NEWROOT, rdata);
  
*** a/src/backend/access/nbtree/nbtpage.c
--- b/src/backend/access/nbtree/nbtpage.c
***************
*** 980,993 **** _bt_parent_deletion_safe(Relation rel, BlockNumber target, BTStack stack)
  	Page		page;
  	BTPageOpaque opaque;
  
- 	/*
- 	 * In recovery mode, assume the deletion being replayed is valid.  We
- 	 * can't always check it because we won't have a full search stack, and we
- 	 * should complain if there's a problem, anyway.
- 	 */
- 	if (InRecovery)
- 		return true;
- 
  	/* Locate the parent's downlink (updating the stack entry if needed) */
  	ItemPointerSet(&(stack->bts_btentry.t_tid), target, P_HIKEY);
  	pbuf = _bt_getstackbuf(rel, stack, BT_READ);
--- 980,985 ----
***************
*** 1081,1089 **** _bt_pagedel(Relation rel, Buffer buf, BTStack stack)
  	ScanKey		itup_scankey;
  	Buffer		lbuf,
  				rbuf,
! 				pbuf;
  	bool		parent_half_dead;
  	bool		parent_one_child;
  	bool		rightsib_empty;
  	Buffer		metabuf = InvalidBuffer;
  	Page		metapg = NULL;
--- 1073,1083 ----
  	ScanKey		itup_scankey;
  	Buffer		lbuf,
  				rbuf,
! 				pbuf,
! 				prbuf;
  	bool		parent_half_dead;
  	bool		parent_one_child;
+ 	bool		leftsib_half_dead = false;
  	bool		rightsib_empty;
  	Buffer		metabuf = InvalidBuffer;
  	Page		metapg = NULL;
***************
*** 1133,1182 **** _bt_pagedel(Relation rel, Buffer buf, BTStack stack)
  	 */
  	if (stack == NULL)
  	{
! 		if (!InRecovery)
! 		{
! 			/* we need an insertion scan key to do our search, so build one */
! 			itup_scankey = _bt_mkscankey(rel, targetkey);
! 			/* find the leftmost leaf page containing this key */
! 			stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey, false,
! 							   &lbuf, BT_READ);
! 			/* don't need a pin on that either */
! 			_bt_relbuf(rel, lbuf);
  
! 			/*
! 			 * If we are trying to delete an interior page, _bt_search did
! 			 * more than we needed.  Locate the stack item pointing to our
! 			 * parent level.
! 			 */
! 			ilevel = 0;
! 			for (;;)
! 			{
! 				if (stack == NULL)
! 					elog(ERROR, "not enough stack items");
! 				if (ilevel == targetlevel)
! 					break;
! 				stack = stack->bts_parent;
! 				ilevel++;
! 			}
! 		}
! 		else
  		{
! 			/*
! 			 * During WAL recovery, we can't use _bt_search (for one reason,
! 			 * it might invoke user-defined comparison functions that expect
! 			 * facilities not available in recovery mode).	Instead, just set
! 			 * up a dummy stack pointing to the left end of the parent tree
! 			 * level, from which _bt_getstackbuf will walk right to the parent
! 			 * page.  Painful, but we don't care too much about performance in
! 			 * this scenario.
! 			 */
! 			pbuf = _bt_get_endpoint(rel, targetlevel + 1, false);
! 			stack = (BTStack) palloc(sizeof(BTStackData));
! 			stack->bts_blkno = BufferGetBlockNumber(pbuf);
! 			stack->bts_offset = InvalidOffsetNumber;
! 			/* bts_btentry will be initialized below */
! 			stack->bts_parent = NULL;
! 			_bt_relbuf(rel, pbuf);
  		}
  	}
  
--- 1127,1155 ----
  	 */
  	if (stack == NULL)
  	{
! 		/* we need an insertion scan key to do our search, so build one */
! 		itup_scankey = _bt_mkscankey(rel, targetkey);
! 		/* find the leftmost leaf page containing this key */
! 		stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey, false,
! 						   &lbuf, BT_READ);
! 		/* don't need a pin on that either */
! 		_bt_relbuf(rel, lbuf);
! 		lbuf = InvalidBuffer;
  
! 		/*
! 		 * If we are trying to delete an interior page, _bt_search did
! 		 * more than we needed.  Locate the stack item pointing to our
! 		 * parent level.
! 		 */
! 		ilevel = 0;
! 		for (;;)
  		{
! 			if (stack == NULL)
! 				elog(ERROR, "not enough stack items");
! 			if (ilevel == targetlevel)
! 				break;
! 			stack = stack->bts_parent;
! 			ilevel++;
  		}
  	}
  
***************
*** 1199,1207 **** _bt_pagedel(Relation rel, Buffer buf, BTStack stack)
  	 * target page.  The sibling that was current a moment ago could have
  	 * split, so we may have to move right.  This search could fail if either
  	 * the sibling or the target page was deleted by someone else meanwhile;
! 	 * if so, give up.	(Right now, that should never happen, since page
! 	 * deletion is only done in VACUUM and there shouldn't be multiple VACUUMs
! 	 * concurrently on the same table.)
  	 */
  	if (leftsib != P_NONE)
  	{
--- 1172,1180 ----
  	 * target page.  The sibling that was current a moment ago could have
  	 * split, so we may have to move right.  This search could fail if either
  	 * the sibling or the target page was deleted by someone else meanwhile;
! 	 * if so, give up.  Although page deletion is only initiated by VACUUM,
! 	 * other backends can delete half-dead pages they encounter during
! 	 * insertions.
  	 */
  	if (leftsib != P_NONE)
  	{
***************
*** 1223,1228 **** _bt_pagedel(Relation rel, Buffer buf, BTStack stack)
--- 1196,1203 ----
  			page = BufferGetPage(lbuf);
  			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  		}
+ 
+ 		leftsib_half_dead = P_ISHALFDEAD(opaque);
  	}
  	else
  		lbuf = InvalidBuffer;
***************
*** 1311,1316 **** _bt_pagedel(Relation rel, Buffer buf, BTStack stack)
--- 1286,1300 ----
  	}
  
  	/*
+ 	 * Also lock the parent's right sibling, if we need to set its
+ 	 * BTP_LEFT_HALF_DEAD flag.
+ 	 */
+ 	if (parent_half_dead)
+ 		prbuf = _bt_getbuf(rel, opaque->btpo_next, BT_WRITE);
+ 	else
+ 		prbuf = InvalidBuffer;
+ 
+ 	/*
  	 * If we are deleting the next-to-last page on the target's level, then
  	 * the rightsib is a candidate to become the new fast root. (In theory, it
  	 * might be possible to push the fast root even further down, but the odds
***************
*** 1397,1402 **** _bt_pagedel(Relation rel, Buffer buf, BTStack stack)
--- 1381,1390 ----
  	{
  		PageIndexTupleDelete(page, poffset);
  		opaque->btpo_flags |= BTP_HALF_DEAD;
+ 
+ 		/* tell the parent's right sibling that its left neighbor is half-dead */
+ 		page = BufferGetPage(prbuf);
+ 		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+ 		opaque->btpo_flags |= BTP_LEFT_HALF_DEAD;
  	}
  	else
  	{
***************
*** 1426,1431 **** _bt_pagedel(Relation rel, Buffer buf, BTStack stack)
--- 1414,1424 ----
  	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  	Assert(opaque->btpo_prev == target);
  	opaque->btpo_prev = leftsib;
+ 	/* Update the right page's left-is-half-dead flag */
+ 	if (leftsib_half_dead)
+ 		opaque->btpo_flags |= BTP_LEFT_HALF_DEAD;
+ 	else
+ 		opaque->btpo_flags &= ~BTP_LEFT_HALF_DEAD;
  	rightsib_empty = (P_FIRSTDATAKEY(opaque) > PageGetMaxOffsetNumber(page));
  
  	/*
***************
*** 1458,1463 **** _bt_pagedel(Relation rel, Buffer buf, BTStack stack)
--- 1451,1458 ----
  	MarkBufferDirty(buf);
  	if (BufferIsValid(lbuf))
  		MarkBufferDirty(lbuf);
+ 	if (BufferIsValid(prbuf))
+ 		MarkBufferDirty(prbuf);
  
  	/* XLOG stuff */
  	if (RelationNeedsWAL(rel))
***************
*** 1466,1472 **** _bt_pagedel(Relation rel, Buffer buf, BTStack stack)
  		xl_btree_metadata xlmeta;
  		uint8		xlinfo;
  		XLogRecPtr	recptr;
! 		XLogRecData rdata[5];
  		XLogRecData *nextrdata;
  
  		xlrec.target.node = rel->rd_node;
--- 1461,1467 ----
  		xl_btree_metadata xlmeta;
  		uint8		xlinfo;
  		XLogRecPtr	recptr;
! 		XLogRecData rdata[6];
  		XLogRecData *nextrdata;
  
  		xlrec.target.node = rel->rd_node;
***************
*** 1474,1479 **** _bt_pagedel(Relation rel, Buffer buf, BTStack stack)
--- 1469,1481 ----
  		xlrec.deadblk = target;
  		xlrec.leftblk = leftsib;
  		xlrec.rightblk = rightsib;
+ 		xlrec.flags = 0;
+ 		if (leftsib_half_dead)
+ 			xlrec.flags |= DP_LEFT_IS_HALF_DEAD;
+ 		if (BufferIsValid(prbuf))
+ 			xlrec.parentright = BufferGetBlockNumber(prbuf);
+ 		else
+ 			xlrec.parentright = InvalidBlockNumber;
  		xlrec.btpo_xact = opaque->btpo.xact;
  
  		rdata[0].data = (char *) &xlrec;
***************
*** 1502,1517 **** _bt_pagedel(Relation rel, Buffer buf, BTStack stack)
  
  		nextrdata->data = NULL;
  		nextrdata->len = 0;
- 		nextrdata->next = nextrdata + 1;
  		nextrdata->buffer = pbuf;
  		nextrdata->buffer_std = true;
  		nextrdata++;
  
  		nextrdata->data = NULL;
  		nextrdata->len = 0;
  		nextrdata->buffer = rbuf;
  		nextrdata->buffer_std = true;
- 		nextrdata->next = NULL;
  
  		if (BufferIsValid(lbuf))
  		{
--- 1504,1518 ----
  
  		nextrdata->data = NULL;
  		nextrdata->len = 0;
  		nextrdata->buffer = pbuf;
  		nextrdata->buffer_std = true;
+ 		nextrdata->next = nextrdata + 1;
  		nextrdata++;
  
  		nextrdata->data = NULL;
  		nextrdata->len = 0;
  		nextrdata->buffer = rbuf;
  		nextrdata->buffer_std = true;
  
  		if (BufferIsValid(lbuf))
  		{
***************
*** 1521,1529 **** _bt_pagedel(Relation rel, Buffer buf, BTStack stack)
  			nextrdata->len = 0;
  			nextrdata->buffer = lbuf;
  			nextrdata->buffer_std = true;
- 			nextrdata->next = NULL;
  		}
  
  		recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
  
  		if (BufferIsValid(metabuf))
--- 1522,1540 ----
  			nextrdata->len = 0;
  			nextrdata->buffer = lbuf;
  			nextrdata->buffer_std = true;
  		}
  
+ 		if (BufferIsValid(prbuf))
+ 		{
+ 			nextrdata->next = nextrdata + 1;
+ 			nextrdata++;
+ 			nextrdata->data = NULL;
+ 			nextrdata->len = 0;
+ 			nextrdata->buffer = prbuf;
+ 			nextrdata->buffer_std = true;
+ 		}
+ 		nextrdata->next = NULL;
+ 
  		recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
  
  		if (BufferIsValid(metabuf))
***************
*** 1541,1546 **** _bt_pagedel(Relation rel, Buffer buf, BTStack stack)
--- 1552,1562 ----
  			page = BufferGetPage(lbuf);
  			PageSetLSN(page, recptr);
  		}
+ 		if (BufferIsValid(prbuf))
+ 		{
+ 			page = BufferGetPage(prbuf);
+ 			PageSetLSN(page, recptr);
+ 		}
  	}
  
  	END_CRIT_SECTION();
***************
*** 1551,1593 **** _bt_pagedel(Relation rel, Buffer buf, BTStack stack)
  		CacheInvalidateRelcache(rel);
  		_bt_relbuf(rel, metabuf);
  	}
! 	/* can always release leftsib immediately */
  	if (BufferIsValid(lbuf))
  		_bt_relbuf(rel, lbuf);
  
  	/*
  	 * If parent became half dead, recurse to delete it. Otherwise, if right
  	 * sibling is empty and is now the last child of the parent, recurse to
  	 * try to delete it.  (These cases cannot apply at the same time, though
  	 * the second case might itself recurse to the first.)
- 	 *
- 	 * When recursing to parent, we hold the lock on the target page until
- 	 * done.  This delays any insertions into the keyspace that was just
- 	 * effectively reassigned to the parent's right sibling.  If we allowed
- 	 * that, and there were enough such insertions before we finish deleting
- 	 * the parent, page splits within that keyspace could lead to inserting
- 	 * out-of-order keys into the grandparent level.  It is thought that that
- 	 * wouldn't have any serious consequences, but it still seems like a
- 	 * pretty bad idea.
  	 */
  	if (parent_half_dead)
  	{
  		/* recursive call will release pbuf */
  		_bt_relbuf(rel, rbuf);
  		result = _bt_pagedel(rel, pbuf, stack->bts_parent) + 1;
- 		_bt_relbuf(rel, buf);
  	}
  	else if (parent_one_child && rightsib_empty)
  	{
  		_bt_relbuf(rel, pbuf);
- 		_bt_relbuf(rel, buf);
  		/* recursive call will release rbuf */
  		result = _bt_pagedel(rel, rbuf, stack) + 1;
  	}
  	else
  	{
  		_bt_relbuf(rel, pbuf);
- 		_bt_relbuf(rel, buf);
  		_bt_relbuf(rel, rbuf);
  		result = 1;
  	}
--- 1567,1601 ----
  		CacheInvalidateRelcache(rel);
  		_bt_relbuf(rel, metabuf);
  	}
! 	/* can always release the target buffer and its left sibling immediately */
! 	_bt_relbuf(rel, buf);
  	if (BufferIsValid(lbuf))
  		_bt_relbuf(rel, lbuf);
+ 	if (BufferIsValid(prbuf))
+ 		_bt_relbuf(rel, prbuf);
  
  	/*
  	 * If parent became half dead, recurse to delete it. Otherwise, if right
  	 * sibling is empty and is now the last child of the parent, recurse to
  	 * try to delete it.  (These cases cannot apply at the same time, though
  	 * the second case might itself recurse to the first.)
  	 */
  	if (parent_half_dead)
  	{
  		/* recursive call will release pbuf */
  		_bt_relbuf(rel, rbuf);
  		result = _bt_pagedel(rel, pbuf, stack->bts_parent) + 1;
  	}
  	else if (parent_one_child && rightsib_empty)
  	{
  		_bt_relbuf(rel, pbuf);
  		/* recursive call will release rbuf */
  		result = _bt_pagedel(rel, rbuf, stack) + 1;
  	}
  	else
  	{
  		_bt_relbuf(rel, pbuf);
  		_bt_relbuf(rel, rbuf);
  		result = 1;
  	}
*** a/src/backend/access/nbtree/nbtsearch.c
--- b/src/backend/access/nbtree/nbtsearch.c
***************
*** 29,35 **** static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir,
  static void _bt_saveitem(BTScanOpaque so, int itemIndex,
  			 OffsetNumber offnum, IndexTuple itup);
  static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir);
- static Buffer _bt_walk_left(Relation rel, Buffer buf);
  static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
  
  
--- 29,34 ----
***************
*** 51,57 **** static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
   * NOTE that the returned buffer is read-locked regardless of the access
   * parameter.  However, access = BT_WRITE will allow an empty root page
   * to be created and returned.	When access = BT_READ, an empty index
!  * will result in *bufP being set to InvalidBuffer.
   */
  BTStack
  _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
--- 50,58 ----
   * NOTE that the returned buffer is read-locked regardless of the access
   * parameter.  However, access = BT_WRITE will allow an empty root page
   * to be created and returned.	When access = BT_READ, an empty index
!  * will result in *bufP being set to InvalidBuffer.  Also, in BT_WRITE mode,
!  * any incomplete splits encountered during the search are finished, and
!  * any half-dead pages are deleted.
   */
  BTStack
  _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
***************
*** 82,89 **** _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
  		 * Race -- the page we just grabbed may have split since we read its
  		 * pointer in the parent (or metapage).  If it has, we may need to
  		 * move right to its new sibling.  Do that.
  		 */
! 		*bufP = _bt_moveright(rel, *bufP, keysz, scankey, nextkey, BT_READ);
  
  		/* if this is a leaf page, we're done */
  		page = BufferGetPage(*bufP);
--- 83,95 ----
  		 * Race -- the page we just grabbed may have split since we read its
  		 * pointer in the parent (or metapage).  If it has, we may need to
  		 * move right to its new sibling.  Do that.
+ 		 *
+ 		 * In write-mode, allow _bt_moveright to finish any incomplete splits
+ 		 * and delete any half-dead pages along the way.
  		 */
! 		*bufP = _bt_moveright(rel, *bufP, keysz, scankey, nextkey,
! 							  (access == BT_WRITE), stack_in,
! 							  BT_READ);
  
  		/* if this is a leaf page, we're done */
  		page = BufferGetPage(*bufP);
***************
*** 148,153 **** _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
--- 154,164 ----
   * item >= scankey.  When nextkey is true, we are looking for the first
   * item strictly greater than scankey.
   *
+  * If forupdate is true, we will attempt to finish any incomplete splits
+  * and delete any half-dead pages that we encounter.  This is required when
+  * searching for the target page of an insertion, because we don't allow
+  * inserting into a page with an incomplete action pending.  'stack' is
+  * only used if forupdate is true.
+  *
   * On entry, we have the buffer pinned and a lock of the type specified by
   * 'access'.  If we move right, we release the buffer and lock and acquire
   * the same on the right sibling.  Return value is the buffer we stop at.
***************
*** 158,163 **** _bt_moveright(Relation rel,
--- 169,176 ----
  			  int keysz,
  			  ScanKey scankey,
  			  bool nextkey,
+ 			  bool forupdate,
+ 			  BTStack stack,
  			  int access)
  {
  	Page		page;
***************
*** 186,199 **** _bt_moveright(Relation rel,
  
  	while (!P_RIGHTMOST(opaque) &&
  		   (P_IGNORE(opaque) ||
  			_bt_compare(rel, keysz, scankey, page, P_HIKEY) >= cmpval))
  	{
! 		/* step right one page */
! 		BlockNumber rblkno = opaque->btpo_next;
  
! 		buf = _bt_relandgetbuf(rel, buf, rblkno, access);
! 		page = BufferGetPage(buf);
! 		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  	}
  
  	if (P_IGNORE(opaque))
--- 199,226 ----
  
  	while (!P_RIGHTMOST(opaque) &&
  		   (P_IGNORE(opaque) ||
+ 			(forupdate && P_NEEDS_FIXUP(opaque)) ||
  			_bt_compare(rel, keysz, scankey, page, P_HIKEY) >= cmpval))
  	{
! 		/*
! 		 * Finish any incomplete splits and remove any half-dead pages we
! 		 * encounter along the way.
! 		 */
! 		if (forupdate && P_NEEDS_FIXUP(opaque))
! 		{
! 			BlockNumber blkno = BufferGetBlockNumber(buf);
! 
! 			/* _bt_fixup releases the buffer, so pin and lock it again */
! 			_bt_fixup(rel, buf, stack);
! 			buf = _bt_getbuf(rel, blkno, access);
! 			page = BufferGetPage(buf);
! 			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
! 		}
! 		else
! 		{
! 			/* step right one page */
! 			BlockNumber rblkno = opaque->btpo_next;
  
! 			buf = _bt_relandgetbuf(rel, buf, rblkno, access);
! 			page = BufferGetPage(buf);
! 			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
! 		}
  	}
  
  	if (P_IGNORE(opaque))
***************
*** 1314,1320 **** _bt_steppage(IndexScanDesc scan, ScanDirection dir)
   * to be half-dead; the caller should check that condition and step left
   * again if it's important.
   */
! static Buffer
  _bt_walk_left(Relation rel, Buffer buf)
  {
  	Page		page;
--- 1341,1347 ----
   * to be half-dead; the caller should check that condition and step left
   * again if it's important.
   */
! Buffer
  _bt_walk_left(Relation rel, Buffer buf)
  {
  	Page		page;
*** a/src/backend/access/nbtree/nbtxlog.c
--- b/src/backend/access/nbtree/nbtxlog.c
***************
*** 21,122 ****
  #include "miscadmin.h"
  
  /*
-  * We must keep track of expected insertions due to page splits, and apply
-  * them manually if they are not seen in the WAL log during replay.  This
-  * makes it safe for page insertion to be a multiple-WAL-action process.
-  *
-  * Similarly, deletion of an only child page and deletion of its parent page
-  * form multiple WAL log entries, and we have to be prepared to follow through
-  * with the deletion if the log ends between.
-  *
-  * The data structure is a simple linked list --- this should be good enough,
-  * since we don't expect a page split or multi deletion to remain incomplete
-  * for long.  In any case we need to respect the order of operations.
-  */
- typedef struct bt_incomplete_action
- {
- 	RelFileNode node;			/* the index */
- 	bool		is_split;		/* T = pending split, F = pending delete */
- 	/* these fields are for a split: */
- 	bool		is_root;		/* we split the root */
- 	BlockNumber leftblk;		/* left half of split */
- 	BlockNumber rightblk;		/* right half of split */
- 	/* these fields are for a delete: */
- 	BlockNumber delblk;			/* parent block to be deleted */
- } bt_incomplete_action;
- 
- static List *incomplete_actions;
- 
- 
- static void
- log_incomplete_split(RelFileNode node, BlockNumber leftblk,
- 					 BlockNumber rightblk, bool is_root)
- {
- 	bt_incomplete_action *action = palloc(sizeof(bt_incomplete_action));
- 
- 	action->node = node;
- 	action->is_split = true;
- 	action->is_root = is_root;
- 	action->leftblk = leftblk;
- 	action->rightblk = rightblk;
- 	incomplete_actions = lappend(incomplete_actions, action);
- }
- 
- static void
- forget_matching_split(RelFileNode node, BlockNumber downlink, bool is_root)
- {
- 	ListCell   *l;
- 
- 	foreach(l, incomplete_actions)
- 	{
- 		bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l);
- 
- 		if (RelFileNodeEquals(node, action->node) &&
- 			action->is_split &&
- 			downlink == action->rightblk)
- 		{
- 			if (is_root != action->is_root)
- 				elog(LOG, "forget_matching_split: fishy is_root data (expected %d, got %d)",
- 					 action->is_root, is_root);
- 			incomplete_actions = list_delete_ptr(incomplete_actions, action);
- 			pfree(action);
- 			break;				/* need not look further */
- 		}
- 	}
- }
- 
- static void
- log_incomplete_deletion(RelFileNode node, BlockNumber delblk)
- {
- 	bt_incomplete_action *action = palloc(sizeof(bt_incomplete_action));
- 
- 	action->node = node;
- 	action->is_split = false;
- 	action->delblk = delblk;
- 	incomplete_actions = lappend(incomplete_actions, action);
- }
- 
- static void
- forget_matching_deletion(RelFileNode node, BlockNumber delblk)
- {
- 	ListCell   *l;
- 
- 	foreach(l, incomplete_actions)
- 	{
- 		bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l);
- 
- 		if (RelFileNodeEquals(node, action->node) &&
- 			!action->is_split &&
- 			delblk == action->delblk)
- 		{
- 			incomplete_actions = list_delete_ptr(incomplete_actions, action);
- 			pfree(action);
- 			break;				/* need not look further */
- 		}
- 	}
- }
- 
- /*
   * _bt_restore_page -- re-enter all the index tuples on a page
   *
   * The page is freshly init'd, and *from (length len) is a copy of what
--- 21,26 ----
***************
*** 190,212 **** _bt_restore_meta(RelFileNode rnode, XLogRecPtr lsn,
  	UnlockReleaseBuffer(metabuf);
  }
  
  static void
  btree_xlog_insert(bool isleaf, bool ismeta,
  				  XLogRecPtr lsn, XLogRecord *record)
  {
  	xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
  	Buffer		buffer;
  	Page		page;
  	char	   *datapos;
  	int			datalen;
  	xl_btree_metadata md;
! 	BlockNumber downlink = 0;
  
  	datapos = (char *) xlrec + SizeOfBtreeInsert;
  	datalen = record->xl_len - SizeOfBtreeInsert;
! 	if (!isleaf)
  	{
! 		memcpy(&downlink, datapos, sizeof(BlockNumber));
  		datapos += sizeof(BlockNumber);
  		datalen -= sizeof(BlockNumber);
  	}
--- 94,153 ----
  	UnlockReleaseBuffer(metabuf);
  }
  
+ /*
+  * _bt_clear_incomplete_split -- clear INCOMPLETE_SPLIT flag on a page
+  *
+  * This is a common subroutine of the redo functions of all the WAL record
+  * types that can insert a downlink: insert, split, and newroot.
+  */
+ static void
+ _bt_clear_incomplete_split(XLogRecPtr lsn, XLogRecord *record,
+ 						   RelFileNode rnode, BlockNumber cblock)
+ {
+ 	Buffer buf;
+ 
+ 	buf = XLogReadBuffer(rnode, cblock, false);
+ 	if (BufferIsValid(buf))
+ 	{
+ 		Page		page = (Page) BufferGetPage(buf);
+ 
+ 		if (lsn > PageGetLSN(page))
+ 		{
+ 			BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+ 			Assert((pageop->btpo_flags & BTP_INCOMPLETE_SPLIT) != 0);
+ 			pageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
+ 
+ 			PageSetLSN(page, lsn);
+ 			MarkBufferDirty(buf);
+ 		}
+ 		UnlockReleaseBuffer(buf);
+ 	}
+ }
+ 
  static void
  btree_xlog_insert(bool isleaf, bool ismeta,
  				  XLogRecPtr lsn, XLogRecord *record)
  {
  	xl_btree_insert *xlrec = (xl_btree_insert *) XLogRecGetData(record);
  	Buffer		buffer;
+ 	Buffer		cbuffer = InvalidBuffer;
  	Page		page;
  	char	   *datapos;
  	int			datalen;
  	xl_btree_metadata md;
! 	BlockNumber cblkno = 0;
! 	int			main_blk_index;
  
  	datapos = (char *) xlrec + SizeOfBtreeInsert;
  	datalen = record->xl_len - SizeOfBtreeInsert;
! 	/*
! 	 * If this insert finishes a split at a lower level, extract the block
! 	 * number of the (left) child.
! 	 */
! 	if (!isleaf && (record->xl_info & XLR_BKP_BLOCK(0)) == 0)
  	{
! 		memcpy(&cblkno, datapos, sizeof(BlockNumber));
! 		Assert(cblkno != 0);
  		datapos += sizeof(BlockNumber);
  		datalen -= sizeof(BlockNumber);
  	}
***************
*** 217,224 **** btree_xlog_insert(bool isleaf, bool ismeta,
  		datalen -= sizeof(xl_btree_metadata);
  	}
  
! 	if (record->xl_info & XLR_BKP_BLOCK(0))
! 		(void) RestoreBackupBlock(lsn, record, 0, false, false);
  	else
  	{
  		buffer = XLogReadBuffer(xlrec->target.node,
--- 158,176 ----
  		datalen -= sizeof(xl_btree_metadata);
  	}
  
! 	if (!isleaf)
! 	{
! 		if (record->xl_info & XLR_BKP_BLOCK(0))
! 			(void) RestoreBackupBlock(lsn, record, 0, false, false);
! 		else
! 			_bt_clear_incomplete_split(lsn, record, xlrec->target.node, cblkno);
! 		main_blk_index = 1;
! 	}
! 	else
! 		main_blk_index = 0;
! 
! 	if (record->xl_info & XLR_BKP_BLOCK(main_blk_index))
! 		(void) RestoreBackupBlock(lsn, record, main_blk_index, false, false);
  	else
  	{
  		buffer = XLogReadBuffer(xlrec->target.node,
***************
*** 228,238 **** btree_xlog_insert(bool isleaf, bool ismeta,
  		{
  			page = (Page) BufferGetPage(buffer);
  
! 			if (lsn <= PageGetLSN(page))
! 			{
! 				UnlockReleaseBuffer(buffer);
! 			}
! 			else
  			{
  				if (PageAddItem(page, (Item) datapos, datalen,
  							ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
--- 180,186 ----
  		{
  			page = (Page) BufferGetPage(buffer);
  
! 			if (lsn > PageGetLSN(page))
  			{
  				if (PageAddItem(page, (Item) datapos, datalen,
  							ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
***************
*** 241,251 **** btree_xlog_insert(bool isleaf, bool ismeta,
  
  				PageSetLSN(page, lsn);
  				MarkBufferDirty(buffer);
- 				UnlockReleaseBuffer(buffer);
  			}
  		}
  	}
  
  	/*
  	 * Note: in normal operation, we'd update the metapage while still holding
  	 * lock on the page we inserted into.  But during replay it's not
--- 189,202 ----
  
  				PageSetLSN(page, lsn);
  				MarkBufferDirty(buffer);
  			}
+ 			UnlockReleaseBuffer(buffer);
  		}
  	}
  
+ 	if (BufferIsValid(cbuffer))
+ 		UnlockReleaseBuffer(cbuffer);
+ 
  	/*
  	 * Note: in normal operation, we'd update the metapage while still holding
  	 * lock on the page we inserted into.  But during replay it's not
***************
*** 257,266 **** btree_xlog_insert(bool isleaf, bool ismeta,
  		_bt_restore_meta(xlrec->target.node, lsn,
  						 md.root, md.level,
  						 md.fastroot, md.fastlevel);
- 
- 	/* Forget any split this insertion completes */
- 	if (!isleaf)
- 		forget_matching_split(xlrec->target.node, downlink, false);
  }
  
  static void
--- 208,213 ----
***************
*** 268,273 **** btree_xlog_split(bool onleft, bool isroot,
--- 215,222 ----
  				 XLogRecPtr lsn, XLogRecord *record)
  {
  	xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
+ 	bool		isleaf = (xlrec->level == 0);
+ 	Buffer		lbuf;
  	Buffer		rbuf;
  	Page		rpage;
  	BTPageOpaque ropaque;
***************
*** 278,319 **** btree_xlog_split(bool onleft, bool isroot,
  	Size		newitemsz = 0;
  	Item		left_hikey = NULL;
  	Size		left_hikeysz = 0;
  
  	datapos = (char *) xlrec + SizeOfBtreeSplit;
  	datalen = record->xl_len - SizeOfBtreeSplit;
  
! 	/* Forget any split this insertion completes */
! 	if (xlrec->level > 0)
! 	{
! 		/* we assume SizeOfBtreeSplit is at least 16-bit aligned */
! 		BlockNumber downlink = BlockIdGetBlockNumber((BlockId) datapos);
! 
! 		datapos += sizeof(BlockIdData);
! 		datalen -= sizeof(BlockIdData);
! 
! 		forget_matching_split(xlrec->node, downlink, false);
! 
! 		/* Extract left hikey and its size (still assuming 16-bit alignment) */
! 		if (!(record->xl_info & XLR_BKP_BLOCK(0)))
! 		{
! 			/* We assume 16-bit alignment is enough for IndexTupleSize */
! 			left_hikey = (Item) datapos;
! 			left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
! 
! 			datapos += left_hikeysz;
! 			datalen -= left_hikeysz;
! 		}
! 	}
! 
! 	/* Extract newitem and newitemoff, if present */
  	if (onleft)
  	{
- 		/* Extract the offset (still assuming 16-bit alignment) */
  		memcpy(&newitemoff, datapos, sizeof(OffsetNumber));
  		datapos += sizeof(OffsetNumber);
  		datalen -= sizeof(OffsetNumber);
  	}
- 
  	if (onleft && !(record->xl_info & XLR_BKP_BLOCK(0)))
  	{
  		/*
--- 227,244 ----
  	Size		newitemsz = 0;
  	Item		left_hikey = NULL;
  	Size		left_hikeysz = 0;
+ 	BlockNumber cblkno = InvalidBlockNumber;
  
  	datapos = (char *) xlrec + SizeOfBtreeSplit;
  	datalen = record->xl_len - SizeOfBtreeSplit;
  
! 	/* Extract newitemoff and newitem, if present */
  	if (onleft)
  	{
  		memcpy(&newitemoff, datapos, sizeof(OffsetNumber));
  		datapos += sizeof(OffsetNumber);
  		datalen -= sizeof(OffsetNumber);
  	}
  	if (onleft && !(record->xl_info & XLR_BKP_BLOCK(0)))
  	{
  		/*
***************
*** 327,332 **** btree_xlog_split(bool onleft, bool isroot,
--- 252,288 ----
  		datalen -= newitemsz;
  	}
  
+ 	/* Extract left hikey and its size (we assume 16-bit alignment is enough) */
+ 	if (!isleaf && !(record->xl_info & XLR_BKP_BLOCK(0)))
+ 	{
+ 		left_hikey = (Item) datapos;
+ 		left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
+ 		datapos += left_hikeysz;
+ 		datalen -= left_hikeysz;
+ 	}
+ 
+ 	/*
+ 	 * If this split finishes an incomplete split at a lower level, extract
+ 	 * the block number of the (left) child.
+ 	 */
+ 	if (!isleaf && !(record->xl_info & XLR_BKP_BLOCK(2)))
+ 	{
+ 		memcpy(&cblkno, datapos, sizeof(BlockNumber));
+ 		datapos += sizeof(BlockNumber);
+ 		datalen -= sizeof(BlockNumber);
+ 	}
+ 
+ 	/*
+ 	 * Clear the incomplete-split flag on the left sibling of the child page
+ 	 * that this is a downlink for.
+ 	 */
+ 	if (!isleaf)
+ 	{
+ 		if (record->xl_info & XLR_BKP_BLOCK(2))
+ 			(void) RestoreBackupBlock(lsn, record, 2, false, false);
+ 		else
+ 			_bt_clear_incomplete_split(lsn, record, xlrec->node, cblkno);
+ 	}
+ 
  	/* Reconstruct right (new) sibling page from scratch */
  	rbuf = XLogReadBuffer(xlrec->node, xlrec->rightsib, true);
  	Assert(BufferIsValid(rbuf));
***************
*** 338,344 **** btree_xlog_split(bool onleft, bool isroot,
  	ropaque->btpo_prev = xlrec->leftsib;
  	ropaque->btpo_next = xlrec->rnext;
  	ropaque->btpo.level = xlrec->level;
! 	ropaque->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0;
  	ropaque->btpo_cycleid = 0;
  
  	_bt_restore_page(rpage, datapos, datalen);
--- 294,300 ----
  	ropaque->btpo_prev = xlrec->leftsib;
  	ropaque->btpo_next = xlrec->rnext;
  	ropaque->btpo.level = xlrec->level;
! 	ropaque->btpo_flags = isleaf ? BTP_LEAF : 0;
  	ropaque->btpo_cycleid = 0;
  
  	_bt_restore_page(rpage, datapos, datalen);
***************
*** 347,353 **** btree_xlog_split(bool onleft, bool isroot,
  	 * On leaf level, the high key of the left page is equal to the first key
  	 * on the right page.
  	 */
! 	if (xlrec->level == 0)
  	{
  		ItemId		hiItemId = PageGetItemId(rpage, P_FIRSTDATAKEY(ropaque));
  
--- 303,309 ----
  	 * On leaf level, the high key of the left page is equal to the first key
  	 * on the right page.
  	 */
! 	if (isleaf)
  	{
  		ItemId		hiItemId = PageGetItemId(rpage, P_FIRSTDATAKEY(ropaque));
  
***************
*** 362,371 **** btree_xlog_split(bool onleft, bool isroot,
  
  	/* Now reconstruct left (original) sibling page */
  	if (record->xl_info & XLR_BKP_BLOCK(0))
! 		(void) RestoreBackupBlock(lsn, record, 0, false, false);
  	else
  	{
! 		Buffer		lbuf = XLogReadBuffer(xlrec->node, xlrec->leftsib, false);
  
  		if (BufferIsValid(lbuf))
  		{
--- 318,327 ----
  
  	/* Now reconstruct left (original) sibling page */
  	if (record->xl_info & XLR_BKP_BLOCK(0))
! 		lbuf = RestoreBackupBlock(lsn, record, 0, false, true);
  	else
  	{
! 		lbuf = XLogReadBuffer(xlrec->node, xlrec->leftsib, false);
  
  		if (BufferIsValid(lbuf))
  		{
***************
*** 422,440 **** btree_xlog_split(bool onleft, bool isroot,
  					elog(PANIC, "failed to add high key to left page after split");
  
  				/* Fix opaque fields */
! 				lopaque->btpo_flags = (xlrec->level == 0) ? BTP_LEAF : 0;
  				lopaque->btpo_next = xlrec->rightsib;
  				lopaque->btpo_cycleid = 0;
  
  				PageSetLSN(lpage, lsn);
  				MarkBufferDirty(lbuf);
  			}
- 
- 			UnlockReleaseBuffer(lbuf);
  		}
  	}
  
! 	/* We no longer need the right buffer */
  	UnlockReleaseBuffer(rbuf);
  
  	/*
--- 378,398 ----
  					elog(PANIC, "failed to add high key to left page after split");
  
  				/* Fix opaque fields */
! 				lopaque->btpo_flags = BTP_INCOMPLETE_SPLIT;
! 				if (isleaf)
! 					lopaque->btpo_flags |= BTP_LEAF;
  				lopaque->btpo_next = xlrec->rightsib;
  				lopaque->btpo_cycleid = 0;
  
  				PageSetLSN(lpage, lsn);
  				MarkBufferDirty(lbuf);
  			}
  		}
  	}
  
! 	/* We no longer need the buffers */
! 	if (BufferIsValid(lbuf))
! 		UnlockReleaseBuffer(lbuf);
  	UnlockReleaseBuffer(rbuf);
  
  	/*
***************
*** 445,476 **** btree_xlog_split(bool onleft, bool isroot,
  	 * replay, because no other index update can be in progress, and readers
  	 * will cope properly when following an obsolete left-link.
  	 */
! 	if (record->xl_info & XLR_BKP_BLOCK(1))
! 		(void) RestoreBackupBlock(lsn, record, 1, false, false);
! 	else if (xlrec->rnext != P_NONE)
  	{
! 		Buffer		buffer = XLogReadBuffer(xlrec->node, xlrec->rnext, false);
  
! 		if (BufferIsValid(buffer))
  		{
! 			Page		page = (Page) BufferGetPage(buffer);
  
! 			if (lsn > PageGetLSN(page))
  			{
! 				BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
  
! 				pageop->btpo_prev = xlrec->rightsib;
  
! 				PageSetLSN(page, lsn);
! 				MarkBufferDirty(buffer);
  			}
- 			UnlockReleaseBuffer(buffer);
  		}
  	}
- 
- 	/* The job ain't done till the parent link is inserted... */
- 	log_incomplete_split(xlrec->node,
- 						 xlrec->leftsib, xlrec->rightsib, isroot);
  }
  
  static void
--- 403,441 ----
  	 * replay, because no other index update can be in progress, and readers
  	 * will cope properly when following an obsolete left-link.
  	 */
! 	if (xlrec->rnext != P_NONE)
  	{
! 		/*
! 		 * The backup block containing the right sibling is 2 or 3,
! 		 * depending on whether this was a leaf or internal page.
! 		 */
! 		int		rnext_index = isleaf ? 2 : 3;
  
! 		if (record->xl_info & XLR_BKP_BLOCK(rnext_index))
! 			(void) RestoreBackupBlock(lsn, record, rnext_index, false, false);
! 		else
  		{
! 			Buffer		buffer;
  
! 			buffer = XLogReadBuffer(xlrec->node, xlrec->rnext, false);
! 
! 			if (BufferIsValid(buffer))
  			{
! 				Page		page = (Page) BufferGetPage(buffer);
  
! 				if (lsn > PageGetLSN(page))
! 				{
! 					BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
  
! 					pageop->btpo_prev = xlrec->rightsib;
! 
! 					PageSetLSN(page, lsn);
! 					MarkBufferDirty(buffer);
! 				}
! 				UnlockReleaseBuffer(buffer);
  			}
  		}
  	}
  }
  
  static void
***************
*** 850,856 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
  		}
  	}
  
! 	/* Fix left-link of right sibling */
  	if (record->xl_info & XLR_BKP_BLOCK(1))
  		(void) RestoreBackupBlock(lsn, record, 1, false, false);
  	else
--- 815,821 ----
  		}
  	}
  
! 	/* Fix left-link and flags of right sibling */
  	if (record->xl_info & XLR_BKP_BLOCK(1))
  		(void) RestoreBackupBlock(lsn, record, 1, false, false);
  	else
***************
*** 867,872 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
--- 832,841 ----
  			{
  				pageop = (BTPageOpaque) PageGetSpecialPointer(page);
  				pageop->btpo_prev = leftsib;
+ 				if (xlrec->flags & DP_LEFT_IS_HALF_DEAD)
+ 					pageop->btpo_flags |= BTP_LEFT_HALF_DEAD;
+ 				else
+ 					pageop->btpo_flags &= ~BTP_LEFT_HALF_DEAD;
  
  				PageSetLSN(page, lsn);
  				MarkBufferDirty(buffer);
***************
*** 876,886 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
  	}
  
  	/* Fix right-link of left sibling, if any */
! 	if (record->xl_info & XLR_BKP_BLOCK(2))
! 		(void) RestoreBackupBlock(lsn, record, 2, false, false);
! 	else
  	{
! 		if (leftsib != P_NONE)
  		{
  			buffer = XLogReadBuffer(xlrec->target.node, leftsib, false);
  			if (BufferIsValid(buffer))
--- 845,855 ----
  	}
  
  	/* Fix right-link of left sibling, if any */
! 	if (leftsib != P_NONE)
  	{
! 		if (record->xl_info & XLR_BKP_BLOCK(2))
! 			(void) RestoreBackupBlock(lsn, record, 2, false, false);
! 		else
  		{
  			buffer = XLogReadBuffer(xlrec->target.node, leftsib, false);
  			if (BufferIsValid(buffer))
***************
*** 903,908 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
--- 872,910 ----
  		}
  	}
  
+ 	/*
+ 	 * Set the BTP_LEFT_HALF_DEAD flag on the right sibling of the parent
+ 	 * page that is now half-dead.
+ 	 */
+ 	if (info == XLOG_BTREE_DELETE_PAGE_HALF)
+ 	{
+ 		int		blkidx = (leftsib == P_NONE) ? 2 : 3;
+ 		if (record->xl_info & XLR_BKP_BLOCK(blkidx))
+ 			(void) RestoreBackupBlock(lsn, record, blkidx, false, false);
+ 		else
+ 		{
+ 			buffer = XLogReadBuffer(xlrec->target.node, xlrec->parentright, false);
+ 			if (BufferIsValid(buffer))
+ 			{
+ 				page = (Page) BufferGetPage(buffer);
+ 				if (lsn <= PageGetLSN(page))
+ 				{
+ 					UnlockReleaseBuffer(buffer);
+ 				}
+ 				else
+ 				{
+ 					pageop = (BTPageOpaque) PageGetSpecialPointer(page);
+ 					pageop->btpo_flags |= BTP_LEFT_HALF_DEAD;
+ 
+ 					PageSetLSN(page, lsn);
+ 					MarkBufferDirty(buffer);
+ 					UnlockReleaseBuffer(buffer);
+ 				}
+ 			}
+ 		}
+ 
+ 	}
+ 
  	/* Rewrite target page as empty deleted page */
  	buffer = XLogReadBuffer(xlrec->target.node, target, true);
  	Assert(BufferIsValid(buffer));
***************
*** 932,944 **** btree_xlog_delete_page(uint8 info, XLogRecPtr lsn, XLogRecord *record)
  						 md.root, md.level,
  						 md.fastroot, md.fastlevel);
  	}
- 
- 	/* Forget any completed deletion */
- 	forget_matching_deletion(xlrec->target.node, target);
- 
- 	/* If parent became half-dead, remember it for deletion */
- 	if (info == XLOG_BTREE_DELETE_PAGE_HALF)
- 		log_incomplete_deletion(xlrec->target.node, parent);
  }
  
  static void
--- 934,939 ----
***************
*** 948,957 **** btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
  	Buffer		buffer;
  	Page		page;
  	BTPageOpaque pageop;
! 	BlockNumber downlink = 0;
! 
! 	/* Backup blocks are not used in newroot records */
! 	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
  
  	buffer = XLogReadBuffer(xlrec->node, xlrec->rootblk, true);
  	Assert(BufferIsValid(buffer));
--- 943,949 ----
  	Buffer		buffer;
  	Page		page;
  	BTPageOpaque pageop;
! 	BlockNumber cblkno;
  
  	buffer = XLogReadBuffer(xlrec->node, xlrec->rootblk, true);
  	Assert(BufferIsValid(buffer));
***************
*** 974,983 **** btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
  		_bt_restore_page(page,
  						 (char *) xlrec + SizeOfBtreeNewroot,
  						 record->xl_len - SizeOfBtreeNewroot);
! 		/* extract downlink to the right-hand split page */
! 		itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY));
! 		downlink = ItemPointerGetBlockNumber(&(itup->t_tid));
  		Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
  	}
  
  	PageSetLSN(page, lsn);
--- 966,981 ----
  		_bt_restore_page(page,
  						 (char *) xlrec + SizeOfBtreeNewroot,
  						 record->xl_len - SizeOfBtreeNewroot);
! 		/* extract block number of the left-hand split page */
! 		itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, P_HIKEY));
! 		cblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
  		Assert(ItemPointerGetOffsetNumber(&(itup->t_tid)) == P_HIKEY);
+ 
+ 		/* Clear the incomplete-split flag in left child */
+ 		if (record->xl_info & XLR_BKP_BLOCK(0))
+ 			(void) RestoreBackupBlock(lsn, record, 0, false, false);
+ 		else
+ 			_bt_clear_incomplete_split(lsn, record, xlrec->node, cblkno);
  	}
  
  	PageSetLSN(page, lsn);
***************
*** 987,996 **** btree_xlog_newroot(XLogRecPtr lsn, XLogRecord *record)
  	_bt_restore_meta(xlrec->node, lsn,
  					 xlrec->rootblk, xlrec->level,
  					 xlrec->rootblk, xlrec->level);
- 
- 	/* Check to see if this satisfies any incomplete insertions */
- 	if (record->xl_len > SizeOfBtreeNewroot)
- 		forget_matching_split(xlrec->node, downlink, true);
  }
  
  static void
--- 985,990 ----
***************
*** 1068,1146 **** btree_redo(XLogRecPtr lsn, XLogRecord *record)
  			elog(PANIC, "btree_redo: unknown op code %u", info);
  	}
  }
- 
- void
- btree_xlog_startup(void)
- {
- 	incomplete_actions = NIL;
- }
- 
- void
- btree_xlog_cleanup(void)
- {
- 	ListCell   *l;
- 
- 	foreach(l, incomplete_actions)
- 	{
- 		bt_incomplete_action *action = (bt_incomplete_action *) lfirst(l);
- 
- 		if (action->is_split)
- 		{
- 			/* finish an incomplete split */
- 			Buffer		lbuf,
- 						rbuf;
- 			Page		lpage,
- 						rpage;
- 			BTPageOpaque lpageop,
- 						rpageop;
- 			bool		is_only;
- 			Relation	reln;
- 
- 			lbuf = XLogReadBuffer(action->node, action->leftblk, false);
- 			/* failure is impossible because we wrote this page earlier */
- 			if (!BufferIsValid(lbuf))
- 				elog(PANIC, "btree_xlog_cleanup: left block unfound");
- 			lpage = (Page) BufferGetPage(lbuf);
- 			lpageop = (BTPageOpaque) PageGetSpecialPointer(lpage);
- 			rbuf = XLogReadBuffer(action->node, action->rightblk, false);
- 			/* failure is impossible because we wrote this page earlier */
- 			if (!BufferIsValid(rbuf))
- 				elog(PANIC, "btree_xlog_cleanup: right block unfound");
- 			rpage = (Page) BufferGetPage(rbuf);
- 			rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
- 
- 			/* if the pages are all of their level, it's a only-page split */
- 			is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop);
- 
- 			reln = CreateFakeRelcacheEntry(action->node);
- 			_bt_insert_parent(reln, lbuf, rbuf, NULL,
- 							  action->is_root, is_only);
- 			FreeFakeRelcacheEntry(reln);
- 		}
- 		else
- 		{
- 			/* finish an incomplete deletion (of a half-dead page) */
- 			Buffer		buf;
- 
- 			buf = XLogReadBuffer(action->node, action->delblk, false);
- 			if (BufferIsValid(buf))
- 			{
- 				Relation	reln;
- 
- 				reln = CreateFakeRelcacheEntry(action->node);
- 				if (_bt_pagedel(reln, buf, NULL) == 0)
- 					elog(PANIC, "btree_xlog_cleanup: _bt_pagedel failed");
- 				FreeFakeRelcacheEntry(reln);
- 			}
- 		}
- 	}
- 	incomplete_actions = NIL;
- }
- 
- bool
- btree_safe_restartpoint(void)
- {
- 	if (incomplete_actions)
- 		return false;
- 	return true;
- }
--- 1062,1064 ----
*** a/src/include/access/nbtree.h
--- b/src/include/access/nbtree.h
***************
*** 73,78 **** typedef BTPageOpaqueData *BTPageOpaque;
--- 73,80 ----
  #define BTP_HALF_DEAD	(1 << 4)	/* empty, but still in tree */
  #define BTP_SPLIT_END	(1 << 5)	/* rightmost page of split group */
  #define BTP_HAS_GARBAGE (1 << 6)	/* page has LP_DEAD tuples */
+ #define BTP_INCOMPLETE_SPLIT (1 << 7) /* right sibling's downlink is missing */
+ #define BTP_LEFT_HALF_DEAD (1 << 8)	/* left sibling is half-dead */
  
  /*
   * The max allowed value of a cycle ID is a bit less than 64K.	This is
***************
*** 178,183 **** typedef struct BTMetaPageData
--- 180,189 ----
  #define P_ISHALFDEAD(opaque)	((opaque)->btpo_flags & BTP_HALF_DEAD)
  #define P_IGNORE(opaque)		((opaque)->btpo_flags & (BTP_DELETED|BTP_HALF_DEAD))
  #define P_HAS_GARBAGE(opaque)	((opaque)->btpo_flags & BTP_HAS_GARBAGE)
+ #define P_INCOMPLETE_SPLIT(opaque)	((opaque)->btpo_flags & BTP_INCOMPLETE_SPLIT)
+ #define P_LEFT_HALF_DEAD(opaque) ((opaque)->btpo_flags & BTP_LEFT_HALF_DEAD)
+ #define P_NEEDS_FIXUP(opaque)	\
+ 	((opaque)->btpo_flags & (BTP_INCOMPLETE_SPLIT | BTP_LEFT_HALF_DEAD))
  
  /*
   *	Lehman and Yao's algorithm requires a ``high key'' on every non-rightmost
***************
*** 254,260 **** typedef struct xl_btree_metadata
  typedef struct xl_btree_insert
  {
  	xl_btreetid target;			/* inserted tuple id */
! 	/* BlockNumber downlink field FOLLOWS IF NOT XLOG_BTREE_INSERT_LEAF */
  	/* xl_btree_metadata FOLLOWS IF XLOG_BTREE_INSERT_META */
  	/* INDEX TUPLE FOLLOWS AT END OF STRUCT */
  } xl_btree_insert;
--- 260,266 ----
  typedef struct xl_btree_insert
  {
  	xl_btreetid target;			/* inserted tuple id */
! 	/* BlockNumber finishes_split field FOLLOWS IF NOT XLOG_BTREE_INSERT_LEAF */
  	/* xl_btree_metadata FOLLOWS IF XLOG_BTREE_INSERT_META */
  	/* INDEX TUPLE FOLLOWS AT END OF STRUCT */
  } xl_btree_insert;
***************
*** 287,305 **** typedef struct xl_btree_split
  	OffsetNumber firstright;	/* first item moved to right page */
  
  	/*
! 	 * If level > 0, BlockIdData downlink follows.	(We use BlockIdData rather
! 	 * than BlockNumber for alignment reasons: SizeOfBtreeSplit is only 16-bit
! 	 * aligned.)
  	 *
  	 * If level > 0, an IndexTuple representing the HIKEY of the left page
  	 * follows.  We don't need this on leaf pages, because it's the same as
  	 * the leftmost key in the new right page.	Also, it's suppressed if
  	 * XLogInsert chooses to store the left page's whole page image.
  	 *
! 	 * In the _L variants, next are OffsetNumber newitemoff and the new item.
! 	 * (In the _R variants, the new item is one of the right page's tuples.)
! 	 * The new item, but not newitemoff, is suppressed if XLogInsert chooses
! 	 * to store the left page's whole page image.
  	 *
  	 * Last are the right page's tuples in the form used by _bt_restore_page.
  	 */
--- 293,310 ----
  	OffsetNumber firstright;	/* first item moved to right page */
  
  	/*
! 	 * In the _L variants, next are OffsetNumber newitemoff and the new item.
! 	 * (In the _R variants, the new item is one of the right page's tuples.)
! 	 * The new item, but not newitemoff, is suppressed if XLogInsert chooses
! 	 * to store the left page's whole page image.
  	 *
  	 * If level > 0, an IndexTuple representing the HIKEY of the left page
  	 * follows.  We don't need this on leaf pages, because it's the same as
  	 * the leftmost key in the new right page.	Also, it's suppressed if
  	 * XLogInsert chooses to store the left page's whole page image.
  	 *
! 	 * If level > 0, the BlockNumber of the page whose incomplete-split
! 	 * flag this insertion clears follows.  (It is not aligned.)
  	 *
  	 * Last are the right page's tuples in the form used by _bt_restore_page.
  	 */
***************
*** 379,393 **** typedef struct xl_btree_vacuum
--- 384,404 ----
  typedef struct xl_btree_delete_page
  {
  	xl_btreetid target;			/* deleted tuple id in parent page */
+ 	uint16		flags;			/* see below */
  	BlockNumber deadblk;		/* child block being deleted */
  	BlockNumber leftblk;		/* child block's left sibling, if any */
  	BlockNumber rightblk;		/* child block's right sibling */
+ 	BlockNumber parentright;	/* if parent is half-dead, its right sibling */
  	TransactionId btpo_xact;	/* value of btpo.xact for use in recovery */
  	/* xl_btree_metadata FOLLOWS IF XLOG_BTREE_DELETE_PAGE_META */
  } xl_btree_delete_page;
  
  #define SizeOfBtreeDeletePage	(offsetof(xl_btree_delete_page, btpo_xact) + sizeof(TransactionId))
  
+ /* Flags for xl_btree_delete_page */
+ #define DP_LEFT_IS_HALF_DEAD	0x01 /* deleted page's left sibling is
+ 									  * half-dead */
+ 
  /*
   * New root log record.  There are zero tuples if this is to establish an
   * empty root, or two if it is the result of splitting an old root.
***************
*** 617,624 **** extern Datum btoptions(PG_FUNCTION_ARGS);
  extern bool _bt_doinsert(Relation rel, IndexTuple itup,
  			 IndexUniqueCheck checkUnique, Relation heapRel);
  extern Buffer _bt_getstackbuf(Relation rel, BTStack stack, int access);
! extern void _bt_insert_parent(Relation rel, Buffer buf, Buffer rbuf,
! 				  BTStack stack, bool is_root, bool is_only);
  
  /*
   * prototypes for functions in nbtpage.c
--- 628,634 ----
  extern bool _bt_doinsert(Relation rel, IndexTuple itup,
  			 IndexUniqueCheck checkUnique, Relation heapRel);
  extern Buffer _bt_getstackbuf(Relation rel, BTStack stack, int access);
! extern void _bt_fixup(Relation rel, Buffer bbuf, BTStack stack);
  
  /*
   * prototypes for functions in nbtpage.c
***************
*** 648,654 **** extern BTStack _bt_search(Relation rel,
  		   int keysz, ScanKey scankey, bool nextkey,
  		   Buffer *bufP, int access);
  extern Buffer _bt_moveright(Relation rel, Buffer buf, int keysz,
! 			  ScanKey scankey, bool nextkey, int access);
  extern OffsetNumber _bt_binsrch(Relation rel, Buffer buf, int keysz,
  			ScanKey scankey, bool nextkey);
  extern int32 _bt_compare(Relation rel, int keysz, ScanKey scankey,
--- 658,665 ----
  		   int keysz, ScanKey scankey, bool nextkey,
  		   Buffer *bufP, int access);
  extern Buffer _bt_moveright(Relation rel, Buffer buf, int keysz,
! 			  ScanKey scankey, bool nextkey,
! 			  bool finishsplits, BTStack stack, int access);
  extern OffsetNumber _bt_binsrch(Relation rel, Buffer buf, int keysz,
  			ScanKey scankey, bool nextkey);
  extern int32 _bt_compare(Relation rel, int keysz, ScanKey scankey,
***************
*** 656,661 **** extern int32 _bt_compare(Relation rel, int keysz, ScanKey scankey,
--- 667,673 ----
  extern bool _bt_first(IndexScanDesc scan, ScanDirection dir);
  extern bool _bt_next(IndexScanDesc scan, ScanDirection dir);
  extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost);
+ extern Buffer _bt_walk_left(Relation rel, Buffer buf);
  
  /*
   * prototypes for functions in nbtutils.c
***************
*** 697,704 **** extern void _bt_leafbuild(BTSpool *btspool, BTSpool *spool2);
   */
  extern void btree_redo(XLogRecPtr lsn, XLogRecord *record);
  extern void btree_desc(StringInfo buf, uint8 xl_info, char *rec);
- extern void btree_xlog_startup(void);
- extern void btree_xlog_cleanup(void);
- extern bool btree_safe_restartpoint(void);
  
  #endif   /* NBTREE_H */
--- 709,713 ----
*** a/src/include/access/rmgrlist.h
--- b/src/include/access/rmgrlist.h
***************
*** 36,42 **** PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, NULL, NULL, NULL)
  PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, NULL, NULL, NULL)
  PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, NULL, NULL, NULL)
  PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, NULL, NULL, NULL)
! PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint)
  PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, NULL, NULL, NULL)
  PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint)
  PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, NULL)
--- 36,42 ----
  PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, NULL, NULL, NULL)
  PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, NULL, NULL, NULL)
  PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, NULL, NULL, NULL)
! PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, NULL, NULL, NULL)
  PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, NULL, NULL, NULL)
  PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint)
  PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, NULL)
