*** a/doc/src/sgml/config.sgml
--- b/doc/src/sgml/config.sgml
***************
*** 1823,1828 **** archive_command = 'copy "%p" "C:\\server\\archivedir\\%f"'  # Windows
--- 1823,1856 ----
         </para>
         </listitem>
        </varlistentry>
+ 
+       <varlistentry id="guc-standby-keep-segments" xreflabel="standby_keep_segments">
+        <term><varname>standby_keep_segments</varname> (<type>integer</type>)</term>
+        <indexterm>
+         <primary><varname>standby_keep_segments</> configuration parameter</primary>
+        </indexterm>
+        <listitem>
+        <para>
+         Specifies the number of log file segments kept in the
+         <filename>pg_xlog</> directory, in case a standby server needs to
+         fetch them via streaming replication. Each segment is normally 16
+         megabytes. If a standby server connected to the primary falls behind
+         by more than <varname>standby_keep_segments</> segments, the primary
+         might remove a WAL segment still needed by the standby and the
+         replication connection will be terminated.
+ 
+         This sets only the minimum number of segments retained for standby
+         purposes; the system might need to retain more segments for WAL
+         archival or to recover from a checkpoint. If
+         <varname>standby_keep_segments</> is zero (the default), the system
+         doesn't keep any extra segments for standby purposes, and the number
+         of old WAL segments available for standbys is determined based only
+         on the location of the previous checkpoint and status of WAL archival.
+         This parameter can only be set in the <filename>postgresql.conf</>
+         file or on the server command line.
+        </para>
+        </listitem>
+       </varlistentry>
       </variablelist>
      </sect2>
      <sect2 id="runtime-config-standby">
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 66,71 ****
--- 66,72 ----
  
  /* User-settable parameters */
  int			CheckPointSegments = 3;
+ int			StandbySegments = 0;
  int			XLOGbuffers = 8;
  int			XLogArchiveTimeout = 0;
  bool		XLogArchiveMode = false;
***************
*** 356,361 **** typedef struct XLogCtlData
--- 357,364 ----
  	uint32		ckptXidEpoch;	/* nextXID & epoch of latest checkpoint */
  	TransactionId ckptXid;
  	XLogRecPtr	asyncCommitLSN; /* LSN of newest async commit */
+ 	uint32		lastRemovedLog;	/* latest removed/recycled XLOG segment */
+ 	uint32		lastRemovedSeg;
  
  	/* Protected by WALWriteLock: */
  	XLogCtlWrite Write;
***************
*** 3150,3155 **** PreallocXlogFiles(XLogRecPtr endptr)
--- 3153,3174 ----
  }
  
  /*
+  * Get the log/seg of the latest removed or recycled WAL segment.
+  * Sets *log and *seg to 0 if no WAL segments have been removed since startup.
+  */
+ void
+ XLogGetLastRemoved(uint32 *log, uint32 *seg)
+ {
+ 	/* volatile pointer: keep both reads inside the info_lck spinlock section */
+ 	volatile XLogCtlData *xlogctl = XLogCtl;
+ 
+ 	SpinLockAcquire(&xlogctl->info_lck);
+ 	*log = xlogctl->lastRemovedLog;
+ 	*seg = xlogctl->lastRemovedSeg;
+ 	SpinLockRelease(&xlogctl->info_lck);
+ }
+ 
+ /*
   * Recycle or remove all log files older or equal to passed log/seg#
   *
   * endptr is current (or recent) end of xlog; this is used to determine
***************
*** 3170,3175 **** RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
--- 3189,3208 ----
  	char		newpath[MAXPGPATH];
  #endif
  	struct stat statbuf;
+ 	/* use volatile pointer to prevent code rearrangement */
+ 	volatile XLogCtlData *xlogctl = XLogCtl;
+ 
+ 	/* Update the last removed location in shared memory first */
+ 	SpinLockAcquire(&xlogctl->info_lck);
+ 	if (log > xlogctl->lastRemovedLog ||
+ 		(log == xlogctl->lastRemovedLog && seg > xlogctl->lastRemovedSeg))
+ 	{
+ 		xlogctl->lastRemovedLog = log;
+ 		xlogctl->lastRemovedSeg = seg;
+ 	}
+ 	SpinLockRelease(&xlogctl->info_lck);
+ 
+ 	elog(DEBUG1, "removing WAL segments older than %X/%X", log, seg);
  
  	/*
  	 * Initialize info about where to try to recycle to.  We allow recycling
***************
*** 7101,7136 **** CreateCheckPoint(int flags)
  	smgrpostckpt();
  
  	/*
! 	 * If there's connected standby servers doing XLOG streaming, don't delete
! 	 * XLOG files that have not been streamed to all of them yet. This does
! 	 * nothing to prevent them from being deleted when the standby is
! 	 * disconnected (e.g because of network problems), but at least it avoids
! 	 * an open replication connection from failing because of that.
  	 */
! 	if ((_logId || _logSeg) && max_wal_senders > 0)
  	{
! 		XLogRecPtr	oldest;
! 		uint32		log;
! 		uint32		seg;
! 
! 		oldest = GetOldestWALSendPointer();
! 		if (oldest.xlogid != 0 || oldest.xrecoff != 0)
  		{
! 			XLByteToSeg(oldest, log, seg);
  			if (log < _logId || (log == _logId && seg < _logSeg))
  			{
  				_logId = log;
  				_logSeg = seg;
  			}
  		}
- 	}
  
- 	/*
- 	 * Delete old log files (those no longer needed even for previous
- 	 * checkpoint or the standbys in XLOG streaming).
- 	 */
- 	if (_logId || _logSeg)
- 	{
  		PrevLogSeg(_logId, _logSeg);
  		RemoveOldXlogFiles(_logId, _logSeg, recptr);
  	}
--- 7134,7184 ----
  	smgrpostckpt();
  
  	/*
! 	 * Delete old log files (those no longer needed even for previous
! 	 * checkpoint or the standbys in XLOG streaming).
  	 */
! 	if (_logId || _logSeg)
  	{
! 		/*
! 		 * Calculate the last segment that we need to retain because of
! 		 * standby_keep_segments, by subtracting StandbySegments from the
! 		 * new checkpoint location.
! 		 */
! 		if (StandbySegments > 0)
  		{
! 			uint32		log;
! 			uint32		seg;
! 			int			d_log;
! 			int			d_seg;
! 
! 			XLByteToSeg(recptr, log, seg);
! 
! 			d_seg = StandbySegments % XLogSegsPerFile;
! 			d_log = StandbySegments / XLogSegsPerFile;
! 			if (seg < d_seg)
! 			{
! 				d_log += 1;
! 				seg = seg - d_seg + XLogSegsPerFile;
! 			}
! 			else
! 				seg = seg - d_seg;
! 			/* avoid underflow, don't go below (0,1) */
! 			if (log < d_log || (log == d_log && seg == 0))
! 			{
! 				log = 0;
! 				seg = 1;
! 			}
! 			else
! 				log = log - d_log;
! 
! 			/* don't delete WAL segments newer than the calculated segment */
  			if (log < _logId || (log == _logId && seg < _logSeg))
  			{
  				_logId = log;
  				_logSeg = seg;
  			}
  		}
  
  		PrevLogSeg(_logId, _logSeg);
  		RemoveOldXlogFiles(_logId, _logSeg, recptr);
  	}
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 508,513 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
--- 508,517 ----
  {
  	char		path[MAXPGPATH];
  	uint32		startoff;
+ 	uint32		lastRemovedLog;
+ 	uint32		lastRemovedSeg;
+ 	uint32		log;
+ 	uint32		seg;
  
  	while (nbytes > 0)
  	{
***************
*** 527,536 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
  
  			sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
  			if (sendFile < 0)
! 				ereport(FATAL,	/* XXX: Why FATAL? */
! 						(errcode_for_file_access(),
! 						 errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
! 								path, sendId, sendSeg)));
  			sendOff = 0;
  		}
  
--- 531,557 ----
  
  			sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
  			if (sendFile < 0)
! 			{
! 				/*
! 				 * If the file is not found, assume it's because the
! 				 * standby asked for a too old WAL segment that has already
! 				 * been removed or recycled.
! 				 */
! 				if (errno == ENOENT)
! 				{
! 					char filename[MAXFNAMELEN];
! 					XLogFileName(filename, ThisTimeLineID, sendId, sendSeg);
! 					ereport(ERROR,
! 							(errcode_for_file_access(),
! 							 errmsg("requested WAL segment %s has already been removed",
! 									filename)));
! 				}
! 				else
! 					ereport(ERROR,
! 							(errcode_for_file_access(),
! 							 errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
! 									path, sendId, sendSeg)));
! 			}
  			sendOff = 0;
  		}
  
***************
*** 538,544 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
  		if (sendOff != startoff)
  		{
  			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
! 				ereport(FATAL,
  						(errcode_for_file_access(),
  						 errmsg("could not seek in log file %u, segment %u to offset %u: %m",
  								sendId, sendSeg, startoff)));
--- 559,565 ----
  		if (sendOff != startoff)
  		{
  			if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
! 				ereport(ERROR,
  						(errcode_for_file_access(),
  						 errmsg("could not seek in log file %u, segment %u to offset %u: %m",
  								sendId, sendSeg, startoff)));
***************
*** 553,559 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
  
  		readbytes = read(sendFile, buf, segbytes);
  		if (readbytes <= 0)
! 			ereport(FATAL,
  					(errcode_for_file_access(),
  			errmsg("could not read from log file %u, segment %u, offset %u, "
  				   "length %lu: %m",
--- 574,580 ----
  
  		readbytes = read(sendFile, buf, segbytes);
  		if (readbytes <= 0)
! 			ereport(ERROR,
  					(errcode_for_file_access(),
  			errmsg("could not read from log file %u, segment %u, offset %u, "
  				   "length %lu: %m",
***************
*** 566,571 **** XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
--- 587,612 ----
  		nbytes -= readbytes;
  		buf += readbytes;
  	}
+ 
+ 	/*
+ 	 * After reading into the buffer, check that what we read was valid.
+ 	 * We do this after reading, because even though the segment was present
+ 	 * when we opened it, it might get recycled or removed while we read it.
+ 	 * The read() succeeds in that case, but the data we tried to read might
+ 	 * already have been overwritten with new WAL records.
+ 	 */
+ 	XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
+ 	XLByteToPrevSeg(recptr, log, seg);
+ 	if (log < lastRemovedLog ||
+ 		(log == lastRemovedLog && seg <= lastRemovedSeg))
+ 	{
+ 		char filename[MAXFNAMELEN];
+ 		XLogFileName(filename, ThisTimeLineID, log, seg);
+ 		ereport(ERROR,
+ 				(errcode_for_file_access(),
+ 				 errmsg("requested WAL segment %s has already been removed",
+ 						filename)));
+ 	}
  }
  
  /*
*** a/src/backend/utils/misc/guc.c
--- b/src/backend/utils/misc/guc.c
***************
*** 1648,1653 **** static struct config_int ConfigureNamesInt[] =
--- 1648,1662 ----
  	},
  
  	{
+ 		{"standby_keep_segments", PGC_SIGHUP, WAL_CHECKPOINTS,
+ 			gettext_noop("Sets the number of WAL files held for standby servers."),
+ 			NULL
+ 		},
+ 		&StandbySegments,
+ 		0, 0, INT_MAX, NULL, NULL
+ 	},
+ 
+ 	{
  		{"checkpoint_segments", PGC_SIGHUP, WAL_CHECKPOINTS,
  			gettext_noop("Sets the maximum distance in log segments between automatic WAL checkpoints."),
  			NULL
*** a/src/backend/utils/misc/postgresql.conf.sample
--- b/src/backend/utils/misc/postgresql.conf.sample
***************
*** 193,198 ****
--- 193,199 ----
  
  #max_wal_senders = 0		# max number of walsender processes
  #wal_sender_delay = 200ms	# 1-10000 milliseconds
+ #standby_keep_segments = 0	# in logfile segments, 16MB each; 0 disables
  
  
  #------------------------------------------------------------------------------
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
***************
*** 187,192 **** extern XLogRecPtr XactLastRecEnd;
--- 187,193 ----
  
  /* these variables are GUC parameters related to XLOG */
  extern int	CheckPointSegments;
+ extern int	StandbySegments;
  extern int	XLOGbuffers;
  extern bool XLogArchiveMode;
  extern char *XLogArchiveCommand;
***************
*** 267,272 **** extern int XLogFileInit(uint32 log, uint32 seg,
--- 268,274 ----
  extern int	XLogFileOpen(uint32 log, uint32 seg);
  
  
+ extern void XLogGetLastRemoved(uint32 *log, uint32 *seg);
  extern void XLogSetAsyncCommitLSN(XLogRecPtr record);
  
  extern void RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup);
