*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 42,47 ****
--- 42,48 ----
  #include "postmaster/startup.h"
  #include "replication/walreceiver.h"
  #include "replication/walsender.h"
+ #include "storage/barrier.h"
  #include "storage/bufmgr.h"
  #include "storage/fd.h"
  #include "storage/ipc.h"
***************
*** 290,315 **** static XLogRecPtr RedoStartLSN = {0, 0};
   * slightly different functions.
   *
   * We do a lot of pushups to minimize the amount of access to lockable
!  * shared memory values.  There are actually three shared-memory copies of
!  * LogwrtResult, plus one unshared copy in each backend.  Here's how it works:
!  *		XLogCtl->LogwrtResult is protected by info_lck
!  *		XLogCtl->Write.LogwrtResult is protected by WALWriteLock
!  *		XLogCtl->Insert.LogwrtResult is protected by WALInsertLock
!  * One must hold the associated lock to read or write any of these, but
!  * of course no lock is needed to read/write the unshared LogwrtResult.
!  *
!  * XLogCtl->LogwrtResult and XLogCtl->Write.LogwrtResult are both "always
!  * right", since both are updated by a write or flush operation before
!  * it releases WALWriteLock.  The point of keeping XLogCtl->Write.LogwrtResult
!  * is that it can be examined/modified by code that already holds WALWriteLock
!  * without needing to grab info_lck as well.
!  *
!  * XLogCtl->Insert.LogwrtResult may lag behind the reality of the other two,
!  * but is updated when convenient.	Again, it exists for the convenience of
!  * code that is already holding WALInsertLock but not the other locks.
!  *
!  * The unshared LogwrtResult may lag behind any or all of these, and again
!  * is updated when convenient.
   *
   * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
   * (protected by info_lck), but we don't need to cache any copies of it.
--- 291,301 ----
   * slightly different functions.
   *
   * We do a lot of pushups to minimize the amount of access to lockable
!  * shared memory values.  There is one shared-memory copy of LogwrtResult,
!  * plus one unshared copy in each backend. To read the shared copy, you need
!  * to hold info_lck *or* WALWriteLock. To update it, you need to hold both
!  * locks. The unshared LogwrtResult may lag behind the shared copy, and is
!  * updated when convenient.
   *
   * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
   * (protected by info_lck), but we don't need to cache any copies of it.
***************
*** 319,328 **** static XLogRecPtr RedoStartLSN = {0, 0};
   * values is "more up to date".
   *
   * info_lck is only held long enough to read/update the protected variables,
!  * so it's a plain spinlock.  The other locks are held longer (potentially
!  * over I/O operations), so we use LWLocks for them.  These locks are:
   *
!  * WALInsertLock: must be held to insert a record into the WAL buffers.
   *
   * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
   * XLogFlush).
--- 305,319 ----
   * values is "more up to date".
   *
   * info_lck is only held long enough to read/update the protected variables,
!  * so it's a plain spinlock.  insertpos_lck protects the current logical
!  * insert location, ie. the head of reserved WAL space.  The other locks are
!  * held longer (potentially over I/O operations), so we use LWLocks for them.
!  * These locks are:
   *
!  * WALBufMappingLock: must be held to replace a page in the WAL buffer cache.
!  * This is only held while initializing and changing the mapping. If the
!  * contents of the buffer being replaced haven't been written yet, the mapping
!  * lock is released while the write is done, and reacquired afterwards.
   *
   * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
   * XLogFlush).
***************
*** 334,339 **** static XLogRecPtr RedoStartLSN = {0, 0};
--- 325,399 ----
   * only one checkpointer at a time; currently, with all checkpoints done by
   * the checkpointer, this is just pro forma).
   *
+  *
+  * Inserting a new WAL record is a two-step process:
+  *
+  * 1. Reserve the right amount of space from the WAL, and the next insertion
+  *    slot to advertise that the insertion is in progress. The current head
+  *    of reserved space is kept in Insert->CurrPos, and is protected by
+  *    insertpos_lck. Try to keep this section as short as possible,
+  *    insertpos_lck can be heavily contended on a busy system
+  *
+  * 2. Copy the record to the reserved WAL space. This involves finding the
+  *    correct WAL buffer containing the reserved space, and copying the
+  *    record in place. This can be done concurrently in multiple processes.
+  *
+  * To allow as much parallelism as possible for step 2, we try hard to avoid
+  * lock contention in that code path. Each insertion is asssigned its own
+  * "XLog insertion slot", which is used to advertise the position the backend
+  * is writing to. The slot is marked as in-use in step 1, while holding
+  * insertpos_lck, by setting the position field in the slot. When the backend
+  * is finished with the insertion, it clears its slot. Each slot is protected
+  * by a separate spinlock, to keep contention minimal.
+  *
+  * The insertion slots also provide a mechanism to wait for an insertion to
+  * finish. This is important when an XLOG page is written out - any
+  * in-progress insertions must finish copying data to the page first, or the
+  * on-disk copy will be incomplete. Waiting is done by the
+  * WaitXLogInsertionsToFinish() function. It adds the current process to the
+  * waiting queue in the slot it needs to wait for, and when that insertion
+  * finishes (or proceeds to the next page, at least), the inserter wakes up
+  * the process.
+  *
+  * The insertion slots form a ring. Insert->nextslot points to the next free
+  * slot, and Insert->lastslot points to the last slot that's still in use.
+  * lastslot can lag behind reality by any number of slots, as long as nextslot
+  * doesn't catch up with it. lastslot is advanced by
+  * WaitXLogInsertionsToFinish(), and is protected by WALInsertTailLock.
+  * nextslot is advanced in ReserveXLogInsertLocation() and is protected by
+  * insertpos_lck. Both slot variables are 32-bit integers, so that they can
+  * be read atomically without holding a lock.
+  *
+  * Whenever the ring fills up, ie. when nextslot wraps around and catches up
+  * with lastslot, ReserveXLogInsertLocation() has to wait for the oldest
+  * insertion to finish and advance lastslot, to make room for the new
+  * insertion. This is done by WaitForXLogInsertionSlotToBecomeFree() function,
+  * which is similar to WaitXLogInsertionsToFinish(), but instead of waiting
+  * for all insertions up to a given point to finish, it just waits for the
+  * inserter in the oldest slot to finish.
+  *
+  *
+  * Deadlock analysis
+  * -----------------
+  *
+  * It's important to call WaitXLogInsertionsToFinish() *before* acquiring
+  * WALWriteLock. Otherwise you might get stuck waiting for an insertion to
+  * finish (or at least advance to next uninitialized page), while you're
+  * holding WALWriteLock. That would be bad, because the backend you're waiting
+  * for might need to acquire WALWriteLock, too, to evict an old buffer, so
+  * you'd get deadlock.
+  *
+  * WaitXLogInsertionsToFinish() will not get stuck indefinitely, as long as
+  * it's called with a location that's known to be already allocated in the WAL
+  * buffers. Calling it with the position of a record you've already inserted
+  * satisfies that condition, so the common pattern:
+  *
+  *   recptr = XLogInsert(...)
+  *   XLogFlush(recptr)
+  *
+  * is safe. It can't get stuck, because an insertion to a WAL page that's
+  * already initialized in cache can always proceed without waiting on a lock.
+  *
   *----------
   */
  
***************
*** 354,364 **** typedef struct XLogwrtResult
   */
  typedef struct XLogCtlInsert
  {
! 	XLogwrtResult LogwrtResult; /* a recent value of LogwrtResult */
! 	XLogRecPtr	PrevRecord;		/* start of previously-inserted record */
! 	int			curridx;		/* current block index in cache */
! 	XLogPageHeader currpage;	/* points to header of block in cache */
! 	char	   *currpos;		/* current insertion point in cache */
  	XLogRecPtr	RedoRecPtr;		/* current redo point for insertions */
  	bool		forcePageWrites;	/* forcing full-page writes for PITR? */
  
--- 414,443 ----
   */
  typedef struct XLogCtlInsert
  {
! 	slock_t		insertpos_lck;	/* protects all the fields in this struct
! 								 * (except lastslot). */
! 
! 	int32		nextslot;		/* next insertion slot to use */
! 	int32		lastslot;		/* last in-use insertion slot (protected by
! 								 * WALInsertTailLock) */
! 
! 	/*
! 	 * CurrPos is the very tip of the reserved WAL space at the moment. The
! 	 * next record will be inserted there (or somewhere after it if there's
! 	 * not enough space on the current page). PrevRecord points to the
! 	 * beginning of the last record already reserved. It might not be fully
! 	 * copied into place yet, but we know its exact location already.
! 	 */
! 	XLogRecPtr	CurrPos;
! 	XLogRecPtr	PrevRecord;
! 
! 	/*
! 	 * padding to push RedoRecPtr and forcePageWrites, which rarely change,
! 	 * to a different cache line than the rapidly-changing CurrPos and
! 	 * PrevRecord values. XXX: verify if this makes any difference
! 	 */
! 	char		padding[128];
! 
  	XLogRecPtr	RedoRecPtr;		/* current redo point for insertions */
  	bool		forcePageWrites;	/* forcing full-page writes for PITR? */
  
***************
*** 388,406 **** typedef struct XLogCtlInsert
   */
  typedef struct XLogCtlWrite
  {
- 	XLogwrtResult LogwrtResult; /* current value of LogwrtResult */
  	int			curridx;		/* cache index of next block to write */
  	pg_time_t	lastSegSwitchTime;		/* time of last xlog segment switch */
  } XLogCtlWrite;
  
  /*
   * Total shared-memory state for XLOG.
   */
  typedef struct XLogCtlData
  {
! 	/* Protected by WALInsertLock: */
  	XLogCtlInsert Insert;
  
  	/* Protected by info_lck: */
  	XLogwrtRqst LogwrtRqst;
  	XLogwrtResult LogwrtResult;
--- 467,500 ----
   */
  typedef struct XLogCtlWrite
  {
  	int			curridx;		/* cache index of next block to write */
  	pg_time_t	lastSegSwitchTime;		/* time of last xlog segment switch */
  } XLogCtlWrite;
  
+ 
+ /*
+  * Slots for in-progress WAL insertions.
+  */
+ typedef struct
+ {
+ 	slock_t		lck;
+ 	XLogRecPtr	CurrPos;	/* current position this process is inserting to */
+ 	PGPROC	   *head;		/* head of list of waiting PGPROCs */
+ 	PGPROC	   *tail;		/* tail of list of waiting PGPROCs */
+ } XLogInsertSlot;
+ 
+ #define NumXLogInsertSlots	1000
+ 
  /*
   * Total shared-memory state for XLOG.
   */
  typedef struct XLogCtlData
  {
! 	/* Protected by insertpos_lck: */
  	XLogCtlInsert Insert;
  
+ 	XLogInsertSlot *XLogInsertSlots;
+ 
  	/* Protected by info_lck: */
  	XLogwrtRqst LogwrtRqst;
  	XLogwrtResult LogwrtResult;
***************
*** 414,422 **** typedef struct XLogCtlData
  	XLogCtlWrite Write;
  
  	/*
  	 * These values do not change after startup, although the pointed-to pages
  	 * and xlblocks values certainly do.  Permission to read/write the pages
! 	 * and xlblocks values depends on WALInsertLock and WALWriteLock.
  	 */
  	char	   *pages;			/* buffers for unwritten XLOG pages */
  	XLogRecPtr *xlblocks;		/* 1st byte ptr-s + XLOG_BLCKSZ */
--- 508,526 ----
  	XLogCtlWrite Write;
  
  	/*
+ 	 * To change curridx and the identity of a buffer, you need to hold
+ 	 * WALBufMappingLock. To change the identity of a buffer that's
+ 	 * still dirty, the old page needs to be written out first, and for that
+ 	 * you need WALWriteLock, and you need to ensure that there's no
+ 	 * in-progress insertions to the page by calling
+ 	 * WaitXLogInsertionsToFinish().
+ 	 */
+ 	int			curridx;		/* current (latest) block index in cache */
+ 
+ 	/*
  	 * These values do not change after startup, although the pointed-to pages
  	 * and xlblocks values certainly do.  Permission to read/write the pages
! 	 * and xlblocks values depends on WALBufMappingLock and WALWriteLock.
  	 */
  	char	   *pages;			/* buffers for unwritten XLOG pages */
  	XLogRecPtr *xlblocks;		/* 1st byte ptr-s + XLOG_BLCKSZ */
***************
*** 494,521 **** static XLogCtlData *XLogCtl = NULL;
  static ControlFileData *ControlFile = NULL;
  
  /*
!  * Macros for managing XLogInsert state.  In most cases, the calling routine
!  * has local copies of XLogCtl->Insert and/or XLogCtl->Insert->curridx,
!  * so these are passed as parameters instead of being fetched via XLogCtl.
   */
  
! /* Free space remaining in the current xlog page buffer */
! #define INSERT_FREESPACE(Insert)  \
! 	(XLOG_BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))
  
- /* Construct XLogRecPtr value for current insertion point */
- #define INSERT_RECPTR(recptr,Insert,curridx)  \
- 	( \
- 	  (recptr).xlogid = XLogCtl->xlblocks[curridx].xlogid, \
- 	  (recptr).xrecoff = \
- 		XLogCtl->xlblocks[curridx].xrecoff - INSERT_FREESPACE(Insert) \
- 	)
  
! #define PrevBufIdx(idx)		\
! 		(((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
  
! #define NextBufIdx(idx)		\
! 		(((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
  
  /*
   * Private, possibly out-of-date copy of shared LogwrtResult.
--- 598,628 ----
  static ControlFileData *ControlFile = NULL;
  
  /*
!  * Calculate the amount of space left on the page after 'endptr'.
!  * Beware multiple evaluation!
   */
+ #define INSERT_FREESPACE(endptr)	\
+ 	(((endptr).xrecoff % XLOG_BLCKSZ == 0) ? 0 : (XLOG_BLCKSZ - (endptr).xrecoff % XLOG_BLCKSZ))
  
! #define NextBufIdx(idx)		\
! 		(((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
  
  
! #define NextSlotNo(idx)		\
! 		(((idx) == NumXLogInsertSlots) ? 0 : ((idx) + 1))
  
! /*
!  * XLogRecPtrToBufIdx returns the index of the WAL buffer that holds, or
!  * would hold if it was in cache, the page containing 'recptr'.
!  *
!  * XLogRecEndPtrToBufIdx is the same, but a pointer to the first byte of a
!  * page is taken to mean the previous page.
!  */
! #define XLogRecPtrToBufIdx(recptr)	\
! 	((((((uint64) (recptr).xlogid * (uint64) XLogSegsPerFile * XLogSegSize) + (recptr).xrecoff)) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
! 
! #define XLogRecEndPtrToBufIdx(recptr)	\
! 	((((((uint64) (recptr).xlogid * (uint64) XLogSegsPerFile * XLogSegSize) + (recptr).xrecoff - 1)) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
  
  /*
   * Private, possibly out-of-date copy of shared LogwrtResult.
***************
*** 641,649 **** static void KeepLogSeg(XLogRecPtr recptr, uint32 *logId, uint32 *logSeg);
  
  static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
  				XLogRecPtr *lsn, BkpBlock *bkpb);
! static bool AdvanceXLInsertBuffer(bool new_segment);
  static bool XLogCheckpointNeeded(uint32 logid, uint32 logseg);
! static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
  static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
  					   bool find_free, int *max_advance,
  					   bool use_lock);
--- 748,756 ----
  
  static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
  				XLogRecPtr *lsn, BkpBlock *bkpb);
! static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic);
  static bool XLogCheckpointNeeded(uint32 logid, uint32 logseg);
! static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
  static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
  					   bool find_free, int *max_advance,
  					   bool use_lock);
***************
*** 690,695 **** static bool read_backup_label(XLogRecPtr *checkPointLoc,
--- 797,820 ----
  static void rm_redo_error_callback(void *arg);
  static int	get_sync_bit(int method);
  
+ static XLogRecPtr PerformXLogInsert(int write_len,
+ 				  bool isLogSwitch,
+ 				  XLogRecord *rechdr,
+ 				  XLogRecData *rdata, pg_crc32 rdata_crc,
+ 				  bool didPageWrites);
+ static bool ReserveXLogInsertLocation(int size, bool forcePageWrites,
+ 						  bool isLogSwitch,
+ 						  XLogRecPtr *PrevRecord_p, XLogRecPtr *StartPos_p,
+ 						  XLogRecPtr *EndPos_p,
+ 						  XLogInsertSlot **myslot_p, bool *updrqst_p);
+ static void UpdateSlotCurrPos(volatile XLogInsertSlot *myslot,
+ 				  XLogRecPtr CurrPos);
+ static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto,
+ 						   XLogRecPtr CurrPos);
+ static void WaitForXLogInsertionSlotToBecomeFree(void);
+ static char *GetXLogBuffer(XLogRecPtr ptr);
+ static XLogRecPtr AdvanceXLogRecPtrToNextPage(XLogRecPtr ptr);
+ 
  
  /*
   * Insert an XLOG record having the specified RMID and info bytes,
***************
*** 710,721 **** XLogRecPtr
  XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
  {
  	XLogCtlInsert *Insert = &XLogCtl->Insert;
- 	XLogRecord *record;
- 	XLogContRecord *contrecord;
  	XLogRecPtr	RecPtr;
- 	XLogRecPtr	WriteRqst;
- 	uint32		freespace;
- 	int			curridx;
  	XLogRecData *rdt;
  	XLogRecData *rdt_lastnormal;
  	Buffer		dtbuf[XLR_MAX_BKP_BLOCKS];
--- 835,841 ----
***************
*** 729,739 **** XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
  	uint32		len,
  				write_len;
  	unsigned	i;
- 	bool		updrqst;
  	bool		doPageWrites;
! 	bool		isLogSwitch = false;
! 	bool		fpwChange = false;
  	uint8		info_orig = info;
  
  	/* cross-check on whether we should be here or not */
  	if (!XLogInsertAllowed())
--- 849,858 ----
  	uint32		len,
  				write_len;
  	unsigned	i;
  	bool		doPageWrites;
! 	bool		isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
  	uint8		info_orig = info;
+ 	XLogRecord	rechdr;
  
  	/* cross-check on whether we should be here or not */
  	if (!XLogInsertAllowed())
***************
*** 746,775 **** XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
  	TRACE_POSTGRESQL_XLOG_INSERT(rmid, info);
  
  	/*
! 	 * Handle special cases/records.
  	 */
! 	if (rmid == RM_XLOG_ID)
  	{
- 		switch (info)
- 		{
- 			case XLOG_SWITCH:
- 				isLogSwitch = true;
- 				break;
- 
- 			case XLOG_FPW_CHANGE:
- 				fpwChange = true;
- 				break;
- 
- 			default:
- 				break;
- 		}
- 	}
- 	else if (IsBootstrapProcessingMode())
- 	{
- 		/*
- 		 * In bootstrap mode, we don't actually log anything but XLOG resources;
- 		 * return a phony record pointer.
- 		 */
  		RecPtr.xlogid = 0;
  		RecPtr.xrecoff = SizeOfXLogLongPHD;		/* start of 1st chkpt record */
  		return RecPtr;
--- 865,875 ----
  	TRACE_POSTGRESQL_XLOG_INSERT(rmid, info);
  
  	/*
! 	 * In bootstrap mode, we don't actually log anything but XLOG resources;
! 	 * return a phony record pointer.
  	 */
! 	if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
  	{
  		RecPtr.xlogid = 0;
  		RecPtr.xrecoff = SizeOfXLogLongPHD;		/* start of 1st chkpt record */
  		return RecPtr;
***************
*** 939,1072 **** begin:;
  	for (rdt = rdata; rdt != NULL; rdt = rdt->next)
  		COMP_CRC32(rdata_crc, rdt->data, rdt->len);
  
- 	START_CRIT_SECTION();
- 
- 	/* Now wait to get insert lock */
- 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
- 
  	/*
! 	 * Check to see if my RedoRecPtr is out of date.  If so, may have to go
! 	 * back and recompute everything.  This can only happen just after a
! 	 * checkpoint, so it's better to be slow in this case and fast otherwise.
! 	 *
! 	 * If we aren't doing full-page writes then RedoRecPtr doesn't actually
! 	 * affect the contents of the XLOG record, so we'll update our local copy
! 	 * but not force a recomputation.
  	 */
! 	if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
! 	{
! 		Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
! 		RedoRecPtr = Insert->RedoRecPtr;
  
! 		if (doPageWrites)
! 		{
! 			for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
! 			{
! 				if (dtbuf[i] == InvalidBuffer)
! 					continue;
! 				if (dtbuf_bkp[i] == false &&
! 					XLByteLE(dtbuf_lsn[i], RedoRecPtr))
! 				{
! 					/*
! 					 * Oops, this buffer now needs to be backed up, but we
! 					 * didn't think so above.  Start over.
! 					 */
! 					LWLockRelease(WALInsertLock);
! 					END_CRIT_SECTION();
! 					rdt_lastnormal->next = NULL;
! 					info = info_orig;
! 					goto begin;
! 				}
! 			}
! 		}
! 	}
  
  	/*
! 	 * Also check to see if fullPageWrites or forcePageWrites was just turned on;
! 	 * if we weren't already doing full-page writes then go back and recompute.
! 	 * (If it was just turned off, we could recompute the record without full pages,
! 	 * but we choose not to bother.)
  	 */
! 	if ((Insert->fullPageWrites || Insert->forcePageWrites) && !doPageWrites)
  	{
! 		/* Oops, must redo it with full-page data. */
! 		LWLockRelease(WALInsertLock);
! 		END_CRIT_SECTION();
  		rdt_lastnormal->next = NULL;
  		info = info_orig;
  		goto begin;
  	}
  
- 	/*
- 	 * If there isn't enough space on the current XLOG page for a record
- 	 * header, advance to the next page (leaving the unused space as zeroes).
- 	 */
- 	updrqst = false;
- 	freespace = INSERT_FREESPACE(Insert);
- 	if (freespace < SizeOfXLogRecord)
- 	{
- 		updrqst = AdvanceXLInsertBuffer(false);
- 		freespace = INSERT_FREESPACE(Insert);
- 	}
- 
- 	/* Compute record's XLOG location */
- 	curridx = Insert->curridx;
- 	INSERT_RECPTR(RecPtr, Insert, curridx);
- 
- 	/*
- 	 * If the record is an XLOG_SWITCH, and we are exactly at the start of a
- 	 * segment, we need not insert it (and don't want to because we'd like
- 	 * consecutive switch requests to be no-ops).  Instead, make sure
- 	 * everything is written and flushed through the end of the prior segment,
- 	 * and return the prior segment's end address.
- 	 */
- 	if (isLogSwitch &&
- 		(RecPtr.xrecoff % XLogSegSize) == SizeOfXLogLongPHD)
- 	{
- 		/* We can release insert lock immediately */
- 		LWLockRelease(WALInsertLock);
- 
- 		RecPtr.xrecoff -= SizeOfXLogLongPHD;
- 		if (RecPtr.xrecoff == 0)
- 		{
- 			/* crossing a logid boundary */
- 			RecPtr.xlogid -= 1;
- 			RecPtr.xrecoff = XLogFileSize;
- 		}
- 
- 		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
- 		LogwrtResult = XLogCtl->Write.LogwrtResult;
- 		if (!XLByteLE(RecPtr, LogwrtResult.Flush))
- 		{
- 			XLogwrtRqst FlushRqst;
- 
- 			FlushRqst.Write = RecPtr;
- 			FlushRqst.Flush = RecPtr;
- 			XLogWrite(FlushRqst, false, false);
- 		}
- 		LWLockRelease(WALWriteLock);
- 
- 		END_CRIT_SECTION();
- 
- 		return RecPtr;
- 	}
- 
- 	/* Insert record header */
- 
- 	record = (XLogRecord *) Insert->currpos;
- 	record->xl_prev = Insert->PrevRecord;
- 	record->xl_xid = GetCurrentTransactionIdIfAny();
- 	record->xl_tot_len = SizeOfXLogRecord + write_len;
- 	record->xl_len = len;		/* doesn't include backup blocks */
- 	record->xl_info = info;
- 	record->xl_rmid = rmid;
- 
- 	/* Now we can finish computing the record's CRC */
- 	COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32),
- 			   SizeOfXLogRecord - sizeof(pg_crc32));
- 	FIN_CRC32(rdata_crc);
- 	record->xl_crc = rdata_crc;
- 
  #ifdef WAL_DEBUG
  	if (XLOG_DEBUG)
  	{
--- 1039,1078 ----
  	for (rdt = rdata; rdt != NULL; rdt = rdt->next)
  		COMP_CRC32(rdata_crc, rdt->data, rdt->len);
  
  	/*
! 	 * Construct record header. We can't CRC it yet, because the prev-link
! 	 * needs to be covered by the CRC and we don't know that yet. We will
! 	 * finish computing the CRC when we do.
  	 */
! 	MemSet(&rechdr, 0, sizeof(rechdr));
! 	/* rechdr.xl_prev is set in PerformXLogInsert() */
! 	rechdr.xl_xid = GetCurrentTransactionIdIfAny();
! 	rechdr.xl_tot_len = SizeOfXLogRecord + write_len;
! 	rechdr.xl_len = len;		/* doesn't include backup blocks */
! 	rechdr.xl_info = info;
! 	rechdr.xl_rmid = rmid;
  
! 	START_CRIT_SECTION();
  
  	/*
! 	 * Try to do the insertion.
  	 */
! 	RecPtr = PerformXLogInsert(write_len, isLogSwitch, &rechdr,
! 							   rdata, rdata_crc, doPageWrites);
! 	END_CRIT_SECTION();
! 
! 	if (XLogRecPtrIsInvalid(RecPtr))
  	{
! 		/*
! 		 * Oops, must redo it with full-page data. Unlink the backup blocks
! 		 * from the chain and reset info bitmask to undo the changes we've
! 		 * done.
! 		 */
  		rdt_lastnormal->next = NULL;
  		info = info_orig;
  		goto begin;
  	}
  
  #ifdef WAL_DEBUG
  	if (XLOG_DEBUG)
  	{
***************
*** 1075,1267 **** begin:;
  		initStringInfo(&buf);
  		appendStringInfo(&buf, "INSERT @ %X/%X: ",
  						 RecPtr.xlogid, RecPtr.xrecoff);
! 		xlog_outrec(&buf, record);
  		if (rdata->data != NULL)
  		{
  			appendStringInfo(&buf, " - ");
! 			RmgrTable[record->xl_rmid].rm_desc(&buf, record->xl_info, rdata->data);
  		}
  		elog(LOG, "%s", buf.data);
  		pfree(buf.data);
  	}
  #endif
  
! 	/* Record begin of record in appropriate places */
! 	ProcLastRecPtr = RecPtr;
! 	Insert->PrevRecord = RecPtr;
  
! 	Insert->currpos += SizeOfXLogRecord;
! 	freespace -= SizeOfXLogRecord;
  
  	/*
! 	 * Append the data, including backup blocks if any
  	 */
! 	while (write_len)
! 	{
! 		while (rdata->data == NULL)
! 			rdata = rdata->next;
  
! 		if (freespace > 0)
  		{
! 			if (rdata->len > freespace)
  			{
! 				memcpy(Insert->currpos, rdata->data, freespace);
  				rdata->data += freespace;
  				rdata->len -= freespace;
! 				write_len -= freespace;
! 			}
! 			else
! 			{
! 				memcpy(Insert->currpos, rdata->data, rdata->len);
! 				freespace -= rdata->len;
! 				write_len -= rdata->len;
! 				Insert->currpos += rdata->len;
! 				rdata = rdata->next;
! 				continue;
  			}
  		}
  
! 		/* Use next buffer */
! 		updrqst = AdvanceXLInsertBuffer(false);
! 		curridx = Insert->curridx;
! 		/* Insert cont-record header */
! 		Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
! 		contrecord = (XLogContRecord *) Insert->currpos;
! 		contrecord->xl_rem_len = write_len;
! 		Insert->currpos += SizeOfXLogContRecord;
! 		freespace = INSERT_FREESPACE(Insert);
  	}
  
! 	/* Ensure next record will be properly aligned */
! 	Insert->currpos = (char *) Insert->currpage +
! 		MAXALIGN(Insert->currpos - (char *) Insert->currpage);
! 	freespace = INSERT_FREESPACE(Insert);
  
  	/*
! 	 * The recptr I return is the beginning of the *next* record. This will be
! 	 * stored as LSN for changed data pages...
  	 */
! 	INSERT_RECPTR(RecPtr, Insert, curridx);
  
  	/*
! 	 * If the record is an XLOG_SWITCH, we must now write and flush all the
! 	 * existing data, and then forcibly advance to the start of the next
! 	 * segment.  It's not good to do this I/O while holding the insert lock,
! 	 * but there seems too much risk of confusion if we try to release the
! 	 * lock sooner.  Fortunately xlog switch needn't be a high-performance
! 	 * operation anyway...
  	 */
! 	if (isLogSwitch)
  	{
! 		XLogCtlWrite *Write = &XLogCtl->Write;
! 		XLogwrtRqst FlushRqst;
! 		XLogRecPtr	OldSegEnd;
  
! 		TRACE_POSTGRESQL_XLOG_SWITCH();
  
! 		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
  
  		/*
! 		 * Flush through the end of the page containing XLOG_SWITCH, and
! 		 * perform end-of-segment actions (eg, notifying archiver).
  		 */
! 		WriteRqst = XLogCtl->xlblocks[curridx];
! 		FlushRqst.Write = WriteRqst;
! 		FlushRqst.Flush = WriteRqst;
! 		XLogWrite(FlushRqst, false, true);
! 
! 		/* Set up the next buffer as first page of next segment */
! 		/* Note: AdvanceXLInsertBuffer cannot need to do I/O here */
! 		(void) AdvanceXLInsertBuffer(true);
! 
! 		/* There should be no unwritten data */
! 		curridx = Insert->curridx;
! 		Assert(curridx == Write->curridx);
! 
! 		/* Compute end address of old segment */
! 		OldSegEnd = XLogCtl->xlblocks[curridx];
! 		OldSegEnd.xrecoff -= XLOG_BLCKSZ;
! 		if (OldSegEnd.xrecoff == 0)
! 		{
! 			/* crossing a logid boundary */
! 			OldSegEnd.xlogid -= 1;
! 			OldSegEnd.xrecoff = XLogFileSize;
! 		}
  
! 		/* Make it look like we've written and synced all of old segment */
! 		LogwrtResult.Write = OldSegEnd;
! 		LogwrtResult.Flush = OldSegEnd;
  
  		/*
! 		 * Update shared-memory status --- this code should match XLogWrite
  		 */
  		{
! 			/* use volatile pointer to prevent code rearrangement */
! 			volatile XLogCtlData *xlogctl = XLogCtl;
  
! 			SpinLockAcquire(&xlogctl->info_lck);
! 			xlogctl->LogwrtResult = LogwrtResult;
! 			if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
! 				xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
! 			if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
! 				xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
! 			SpinLockRelease(&xlogctl->info_lck);
  		}
  
! 		Write->LogwrtResult = LogwrtResult;
  
! 		LWLockRelease(WALWriteLock);
  
! 		updrqst = false;		/* done already */
  	}
  	else
  	{
! 		/* normal case, ie not xlog switch */
  
! 		/* Need to update shared LogwrtRqst if some block was filled up */
! 		if (freespace < SizeOfXLogRecord)
  		{
! 			/* curridx is filled and available for writing out */
! 			updrqst = true;
  		}
  		else
  		{
! 			/* if updrqst already set, write through end of previous buf */
! 			curridx = PrevBufIdx(curridx);
  		}
- 		WriteRqst = XLogCtl->xlblocks[curridx];
  	}
  
  	/*
! 	 * If the record is an XLOG_FPW_CHANGE, we update full_page_writes
! 	 * in shared memory before releasing WALInsertLock. This ensures that
! 	 * an XLOG_FPW_CHANGE record precedes any WAL record affected
! 	 * by this change of full_page_writes.
  	 */
! 	if (fpwChange)
! 		Insert->fullPageWrites = fullPageWrites;
  
! 	LWLockRelease(WALInsertLock);
! 
! 	if (updrqst)
  	{
! 		/* use volatile pointer to prevent code rearrangement */
! 		volatile XLogCtlData *xlogctl = XLogCtl;
  
! 		SpinLockAcquire(&xlogctl->info_lck);
! 		/* advance global request to include new block(s) */
! 		if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))
! 			xlogctl->LogwrtRqst.Write = WriteRqst;
! 		/* update local result copy while I have the chance */
! 		LogwrtResult = xlogctl->LogwrtResult;
! 		SpinLockRelease(&xlogctl->info_lck);
  	}
  
! 	XactLastRecEnd = RecPtr;
  
! 	END_CRIT_SECTION();
  
! 	return RecPtr;
  }
  
  /*
--- 1081,1796 ----
  		initStringInfo(&buf);
  		appendStringInfo(&buf, "INSERT @ %X/%X: ",
  						 RecPtr.xlogid, RecPtr.xrecoff);
! 		xlog_outrec(&buf, &rechdr);
  		if (rdata->data != NULL)
  		{
  			appendStringInfo(&buf, " - ");
! 			RmgrTable[rmid].rm_desc(&buf, rechdr.xl_info, rdata->data);
  		}
  		elog(LOG, "%s", buf.data);
  		pfree(buf.data);
  	}
  #endif
  
! 	/*
! 	 * The recptr I return is the beginning of the *next* record. This will be
! 	 * stored as LSN for changed data pages...
! 	 */
! 	return RecPtr;
! }
! 
! /*
!  * Subroutine of XLogInsert. All the changes to shared state are done here,
!  * XLogInsert only prepares the record for insertion.
!  *
!  * On success, returns pointer to end of inserted record like XLogInsert().
!  * If RedoRecPtr or forcePageWrites had changed, returns InvalidRecPtr, and
!  * the caller must recalculate full-page-images and retry.
!  */
! static XLogRecPtr
! PerformXLogInsert(int write_len, bool isLogSwitch, XLogRecord *rechdr,
! 				  XLogRecData *rdata, pg_crc32 rdata_crc,
! 				  bool didPageWrites)
! {
! 	volatile XLogInsertSlot *myslot = NULL;
! 	char	   *currpos;
! 	XLogRecord *record;
! 	int			tot_len;
! 	int			freespace;
! 	int			written;
! 	XLogRecPtr	PrevRecord;
! 	XLogRecPtr	StartPos;
! 	XLogRecPtr	EndPos;
! 	XLogRecPtr	CurrPos;
! 	bool		updrqst;
  
! 	/* Get an insert location  */
! 	tot_len = SizeOfXLogRecord + write_len;
! 	if (!ReserveXLogInsertLocation(tot_len, didPageWrites, isLogSwitch,
! 								   &PrevRecord, &StartPos, &EndPos,
! 								   (XLogInsertSlot **) &myslot, &updrqst))
! 	{
! 		return EndPos;
! 	}
  
  	/*
! 	 * Got it! Now that we know the prev-link, we can finish computing the
! 	 * record's CRC.
  	 */
! 	rechdr->xl_prev = PrevRecord;
! 
! 	/* Get the right WAL page to start inserting to */
! 	CurrPos = StartPos;
! 	currpos = GetXLogBuffer(CurrPos);
! 
! 	/* Copy the record header in place */
! 	record = (XLogRecord *) currpos;
! 	memcpy(record, rechdr, sizeof(XLogRecord));
! 	COMP_CRC32(rdata_crc, currpos + sizeof(pg_crc32),
! 			   SizeOfXLogRecord - sizeof(pg_crc32));
! 	FIN_CRC32(rdata_crc);
! 	record->xl_crc = rdata_crc;
! 
! 	currpos += SizeOfXLogRecord;
! 	CurrPos.xrecoff += SizeOfXLogRecord;
  
! 	if (!isLogSwitch)
! 	{
! 		/* Copy record data */
! 		written = 0;
! 		freespace = INSERT_FREESPACE(CurrPos);
! 		while (rdata != NULL)
  		{
! 			while (rdata->len > freespace)
  			{
! 				/*
! 				 * Write what fits on this page, then write the continuation
! 				 * record, and continue.
! 				 */
! 				XLogContRecord *contrecord;
! 
! 				memcpy(currpos, rdata->data, freespace);
  				rdata->data += freespace;
  				rdata->len -= freespace;
! 				written += freespace;
! 				CurrPos.xrecoff += freespace;
! 
! 				/*
! 				 * CurrPos now points to the page boundary, ie. the first byte
! 				 * of the next page. Advertise that as our CurrPos before
! 				 * calling GetXLogBuffer(), because GetXLogBuffer() might need
! 				 * to wait for some insertions to finish so that it can write
! 				 * out a buffer to make room for the new page. Updating CurrPos
! 				 * before waiting for a new buffer ensures that we don't
! 				 * deadlock with ourselves if we run out of clean buffers.
! 				 *
! 				 * However, we must not advance CurrPos past the page header
! 				 * yet, otherwise someone might try to flush up to that point,
! 				 * which would fail if the next page was not initialized yet.
! 				 */
! 				UpdateSlotCurrPos(myslot, CurrPos);
! 
! 				/*
! 				 * Get pointer to beginning of next page, and set the
! 				 * XLP_FIRST_IS_CONTRECORD flag in the page header.
! 				 *
! 				 * It's safe to set the contrecord flag without a lock on the
! 				 * page. All the other flags are set in AdvanceXLInsertBuffer,
! 				 * and we're the only backend that needs to set the contrecord
! 				 * flag.
! 				 */
! 				currpos = GetXLogBuffer(CurrPos);
! 				((XLogPageHeader) currpos)->xlp_info |= XLP_FIRST_IS_CONTRECORD;
! 
! 				/* skip over the page header, and write continuation record */
! 				CurrPos = AdvanceXLogRecPtrToNextPage(CurrPos);
! 				currpos = GetXLogBuffer(CurrPos);
! 
! 				contrecord = (XLogContRecord *) currpos;
! 				contrecord->xl_rem_len = write_len - written;
! 
! 				currpos += SizeOfXLogContRecord;
! 				CurrPos.xrecoff += SizeOfXLogContRecord;
! 
! 				freespace = INSERT_FREESPACE(CurrPos);
  			}
+ 
+ 			memcpy(currpos, rdata->data, rdata->len);
+ 			currpos += rdata->len;
+ 			CurrPos.xrecoff += rdata->len;
+ 			freespace -= rdata->len;
+ 			written += rdata->len;
+ 
+ 			rdata = rdata->next;
  		}
+ 		Assert(written == write_len);
  
! 		CurrPos.xrecoff = MAXALIGN(CurrPos.xrecoff);
! 		Assert(XLByteEQ(CurrPos, EndPos));
  	}
+ 	else
+ 	{
+ 		/* An xlog-switch record doesn't contain any data besides the header */
+ 		Assert(write_len == 0);
  
! 		/*
! 		 * An xlog-switch record consumes all the remaining space on the
! 		 * WAL segment. We have already reserved it for us, but we still need
! 		 * to make sure it's been allocated and zeroed in the WAL buffers so
! 		 * that when the caller (or someone else) does XLogWrite(), it can
! 		 * really write out all the zeros.
! 		 *
! 		 * We do this one page at a time, to make sure we don't deadlock
! 		 * against ourselves if wal_buffers < XLOG_SEG_SIZE.
! 		 */
! 		while (XLByteLT(CurrPos, EndPos))
! 		{
! 			/* use up all the remaining space in this page */
! 			freespace = INSERT_FREESPACE(CurrPos);
! 			XLByteAdvance(CurrPos, freespace);
! 			/*
! 			 * like in the non-xlog-switch codepath, let others know that
! 			 * we're done writing up to the end of this page
! 			 */
! 			UpdateSlotCurrPos(myslot, CurrPos);
! 			/*
! 			 * let GetXLogBuffer initialize next page if necessary.
! 			 */
! 			CurrPos = AdvanceXLogRecPtrToNextPage(CurrPos);
! 			(void) GetXLogBuffer(CurrPos);
! 		}
! 
! 		/*
! 		 * Even though we reserved the rest of the segment for us, which
! 		 * is reflected in EndPos, we need to return a value that points just
! 		 * to the end of the xlog-switch record.
! 		 */
! 		EndPos.xlogid = StartPos.xlogid;
! 		EndPos.xrecoff = StartPos.xrecoff + SizeOfXLogRecord;
! 	}
  
  	/*
! 	 * Done! Clear CurrPos in our slot to let others know that we're
! 	 * finished.
  	 */
! 	UpdateSlotCurrPos(myslot, InvalidXLogRecPtr);
  
  	/*
! 	 * Update shared LogwrtRqst.Write, if we crossed page boundary.
  	 */
! 	if (updrqst)
  	{
! 		/* use volatile pointer to prevent code rearrangement */
! 		volatile XLogCtlData *xlogctl = XLogCtl;
! 
! 		SpinLockAcquire(&xlogctl->info_lck);
! 		/* advance global request to include new block(s) */
! 		if (XLByteLT(xlogctl->LogwrtRqst.Write, EndPos))
! 			xlogctl->LogwrtRqst.Write = EndPos;
! 		/* update local result copy while I have the chance */
! 		LogwrtResult = xlogctl->LogwrtResult;
! 		SpinLockRelease(&xlogctl->info_lck);
! 	}
  
! 	/* update our global variables */
! 	ProcLastRecPtr = StartPos;
! 	XactLastRecEnd = EndPos;
  
! 	return EndPos;
! }
  
+ /*
+  * Reserves the right amount of space for a record of given size from the WAL.
+  * *StartPos_p is set to the beginning of the reserved section, *EndPos_p to
+  * its end, and *Prev_record_p points to the beginning of the previous record
+  * to set to the prev-link of the record header.
+  *
+  * A log-switch record is handled slightly differently. The rest of the
+  * segment will be reserved for this insertion, as indicated by the returned
+  * *EndPos_p value. However, if we are already at the beginning of the current
+  * segment, the *EndPos_p is set to the current location without reserving
+  * any space, and the function returns false.
+  *
+  * *updrqst_p is set to true, if this record ends on different page than
+  * the previous one - the caller should update the shared LogwrtRqst value
+  * after it's done inserting the record in that case, so that the WAL page
+  * that filled up gets written out at the next convenient moment.
+  *
+  * While holding insertpos_lck, sets myslot->CurrPos to the starting position,
+  * (or the end of previous record, to be exact) to let others know that we're
+  * busy inserting to the reserved area. The caller must clear it when the
+  * insertion is finished.
+  *
+  * Returns true on success, or false if RedoRecPtr or forcePageWrites was
+  * changed. On failure, the shared state is not modified.
+  *
+  * This is the performance critical part of XLogInsert that must be
+  * serialized across backends. The rest can happen mostly in parallel.
+  *
+  * NB: The space calculation here must match the code in PerformXLogInsert,
+  * where we actually copy the record to the reserved space.
+  */
+ static bool
+ ReserveXLogInsertLocation(int size, bool didPageWrites,
+ 						  bool isLogSwitch,
+ 						  XLogRecPtr *PrevRecord_p, XLogRecPtr *StartPos_p,
+ 						  XLogRecPtr *EndPos_p,
+ 						  XLogInsertSlot **myslot_p, bool *updrqst_p)
+ {
+ 	volatile XLogInsertSlot *myslot;
+ 	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+ 	int			freespace;
+ 	XLogRecPtr	ptr;
+ 	XLogRecPtr	StartPos;
+ 	XLogRecPtr	LastEndPos;
+ 	int32		nextslot;
+ 	int32		lastslot;
+ 	bool		updrqst = false;
+ 
+ retry:
+ 	SpinLockAcquire(&Insert->insertpos_lck);
+ 
+ 	if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr) ||
+ 		(!didPageWrites && (Insert->forcePageWrites || Insert->fullPageWrites)))
+ 	{
  		/*
! 		 * Oops, a checkpoint just happened, or forcePageWrites was just
! 		 * turned on. Start XLogInsert() all over, because we might have to
! 		 * include more full-page images in the record.
  		 */
! 		RedoRecPtr = Insert->RedoRecPtr;
! 		SpinLockRelease(&Insert->insertpos_lck);
! 		*EndPos_p = InvalidXLogRecPtr;
! 		return false;
! 	}
! 
! 	/*
! 	 * Reserve the next insertion slot for us.
! 	 *
! 	 * First check that the slot is not still in use. Modifications to
! 	 * lastslot are protected by WALInsertTailLock, but here we assume that
! 	 * reading an int32 is atomic. Another process might advance lastslot at
! 	 * the same time, but not past nextslot.
! 	 */
! 	lastslot = Insert->lastslot;
! 	nextslot = Insert->nextslot;
! 	if (NextSlotNo(nextslot) == lastslot)
! 	{
! 		/*
! 		 * Oops, we've "caught our tail" and the oldest slot is still in use.
! 		 * Have to wait for it to become vacant.
! 		 */
! 		SpinLockRelease(&Insert->insertpos_lck);
! 		WaitForXLogInsertionSlotToBecomeFree();
! 		goto retry;
! 	}
! 	myslot = &XLogCtl->XLogInsertSlots[nextslot];
! 	nextslot = NextSlotNo(nextslot);
! 
! 	/*
! 	 * Got the slot, now reserve the right amount of space from the WAL for
! 	 * our record.
! 	 */
! 	LastEndPos = ptr = Insert->CurrPos;
! 	*PrevRecord_p = Insert->PrevRecord;
! 
! 	/*
! 	 * If there isn't enough space on the current XLOG page for a record
! 	 * header, advance to the next page (leaving the unused space as zeroes).
! 	 */
! 	freespace = INSERT_FREESPACE(ptr);
! 	if (freespace < SizeOfXLogRecord)
! 	{
! 		ptr = AdvanceXLogRecPtrToNextPage(ptr);
! 		freespace = INSERT_FREESPACE(ptr);
! 		updrqst = true;
! 	}
  
! 	/*
! 	 * We are now at the starting position of our record. Now figure out how
! 	 * the data will be split across the WAL pages, to calculate where the
! 	 * record ends.
! 	 */
! 	StartPos = ptr;
  
+ 	if (isLogSwitch)
+ 	{
  		/*
! 		 * If the record is an XLOG_SWITCH, and we are exactly at the start of a
! 		 * segment, we need not insert it (and don't want to because we'd like
! 		 * consecutive switch requests to be no-ops). Otherwise the XLOG_SWITCH
! 		 * record should consume all the remaining space on the current segment.
  		 */
+ 		Assert(size == SizeOfXLogRecord);
+ 		if ((ptr.xrecoff % XLogSegSize) == SizeOfXLogLongPHD)
  		{
! 			/* We can release insert lock immediately */
! 			SpinLockRelease(&Insert->insertpos_lck);
  
! 			ptr.xrecoff -= SizeOfXLogLongPHD;
! 			if (ptr.xrecoff == 0)
! 			{
! 				/* crossing a logid boundary */
! 				ptr.xlogid -= 1;
! 				ptr.xrecoff = XLogFileSize;
! 			}
! 
! 			*EndPos_p = ptr;
! 			*StartPos_p = ptr;
! 			*myslot_p = NULL;
! 
! 			return false;
! 		}
! 		else
! 		{
! 			if (ptr.xrecoff % XLOG_SEG_SIZE != 0)
! 			{
! 				int segleft = XLOG_SEG_SIZE - (ptr.xrecoff % XLOG_SEG_SIZE);
! 				XLByteAdvance(ptr, segleft);
! 			}
! 			updrqst = true;
  		}
+ 	}
+ 	else
+ 	{
+ 		/* A normal record, ie. not xlog-switch */
+ 		int sizeleft = size;
+ 		while (freespace < sizeleft)
+ 		{
+ 			/* fill this page, and continue on next page */
+ 			sizeleft -= freespace;
+ 			ptr = AdvanceXLogRecPtrToNextPage(ptr);
  
! 			/* account for continuation record header */
! 			ptr.xrecoff += SizeOfXLogContRecord;
! 			freespace = INSERT_FREESPACE(ptr);
  
! 			updrqst = true;
! 		}
! 		/* the rest fits on this page */
! 		ptr.xrecoff += sizeleft;
! 
! 		/* Align the end position, so that the next record starts aligned */
! 		ptr.xrecoff = MAXALIGN(ptr.xrecoff);
! 	}
! 
! 	/* Update the shared state, and our slot, before releasing the lock */
! 	myslot->CurrPos = LastEndPos;
! 	Insert->CurrPos = ptr;
! 	Insert->PrevRecord = StartPos;
! 	Insert->nextslot = nextslot;
! 
! 	SpinLockRelease(&Insert->insertpos_lck);
! 
! #ifdef RESERVE_XLOGINSERT_LOCATION_DEBUG
! 	elog(LOG, "reserved xlog: prev %X/%X, start %X/%X, end %X/%X (len %d)",
! 		 PrevRecord_p->xlogid, PrevRecord_p->xrecoff,
! 		 StartPos.xlogid, StartPos.xrecoff,
! 		 ptr.xlogid, ptr.xrecoff,
! 		 size);
! #endif
! 
! 	*EndPos_p = ptr;
! 	*StartPos_p = StartPos;
! 	*myslot_p = (XLogInsertSlot *) myslot;
! 	*updrqst_p = updrqst;
! 
! 	return true;
! }
! 
! /*
!  * Update slot's CurrPos variable, and wake up anyone waiting on it.
!  */
! static void
! UpdateSlotCurrPos(volatile XLogInsertSlot *myslot, XLogRecPtr CurrPos)
! {
! 	PGPROC	   *head;
! 
! 	/*
! 	 * The write-barrier ensures that the changes we made to the WAL pages
! 	 * are visible to everyone before the update of CurrPos.
! 	 *
! 	 * XXX: I'm not sure if this is necessary. Does a function call act
! 	 * as an implicit barrier?
! 	 */
! 	pg_write_barrier();
  
! 	SpinLockAcquire(&myslot->lck);
! 	myslot->CurrPos = CurrPos;
! 	head = myslot->head;
! 	myslot->head = myslot->tail = NULL;
! 	SpinLockRelease(&myslot->lck);
! 	while (head != NULL)
! 	{
! 		PGPROC *proc = head;
! 		head = proc->lwWaitLink;
! 		proc->lwWaitLink = NULL;
! 		proc->lwWaiting = false;
! 		PGSemaphoreUnlock(&proc->sem);
  	}
+ }
+ 
+ /*
+  * Get a pointer to the right location in the WAL buffer containing the
+  * given XLogRecPtr.
+  *
+  * If the page is not initialized yet, it is initialized. That might require
+  * evicting an old dirty buffer from the buffer cache, which means I/O.
+  *
+  * The caller must ensure that the page containing the requested location
+  * isn't evicted yet, and won't be evicted, by holding onto an
+  * XLogInsertSlot with CurrPos set to 'ptr'. Setting it to some value
+  * less than 'ptr' would suffice for GetXLogBuffer(), but risks deadlock:
+  * if we have to evict a buffer, we might have to wait for someone else to
+  * finish a write. And that someone else might not be able to finish the write
+  * if our CurrPos points to a buffer that's still in the buffer cache.
+  */
+ static char *
+ GetXLogBuffer(XLogRecPtr ptr)
+ {
+ 	int			idx;
+ 	XLogRecPtr	endptr;
+ 
+ 	/*
+ 	 * The XLog buffer cache is organized so that we can easily calculate the
+ 	 * buffer a given page must be loaded into from the XLogRecPtr alone.
+ 	 * A page must always be loaded to a particular buffer.
+ 	 */
+ 	idx = XLogRecPtrToBufIdx(ptr);
+ 
+ 	/*
+ 	 * See what page is loaded in the buffer at the moment. It could be the
+ 	 * page we're looking for, or something older. It can't be anything
+ 	 * newer - that would imply the page we're looking for has already
+ 	 * been written out to disk, which shouldn't happen as long as the caller
+ 	 * has set its slot's CurrPos correctly.
+ 	 *
+ 	 * However, we don't hold a lock while we read the value. If someone has
+ 	 * just initialized the page, it's possible that we get a "torn read",
+ 	 * and read a bogus value. That's ok, we'll grab the mapping lock (in
+ 	 * AdvanceXLInsertBuffer) and retry if we see anything else than the page
+ 	 * we're looking for. But it means that when we do this unlocked read, we
+ 	 * might see a value that appears to be ahead of the page we're looking
+ 	 * for. So don't PANIC on that, until we've verified the value while
+ 	 * holding the lock.
+ 	 */
+ 	endptr = XLogCtl->xlblocks[idx];
+ 	if (ptr.xlogid != endptr.xlogid ||
+ 		!(ptr.xrecoff < endptr.xrecoff &&
+ 		  ptr.xrecoff >= endptr.xrecoff - XLOG_BLCKSZ))
+ 	{
+ 		AdvanceXLInsertBuffer(ptr, false);
+ 		endptr = XLogCtl->xlblocks[idx];
+ 
+ 		if (ptr.xlogid != endptr.xlogid ||
+ 			!(ptr.xrecoff < endptr.xrecoff &&
+ 			  ptr.xrecoff >= endptr.xrecoff - XLOG_BLCKSZ))
+ 		{
+ 			elog(PANIC, "could not find WAL buffer for %X/%X",
+ 				 ptr.xlogid, ptr.xrecoff);
+ 		}
+ 	}
+ 
+ 	/*
+ 	 * Found the buffer holding this page. Return a pointer to the right
+ 	 * offset within the page.
+ 	 */
+ 	return (char *) XLogCtl->pages + idx * (Size) XLOG_BLCKSZ +
+ 		ptr.xrecoff % XLOG_BLCKSZ;
+ }
+ 
+ /*
+  * Advance an XLogRecPtr to the first valid insertion location on the next
+  * page, right after the page header. An XLogRecPtr pointing to a boundary,
+  * ie. the first byte of a page, is taken to belong to the previous page.
+  */
+ static XLogRecPtr
+ AdvanceXLogRecPtrToNextPage(XLogRecPtr ptr)
+ {
+ 	int			freespace;
+ 
+ 	freespace = INSERT_FREESPACE(ptr);
+ 	XLByteAdvance(ptr, freespace);
+ 	if (ptr.xrecoff % XLogSegSize == 0)
+ 		ptr.xrecoff += SizeOfXLogLongPHD;
  	else
+ 		ptr.xrecoff += SizeOfXLogShortPHD;
+ 
+ 	return ptr;
+ }
+ 
+ /*
+  * Wait for any insertions < upto to finish.
+  *
+  * Returns a value >= upto, which indicates the oldest in-progress insertion
+  * that we saw in the array, or CurrPos if there are no insertions in-progress
+  * at exit.
+  */
+ static XLogRecPtr
+ WaitXLogInsertionsToFinish(XLogRecPtr upto, XLogRecPtr CurrPos)
+ {
+ 	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+ 	int			lastslot;
+ 	int			nextslot;
+ 	volatile XLogInsertSlot *slot;
+ 	XLogRecPtr	slotptr = InvalidXLogRecPtr;
+ 	XLogRecPtr	LastPos;
+ 	int			extraWaits = 0;
+ 
+ 	if (MyProc == NULL)
+ 		elog(PANIC, "cannot wait without a PGPROC structure");
+ 
+ 	LastPos = CurrPos;
+ 
+ 	LWLockAcquire(WALInsertTailLock, LW_EXCLUSIVE);
+ 
+ 	lastslot = Insert->lastslot;
+ 	nextslot = Insert->nextslot;
+ 
+ 	/* Skip over slots that have finished already */
+ 	while (lastslot != nextslot)
  	{
! 		slot = &XLogCtl->XLogInsertSlots[lastslot];
! 		SpinLockAcquire(&slot->lck);
! 		slotptr = slot->CurrPos;
  
! 		if (XLogRecPtrIsInvalid(slotptr))
  		{
! 			lastslot = NextSlotNo(lastslot);
! 			SpinLockRelease(&slot->lck);
  		}
  		else
  		{
! 			/*
! 			 * This insertion is still in-progress. Wait for it to finish
! 			 * if it's <= upto, otherwise we're done.
! 			 */
! 			Insert->lastslot = lastslot;
! 
! 			if (XLogRecPtrIsInvalid(upto) || XLByteLE(upto, slotptr))
! 			{
! 				LastPos = slotptr;
! 				SpinLockRelease(&slot->lck);
! 				break;
! 			}
! 
! 			/* wait */
! 			MyProc->lwWaiting = true;
! 			MyProc->lwWaitMode = 0; /* doesn't matter */
! 			MyProc->lwWaitLink = NULL;
! 			if (slot->head == NULL)
! 				slot->head = MyProc;
! 			else
! 				slot->tail->lwWaitLink = MyProc;
! 			slot->tail = MyProc;
! 			SpinLockRelease(&slot->lck);
! 			LWLockRelease(WALInsertTailLock);
! 			for (;;)
! 			{
! 				PGSemaphoreLock(&MyProc->sem, false);
! 				if (!MyProc->lwWaiting)
! 					break;
! 				extraWaits++;
! 			}
! 			LWLockAcquire(WALInsertTailLock, LW_EXCLUSIVE);
! 			lastslot = Insert->lastslot;
! 			nextslot = Insert->nextslot;
  		}
  	}
  
+ 	Insert->lastslot = lastslot;
+ 	LWLockRelease(WALInsertTailLock);
+ 
+ 	while (extraWaits-- > 0)
+ 		PGSemaphoreUnlock(&MyProc->sem);
+ 
+ 	return LastPos;
+ }
+ 
+ /*
+  * Wait for the next insertion slot to become vacant.
+  */
+ static void
+ WaitForXLogInsertionSlotToBecomeFree(void)
+ {
+ 	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+ 	int			lastslot;
+ 	int			nextslot;
+ 	int			extraWaits = 0;
+ 
+ 	if (MyProc == NULL)
+ 		elog(PANIC, "cannot wait without a PGPROC structure");
+ 
+ 	LWLockAcquire(WALInsertTailLock, LW_EXCLUSIVE);
+ 
  	/*
! 	 * Re-read lastslot and nextslot, now that we have the wait-lock.
! 	 * We're reading nextslot without holding insertpos_lck. It could advance
! 	 * at the same time, but it can't advance beyond lastslot - 1.
  	 */
! 	lastslot = Insert->lastslot;
! 	nextslot = Insert->nextslot;
  
! 	/*
! 	 * If there are still no slots available, wait for the oldest slot to
! 	 * become vacant.
! 	 */
! 	while (NextSlotNo(nextslot) == lastslot)
  	{
! 		volatile XLogInsertSlot *slot = &XLogCtl->XLogInsertSlots[lastslot];
  
! 		SpinLockAcquire(&slot->lck);
! 		if (XLogRecPtrIsInvalid(slot->CurrPos))
! 		{
! 			SpinLockRelease(&slot->lck);
! 			break;
! 		}
! 
! 		/* wait */
! 		MyProc->lwWaiting = true;
! 		MyProc->lwWaitMode = 0; /* doesn't matter */
! 		MyProc->lwWaitLink = NULL;
! 		if (slot->head == NULL)
! 			slot->head = MyProc;
! 		else
! 			slot->tail->lwWaitLink = MyProc;
! 		slot->tail = MyProc;
! 		SpinLockRelease(&slot->lck);
! 		LWLockRelease(WALInsertTailLock);
! 		for (;;)
! 		{
! 			PGSemaphoreLock(&MyProc->sem, false);
! 			if (!MyProc->lwWaiting)
! 				break;
! 			extraWaits++;
! 		}
! 		LWLockAcquire(WALInsertTailLock, LW_EXCLUSIVE);
! 		lastslot = Insert->lastslot;
! 		nextslot = Insert->nextslot;
  	}
  
! 	/*
! 	 * Ok, there is at least one empty slot now. That's enouugh for our
! 	 * insertion, but ẃhile we're at it, advance lastslot as much as we
! 	 * can. That way we don't need to come back here on the next call
! 	 * again.
! 	 */
! 	while (lastslot != nextslot)
! 	{
! 		volatile XLogInsertSlot *slot = &XLogCtl->XLogInsertSlots[lastslot];
! 		/*
! 		 * Don't need to grab the slot's spinlock here, because we're not
! 		 * interested in the exact value of CurrPos, only whether it's
! 		 * valid or not.
! 		 */
! 		if (!XLogRecPtrIsInvalid(slot->CurrPos))
! 			break;
  
! 		lastslot = NextSlotNo(lastslot);
! 	}
! 	Insert->lastslot = lastslot;
  
! 	LWLockRelease(WALInsertTailLock);
  }
  
  /*
***************
*** 1488,1522 **** XLogArchiveCleanup(const char *xlog)
  }
  
  /*
!  * Advance the Insert state to the next buffer page, writing out the next
!  * buffer if it still contains unwritten data.
!  *
!  * If new_segment is TRUE then we set up the next buffer page as the first
!  * page of the next xlog segment file, possibly but not usually the next
!  * consecutive file page.
!  *
!  * The global LogwrtRqst.Write pointer needs to be advanced to include the
!  * just-filled page.  If we can do this for free (without an extra lock),
!  * we do so here.  Otherwise the caller must do it.  We return TRUE if the
!  * request update still needs to be done, FALSE if we did it internally.
!  *
!  * Must be called with WALInsertLock held.
   */
! static bool
! AdvanceXLInsertBuffer(bool new_segment)
  {
  	XLogCtlInsert *Insert = &XLogCtl->Insert;
! 	XLogCtlWrite *Write = &XLogCtl->Write;
! 	int			nextidx = NextBufIdx(Insert->curridx);
! 	bool		update_needed = true;
  	XLogRecPtr	OldPageRqstPtr;
  	XLogwrtRqst WriteRqst;
! 	XLogRecPtr	NewPageEndPtr;
  	XLogPageHeader NewPage;
  
! 	/* Use Insert->LogwrtResult copy if it's more fresh */
! 	if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
! 		LogwrtResult = Insert->LogwrtResult;
  
  	/*
  	 * Get ending-offset of the buffer page we need to replace (this may be
--- 2017,2050 ----
  }
  
  /*
!  * Initialize XLOG buffers, writing out old buffers if they still contain
!  * unwritten data, upto the page containing 'upto'. Or if 'opportunistic' is
!  * true, initialize as many pages as we can without having to write out
!  * unwritten data. Any new pages are initialized to zeros, with pages headers
!  * initialized properly.
   */
! static void
! AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
  {
  	XLogCtlInsert *Insert = &XLogCtl->Insert;
! 	int			nextidx;
  	XLogRecPtr	OldPageRqstPtr;
  	XLogwrtRqst WriteRqst;
! 	XLogRecPtr	NewPageEndPtr = InvalidXLogRecPtr;
  	XLogPageHeader NewPage;
+ 	bool		needflush;
+ 	int			npages = 0;
  
! 	LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
! 
! 	/*
! 	 * Now that we have the lock, check if someone initialized the page
! 	 * already.
! 	 */
! /* XXX: fix indentation before commit */
! while (!XLByteLT(upto, XLogCtl->xlblocks[XLogCtl->curridx]) || opportunistic)
! {
! 	nextidx = NextBufIdx(XLogCtl->curridx);
  
  	/*
  	 * Get ending-offset of the buffer page we need to replace (this may be
***************
*** 1524,1535 **** AdvanceXLInsertBuffer(bool new_segment)
  	 * written out.
  	 */
  	OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
- 	if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
- 	{
- 		/* nope, got work to do... */
- 		XLogRecPtr	FinishedPageRqstPtr;
  
! 		FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
  
  		/* Before waiting, get info_lck and update LogwrtResult */
  		{
--- 2052,2068 ----
  	 * written out.
  	 */
  	OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
  
! 	needflush = !XLByteLE(OldPageRqstPtr, LogwrtResult.Write);
! 
! 	if (needflush)
! 	{
! 		/*
! 		 * Nope, got work to do. If we just want to pre-initialize as much as
! 		 * we can without flushing, give up now.
! 		 */
! 		if (opportunistic)
! 			break;
  
  		/* Before waiting, get info_lck and update LogwrtResult */
  		{
***************
*** 1537,1581 **** AdvanceXLInsertBuffer(bool new_segment)
  			volatile XLogCtlData *xlogctl = XLogCtl;
  
  			SpinLockAcquire(&xlogctl->info_lck);
! 			if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
! 				xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
  			LogwrtResult = xlogctl->LogwrtResult;
  			SpinLockRelease(&xlogctl->info_lck);
  		}
  
! 		update_needed = false;	/* Did the shared-request update */
! 
! 		if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
! 		{
! 			/* OK, someone wrote it already */
! 			Insert->LogwrtResult = LogwrtResult;
! 		}
! 		else
  		{
! 			/* Must acquire write lock */
  			LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
! 			LogwrtResult = Write->LogwrtResult;
  			if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
  			{
  				/* OK, someone wrote it already */
  				LWLockRelease(WALWriteLock);
- 				Insert->LogwrtResult = LogwrtResult;
  			}
  			else
  			{
  				/*
! 				 * Have to write buffers while holding insert lock. This is
  				 * not good, so only write as much as we absolutely must.
  				 */
  				TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
  				WriteRqst.Write = OldPageRqstPtr;
  				WriteRqst.Flush.xlogid = 0;
  				WriteRqst.Flush.xrecoff = 0;
! 				XLogWrite(WriteRqst, false, false);
  				LWLockRelease(WALWriteLock);
- 				Insert->LogwrtResult = LogwrtResult;
  				TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
  			}
  		}
  	}
  
--- 2070,2119 ----
  			volatile XLogCtlData *xlogctl = XLogCtl;
  
  			SpinLockAcquire(&xlogctl->info_lck);
! 			if (XLByteLT(xlogctl->LogwrtRqst.Write, OldPageRqstPtr))
! 			{
! 				Assert(XLByteLE(OldPageRqstPtr, xlogctl->Insert.CurrPos));
! 				xlogctl->LogwrtRqst.Write = OldPageRqstPtr;
! 			}
  			LogwrtResult = xlogctl->LogwrtResult;
  			SpinLockRelease(&xlogctl->info_lck);
  		}
  
! 		if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
  		{
! 			/*
! 			 * Must acquire write lock. Release WALBufMappingLock first, to
! 			 * make sure that all insertions that we need to wait for can
! 			 * finish (up to this same position). Otherwise we risk deadlock.
! 			 */
! 			LWLockRelease(WALBufMappingLock);
! 
! 			WaitXLogInsertionsToFinish(OldPageRqstPtr, InvalidXLogRecPtr);
! 
  			LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
! 			LogwrtResult = XLogCtl->LogwrtResult;
  			if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
  			{
  				/* OK, someone wrote it already */
  				LWLockRelease(WALWriteLock);
  			}
  			else
  			{
  				/*
! 				 * Have to write buffers while holding mapping lock. This is
  				 * not good, so only write as much as we absolutely must.
  				 */
  				TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
  				WriteRqst.Write = OldPageRqstPtr;
  				WriteRqst.Flush.xlogid = 0;
  				WriteRqst.Flush.xrecoff = 0;
! 				XLogWrite(WriteRqst, false);
  				LWLockRelease(WALWriteLock);
  				TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
  			}
+ 			/* Re-acquire WALBufMappingLock and retry */
+ 			LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
+ 			continue;
  		}
  	}
  
***************
*** 1583,1596 **** AdvanceXLInsertBuffer(bool new_segment)
  	 * Now the next buffer slot is free and we can set it up to be the next
  	 * output page.
  	 */
! 	NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];
! 
! 	if (new_segment)
! 	{
! 		/* force it to a segment start point */
! 		NewPageEndPtr.xrecoff += XLogSegSize - 1;
! 		NewPageEndPtr.xrecoff -= NewPageEndPtr.xrecoff % XLogSegSize;
! 	}
  
  	if (NewPageEndPtr.xrecoff >= XLogFileSize)
  	{
--- 2121,2127 ----
  	 * Now the next buffer slot is free and we can set it up to be the next
  	 * output page.
  	 */
! 	NewPageEndPtr = XLogCtl->xlblocks[XLogCtl->curridx];
  
  	if (NewPageEndPtr.xrecoff >= XLogFileSize)
  	{
***************
*** 1600,1612 **** AdvanceXLInsertBuffer(bool new_segment)
  	}
  	else
  		NewPageEndPtr.xrecoff += XLOG_BLCKSZ;
! 	XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
! 	NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
  
! 	Insert->curridx = nextidx;
! 	Insert->currpage = NewPage;
! 
! 	Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD;
  
  	/*
  	 * Be sure to re-zero the buffer so that bytes beyond what we've written
--- 2131,2140 ----
  	}
  	else
  		NewPageEndPtr.xrecoff += XLOG_BLCKSZ;
! 	Assert(NewPageEndPtr.xrecoff % XLOG_BLCKSZ == 0);
! 	Assert(XLogRecEndPtrToBufIdx(NewPageEndPtr) == nextidx);
  
! 	NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
  
  	/*
  	 * Be sure to re-zero the buffer so that bytes beyond what we've written
***************
*** 1650,1660 **** AdvanceXLInsertBuffer(bool new_segment)
  		NewLongPage->xlp_seg_size = XLogSegSize;
  		NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
  		NewPage   ->xlp_info |= XLP_LONG_HEADER;
- 
- 		Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD;
  	}
  
! 	return update_needed;
  }
  
  /*
--- 2178,2205 ----
  		NewLongPage->xlp_seg_size = XLogSegSize;
  		NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
  		NewPage   ->xlp_info |= XLP_LONG_HEADER;
  	}
  
! 	/*
! 	 * Make sure the initialization of the page becomes visible to others
! 	 * before the xlblocks update. GetXLogBuffer() reads xlblocks without
! 	 * holding a lock.
! 	 */
! 	pg_write_barrier();
! 
! 	*((volatile XLogRecPtr *) &XLogCtl->xlblocks[nextidx]) = NewPageEndPtr;
! 
! 	XLogCtl->curridx = nextidx;
! 
! 	npages++;
! }
! 	LWLockRelease(WALBufMappingLock);
! 
! #ifdef WAL_DEBUG
! 	if (npages > 0)
! 		elog(DEBUG1, "initialized %d pages, upto %X/%X",
! 			 npages, NewPageEndPtr.xlogid, NewPageEndPtr.xrecoff);
! #endif
  }
  
  /*
***************
*** 1699,1714 **** XLogCheckpointNeeded(uint32 logid, uint32 logseg)
   * This option allows us to avoid uselessly issuing multiple writes when a
   * single one would do.
   *
!  * If xlog_switch == TRUE, we are intending an xlog segment switch, so
!  * perform end-of-segment actions after writing the last page, even if
!  * it's not physically the end of its segment.  (NB: this will work properly
!  * only if caller specifies WriteRqst == page-end and flexible == false,
!  * and there is some data to write.)
!  *
!  * Must be called with WALWriteLock held.
   */
  static void
! XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
  {
  	XLogCtlWrite *Write = &XLogCtl->Write;
  	bool		ispartialpage;
--- 2244,2255 ----
   * This option allows us to avoid uselessly issuing multiple writes when a
   * single one would do.
   *
!  * Must be called with WALWriteLock held. And you must've called
!  * WaitXLogInsertionsToFinish(WriteRqst) before grabbing the lock to make sure
!  * the data is ready to write.
   */
  static void
! XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
  {
  	XLogCtlWrite *Write = &XLogCtl->Write;
  	bool		ispartialpage;
***************
*** 1726,1732 **** XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
  	/*
  	 * Update local LogwrtResult (caller probably did this already, but...)
  	 */
! 	LogwrtResult = Write->LogwrtResult;
  
  	/*
  	 * Since successive pages in the xlog cache are consecutively allocated,
--- 2267,2273 ----
  	/*
  	 * Update local LogwrtResult (caller probably did this already, but...)
  	 */
! 	LogwrtResult = XLogCtl->LogwrtResult;
  
  	/*
  	 * Since successive pages in the xlog cache are consecutively allocated,
***************
*** 1757,1770 **** XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
  		 * if we're passed a bogus WriteRqst.Write that is past the end of the
  		 * last page that's been initialized by AdvanceXLInsertBuffer.
  		 */
! 		if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx]))
  			elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
  				 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
! 				 XLogCtl->xlblocks[curridx].xlogid,
! 				 XLogCtl->xlblocks[curridx].xrecoff);
  
  		/* Advance LogwrtResult.Write to end of current buffer page */
! 		LogwrtResult.Write = XLogCtl->xlblocks[curridx];
  		ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
  
  		if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
--- 2298,2311 ----
  		 * if we're passed a bogus WriteRqst.Write that is past the end of the
  		 * last page that's been initialized by AdvanceXLInsertBuffer.
  		 */
! 		XLogRecPtr EndPtr = XLogCtl->xlblocks[curridx];
! 		if (!XLByteLT(LogwrtResult.Write, EndPtr))
  			elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
  				 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
! 				 EndPtr.xlogid, EndPtr.xrecoff);
  
  		/* Advance LogwrtResult.Write to end of current buffer page */
! 		LogwrtResult.Write = EndPtr;
  		ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
  
  		if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
***************
*** 1861,1876 **** XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
  			 * later. Doing it here ensures that one and only one backend will
  			 * perform this fsync.
  			 *
- 			 * We also do this if this is the last page written for an xlog
- 			 * switch.
- 			 *
  			 * This is also the right place to notify the Archiver that the
  			 * segment is ready to copy to archival storage, and to update the
  			 * timer for archive_timeout, and to signal for a checkpoint if
  			 * too many logfile segments have been used since the last
  			 * checkpoint.
  			 */
! 			if (finishing_seg || (xlog_switch && last_iteration))
  			{
  				issue_xlog_fsync(openLogFile, openLogId, openLogSeg);
  				LogwrtResult.Flush = LogwrtResult.Write;		/* end of page */
--- 2402,2414 ----
  			 * later. Doing it here ensures that one and only one backend will
  			 * perform this fsync.
  			 *
  			 * This is also the right place to notify the Archiver that the
  			 * segment is ready to copy to archival storage, and to update the
  			 * timer for archive_timeout, and to signal for a checkpoint if
  			 * too many logfile segments have been used since the last
  			 * checkpoint.
  			 */
! 			if (finishing_seg)
  			{
  				issue_xlog_fsync(openLogFile, openLogId, openLogSeg);
  				LogwrtResult.Flush = LogwrtResult.Write;		/* end of page */
***************
*** 1960,1967 **** XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
  			xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
  		SpinLockRelease(&xlogctl->info_lck);
  	}
- 
- 	Write->LogwrtResult = LogwrtResult;
  }
  
  /*
--- 2498,2503 ----
***************
*** 2124,2131 **** XLogFlush(XLogRecPtr record)
  	 */
  	for (;;)
  	{
! 		/* use volatile pointer to prevent code rearrangement */
  		volatile XLogCtlData *xlogctl = XLogCtl;
  
  		/* read LogwrtResult and update local state */
  		SpinLockAcquire(&xlogctl->info_lck);
--- 2660,2670 ----
  	 */
  	for (;;)
  	{
! 		/* use volatile pointers to prevent code rearrangement */
  		volatile XLogCtlData *xlogctl = XLogCtl;
+ 		volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
+ 		uint32		freespace;
+ 		XLogRecPtr	insertpos;
  
  		/* read LogwrtResult and update local state */
  		SpinLockAcquire(&xlogctl->info_lck);
***************
*** 2139,2144 **** XLogFlush(XLogRecPtr record)
--- 2678,2712 ----
  			break;
  
  		/*
+ 		 * Get the current insert position.
+ 		 *
+ 		 * XXX: This used to do LWLockConditionalAcquire(WALInsertLock),
+ 		 * fall back to writing just up to 'record' if we couldn't get t
+ 		 * lock. I wonder if it would be a good idea to have a
+ 		 * SpinLockConditionalAcquire function and use that? On one hand
+ 		 * it would be good to not cause more contention on the lock if
+ 		 * busy, but on the other hand, this spinlock is much more light
+ 		 * than the WALInsertLock was, so maybe it's better to just grab
+ 		 * spinlock. Also note that if we stored the XLogRecPtr as one 6
+ 		 * integer, we could just read it with no lock on platforms wher
+ 		 * 64-bit integer accesses are atomic, which covers many common
+ 		 * platforms nowadays.
+ 		 */
+ 		SpinLockAcquire(&Insert->insertpos_lck);
+ 		insertpos = Insert->CurrPos;
+ 		SpinLockRelease(&Insert->insertpos_lck);
+ 
+ 		freespace = INSERT_FREESPACE(insertpos);
+ 		if (freespace < SizeOfXLogRecord)               /* buffer is full */
+ 			insertpos.xrecoff += freespace;
+ 
+ 		/*
+ 		 * Before actually performing the write, wait for all in-flight
+ 		 * insertions to the pages we're about to write to finish.
+ 		 */
+ 		insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr, insertpos);
+ 
+ 		/*
  		 * Try to get the write lock. If we can't get it immediately, wait
  		 * until it's released, and recheck if we still need to do the flush
  		 * or if the backend that held the lock did it for us already. This
***************
*** 2155,2186 **** XLogFlush(XLogRecPtr record)
  			continue;
  		}
  		/* Got the lock */
! 		LogwrtResult = XLogCtl->Write.LogwrtResult;
  		if (!XLByteLE(record, LogwrtResult.Flush))
  		{
! 			/* try to write/flush later additions to XLOG as well */
! 			if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
! 			{
! 				XLogCtlInsert *Insert = &XLogCtl->Insert;
! 				uint32		freespace = INSERT_FREESPACE(Insert);
  
! 				if (freespace < SizeOfXLogRecord)		/* buffer is full */
! 					WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
! 				else
! 				{
! 					WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
! 					WriteRqstPtr.xrecoff -= freespace;
! 				}
! 				LWLockRelease(WALInsertLock);
! 				WriteRqst.Write = WriteRqstPtr;
! 				WriteRqst.Flush = WriteRqstPtr;
! 			}
! 			else
! 			{
! 				WriteRqst.Write = WriteRqstPtr;
! 				WriteRqst.Flush = record;
! 			}
! 			XLogWrite(WriteRqst, false, false);
  		}
  		LWLockRelease(WALWriteLock);
  		/* done */
--- 2723,2735 ----
  			continue;
  		}
  		/* Got the lock */
! 		LogwrtResult = XLogCtl->LogwrtResult;
  		if (!XLByteLE(record, LogwrtResult.Flush))
  		{
! 			WriteRqst.Write = insertpos;
! 			WriteRqst.Flush = insertpos;
  
! 			XLogWrite(WriteRqst, false);
  		}
  		LWLockRelease(WALWriteLock);
  		/* done */
***************
*** 2292,2314 **** XLogBackgroundFlush(void)
  			 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
  			 LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
  #endif
- 
  	START_CRIT_SECTION();
  
! 	/* now wait for the write lock */
  	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
! 	LogwrtResult = XLogCtl->Write.LogwrtResult;
  	if (!XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
  	{
  		XLogwrtRqst WriteRqst;
  
  		WriteRqst.Write = WriteRqstPtr;
  		WriteRqst.Flush = WriteRqstPtr;
! 		XLogWrite(WriteRqst, flexible, false);
  	}
! 	LWLockRelease(WALWriteLock);
  
  	END_CRIT_SECTION();
  }
  
  /*
--- 2841,2871 ----
  			 LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
  			 LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
  #endif
  	START_CRIT_SECTION();
  
! 	/* now wait for any in-progress insertions to finish and get write lock */
! 	WaitXLogInsertionsToFinish(WriteRqstPtr, InvalidXLogRecPtr);
  	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
! 	LogwrtResult = XLogCtl->LogwrtResult;
  	if (!XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
  	{
  		XLogwrtRqst WriteRqst;
  
  		WriteRqst.Write = WriteRqstPtr;
  		WriteRqst.Flush = WriteRqstPtr;
! 		XLogWrite(WriteRqst, flexible);
  	}
! 	LogwrtResult = XLogCtl->LogwrtResult;
  
  	END_CRIT_SECTION();
+ 
+ 	LWLockRelease(WALWriteLock);
+ 
+ 	/*
+ 	 * Great, done. To take some work off the critical path, try to initialize
+ 	 * as many of the no-longer-needed WAL buffers for future use as we can.
+ 	 */
+ 	AdvanceXLInsertBuffer(InvalidXLogRecPtr, true);
  }
  
  /*
***************
*** 5102,5107 **** XLOGShmemSize(void)
--- 5659,5667 ----
  	/* and the buffers themselves */
  	size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers));
  
+ 	/* XLog insertion slots */
+ 	size = add_size(size, mul_size(sizeof(XLogInsertSlot), NumXLogInsertSlots));
+ 
  	/*
  	 * Note: we don't count ControlFileData, it comes out of the "slop factor"
  	 * added by CreateSharedMemoryAndSemaphores.  This lets us use this
***************
*** 5117,5122 **** XLOGShmemInit(void)
--- 5677,5683 ----
  	bool		foundCFile,
  				foundXLog;
  	char	   *allocptr;
+ 	int			i;
  
  	ControlFile = (ControlFileData *)
  		ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
***************
*** 5142,5147 **** XLOGShmemInit(void)
--- 5703,5721 ----
  	memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
  	allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
  
+ 	/* Initialize insertion slots */
+ 	XLogCtl->XLogInsertSlots = (XLogInsertSlot *) allocptr;
+ 	for (i = 0; i < NumXLogInsertSlots; i++)
+ 	{
+ 		XLogInsertSlot *slot = &XLogCtl->XLogInsertSlots[i];
+ 		slot->CurrPos = InvalidXLogRecPtr;
+ 		slot->head = slot->tail = NULL;
+ 		SpinLockInit(&slot->lck);
+ 	}
+ 	XLogCtl->Insert.nextslot = 1;
+ 	XLogCtl->Insert.lastslot = 0;
+ 	allocptr += sizeof(XLogInsertSlot) * NumXLogInsertSlots;
+ 
  	/*
  	 * Align the start of the page buffers to an ALIGNOF_XLOG_BUFFER boundary.
  	 */
***************
*** 5156,5166 **** XLOGShmemInit(void)
  	XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
  	XLogCtl->SharedRecoveryInProgress = true;
  	XLogCtl->SharedHotStandbyActive = false;
- 	XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
  	SpinLockInit(&XLogCtl->info_lck);
  	InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
  	InitSharedLatch(&XLogCtl->WALWriterLatch);
  
  	/*
  	 * If we are not in bootstrap mode, pg_control should already exist. Read
  	 * and validate it immediately (see comments in ReadControlFile() for the
--- 5730,5741 ----
  	XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
  	XLogCtl->SharedRecoveryInProgress = true;
  	XLogCtl->SharedHotStandbyActive = false;
  	SpinLockInit(&XLogCtl->info_lck);
  	InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
  	InitSharedLatch(&XLogCtl->WALWriterLatch);
  
+ 	SpinLockInit(&XLogCtl->Insert.insertpos_lck);
+ 
  	/*
  	 * If we are not in bootstrap mode, pg_control should already exist. Read
  	 * and validate it immediately (see comments in ReadControlFile() for the
***************
*** 6038,6043 **** StartupXLOG(void)
--- 6613,6619 ----
  	bool		backupEndRequired = false;
  	bool		backupFromStandby = false;
  	DBState		dbstate_at_startup;
+ 	int			firstIdx;
  
  	/*
  	 * Read control file and check XLOG status looks valid.
***************
*** 6844,6851 **** StartupXLOG(void)
  	openLogOff = 0;
  	Insert = &XLogCtl->Insert;
  	Insert->PrevRecord = LastRec;
! 	XLogCtl->xlblocks[0].xlogid = openLogId;
! 	XLogCtl->xlblocks[0].xrecoff =
  		((EndOfLog.xrecoff - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
  
  	/*
--- 7420,7431 ----
  	openLogOff = 0;
  	Insert = &XLogCtl->Insert;
  	Insert->PrevRecord = LastRec;
! 
! 	firstIdx = XLogRecPtrToBufIdx(EndOfLog);
! 	XLogCtl->curridx = firstIdx;
! 
! 	XLogCtl->xlblocks[firstIdx].xlogid = openLogId;
! 	XLogCtl->xlblocks[firstIdx].xrecoff =
  		((EndOfLog.xrecoff - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
  
  	/*
***************
*** 6853,6878 **** StartupXLOG(void)
  	 * record spans, not the one it starts in.	The last block is indeed the
  	 * one we want to use.
  	 */
! 	Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - XLOG_BLCKSZ) % XLogSegSize);
! 	memcpy((char *) Insert->currpage, readBuf, XLOG_BLCKSZ);
! 	Insert->currpos = (char *) Insert->currpage +
! 		(EndOfLog.xrecoff + XLOG_BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
  
  	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
  
- 	XLogCtl->Write.LogwrtResult = LogwrtResult;
- 	Insert->LogwrtResult = LogwrtResult;
  	XLogCtl->LogwrtResult = LogwrtResult;
  
  	XLogCtl->LogwrtRqst.Write = EndOfLog;
  	XLogCtl->LogwrtRqst.Flush = EndOfLog;
  
! 	freespace = INSERT_FREESPACE(Insert);
  	if (freespace > 0)
  	{
  		/* Make sure rest of page is zero */
! 		MemSet(Insert->currpos, 0, freespace);
! 		XLogCtl->Write.curridx = 0;
  	}
  	else
  	{
--- 7433,7455 ----
  	 * record spans, not the one it starts in.	The last block is indeed the
  	 * one we want to use.
  	 */
! 	Assert(readOff == (XLogCtl->xlblocks[firstIdx].xrecoff - XLOG_BLCKSZ) % XLogSegSize);
! 	memcpy((char *) &XLogCtl->pages[firstIdx * XLOG_BLCKSZ], readBuf, XLOG_BLCKSZ);
! 	Insert->CurrPos = EndOfLog;
  
  	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
  
  	XLogCtl->LogwrtResult = LogwrtResult;
  
  	XLogCtl->LogwrtRqst.Write = EndOfLog;
  	XLogCtl->LogwrtRqst.Flush = EndOfLog;
  
! 	freespace = XLOG_BLCKSZ - EndRecPtr.xrecoff % XLOG_BLCKSZ;
  	if (freespace > 0)
  	{
  		/* Make sure rest of page is zero */
! 		MemSet(&XLogCtl->pages[firstIdx * XLOG_BLCKSZ] + EndRecPtr.xrecoff % XLOG_BLCKSZ, 0, freespace);
! 		XLogCtl->Write.curridx = firstIdx;
  	}
  	else
  	{
***************
*** 6884,6890 **** StartupXLOG(void)
  		 * this is sufficient.	The first actual attempt to insert a log
  		 * record will advance the insert state.
  		 */
! 		XLogCtl->Write.curridx = NextBufIdx(0);
  	}
  
  	/* Pre-scan prepared transactions to find out the range of XIDs present */
--- 7461,7467 ----
  		 * this is sufficient.	The first actual attempt to insert a log
  		 * record will advance the insert state.
  		 */
! 		XLogCtl->Write.curridx = NextBufIdx(firstIdx);
  	}
  
  	/* Pre-scan prepared transactions to find out the range of XIDs present */
***************
*** 7390,7396 **** GetRedoRecPtr(void)
   *
   * NOTE: The value *actually* returned is the position of the last full
   * xlog page. It lags behind the real insert position by at most 1 page.
!  * For that, we don't need to acquire WALInsertLock which can be quite
   * heavily contended, and an approximation is enough for the current
   * usage of this function.
   */
--- 7967,7973 ----
   *
   * NOTE: The value *actually* returned is the position of the last full
   * xlog page. It lags behind the real insert position by at most 1 page.
!  * For that, we don't need to acquire insertpos_lck which can be quite
   * heavily contended, and an approximation is enough for the current
   * usage of this function.
   */
***************
*** 7666,7671 **** CreateCheckPoint(int flags)
--- 8243,8249 ----
  	uint32		insert_logSeg;
  	TransactionId *inCommitXids;
  	int			nInCommit;
+ 	XLogRecPtr	curInsert;
  
  	/*
  	 * An end-of-recovery checkpoint is really a shutdown checkpoint, just
***************
*** 7734,7743 **** CreateCheckPoint(int flags)
  		checkPoint.oldestActiveXid = InvalidTransactionId;
  
  	/*
! 	 * We must hold WALInsertLock while examining insert state to determine
! 	 * the checkpoint REDO pointer.
  	 */
! 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
  
  	/*
  	 * If this isn't a shutdown or forced checkpoint, and we have not switched
--- 8312,8321 ----
  		checkPoint.oldestActiveXid = InvalidTransactionId;
  
  	/*
! 	 * Determine the checkpoint REDO pointer.
  	 */
! 	SpinLockAcquire(&Insert->insertpos_lck);
! 	curInsert = Insert->CurrPos;
  
  	/*
  	 * If this isn't a shutdown or forced checkpoint, and we have not switched
***************
*** 7749,7755 **** CreateCheckPoint(int flags)
  	 * (Perhaps it'd make even more sense to checkpoint only when the previous
  	 * checkpoint record is in a different xlog page?)
  	 *
! 	 * While holding the WALInsertLock we find the current WAL insertion point
  	 * and compare that with the starting point of the last checkpoint, which
  	 * is the redo pointer. We use the redo pointer because the start and end
  	 * points of a checkpoint can be hundreds of files apart on large systems
--- 8327,8333 ----
  	 * (Perhaps it'd make even more sense to checkpoint only when the previous
  	 * checkpoint record is in a different xlog page?)
  	 *
! 	 * While holding insertpos_lck we find the current WAL insertion point
  	 * and compare that with the starting point of the last checkpoint, which
  	 * is the redo pointer. We use the redo pointer because the start and end
  	 * points of a checkpoint can be hundreds of files apart on large systems
***************
*** 7758,7772 **** CreateCheckPoint(int flags)
  	if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
  				  CHECKPOINT_FORCE)) == 0)
  	{
- 		XLogRecPtr	curInsert;
- 
- 		INSERT_RECPTR(curInsert, Insert, Insert->curridx);
  		XLByteToSeg(curInsert, insert_logId, insert_logSeg);
  		XLByteToSeg(ControlFile->checkPointCopy.redo, redo_logId, redo_logSeg);
  		if (insert_logId == redo_logId &&
  			insert_logSeg == redo_logSeg)
  		{
! 			LWLockRelease(WALInsertLock);
  			LWLockRelease(CheckpointLock);
  			END_CRIT_SECTION();
  			return;
--- 8336,8347 ----
  	if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY |
  				  CHECKPOINT_FORCE)) == 0)
  	{
  		XLByteToSeg(curInsert, insert_logId, insert_logSeg);
  		XLByteToSeg(ControlFile->checkPointCopy.redo, redo_logId, redo_logSeg);
  		if (insert_logId == redo_logId &&
  			insert_logSeg == redo_logSeg)
  		{
! 			SpinLockRelease(&Insert->insertpos_lck);
  			LWLockRelease(CheckpointLock);
  			END_CRIT_SECTION();
  			return;
***************
*** 7793,7806 **** CreateCheckPoint(int flags)
  	 * the buffer flush work.  Those XLOG records are logically after the
  	 * checkpoint, even though physically before it.  Got that?
  	 */
! 	freespace = INSERT_FREESPACE(Insert);
  	if (freespace < SizeOfXLogRecord)
! 	{
! 		(void) AdvanceXLInsertBuffer(false);
! 		/* OK to ignore update return flag, since we will do flush anyway */
! 		freespace = INSERT_FREESPACE(Insert);
! 	}
! 	INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
  
  	/*
  	 * Here we update the shared RedoRecPtr for future XLogInsert calls; this
--- 8368,8377 ----
  	 * the buffer flush work.  Those XLOG records are logically after the
  	 * checkpoint, even though physically before it.  Got that?
  	 */
! 	freespace = INSERT_FREESPACE(curInsert);
  	if (freespace < SizeOfXLogRecord)
! 		curInsert = AdvanceXLogRecPtrToNextPage(curInsert);
! 	checkPoint.redo = curInsert;
  
  	/*
  	 * Here we update the shared RedoRecPtr for future XLogInsert calls; this
***************
*** 7826,7832 **** CreateCheckPoint(int flags)
  	 * Now we can release WAL insert lock, allowing other xacts to proceed
  	 * while we are flushing disk buffers.
  	 */
! 	LWLockRelease(WALInsertLock);
  
  	/*
  	 * If enabled, log checkpoint start.  We postpone this until now so as not
--- 8397,8403 ----
  	 * Now we can release WAL insert lock, allowing other xacts to proceed
  	 * while we are flushing disk buffers.
  	 */
! 	SpinLockRelease(&Insert->insertpos_lck);
  
  	/*
  	 * If enabled, log checkpoint start.  We postpone this until now so as not
***************
*** 7846,7852 **** CreateCheckPoint(int flags)
  	 * we wait till he's out of his commit critical section before proceeding.
  	 * See notes in RecordTransactionCommit().
  	 *
! 	 * Because we've already released WALInsertLock, this test is a bit fuzzy:
  	 * it is possible that we will wait for xacts we didn't really need to
  	 * wait for.  But the delay should be short and it seems better to make
  	 * checkpoint take a bit longer than to hold locks longer than necessary.
--- 8417,8423 ----
  	 * we wait till he's out of his commit critical section before proceeding.
  	 * See notes in RecordTransactionCommit().
  	 *
! 	 * Because we've already released insertpos_lck, this test is a bit fuzzy:
  	 * it is possible that we will wait for xacts we didn't really need to
  	 * wait for.  But the delay should be short and it seems better to make
  	 * checkpoint take a bit longer than to hold locks longer than necessary.
***************
*** 8213,8227 **** CreateRestartPoint(int flags)
  	 * the number of segments replayed since last restartpoint, and request a
  	 * restartpoint if it exceeds checkpoint_segments.
  	 *
! 	 * You need to hold WALInsertLock and info_lck to update it, although
! 	 * during recovery acquiring WALInsertLock is just pro forma, because
! 	 * there is no other processes updating Insert.RedoRecPtr.
  	 */
! 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
  	SpinLockAcquire(&xlogctl->info_lck);
  	xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo;
  	SpinLockRelease(&xlogctl->info_lck);
! 	LWLockRelease(WALInsertLock);
  
  	/*
  	 * Prepare to accumulate statistics.
--- 8784,8798 ----
  	 * the number of segments replayed since last restartpoint, and request a
  	 * restartpoint if it exceeds checkpoint_segments.
  	 *
! 	 * Like in CreateCheckPoint(), you need both insertpos_lck and info_lck
! 	 * to update it, although during recovery acquiring insertpos_lck is just
! 	 * pro forma, because no WAL insertions are happening.
  	 */
! 	SpinLockAcquire(&xlogctl->Insert.insertpos_lck);
  	SpinLockAcquire(&xlogctl->info_lck);
  	xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo;
  	SpinLockRelease(&xlogctl->info_lck);
! 	SpinLockRelease(&xlogctl->Insert.insertpos_lck);
  
  	/*
  	 * Prepare to accumulate statistics.
***************
*** 8414,8419 **** RequestXLogSwitch(void)
--- 8985,8991 ----
  {
  	XLogRecPtr	RecPtr;
  	XLogRecData rdata;
+ 	XLogwrtRqst FlushRqst;
  
  	/* XLOG SWITCH, alone among xlog record types, has no data */
  	rdata.buffer = InvalidBuffer;
***************
*** 8423,8428 **** RequestXLogSwitch(void)
--- 8995,9021 ----
  
  	RecPtr = XLogInsert(RM_XLOG_ID, XLOG_SWITCH, &rdata);
  
+ 	/* XXX: before this patch, TRACE_POSTGRESQL_XLOG_SWITCH was not called
+ 	 * if the xlog switch had no work to do, ie. if we were already at the
+ 	 * beginning of a new XLOG segment. You can check if RecPtr points to
+ 	 * beginning of a segment if you want to keep the distinction.
+ 	 */
+ 	TRACE_POSTGRESQL_XLOG_SWITCH();
+ 
+ 	/*
+ 	 * Flush through the end of the page containing XLOG_SWITCH, and
+ 	 * perform end-of-segment actions (eg, notifying archiver).
+ 	 */
+ 	WaitXLogInsertionsToFinish(RecPtr, InvalidXLogRecPtr);
+ 
+ 	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+ 	FlushRqst.Write = RecPtr;
+ 	FlushRqst.Flush = RecPtr;
+ 	START_CRIT_SECTION();
+ 	XLogWrite(FlushRqst, false);
+ 	END_CRIT_SECTION();
+ 	LWLockRelease(WALWriteLock);
+ 
  	return RecPtr;
  }
  
***************
*** 8501,8522 **** XLogReportParameters(void)
  /*
   * Update full_page_writes in shared memory, and write an
   * XLOG_FPW_CHANGE record if necessary.
   */
  void
  UpdateFullPageWrites(void)
  {
! 	XLogCtlInsert *Insert = &XLogCtl->Insert;
  
  	/*
  	 * Do nothing if full_page_writes has not been changed.
  	 *
  	 * It's safe to check the shared full_page_writes without the lock,
! 	 * because we can guarantee that there is no concurrently running
! 	 * process which can update it.
  	 */
  	if (fullPageWrites == Insert->fullPageWrites)
  		return;
  
  	/*
  	 * Write an XLOG_FPW_CHANGE record. This allows us to keep
  	 * track of full_page_writes during archive recovery, if required.
--- 9094,9134 ----
  /*
   * Update full_page_writes in shared memory, and write an
   * XLOG_FPW_CHANGE record if necessary.
+  *
+  * Note: this function assumes there is no other process running
+  * concurrently that could update it.
   */
  void
  UpdateFullPageWrites(void)
  {
! 	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
  
  	/*
  	 * Do nothing if full_page_writes has not been changed.
  	 *
  	 * It's safe to check the shared full_page_writes without the lock,
! 	 * because we assume that there is no concurrently running process
! 	 * which can update it.
  	 */
  	if (fullPageWrites == Insert->fullPageWrites)
  		return;
  
+ 	START_CRIT_SECTION();
+ 
+ 	/*
+ 	 * It's always safe to take full page images, even when not strictly
+ 	 * required, but not the other round. So if we're setting full_page_writes
+ 	 * to true, first set it true and then write the WAL record. If we're
+ 	 * setting it to false, first write the WAL record and then set the
+ 	 * global flag.
+ 	 */
+ 	if (fullPageWrites)
+ 	{
+ 		SpinLockAcquire(&Insert->insertpos_lck);
+ 		Insert->fullPageWrites = true;
+ 		SpinLockRelease(&Insert->insertpos_lck);
+ 	}
+ 
  	/*
  	 * Write an XLOG_FPW_CHANGE record. This allows us to keep
  	 * track of full_page_writes during archive recovery, if required.
***************
*** 8532,8543 **** UpdateFullPageWrites(void)
  
  		XLogInsert(RM_XLOG_ID, XLOG_FPW_CHANGE, &rdata);
  	}
! 	else
  	{
! 		LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
! 		Insert->fullPageWrites = fullPageWrites;
! 		LWLockRelease(WALInsertLock);
  	}
  }
  
  /*
--- 9144,9157 ----
  
  		XLogInsert(RM_XLOG_ID, XLOG_FPW_CHANGE, &rdata);
  	}
! 
! 	if (!fullPageWrites)
  	{
! 		SpinLockAcquire(&Insert->insertpos_lck);
! 		Insert->fullPageWrites = false;
! 		SpinLockRelease(&Insert->insertpos_lck);
  	}
+ 	END_CRIT_SECTION();
  }
  
  /*
***************
*** 9063,9068 **** issue_xlog_fsync(int fd, uint32 log, uint32 seg)
--- 9677,9683 ----
  XLogRecPtr
  do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
  {
+ 	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
  	bool		exclusive = (labelfile == NULL);
  	bool		backup_started_in_recovery = false;
  	XLogRecPtr	checkpointloc;
***************
*** 9125,9150 **** do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
  	 * Note that forcePageWrites has no effect during an online backup from
  	 * the standby.
  	 *
! 	 * We must hold WALInsertLock to change the value of forcePageWrites, to
  	 * ensure adequate interlocking against XLogInsert().
  	 */
! 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
  	if (exclusive)
  	{
! 		if (XLogCtl->Insert.exclusiveBackup)
  		{
! 			LWLockRelease(WALInsertLock);
  			ereport(ERROR,
  					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
  					 errmsg("a backup is already in progress"),
  					 errhint("Run pg_stop_backup() and try again.")));
  		}
! 		XLogCtl->Insert.exclusiveBackup = true;
  	}
  	else
! 		XLogCtl->Insert.nonExclusiveBackups++;
! 	XLogCtl->Insert.forcePageWrites = true;
! 	LWLockRelease(WALInsertLock);
  
  	/* Ensure we release forcePageWrites if fail below */
  	PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive));
--- 9740,9765 ----
  	 * Note that forcePageWrites has no effect during an online backup from
  	 * the standby.
  	 *
! 	 * We must hold insertpos_lck to change the value of forcePageWrites, to
  	 * ensure adequate interlocking against XLogInsert().
  	 */
! 	SpinLockAcquire(&Insert->insertpos_lck);
  	if (exclusive)
  	{
! 		if (Insert->exclusiveBackup)
  		{
! 			SpinLockRelease(&Insert->insertpos_lck);
  			ereport(ERROR,
  					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
  					 errmsg("a backup is already in progress"),
  					 errhint("Run pg_stop_backup() and try again.")));
  		}
! 		Insert->exclusiveBackup = true;
  	}
  	else
! 		Insert->nonExclusiveBackups++;
! 	Insert->forcePageWrites = true;
! 	SpinLockRelease(&Insert->insertpos_lck);
  
  	/* Ensure we release forcePageWrites if fail below */
  	PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive));
***************
*** 9257,9269 **** do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
  			 * taking a checkpoint right after another is not that expensive
  			 * either because only few buffers have been dirtied yet.
  			 */
! 			LWLockAcquire(WALInsertLock, LW_SHARED);
! 			if (XLByteLT(XLogCtl->Insert.lastBackupStart, startpoint))
  			{
! 				XLogCtl->Insert.lastBackupStart = startpoint;
  				gotUniqueStartpoint = true;
  			}
! 			LWLockRelease(WALInsertLock);
  		} while (!gotUniqueStartpoint);
  
  		XLByteToSeg(startpoint, _logId, _logSeg);
--- 9872,9884 ----
  			 * taking a checkpoint right after another is not that expensive
  			 * either because only few buffers have been dirtied yet.
  			 */
! 			SpinLockAcquire(&Insert->insertpos_lck);
! 			if (XLByteLT(Insert->lastBackupStart, startpoint))
  			{
! 				Insert->lastBackupStart = startpoint;
  				gotUniqueStartpoint = true;
  			}
! 			SpinLockRelease(&Insert->insertpos_lck);
  		} while (!gotUniqueStartpoint);
  
  		XLByteToSeg(startpoint, _logId, _logSeg);
***************
*** 9347,9356 **** do_pg_start_backup(const char *backupidstr, bool fast, char **labelfile)
  static void
  pg_start_backup_callback(int code, Datum arg)
  {
  	bool		exclusive = DatumGetBool(arg);
  
  	/* Update backup counters and forcePageWrites on failure */
! 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
  	if (exclusive)
  	{
  		Assert(XLogCtl->Insert.exclusiveBackup);
--- 9962,9972 ----
  static void
  pg_start_backup_callback(int code, Datum arg)
  {
+ 	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
  	bool		exclusive = DatumGetBool(arg);
  
  	/* Update backup counters and forcePageWrites on failure */
! 	SpinLockAcquire(&Insert->insertpos_lck);
  	if (exclusive)
  	{
  		Assert(XLogCtl->Insert.exclusiveBackup);
***************
*** 9367,9373 **** pg_start_backup_callback(int code, Datum arg)
  	{
  		XLogCtl->Insert.forcePageWrites = false;
  	}
! 	LWLockRelease(WALInsertLock);
  }
  
  /*
--- 9983,9989 ----
  	{
  		XLogCtl->Insert.forcePageWrites = false;
  	}
! 	SpinLockRelease(&Insert->insertpos_lck);
  }
  
  /*
***************
*** 9380,9385 **** pg_start_backup_callback(int code, Datum arg)
--- 9996,10002 ----
  XLogRecPtr
  do_pg_stop_backup(char *labelfile, bool waitforarchive)
  {
+ 	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
  	bool		exclusive = (labelfile == NULL);
  	bool		backup_started_in_recovery = false;
  	XLogRecPtr	startpoint;
***************
*** 9433,9441 **** do_pg_stop_backup(char *labelfile, bool waitforarchive)
  	/*
  	 * OK to update backup counters and forcePageWrites
  	 */
! 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
  	if (exclusive)
! 		XLogCtl->Insert.exclusiveBackup = false;
  	else
  	{
  		/*
--- 10050,10058 ----
  	/*
  	 * OK to update backup counters and forcePageWrites
  	 */
! 	SpinLockAcquire(&Insert->insertpos_lck);
  	if (exclusive)
! 		Insert->exclusiveBackup = false;
  	else
  	{
  		/*
***************
*** 9444,9459 **** do_pg_stop_backup(char *labelfile, bool waitforarchive)
  		 * backups, it is expected that each do_pg_start_backup() call is
  		 * matched by exactly one do_pg_stop_backup() call.
  		 */
! 		Assert(XLogCtl->Insert.nonExclusiveBackups > 0);
! 		XLogCtl->Insert.nonExclusiveBackups--;
  	}
  
! 	if (!XLogCtl->Insert.exclusiveBackup &&
! 		XLogCtl->Insert.nonExclusiveBackups == 0)
  	{
! 		XLogCtl->Insert.forcePageWrites = false;
  	}
! 	LWLockRelease(WALInsertLock);
  
  	if (exclusive)
  	{
--- 10061,10076 ----
  		 * backups, it is expected that each do_pg_start_backup() call is
  		 * matched by exactly one do_pg_stop_backup() call.
  		 */
! 		Assert(Insert->nonExclusiveBackups > 0);
! 		Insert->nonExclusiveBackups--;
  	}
  
! 	if (!Insert->exclusiveBackup &&
! 		Insert->nonExclusiveBackups == 0)
  	{
! 		Insert->forcePageWrites = false;
  	}
! 	SpinLockRelease(&Insert->insertpos_lck);
  
  	if (exclusive)
  	{
***************
*** 9731,9746 **** do_pg_stop_backup(char *labelfile, bool waitforarchive)
  void
  do_pg_abort_backup(void)
  {
! 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
! 	Assert(XLogCtl->Insert.nonExclusiveBackups > 0);
! 	XLogCtl->Insert.nonExclusiveBackups--;
  
! 	if (!XLogCtl->Insert.exclusiveBackup &&
! 		XLogCtl->Insert.nonExclusiveBackups == 0)
  	{
! 		XLogCtl->Insert.forcePageWrites = false;
  	}
! 	LWLockRelease(WALInsertLock);
  }
  
  /*
--- 10348,10365 ----
  void
  do_pg_abort_backup(void)
  {
! 	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
  
! 	SpinLockAcquire(&Insert->insertpos_lck);
! 	Assert(Insert->nonExclusiveBackups > 0);
! 	Insert->nonExclusiveBackups--;
! 
! 	if (!Insert->exclusiveBackup &&
! 		Insert->nonExclusiveBackups == 0)
  	{
! 		Insert->forcePageWrites = false;
  	}
! 	SpinLockRelease(&Insert->insertpos_lck);
  }
  
  /*
***************
*** 9794,9805 **** GetStandbyFlushRecPtr(void)
  XLogRecPtr
  GetXLogInsertRecPtr(void)
  {
! 	XLogCtlInsert *Insert = &XLogCtl->Insert;
  	XLogRecPtr	current_recptr;
  
! 	LWLockAcquire(WALInsertLock, LW_SHARED);
! 	INSERT_RECPTR(current_recptr, Insert, Insert->curridx);
! 	LWLockRelease(WALInsertLock);
  
  	return current_recptr;
  }
--- 10413,10424 ----
  XLogRecPtr
  GetXLogInsertRecPtr(void)
  {
! 	volatile XLogCtlInsert *Insert = &XLogCtl->Insert;
  	XLogRecPtr	current_recptr;
  
! 	SpinLockAcquire(&Insert->insertpos_lck);
! 	current_recptr = Insert->CurrPos;
! 	SpinLockRelease(&Insert->insertpos_lck);
  
  	return current_recptr;
  }
*** a/src/backend/storage/lmgr/spin.c
--- b/src/backend/storage/lmgr/spin.c
***************
*** 56,61 **** SpinlockSemas(void)
--- 56,64 ----
  	 *
  	 * For now, though, we just need a few spinlocks (10 should be plenty)
  	 * plus one for each LWLock and one for each buffer header.
+ 	 *
+ 	 * XXX: remember to adjust this for the number of spinlocks needed by the
+ 	 * xlog.c changes before committing!
  	 */
  	return NumLWLocks() + NBuffers + 10;
  }
*** a/src/include/storage/lwlock.h
--- b/src/include/storage/lwlock.h
***************
*** 53,59 **** typedef enum LWLockId
  	ProcArrayLock,
  	SInvalReadLock,
  	SInvalWriteLock,
! 	WALInsertLock,
  	WALWriteLock,
  	ControlFileLock,
  	CheckpointLock,
--- 53,59 ----
  	ProcArrayLock,
  	SInvalReadLock,
  	SInvalWriteLock,
! 	WALBufMappingLock,
  	WALWriteLock,
  	ControlFileLock,
  	CheckpointLock,
***************
*** 79,84 **** typedef enum LWLockId
--- 79,85 ----
  	SerializablePredicateLockListLock,
  	OldSerXidLock,
  	SyncRepLock,
+ 	WALInsertTailLock,
  	/* Individual lock IDs end here */
  	FirstBufMappingLock,
  	FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
