From e4800e6c85c562ac3856a65e2838fbc3e5a79b63 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <bossartn@amazon.com>
Date: Sun, 12 Dec 2021 22:07:11 -0800
Subject: [PATCH v20 3/4] Move removal of old logical rewrite mapping files to
 custodian.

If there are many such files to remove, checkpoints can take much
longer.  To avoid this, move this work to the newly-introduced
custodian process.

Since the mapping files include 32-bit transaction IDs, there is a
risk of wraparound if the files are not cleaned up fast enough.
Removing these files in checkpoints offered decent wraparound
protection simply due to the relatively high frequency of
checkpointing.  With this change, servers should still clean up
mappings files with decently high frequency, but in theory the
wraparound risk might worsen for some (e.g., if the custodian is
spending a lot of time on a different task).  Given this is an
existing problem, this change makes no effort to handle the
wraparound risk, and it is left as a future exercise.
---
 contrib/test_decoding/expected/rewrite.out | 19 ++++++
 contrib/test_decoding/sql/rewrite.sql      | 14 ++++
 src/backend/access/heap/rewriteheap.c      | 78 +++++++++++++++++++---
 src/backend/postmaster/custodian.c         | 43 ++++++++++++
 src/include/access/rewriteheap.h           |  1 +
 src/include/postmaster/custodian.h         |  4 ++
 6 files changed, 149 insertions(+), 10 deletions(-)

diff --git a/contrib/test_decoding/expected/rewrite.out b/contrib/test_decoding/expected/rewrite.out
index 8b97f15f6f..214a514a0a 100644
--- a/contrib/test_decoding/expected/rewrite.out
+++ b/contrib/test_decoding/expected/rewrite.out
@@ -183,3 +183,22 @@ SELECT count(*) = 0 FROM pg_ls_logicalsnapdir();
  t
 (1 row)
 
+DO $$
+DECLARE
+	mappings_removed bool;
+	loops int := 0;
+BEGIN
+	LOOP
+		mappings_removed := count(*) = 0 FROM pg_ls_logicalmapdir();
+		IF mappings_removed OR loops > 120 * 100 THEN EXIT; END IF;
+		PERFORM pg_sleep(0.01);
+		loops := loops + 1;
+	END LOOP;
+END
+$$;
+SELECT count(*) = 0 FROM pg_ls_logicalmapdir();
+ ?column? 
+----------
+ t
+(1 row)
+
diff --git a/contrib/test_decoding/sql/rewrite.sql b/contrib/test_decoding/sql/rewrite.sql
index d268fa559a..d66f70f837 100644
--- a/contrib/test_decoding/sql/rewrite.sql
+++ b/contrib/test_decoding/sql/rewrite.sql
@@ -122,3 +122,17 @@ BEGIN
 END
 $$;
 SELECT count(*) = 0 FROM pg_ls_logicalsnapdir();
+DO $$
+DECLARE
+	mappings_removed bool;
+	loops int := 0;
+BEGIN
+	LOOP
+		mappings_removed := count(*) = 0 FROM pg_ls_logicalmapdir();
+		IF mappings_removed OR loops > 120 * 100 THEN EXIT; END IF;
+		PERFORM pg_sleep(0.01);
+		loops := loops + 1;
+	END LOOP;
+END
+$$;
+SELECT count(*) = 0 FROM pg_ls_logicalmapdir();
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 8993c1ed5a..9ea0f81ac3 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -116,6 +116,7 @@
 #include "lib/ilist.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "postmaster/custodian.h"
 #include "replication/logical.h"
 #include "replication/slot.h"
 #include "storage/bufmgr.h"
@@ -123,6 +124,7 @@
 #include "storage/procarray.h"
 #include "storage/smgr.h"
 #include "utils/memutils.h"
+#include "utils/pg_lsn.h"
 #include "utils/rel.h"
 
 /*
@@ -1179,7 +1181,8 @@ heap_xlog_logical_rewrite(XLogReaderState *r)
  * Perform a checkpoint for logical rewrite mappings
  *
  * This serves two tasks:
- * 1) Remove all mappings not needed anymore based on the logical restart LSN
+ * 1) Alert the custodian to remove all mappings not needed anymore based on the
+ *    logical restart LSN
  * 2) Flush all remaining mappings to disk, so that replay after a checkpoint
  *	  only has to deal with the parts of a mapping that have been written out
  *	  after the checkpoint started.
@@ -1207,6 +1210,9 @@ CheckPointLogicalRewriteHeap(void)
 	if (cutoff != InvalidXLogRecPtr && redo < cutoff)
 		cutoff = redo;
 
+	/* let the custodian know what it can remove */
+	RequestCustodian(CUSTODIAN_REMOVE_REWRITE_MAPPINGS, LSNGetDatum(cutoff));
+
 	mappings_dir = AllocateDir("pg_logical/mappings");
 	while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
 	{
@@ -1239,15 +1245,7 @@ CheckPointLogicalRewriteHeap(void)
 
 		lsn = ((uint64) hi) << 32 | lo;
 
-		if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
-		{
-			elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
-			if (unlink(path) < 0)
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not remove file \"%s\": %m", path)));
-		}
-		else
+		if (lsn >= cutoff && cutoff != InvalidXLogRecPtr)
 		{
 			/* on some operating systems fsyncing a file requires O_RDWR */
 			int			fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
@@ -1285,3 +1283,63 @@ CheckPointLogicalRewriteHeap(void)
 	/* persist directory entries to disk */
 	fsync_fname("pg_logical/mappings", true);
 }
+
+/*
+ * Remove all mappings not needed anymore based on the logical restart LSN saved
+ * by the checkpointer.  We use this saved value instead of calling
+ * ReplicationSlotsComputeLogicalRestartLSN() so that we don't try to remove
+ * files that a concurrent call to CheckPointLogicalRewriteHeap() is trying to
+ * flush to disk.
+ */
+void
+RemoveOldLogicalRewriteMappings(void)
+{
+	XLogRecPtr	cutoff;
+	DIR		   *mappings_dir;
+	struct dirent *mapping_de;
+	char		path[MAXPGPATH + 20];
+
+	cutoff = CustodianGetLogicalRewriteCutoff();
+
+	mappings_dir = AllocateDir("pg_logical/mappings");
+	while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
+	{
+		Oid			dboid;
+		Oid			relid;
+		XLogRecPtr	lsn;
+		TransactionId rewrite_xid;
+		TransactionId create_xid;
+		uint32		hi,
+					lo;
+		PGFileType	de_type;
+
+		if (strcmp(mapping_de->d_name, ".") == 0 ||
+			strcmp(mapping_de->d_name, "..") == 0)
+			continue;
+
+		snprintf(path, sizeof(path), "pg_logical/mappings/%s", mapping_de->d_name);
+		de_type = get_dirent_type(path, mapping_de, false, DEBUG1);
+
+		if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
+			continue;
+
+		/* Skip over files that cannot be ours. */
+		if (strncmp(mapping_de->d_name, "map-", 4) != 0)
+			continue;
+
+		if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
+				   &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
+			elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
+
+		lsn = ((uint64) hi) << 32 | lo;
+		if (lsn >= cutoff && cutoff != InvalidXLogRecPtr)
+			continue;
+
+		elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
+		if (unlink(path) < 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not remove file \"%s\": %m", path)));
+	}
+	FreeDir(mappings_dir);
+}
diff --git a/src/backend/postmaster/custodian.c b/src/backend/postmaster/custodian.c
index 4e0ce1f7b3..4cbd89fae9 100644
--- a/src/backend/postmaster/custodian.c
+++ b/src/backend/postmaster/custodian.c
@@ -21,6 +21,7 @@
  */
 #include "postgres.h"
 
+#include "access/rewriteheap.h"
 #include "libpq/pqsignal.h"
 #include "pgstat.h"
 #include "postmaster/custodian.h"
@@ -33,11 +34,13 @@
 #include "storage/procsignal.h"
 #include "storage/smgr.h"
 #include "utils/memutils.h"
+#include "utils/pg_lsn.h"
 
 static void DoCustodianTasks(void);
 static CustodianTask CustodianGetNextTask(void);
 static void CustodianEnqueueTask(CustodianTask task);
 static const struct cust_task_funcs_entry *LookupCustodianFunctions(CustodianTask task);
+static void CustodianSetLogicalRewriteCutoff(Datum arg);
 
 typedef struct
 {
@@ -45,6 +48,8 @@ typedef struct
 
 	CustodianTask task_queue_elems[NUM_CUSTODIAN_TASKS];
 	int			task_queue_head;
+
+	XLogRecPtr  logical_rewrite_mappings_cutoff;    /* can remove older mappings */
 } CustodianShmemStruct;
 
 static CustodianShmemStruct *CustodianShmem;
@@ -72,6 +77,7 @@ struct cust_task_funcs_entry
  */
 static const struct cust_task_funcs_entry cust_task_functions[] = {
 	{CUSTODIAN_REMOVE_SERIALIZED_SNAPSHOTS, RemoveOldSerializedSnapshots, NULL},
+	{CUSTODIAN_REMOVE_REWRITE_MAPPINGS, RemoveOldLogicalRewriteMappings, CustodianSetLogicalRewriteCutoff},
 	{INVALID_CUSTODIAN_TASK, NULL, NULL}	/* must be last */
 };
 
@@ -377,3 +383,40 @@ LookupCustodianFunctions(CustodianTask task)
 	elog(ERROR, "could not lookup functions for custodian task %d", task);
 	pg_unreachable();
 }
+
+/*
+ * Stores the provided cutoff LSN in the custodian's shared memory.
+ *
+ * It's okay if the cutoff LSN is updated before a previously set cutoff has
+ * been used for cleaning up files.  If that happens, it just means that the
+ * next invocation of RemoveOldLogicalRewriteMappings() will use a more accurate
+ * cutoff.
+ */
+static void
+CustodianSetLogicalRewriteCutoff(Datum arg)
+{
+	SpinLockAcquire(&CustodianShmem->cust_lck);
+	CustodianShmem->logical_rewrite_mappings_cutoff = DatumGetLSN(arg);
+	SpinLockRelease(&CustodianShmem->cust_lck);
+
+	/* if pass-by-ref, free Datum memory */
+#ifndef USE_FLOAT8_BYVAL
+	pfree(DatumGetPointer(arg));
+#endif
+}
+
+/*
+ * Used by the custodian to determine which logical rewrite mapping files it can
+ * remove.
+ */
+XLogRecPtr
+CustodianGetLogicalRewriteCutoff(void)
+{
+	XLogRecPtr  cutoff;
+
+	SpinLockAcquire(&CustodianShmem->cust_lck);
+	cutoff = CustodianShmem->logical_rewrite_mappings_cutoff;
+	SpinLockRelease(&CustodianShmem->cust_lck);
+
+	return cutoff;
+}
diff --git a/src/include/access/rewriteheap.h b/src/include/access/rewriteheap.h
index 1125457053..dc3eb3e308 100644
--- a/src/include/access/rewriteheap.h
+++ b/src/include/access/rewriteheap.h
@@ -53,5 +53,6 @@ typedef struct LogicalRewriteMappingData
  */
 #define LOGICAL_REWRITE_FORMAT "map-%x-%x-%X_%X-%x-%x"
 extern void CheckPointLogicalRewriteHeap(void);
+extern void RemoveOldLogicalRewriteMappings(void);
 
 #endif							/* REWRITE_HEAP_H */
diff --git a/src/include/postmaster/custodian.h b/src/include/postmaster/custodian.h
index ab6d4283b9..00280c203b 100644
--- a/src/include/postmaster/custodian.h
+++ b/src/include/postmaster/custodian.h
@@ -12,6 +12,8 @@
 #ifndef _CUSTODIAN_H
 #define _CUSTODIAN_H
 
+#include "access/xlogdefs.h"
+
 /*
  * If you add a new task here, be sure to add its corresponding function
  * pointers to cust_task_functions in custodian.c.
@@ -19,6 +21,7 @@
 typedef enum CustodianTask
 {
 	CUSTODIAN_REMOVE_SERIALIZED_SNAPSHOTS,
+	CUSTODIAN_REMOVE_REWRITE_MAPPINGS,
 
 	NUM_CUSTODIAN_TASKS,			/* new tasks go above */
 	INVALID_CUSTODIAN_TASK
@@ -28,5 +31,6 @@ extern void CustodianMain(void) pg_attribute_noreturn();
 extern Size CustodianShmemSize(void);
 extern void CustodianShmemInit(void);
 extern void RequestCustodian(CustodianTask task, Datum arg);
+extern XLogRecPtr CustodianGetLogicalRewriteCutoff(void);
 
 #endif						/* _CUSTODIAN_H */
-- 
2.25.1

