diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 64614b569c..01a7802ed4 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -9907,6 +9907,54 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
       </entry>
      </row>
 
+     <row>
+      <entry><structfield>wal_status</structfield></entry>
+      <entry><type>text</type></entry>
+      <entry></entry>
+
+      <entry>Availability of WAL files claimed by this slot.
+      Valid values are:
+       <simplelist>
+        <member>
+         <literal>normal</literal> means that the claimed files
+         are within <varname>max_wal_size</varname>
+        </member>
+        <member>
+         <literal>keeping</literal> means that the claimed files exceed
+         <varname>max_wal_size</varname> but are still retained by
+         replication slots or <varname>wal_keep_segments</varname>
+        </member>
+        <member>
+         <literal>losing</literal> means that some of the files are on the
+         verge of deletion, but can still be accessed by a session that is
+         currently reading them
+        </member>
+        <member>
+         <literal>lost</literal> means that some of the claimed files are
+         definitely lost and the session using this slot cannot continue
+         replication.  This state also implies that the session using this
+         slot has been stopped.
+        </member>
+       </simplelist>
+      The last two states are seen only when
+      <xref linkend="guc-max-slot-wal-keep-size"/> is
+      non-negative. If <structfield>restart_lsn</structfield> is NULL, this
+      field is null.
+      </entry>
+     </row>
+
+     <row>
+      <entry><structfield>remain</structfield></entry>
+      <entry><type>bigint</type></entry>
+      <entry></entry>
+      <entry>The number of bytes of WAL that can be written before this slot
+        loses required WAL files.
+        If <structfield>restart_lsn</structfield> is null or
+        <structfield>wal_status</structfield> is <literal>losing</literal>
+        or <literal>lost</literal>, this field is null.
+      </entry>
+     </row>
+
     </tbody>
    </tgroup>
   </table>
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index c4d6ed4bbc..17c18386e2 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3753,6 +3753,29 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+      <varlistentry id="guc-max-slot-wal-keep-size" xreflabel="max_slot_wal_keep_size">
+       <term><varname>max_slot_wal_keep_size</varname> (<type>integer</type>)
+       <indexterm>
+        <primary><varname>max_slot_wal_keep_size</varname> configuration parameter</primary>
+       </indexterm>
+       </term>
+       <listitem>
+       <para>
+        Specify the maximum size of WAL files
+        that <link linkend="streaming-replication-slots">replication
+        slots</link> are allowed to retain in the <filename>pg_wal</filename>
+        directory at checkpoint time.
+        If <varname>max_slot_wal_keep_size</varname> is -1 (the default),
+        replication slots may retain an unlimited amount of WAL files.  If the
+        restart_lsn of a replication slot falls behind the current LSN by more
+        than this size, the standby using the slot may no longer be able to
+        continue replication due to removal of required WAL files.  You can
+        see the WAL availability of replication slots
+        in <link linkend="view-pg-replication-slots">pg_replication_slots</link>.
+       </para>
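+       <para>
+        For example, the WAL availability of each replication slot can be
+        checked with a query like this:
+<programlisting>
+SELECT slot_name, wal_status, pg_size_pretty(remain) AS remain
+FROM pg_replication_slots;
+</programlisting>
+       </para>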
+       </listitem>
+      </varlistentry>
+
      <varlistentry id="guc-wal-sender-timeout" xreflabel="wal_sender_timeout">
       <term><varname>wal_sender_timeout</varname> (<type>integer</type>)
       <indexterm>
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml
index b5d32bb720..624e5f94ad 100644
--- a/doc/src/sgml/high-availability.sgml
+++ b/doc/src/sgml/high-availability.sgml
@@ -925,9 +925,11 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
     <xref linkend="guc-archive-command"/>.
     However, these methods often result in retaining more WAL segments than
     required, whereas replication slots retain only the number of segments
-    known to be needed.  An advantage of these methods is that they bound
-    the space requirement for <literal>pg_wal</literal>; there is currently no way
-    to do this using replication slots.
+    known to be needed.  On the other hand, replication slots can retain so
+    many WAL segments that they fill up the space allocated
+    for <literal>pg_wal</literal>;
+    <xref linkend="guc-max-slot-wal-keep-size"/> limits the size of WAL files
+    retained by replication slots.
    </para>
    <para>
     Similarly, <xref linkend="guc-hot-standby-feedback"/>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 977d448f50..bb735c6f68 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -106,6 +106,7 @@ int			wal_level = WAL_LEVEL_MINIMAL;
 int			CommitDelay = 0;	/* precommit delay in microseconds */
 int			CommitSiblings = 5; /* # concurrent xacts needed to sleep */
 int			wal_retrieve_retry_interval = 5000;
+int			max_slot_wal_keep_size_mb = -1;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -759,7 +760,7 @@ static ControlFileData *ControlFile = NULL;
  */
 #define UsableBytesInPage (XLOG_BLCKSZ - SizeOfXLogShortPHD)
 
-/* Convert min_wal_size_mb and max_wal_size_mb to equivalent segment count */
+/* Convert values of GUCs measured in megabytes to equiv. segment count */
 #define ConvertToXSegs(x, segsize)	\
 	(x / ((segsize) / (1024 * 1024)))
 
@@ -896,6 +897,7 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
+static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
@@ -3929,9 +3931,10 @@ XLogGetLastRemovedSegno(void)
 	return lastRemovedSegNo;
 }
 
+
 /*
- * Update the last removed segno pointer in shared memory, to reflect
- * that the given XLOG file has been removed.
+ * Update the last removed segno pointer in shared memory, to reflect that the
+ * given XLOG file has been removed.
  */
 static void
 UpdateLastRemovedPtr(char *filename)
@@ -9050,6 +9053,7 @@ CreateCheckPoint(int flags)
 	XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
 	KeepLogSeg(recptr, &_logSegNo);
 	_logSegNo--;
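+	/* invalidate replication slots that need the WAL we are about to remove */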
+	InvalidateObsoleteReplicationSlots(_logSegNo);
 	RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr);
 
 	/*
@@ -9401,6 +9405,7 @@ CreateRestartPoint(int flags)
 	if (RecoveryInProgress())
 		ThisTimeLineID = replayTLI;
 
+	InvalidateObsoleteReplicationSlots(_logSegNo);
 	RemoveOldXlogFiles(_logSegNo, RedoRecPtr, endptr);
 
 	/*
@@ -9451,13 +9456,165 @@ CreateRestartPoint(int flags)
 	return true;
 }
 
+/*
+ * Report availability of WAL for a replication slot
+ *		restart_lsn and walsender_pid are straight from the slot info
+ *
+ * *distance is set to the distance in bytes of ... XXX what?
+ *
+ * Returns one of the following enum values.
+ *
+ * WALAVAIL_NORMAL means restart_lsn is available because it is in the range
+ * of max_wal_size.  If max_slot_wal_keep_size is smaller than max_wal_size,
+ * this state is not returned.
+ *
+ * WALAVAIL_PRESERVED means it is still available by preserving extra segments
+ * beyond max_wal_size.
+ *
+ * WALAVAIL_BEING_REMOVED means it is being removed or already removed, but
+ * the replication stream on the given slot is still live. The state may
+ * transition to WALAVAIL_PRESERVED or WALAVAIL_NORMAL if the walsender
+ * advances restart_lsn.
+ *
+ * WALAVAIL_REMOVED means it is definitely lost. The replication stream on the
+ * slot cannot continue.
+ *
+ * WALAVAIL_INVALID_LSN means the slot hasn't been set to reserve WAL.
+ */
+WalAvailability
+GetWalAvailability(XLogRecPtr restart_lsn, pid_t walsender_pid, uint64 *distance)
+{
+	XLogRecPtr currpos;
+	XLogRecPtr slotPtr;
+	XLogSegNo currSeg;		/* segid of currpos */
+	XLogSegNo restartSeg;	/* segid of restart_lsn */
+	XLogSegNo oldestSeg;	/* actual oldest segid */
+	XLogSegNo oldestSegMaxWalSize;	/* oldest segid kept by max_wal_size */
+	XLogSegNo oldestSlotSeg;	/* oldest segid kept by slot */
+	uint64	  keepSegs;
+
+	*distance = 0;	/* XXX figure out how to compute this */
+
+	/* slot does not reserve WAL. Either deactivated, or has never been active */
+	if (XLogRecPtrIsInvalid(restart_lsn))
+		return WALAVAIL_INVALID_LSN;
+
+	currpos = GetXLogWriteRecPtr();
+
+	/* calculate oldest segment currently needed by slots */
+	XLByteToSeg(restart_lsn, restartSeg, wal_segment_size);
+	slotPtr = XLogGetReplicationSlotMinimumLSN();
+	oldestSlotSeg = GetOldestKeepSegment(currpos, slotPtr);
+
+	/*
+	 * Find the oldest extant segment file. We get 1 until a checkpoint
+	 * removes the first WAL segment file after startup, which can make the
+	 * status wrong under certain abnormal conditions, but that does no
+	 * actual harm.
+	 */
+	oldestSeg = XLogGetLastRemovedSegno() + 1;
+
+	/* calculate oldest segment by max_wal_size */
+	XLByteToSeg(currpos, currSeg, wal_segment_size);
+	keepSegs = ConvertToXSegs(max_wal_size_mb, wal_segment_size) + 1;
+
+	if (currSeg > keepSegs)
+		oldestSegMaxWalSize = currSeg - keepSegs;
+	else
+		oldestSegMaxWalSize = 1;
+
+	/*
+	 * If max_slot_wal_keep_size has changed since the last call, a segment
+	 * that would have been kept under the current setting might already have
+	 * been removed under the previous one.  There is no point in showing the
+	 * normal or keeping status if restartSeg is known to be lost.
+	 */
+	if (restartSeg >= oldestSeg)
+	{
+		/*
+		 * Show "normal" when restartSeg is within max_wal_size.  If
+		 * max_slot_wal_keep_size is smaller than max_wal_size, there is no
+		 * point in showing this status.
+		 */
+		if ((max_slot_wal_keep_size_mb <= 0 ||
+			 max_slot_wal_keep_size_mb >= max_wal_size_mb) &&
+			oldestSegMaxWalSize <= restartSeg)
+			return WALAVAIL_NORMAL;
+
+		/* being retained by slots */
+		if (oldestSlotSeg <= restartSeg)
+			return WALAVAIL_PRESERVED;
+	}
+
+	/*
+	 * The segment is already lost, or about to be lost.  If the oldest
+	 * existing segment is the one just after restartSeg, a running walsender
+	 * may be reading the segment that was just removed; in that case it can
+	 * safely move to the oldest existing segment.
+	 */
+	if (oldestSeg == restartSeg + 1 && walsender_pid != 0)
+		return WALAVAIL_BEING_REMOVED;
+
+	/* definitely lost. the walsender can no longer restart */
+	return WALAVAIL_REMOVED;
+}
+
+/*
+ * Returns the oldest segment number that the next checkpoint must retain,
+ * considering wal_keep_segments, replication slots and
+ * max_slot_wal_keep_size.
+ *
+ * currLSN is the current insert location.
+ * minSlotLSN is the minimum restart_lsn of all replication slots.
+ */
+static XLogSegNo
+GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
+{
+	XLogSegNo	currSeg;
+	XLogSegNo	minSlotSeg;
+	uint64		keepSegs = 0;	/* # of segments actually kept */
+
+	XLByteToSeg(currLSN, currSeg, wal_segment_size);
+	XLByteToSeg(minSlotLSN, minSlotSeg, wal_segment_size);
+
+	/*
+	 * Calculate how many segments are kept by slots first. The second
+	 * term of the condition is just a sanity check.
+	 */
+	if (minSlotLSN != InvalidXLogRecPtr && minSlotSeg <= currSeg)
+		keepSegs = currSeg - minSlotSeg;
+
+	/* Cap keepSegs by max_slot_wal_keep_size */
+	if (max_slot_wal_keep_size_mb >= 0)
+	{
+		uint64 limit;
+
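+		/*
+		 * For example, with 16MB WAL segments and max_slot_wal_keep_size set
+		 * to 1024MB, the cap works out to 1024 / 16 = 64 segments.
+		 */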
+		limit = ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
+
+		/* Reduce keepSegs if slots already reserve more than the limit. */
+		if (limit < keepSegs)
+			keepSegs = limit;
+	}
+
+	/* but keep at least wal_keep_segments segments, if set */
+	if (wal_keep_segments > 0 && keepSegs < wal_keep_segments)
+		keepSegs = wal_keep_segments;
+
+	/* avoid underflow, don't go below 1 */
+	if (currSeg <= keepSegs)
+		return 1;
+
+	return currSeg - keepSegs;
+}
+
 /*
  * Retreat *logSegNo to the last segment that we need to retain because of
  * either wal_keep_segments or replication slots.
  *
  * This is calculated by subtracting wal_keep_segments from the given xlog
  * location, recptr and by making sure that that result is below the
- * requirement of replication slots.
+ * requirement of replication slots.  For the latter criterion we also
+ * consider the effect of max_slot_wal_keep_size: at most that much WAL is
+ * retained on behalf of slots, counting back from recptr.
  */
 static void
 KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
@@ -9467,6 +9624,11 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 
 	XLByteToSeg(recptr, segno, wal_segment_size);
 	keep = XLogGetReplicationSlotMinimumLSN();
+	if (max_slot_wal_keep_size_mb > 0 && keep != InvalidXLogRecPtr)
+	{
+		/* convert the GUC value from megabytes to bytes for LSN arithmetic */
+		uint64		slot_keep_size =
+			(uint64) max_slot_wal_keep_size_mb * 1024 * 1024;
+
+		if (recptr - keep > slot_keep_size)
+			keep = recptr - slot_keep_size;
+	}
 
 	/* compute limit for wal_keep_segments first */
 	if (wal_keep_segments > 0)
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 813ea8bfc3..e85f40c4c1 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -876,7 +876,9 @@ CREATE VIEW pg_replication_slots AS
             L.xmin,
             L.catalog_xmin,
             L.restart_lsn,
-            L.confirmed_flush_lsn
+            L.confirmed_flush_lsn,
+            L.wal_status,
+            L.remain
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index d90c7235e9..d32ed7e62f 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1064,6 +1064,53 @@ ReplicationSlotReserveWal(void)
 	}
 }
 
+/*
+ * Mark any slot that points to an LSN older than the given segment
+ * as invalid; it requires WAL that's about to be removed.
+ *
+ * NB - this runs as part of checkpoint, so avoid raising errors if possible.
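+ *
+ * This is called by CreateCheckPoint() and CreateRestartPoint() just before
+ * obsolete WAL segments are removed.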
+ */
+void
+InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
+{
+	XLogRecPtr	oldestLSN;
+	List	   *pids = NIL;
+	ListCell   *cell;
+
+	XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (int i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		if (!s->in_use)
+			continue;
+
+		if (s->data.restart_lsn < oldestLSN)
+		{
+			/* mark this slot as invalid */
+			SpinLockAcquire(&s->mutex);
+			s->data.restart_lsn = InvalidXLogRecPtr;
+
+			/* remember the PID for killing, if the slot is active */
+			if (s->active_pid != 0)
+				pids = lappend_int(pids, s->active_pid);
+			SpinLockRelease(&s->mutex);
+		}
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	/*
+	 * Signal any active walsenders to terminate.  We do not wait to observe
+	 * them gone.
+	 */
+	foreach(cell, pids)
+	{
+		/* signal the walsender to terminate */
+		(void) kill(lfirst_int(cell), SIGTERM);
+	}
+}
+
 /*
  * Flush all replication slots to disk.
  *
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index ce0c9127bc..68746ad3c8 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -234,7 +234,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 11
+#define PG_GET_REPLICATION_SLOTS_COLS 13
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -288,6 +288,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		Oid			database;
 		NameData	slot_name;
 		NameData	plugin;
+		WalAvailability walstate;
+		uint64		distance;
 		int			i;
 
 		if (!slot->in_use)
@@ -355,6 +357,39 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			nulls[i++] = true;
 
+		walstate = GetWalAvailability(restart_lsn, active_pid, &distance);
+
+		switch (walstate)
+		{
+			case WALAVAIL_INVALID_LSN:
+				nulls[i++] = true;
+				break;
+
+			case WALAVAIL_NORMAL:
+				values[i++] = CStringGetTextDatum("normal");
+				break;
+
+			case WALAVAIL_PRESERVED:
+				values[i++] = CStringGetTextDatum("keeping");
+				break;
+
+			case WALAVAIL_BEING_REMOVED:
+				values[i++] = CStringGetTextDatum("losing");
+				break;
+
+			case WALAVAIL_REMOVED:
+				values[i++] = CStringGetTextDatum("lost");
+				break;
+		}
+
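+		/*
+		 * "remain" is meaningful only while the required WAL is still
+		 * retained for the slot and max_slot_wal_keep_size imposes a limit.
+		 */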
+		if (max_slot_wal_keep_size_mb >= 0 &&
+			(walstate == WALAVAIL_NORMAL || walstate == WALAVAIL_PRESERVED))
+			values[i++] = Int64GetDatum(distance);
+		else
+			nulls[i++] = true;
+
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
 	LWLockRelease(ReplicationSlotControlLock);
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 64dc9fbd13..1cfb999748 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2763,6 +2763,19 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the maximum WAL size that can be reserved by replication slots."),
+			gettext_noop("Replication slots will be marked as failed, and segments released "
+						 "for deletion or recycling, if this much space is occupied by WAL "
+						 "on disk."),
+			GUC_UNIT_MB
+		},
+		&max_slot_wal_keep_size_mb,
+		-1, -1, MAX_KILOBYTES,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"wal_sender_timeout", PGC_USERSET, REPLICATION_SENDING,
 			gettext_noop("Sets the maximum time to wait for WAL replication."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index e904fa7300..507a72b712 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -288,6 +288,7 @@
 #max_wal_senders = 10		# max number of walsender processes
 				# (change requires restart)
 #wal_keep_segments = 0		# in logfile segments; 0 disables
+#max_slot_wal_keep_size = -1	# in megabytes; -1 disables
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 
 #max_replication_slots = 10	# max number of replication slots
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 9ec7b31cce..d0b2509081 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -108,6 +108,7 @@ extern int	wal_segment_size;
 extern int	min_wal_size_mb;
 extern int	max_wal_size_mb;
 extern int	wal_keep_segments;
+extern int	max_slot_wal_keep_size_mb;
 extern int	XLOGbuffers;
 extern int	XLogArchiveTimeout;
 extern int	wal_retrieve_retry_interval;
@@ -255,6 +256,20 @@ typedef struct CheckpointStatsData
 
 extern CheckpointStatsData CheckpointStats;
 
+/*
+ * WAL segment availability status
+ *
+ * This is used as the return value of GetWalAvailability.
+ */
+typedef enum WalAvailability
+{
+	WALAVAIL_INVALID_LSN,			/* slot does not reserve WAL */
+	WALAVAIL_NORMAL,				/* WAL segment is within max_wal_size */
+	WALAVAIL_PRESERVED,				/* WAL segment is preserved by repslots */
+	WALAVAIL_BEING_REMOVED,			/* WAL segment is no longer preserved */
+	WALAVAIL_REMOVED				/* WAL segment has been removed */
+} WalAvailability;
+
 struct XLogRecData;
 
 extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata,
@@ -305,6 +320,9 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
+extern WalAvailability GetWalAvailability(XLogRecPtr restart_lsn,
+										  pid_t walsender_pid,
+										  uint64 *distance);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
 extern void UpdateFullPageWrites(void);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index a649e44d08..dd968adf74 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -9986,9 +9986,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,remain}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/test/recovery/t/018_replslot_limit.pl b/src/test/recovery/t/018_replslot_limit.pl
new file mode 100644
index 0000000000..4ff2ef3b48
--- /dev/null
+++ b/src/test/recovery/t/018_replslot_limit.pl
@@ -0,0 +1,203 @@
+# Test for replication slot limit
+# Ensure that max_slot_wal_keep_size limits the number of WAL files to
+# be kept by replication slots.
+
+use strict;
+use warnings;
+use File::Path qw(rmtree);
+use PostgresNode;
+use TestLib;
+use Test::More tests => 13;
+use Time::HiRes qw(usleep);
+
+$ENV{PGDATABASE} = 'postgres';
+
+# Initialize master node, setting wal-segsize to 1MB
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, extra => ['--wal-segsize=1']);
+$node_master->append_conf('postgresql.conf', qq(
+min_wal_size = 2MB
+max_wal_size = 4MB
+log_checkpoints = yes
+));
+$node_master->start;
+$node_master->safe_psql('postgres', "SELECT pg_create_physical_replication_slot('rep1')");
+
+# The slot state and remain should be null before the first connection
+my $result = $node_master->safe_psql('postgres', "SELECT restart_lsn is NULL, wal_status is NULL, remain is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "t|t|t", 'check the state of non-reserved slot is "unknown"');
+
+
+# Take backup
+my $backup_name = 'my_backup';
+$node_master->backup($backup_name);
+
+# Create a standby linking to it using the replication slot
+my $node_standby = get_new_node('standby_1');
+$node_standby->init_from_backup($node_master, $backup_name, has_streaming => 1);
+$node_standby->append_conf('postgresql.conf', "primary_slot_name = 'rep1'");
+
+$node_standby->start;
+
+# Wait until standby has replayed enough data
+my $start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+# Stop standby
+$node_standby->stop;
+
+
+# Preparation done; the slot is in the "normal" state now
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|t", 'check the catching-up state');
+
+# Advance WAL by one segment (= 1MB) on master
+advance_wal($node_master, 1);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is always "safe" when fitting max_wal_size
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|t", 'check that restart_lsn is in max_wal_size');
+
+advance_wal($node_master, 4);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is always "safe" when max_slot_wal_keep_size is not set
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|t", 'check that slot is working');
+
+# The standby can reconnect to master
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+# Set max_slot_wal_keep_size on master
+my $max_slot_wal_keep_size_mb = 6;
+$node_master->append_conf('postgresql.conf', qq(
+max_slot_wal_keep_size = ${max_slot_wal_keep_size_mb}MB
+));
+$node_master->reload;
+
+# The slot is still in the "normal" state.  The remaining bytes should be
+# almost (max_slot_wal_keep_size + 1) times the segment size
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|7168 kB", 'check that max_slot_wal_keep_size is working');
+
+# Advance WAL again then checkpoint, reducing remain by 2 MB.
+advance_wal($node_master, 2);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# The slot is still working
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|5120 kB", 'check that remaining byte is calculated correctly');
+
+# wal_keep_segments overrides max_slot_wal_keep_size
+$result = $node_master->safe_psql('postgres', "ALTER SYSTEM SET wal_keep_segments to 8; SELECT pg_reload_conf();");
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|normal|7168 kB", 'check that wal_keep_segments overrides max_slot_wal_keep_size');
+
+# restore wal_keep_segments
+$result = $node_master->safe_psql('postgres', "ALTER SYSTEM SET wal_keep_segments to 0; SELECT pg_reload_conf();");
+
+# Advance WAL again without checkpoint, reducing remain by 4 MB.
+advance_wal($node_master, 4);
+
+# Slot gets into 'keeping' state
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, pg_size_pretty(remain) as remain FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|keeping|1024 kB", 'check that the slot state changes to "keeping"');
+
+# do checkpoint so that the next checkpoint runs too early
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# Advance WAL again without checkpoint; remain goes to 0.
+advance_wal($node_master, 1);
+
+# Slot gets into 'lost' state
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|lost|t", 'check that the slot state changes to "lost"');
+
+# The standby can still connect to the master before a checkpoint
+$node_standby->start;
+
+$start_lsn = $node_master->lsn('write');
+$node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
+
+$node_standby->stop;
+
+ok(!find_in_log($node_standby,
+				"requested WAL segment [0-9A-F]+ has already been removed"),
+   'check that required WAL segments are still available');
+
+# Advance WAL again, the slot loses the oldest segment.
+my $logstart = get_log_size($node_master);
+advance_wal($node_master, 7);
+$node_master->safe_psql('postgres', "CHECKPOINT;");
+
+# WARNING should be issued
+ok(find_in_log($node_master,
+			   "1 replication slot has lost required WAL segments by 1 segments\n".
+			   ".*Slot rep1 lost required segments.",
+			   $logstart),
+   'check that the warning is logged');
+
+# This slot should be broken
+$result = $node_master->safe_psql('postgres', "SELECT restart_lsn, wal_status, remain is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'");
+is($result, "$start_lsn|lost|t", 'check that the slot state changes to "lost"');
+
+# The standby can no longer connect to the master
+$logstart = get_log_size($node_standby);
+$node_standby->start;
+
+my $failed = 0;
+for (my $i = 0 ; $i < 10000 ; $i++)
+{
+	if (find_in_log($node_standby,
+					"requested WAL segment [0-9A-F]+ has already been removed",
+					$logstart))
+	{
+		$failed = 1;
+		last;
+	}
+	usleep(100_000);
+}
+ok($failed, 'check that replication has been broken');
+
+$node_standby->stop;
+
+#####################################
+# Advance WAL of $node by $n segments
+sub advance_wal
+{
+	my ($node, $n) = @_;
+
+	# Advance by $n segments (1MB each with --wal-segsize=1) on master
+	for (my $i = 0 ; $i < $n ; $i++)
+	{
+		$node->safe_psql('postgres', "CREATE TABLE t (); DROP TABLE t; SELECT pg_switch_wal();");
+	}
+}
+
+# return the size of logfile of $node in bytes
+sub get_log_size
+{
+	my ($node) = @_;
+
+	return (stat $node->logfile)[7];
+}
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+	my ($node, $pat, $off) = @_;
+
+	$off = 0 unless defined $off;
+	my $log = TestLib::slurp_file($node->logfile);
+	return 0 if (length($log) <= $off);
+
+	$log = substr($log, $off);
+
+	return $log =~ m/$pat/;
+}
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 6eec8ec568..3a0c619c1c 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1462,8 +1462,10 @@ pg_replication_slots| SELECT l.slot_name,
     l.xmin,
     l.catalog_xmin,
     l.restart_lsn,
-    l.confirmed_flush_lsn
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
+    l.confirmed_flush_lsn,
+    l.wal_status,
+    l.remain
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, remain)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
