From bb582d90242e32b34fe81d0fe8fc419460c28da9 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumarb@google.com>
Date: Wed, 12 Nov 2025 10:43:19 +0530
Subject: [PATCH v13 1/2] Add configurable conflict log table for Logical
 Replication

This patch adds a feature to provide a structured, queryable record of all
logical replication conflicts. The current approach of logging conflicts as
plain text in the server logs makes it difficult to query, analyze, and
use for external monitoring and automation.

This patch addresses these limitations by introducing a configurable
conflict_log_table option in the CREATE SUBSCRIPTION command. Key design
decisions include:

User-Managed Table: The conflict log is stored in a user-managed table
rather than a system catalog.

Structured Data: Conflict details, including the original and remote tuples,
are stored in JSON columns, providing a flexible format to accommodate different
table schemas.

Comprehensive Information: The log table captures essential attributes such as
local and remote transaction IDs, LSNs, commit timestamps, and conflict type,
providing a complete record for post-mortem analysis.

This feature will make logical replication conflicts easier to monitor and manage,
significantly improving the overall resilience and operability of replication setups.

The conflict log tables will not be included in a publication, even if the publication
is configured to include ALL TABLES or ALL TABLES IN SCHEMA.

Note: A single remote tuple may conflict with multiple local tuples when conflict type
is CT_MULTIPLE_UNIQUE_CONFLICTS, so for handling this case we create a single row in
conflict log table with respect to each remote conflict tuple even if it conflicts with
multiple local tuples and we store the multiple conflict tuples as a single JSON array
element in format as
[ { "xid": "1001", "commit_ts": "...", "origin": "...", "tuple": {...} }, ... ]
We can extract the elements of the local tuples from the conflict log table row
as given in below example.

SELECT remote_xid, relname, remote_origin, local_conflicts[1] ->> 'xid' AS local_xid,
       local_conflicts[1] ->> 'tuple' AS local_tuple
FROM myschema.conflict_log_history2;

 remote_xid | relname  | remote_origin | local_xid |     local_tuple
------------+----------+---------------+-----------+---------------------
        760 | test     | pg_16406      | 771       | {"a":1,"b":10}
        765 | conf_tab | pg_16406      | 775       | {"a":2,"b":2,"c":2}
---
 src/backend/catalog/pg_publication.c       |  26 +-
 src/backend/commands/subscriptioncmds.c    | 307 ++++++++++-
 src/backend/replication/logical/conflict.c | 569 ++++++++++++++++++++-
 src/backend/replication/logical/launcher.c |   1 +
 src/backend/replication/logical/worker.c   |  37 +-
 src/backend/utils/cache/lsyscache.c        |  38 ++
 src/bin/psql/describe.c                    |  24 +-
 src/bin/psql/tab-complete.in.c             |   8 +-
 src/include/catalog/pg_subscription.h      |   5 +
 src/include/commands/subscriptioncmds.h    |   2 +
 src/include/replication/conflict.h         |  32 ++
 src/include/replication/worker_internal.h  |   7 +
 src/include/utils/lsyscache.h              |   1 +
 src/test/regress/expected/subscription.out | 370 ++++++++++----
 src/test/regress/sql/subscription.sql      | 114 +++++
 15 files changed, 1413 insertions(+), 128 deletions(-)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 7aa3f179924..9f84e02b7ef 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -31,6 +31,7 @@
 #include "catalog/pg_publication_rel.h"
 #include "catalog/pg_type.h"
 #include "commands/publicationcmds.h"
+#include "commands/subscriptioncmds.h"
 #include "funcapi.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
@@ -85,6 +86,15 @@ check_publication_add_relation(Relation targetrel)
 				 errmsg("cannot add relation \"%s\" to publication",
 						RelationGetRelationName(targetrel)),
 				 errdetail("This operation is not supported for unlogged tables.")));
+
+	/* Can't be conflict log table */
+	if (IsConflictLogTable(RelationGetRelid(targetrel)))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("cannot add relation \"%s.%s\" to publication",
+						get_namespace_name(RelationGetNamespace(targetrel)),
+						RelationGetRelationName(targetrel)),
+				 errdetail("This operation is not supported for conflict log tables.")));
 }
 
 /*
@@ -145,6 +155,13 @@ is_publishable_class(Oid relid, Form_pg_class reltuple)
 
 /*
  * Another variant of is_publishable_class(), taking a Relation.
+ *
+ * Note: Conflict log tables are not publishable.  However, we intentionally
+ * skip this check here because this function is called for every change and
+ * performing this check during every change publication is costly.  To ensure
+ * unpublishable entries are ignored without incurring performance overhead,
+ * tuples inserted into the conflict log table uses the HEAP_INSERT_NO_LOGICAL
+ * flag.  This allows the decoding layer to bypass these entries automatically.
  */
 bool
 is_publishable_relation(Relation rel)
@@ -169,7 +186,10 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS)
 	tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
 	if (!HeapTupleIsValid(tuple))
 		PG_RETURN_NULL();
-	result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple));
+
+	/* Subscription conflict log tables are not published */
+	result = is_publishable_class(relid, (Form_pg_class) GETSTRUCT(tuple)) &&
+			 !IsConflictLogTable(relid);
 	ReleaseSysCache(tuple);
 	PG_RETURN_BOOL(result);
 }
@@ -890,7 +910,9 @@ GetAllPublicationRelations(char relkind, bool pubviaroot)
 		Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple);
 		Oid			relid = relForm->oid;
 
+		/* Subscription conflict log tables are not published */
 		if (is_publishable_class(relid, relForm) &&
+			!IsConflictLogTable(relid) &&
 			!(relForm->relispartition && pubviaroot))
 			result = lappend_oid(result, relid);
 	}
@@ -1018,7 +1040,7 @@ GetSchemaPublicationRelations(Oid schemaid, PublicationPartOpt pub_partopt)
 		Oid			relid = relForm->oid;
 		char		relkind;
 
-		if (!is_publishable_class(relid, relForm))
+		if (!is_publishable_class(relid, relForm) || IsConflictLogTable(relid))
 			continue;
 
 		relkind = get_rel_relkind(relid);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index abbcaff0838..b044ed70a2a 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -15,16 +15,19 @@
 #include "postgres.h"
 
 #include "access/commit_ts.h"
+#include "access/heapam.h"
 #include "access/htup_details.h"
 #include "access/table.h"
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "catalog/catalog.h"
 #include "catalog/dependency.h"
+#include "catalog/heap.h"
 #include "catalog/indexing.h"
 #include "catalog/namespace.h"
 #include "catalog/objectaccess.h"
 #include "catalog/objectaddress.h"
+#include "catalog/pg_am_d.h"
 #include "catalog/pg_authid_d.h"
 #include "catalog/pg_database_d.h"
 #include "catalog/pg_subscription.h"
@@ -34,6 +37,7 @@
 #include "commands/event_trigger.h"
 #include "commands/subscriptioncmds.h"
 #include "executor/executor.h"
+#include "funcapi.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
@@ -51,6 +55,7 @@
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/pg_lsn.h"
+#include "utils/regproc.h"
 #include "utils/syscache.h"
 
 /*
@@ -75,6 +80,7 @@
 #define SUBOPT_MAX_RETENTION_DURATION	0x00008000
 #define SUBOPT_LSN					0x00010000
 #define SUBOPT_ORIGIN				0x00020000
+#define SUBOPT_CONFLICT_LOG_TABLE	0x00040000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -103,6 +109,7 @@ typedef struct SubOpts
 	bool		retaindeadtuples;
 	int32		maxretention;
 	char	   *origin;
+	char	   *conflictlogtable;
 	XLogRecPtr	lsn;
 } SubOpts;
 
@@ -135,7 +142,8 @@ static List *merge_publications(List *oldpublist, List *newpublist, bool addpub,
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
 static void CheckAlterSubOption(Subscription *sub, const char *option,
 								bool slot_needs_update, bool isTopLevel);
-
+static void create_conflict_log_table(Oid namespaceId, char *conflictrel,
+									  Oid subid);
 
 /*
  * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands.
@@ -191,6 +199,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->maxretention = 0;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
+	if (IsSet(supported_opts, SUBOPT_CONFLICT_LOG_TABLE))
+		opts->conflictlogtable = NULL;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -402,6 +412,19 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_LSN;
 			opts->lsn = lsn;
 		}
+		else if (IsSet(supported_opts, SUBOPT_CONFLICT_LOG_TABLE) &&
+				 strcmp(defel->defname, "conflict_log_table") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_CONFLICT_LOG_TABLE))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_CONFLICT_LOG_TABLE;
+			opts->conflictlogtable = defGetString(defel);
+
+			/* Setting conflict_log_table = NONE is treated as no table. */
+			if (strcmp(opts->conflictlogtable, "none") == 0)
+				opts->conflictlogtable = NULL;
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -599,6 +622,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	bits32		supported_opts;
 	SubOpts		opts = {0};
 	AclResult	aclresult;
+	Oid			conflictlogtable_nspid = InvalidOid;
+	char	   *conflictlogtable = NULL;
 
 	/*
 	 * Parse and check options.
@@ -612,7 +637,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 					  SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
 					  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
 					  SUBOPT_RETAIN_DEAD_TUPLES |
-					  SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN);
+					  SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN |
+					  SUBOPT_CONFLICT_LOG_TABLE);
 	parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
 	/*
@@ -747,6 +773,34 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_suborigin - 1] =
 		CStringGetTextDatum(opts.origin);
 
+	/*
+	 * If a conflict log table name is specified, parse the schema and table
+	 * name from the string. Store the namespace OID and the table name in
+	 * the pg_subscription catalog tuple.
+	 */
+	if (opts.conflictlogtable)
+	{
+		List   *names;
+
+		/* Explicitly check for empty string before any processing. */
+		if (opts.conflictlogtable[0] == '\0')
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("conflict log table name cannot be empty"),
+					 errhint("Provide a valid table name or omit the parameter.")));
+
+		names = stringToQualifiedNameList(opts.conflictlogtable, NULL);
+
+		conflictlogtable_nspid =
+				QualifiedNameGetCreationNamespace(names, &conflictlogtable);
+		values[Anum_pg_subscription_subconflictlognspid - 1] =
+					ObjectIdGetDatum(conflictlogtable_nspid);
+		values[Anum_pg_subscription_subconflictlogtable - 1] =
+					CStringGetTextDatum(conflictlogtable);
+	}
+	else
+		nulls[Anum_pg_subscription_subconflictlogtable - 1] = true;
+
 	tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
 
 	/* Insert tuple into catalog. */
@@ -768,6 +822,11 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
 	replorigin_create(originname);
 
+	/* If a conflict log table name is given then create the table. */
+	if (opts.conflictlogtable)
+		create_conflict_log_table(conflictlogtable_nspid, conflictlogtable,
+								  subid);
+
 	/*
 	 * Connect to remote side to execute requested commands and fetch table
 	 * and sequence info.
@@ -1410,7 +1469,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 								  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
 								  SUBOPT_RETAIN_DEAD_TUPLES |
 								  SUBOPT_MAX_RETENTION_DURATION |
-								  SUBOPT_ORIGIN);
+								  SUBOPT_ORIGIN |
+								  SUBOPT_CONFLICT_LOG_TABLE);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1665,6 +1725,96 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					origin = opts.origin;
 				}
 
+				if (IsSet(opts.specified_opts, SUBOPT_CONFLICT_LOG_TABLE))
+				{
+					Oid		nspid = InvalidOid;
+					Oid     old_nspid = InvalidOid;
+					char   *old_relname = NULL;
+					char   *relname = NULL;
+					List   *names = NIL;
+
+					if (opts.conflictlogtable != NULL)
+					{
+						/* Explicitly check for empty string before any processing. */
+						if (opts.conflictlogtable[0] == '\0')
+							ereport(ERROR,
+									(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+									errmsg("conflict log table name cannot be empty"),
+									errhint("Provide a valid table name or omit the parameter.")));
+
+						names = stringToQualifiedNameList(opts.conflictlogtable,
+														  NULL);
+						nspid = QualifiedNameGetCreationNamespace(names, &relname);
+					}
+
+					/* Fetch the existing conflict table information. */
+					old_relname =
+						get_subscription_conflict_log_table(subid, &old_nspid);
+
+					/*
+					 * If the subscription already uses this conflict log table
+					 * and it exists, just issue a notice.
+					 */
+					if (old_relname != NULL && relname != NULL
+						&& (strcmp(old_relname, relname) == 0) &&
+						old_nspid == nspid &&
+						OidIsValid(get_relname_relid(old_relname, old_nspid)))
+					{
+						char *nspname = get_namespace_name(nspid);
+
+						ereport(NOTICE,
+								(errmsg("\"%s.%s\" is already in use as the conflict log table for this subscription",
+										nspname, relname)));
+						pfree(nspname);
+					}
+					else
+					{
+						ObjectAddress	object;
+
+						/*
+						 * Conflict log tables are recorded as internal
+						 * dependencies of the subscription.  Before
+						 * associating a new table, drop the existing table to
+						 * avoid stale or orphaned relations.
+						 *
+						 * XXX: At present, only conflict log tables are
+						 * managed this way.  In future if we introduce
+						 * additional internal dependencies, we may need
+						 * a targeted deletion to avoid deletion of any
+						 * other objects.
+						 */
+						ObjectAddressSet(object, SubscriptionRelationId, subid);
+						performDeletion(&object, DROP_CASCADE,
+										PERFORM_DELETION_INTERNAL |
+										PERFORM_DELETION_SKIP_ORIGINAL);
+
+						/*
+						 * Need to create a new table if a new name was
+						 * provided.
+						 */
+						if (relname != NULL)
+							create_conflict_log_table(nspid, relname, subid);
+
+						values[Anum_pg_subscription_subconflictlognspid - 1] =
+									ObjectIdGetDatum(nspid);
+
+						if (relname != NULL)
+							values[Anum_pg_subscription_subconflictlogtable - 1] =
+									CStringGetTextDatum(relname);
+						else
+							nulls[Anum_pg_subscription_subconflictlogtable - 1] =
+									true;
+
+						replaces[Anum_pg_subscription_subconflictlognspid - 1] =
+									true;
+						replaces[Anum_pg_subscription_subconflictlogtable - 1] =
+									true;
+					}
+
+					if (old_relname != NULL)
+						pfree(old_relname);
+				}
+
 				update_tuple = true;
 				break;
 			}
@@ -2027,6 +2177,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	Form_pg_subscription form;
 	List	   *rstates;
 	bool		must_use_password;
+	ObjectAddress	object;
 
 	/*
 	 * The launcher may concurrently start a new worker for this subscription.
@@ -2184,6 +2335,19 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	/* Clean up dependencies */
 	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
+	/*
+	 * Conflict log tables are recorded as internal dependencies of the
+	 * subscription.  We must drop the dependent objects before the
+	 * subscription itself is removed.  By using
+	 * PERFORM_DELETION_SKIP_ORIGINAL, we ensure that only the conflict log
+	 * table is reaped while the  subscription remains for the final deletion
+	 * step.
+	 */
+	ObjectAddressSet(object, SubscriptionRelationId, subid);
+	performDeletion(&object, DROP_CASCADE,
+					PERFORM_DELETION_INTERNAL |
+					PERFORM_DELETION_SKIP_ORIGINAL);
+
 	/* Remove any associated relation synchronization states. */
 	RemoveSubscriptionRel(subid, InvalidOid);
 
@@ -3188,3 +3352,140 @@ defGetStreamingMode(DefElem *def)
 					def->defname)));
 	return LOGICALREP_STREAM_OFF;	/* keep compiler quiet */
 }
+
+/*
+ * Builds the TupleDesc for the conflict log table.
+ */
+static TupleDesc
+create_conflict_log_table_tupdesc(void)
+{
+	TupleDesc	tupdesc;
+	int			i;
+
+	tupdesc = CreateTemplateTupleDesc(MAX_CONFLICT_ATTR_NUM);
+
+	for (i = 0; i < MAX_CONFLICT_ATTR_NUM; i++)
+	{
+		Oid type_oid = ConflictLogSchema[i].atttypid;
+
+		/* Special handling for the JSON array type for proper TupleDescInitEntry call */
+		if (type_oid == JSONARRAYOID)
+			type_oid = get_array_type(JSONOID);
+
+		TupleDescInitEntry(tupdesc, i + 1,
+						   ConflictLogSchema[i].attname,
+						   type_oid,
+						   -1, 0);
+	}
+
+	return BlessTupleDesc(tupdesc);
+}
+
+/*
+ * Create conflict log table.
+ *
+ * The subscription owner becomes the owner of this table and has all
+ * privileges on it.
+ */
+static void
+create_conflict_log_table(Oid namespaceId, char *conflictrel, Oid subid)
+{
+	TupleDesc	tupdesc;
+	Oid			relid;
+	ObjectAddress	myself;
+	ObjectAddress	subaddr;
+
+	/* Report an error if the specified conflict log table already exists. */
+	if (OidIsValid(get_relname_relid(conflictrel, namespaceId)))
+		ereport(ERROR,
+				(errcode(ERRCODE_DUPLICATE_TABLE),
+				 errmsg("cannot create conflict log table \"%s.%s\" because a table with that name already exists",
+						get_namespace_name(namespaceId), conflictrel),
+				 errhint("Use a different name for the conflict log table or drop the existing table.")));
+
+	/*
+	 * Conflict log tables must be permanent relations.  Disallow creation in
+	 * temporary namespaces to ensure the same.
+	 */
+	if (isTempNamespace(namespaceId))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("cannot create conflict log table \"%s\" in a temporary namespace",
+						conflictrel),
+				 errhint("Use a permanent schema.")));
+
+	/* Build the tuple descriptor for the new table. */
+	tupdesc = create_conflict_log_table_tupdesc();
+
+	/* Create conflict log table. */
+	relid = heap_create_with_catalog(conflictrel,
+									 namespaceId,
+									 0,
+									 InvalidOid,
+									 InvalidOid,
+									 InvalidOid,
+									 GetUserId(),
+									 HEAP_TABLE_AM_OID,
+									 tupdesc,
+									 NIL,
+									 RELKIND_RELATION,
+									 RELPERSISTENCE_PERMANENT,
+									 false,
+									 false,
+									 ONCOMMIT_NOOP,
+									 (Datum) 0,
+									 false,
+									 false,
+									 false,
+									 InvalidOid,
+									 NULL);
+
+	/*
+	 * Establish an internal dependency between the conflict log table and
+	 * the subscription.  By using DEPENDENCY_INTERNAL, we ensure the table
+	 * is automatically reaped when the subscription is dropped. This also
+	 * prevents the table from being dropped independently unless the
+	 * subscription itself is removed.
+	 */
+	ObjectAddressSet(myself, RelationRelationId, relid);
+	ObjectAddressSet(subaddr, SubscriptionRelationId, subid);
+	recordDependencyOn(&myself, &subaddr, DEPENDENCY_INTERNAL);
+
+	/* Release tuple descriptor memory. */
+	FreeTupleDesc(tupdesc);
+}
+
+/*
+ * Check if the specified relation is used as a conflict log table by any
+ * subscription.
+ */
+bool
+IsConflictLogTable(Oid relid)
+{
+	Relation		rel;
+	TableScanDesc	scan;
+	HeapTuple		tup;
+	bool			is_clt = false;
+
+	rel = table_open(SubscriptionRelationId, AccessShareLock);
+	scan = table_beginscan_catalog(rel, 0, NULL);
+
+	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
+	{
+		Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
+		Oid		nspid;
+		char   *relname;
+
+		relname = get_subscription_conflict_log_table(subform->oid, &nspid);
+		if (relname && relid == get_relname_relid(relname, nspid))
+		{
+			is_clt = true;
+			break;
+		}
+	}
+
+	table_endscan(scan);
+	table_close(rel, AccessShareLock);
+
+	return is_clt;
+}
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 16695592265..1d357805eca 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -15,13 +15,21 @@
 #include "postgres.h"
 
 #include "access/commit_ts.h"
+#include "access/heapam.h"
 #include "access/tableam.h"
 #include "executor/executor.h"
+#include "funcapi.h"
 #include "pgstat.h"
 #include "replication/conflict.h"
 #include "replication/worker_internal.h"
 #include "storage/lmgr.h"
+#include "utils/builtins.h"
+#include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
+#include "utils/pg_lsn.h"
+#include "utils/jsonb.h"
+
+#define MAX_LOCAL_CONFLICT_INFO_ATTRS 5
 
 static const char *const ConflictTypeNames[] = {
 	[CT_INSERT_EXISTS] = "insert_exists",
@@ -50,8 +58,27 @@ static char *build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
 									   TupleTableSlot *localslot,
 									   TupleTableSlot *remoteslot,
 									   Oid indexoid);
+static void build_index_datums_from_slot(EState *estate, Relation localrel,
+										 TupleTableSlot *slot,
+										 Relation indexDesc, Datum *values,
+										 bool *isnull);
 static char *build_index_value_desc(EState *estate, Relation localrel,
 									TupleTableSlot *slot, Oid indexoid);
+static Datum tuple_table_slot_to_json_datum(TupleTableSlot *slot);
+static Datum tuple_table_slot_to_indextup_json(EState *estate,
+											   Relation localrel,
+											   Oid replica_index,
+											   TupleTableSlot *slot);
+static TupleDesc build_conflict_tupledesc(void);
+static Datum build_local_conflicts_json_array(EState *estate, Relation rel,
+											  ConflictType conflict_type,
+											  List *conflicttuples);
+static void prepare_conflict_log_tuple(EState *estate, Relation rel,
+									   Relation conflictlogrel,
+									   ConflictType conflict_type,
+									   TupleTableSlot *searchslot,
+									   List *conflicttuples,
+									   TupleTableSlot *remoteslot);
 
 /*
  * Get the xmin and commit timestamp data (origin and timestamp) associated
@@ -106,6 +133,7 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 					TupleTableSlot *remoteslot, List *conflicttuples)
 {
 	Relation	localrel = relinfo->ri_RelationDesc;
+	Relation	conflictlogrel = GetConflictLogTableRel();
 	StringInfoData err_detail;
 
 	initStringInfo(&err_detail);
@@ -120,6 +148,37 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
 								 conflicttuple->ts,
 								 &err_detail);
 
+	/* Insert conflict details to conflict log table. */
+	if (conflictlogrel)
+	{
+		if (ValidateConflictLogTable(conflictlogrel))
+		{
+			/*
+			 * Prepare the conflict log tuple. If the error level is below
+			 * ERROR, insert it immediately. Otherwise, defer the insertion to
+			 * a new transaction after the current one aborts, ensuring the
+			 * insertion of the log tuple is not rolled back.
+			 */
+			prepare_conflict_log_tuple(estate,
+									   relinfo->ri_RelationDesc,
+									   conflictlogrel,
+									   type,
+									   searchslot,
+									   conflicttuples,
+									   remoteslot);
+			if (elevel < ERROR)
+				InsertConflictLogTuple(conflictlogrel);
+		}
+		else
+			ereport(WARNING,
+					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("Conflict log table \"%s.%s\" structure changed, skipping insertion",
+							get_namespace_name(RelationGetNamespace(conflictlogrel)),
+							RelationGetRelationName(conflictlogrel)));
+
+		table_close(conflictlogrel, RowExclusiveLock);
+	}
+
 	pgstat_report_subscription_conflict(MySubscription->oid, type);
 
 	ereport(elevel,
@@ -162,6 +221,141 @@ InitConflictIndexes(ResultRelInfo *relInfo)
 	relInfo->ri_onConflictArbiterIndexes = uniqueIndexes;
 }
 
+/*
+ * GetConflictLogTableRel
+ *
+ * Get the information of the specific conflict log table defined in
+ * pg_subscription and opens the relation for insertion.  The caller is
+ * responsible for  closing the returned relation handle.
+ */
+Relation
+GetConflictLogTableRel(void)
+{
+	Oid			nspid;
+	Oid			conflictlogrelid;
+	Relation	conflictlogrel = NULL;
+	char	   *conflictlogtable;
+
+	/* If conflict log table is not set for the subscription just return. */
+	conflictlogtable = get_subscription_conflict_log_table(
+						MyLogicalRepWorker->subid, &nspid);
+	if (conflictlogtable == NULL)
+		return NULL;
+
+	conflictlogrelid = get_relname_relid(conflictlogtable, nspid);
+	if (OidIsValid(conflictlogrelid))
+		conflictlogrel = table_open(conflictlogrelid, RowExclusiveLock);
+
+	/* Conflict log table is dropped or not accessible. */
+	if (conflictlogrel == NULL)
+		ereport(WARNING,
+				(errcode(ERRCODE_UNDEFINED_TABLE),
+				 errmsg("conflict log table \"%s.%s\" does not exist",
+						get_namespace_name(nspid), conflictlogtable)));
+
+	pfree(conflictlogtable);
+
+	return conflictlogrel;
+}
+
+/*
+ * InsertConflictLogTuple
+ *
+ * Insert conflict log tuple into the conflict log table. It uses
+ * HEAP_INSERT_NO_LOGICAL to explicitly block logical decoding
+ * of the tuple inserted into the conflict log table.
+ */
+void
+InsertConflictLogTuple(Relation conflictlogrel)
+{
+	int			options = HEAP_INSERT_NO_LOGICAL;
+
+	/* A valid tuple must be prepared and stored in MyLogicalRepWorker. */
+	Assert(MyLogicalRepWorker->conflict_log_tuple != NULL);
+
+	heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple,
+				GetCurrentCommandId(true), options, NULL);
+
+	/* Free conflict log tuple. */
+	heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
+	MyLogicalRepWorker->conflict_log_tuple = NULL;
+}
+
+/*
+ * ValidateConflictLogTable - Validate conflict log table
+ *
+ * Validate whether the conflict log table is still suitable for considering as
+ * conflict log table.
+ */
+bool
+ValidateConflictLogTable(Relation rel)
+{
+	Relation    pg_attribute;
+	HeapTuple   atup;
+	ScanKeyData scankey;
+	SysScanDesc scan;
+	Form_pg_attribute attForm;
+	int         attcnt = 0;
+	bool        tbl_ok = true;
+
+	/*
+	 * Check whether the table definition including its column names, data
+	 * types, and column ordering meets the requirements for conflict log
+	 * table.
+	 */
+	pg_attribute = table_open(AttributeRelationId, AccessShareLock);
+	ScanKeyInit(&scankey,
+				Anum_pg_attribute_attrelid,
+				BTEqualStrategyNumber, F_OIDEQ,
+				ObjectIdGetDatum(RelationGetRelid(rel)));
+
+	scan = systable_beginscan(pg_attribute, AttributeRelidNumIndexId, true,
+							  SnapshotSelf, 1, &scankey);
+
+	/* We only need to check up to MAX_CONFLICT_ATTR_NUM attributes */
+	while (HeapTupleIsValid(atup = systable_getnext(scan)))
+	{
+		const ConflictLogColumnDef *expected;
+		int		schema_idx;
+
+		attForm = (Form_pg_attribute) GETSTRUCT(atup);
+
+		/* Skip system columns and dropped columns */
+		if (attForm->attnum < 1 || attForm->attisdropped)
+			continue;
+
+		attcnt++;
+
+		/* attnum 1 corresponds to index 0 in ConflictLogSchema */
+		schema_idx = attForm->attnum - 1;
+
+		/* Check against the central schema definition */
+		if (schema_idx >= MAX_CONFLICT_ATTR_NUM)
+		{
+			/* Found an extra column beyond the required set */
+			tbl_ok = false;
+			break;
+		}
+
+		expected = &ConflictLogSchema[schema_idx];
+
+		if (attForm->atttypid != expected->atttypid ||
+			strcmp(NameStr(attForm->attname), expected->attname) != 0)
+		{
+			tbl_ok = false;
+			break;
+		}
+	}
+
+	systable_endscan(scan);
+	table_close(pg_attribute, AccessShareLock);
+
+	if (attcnt != MAX_CONFLICT_ATTR_NUM || !tbl_ok)
+		return false;
+
+	return true;
+}
+
 /*
  * Add SQLSTATE error code to the current conflict report.
  */
@@ -472,6 +666,40 @@ build_tuple_value_details(EState *estate, ResultRelInfo *relinfo,
 	return tuple_value.data;
 }
 
+/*
+ * Helper function to extract the "raw" index key Datums and their null flags
+ * from a TupleTableSlot, given an already open index descriptor.
+ * This is the reusable core logic.
+ */
+static void
+build_index_datums_from_slot(EState *estate, Relation localrel,
+							 TupleTableSlot *slot,
+							 Relation indexDesc, Datum *values,
+							 bool *isnull)
+{
+	TupleTableSlot *tableslot = slot;
+
+	/*
+	 * If the slot is a virtual slot, copy it into a heap tuple slot as
+	 * FormIndexDatum only works with heap tuple slots.
+	 */
+	if (TTS_IS_VIRTUAL(slot))
+	{
+		/* Slot is created within the EState's tuple table */
+		tableslot = table_slot_create(localrel, &estate->es_tupleTable);
+		tableslot = ExecCopySlot(tableslot, slot);
+	}
+
+	/*
+	 * Initialize ecxt_scantuple for potential use in FormIndexDatum
+	 */
+	GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
+
+	/* Form the index datums */
+	FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values,
+				   isnull);
+}
+
 /*
  * Helper functions to construct a string describing the contents of an index
  * entry. See BuildIndexValueDescription for details.
@@ -487,41 +715,336 @@ build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot,
 	Relation	indexDesc;
 	Datum		values[INDEX_MAX_KEYS];
 	bool		isnull[INDEX_MAX_KEYS];
-	TupleTableSlot *tableslot = slot;
 
-	if (!tableslot)
+	if (!slot)
 		return NULL;
 
 	Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
 
 	indexDesc = index_open(indexoid, NoLock);
 
-	/*
-	 * If the slot is a virtual slot, copy it into a heap tuple slot as
-	 * FormIndexDatum only works with heap tuple slots.
-	 */
-	if (TTS_IS_VIRTUAL(slot))
+	build_index_datums_from_slot(estate, localrel, slot, indexDesc, values,
+								 isnull);
+
+	index_value = BuildIndexValueDescription(indexDesc, values, isnull);
+
+	index_close(indexDesc, NoLock);
+
+	return index_value;
+}
+
+/*
+ * tuple_table_slot_to_json_datum
+ *
+ * Helper function to convert a TupleTableSlot to Jsonb.
+ */
+static Datum
+tuple_table_slot_to_json_datum(TupleTableSlot *slot)
+{
+	HeapTuple	tuple;
+	Datum		datum;
+	Datum		json;
+
+	Assert(slot != NULL);
+
+	tuple = ExecCopySlotHeapTuple(slot);
+	datum = heap_copy_tuple_as_datum(tuple, slot->tts_tupleDescriptor);
+
+	json = DirectFunctionCall1(row_to_json, datum);
+	heap_freetuple(tuple);
+
+	return json;
+}
+
+/*
+ * tuple_table_slot_to_indextup_json
+ *
+ * Fetch replica identity key from the tuple table slot and convert into a
+ * jsonb datum.
+ */
+static Datum
+tuple_table_slot_to_indextup_json(EState *estate, Relation localrel,
+								  Oid indexid, TupleTableSlot *slot)
+{
+	Relation	indexDesc;
+	Datum		values[INDEX_MAX_KEYS];
+	bool		isnull[INDEX_MAX_KEYS];
+	HeapTuple	tuple;
+	TupleDesc	tupdesc;
+	Datum		datum;
+
+	Assert(slot != NULL);
+
+	Assert(CheckRelationOidLockedByMe(indexid, RowExclusiveLock, true));
+
+	indexDesc = index_open(indexid, NoLock);
+
+	build_index_datums_from_slot(estate, localrel, slot, indexDesc, values,
+								 isnull);
+	tupdesc = RelationGetDescr(indexDesc);
+
+	/* Bless the tupdesc so it can be looked up by row_to_json. */
+	BlessTupleDesc(tupdesc);
+
+	/* Form the replica identity tuple. */
+	tuple = heap_form_tuple(tupdesc, values, isnull);
+	datum = heap_copy_tuple_as_datum(tuple, tupdesc);
+
+	index_close(indexDesc, NoLock);
+
+	/* Convert to a JSONB datum. */
+	return DirectFunctionCall1(row_to_json, datum);
+}
+
+/*
+ * Initialize the tuple descriptor for local conflict info.
+ */
+static TupleDesc
+build_conflict_tupledesc(void)
+{
+	TupleDesc	tupdesc;
+	int			attno = 1;
+
+	tupdesc = CreateTemplateTupleDesc(MAX_LOCAL_CONFLICT_INFO_ATTRS);
+
+	TupleDescInitEntry(tupdesc, (AttrNumber) attno++, "xid",
+						XIDOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) attno++, "commit_ts",
+						TIMESTAMPTZOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) attno++, "origin",
+						TEXTOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) attno++, "key",
+						JSONOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) attno, "tuple",
+						JSONOID, -1, 0);
+
+	BlessTupleDesc(tupdesc);
+
+	Assert(attno == MAX_LOCAL_CONFLICT_INFO_ATTRS);
+
+	return tupdesc;
+}
+
+/*
+ * Builds the local conflicts JSONB array column from the list of
+ * ConflictTupleInfo objects.
+ *
+ * Example output structure:
+ * [ { "xid": "1001", "commit_ts": "...", "origin": "...", "tuple": {...} }, ... ]
+ */
+static Datum
+build_local_conflicts_json_array(EState *estate, Relation rel,
+								 ConflictType conflict_type,
+								 List *conflicttuples)
+{
+	ListCell   *lc;
+	List	   *json_datums = NIL; /* List to hold the row_to_json results (type json) */
+	Datum	   *json_datum_array;
+	bool	   *json_null_array;
+	Datum		json_array_datum;
+	int			num_conflicts;
+	int			i;
+	int16		typlen;
+	bool		typbyval;
+	char		typalign;
+	TupleDesc	tupdesc;
+
+	/* Build local conflicts tuple descriptor. */
+	tupdesc = build_conflict_tupledesc();
+
+	/* Process local conflict tuple list and prepare an array of JSON. */
+	foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples)
 	{
-		tableslot = table_slot_create(localrel, &estate->es_tupleTable);
-		tableslot = ExecCopySlot(tableslot, slot);
+		Datum		values[MAX_LOCAL_CONFLICT_INFO_ATTRS];
+		bool		nulls[MAX_LOCAL_CONFLICT_INFO_ATTRS];
+		char	   *origin_name = NULL;
+		HeapTuple	tuple;
+		Datum		json_datum;
+		int			attno;
+
+		memset(values, 0, sizeof(Datum) * MAX_LOCAL_CONFLICT_INFO_ATTRS);
+		memset(nulls, 0, sizeof(bool) * MAX_LOCAL_CONFLICT_INFO_ATTRS);
+
+		attno = 0;
+		values[attno++] = TransactionIdGetDatum(conflicttuple->xmin);
+
+		if (conflicttuple->ts)
+			values[attno++] = TimestampTzGetDatum(conflicttuple->ts);
+		else
+			nulls[attno++] = true;
+
+		if (conflicttuple->origin != InvalidRepOriginId)
+			replorigin_by_oid(conflicttuple->origin, true, &origin_name);
+
+		/* Store empty string if origin name for the tuple is NULL. */
+		if (origin_name != NULL)
+			values[attno++] = CStringGetTextDatum(origin_name);
+		else
+			nulls[attno++] = true;
+
+		/*
+		 * Add the conflicting key values in the case of a unique constraint
+		 * violation.
+		 */
+		if (conflict_type == CT_INSERT_EXISTS ||
+			conflict_type == CT_UPDATE_EXISTS ||
+			conflict_type == CT_MULTIPLE_UNIQUE_CONFLICTS)
+		{
+			Oid	indexoid = conflicttuple->indexoid;
+
+			Assert(OidIsValid(indexoid) && conflicttuple->slot &&
+				   CheckRelationOidLockedByMe(indexoid, RowExclusiveLock,
+											  true));
+			values[attno++] =
+					tuple_table_slot_to_indextup_json(estate, rel,
+													  indexoid,
+													  conflicttuple->slot);
+		}
+		else
+			nulls[attno++] = true;
+
+		/* Convert conflicting tuple to JSON datum. */
+		if (conflicttuple->slot)
+			values[attno] = tuple_table_slot_to_json_datum(conflicttuple->slot);
+		else
+			nulls[attno] = true;
+
+		Assert(attno + 1 == MAX_LOCAL_CONFLICT_INFO_ATTRS);
+
+		tuple = heap_form_tuple(tupdesc, values, nulls);
+
+		json_datum = heap_copy_tuple_as_datum(tuple, tupdesc);
+
+		/*
+		 * Build the higher level JSON datum in format described in function
+		 * header.
+		 */
+		json_datum = DirectFunctionCall1(row_to_json, json_datum);
+
+		/* Done with the temporary tuple. */
+		heap_freetuple(tuple);
+
+		/* Add to the array element. */
+		json_datums = lappend(json_datums, (void *) json_datum);
 	}
 
-	/*
-	 * Initialize ecxt_scantuple for potential use in FormIndexDatum when
-	 * index expressions are present.
-	 */
-	GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot;
+	num_conflicts = list_length(json_datums);
 
-	/*
-	 * The values/nulls arrays passed to BuildIndexValueDescription should be
-	 * the results of FormIndexDatum, which are the "raw" input to the index
-	 * AM.
-	 */
-	FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull);
+	json_datum_array = (Datum *) palloc(num_conflicts * sizeof(Datum));
+	json_null_array = (bool *) palloc0(num_conflicts * sizeof(bool));
 
-	index_value = BuildIndexValueDescription(indexDesc, values, isnull);
+	i = 0;
+	foreach(lc, json_datums)
+	{
+		json_datum_array[i] = (Datum) lfirst(lc);
+		i++;
+	}
 
-	index_close(indexDesc, NoLock);
+	/* Construct the json[] array Datum. */
+	get_typlenbyvalalign(JSONOID, &typlen, &typbyval, &typalign);
+	json_array_datum = PointerGetDatum(construct_array(json_datum_array,
+													   num_conflicts,
+													   JSONOID,
+													   typlen,
+													   typbyval,
+													   typalign));
+	pfree(json_datum_array);
+	pfree(json_null_array);
+
+	return json_array_datum;
+}
 
-	return index_value;
+/*
+ * prepare_conflict_log_tuple
+ *
+ * This routine prepares a tuple detailing a conflict encountered during
+ * logical replication. The prepared tuple will be stored in
+ * MyLogicalRepWorker->conflict_log_tuple which should be inserted into the
+ * conflict log table by calling InsertConflictLogTuple.
+ */
+static void
+prepare_conflict_log_tuple(EState *estate, Relation rel,
+						   Relation conflictlogrel,
+						   ConflictType conflict_type,
+						   TupleTableSlot *searchslot,
+						   List *conflicttuples,
+						   TupleTableSlot *remoteslot)
+{
+	Datum		values[MAX_CONFLICT_ATTR_NUM];
+	bool		nulls[MAX_CONFLICT_ATTR_NUM];
+	int			attno;
+	char	   *remote_origin = NULL;
+	MemoryContext	oldctx;
+
+	Assert(MyLogicalRepWorker->conflict_log_tuple == NULL);
+
+	/* Initialize values and nulls arrays. */
+	memset(values, 0, sizeof(Datum) * MAX_CONFLICT_ATTR_NUM);
+	memset(nulls, 0, sizeof(bool) * MAX_CONFLICT_ATTR_NUM);
+
+	/* Populate the values and nulls arrays. */
+	attno = 0;
+	values[attno++] = ObjectIdGetDatum(RelationGetRelid(rel));
+
+	values[attno++] =
+			CStringGetTextDatum(get_namespace_name(RelationGetNamespace(rel)));
+
+	values[attno++] = CStringGetTextDatum(RelationGetRelationName(rel));
+
+	values[attno++] = CStringGetTextDatum(ConflictTypeNames[conflict_type]);
+
+	if (TransactionIdIsValid(remote_xid))
+		values[attno++] = TransactionIdGetDatum(remote_xid);
+	else
+		nulls[attno++] = true;
+
+	values[attno++] = LSNGetDatum(remote_final_lsn);
+
+	if (remote_commit_ts > 0)
+		values[attno++] = TimestampTzGetDatum(remote_commit_ts);
+	else
+		nulls[attno++] = true;
+
+	if (replorigin_session_origin != InvalidRepOriginId)
+		replorigin_by_oid(replorigin_session_origin, true, &remote_origin);
+
+	if (remote_origin != NULL)
+		values[attno++] = CStringGetTextDatum(remote_origin);
+	else
+		nulls[attno++] = true;
+
+	if (!TupIsNull(searchslot))
+	{
+		Oid		replica_index = GetRelationIdentityOrPK(rel);
+
+		/*
+		 * If the table has a valid replica identity index, build the index
+		 * json datum from key value. Otherwise, construct it from the complete
+		 * tuple in REPLICA IDENTITY FULL cases.
+		 */
+		if (OidIsValid(replica_index))
+			values[attno++] = tuple_table_slot_to_indextup_json(estate, rel,
+																replica_index,
+																searchslot);
+		else
+			values[attno++] = tuple_table_slot_to_json_datum(searchslot);
+	}
+	else
+		nulls[attno++] = true;
+
+	if (!TupIsNull(remoteslot))
+		values[attno++] = tuple_table_slot_to_json_datum(remoteslot);
+	else
+		nulls[attno++] = true;
+
+	values[attno] = build_local_conflicts_json_array(estate, rel,
+													 conflict_type,
+													 conflicttuples);
+
+	Assert(attno + 1 == MAX_CONFLICT_ATTR_NUM);
+
+	oldctx = MemoryContextSwitchTo(ApplyContext);
+	MyLogicalRepWorker->conflict_log_tuple =
+		heap_form_tuple(RelationGetDescr(conflictlogrel), values, nulls);
+	MemoryContextSwitchTo(oldctx);
 }
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 3991e1495d4..bc7e1d9ebde 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -477,6 +477,7 @@ retry:
 	worker->oldest_nonremovable_xid = retain_dead_tuples
 		? MyReplicationSlot->data.xmin
 		: InvalidTransactionId;
+	worker->conflict_log_tuple = NULL;
 	worker->last_lsn = InvalidXLogRecPtr;
 	TIMESTAMP_NOBEGIN(worker->last_send_time);
 	TIMESTAMP_NOBEGIN(worker->last_recv_time);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index fc64476a9ef..e8f7ab3d5d6 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -482,7 +482,9 @@ static bool MySubscriptionValid = false;
 static List *on_commit_wakeup_workers_subids = NIL;
 
 bool		in_remote_transaction = false;
-static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+TransactionId	remote_xid = InvalidTransactionId;
+TimestampTz	remote_commit_ts = 0;
 
 /* fields valid only when processing streamed transaction */
 static bool in_streamed_transaction = false;
@@ -1219,6 +1221,8 @@ apply_handle_begin(StringInfo s)
 	set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
 
 	remote_final_lsn = begin_data.final_lsn;
+	remote_commit_ts = begin_data.committime;
+	remote_xid = begin_data.xid;
 
 	maybe_start_skipping_changes(begin_data.final_lsn);
 
@@ -1745,6 +1749,10 @@ apply_handle_stream_start(StringInfo s)
 	/* extract XID of the top-level transaction */
 	stream_xid = logicalrep_read_stream_start(s, &first_segment);
 
+	remote_xid = stream_xid;
+	remote_final_lsn = InvalidXLogRecPtr;
+	remote_commit_ts = 0;
+
 	if (!TransactionIdIsValid(stream_xid))
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -5609,6 +5617,33 @@ start_apply(XLogRecPtr origin_startpos)
 			pgstat_report_subscription_error(MySubscription->oid,
 											 MyLogicalRepWorker->type);
 
+			/*
+			 * Insert any pending conflict log tuple under a new transaction.
+			 */
+			if (MyLogicalRepWorker->conflict_log_tuple != NULL)
+			{
+				Relation	conflictlogrel;
+
+				StartTransactionCommand();
+				PushActiveSnapshot(GetTransactionSnapshot());
+
+				/* Open conflict log table and insert the tuple. */
+				conflictlogrel = GetConflictLogTableRel();
+				if (ValidateConflictLogTable(conflictlogrel))
+					InsertConflictLogTuple(conflictlogrel);
+				else
+					ereport(WARNING,
+							errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							errmsg("Conflict log table \"%s.%s\" structure changed, skipping insertion",
+								   get_namespace_name(RelationGetNamespace(conflictlogrel)),
+								   RelationGetRelationName(conflictlogrel)));
+				MyLogicalRepWorker->conflict_log_tuple = NULL;
+				table_close(conflictlogrel, RowExclusiveLock);
+
+				PopActiveSnapshot();
+				CommitTransactionCommand();
+			}
+
 			PG_RE_THROW();
 		}
 	}
diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c
index 5aa7a26d95c..b9090f7d17d 100644
--- a/src/backend/utils/cache/lsyscache.c
+++ b/src/backend/utils/cache/lsyscache.c
@@ -3879,3 +3879,41 @@ get_subscription_name(Oid subid, bool missing_ok)
 
 	return subname;
 }
+
+/*
+ * get_subscription_conflict_log_table
+ *
+ * Get conflict log table name and namespace id from subscription.
+ */
+char *
+get_subscription_conflict_log_table(Oid subid, Oid *nspid)
+{
+	HeapTuple	tup;
+	Datum		datum;
+	bool		isnull;
+	char	   *relname = NULL;
+	Form_pg_subscription subform;
+
+	*nspid = InvalidOid;
+
+	tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
+
+	if (!HeapTupleIsValid(tup))
+		elog(ERROR, "cache lookup failed for subscription %u", subid);
+
+	subform = (Form_pg_subscription) GETSTRUCT(tup);
+
+	/* Get conflict log table name. */
+	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
+							tup,
+							Anum_pg_subscription_subconflictlogtable,
+							&isnull);
+	if (!isnull)
+	{
+		*nspid = subform->subconflictlognspid;
+		relname = pstrdup(TextDatumGetCString(datum));
+	}
+
+	ReleaseSysCache(tup);
+	return relname;
+}
diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c
index 36f24502842..906167fe466 100644
--- a/src/bin/psql/describe.c
+++ b/src/bin/psql/describe.c
@@ -6806,7 +6806,7 @@ describeSubscriptions(const char *pattern, bool verbose)
 	printQueryOpt myopt = pset.popt;
 	static const bool translate_columns[] = {false, false, false, false,
 		false, false, false, false, false, false, false, false, false, false,
-	false, false, false, false};
+	false, false, false, false, false};
 
 	if (pset.sversion < 100000)
 	{
@@ -6900,15 +6900,25 @@ describeSubscriptions(const char *pattern, bool verbose)
 			appendPQExpBuffer(&buf,
 							  ", subskiplsn AS \"%s\"\n",
 							  gettext_noop("Skip LSN"));
+
+		/* Conflict log table is only supported in v19 and higher */
+		if (pset.sversion >= 190000)
+			appendPQExpBuffer(&buf,
+							  ", (CASE\n"
+							  "    WHEN subconflictlogtable IS NULL THEN NULL\n"
+							  "    ELSE pg_catalog.quote_ident(n.nspname) || '.' ||"
+							  "    pg_catalog.quote_ident(subconflictlogtable::text)\n"
+							  "END) AS \"%s\"\n",
+							  gettext_noop("Conflict log table"));
 	}
 
 	/* Only display subscriptions in current database. */
-	appendPQExpBufferStr(&buf,
-						 "FROM pg_catalog.pg_subscription\n"
-						 "WHERE subdbid = (SELECT oid\n"
-						 "                 FROM pg_catalog.pg_database\n"
-						 "                 WHERE datname = pg_catalog.current_database())");
-
+	appendPQExpBuffer(&buf,
+					  "FROM pg_catalog.pg_subscription "
+					  "LEFT JOIN pg_catalog.pg_namespace AS n ON subconflictlognspid = n.oid\n"
+					  "WHERE subdbid = (SELECT oid\n"
+					  "                 FROM pg_catalog.pg_database\n"
+					  "                 WHERE datname = pg_catalog.current_database())");
 	if (!validateSQLNamePattern(&buf, pattern, true, false,
 								NULL, "subname", NULL,
 								NULL,
diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c
index b1ff6f6cd94..00e45423879 100644
--- a/src/bin/psql/tab-complete.in.c
+++ b/src/bin/psql/tab-complete.in.c
@@ -2344,8 +2344,8 @@ match_previous_words(int pattern_id,
 		COMPLETE_WITH("(", "PUBLICATION");
 	/* ALTER SUBSCRIPTION <name> SET ( */
 	else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SET", "("))
-		COMPLETE_WITH("binary", "disable_on_error", "failover",
-					  "max_retention_duration", "origin",
+		COMPLETE_WITH("binary", "conflict_log_table", "disable_on_error",
+					  "failover", "max_retention_duration", "origin",
 					  "password_required", "retain_dead_tuples",
 					  "run_as_owner", "slot_name", "streaming",
 					  "synchronous_commit", "two_phase");
@@ -3828,8 +3828,8 @@ match_previous_words(int pattern_id,
 		COMPLETE_WITH("WITH (");
 	/* Complete "CREATE SUBSCRIPTION <name> ...  WITH ( <opt>" */
 	else if (Matches("CREATE", "SUBSCRIPTION", MatchAnyN, "WITH", "("))
-		COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
-					  "disable_on_error", "enabled", "failover",
+		COMPLETE_WITH("binary", "conflict_log_table", "connect", "copy_data",
+					  "create_slot", "disable_on_error", "enabled", "failover",
 					  "max_retention_duration", "origin",
 					  "password_required", "retain_dead_tuples",
 					  "run_as_owner", "slot_name", "streaming",
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 55cb9b1eefa..f4526c15ec3 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -80,6 +80,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	bool		subretaindeadtuples;	/* True if dead tuples useful for
 										 * conflict detection are retained */
+	Oid			subconflictlognspid;	/* Namespace Oid in which the conflict
+										 * log table is created. */
 
 	int32		submaxretention;	/* The maximum duration (in milliseconds)
 									 * for which information useful for
@@ -105,6 +107,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 
 	/* Only publish data originating from the specified origin */
 	text		suborigin BKI_DEFAULT(LOGICALREP_ORIGIN_ANY);
+
+	/* Conflict log table name if specified */
+	text		subconflictlogtable;
 #endif
 } FormData_pg_subscription;
 
diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h
index fb4e26a51a4..6c062b0991f 100644
--- a/src/include/commands/subscriptioncmds.h
+++ b/src/include/commands/subscriptioncmds.h
@@ -36,4 +36,6 @@ extern void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
 									   bool retention_active,
 									   bool max_retention_set);
 
+extern bool IsConflictLogTable(Oid relid);
+
 #endif							/* SUBSCRIPTIONCMDS_H */
diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h
index c8fbf9e51b8..c7e67bd300e 100644
--- a/src/include/replication/conflict.h
+++ b/src/include/replication/conflict.h
@@ -9,9 +9,12 @@
 #ifndef CONFLICT_H
 #define CONFLICT_H
 
+#include "access/htup.h"
 #include "access/xlogdefs.h"
+#include "catalog/pg_type.h"
 #include "nodes/pg_list.h"
 #include "utils/timestamp.h"
+#include "utils/relcache.h"
 
 /* Avoid including execnodes.h here */
 typedef struct EState EState;
@@ -79,6 +82,32 @@ typedef struct ConflictTupleInfo
 								 * conflicting local row occurred */
 } ConflictTupleInfo;
 
+/* Structure to hold metadata for one column of the conflict log table */
+typedef struct ConflictLogColumnDef
+{
+	const char *attname;    /* Column name */
+	Oid         atttypid;   /* Data type OID */
+} ConflictLogColumnDef;
+
+/* The single source of truth for the conflict log table schema */
+static const ConflictLogColumnDef ConflictLogSchema[] =
+{
+	{ .attname = "relid",            .atttypid = OIDOID },
+	{ .attname = "schemaname",       .atttypid = TEXTOID },
+	{ .attname = "relname",          .atttypid = TEXTOID },
+	{ .attname = "conflict_type",    .atttypid = TEXTOID },
+	{ .attname = "remote_xid",       .atttypid = XIDOID },
+	{ .attname = "remote_commit_lsn",.atttypid = LSNOID },
+	{ .attname = "remote_commit_ts", .atttypid = TIMESTAMPTZOID },
+	{ .attname = "remote_origin",    .atttypid = TEXTOID },
+	{ .attname = "replica_identity", .atttypid = JSONOID },
+	{ .attname = "remote_tuple",     .atttypid = JSONOID },
+	{ .attname = "local_conflicts",  .atttypid = JSONARRAYOID }
+};
+
+/* Define the count using the array size */
+#define MAX_CONFLICT_ATTR_NUM (sizeof(ConflictLogSchema) / sizeof(ConflictLogSchema[0]))
+
 extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
 									TransactionId *xmin,
 									RepOriginId *localorigin,
@@ -89,4 +118,7 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
 								TupleTableSlot *remoteslot,
 								List *conflicttuples);
 extern void InitConflictIndexes(ResultRelInfo *relInfo);
+extern Relation GetConflictLogTableRel(void);
+extern void InsertConflictLogTuple(Relation conflictlogrel);
+extern bool ValidateConflictLogTable(Relation rel);
 #endif
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index f081619f151..711c04c7297 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -101,6 +101,9 @@ typedef struct LogicalRepWorker
 	 */
 	TransactionId oldest_nonremovable_xid;
 
+	/* A conflict log tuple that is prepared but not yet inserted. */
+	HeapTuple	conflict_log_tuple;
+
 	/* Stats. */
 	XLogRecPtr	last_lsn;
 	TimestampTz last_send_time;
@@ -256,6 +259,10 @@ extern PGDLLIMPORT bool InitializingApplyWorker;
 
 extern PGDLLIMPORT List *table_states_not_ready;
 
+extern XLogRecPtr remote_final_lsn;
+extern TimestampTz remote_commit_ts;
+extern TransactionId	remote_xid;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype,
 												Oid subid, Oid relid,
diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index 50fb149e9ac..3bebf04bf51 100644
--- a/src/include/utils/lsyscache.h
+++ b/src/include/utils/lsyscache.h
@@ -210,6 +210,7 @@ extern Oid	get_publication_oid(const char *pubname, bool missing_ok);
 extern char *get_publication_name(Oid pubid, bool missing_ok);
 extern Oid	get_subscription_oid(const char *subname, bool missing_ok);
 extern char *get_subscription_name(Oid subid, bool missing_ok);
+extern char *get_subscription_conflict_log_table(Oid subid, Oid *nspid);
 
 #define type_is_array(typid)  (get_element_type(typid) != InvalidOid)
 /* type_is_array_domain accepts both plain arrays and domains over arrays */
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 327d1e7731f..f96687e107c 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+ regress_testsub4
-                                                                                                                                                  List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | none   | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                                                             List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  | Conflict log table 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | none   | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
 \dRs+ regress_testsub4
-                                                                                                                                                  List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                                                             List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  | Conflict log table 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000 | 
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub3;
@@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  | Conflict log table 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname');
 ALTER SUBSCRIPTION regress_testsub SET (password_required = false);
 ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
 \dRs+
-                                                                                                                                                      List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |           Conninfo           |  Skip LSN  
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | f                 | t             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist2 | 0/00000000
+                                                                                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |           Conninfo           |  Skip LSN  | Conflict log table 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | f                 | t             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist2 | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
@@ -176,10 +176,10 @@ ERROR:  unrecognized subscription parameter: "create_slot"
 -- ok
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
 \dRs+
-                                                                                                                                                      List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |           Conninfo           |  Skip LSN  
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist2 | 0/00012345
+                                                                                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |           Conninfo           |  Skip LSN  | Conflict log table 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist2 | 0/00012345 | 
 (1 row)
 
 -- ok - with lsn = NONE
@@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
 ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                                                                                                      List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |           Conninfo           |  Skip LSN  
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist2 | 0/00000000
+                                                                                                                                                                 List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |           Conninfo           |  Skip LSN  | Conflict log table 
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist2 | 0/00000000 | 
 (1 row)
 
 BEGIN;
@@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                                                                                                        List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |           Conninfo           |  Skip LSN  
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | local              | dbname=regress_doesnotexist2 | 0/00000000
+                                                                                                                                                                   List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |           Conninfo           |  Skip LSN  | Conflict log table 
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------+--------------------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | local              | dbname=regress_doesnotexist2 | 0/00000000 | 
 (1 row)
 
 -- rename back to keep the rest simple
@@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  | Conflict log table 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  | Conflict log table 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000 | 
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  | Conflict log table 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
 \dRs+
-                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  | Conflict log table 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  | Conflict log table 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000 | 
 (1 row)
 
 -- fail - publication already exists
@@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                                                                                                          List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  | Conflict log table 
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000 | 
 (1 row)
 
 -- fail - publication used more than once
@@ -332,10 +332,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  | Conflict log table 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000 | 
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -371,19 +371,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | p                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  | Conflict log table 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | p                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000 | 
 (1 row)
 
 -- we can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  | Conflict log table 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -393,10 +393,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  | Conflict log table 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -409,18 +409,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  | Conflict log table 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | t                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  | Conflict log table 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | t                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -433,10 +433,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  | Conflict log table 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -450,19 +450,19 @@ NOTICE:  max_retention_duration is ineffective when retain_dead_tuples is disabl
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                   1000 | f                | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  | Conflict log table 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                   1000 | f                | off                | dbname=regress_doesnotexist | 0/00000000 | 
 (1 row)
 
 -- ok
 ALTER SUBSCRIPTION regress_testsub SET (max_retention_duration = 0);
 \dRs+
-                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  | Conflict log table 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+--------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -517,7 +517,201 @@ COMMIT;
 -- ok, owning it is enough for this stuff
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
+--
+-- CONFLICT LOG TABLE TESTS
+--
+SET SESSION AUTHORIZATION 'regress_subscription_user';
+-- fail - conflict_log_table specified when table already exists
+CREATE TABLE public.regress_conflict_log_temp (id int);
+CREATE SUBSCRIPTION regress_conflict_fail CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log_temp');
+ERROR:  cannot create conflict log table "public.regress_conflict_log_temp" because a table with that name already exists
+HINT:  Use a different name for the conflict log table or drop the existing table.
+DROP TABLE public.regress_conflict_log_temp;
+-- ok - conflict_log_table creation with CREATE SUBSCRIPTION
+CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log1');
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
+-- check metadata in pg_subscription
+SELECT subname, subconflictlogtable, subconflictlognspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema
+FROM pg_subscription WHERE subname = 'regress_conflict_test1';
+        subname         |  subconflictlogtable  | is_public_schema 
+------------------------+-----------------------+------------------
+ regress_conflict_test1 | regress_conflict_log1 | t
+(1 row)
+
+-- check if the table exists and has the correct schema (11 columns)
+SELECT count(*) FROM pg_attribute WHERE attrelid = 'public.regress_conflict_log1'::regclass AND attnum > 0;
+ count 
+-------
+    11
+(1 row)
+
+-- check a specific column type (e.g., remote_tuple should be JSON)
+SELECT format_type(atttypid, atttypmod) FROM pg_attribute WHERE attrelid = 'public.regress_conflict_log1'::regclass AND attname = 'remote_tuple';
+ format_type 
+-------------
+ json
+(1 row)
+
+\dRs+
+                                                                                                                                                                     List of subscriptions
+          Name          |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  |      Conflict log table      
+------------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+------------------------------
+ regress_conflict_test1 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000 | public.regress_conflict_log1
+(1 row)
+
+-- ok - adding conflict_log_table with ALTER SUBSCRIPTION
+CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log2');
+-- check metadata after ALTER
+SELECT subname, subconflictlogtable, subconflictlognspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema
+FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+        subname         |  subconflictlogtable  | is_public_schema 
+------------------------+-----------------------+------------------
+ regress_conflict_test2 | regress_conflict_log2 | t
+(1 row)
+
+-- ok - change the conflict log table name for an existing subscription that already had one
+CREATE SCHEMA clt;
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'clt.regress_conflict_log3');
+SELECT subname, subconflictlogtable, subconflictlognspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema
+FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+        subname         |  subconflictlogtable  | is_public_schema 
+------------------------+-----------------------+------------------
+ regress_conflict_test2 | regress_conflict_log3 | f
+(1 row)
+
+\dRs+
+                                                                                                                                                                     List of subscriptions
+          Name          |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           |  Skip LSN  |      Conflict log table      
+------------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------+------------------------------
+ regress_conflict_test1 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000 | public.regress_conflict_log1
+ regress_conflict_test2 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | 0/00000000 | clt.regress_conflict_log3
+(2 rows)
+
+-- check the new table was created and the old table was dropped
+SELECT count(*) FROM pg_class WHERE relname = 'regress_conflict_log2';
+ count 
+-------
+     0
+(1 row)
+
+SELECT count(*) FROM pg_attribute WHERE attrelid = 'clt.regress_conflict_log3'::regclass AND attnum > 0;
+ count 
+-------
+    11
+(1 row)
+
+-- ok (NOTICE) - set conflict_log_table to one already used by this subscription
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'clt.regress_conflict_log3');
+NOTICE:  "clt.regress_conflict_log3" is already in use as the conflict log table for this subscription
+-- fail - try to publish the conflict_log_table
+CREATE PUBLICATION pub FOR TABLE clt.regress_conflict_log3;
+ERROR:  cannot add relation "clt.regress_conflict_log3" to publication
+DETAIL:  This operation is not supported for conflict log tables.
+-- suppress warning that depends on wal_level
+SET client_min_messages = 'ERROR';
+-- ok - conflict_log_table should not be published with ALL TABLE
+CREATE PUBLICATION pub FOR TABLES IN SCHEMA clt;
+SELECT * FROM pg_publication_tables WHERE pubname = 'pub';
+ pubname | schemaname | tablename | attnames | rowfilter 
+---------+------------+-----------+----------+-----------
+(0 rows)
+
+\dt+ clt.regress_conflict_log3
+                                              List of tables
+ Schema |         Name          | Type  |           Owner           | Persistence |  Size   | Description 
+--------+-----------------------+-------+---------------------------+-------------+---------+-------------
+ clt    | regress_conflict_log3 | table | regress_subscription_user | permanent   | 0 bytes | 
+(1 row)
+
+DROP PUBLICATION pub;
+-- fail - set conflict_log_table to one already used by a different subscription
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log1');
+ERROR:  cannot create conflict log table "public.regress_conflict_log1" because a table with that name already exists
+HINT:  Use a different name for the conflict log table or drop the existing table.
+-- ok - dropping subscription also drops the log table
+ALTER SUBSCRIPTION regress_conflict_test1 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_test1;
+-- should return NULL, meaning the table was dropped
+SELECT to_regclass('public.regress_conflict_log1');
+ to_regclass 
+-------------
+ 
+(1 row)
+
+-- fail - dropping log table manually not allowed
+CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log1');
+DROP TABLE public.regress_conflict_log1;
+ERROR:  cannot drop table regress_conflict_log1 because subscription regress_conflict_test1 requires it
+HINT:  You can drop subscription regress_conflict_test1 instead.
+ALTER SUBSCRIPTION regress_conflict_test1 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_test1;
+-- should return NULL, meaning the subscription was dropped successfully
+SELECT subname FROM pg_subscription WHERE subname = 'regress_conflict_test1';
+ subname 
+---------
+(0 rows)
+
+-- ok - create subscription with conflict_log_table = NONE
+CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = NONE);
+SELECT subname, subconflictlogtable FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+        subname         |  subconflictlogtable  
+------------------------+-----------------------
+ regress_conflict_test2 | regress_conflict_log3
+(1 row)
+
+-- ok - alter subscription with valid conflict log table name
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log1');
+SELECT subname, subconflictlogtable FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+        subname         |  subconflictlogtable  
+------------------------+-----------------------
+ regress_conflict_test2 | regress_conflict_log1
+(1 row)
+
+-- ok - pg_relation_is_publishable should return false for conflict log table
+SELECT pg_relation_is_publishable('public.regress_conflict_log1');
+ pg_relation_is_publishable 
+----------------------------
+ f
+(1 row)
+
+-- ok - alter subscription with conflict log table = NONE
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = NONE);
+-- should return NULL, meaning the table was dropped
+SELECT to_regclass('public.regress_conflict_log1');
+ to_regclass 
+-------------
+ 
+(1 row)
+
+SELECT subname, subconflictlogtable FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+        subname         | subconflictlogtable 
+------------------------+---------------------
+ regress_conflict_test2 | 
+(1 row)
+
+ALTER SUBSCRIPTION regress_conflict_test1 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_test1;
+-- Clean up remaining test subscription
+ALTER SUBSCRIPTION regress_conflict_test2 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test2 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_test2;
+-- fail - can not create conflict log table in pg_temp
+CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'pg_temp.regress_conflict_log1');
+ERROR:  cannot create conflict log table "regress_conflict_log1" in a temporary namespace
+HINT:  Use a permanent schema.
+-- fail - empty string is not allowed for conflict log table name
+CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = '');
+ERROR:  conflict log table name cannot be empty
+HINT:  Provide a valid table name or omit the parameter.
 RESET SESSION AUTHORIZATION;
+DROP SCHEMA clt;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
 DROP ROLE regress_subscription_user3;
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index ef0c298d2df..6b6f1503145 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -365,7 +365,121 @@ COMMIT;
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
+--
+-- CONFLICT LOG TABLE TESTS
+--
+
+SET SESSION AUTHORIZATION 'regress_subscription_user';
+
+-- fail - conflict_log_table specified when table already exists
+CREATE TABLE public.regress_conflict_log_temp (id int);
+CREATE SUBSCRIPTION regress_conflict_fail CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log_temp');
+DROP TABLE public.regress_conflict_log_temp;
+
+-- ok - conflict_log_table creation with CREATE SUBSCRIPTION
+CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log1');
+
+-- check metadata in pg_subscription
+SELECT subname, subconflictlogtable, subconflictlognspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema
+FROM pg_subscription WHERE subname = 'regress_conflict_test1';
+
+-- check if the table exists and has the correct schema (11 columns)
+SELECT count(*) FROM pg_attribute WHERE attrelid = 'public.regress_conflict_log1'::regclass AND attnum > 0;
+
+-- check a specific column type (e.g., remote_tuple should be JSON)
+SELECT format_type(atttypid, atttypmod) FROM pg_attribute WHERE attrelid = 'public.regress_conflict_log1'::regclass AND attname = 'remote_tuple';
+
+\dRs+
+
+-- ok - adding conflict_log_table with ALTER SUBSCRIPTION
+CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false);
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log2');
+
+-- check metadata after ALTER
+SELECT subname, subconflictlogtable, subconflictlognspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema
+FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+
+-- ok - change the conflict log table name for an existing subscription that already had one
+CREATE SCHEMA clt;
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'clt.regress_conflict_log3');
+SELECT subname, subconflictlogtable, subconflictlognspid = (SELECT oid FROM pg_namespace WHERE nspname = 'public') AS is_public_schema
+FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+\dRs+
+
+-- check the new table was created and the old table was dropped
+SELECT count(*) FROM pg_class WHERE relname = 'regress_conflict_log2';
+SELECT count(*) FROM pg_attribute WHERE attrelid = 'clt.regress_conflict_log3'::regclass AND attnum > 0;
+
+-- ok (NOTICE) - set conflict_log_table to one already used by this subscription
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'clt.regress_conflict_log3');
+
+-- fail - try to publish the conflict_log_table
+CREATE PUBLICATION pub FOR TABLE clt.regress_conflict_log3;
+
+-- suppress warning that depends on wal_level
+SET client_min_messages = 'ERROR';
+
+-- ok - conflict_log_table should not be published with ALL TABLE
+CREATE PUBLICATION pub FOR TABLES IN SCHEMA clt;
+SELECT * FROM pg_publication_tables WHERE pubname = 'pub';
+\dt+ clt.regress_conflict_log3
+DROP PUBLICATION pub;
+
+-- fail - set conflict_log_table to one already used by a different subscription
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log1');
+
+-- ok - dropping subscription also drops the log table
+ALTER SUBSCRIPTION regress_conflict_test1 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_test1;
+
+-- should return NULL, meaning the table was dropped
+SELECT to_regclass('public.regress_conflict_log1');
+
+-- fail - dropping log table manually not allowed
+CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'public.regress_conflict_log1');
+DROP TABLE public.regress_conflict_log1;
+ALTER SUBSCRIPTION regress_conflict_test1 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_test1;
+
+-- should return NULL, meaning the subscription was dropped successfully
+SELECT subname FROM pg_subscription WHERE subname = 'regress_conflict_test1';
+
+-- ok - create subscription with conflict_log_table = NONE
+CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = NONE);
+SELECT subname, subconflictlogtable FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+-- ok - alter subscription with valid conflict log table name
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = 'public.regress_conflict_log1');
+SELECT subname, subconflictlogtable FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+
+-- ok - pg_relation_is_publishable should return false for conflict log table
+SELECT pg_relation_is_publishable('public.regress_conflict_log1');
+
+-- ok - alter subscription with conflict log table = NONE
+ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_table = NONE);
+
+-- should return NULL, meaning the table was dropped
+SELECT to_regclass('public.regress_conflict_log1');
+SELECT subname, subconflictlogtable FROM pg_subscription WHERE subname = 'regress_conflict_test2';
+
+ALTER SUBSCRIPTION regress_conflict_test1 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_test1;
+
+-- Clean up remaining test subscription
+ALTER SUBSCRIPTION regress_conflict_test2 DISABLE;
+ALTER SUBSCRIPTION regress_conflict_test2 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_conflict_test2;
+
+-- fail - can not create conflict log table in pg_temp
+CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = 'pg_temp.regress_conflict_log1');
+
+-- fail - empty string is not allowed for conflict log table name
+CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_table = '');
+
 RESET SESSION AUTHORIZATION;
+DROP SCHEMA clt;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
 DROP ROLE regress_subscription_user3;
-- 
2.43.0

