From 546809304986e5cdcdee4ee41387c5edeb435fb2 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 8 Jun 2026 11:25:47 -0700
Subject: [PATCH v1] Add a hook for handling logical messages on subscribers.

Add a "message" subscription option (default off) that asks the
publisher's output plugin to stream messages, and dispatch each
received message to a new LogicalMessageHandle_hook.  When the option
is off or no hook is installed, behavior is unchanged.	This lets an
extension act on logical messages on the subscriber -- for instance to
carry deparsed DDL to a custom apply worker -- without modifying core
apply code.

Message handling honors transaction streaming and ALTER SUBSCRIPTION
... SKIP, as exercised by the new test_logicalmsg_hooks module.

XXX Bump catalog version for the new pg_subscription.submessage column.
---
 src/backend/catalog/pg_subscription.c         |   1 +
 src/backend/catalog/system_views.sql          |   2 +-
 src/backend/commands/subscriptioncmds.c       |  24 ++-
 .../libpqwalreceiver/libpqwalreceiver.c       |   4 +
 src/backend/replication/logical/proto.c       |  25 +++
 src/backend/replication/logical/worker.c      |  42 +++-
 src/bin/pg_dump/pg_dump.c                     |  16 +-
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/bin/psql/describe.c                       |   5 +-
 src/include/catalog/pg_subscription.h         |   5 +
 src/include/replication/logicalproto.h        |  13 ++
 src/include/replication/walreceiver.h         |   1 +
 src/include/replication/worker_internal.h     |   4 +
 src/test/modules/Makefile                     |   1 +
 src/test/modules/meson.build                  |   1 +
 .../modules/test_logicalmsg_hooks/.gitignore  |   4 +
 .../modules/test_logicalmsg_hooks/Makefile    |  20 ++
 .../modules/test_logicalmsg_hooks/meson.build |  28 +++
 .../test_logicalmsg_hooks/t/001_basic.pl      | 113 ++++++++++
 .../test_logicalmsg_hooks.c                   |  40 ++++
 src/test/regress/expected/subscription.out    | 202 ++++++++++--------
 src/test/regress/sql/subscription.sql         |  17 ++
 src/tools/pgindent/typedefs.list              |   1 +
 23 files changed, 469 insertions(+), 101 deletions(-)
 create mode 100644 src/test/modules/test_logicalmsg_hooks/.gitignore
 create mode 100644 src/test/modules/test_logicalmsg_hooks/Makefile
 create mode 100644 src/test/modules/test_logicalmsg_hooks/meson.build
 create mode 100644 src/test/modules/test_logicalmsg_hooks/t/001_basic.pl
 create mode 100644 src/test/modules/test_logicalmsg_hooks/test_logicalmsg_hooks.c

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index b5cb301db88..f14649a245b 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -132,6 +132,7 @@ GetSubscription(Oid subid, bool missing_ok, bool conninfo_needed,
 	sub->retaindeadtuples = subform->subretaindeadtuples;
 	sub->maxretention = subform->submaxretention;
 	sub->retentionactive = subform->subretentionactive;
+	sub->message = subform->submessage;
 
 	if (conninfo_needed)
 	{
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 8f129baec90..6ebaa93b1ff 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1528,7 +1528,7 @@ GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
 			  subpasswordrequired, subrunasowner, subfailover,
               subretaindeadtuples, submaxretention, subretentionactive,
               subserver, subslotname, subsynccommit, subwalrcvtimeout,
-              subpublications, suborigin)
+              subpublications, suborigin, submessage)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index ee06a726f42..5183ab01156 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -79,6 +79,7 @@
 #define SUBOPT_WAL_RECEIVER_TIMEOUT			0x00010000
 #define SUBOPT_LSN					0x00020000
 #define SUBOPT_ORIGIN				0x00040000
+#define SUBOPT_MESSAGE				0x00080000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -105,6 +106,7 @@ typedef struct SubOpts
 	bool		runasowner;
 	bool		failover;
 	bool		retaindeadtuples;
+	bool		message;
 	int32		maxretention;
 	char	   *origin;
 	XLogRecPtr	lsn;
@@ -192,6 +194,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->failover = false;
 	if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
 		opts->retaindeadtuples = false;
+	if (IsSet(supported_opts, SUBOPT_MESSAGE))
+		opts->message = false;
 	if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION))
 		opts->maxretention = 0;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
@@ -348,6 +352,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
 			opts->retaindeadtuples = defGetBoolean(defel);
 		}
+		else if (IsSet(supported_opts, SUBOPT_MESSAGE) &&
+				 strcmp(defel->defname, "message") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_MESSAGE))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_MESSAGE;
+			opts->message = defGetBoolean(defel);
+		}
 		else if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION) &&
 				 strcmp(defel->defname, "max_retention_duration") == 0)
 		{
@@ -674,7 +687,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
 					  SUBOPT_RETAIN_DEAD_TUPLES |
 					  SUBOPT_MAX_RETENTION_DURATION |
-					  SUBOPT_WAL_RECEIVER_TIMEOUT | SUBOPT_ORIGIN);
+					  SUBOPT_WAL_RECEIVER_TIMEOUT | SUBOPT_ORIGIN | SUBOPT_MESSAGE);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -824,6 +837,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
 	values[Anum_pg_subscription_subretaindeadtuples - 1] =
 		BoolGetDatum(opts.retaindeadtuples);
+	values[Anum_pg_subscription_submessage - 1] = BoolGetDatum(opts.message);
 	values[Anum_pg_subscription_submaxretention - 1] =
 		Int32GetDatum(opts.maxretention);
 	values[Anum_pg_subscription_subretentionactive - 1] =
@@ -1499,7 +1513,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 							  SUBOPT_RETAIN_DEAD_TUPLES |
 							  SUBOPT_MAX_RETENTION_DURATION |
 							  SUBOPT_WAL_RECEIVER_TIMEOUT |
-							  SUBOPT_ORIGIN);
+							  SUBOPT_ORIGIN | SUBOPT_MESSAGE);
 			break;
 
 		case ALTER_SUBSCRIPTION_ENABLED:
@@ -1858,6 +1872,12 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_subwalrcvtimeout - 1] = true;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_MESSAGE))
+				{
+					values[Anum_pg_subscription_submessage - 1] = BoolGetDatum(opts.message);
+					replaces[Anum_pg_subscription_submessage - 1] = true;
+				}
+
 				update_tuple = true;
 				break;
 			}
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 5376519fea5..490e0590970 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -614,6 +614,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			appendQuotedLiteral(&cmd, options->proto.logical.origin);
 		}
 
+		if (options->proto.logical.messages &&
+			PQserverVersion(conn->streamConn) >= 140000)
+			appendStringInfo(&cmd, ", messages 'on'");
+
 		pubnames = options->proto.logical.publication_names;
 		pubnames_str = stringlist_to_identifierstr(pubnames);
 		appendStringInfoString(&cmd, ", publication_names ");
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 86ad97cd937..af8ee26ed0c 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -660,6 +660,31 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 	pq_sendbytes(out, message, sz);
 }
 
+/*
+ * Read MESSAGE from stream.
+ */
+void
+logicalrep_read_message(StringInfo in, LogicalRepMessageData *msg_data)
+{
+	Size		len;
+	char	   *msg;
+
+	msg_data->flags = pq_getmsgint(in, 1);
+	msg_data->lsn = pq_getmsgint64(in);
+	msg_data->prefix = pstrdup(pq_getmsgstring(in));
+
+	/* read message length */
+	len = pq_getmsgint(in, 4);
+	msg_data->message_size = len;
+
+	/* and data */
+	msg = palloc(len + 1);
+	pq_copymsgbytes(in, msg, len);
+
+	msg[len] = '\0';
+	msg_data->message = msg;
+}
+
 /*
  * Write relation description to the output stream.
  */
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 7799266c614..3457ed7b379 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -549,6 +549,9 @@ typedef struct ApplySubXactData
 
 static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
 
+/* Hook for plugins to get control in handling logical decoding messages */
+LogicalRepMessageHandle_hook_type LogicalRepMessageHandle_hook = NULL;
+
 static inline void subxact_filename(char *path, Oid subid, TransactionId xid);
 static inline void changes_filename(char *path, Oid subid, TransactionId xid);
 
@@ -1694,6 +1697,34 @@ apply_handle_origin(StringInfo s)
 				 errmsg_internal("ORIGIN message sent out of order")));
 }
 
+/*
+ * Handle MESSAGE message.
+ *
+ * Invoke a hook function if set.
+ */
+static void
+apply_handle_message(StringInfo s)
+{
+	LogicalRepMessageData msg;
+
+	if (!LogicalRepMessageHandle_hook)
+		return;
+
+	/*
+	 * Logical messages are handled only the (parallel) apply workers
+	 */
+	if (am_tablesync_worker() || am_sequencesync_worker())
+		return;
+
+	if (is_skipping_changes() ||
+		handle_streamed_transaction(LOGICAL_REP_MSG_MESSAGE, s))
+		return;
+
+	logicalrep_read_message(s, &msg);
+
+	(*LogicalRepMessageHandle_hook) (&msg);
+}
+
 /*
  * Initialize fileset (if not already done).
  *
@@ -3846,12 +3877,7 @@ apply_dispatch(StringInfo s)
 			break;
 
 		case LOGICAL_REP_MSG_MESSAGE:
-
-			/*
-			 * Logical replication does not use generic logical messages yet.
-			 * Although, it could be used by other applications that use this
-			 * output plugin.
-			 */
+			apply_handle_message(s);
 			break;
 
 		case LOGICAL_REP_MSG_STREAM_START:
@@ -5128,7 +5154,8 @@ maybe_reread_subscription(void)
 		newsub->passwordrequired != MySubscription->passwordrequired ||
 		strcmp(newsub->origin, MySubscription->origin) != 0 ||
 		newsub->owner != MySubscription->owner ||
-		!equal(newsub->publications, MySubscription->publications))
+		!equal(newsub->publications, MySubscription->publications) ||
+		newsub->message != MySubscription->message)
 	{
 		if (am_parallel_apply_worker())
 			ereport(LOG,
@@ -5614,6 +5641,7 @@ set_stream_options(WalRcvStreamOptions *options,
 
 	options->proto.logical.twophase = false;
 	options->proto.logical.origin = pstrdup(MySubscription->origin);
+	options->proto.logical.messages = MySubscription->message;
 }
 
 /*
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index c56437d6057..4441b14082c 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -5184,6 +5184,7 @@ getSubscriptions(Archive *fout)
 	int			i_subfailover;
 	int			i_subretaindeadtuples;
 	int			i_submaxretention;
+	int			i_submessage;
 	int			i,
 				ntups;
 
@@ -5282,9 +5283,14 @@ getSubscriptions(Archive *fout)
 							 " '-1' AS subwalrcvtimeout,\n");
 
 	if (fout->remoteVersion >= 190000)
-		appendPQExpBufferStr(query, " fs.srvname AS subservername\n");
+		appendPQExpBufferStr(query, " fs.srvname AS subservername,\n");
 	else
-		appendPQExpBufferStr(query, " NULL AS subservername\n");
+		appendPQExpBufferStr(query, " NULL AS subservername,\n");
+
+	if (fout->remoteVersion >= 190000)
+		appendPQExpBufferStr(query, " s.submessage\n");
+	else
+		appendPQExpBufferStr(query, "false AS submessage\n");
 
 	appendPQExpBufferStr(query,
 						 "FROM pg_subscription s\n");
@@ -5333,6 +5339,7 @@ getSubscriptions(Archive *fout)
 	i_subpublications = PQfnumber(res, "subpublications");
 	i_suborigin = PQfnumber(res, "suborigin");
 	i_suboriginremotelsn = PQfnumber(res, "suboriginremotelsn");
+	i_submessage = PQfnumber(res, "submessage");
 
 	subinfo = pg_malloc_array(SubscriptionInfo, ntups);
 
@@ -5390,6 +5397,8 @@ getSubscriptions(Archive *fout)
 		else
 			subinfo[i].suboriginremotelsn =
 				pg_strdup(PQgetvalue(res, i, i_suboriginremotelsn));
+		subinfo[i].submessage =
+			(strcmp(PQgetvalue(res, i, i_submessage), "t") == 0);
 
 		/* Decide whether we want to dump it */
 		selectDumpableObject(&(subinfo[i].dobj), fout);
@@ -5642,6 +5651,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	if (subinfo->subretaindeadtuples)
 		appendPQExpBufferStr(query, ", retain_dead_tuples = true");
 
+	if (subinfo->submessage)
+		appendPQExpBufferStr(query, ", message = true");
+
 	if (subinfo->submaxretention)
 		appendPQExpBuffer(query, ", max_retention_duration = %d", subinfo->submaxretention);
 
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 5a6726d8b12..30bd1575a26 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -721,6 +721,7 @@ typedef struct _SubscriptionInfo
 	bool		subrunasowner;
 	bool		subfailover;
 	bool		subretaindeadtuples;
+	bool		submessage;
 	int			submaxretention;
 	char	   *subservername;
 	char	   *subconninfo;
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index af3935b0078..ac5a2c1835b 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -7093,7 +7093,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
 		false, false, false, false, false, false, false, false, false, false,
-	false, false, false, false, false, false, false};
+	false, false, false, false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -7179,6 +7179,9 @@ describeSubscriptions(const char *pattern, bool verbose)
 			appendPQExpBuffer(&buf,
 							  ", subretentionactive AS \"%s\"\n",
 							  gettext_noop("Retention active"));
+			appendPQExpBuffer(&buf,
+							  ", submessage AS \"%s\"\n",
+							  gettext_noop("Message"));
 		}
 
 		appendPQExpBuffer(&buf,
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 48944201889..cdda8265573 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -83,6 +83,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subretaindeadtuples;	/* True if dead tuples useful for
 										 * conflict detection are retained */
 
+	bool		submessage;		/* True if the subscription wants to receive
+								 * logical messages. */
+
 	int32		submaxretention;	/* The maximum duration (in milliseconds)
 									 * for which information useful for
 									 * conflict detection can be retained */
@@ -171,6 +174,8 @@ typedef struct Subscription
 	List	   *publications;	/* List of publication names to subscribe to */
 	char	   *origin;			/* Only publish data originating from the
 								 * specified origin */
+	bool		message;		/* True if the subscription wants to receive
+								 * logical messages */
 } Subscription;
 
 #ifdef EXPOSE_TO_CLIENT_CODE
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 058a955e20c..85add073202 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -191,6 +191,18 @@ typedef struct LogicalRepStreamAbortData
 	TimestampTz abort_time;
 } LogicalRepStreamAbortData;
 
+/*
+ * Logical decoding message information
+ */
+typedef struct LogicalRepMessageData
+{
+	uint8		flags;
+	XLogRecPtr	lsn;
+	const char *prefix;
+	Size		message_size;	/* length of message in bytes */
+	const char *message;		/* payload; may contain embedded nulls */
+} LogicalRepMessageData;
+
 extern void logicalrep_write_begin(StringInfo out, ReorderBufferTXN *txn);
 extern void logicalrep_read_begin(StringInfo in,
 								  LogicalRepBeginData *begin_data);
@@ -248,6 +260,7 @@ extern List *logicalrep_read_truncate(StringInfo in,
 									  bool *cascade, bool *restart_seqs);
 extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
 									 bool transactional, const char *prefix, Size sz, const char *message);
+extern void logicalrep_read_message(StringInfo in, LogicalRepMessageData *msg_data);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
 								 Relation rel, Bitmapset *columns,
 								 PublishGencolsType include_gencols_type);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 47c07574d4d..44a08cc2170 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -188,6 +188,7 @@ typedef struct
 									 * prepare time */
 			char	   *origin; /* Only publish data originating from the
 								 * specified origin */
+			bool		messages;	/* Logical message */
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 745b7d9e969..b07042846cb 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -18,6 +18,7 @@
 #include "miscadmin.h"
 #include "replication/logicalrelation.h"
 #include "replication/walreceiver.h"
+#include "replication/logicalproto.h"
 #include "storage/buffile.h"
 #include "storage/fileset.h"
 #include "storage/shm_mq.h"
@@ -255,6 +256,9 @@ extern PGDLLIMPORT bool InitializingApplyWorker;
 
 extern PGDLLIMPORT List *table_states_not_ready;
 
+typedef void (*LogicalRepMessageHandle_hook_type) (LogicalRepMessageData *msg);
+extern PGDLLIMPORT LogicalRepMessageHandle_hook_type LogicalRepMessageHandle_hook;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype,
 												Oid subid, Oid relid,
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 0a74ab5c86f..603fe1ff907 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -36,6 +36,7 @@ SUBDIRS = \
 		  test_integerset \
 		  test_json_parser \
 		  test_lfind \
+		  test_logicalmsg_hooks \
 		  test_lwlock_tranches \
 		  test_misc \
 		  test_oat_hooks \
diff --git a/src/test/modules/meson.build b/src/test/modules/meson.build
index 4bca42bb370..5f6c0cae77a 100644
--- a/src/test/modules/meson.build
+++ b/src/test/modules/meson.build
@@ -37,6 +37,7 @@ subdir('test_int128')
 subdir('test_integerset')
 subdir('test_json_parser')
 subdir('test_lfind')
+subdir('test_logicalmsg_hooks')
 subdir('test_lwlock_tranches')
 subdir('test_misc')
 subdir('test_oat_hooks')
diff --git a/src/test/modules/test_logicalmsg_hooks/.gitignore b/src/test/modules/test_logicalmsg_hooks/.gitignore
new file mode 100644
index 00000000000..5dcb3ff9723
--- /dev/null
+++ b/src/test/modules/test_logicalmsg_hooks/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/test_logicalmsg_hooks/Makefile b/src/test/modules/test_logicalmsg_hooks/Makefile
new file mode 100644
index 00000000000..af129009b8a
--- /dev/null
+++ b/src/test/modules/test_logicalmsg_hooks/Makefile
@@ -0,0 +1,20 @@
+# src/test/modules/test_logicalmsg_hooks/Makefile
+
+MODULE_big = test_logicalmsg_hooks
+OBJS = \
+	$(WIN32RES) \
+	test_logicalmsg_hooks.o
+PGFILEDESC = "test_logicalmsg_hooks - test logical replication message hooks"
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_logicalmsg_hooks
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_logicalmsg_hooks/meson.build b/src/test/modules/test_logicalmsg_hooks/meson.build
new file mode 100644
index 00000000000..8084d82e47b
--- /dev/null
+++ b/src/test/modules/test_logicalmsg_hooks/meson.build
@@ -0,0 +1,28 @@
+# Copyright (c) 2022-2026, PostgreSQL Global Development Group
+
+test_logicalmsg_hooks_sources = files(
+  'test_logicalmsg_hooks.c',
+)
+
+if host_system == 'windows'
+  test_logicalmsg_hooks_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
+    '--NAME', 'test_logicalmsg_hooks',
+    '--FILEDESC', 'test_logicalmsg_hooks - test logical message hooks',])
+endif
+
+test_logicalmsg_hooks = shared_module('test_logicalmsg_hooks',
+  test_logicalmsg_hooks_sources,
+  kwargs: pg_test_mod_args,
+)
+test_install_libs += test_logicalmsg_hooks
+
+tests += {
+  'name': 'test_logicalmsg_hooks',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'tap': {
+    'tests': [
+      't/001_basic.pl',
+    ],
+  },
+}
diff --git a/src/test/modules/test_logicalmsg_hooks/t/001_basic.pl b/src/test/modules/test_logicalmsg_hooks/t/001_basic.pl
new file mode 100644
index 00000000000..64ca51d1786
--- /dev/null
+++ b/src/test/modules/test_logicalmsg_hooks/t/001_basic.pl
@@ -0,0 +1,113 @@
+# Copyright (c) 2021-2026, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $pub = PostgreSQL::Test::Cluster->new('publisher');
+$pub->init(allows_streaming => 'logical');
+$pub->start;
+
+my $sub = PostgreSQL::Test::Cluster->new('subscriber');
+$sub->init(allows_streaming => 'logical');
+$sub->append_conf(
+	'postgresql.conf', q{
+shared_preload_libraries = 'test_logicalmsg_hooks'
+});
+$sub->start;
+
+$pub->safe_psql(
+	'postgres',
+	qq{
+CREATE TABLE test (a int);
+INSERT INTO test VALUES (1);
+CREATE PUBLICATION pub FOR ALL TABLES;
+});
+
+my $pub_connstr = $pub->connstr . ' dbname=postgres';
+
+$sub->safe_psql(
+	'postgres',
+	qq{
+CREATE TABLE test (a int primary key);
+CREATE SUBSCRIPTION sub CONNECTION '$pub_connstr' PUBLICATION pub WITH (message = true, disable_on_error = true)
+});
+
+$sub->wait_for_subscription_sync($pub, 'sub');
+
+my $log_location = -s $sub->logfile;
+
+# Emit a logical message and verify that the subscriber processed it in the
+# hook function.
+$pub->safe_psql(
+	'postgres',
+	qq{
+SELECT pg_logical_emit_message(true, 'test_prefix', 'test_mesasge');
+});
+$pub->wait_for_catchup('sub');
+$sub->wait_for_log(
+	qr/LOG[^\n]+received message: LSN [[:xdigit:]]+\/[[:xdigit:]]+, prefix: test_prefix, message: test_mesasge/,
+	$log_location);
+ok(1, "logical message is correctly handled on the subscriber");
+
+$log_location = -s $sub->logfile;
+
+# INSERT a tuple with a=1 conflicts on the subscriber. Test that ALTER SUBSCRIPTION ... SKIP
+# the transacitonal message as well.
+$pub->safe_psql(
+	'postgres',
+	q{
+BEGIN;
+INSERT INTO test VALUES (1);
+SELECT pg_logical_emit_message(true, 'conflict', 'should be skipped');
+COMMIT;
+});
+
+# Wait until a conflict occurs on the subscriber.
+$sub->poll_query_until('postgres',
+	q{SELECT subenabled = FALSE FROM pg_subscription WHERE subname = 'sub'});
+
+# Get the finish LSN of the error transaction.
+my $contents = slurp_file($sub->logfile, $log_location);
+$contents =~
+  qr/conflict detected on relation "public.test".*\n.*DETAIL:.*Could not apply remote change.*\n.*Key already exists in unique index "test_pkey", modified in transaction \d+: key .*, local row .*\n.*CONTEXT:.* for replication target relation "public.test" in transaction \d+, finished at ([[:xdigit:]]+\/[[:xdigit:]]+)/m
+  or die "could not get error-LSN";
+my $lsn = $1;
+
+$log_location = -s $sub->logfile;
+
+# Set skip LSN and re-enable the subscription.
+$sub->safe_psql('postgres', qq{ALTER SUBSCRIPTION sub SKIP (lsn = '$lsn');});
+$sub->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+# Wait for the failed transaction to be skipped
+$sub->poll_query_until('postgres',
+	q{SELECT subskiplsn = '0/0' FROM pg_subscription WHERE subname = 'sub'});
+
+ok( !$sub->log_contains(
+		qr/LOG[^\n]+received message: LSN .*, prefix: conflict, message: should be skipped/
+	),
+	"logical message is skipped by ALTER SUBSCRIPTION SKIP");
+
+# Test that a non-transactional message is applied even if the transaction rolls back.
+$log_location = -s $sub->logfile;
+$pub->safe_psql(
+	'postgres',
+	q{
+BEGIN;
+SELECT pg_logical_emit_message(false, 'test_prefix', 'non-transactional message');
+ROLLBACK;
+});
+$pub->wait_for_catchup('sub');
+$sub->wait_for_log(
+	qr/LOG[^\n]+received message: LSN [[:xdigit:]]+\/[[:xdigit:]]+, prefix: test_prefix, message: non-transactional message/,
+	$log_location);
+ok(1, "logical message is correctly handled on the subscriber");
+
+
+$pub->stop;
+$sub->stop;
+done_testing();
diff --git a/src/test/modules/test_logicalmsg_hooks/test_logicalmsg_hooks.c b/src/test/modules/test_logicalmsg_hooks/test_logicalmsg_hooks.c
new file mode 100644
index 00000000000..9167043508e
--- /dev/null
+++ b/src/test/modules/test_logicalmsg_hooks/test_logicalmsg_hooks.c
@@ -0,0 +1,40 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_logicalmsg_hooks.c
+ *		Code for testing LogicalMessageHandle_hook
+ *
+ * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *		src/test/modules/test_logicalmsg_hooks/test_logicalmsg_hooks.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "replication/worker_internal.h"
+
+PG_MODULE_MAGIC;
+
+static void test_logical_message_handler(LogicalRepMessageData *msg);
+
+/*
+ * Module load callback
+ */
+void
+_PG_init(void)
+{
+	LogicalRepMessageHandle_hook = &test_logical_message_handler;
+}
+
+void
+test_logical_message_handler(LogicalRepMessageData *msg)
+{
+	ereport(LOG,
+			(errmsg("received message: LSN %X/%08X, prefix: %s, message: %s",
+					LSN_FORMAT_ARGS(msg->lsn),
+					msg->prefix,
+					msg->message)));
+}
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 8dbfac66326..29c2fac6504 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -139,18 +139,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+ regress_testsub4
-                                                                                                                                                                       List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | none   | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | none   | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
 \dRs+ regress_testsub4
-                                                                                                                                                                       List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub3;
@@ -237,10 +237,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                                                                                                                          List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  |    Description    
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | test subscription
+                                                                                                                                                                               List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  |    Description    
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | test subscription
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -249,10 +249,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname');
 ALTER SUBSCRIPTION regress_testsub SET (password_required = false);
 ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
 \dRs+
-                                                                                                                                                                              List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |           Conninfo           | Receiver timeout |  Skip LSN  |    Description    
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | f                 | t             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist2 | -1               | 0/00000000 | test subscription
+                                                                                                                                                                                   List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |           Conninfo           | Receiver timeout |  Skip LSN  |    Description    
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+------------------------------+------------------+------------+-------------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | f                 | t             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist2 | -1               | 0/00000000 | test subscription
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
@@ -268,10 +268,10 @@ ERROR:  unrecognized subscription parameter: "create_slot"
 -- ok
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
 \dRs+
-                                                                                                                                                                              List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |           Conninfo           | Receiver timeout |  Skip LSN  |    Description    
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist2 | -1               | 0/00012345 | test subscription
+                                                                                                                                                                                   List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |           Conninfo           | Receiver timeout |  Skip LSN  |    Description    
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+------------------------------+------------------+------------+-------------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist2 | -1               | 0/00012345 | test subscription
 (1 row)
 
 -- ok - with lsn = NONE
@@ -280,10 +280,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
 ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                                                                                                                              List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |           Conninfo           | Receiver timeout |  Skip LSN  |    Description    
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist2 | -1               | 0/00000000 | test subscription
+                                                                                                                                                                                   List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |           Conninfo           | Receiver timeout |  Skip LSN  |    Description    
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+------------------------------+------------------+------------+-------------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist2 | -1               | 0/00000000 | test subscription
 (1 row)
 
 BEGIN;
@@ -319,10 +319,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (wal_receiver_timeout = '80s');
 ALTER SUBSCRIPTION regress_testsub_foo SET (wal_receiver_timeout = 'foobar');
 ERROR:  invalid value for parameter "wal_receiver_timeout": "foobar"
 \dRs+
-                                                                                                                                                                                List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |           Conninfo           | Receiver timeout |  Skip LSN  |    Description    
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | local              | dbname=regress_doesnotexist2 | 80s              | 0/00000000 | test subscription
+                                                                                                                                                                                     List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |           Conninfo           | Receiver timeout |  Skip LSN  |    Description    
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+------------------------------+------------------+------------+-------------------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | local              | dbname=regress_doesnotexist2 | 80s              | 0/00000000 | test subscription
 (1 row)
 
 -- rename back to keep the rest simple
@@ -351,19 +351,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -375,27 +375,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 -- fail - publication already exists
@@ -410,10 +410,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                                                                                                                               List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 -- fail - publication used more than once
@@ -428,10 +428,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -467,19 +467,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | p                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | p                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 -- we can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -489,10 +489,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -505,18 +505,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | t                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | t                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -529,10 +529,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -549,10 +549,10 @@ NOTICE:  max_retention_duration is ineffective when retain_dead_tuples is disabl
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                   1000 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                   1000 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 -- fail - max_retention_duration must be non-negative
@@ -561,12 +561,38 @@ ERROR:  max_retention_duration cannot be negative
 -- ok
 ALTER SUBSCRIPTION regress_testsub SET (max_retention_duration = 0);
 \dRs+
-                                                                                                                                                                       List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+-- fail - message must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, message = foo);
+ERROR:  message requires a Boolean value
+-- ok
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, message = true);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
+\dRs+
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | t       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+(1 row)
+
+-- ok
+ALTER SUBSCRIPTION regress_testsub SET (message = false);
+\dRs+
+                                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Message | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+---------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | f       | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
+-- cleanup
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 -- let's do some tests with pg_create_subscription rather than superuser
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 05533d66675..7eef3721a9a 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -423,6 +423,23 @@ ALTER SUBSCRIPTION regress_testsub SET (max_retention_duration = 0);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
+-- fail - message must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, message = foo);
+
+-- ok
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, message = true);
+
+\dRs+
+
+-- ok
+ALTER SUBSCRIPTION regress_testsub SET (message = false);
+
+\dRs+
+
+-- cleanup
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+
 -- let's do some tests with pg_create_subscription rather than superuser
 SET SESSION AUTHORIZATION regress_subscription_user3;
 
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 1969d467c1d..09c9b87d483 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1701,6 +1701,7 @@ LogicalRepBeginData
 LogicalRepCommitData
 LogicalRepCommitPreparedTxnData
 LogicalRepCtxStruct
+LogicalRepMessageData
 LogicalRepMsgType
 LogicalRepPartMapEntry
 LogicalRepPreparedTxnData
-- 
2.54.0

