diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 40ec0e1..e2a2ebb 100644
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
*************** LogStreamerMain(logstreamer_param *param
*** 480,485 ****
--- 480,490 ----
  	stream.timeline = param->timeline;
  	stream.sysidentifier = param->sysidentifier;
  	stream.stream_stop = reached_end_position;
+ #ifndef WIN32
+ 	stream.stop_socket = bgpipe[0];
+ #else
+ 	stream.stop_socket = PGINVALID_SOCKET;
+ #endif
  	stream.standby_message_timeout = standby_message_timeout;
  	stream.synchronous = false;
  	stream.do_sync = do_sync;
diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index 1a9fe81..09385c5 100644
*** a/src/bin/pg_basebackup/pg_receivewal.c
--- b/src/bin/pg_basebackup/pg_receivewal.c
*************** StreamLog(void)
*** 409,414 ****
--- 409,415 ----
  				stream.timeline);
  
  	stream.stream_stop = stop_streaming;
+ 	stream.stop_socket = PGINVALID_SOCKET;
  	stream.standby_message_timeout = standby_message_timeout;
  	stream.synchronous = synchronous;
  	stream.do_sync = true;
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 8511e57..c41bba2 100644
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
*************** static bool still_sending = true;		/* fe
*** 39,46 ****
  
  static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
  				 XLogRecPtr *stoppos);
! static int	CopyStreamPoll(PGconn *conn, long timeout_ms);
! static int	CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
  static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
  					int len, XLogRecPtr blockpos, TimestampTz *last_status);
  static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
--- 39,47 ----
  
  static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
  				 XLogRecPtr *stoppos);
! static int	CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket);
! static int CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
! 				  char **buffer);
  static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
  					int len, XLogRecPtr blockpos, TimestampTz *last_status);
  static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
*************** CheckServerVersionForStreaming(PGconn *c
*** 417,424 ****
--- 418,432 ----
   * return. As long as it returns false, streaming will continue
   * indefinitely.
   *
+  * If stream_stop() checks for external input, stop_socket should be set to
+  * the FD it checks.  This will allow such input to be detected promptly
+  * rather than after standby_message_timeout (which might be indefinite).
+  * Note that signals will interrupt waits for input as well, but that is
+  * race-y since a signal received while busy won't interrupt the wait.
+  *
   * standby_message_timeout controls how often we send a message
   * back to the master letting it know our progress, in milliseconds.
+  * Zero means no messages are sent.
   * This message will only contain the write location, and never
   * flush or replay.
   *
*************** HandleCopyStream(PGconn *conn, StreamCtl
*** 825,831 ****
  		sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
  												 last_status);
  
! 		r = CopyStreamReceive(conn, sleeptime, &copybuf);
  		while (r != 0)
  		{
  			if (r == -1)
--- 833,839 ----
  		sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
  												 last_status);
  
! 		r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
  		while (r != 0)
  		{
  			if (r == -1)
*************** HandleCopyStream(PGconn *conn, StreamCtl
*** 870,876 ****
  			 * Process the received data, and any subsequent data we can read
  			 * without blocking.
  			 */
! 			r = CopyStreamReceive(conn, 0, &copybuf);
  		}
  	}
  
--- 878,884 ----
  			 * Process the received data, and any subsequent data we can read
  			 * without blocking.
  			 */
! 			r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
  		}
  	}
  
*************** error:
*** 881,900 ****
  }
  
  /*
!  * Wait until we can read CopyData message, or timeout.
   *
   * Returns 1 if data has become available for reading, 0 if timed out
!  * or interrupted by signal, and -1 on an error.
   */
  static int
! CopyStreamPoll(PGconn *conn, long timeout_ms)
  {
  	int			ret;
  	fd_set		input_mask;
  	struct timeval timeout;
  	struct timeval *timeoutptr;
  
! 	if (PQsocket(conn) < 0)
  	{
  		fprintf(stderr, _("%s: invalid socket: %s"), progname,
  				PQerrorMessage(conn));
--- 889,913 ----
  }
  
  /*
!  * Wait until we can read a CopyData message,
!  * or timeout, or occurrence of a signal or input on the stop_socket.
!  * (timeout_ms < 0 means wait indefinitely; 0 means don't wait.)
   *
   * Returns 1 if data has become available for reading, 0 if timed out
!  * or interrupted by signal or stop_socket input, and -1 on an error.
   */
  static int
! CopyStreamPoll(PGconn *conn, long timeout_ms, pgsocket stop_socket)
  {
  	int			ret;
  	fd_set		input_mask;
+ 	int			connsocket;
+ 	int			maxfd;
  	struct timeval timeout;
  	struct timeval *timeoutptr;
  
! 	connsocket = PQsocket(conn);
! 	if (connsocket < 0)
  	{
  		fprintf(stderr, _("%s: invalid socket: %s"), progname,
  				PQerrorMessage(conn));
*************** CopyStreamPoll(PGconn *conn, long timeou
*** 902,908 ****
  	}
  
  	FD_ZERO(&input_mask);
! 	FD_SET(PQsocket(conn), &input_mask);
  
  	if (timeout_ms < 0)
  		timeoutptr = NULL;
--- 915,927 ----
  	}
  
  	FD_ZERO(&input_mask);
! 	FD_SET(connsocket, &input_mask);
! 	maxfd = connsocket;
! 	if (stop_socket != PGINVALID_SOCKET)
! 	{
! 		FD_SET(stop_socket, &input_mask);
! 		maxfd = Max(maxfd, stop_socket);
! 	}
  
  	if (timeout_ms < 0)
  		timeoutptr = NULL;
*************** CopyStreamPoll(PGconn *conn, long timeou
*** 913,929 ****
  		timeoutptr = &timeout;
  	}
  
! 	ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
! 	if (ret == 0 || (ret < 0 && errno == EINTR))
! 		return 0;				/* Got a timeout or signal */
! 	else if (ret < 0)
  	{
  		fprintf(stderr, _("%s: select() failed: %s\n"),
  				progname, strerror(errno));
  		return -1;
  	}
  
! 	return 1;
  }
  
  /*
--- 932,951 ----
  		timeoutptr = &timeout;
  	}
  
! 	ret = select(maxfd + 1, &input_mask, NULL, NULL, timeoutptr);
! 
! 	if (ret < 0)
  	{
+ 		if (errno == EINTR)
+ 			return 0;			/* Got a signal, so not an error */
  		fprintf(stderr, _("%s: select() failed: %s\n"),
  				progname, strerror(errno));
  		return -1;
  	}
+ 	if (ret > 0 && FD_ISSET(connsocket, &input_mask))
+ 		return 1;				/* Got input on connection socket */
  
! 	return 0;					/* Got timeout or input on stop_socket */
  }
  
  /*
*************** CopyStreamPoll(PGconn *conn, long timeou
*** 934,944 ****
   * point to a buffer holding the received message. The buffer is only valid
   * until the next CopyStreamReceive call.
   *
!  * 0 if no data was available within timeout, or wait was interrupted
!  * by signal. -1 on error. -2 if the server ended the COPY.
   */
  static int
! CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
  {
  	char	   *copybuf = NULL;
  	int			rawlen;
--- 956,968 ----
   * point to a buffer holding the received message. The buffer is only valid
   * until the next CopyStreamReceive call.
   *
!  * Returns 0 if no data was available within timeout, or if wait was
!  * interrupted by signal or stop_socket input.
!  * -1 on error. -2 if the server ended the COPY.
   */
  static int
! CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
! 				  char **buffer)
  {
  	char	   *copybuf = NULL;
  	int			rawlen;
*************** CopyStreamReceive(PGconn *conn, long tim
*** 951,970 ****
  	rawlen = PQgetCopyData(conn, &copybuf, 1);
  	if (rawlen == 0)
  	{
  		/*
! 		 * No data available. Wait for some to appear, but not longer than the
! 		 * specified timeout, so that we can ping the server.
  		 */
! 		if (timeout != 0)
! 		{
! 			int			ret;
! 
! 			ret = CopyStreamPoll(conn, timeout);
! 			if (ret <= 0)
! 				return ret;
! 		}
  
! 		/* Else there is actually data on the socket */
  		if (PQconsumeInput(conn) == 0)
  		{
  			fprintf(stderr,
--- 975,992 ----
  	rawlen = PQgetCopyData(conn, &copybuf, 1);
  	if (rawlen == 0)
  	{
+ 		int			ret;
+ 
  		/*
! 		 * No data available.  Wait for some to appear, but not longer than
! 		 * the specified timeout, so that we can ping the server.  Also stop
! 		 * waiting if input appears on stop_socket.
  		 */
! 		ret = CopyStreamPoll(conn, timeout, stop_socket);
! 		if (ret <= 0)
! 			return ret;
  
! 		/* Now there is actually data on the socket */
  		if (PQconsumeInput(conn) == 0)
  		{
  			fprintf(stderr,
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
index 42e93ac..9a51d9a 100644
*** a/src/bin/pg_basebackup/receivelog.h
--- b/src/bin/pg_basebackup/receivelog.h
*************** typedef struct StreamCtl
*** 42,47 ****
--- 42,50 ----
  
  	stream_stop_callback stream_stop;	/* Stop streaming when returns true */
  
+ 	pgsocket	stop_socket;	/* if valid, watch for input on this socket
+ 								 * and check stream_stop() when there is any */
+ 
  	WalWriteMethod *walmethod;	/* How to write the WAL */
  	char	   *partial_suffix; /* Suffix appended to partially received files */
  	char	   *replication_slot;		/* Replication slot to use, or NULL */
