From b4a555e4ddb1ce770ed1356e3b4da54e4fbeaf12 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 21:23:25 +0900
Subject: [PATCH 2/4] Add monitoring aid for max_slot_wal_keep_size.

Adds two columns, "wal_status" and "remain", to pg_replication_slots.
When max_slot_wal_keep_size is set, long-disconnected slots may lose sync.
The two columns show whether the slot can still be reconnected, can no
longer be, or is about to lose its reserved WAL segments, and the remaining
bytes of WAL that can be written until the slot loses reserved WAL records.
---
 contrib/test_decoding/expected/ddl.out |   4 +-
 src/backend/access/transam/xlog.c      | 141 +++++++++++++++++++++++++++++++--
 src/backend/catalog/system_views.sql   |   4 +-
 src/backend/replication/slotfuncs.c    |  32 +++++++-
 src/include/access/xlog.h              |   1 +
 src/include/catalog/pg_proc.dat        |   6 +-
 src/test/regress/expected/rules.out    |   6 +-
 7 files changed, 180 insertions(+), 14 deletions(-)

diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index b7c76469fc..6b6a2df213 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -706,7 +706,7 @@ SELECT pg_drop_replication_slot('regression_slot');
 
 /* check that the slot is gone */
 SELECT * FROM pg_replication_slots;
- slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn 
------------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
+ slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | remain 
+-----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+--------
 (0 rows)
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index deda43607d..bad9db51b3 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -868,7 +868,8 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
-static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr);
+static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr,
+					   XLogRecPtr targetLSN, uint64 *restBytes);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
@@ -9507,19 +9508,126 @@ CreateRestartPoint(int flags)
 	return true;
 }
 
+
+/*
+ * Scans XLOGDIR and returns the smallest segment number among the entries
+ * accepted by IsXLogFileName() or IsPartialXLogFileName(); 0 if none is found.
+ * Intended only for the case where no WAL segment has been removed yet;
+ * otherwise read XLogCtl->lastRemovedSegNo instead of scanning the directory.
+ */
+static XLogSegNo
+GetOldestXLogFileSegNo(void)
+{
+	DIR		*xldir;
+	struct dirent *xlde;
+	XLogSegNo segno = 0;
+
+	xldir = AllocateDir(XLOGDIR);
+	while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
+	{
+		TimeLineID tli;
+		XLogSegNo fsegno;
+
+		/* Skip directory entries that are not (partial) XLOG segment files */
+		if (!IsXLogFileName(xlde->d_name) &&
+			!IsPartialXLogFileName(xlde->d_name))
+			continue;
+
+		XLogFromFileName(xlde->d_name, &tli, &fsegno, wal_segment_size);
+
+		/* Track the smallest segment number seen; the timeline is ignored */
+		if (segno == 0 || fsegno < segno)
+			segno = fsegno;
+	}
+
+	FreeDir(xldir);
+
+	return segno;
+}
+
+/*
+ * Check whether the WAL segment containing targetLSN is still present, and
+ * store in *restBytes the remaining-bytes value from GetOldestKeepSegment().
+ *
+ * Returns one of three values:
+ * 0 means that the segment holding targetLSN has already been removed.
+ * 1 means that the segment is still within the range checkpoints must keep.
+ * 2 means that the segment is still present but about to be removed by the
+ * next checkpoint.
+ */
+int
+IsLsnStillAvaiable(XLogRecPtr targetLSN, uint64 *restBytes)
+{
+	XLogRecPtr currpos;
+	XLogRecPtr slotPtr;
+	XLogSegNo targetSeg;
+	XLogSegNo tailSeg;
+	XLogSegNo oldestSeg;
+
+	Assert(!XLogRecPtrIsInvalid(targetLSN));
+	Assert(restBytes);
+
+	currpos = GetXLogWriteRecPtr();
+
+	SpinLockAcquire(&XLogCtl->info_lck);
+	oldestSeg = XLogCtl->lastRemovedSegNo;
+	SpinLockRelease(&XLogCtl->info_lck);
+
+	if (oldestSeg != 0)
+	{
+		/* The oldest existing segment is the one after the last removed one */
+		oldestSeg++;
+	}
+	else
+	{
+		/*
+		 * No WAL segment has been removed since startup, so find the oldest
+		 * segment by looking at the WAL files; a static variable caches it.
+		 */
+		static XLogSegNo oldestFileSeg = 0;
+
+		/* Scan the directory only while no result has been cached yet */
+		if (oldestFileSeg == 0)
+			oldestFileSeg = GetOldestXLogFileSegNo();
+
+		oldestSeg = oldestFileSeg;
+	}
+
+	XLByteToSeg(targetLSN, targetSeg, wal_segment_size);
+
+	slotPtr = XLogGetReplicationSlotMinimumLSN();
+	tailSeg = GetOldestKeepSegment(currpos, slotPtr, targetLSN, restBytes);
+
+	/* targetSeg is at or after the oldest segment checkpoints must keep */
+	if (tailSeg <= targetSeg)
+		return 1;
+
+	/* targetSeg is no longer kept but its file is still present */
+	if (oldestSeg <= targetSeg)
+		return 2;
+
+	/* targetSeg has already been removed */
+	return	0;
+}
+
 /*
  * Returns minimum segment number the next checkpoint must leave considering
  * wal_keep_segments, replication slots and max_slot_wal_keep_size.
  *
  * currLSN is the current insert location
  * minSlotLSN is the minimum restart_lsn of all active slots
+ * targetLSN is used when restBytes is not NULL.
+ *
+ * If restBytes is not NULL, *restBytes is set to the number of LSN bytes that
+ * can still be written before the segment containing targetLSN is removed.
  */
 static XLogSegNo
-GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
+GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN,
+					 XLogRecPtr targetLSN, uint64 *restBytes)
 {
 	uint64		keepSegs = 0;
 	XLogSegNo	currSeg;
 	XLogSegNo	minSlotSeg;
+	uint64		limitSegs = 0;
 
 	XLByteToSeg(currLSN, currSeg, wal_segment_size);
 	XLByteToSeg(minSlotLSN, minSlotSeg, wal_segment_size);
@@ -9534,8 +9642,6 @@ GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
 	/* Cap keepSegs by max_slot_wal_keep_size */
 	if (max_slot_wal_keep_size_mb >= 0)
 	{
-		uint64 limitSegs;
-
 		limitSegs = ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
 
 		/* Apply max_slot_wal_keep_size to keepSegs */
@@ -9547,6 +9653,30 @@ GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
 	if (wal_keep_segments > 0 && keepSegs < wal_keep_segments)
 		keepSegs = wal_keep_segments;
 
+	/*
+	 * If requested, return remaining LSN bytes to advance until the slot
+	 * gives up reserving WAL records.
+	 */
+	if (restBytes)
+	{
+		uint64 fragbytes;
+		XLogSegNo restartSeg;
+
+		*restBytes = 0;
+
+		XLByteToSeg(targetLSN, restartSeg, wal_segment_size);
+		if (max_slot_wal_keep_size_mb >= 0 && currSeg <= restartSeg + limitSegs)
+		{
+			/*
+			 * This slot still has all required segments. Calculate how many
+			 * LSN bytes the slot has until it loses targetLSN.
+			 */
+			fragbytes = wal_segment_size - (currLSN % wal_segment_size);
+			XLogSegNoOffsetToRecPtr(restartSeg + limitSegs - currSeg, fragbytes,
+									wal_segment_size, *restBytes);
+		}
+	}
+
 	/* avoid underflow, don't go below 1 */
 	if (currSeg <= keepSegs)
 		return 1;
@@ -9575,7 +9705,8 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 	/*
 	 * We should keep certain number of WAL segments after this checkpoint.
 	 */
-	minSegNo = GetOldestKeepSegment(recptr, slotminptr);
+	minSegNo =
+		GetOldestKeepSegment(recptr, slotminptr, InvalidXLogRecPtr, NULL);
 
 	/*
 	 * warn if the checkpoint flushes the segments required by replication
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 7251552419..5db294f64e 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -797,7 +797,9 @@ CREATE VIEW pg_replication_slots AS
             L.xmin,
             L.catalog_xmin,
             L.restart_lsn,
-            L.confirmed_flush_lsn
+            L.confirmed_flush_lsn,
+            L.wal_status,
+            L.remain
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 8782bad4a2..d9ed9e8cf2 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -185,7 +185,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 11
+#define PG_GET_REPLICATION_SLOTS_COLS 13
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -307,6 +307,36 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			nulls[i++] = true;
 
+		if (restart_lsn == InvalidXLogRecPtr)
+		{
+			values[i++] = CStringGetTextDatum("unknown");
+			values[i++] = Int64GetDatum(0);	/* "remain" is int8, not pg_lsn */
+		}
+		else
+		{
+			uint64		remaining_bytes;
+			const char *status;		/* wal_status text for the result code */
+
+			switch (IsLsnStillAvaiable(restart_lsn, &remaining_bytes))
+			{
+			case 0:				/* WAL at restart_lsn is already removed */
+				status = "lost";
+				break;
+			case 1:				/* WAL at restart_lsn is available */
+				status = "streaming";
+				break;
+			case 2:				/* available, but about to be removed */
+				status = "keeping";
+				break;
+			default:			/* unexpected result code */
+				status = "unknown";
+				break;
+			}
+
+			values[i++] = CStringGetTextDatum(status);
+			values[i++] = Int64GetDatum((int64) remaining_bytes);
+		}
+
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
 	LWLockRelease(ReplicationSlotControlLock);
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 12cd0d1d10..ad9d1dec29 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -269,6 +269,7 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
+extern int IsLsnStillAvaiable(XLogRecPtr targetLSN, uint64 *restBytes);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
 extern void UpdateFullPageWrites(void);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 860571440a..2c7cdbb66e 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -9796,9 +9796,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,remain}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 078129f251..02286cdfe8 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1451,8 +1451,10 @@ pg_replication_slots| SELECT l.slot_name,
     l.xmin,
     l.catalog_xmin,
     l.restart_lsn,
-    l.confirmed_flush_lsn
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
+    l.confirmed_flush_lsn,
+    l.wal_status,
+    l.remain
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, remain)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.16.3

