From ecb7a8dd6a59ededd24578920bc1b4fbaf481b10 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 21:20:20 +0900
Subject: [PATCH 1/4] Add WAL relief vent for replication slots

Adds a capability to limit the number of segments kept by replication
slots by a GUC variable.
---
 src/backend/access/transam/xlog.c             | 108 ++++++++++++++++++++------
 src/backend/utils/misc/guc.c                  |  12 +++
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/access/xlog.h                     |   1 +
 4 files changed, 97 insertions(+), 25 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 7375a78ffc..814cd01f79 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -105,6 +105,7 @@ int			wal_level = WAL_LEVEL_MINIMAL;
 int			CommitDelay = 0;	/* precommit delay in microseconds */
 int			CommitSiblings = 5; /* # concurrent xacts needed to sleep */
 int			wal_retrieve_retry_interval = 5000;
+int			max_slot_wal_keep_size_mb = -1;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -867,6 +868,7 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
+static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
@@ -9513,6 +9515,53 @@ CreateRestartPoint(int flags)
 	return true;
 }
 
+/*
+ * Returns minimum segment number the next checkpoint must leave considering
+ * wal_keep_segments, replication slots and max_slot_wal_keep_size.
+ *
+ * currLSN is the current insert location
+ * minSlotLSN is the minimum restart_lsn of all active slots
+ */
+static XLogSegNo
+GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
+{
+	uint64		keepSegs = 0;
+	XLogSegNo	currSeg;
+	XLogSegNo	minSlotSeg;
+
+	XLByteToSeg(currLSN, currSeg, wal_segment_size);
+	XLByteToSeg(minSlotLSN, minSlotSeg, wal_segment_size);
+
+	/*
+	 * Calculate keep segments by slots first. The second term of the
+	 * condition is just a sanity check.
+	 */
+	if (minSlotLSN != InvalidXLogRecPtr && minSlotSeg <= currSeg)
+		keepSegs = currSeg - minSlotSeg;
+
+	/* Cap keepSegs by max_slot_wal_keep_size */
+	if (max_slot_wal_keep_size_mb >= 0)
+	{
+		uint64 limitSegs;
+
+		limitSegs = ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
+
+		/* Apply max_slot_wal_keep_size to keepSegs */
+		if (limitSegs < keepSegs)
+			keepSegs = limitSegs;
+	}
+
+	/* but, keep at least wal_keep_segments segments if any */
+	if (wal_keep_segments > 0 && keepSegs < wal_keep_segments)
+		keepSegs = wal_keep_segments;
+
+	/* avoid underflow, don't go below 1 */
+	if (currSeg <= keepSegs)
+		return 1;
+
+	return currSeg - keepSegs;
+}
+
 /*
  * Retreat *logSegNo to the last segment that we need to retain because of
  * either wal_keep_segments or replication slots.
@@ -9524,38 +9573,47 @@ CreateRestartPoint(int flags)
 static void
 KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 {
-	XLogSegNo	segno;
-	XLogRecPtr	keep;
+	XLogRecPtr	slotminptr = InvalidXLogRecPtr;
+	XLogSegNo	minSegNo;
+	XLogSegNo	slotSegNo;
 
-	XLByteToSeg(recptr, segno, wal_segment_size);
-	keep = XLogGetReplicationSlotMinimumLSN();
+	if (max_replication_slots > 0)
+		slotminptr = XLogGetReplicationSlotMinimumLSN();
 
-	/* compute limit for wal_keep_segments first */
-	if (wal_keep_segments > 0)
+	/*
+	 * We should keep certain number of WAL segments after this checkpoint.
+	 */
+	minSegNo = GetOldestKeepSegment(recptr, slotminptr);
+
+	/*
+	 * warn if the checkpoint flushes the segments required by replication
+	 * slots.
+	 */
+	if (!XLogRecPtrIsInvalid(slotminptr))
 	{
-		/* avoid underflow, don't go below 1 */
-		if (segno <= wal_keep_segments)
-			segno = 1;
+		static XLogSegNo prev_lost_segs = 0;	/* avoid duplicate messages */
+
+		XLByteToSeg(slotminptr, slotSegNo, wal_segment_size);
+
+		if (slotSegNo < minSegNo)
+		{
+			XLogSegNo lost_segs = minSegNo - slotSegNo;
+			if (prev_lost_segs != lost_segs)
+				ereport(WARNING,
+						(errmsg ("some replication slots have lost required WAL segments"),
+						 errdetail_plural(
+							 "The mostly affected slot has lost %ld segment.",
+							 "The mostly affected slot has lost %ld segments.",
+							 lost_segs, lost_segs)));
+			prev_lost_segs = lost_segs;
+		}
 		else
-			segno = segno - wal_keep_segments;
-	}
-
-	/* then check whether slots limit removal further */
-	if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
-	{
-		XLogSegNo	slotSegNo;
-
-		XLByteToSeg(keep, slotSegNo, wal_segment_size);
-
-		if (slotSegNo <= 0)
-			segno = 1;
-		else if (slotSegNo < segno)
-			segno = slotSegNo;
+			prev_lost_segs = 0;
 	}
 
 	/* don't delete WAL segments newer than the calculated segment */
-	if (segno < *logSegNo)
-		*logSegNo = segno;
+	if (minSegNo < *logSegNo)
+		*logSegNo = minSegNo;
 }
 
 /*
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 2317e8be6b..b26c9abec0 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2538,6 +2538,18 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the maximum size of extra WALs kept by replication slots."),
+		 NULL,
+		 GUC_UNIT_MB
+		},
+		&max_slot_wal_keep_size_mb,
+		-1, -1,
+		MAX_KILOBYTES, /* XXX: This is in megabytes, like max/min_wal_size */
+		NULL, NULL, NULL
+	},
+
 	{
 		{"wal_sender_timeout", PGC_USERSET, REPLICATION_SENDING,
 			gettext_noop("Sets the maximum time to wait for WAL replication."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 4e61bc6521..0fe00d99a9 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -238,6 +238,7 @@
 #max_wal_senders = 10		# max number of walsender processes
 				# (change requires restart)
 #wal_keep_segments = 0		# in logfile segments; 0 disables
+#max_slot_wal_keep_size = -1	# measured in bytes; -1 disables
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 
 #max_replication_slots = 10	# max number of replication slots
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 421ba6d775..12cd0d1d10 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -98,6 +98,7 @@ extern int	wal_segment_size;
 extern int	min_wal_size_mb;
 extern int	max_wal_size_mb;
 extern int	wal_keep_segments;
+extern int	max_slot_wal_keep_size_mb;
 extern int	XLOGbuffers;
 extern int	XLogArchiveTimeout;
 extern int	wal_retrieve_retry_interval;
-- 
2.16.3

