From dbb5ca5bb79e7910f00bff20e8295e2fa3005d2d Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 21:20:20 +0900
Subject: [PATCH 1/4] Add WAL releaf vent for replication slots

Adds a capability to limit the number of segments kept by replication
slots by a GUC variable.
---
 src/backend/access/transam/xlog.c             | 116 +++++++++++++++++++++-----
 src/backend/utils/misc/guc.c                  |  11 +++
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/access/xlog.h                     |   1 +
 4 files changed, 108 insertions(+), 21 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 47a6c4d895..542e1d78fe 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -105,6 +105,7 @@ int			wal_level = WAL_LEVEL_MINIMAL;
 int			CommitDelay = 0;	/* precommit delay in microseconds */
 int			CommitSiblings = 5; /* # concurrent xacts needed to sleep */
 int			wal_retrieve_retry_interval = 5000;
+int			max_slot_wal_keep_size_mb = 0;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -861,6 +862,7 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
+static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
@@ -9344,6 +9346,74 @@ CreateRestartPoint(int flags)
 	return true;
 }
 
+/*
+ * Returns minimum segment number the next checktpoint must leave considering
+ * wal_keep_segments, replication slots and max_slot_wal_keep_size.
+ */
+static XLogSegNo
+GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr)
+{
+	uint64 keepSegs;
+	XLogSegNo currSeg;
+	XLogSegNo tailSeg;
+	uint64 slotlimitbytes;
+	uint64 slotlimitfragment;
+	uint64 currposoff;
+	XLogRecPtr slotpos = minSlotPtr;
+	XLogSegNo	slotSeg;
+
+	Assert(wal_keep_segments >= 0);
+	Assert(max_slot_wal_keep_size_mb >= 0);
+
+	XLByteToSeg(currpos, currSeg, wal_segment_size);
+	XLByteToSeg(slotpos, slotSeg, wal_segment_size);
+
+	/*
+	 * wal_keep_segments keeps more segments than slot, slotpos is no longer
+	 * useful. Don't perform subtraction to keep values positive.
+	 */
+	if (slotpos != InvalidXLogRecPtr && currSeg <= slotSeg + wal_keep_segments)
+		slotpos = InvalidXLogRecPtr;
+
+	/* slots aren't useful, consider only wal_keep_segments */
+	if (slotpos == InvalidXLogRecPtr)
+	{
+		/* avoid underflow, don't go below 1 */
+		if (currSeg <= wal_keep_segments)
+			return 1;
+
+		return currSeg - wal_keep_segments;
+	}
+
+	/* just return slotSeg if we don't put a limit */
+	if (max_slot_wal_keep_size_mb == 0)
+		return slotSeg;
+
+	/*
+	 * Slot limit is defined and slot gives the oldest segment to keep,
+	 * calculate the oldest segment that should not be removed
+	 */
+	slotlimitbytes = 1024 * 1024 * max_slot_wal_keep_size_mb;
+	slotlimitfragment = XLogSegmentOffset(slotlimitbytes,
+												 wal_segment_size);
+	currposoff = XLogSegmentOffset(currpos, wal_segment_size);
+	keepSegs = wal_keep_segments +
+		ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
+	if (currposoff < slotlimitfragment)
+		keepSegs++;
+
+	/*
+	 * calculate the oldest segment that is kept by wal_keep_segments and
+	 * max_slot_wal_keep_size.
+	 */
+	if (currSeg <= keepSegs)
+		tailSeg = 1;
+	else
+		tailSeg = currSeg - keepSegs;
+
+	return tailSeg;
+}
+
 /*
  * Retreat *logSegNo to the last segment that we need to retain because of
  * either wal_keep_segments or replication slots.
@@ -9356,33 +9426,37 @@ static void
 KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 {
 	XLogSegNo	segno;
-	XLogRecPtr	keep;
+	XLogRecPtr	slotminptr = InvalidXLogRecPtr;
+	XLogSegNo	minSegNo;
+	XLogSegNo	slotSegNo;
 
 	XLByteToSeg(recptr, segno, wal_segment_size);
-	keep = XLogGetReplicationSlotMinimumLSN();
 
-	/* compute limit for wal_keep_segments first */
-	if (wal_keep_segments > 0)
+	if (max_replication_slots > 0)
+		slotminptr = XLogGetReplicationSlotMinimumLSN();
+
+	/*
+	 * We should keep certain number of WAL segments after this checktpoint.
+	 */
+	minSegNo = GetOldestKeepSegment(recptr, slotminptr);
+
+	/*
+	 * warn if the checkpoint flushes the segments required by replication
+	 * slots.
+	 */
+	if (!XLogRecPtrIsInvalid(slotminptr))
 	{
-		/* avoid underflow, don't go below 1 */
-		if (segno <= wal_keep_segments)
-			segno = 1;
-		else
-			segno = segno - wal_keep_segments;
+		XLByteToSeg(slotminptr, slotSegNo, wal_segment_size);
+
+		if (slotSegNo < minSegNo)
+			ereport(WARNING,
+					(errmsg ("some replication slots have lost required WAL segments"),
+					 errdetail("The mostly affected slot has lost %ld segments.",
+						   minSegNo - slotSegNo)));
 	}
 
-	/* then check whether slots limit removal further */
-	if (max_replication_slots > 0 && keep != InvalidXLogRecPtr)
-	{
-		XLogSegNo	slotSegNo;
-
-		XLByteToSeg(keep, slotSegNo, wal_segment_size);
-
-		if (slotSegNo <= 0)
-			segno = 1;
-		else if (slotSegNo < segno)
-			segno = slotSegNo;
-	}
+	if (minSegNo < segno)
+		segno = minSegNo;
 
 	/* don't delete WAL segments newer than the calculated segment */
 	if (segno < *logSegNo)
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 7a7ac479c1..de43c7139b 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2406,6 +2406,17 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"max_slot_wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
+			gettext_noop("Sets the maximum size of extra WALs kept by replication slots."),
+		 NULL,
+		 GUC_UNIT_MB
+		},
+		&max_slot_wal_keep_size_mb,
+		0, 0, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"wal_sender_timeout", PGC_SIGHUP, REPLICATION_SENDING,
 			gettext_noop("Sets the maximum time to wait for WAL replication."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 048bf4cccd..7d5171c32c 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -239,6 +239,7 @@
 #max_wal_senders = 10		# max number of walsender processes
 				# (change requires restart)
 #wal_keep_segments = 0		# in logfile segments; 0 disables
+#max_slot_wal_keep_size = 0	# measured in bytes; 0 disables
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 
 #max_replication_slots = 10	# max number of replication slots
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 421ba6d775..12cd0d1d10 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -98,6 +98,7 @@ extern int	wal_segment_size;
 extern int	min_wal_size_mb;
 extern int	max_wal_size_mb;
 extern int	wal_keep_segments;
+extern int	max_slot_wal_keep_size_mb;
 extern int	XLOGbuffers;
 extern int	XLogArchiveTimeout;
 extern int	wal_retrieve_retry_interval;
-- 
2.16.2

