From c9e0527d39f697408e01bc0ed1730e5f7eeeffd5 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <bossartn@amazon.com>
Date: Sun, 5 Dec 2021 22:02:40 -0800
Subject: [PATCH v19 2/4] Move removal of old serialized snapshots to
 custodian.

This was only done during checkpoints because it was a convenient
place to put it.  However, if there are many snapshots to remove,
it can significantly extend checkpoint time.  To avoid this, move
this work to the newly-introduced custodian process.
---
 contrib/test_decoding/expected/rewrite.out  | 21 +++++++++++++++++++++
 contrib/test_decoding/sql/rewrite.sql       | 17 +++++++++++++++++
 src/backend/access/transam/xlog.c           |  6 ++++--
 src/backend/postmaster/custodian.c          |  2 ++
 src/backend/replication/logical/snapbuild.c |  9 ++++-----
 src/include/postmaster/custodian.h          |  2 +-
 src/include/replication/snapbuild.h         |  2 +-
 7 files changed, 50 insertions(+), 9 deletions(-)

diff --git a/contrib/test_decoding/expected/rewrite.out b/contrib/test_decoding/expected/rewrite.out
index b30999c436..8b97f15f6f 100644
--- a/contrib/test_decoding/expected/rewrite.out
+++ b/contrib/test_decoding/expected/rewrite.out
@@ -162,3 +162,24 @@ DROP TABLE IF EXISTS replication_example;
 DROP FUNCTION iamalongfunction();
 DROP FUNCTION exec(text);
 DROP ROLE regress_justforcomments;
+-- make sure custodian cleans up files
+CHECKPOINT;
+DO $$
+DECLARE
+    snaps_removed bool;
+    loops int := 0;
+BEGIN
+    LOOP
+        snaps_removed := count(*) = 0 FROM pg_ls_logicalsnapdir();
+        IF snaps_removed OR loops > 120 * 100 THEN EXIT; END IF;
+        PERFORM pg_sleep(0.01);
+        loops := loops + 1;
+    END LOOP;
+END
+$$;
+SELECT count(*) = 0 FROM pg_ls_logicalsnapdir();
+ ?column? 
+----------
+ t
+(1 row)
+
diff --git a/contrib/test_decoding/sql/rewrite.sql b/contrib/test_decoding/sql/rewrite.sql
index 62dead3a9b..d268fa559a 100644
--- a/contrib/test_decoding/sql/rewrite.sql
+++ b/contrib/test_decoding/sql/rewrite.sql
@@ -105,3 +105,20 @@ DROP TABLE IF EXISTS replication_example;
 DROP FUNCTION iamalongfunction();
 DROP FUNCTION exec(text);
 DROP ROLE regress_justforcomments;
+
+-- make sure custodian cleans up files
+CHECKPOINT;
+DO $$
+DECLARE
+    snaps_removed bool;
+    loops int := 0;
+BEGIN
+    LOOP
+        snaps_removed := count(*) = 0 FROM pg_ls_logicalsnapdir();
+        IF snaps_removed OR loops > 120 * 100 THEN EXIT; END IF;
+        PERFORM pg_sleep(0.01);
+        loops := loops + 1;
+    END LOOP;
+END
+$$;
+SELECT count(*) = 0 FROM pg_ls_logicalsnapdir();
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index fb4c860bde..382e59f723 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -76,12 +76,12 @@
 #include "port/atomics.h"
 #include "port/pg_iovec.h"
 #include "postmaster/bgwriter.h"
+#include "postmaster/custodian.h"
 #include "postmaster/startup.h"
 #include "postmaster/walwriter.h"
 #include "replication/logical.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
-#include "replication/snapbuild.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/bufmgr.h"
@@ -6997,10 +6997,12 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 {
 	CheckPointRelationMap();
 	CheckPointReplicationSlots();
-	CheckPointSnapBuild();
 	CheckPointLogicalRewriteHeap();
 	CheckPointReplicationOrigin();
 
+	/* tasks offloaded to custodian */
+	RequestCustodian(CUSTODIAN_REMOVE_SERIALIZED_SNAPSHOTS, (Datum) 0);
+
 	/* Write out all dirty data in SLRUs and the main buffer pool */
 	TRACE_POSTGRESQL_BUFFER_CHECKPOINT_START(flags);
 	CheckpointStats.ckpt_write_t = GetCurrentTimestamp();
diff --git a/src/backend/postmaster/custodian.c b/src/backend/postmaster/custodian.c
index 98bb9efcfd..4e0ce1f7b3 100644
--- a/src/backend/postmaster/custodian.c
+++ b/src/backend/postmaster/custodian.c
@@ -25,6 +25,7 @@
 #include "pgstat.h"
 #include "postmaster/custodian.h"
 #include "postmaster/interrupt.h"
+#include "replication/snapbuild.h"
 #include "storage/bufmgr.h"
 #include "storage/condition_variable.h"
 #include "storage/fd.h"
@@ -70,6 +71,7 @@ struct cust_task_funcs_entry
  * whether the task is already enqueued.
  */
 static const struct cust_task_funcs_entry cust_task_functions[] = {
+	{CUSTODIAN_REMOVE_SERIALIZED_SNAPSHOTS, RemoveOldSerializedSnapshots, NULL},
 	{INVALID_CUSTODIAN_TASK, NULL, NULL}	/* must be last */
 };
 
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 829c568112..6b403a2bb4 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -2036,14 +2036,13 @@ SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path)
 
 /*
  * Remove all serialized snapshots that are not required anymore because no
- * slot can need them. This doesn't actually have to run during a checkpoint,
- * but it's a convenient point to schedule this.
+ * slot can need them.
  *
- * NB: We run this during checkpoints even if logical decoding is disabled so
- * we cleanup old slots at some point after it got disabled.
+ * NB: We run this even if logical decoding is disabled so we cleanup old slots
+ * at some point after it got disabled.
  */
 void
-CheckPointSnapBuild(void)
+RemoveOldSerializedSnapshots(void)
 {
 	XLogRecPtr	cutoff;
 	XLogRecPtr	redo;
diff --git a/src/include/postmaster/custodian.h b/src/include/postmaster/custodian.h
index 73d0bc5f02..ab6d4283b9 100644
--- a/src/include/postmaster/custodian.h
+++ b/src/include/postmaster/custodian.h
@@ -18,7 +18,7 @@
  */
 typedef enum CustodianTask
 {
-	FAKE_TASK,						/* placeholder until we have a real task */
+	CUSTODIAN_REMOVE_SERIALIZED_SNAPSHOTS,
 
 	NUM_CUSTODIAN_TASKS,			/* new tasks go above */
 	INVALID_CUSTODIAN_TASK
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index f49b941b53..5f1ba3842c 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -57,7 +57,7 @@ struct ReorderBuffer;
 struct xl_heap_new_cid;
 struct xl_running_xacts;
 
-extern void CheckPointSnapBuild(void);
+extern void RemoveOldSerializedSnapshots(void);
 
 extern SnapBuild *AllocateSnapshotBuilder(struct ReorderBuffer *reorder,
 										  TransactionId xmin_horizon, XLogRecPtr start_lsn,
-- 
2.25.1

