From 19d421790bb03cf93f2c944fdd70e38b0d710b3d Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 21 Dec 2017 21:23:25 +0900
Subject: [PATCH 2/4] Add monitoring aid for max_slot_wal_keep_size.

Adds two columns "status" and "remain" in pg_replication_slot.
Setting max_slot_wal_keep_size, long-disconnected slots may lose sync.
The two columns shows whether the slot can be reconnected or not, or
about to lose reserving WAL segments, and the remaining bytes of WAL
that can be written until the slot loses reserving WAL records.
---
 contrib/test_decoding/expected/ddl.out |   4 +-
 src/backend/access/transam/xlog.c      | 133 +++++++++++++++++++++++++++++++--
 src/backend/catalog/system_views.sql   |   4 +-
 src/backend/replication/slotfuncs.c    |  32 +++++++-
 src/include/access/xlog.h              |   1 +
 src/include/catalog/pg_proc.dat        |   6 +-
 src/test/regress/expected/rules.out    |   6 +-
 7 files changed, 172 insertions(+), 14 deletions(-)

diff --git a/contrib/test_decoding/expected/ddl.out b/contrib/test_decoding/expected/ddl.out
index b7c76469fc..6b6a2df213 100644
--- a/contrib/test_decoding/expected/ddl.out
+++ b/contrib/test_decoding/expected/ddl.out
@@ -706,7 +706,7 @@ SELECT pg_drop_replication_slot('regression_slot');
 
 /* check that the slot is gone */
 SELECT * FROM pg_replication_slots;
- slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn 
------------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
+ slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn | wal_status | remain 
+-----------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------+------------+--------
 (0 rows)
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 982eedad32..1934f2165b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -868,7 +868,7 @@ static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 static void LocalSetXLogInsertAllowed(void);
 static void CreateEndOfRecoveryRecord(void);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
-static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr);
+static XLogSegNo GetOldestKeepSegment(XLogRecPtr currpos, XLogRecPtr minSlotPtr, XLogRecPtr restartLSN, uint64 *restBytes);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
 static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
@@ -9490,19 +9490,119 @@ CreateRestartPoint(int flags)
 	return true;
 }
 
+
+/*
+ * Returns the segment number of the oldest file in XLOG directory.
+ */
+static XLogSegNo
+GetOldestXLogFileSegNo(void)
+{
+	DIR		*xldir;
+	struct dirent *xlde;
+	XLogSegNo segno = 0;
+
+	xldir = AllocateDir(XLOGDIR);
+	while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
+	{
+		TimeLineID tli;
+		XLogSegNo fsegno;
+
+		/* Ignore files that are not XLOG segments */
+		if (!IsXLogFileName(xlde->d_name) &&
+			!IsPartialXLogFileName(xlde->d_name))
+			continue;
+
+		XLogFromFileName(xlde->d_name, &tli, &fsegno, wal_segment_size);
+
+		/* get minimum segment ignoring timeline ID */
+		if (segno == 0 || fsegno < segno)
+			segno = fsegno;
+	}
+
+	FreeDir(xldir);
+
+	return segno;
+}
+
+/*
+ * Check if the record on the given targetLSN is present in XLOG files.
+ *
+ * Returns three kind of values.
+ * 0 means that WAL record at targetLSN is already removed.
+ * 1 means that WAL record at targetLSN is available.
+ * 2 means that WAL record at targetLSN is available but about to be removed by
+ * the next checkpoint.
+ */
+int
+IsLsnStillAvaiable(XLogRecPtr targetLSN, uint64 *restBytes)
+{
+	XLogRecPtr currpos;
+	XLogRecPtr slotPtr;
+	XLogSegNo targetSeg;
+	XLogSegNo tailSeg;
+	XLogSegNo oldestSeg;
+
+	Assert(!XLogRecPtrIsInvalid(targetLSN));
+	Assert(restBytes);
+
+	currpos = GetXLogWriteRecPtr();
+
+	SpinLockAcquire(&XLogCtl->info_lck);
+	oldestSeg = XLogCtl->lastRemovedSegNo;
+	SpinLockRelease(&XLogCtl->info_lck);
+
+	/*
+	 * oldestSeg is zero before at least one segment has been removed since
+	 * startup. Use oldest segno taken from file names.
+	 */
+	if (oldestSeg == 0)
+	{
+		static XLogSegNo oldestFileSeg = 0;
+
+		if (oldestFileSeg == 0)
+			oldestFileSeg = GetOldestXLogFileSegNo();
+		/* let it have the same meaning with lastRemovedSegNo here */
+		oldestSeg = oldestFileSeg - 1;
+	}
+
+	/* oldest segment is just after the last removed segment */
+	oldestSeg++;
+
+	XLByteToSeg(targetLSN, targetSeg, wal_segment_size);
+
+	slotPtr = XLogGetReplicationSlotMinimumLSN();
+	tailSeg = GetOldestKeepSegment(currpos, slotPtr, targetLSN, restBytes);
+
+	/* targetSeg is being reserved by slots */
+	if (tailSeg <= targetSeg)
+		return 1;
+
+	/* targetSeg is not reserved but still available */
+	if (oldestSeg <= targetSeg)
+		return 2;
+
+	/* targetSeg has gone */
+	return	0;
+}
+
 /*
  * Returns minimum segment number the next checkpoint must leave considering
  * wal_keep_segments, replication slots and max_slot_wal_keep_size.
  *
  * currLSN is the current insert location
  * minSlotLSN is the minimum restart_lsn of all active slots
+ * restartLSN is restart_lsn of a slot.
+
+ * If restBytes is not NULL, returns remaining LSN bytes to advance until the
+ * segment which contains restart_LSN is will be removed.
  */
 static XLogSegNo
-GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
+GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN, XLogRecPtr restartLSN, uint64 *restBytes)
 {
 	uint64		keepSegs = 0;
 	XLogSegNo	currSeg;
 	XLogSegNo	minSlotSeg;
+	uint64		limitSegs = 0;
 
 	XLByteToSeg(currLSN, currSeg, wal_segment_size);
 	XLByteToSeg(minSlotLSN, minSlotSeg, wal_segment_size);
@@ -9517,8 +9617,6 @@ GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
 	/* Cap keepSegs by max_slot_wal_keep_size */
 	if (max_slot_wal_keep_size_mb > 0)
 	{
-		uint64 limitSegs;
-
 		limitSegs = ConvertToXSegs(max_slot_wal_keep_size_mb, wal_segment_size);
 
 		/* Apply max_slot_wal_keep_size to keeping segments */
@@ -9530,6 +9628,30 @@ GetOldestKeepSegment(XLogRecPtr currLSN, XLogRecPtr minSlotLSN)
 	if (wal_keep_segments > 0 && keepSegs < wal_keep_segments)
 		keepSegs = wal_keep_segments;
 
+	/*
+	 * Return remaining LSN bytes to advance until the slot gives up reserving
+	 * WAL records if requested.
+	 */
+	if (restBytes)
+	{
+		uint64 fragbytes;
+		XLogSegNo restartSeg;
+
+		*restBytes = 0;
+
+		XLByteToSeg(restartLSN, restartSeg, wal_segment_size);
+		if (limitSegs > 0 && currSeg <= restartSeg + limitSegs)
+		{
+			/*
+			 * This slot still has all required segments. Calculate how many
+			 * LSN bytes the slot has until it loses restart_lsn.
+			 */
+			fragbytes = wal_segment_size - (currLSN % wal_segment_size);
+			XLogSegNoOffsetToRecPtr(restartSeg + limitSegs - currSeg, fragbytes,
+									wal_segment_size, *restBytes);
+		}
+	}
+
 	/* avoid underflow, don't go below 1 */
 	if (currSeg <= keepSegs)
 		return 1;
@@ -9558,7 +9680,8 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
 	/*
 	 * We should keep certain number of WAL segments after this checkpoint.
 	 */
-	minSegNo = GetOldestKeepSegment(recptr, slotminptr);
+	minSegNo =
+		GetOldestKeepSegment(recptr, slotminptr, InvalidXLogRecPtr, NULL);
 
 	/*
 	 * warn if the checkpoint flushes the segments required by replication
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 7251552419..5db294f64e 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -797,7 +797,9 @@ CREATE VIEW pg_replication_slots AS
             L.xmin,
             L.catalog_xmin,
             L.restart_lsn,
-            L.confirmed_flush_lsn
+            L.confirmed_flush_lsn,
+            L.wal_status,
+            L.remain
     FROM pg_get_replication_slots() AS L
             LEFT JOIN pg_database D ON (L.datoid = D.oid);
 
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 8782bad4a2..d9ed9e8cf2 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -185,7 +185,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_get_replication_slots(PG_FUNCTION_ARGS)
 {
-#define PG_GET_REPLICATION_SLOTS_COLS 11
+#define PG_GET_REPLICATION_SLOTS_COLS 13
 	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
 	TupleDesc	tupdesc;
 	Tuplestorestate *tupstore;
@@ -307,6 +307,36 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
 		else
 			nulls[i++] = true;
 
+		if (restart_lsn == InvalidXLogRecPtr)
+		{
+			values[i++] = CStringGetTextDatum("unknown");
+			values[i++] = LSNGetDatum(InvalidXLogRecPtr);
+		}
+		else
+		{
+			uint64	remaining_bytes;
+			char *status;
+
+			switch (IsLsnStillAvaiable(restart_lsn, &remaining_bytes))
+			{
+			case 0:
+				status = "lost";
+				break;
+			case 1:
+				status = "streaming";
+				break;
+			case 2:
+				status = "keeping";
+				break;
+			default:
+				status = "unknown";
+				break;
+			}
+
+			values[i++] = CStringGetTextDatum(status);
+			values[i++] = Int64GetDatum(remaining_bytes);
+		}
+
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
 	}
 	LWLockRelease(ReplicationSlotControlLock);
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 12cd0d1d10..ad9d1dec29 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -269,6 +269,7 @@ extern void ShutdownXLOG(int code, Datum arg);
 extern void InitXLOGAccess(void);
 extern void CreateCheckPoint(int flags);
 extern bool CreateRestartPoint(int flags);
+extern int IsLsnStillAvaiable(XLogRecPtr targetLSN, uint64 *restBytes);
 extern void XLogPutNextOid(Oid nextOid);
 extern XLogRecPtr XLogRestorePoint(const char *rpName);
 extern void UpdateFullPageWrites(void);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index a14651010f..4a096c9478 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -9796,9 +9796,9 @@
   proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
   proretset => 't', provolatile => 's', prorettype => 'record',
   proargtypes => '',
-  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn}',
-  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn}',
+  proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}',
+  proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,remain}',
   prosrc => 'pg_get_replication_slots' },
 { oid => '3786', descr => 'set up a logical replication slot',
   proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 078129f251..02286cdfe8 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1451,8 +1451,10 @@ pg_replication_slots| SELECT l.slot_name,
     l.xmin,
     l.catalog_xmin,
     l.restart_lsn,
-    l.confirmed_flush_lsn
-   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn)
+    l.confirmed_flush_lsn,
+    l.wal_status,
+    l.remain
+   FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, remain)
      LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
     pg_authid.rolsuper,
-- 
2.16.3

