From 9bc7ca30006ebe0fe13c6ffbf4bfc87e52176876 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 21:20:20 +0900
Subject: [PATCH 1/6] Add WAL relief vent for replication slots

Replication slot is useful to maintain replication connection in the
configurations where replication is so delayed that connection is
broken. On the other hand so many WAL files can fill up disk that the
master downs by a long delay. This feature, which is activated by a
GUC "max_slot_wal_keep_size", protects master servers from suffering
disk full by limiting the number of WAL files reserved by replication
slots.
---
 src/backend/access/transam/xlog.c             | 128 +++++++++++++++++++++-----
 src/backend/replication/slot.c                |  57 ++++++++++++
 src/backend/utils/misc/guc.c                  |  12 +++
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/access/xlog.h                     |   1 +
 src/include/replication/slot.h                |   1 +
 6 files changed, 175 insertions(+), 25 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index ecd12fc53a..998b779277 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -100,6 +100,7 @@ int			wal_level = WAL_LEVEL_MINIMAL;
 int			CommitDelay = 0;	/* precommit delay in microseconds */
 int			CommitSiblings = 5; /* # concurrent xacts needed to sleep */
 int			wal_retrieve_retry_interval = 5000;
+int			max_slot_wal_keep_size_mb = -1;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -872,6 +873,7 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
+static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
@@ -9329,6 +9331,54 @@ CreateRestartPoint(int flags)
 	return true;
 }
 
+/*
+ * Returns minimum segment number that the next checkpoint must leave
+ * considering wal_keep_segments, replication slots and
+ * max_slot_wal_keep_size.
+ *
+ * currLSN is the current insert location.
+ * minSlotLSN is the minimum restart_lsn of all active slots.
+ */
+static XLogSegNo
+GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
+{
+	XLogSegNo	currSeg;
+	XLogSegNo	minSlotSeg;
+	uint64		keepSegs = 0;	/* # of segments actually kept */
+
+	XLByteToSeg(currLSN, currSeg, wal_segment_size);
+	XLByteToSeg(minSlotLSN, minSlotSeg, wal_segment_size);
+
+	/*
+	 * Calculate how many segments are kept by slots first. The second
+	 * term of the condition is just a sanity check.
+	 */
+	if (minSlotLSN != InvalidXLogRecPtr && minSlotSeg <= currSeg)
+		keepSegs = currSeg - minSlotSeg;
+
+	/* Cap keepSegs by max_slot_wal_keep_size */
+	if (max_slot_wal_keep_size_mb >= 0)
+	{
+		uint64 limitSegs;
+
+		limitSegs = ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
+
+		/* Reduce it if slots already reserves too many. */
+		if (limitSegs < keepSegs)
+			keepSegs = limitSegs;
+	}
+
+	/* but, keep at least wal_keep_segments segments if any */
+	if (wal_keep_segments > 0 && keepSegs < wal_keep_segments)
+		keepSegs = wal_keep_segments;
+
+	/* avoid underflow, don't go below 1 */
+	if (currSeg <= keepSegs)
+		return 1;
+
+	return currSeg - keepSegs;
+}
+
 /*
  * Retreat *logSegNo to the last segment that we need to retain because of
  * either wal_keep_segments or replication slots.
@@ -9340,38 +9390,66 @@ CreateRestartPoint(int flags)
 static void
 KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 {
-	XLogSegNo	segno;
-	XLogRecPtr	keep;
+	XLogRecPtr	slotminptr = InvalidXLogRecPtr;
+	XLogSegNo	minSegNo;
+	XLogSegNo	slotSegNo;
 
-	XLByteToSeg(recptr, segno, wal_segment_size);
-	keep = XLogGetReplicationSlotMinimumLSN();
+	if (max_replication_slots > 0)
+		slotminptr = XLogGetReplicationSlotMinimumLSN();
 
-	/* compute limit for wal_keep_segments first */
-	if (wal_keep_segments > 0)
+	/*
+	 * We should keep certain number of WAL segments after this checkpoint.
+	 */
+	minSegNo = GetOldestKeepSegment(recptr, slotminptr);
+
+	/*
+	 * Warn the checkpoint is going to flush the segments required by
+	 * replication slots.
+	 */
+	if (!XLogRecPtrIsInvalid(slotminptr))
 	{
-		/* avoid underflow, don't go below 1 */
-		if (segno <= wal_keep_segments)
-			segno = 1;
+		static XLogSegNo prev_lost_segs = 0;	/* avoid duplicate messages */
+
+		XLByteToSeg(slotminptr, slotSegNo, wal_segment_size);
+
+		if (slotSegNo < minSegNo)
+		{
+			XLogSegNo lost_segs = minSegNo - slotSegNo;
+			if (prev_lost_segs != lost_segs)
+			{
+				/* We have lost a new segment, warn it.*/
+				XLogRecPtr minlsn;
+				char *slot_names;
+				int nslots;
+
+				XLogSegNoOffsetToRecPtr(minSegNo, 0, wal_segment_size, minlsn);
+				slot_names =
+					ReplicationSlotsEnumerateBehinds(minlsn, ", ", &nslots);
+
+				/*
+				 * Some of the affected slots could have just been removed.
+				 * We don't need show anything here if no affected slot
+				 * remains.
+				 */
+				if (slot_names)
+				{
+					ereport(WARNING,
+							(errmsg ("some replication slots have lost required WAL segments"),
+							 errdetail_plural(
+								 "Slot %s lost %ld segment(s).",
+								 "Slots %s lost at most %ld segment(s).",
+								 nslots, slot_names, lost_segs)));
+				}
+			}
+			prev_lost_segs = lost_segs;
+		}
 		else
-			segno = segno - wal_keep_segments;
-	}
-
-	/* then check whether slots limit removal further */
-	if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
-	{
-		XLogSegNo	slotSegNo;
-
-		XLByteToSeg(keep, slotSegNo, wal_segment_size);
-
-		if (slotSegNo <= 0)
-			segno = 1;
-		else if (slotSegNo < segno)
-			segno = slotSegNo;
+			prev_lost_segs = 0;
 	}
 
 	/* don't delete WAL segments newer than the calculated segment */
-	if (segno < *logSegNo)
-		*logSegNo = segno;
+	if (minSegNo < *logSegNo)
+		*logSegNo = minSegNo;
 }
 
 /*
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 33b23b6b6d..6ef63ae7c0 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1064,6 +1064,63 @@ ReplicationSlotReserveWal(void)
 	}
 }
 
+/*
+ * Returns names of replication slots that their restart_lsn are behind
+ * specified LSN, in palloc'ed character array stuffed with slot names
+ * delimited by the given separator.  Returns NULL if no slot matches. If
+ * pnslots is given, the number of the returned slots is returned there.
+ */
+char *
+ReplicationSlotsEnumerateBehinds(XLogRecPtr target, char *separator, int *pnslots)
+{
+	static StringInfoData retstr;
+	static bool retstr_initialized = false;
+	bool insert_separator = false;
+	int i;
+	int nslots = 0;
+
+	Assert (separator);
+	if (max_replication_slots <= 0)
+		return NULL;
+
+	if (!retstr_initialized)
+	{
+		initStringInfo(&retstr);
+		retstr_initialized = true;
+	}
+	else
+		resetStringInfo(&retstr);
+
+	/* construct name list */
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (i = 0 ; i < max_replication_slots ; i++)
+	{
+		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+		if (s->in_use && s->data.restart_lsn < target)
+		{
+			if (insert_separator)
+				appendStringInfoString(&retstr, separator);
+
+			/*
+			 * Slot names consist only with lower-case letters. We don't
+			 * bother quoting.
+			 */
+			appendStringInfoString(&retstr, NameStr(s->data.name));
+			insert_separator = true;
+			nslots++;
+		}
+	}
+	LWLockRelease(ReplicationSlotControlLock);
+
+	/* return the number of slots in the list if requested */
+	if (pnslots)
+		*pnslots = nslots;
+
+	/* return NULL instead of an empty string */
+	return retstr.data[0] ? retstr.data : NULL;
+}
+
 /*
  * Flush all replication slots to disk.
  *
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 156d147c85..c5f04fb8a5 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2629,6 +2629,18 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the maximum size of extra WALs kept by replication slots."),
+		 NULL,
+		 GUC_UNIT_MB
+		},
+		&max_slot_wal_keep_size_mb,
+		-1, -1,
+		MAX_KILOBYTES, /* XXX: This is in megabytes, like max/min_wal_size */
+		NULL, NULL, NULL
+	},
+
 	{
 		{"wal_sender_timeout", PGC_USERSET, REPLICATION_SENDING,
 			gettext_noop("Sets the maximum time to wait for WAL replication."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 194f312096..6f96177bbd 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -282,6 +282,7 @@
 #max_wal_senders = 10		# max number of walsender processes
 				# (change requires restart)
 #wal_keep_segments = 0		# in logfile segments; 0 disables
+#max_slot_wal_keep_size = -1	# measured in bytes; -1 disables
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 
 #max_replication_slots = 10	# max number of replication slots
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index f90a6a9139..b2eb30b779 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -108,6 +108,7 @@ extern int	wal_segment_size;
 extern int	min_wal_size_mb;
 extern int	max_wal_size_mb;
 extern int	wal_keep_segments;
+extern int	max_slot_wal_keep_size_mb;
 extern int	XLOGbuffers;
 extern int	XLogArchiveTimeout;
 extern int	wal_retrieve_retry_interval;
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a8f1d66bae..9c3635dc0e 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -199,6 +199,7 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
+extern char *ReplicationSlotsEnumerateBehinds(XLogRecPtr target, char *separator, int *pnslots);
 
 extern void StartupReplicationSlots(void);
 extern void CheckPointReplicationSlots(void);
-- 
2.16.3

