From 148c7e9c9802ebcac5d159adf01341a2924bd889 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C3=81lvaro=20Herrera?= <alvherre@kurilemu.de>
Date: Mon, 8 Jun 2026 20:57:29 +0200
Subject: [PATCH v2] Disallow direct use of the pgrepack plugin

Nothing can be gained from using pgrepack outside of REPACK
(CONCURRENTLY), and dragons may be there.  Reject it.

Reported-by: Nikita Kalinin
Discussion: https://postgr.es/m/19500-38a02529a69353a5@postgresql.org
---
 contrib/test_decoding/expected/repack.out   |  11 +++
 contrib/test_decoding/sql/repack.sql        |   7 ++
 src/backend/commands/repack_worker.c        | 100 ++++++++------------
 src/backend/replication/pgrepack/pgrepack.c |  20 +++-
 src/include/commands/repack_internal.h      |   2 -
 5 files changed, 79 insertions(+), 61 deletions(-)

diff --git a/contrib/test_decoding/expected/repack.out b/contrib/test_decoding/expected/repack.out
index 6204e620b43..c4ff41be690 100644
--- a/contrib/test_decoding/expected/repack.out
+++ b/contrib/test_decoding/expected/repack.out
@@ -101,3 +101,14 @@ DETAIL:  REPACK (CONCURRENTLY) does not support deferrable primary keys.
 HINT:  Use ALTER TABLE ... REPLICA IDENTITY USING INDEX to designate another index as replica identity.
 -- clean up
 DROP TABLE repack_conc_replident, clstrpart;
+-- verify that the pgrepack plugin cannot be called directly
+CREATE TABLE repack_plugin (a int);
+SELECT * FROM pg_create_logical_replication_slot('s_repack', 'pgrepack');
+ERROR:  unsupported use of logical decoding plugin "pgrepack"
+DETAIL:  This plugin can only be used by REPACK (CONCURRENTLY).
+CONTEXT:  slot "s_repack", output plugin "pgrepack", in the startup callback
+INSERT INTO repack_plugin VALUES (1);
+SELECT * FROM pg_logical_slot_get_binary_changes('s_repack', NULL, NULL);
+ERROR:  replication slot "s_repack" does not exist
+SELECT pg_drop_replication_slot('s_repack');
+ERROR:  replication slot "s_repack" does not exist
diff --git a/contrib/test_decoding/sql/repack.sql b/contrib/test_decoding/sql/repack.sql
index cea3bd33689..f461f5479f4 100644
--- a/contrib/test_decoding/sql/repack.sql
+++ b/contrib/test_decoding/sql/repack.sql
@@ -75,3 +75,10 @@ REPACK (CONCURRENTLY) repack_conc_replident;
 
 -- clean up
 DROP TABLE repack_conc_replident, clstrpart;
+
+-- verify that the pgrepack plugin cannot be called directly
+CREATE TABLE repack_plugin (a int);
+SELECT * FROM pg_create_logical_replication_slot('s_repack', 'pgrepack');
+INSERT INTO repack_plugin VALUES (1);
+SELECT * FROM pg_logical_slot_get_binary_changes('s_repack', NULL, NULL);
+SELECT pg_drop_replication_slot('s_repack');
diff --git a/src/backend/commands/repack_worker.c b/src/backend/commands/repack_worker.c
index b6b7b604b4f..3a8ddff9124 100644
--- a/src/backend/commands/repack_worker.c
+++ b/src/backend/commands/repack_worker.c
@@ -197,7 +197,7 @@ repack_setup_logical_decoding(Oid relid)
 	Relation	rel;
 	Oid			toastrelid;
 	LogicalDecodingContext *ctx;
-	NameData	slotname;
+	char		slotname[NAMEDATALEN];
 	RepackDecodingState *dstate;
 	MemoryContext oldcxt;
 
@@ -207,43 +207,26 @@ repack_setup_logical_decoding(Oid relid)
 	 */
 	Assert(!TransactionIdIsValid(GetTopTransactionIdIfAny()));
 
-	/*
-	 * Make sure we can use logical decoding.
-	 */
+	/* Make sure we can use logical decoding */
 	CheckLogicalDecodingRequirements(true);
 
 	/*
-	 * A single backend should not execute multiple REPACK commands at a time,
-	 * so use PID to make the slot unique.
+	 * Create the replication slot we'll use, and enable logical decoding in
+	 * case it isn't already on.
 	 *
-	 * RS_TEMPORARY so that the slot gets cleaned up on ERROR.
+	 * Make it RS_TEMPORARY so that it's removed on ERROR.  A single backend
+	 * should not execute multiple REPACK commands at a time, so use the PID
+	 * to make the slot name unique.
 	 */
-	snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid);
-	ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, true,
+	snprintf(slotname, NAMEDATALEN, "repack_%d", MyProcPid);
+	ReplicationSlotCreate(slotname, true, RS_TEMPORARY, false, true,
 						  false, false);
-
 	EnsureLogicalDecodingEnabled();
 
 	/*
-	 * Neither prepare_write nor do_write callback nor update_progress is
-	 * useful for us.
+	 * Setup repacked_rel_locator and repacked_rel_toast_locator, which we use
+	 * to skip decoding of unrelated relations.
 	 */
-	ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME,
-									NIL,
-									true,
-									true,
-									InvalidXLogRecPtr,
-									XL_ROUTINE(.page_read = read_local_xlog_page,
-											   .segment_open = wal_segment_open,
-											   .segment_close = wal_segment_close),
-									NULL, NULL, NULL);
-
-	/*
-	 * We don't have control on setting fast_forward, so at least check it.
-	 */
-	Assert(!ctx->fast_forward);
-
-	/* Avoid logical decoding of other relations. */
 	rel = table_open(relid, AccessShareLock);
 	repacked_rel_locator = rel->rd_locator;
 	toastrelid = rel->rd_rel->reltoastrelid;
@@ -258,16 +241,38 @@ repack_setup_logical_decoding(Oid relid)
 	}
 	table_close(rel, AccessShareLock);
 
-	DecodingContextFindStartpoint(ctx);
-
 	/*
-	 * decode_concurrent_changes() needs non-blocking callback.
+	 * Set up our logical decoding context.  We initially use the blocking
+	 * read_local_xlog_page until we have found the start point, and switch to
+	 * the non-blocking one after.
 	 */
-	ctx->reader->routine.page_read = read_local_xlog_page_no_wait;
+	ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME,
+									NIL,
+									true,
+									true,
+									InvalidXLogRecPtr,
+									XL_ROUTINE(.page_read = read_local_xlog_page,
+											   .segment_open = wal_segment_open,
+											   .segment_close = wal_segment_close),
+									NULL, NULL, NULL);
+
+	/* Complete setup of output_writer_private */
+	dstate = (RepackDecodingState *) ctx->output_writer_private;
+	dstate->relid = relid;
+	dstate->worker_cxt = CurrentMemoryContext;
+	dstate->worker_resowner = CurrentResourceOwner;
+
+	/* We don't have control on fast_forward, but verify it's sane */
+	Assert(!ctx->fast_forward);
+
+	DecodingContextFindStartpoint(ctx);
 
 	/* Some WAL records should have been read. */
 	Assert(XLogRecPtrIsValid(ctx->reader->EndRecPtr));
 
+	/* Switch to using non-blocking WAL reads now. */
+	ctx->reader->routine.page_read = read_local_xlog_page_no_wait;
+
 	/*
 	 * Initialize repack_current_segment so that we can notice WAL segment
 	 * boundaries.
@@ -275,36 +280,15 @@ repack_setup_logical_decoding(Oid relid)
 	XLByteToSeg(ctx->reader->EndRecPtr, repack_current_segment,
 				wal_segment_size);
 
-	/* Our private state belongs to the decoding context. */
+	/*
+	 * Set up our reader private state to let the page-read callback notify
+	 * when end-of-WAL has been reached.  This lives in the same context as
+	 * the logical decoding itself.
+	 */
 	oldcxt = MemoryContextSwitchTo(ctx->context);
-
-	/*
-	 * read_local_xlog_page_no_wait() needs to be able to indicate the end of
-	 * WAL.
-	 */
 	ctx->reader->private_data = palloc0_object(ReadLocalXLogPageNoWaitPrivate);
-	dstate = palloc0_object(RepackDecodingState);
 	MemoryContextSwitchTo(oldcxt);
 
-#ifdef	USE_ASSERT_CHECKING
-	dstate->relid = relid;
-#endif
-
-	dstate->change_cxt = AllocSetContextCreate(ctx->context,
-											   "REPACK - change",
-											   ALLOCSET_DEFAULT_SIZES);
-
-	/* The file will be set as soon as we have it opened. */
-	dstate->file = NULL;
-
-	/*
-	 * Memory context and resource owner for long-lived resources.
-	 */
-	dstate->worker_cxt = CurrentMemoryContext;
-	dstate->worker_resowner = CurrentResourceOwner;
-
-	ctx->output_writer_private = dstate;
-
 	return ctx;
 }
 
diff --git a/src/backend/replication/pgrepack/pgrepack.c b/src/backend/replication/pgrepack/pgrepack.c
index a2615ce54c1..5d0f28a65a9 100644
--- a/src/backend/replication/pgrepack/pgrepack.c
+++ b/src/backend/replication/pgrepack/pgrepack.c
@@ -13,6 +13,7 @@
 #include "postgres.h"
 
 #include "access/detoast.h"
+#include "commands/repack.h"
 #include "commands/repack_internal.h"
 #include "replication/snapbuild.h"
 #include "utils/memutils.h"
@@ -47,7 +48,24 @@ static void
 repack_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 			   bool is_init)
 {
-	ctx->output_plugin_private = NULL;
+	RepackDecodingState *dstate;
+
+	if (!AmRepackWorker())
+		ereport(ERROR,
+				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("unsupported use of logical decoding plugin \"%s\"",
+					   "pgrepack"),
+				errdetail("This plugin can only be used by %s.",
+						  "REPACK (CONCURRENTLY)"));
+
+	/* Set up our private state. */
+	Assert(CurrentMemoryContext == ctx->context);
+	dstate = palloc0_object(RepackDecodingState);
+	dstate->change_cxt = AllocSetContextCreate(ctx->context,
+											   "REPACK - change",
+											   ALLOCSET_DEFAULT_SIZES);
+	/* repack_setup_logical_decoding fills in the rest */
+	ctx->output_writer_private = dstate;
 
 	/* Probably unnecessary, as we don't use the SQL interface ... */
 	opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
diff --git a/src/include/commands/repack_internal.h b/src/include/commands/repack_internal.h
index 6a85cee8910..42111aa4ae3 100644
--- a/src/include/commands/repack_internal.h
+++ b/src/include/commands/repack_internal.h
@@ -39,10 +39,8 @@ typedef char ConcurrentChangeKind;
  */
 typedef struct RepackDecodingState
 {
-#ifdef	USE_ASSERT_CHECKING
 	/* The relation whose changes we're decoding. */
 	Oid			relid;
-#endif
 
 	/* Per-change memory context. */
 	MemoryContext change_cxt;
-- 
2.47.3

