From 18567755d73a92271ffedbed511be1bdf064f7eb Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Fri, 29 May 2026 15:05:15 +0530 Subject: [PATCH v44 1/2] Add configurable conflict log table for Logical Replication MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch adds a feature to provide a structured, queryable record of all logical replication conflicts. The current approach of logging conflicts as plain text in the server logs makes it difficult to query, analyze, and use for external monitoring and automation. This patch addresses these limitations by introducing a configurable conflict_log_destination=('log'/'table'/'all') option in the CREATE SUBSCRIPTION command. If the user chooses to enable logging to a table (by selecting 'table' or 'all'), an internal logging table named pg_conflict_log_ is automatically created within a dedicated, system-managed 'pg_conflict' namespace to prevent users from manually dropping or altering it. This also prevents accidental name collisions with user-created tables. This table is linked to the subscription via an internal dependency, ensuring it is automatically dropped when the subscription is removed The per-subscription table model was chosen over a single global log to ensure superior isolation and administrative flexibility by directly aligning table ownership with the subscription’s lifecycle. This approach allows for granular permission management, enabling the subscription owner to perform necessary maintenance tasks like SELECT, DELETE, and TRUNCATE without the security risks or complex Row-Level Security required by a shared global table. The conflict details, including the original and remote tuples, are stored in JSON columns, providing a flexible format to accommodate different table schemas. The log table captures essential attributes such as local and remote transaction IDs, LSNs, commit timestamps, and conflict type, providing a complete record for post-mortem analysis. This feature will make logical replication conflicts easier to monitor and manage, significantly improving the overall resilience and operability of replication setups. The conflict log tables will not be included in a publication, even if the publication is configured to include ALL TABLES or ALL TABLES IN SCHEMA. --- src/backend/catalog/aclchk.c | 50 ++-- src/backend/catalog/catalog.c | 29 +- src/backend/catalog/heap.c | 32 ++- src/backend/catalog/namespace.c | 11 +- src/backend/catalog/pg_publication.c | 14 +- src/backend/catalog/pg_subscription.c | 7 + src/backend/catalog/system_views.sql | 3 +- src/backend/commands/subscriptioncmds.c | 210 +++++++++++++- src/backend/commands/tablecmds.c | 6 +- src/backend/executor/execMain.c | 28 ++ src/backend/replication/logical/conflict.c | 177 ++++++++++++ src/bin/initdb/initdb.c | 6 + src/bin/psql/tab-complete.in.c | 8 +- src/include/catalog/catalog.h | 2 + src/include/catalog/pg_namespace.dat | 3 + src/include/catalog/pg_subscription.h | 11 + src/include/replication/conflict.h | 27 ++ src/test/regress/expected/subscription.out | 308 +++++++++++++++++++++ src/test/regress/sql/subscription.sql | 256 +++++++++++++++++ src/tools/pgindent/typedefs.list | 2 + 20 files changed, 1144 insertions(+), 46 deletions(-) diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c index 007ede997c5..5402f391efd 100644 --- a/src/backend/catalog/aclchk.c +++ b/src/backend/catalog/aclchk.c @@ -3337,25 +3337,39 @@ pg_class_aclmask_ext(Oid table_oid, Oid roleid, AclMode mask, classForm = (Form_pg_class) GETSTRUCT(tuple); - /* - * Deny anyone permission to update a system catalog unless - * pg_authid.rolsuper is set. - * - * As of 7.4 we have some updatable system views; those shouldn't be - * protected in this way. Assume the view rules can take care of - * themselves. ACL_USAGE is if we ever have system sequences. - */ - if ((mask & (ACL_INSERT | ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE | ACL_USAGE)) && - IsSystemClass(table_oid, classForm) && - classForm->relkind != RELKIND_VIEW && - !superuser_arg(roleid)) - mask &= ~(ACL_INSERT | ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE | ACL_USAGE); - - /* - * Otherwise, superusers bypass all permission-checking. - */ - if (superuser_arg(roleid)) + if (!superuser_arg(roleid)) + { + if (mask & (ACL_INSERT | ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE | ACL_USAGE)) + { + if (IsConflictLogTableClass(classForm)) + { + /* + * For conflict log tables, allow non-superusers to perform + * DELETE and TRUNCATE for cleanup and maintenance. While still + * restricting INSERT, UPDATE, and USAGE. + */ + mask &= ~(ACL_INSERT | ACL_UPDATE | ACL_USAGE); + } + else if (IsSystemClass(table_oid, classForm) && + classForm->relkind != RELKIND_VIEW) + { + /* + * Deny anyone permission to update a system catalog unless + * pg_authid.rolsuper is set. + * + * As of 7.4 we have some updatable system views; those + * shouldn't be protected in this way. Assume the view rules + * can take care of themselves. ACL_USAGE is if we ever have + * system sequences. + */ + mask &= ~(ACL_INSERT | ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE | + ACL_USAGE); + } + } + } + else { + /* Superusers bypass all permission-checking. */ ReleaseSysCache(tuple); return mask; } diff --git a/src/backend/catalog/catalog.c b/src/backend/catalog/catalog.c index 7be49032934..be8791af875 100644 --- a/src/backend/catalog/catalog.c +++ b/src/backend/catalog/catalog.c @@ -86,7 +86,9 @@ bool IsSystemClass(Oid relid, Form_pg_class reltuple) { /* IsCatalogRelationOid is a bit faster, so test that first */ - return (IsCatalogRelationOid(relid) || IsToastClass(reltuple)); + return (IsCatalogRelationOid(relid) || + IsToastClass(reltuple) || + IsConflictLogTableClass(reltuple)); } /* @@ -230,6 +232,20 @@ IsToastClass(Form_pg_class reltuple) return IsToastNamespace(relnamespace); } +/* + * IsConflictLogTableClass + * True iff pg_class tuple represents a Conflict Log Table. + * + * Does not perform any catalog accesses. + */ +bool +IsConflictLogTableClass(Form_pg_class reltuple) +{ + Oid relnamespace = reltuple->relnamespace; + + return IsConflictLogTableNamespace(relnamespace); +} + /* * IsCatalogNamespace * True iff namespace is pg_catalog. @@ -264,6 +280,17 @@ IsToastNamespace(Oid namespaceId) isTempToastNamespace(namespaceId); } +/* + * IsConflictLogTableNamespace + * True iff namespace is pg_conflict. + * + * Does not perform any catalog accesses. + */ +bool +IsConflictLogTableNamespace(Oid namespaceId) +{ + return namespaceId == PG_CONFLICT_NAMESPACE; +} /* * IsReservedName diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index 88087654de9..1b4fae22390 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -305,23 +305,31 @@ heap_create(const char *relname, Assert(OidIsValid(relid)); /* - * Don't allow creating relations in pg_catalog directly, even though it - * is allowed to move user defined relations there. Semantics with search - * paths including pg_catalog are too confusing for now. + * Don't allow creating relations in pg_catalog or pg_conflict directly, + * even though it is allowed to move user defined relations there. Semantics + * with search paths including pg_catalog are too confusing for now. * * But allow creating indexes on relations in pg_catalog even if * allow_system_table_mods = off, upper layers already guarantee it's on a * user defined relation, not a system one. */ - if (!allow_system_table_mods && - ((IsCatalogNamespace(relnamespace) && relkind != RELKIND_INDEX) || - IsToastNamespace(relnamespace)) && - IsNormalProcessingMode()) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("permission denied to create \"%s.%s\"", - get_namespace_name(relnamespace), relname), - errdetail("System catalog modifications are currently disallowed."))); + if (!allow_system_table_mods && IsNormalProcessingMode()) + { + if ((IsCatalogNamespace(relnamespace) && relkind != RELKIND_INDEX) || + IsToastNamespace(relnamespace)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied to create \"%s.%s\"", + get_namespace_name(relnamespace), relname), + errdetail("System catalog modifications are currently disallowed."))); + + if (IsConflictLogTableNamespace(relnamespace)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied to create \"%s.%s\"", + get_namespace_name(relnamespace), relname), + errdetail("Conflict schema modifications are currently disallowed."))); + } *relfrozenxid = InvalidTransactionId; *relminmxid = InvalidMultiXactId; diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c index 56b87d878e8..5bd9be3983e 100644 --- a/src/backend/catalog/namespace.c +++ b/src/backend/catalog/namespace.c @@ -3523,9 +3523,8 @@ LookupCreationNamespace(const char *nspname) /* * Common checks on switching namespaces. * - * We complain if either the old or new namespaces is a temporary schema - * (or temporary toast schema), or if either the old or new namespaces is the - * TOAST schema. + * We complain if either the old or new namespaces is a temporary schema, + * temporary toast schema, the TOAST schema, or the conflict schema. */ void CheckSetNamespace(Oid oldNspOid, Oid nspOid) @@ -3541,6 +3540,12 @@ CheckSetNamespace(Oid oldNspOid, Oid nspOid) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot move objects into or out of TOAST schema"))); + + /* similarly for conflict schema */ + if (nspOid == PG_CONFLICT_NAMESPACE || oldNspOid == PG_CONFLICT_NAMESPACE) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot move objects into or out of the conflict schema"))); } /* diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 5c457d9aca8..1ec94c851b2 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -92,6 +92,13 @@ check_publication_add_relation(PublicationRelInfo *pri) errmsg(errormsg, relname), errdetail("This operation is not supported for system tables."))); + /* Can't be conflict log table */ + if (IsConflictLogTableNamespace(RelationGetNamespace(targetrel))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg(errormsg, relname), + errdetail("This operation is not supported for conflict log tables."))); + /* UNLOGGED and TEMP relations cannot be part of publication. */ if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP) ereport(ERROR, @@ -113,7 +120,8 @@ static void check_publication_add_schema(Oid schemaid) { /* Can't be system namespace */ - if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid)) + if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid) || + IsConflictLogTableNamespace(schemaid)) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("cannot add schema \"%s\" to publication", @@ -148,7 +156,8 @@ check_publication_add_schema(Oid schemaid) * is really inadequate for that, since the information_schema could be * dropped and reloaded and then it'll be considered publishable. The best * long-term solution may be to add a "relispublishable" bool to pg_class, - * and depend on that instead of OID checks. + * and depend on that instead of OID checks. IsConflictLogTableClass() + * excludes tables in conflict schema. */ static bool is_publishable_class(Oid relid, Form_pg_class reltuple) @@ -157,6 +166,7 @@ is_publishable_class(Oid relid, Form_pg_class reltuple) reltuple->relkind == RELKIND_PARTITIONED_TABLE || reltuple->relkind == RELKIND_SEQUENCE) && !IsCatalogRelationOid(relid) && + !IsConflictLogTableClass(reltuple) && reltuple->relpersistence == RELPERSISTENCE_PERMANENT && relid >= FirstNormalObjectId; } diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 1f1fdc75af6..809818af9ea 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -118,6 +118,7 @@ GetSubscription(Oid subid, bool missing_ok, bool aclcheck) sub->retaindeadtuples = subform->subretaindeadtuples; sub->maxretention = subform->submaxretention; sub->retentionactive = subform->subretentionactive; + sub->conflictlogrelid = subform->subconflictlogrelid; /* Get conninfo */ if (OidIsValid(subform->subserver)) @@ -187,6 +188,12 @@ GetSubscription(Oid subid, bool missing_ok, bool aclcheck) Anum_pg_subscription_suborigin); sub->origin = TextDatumGetCString(datum); + /* Get conflict log destination */ + datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, + tup, + Anum_pg_subscription_subconflictlogdest); + sub->conflictlogdest = TextDatumGetCString(datum); + /* Is the subscription owner a superuser? */ sub->ownersuperuser = superuser_arg(sub->owner); diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 73a1c1c4670..166329551da 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1527,7 +1527,8 @@ GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, subpasswordrequired, subrunasowner, subfailover, subretaindeadtuples, submaxretention, subretentionactive, - subserver, subslotname, subsynccommit, subpublications, suborigin) + subserver, subconflictlogrelid, subconflictlogdest, + subslotname, subsynccommit, subpublications, suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 523959ba0ce..5bd663180f1 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -28,6 +28,7 @@ #include "catalog/pg_authid_d.h" #include "catalog/pg_database_d.h" #include "catalog/pg_foreign_server.h" +#include "catalog/pg_namespace.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" @@ -35,6 +36,7 @@ #include "commands/defrem.h" #include "commands/event_trigger.h" #include "commands/subscriptioncmds.h" +#include "commands/tablecmds.h" #include "executor/executor.h" #include "foreign/foreign.h" #include "miscadmin.h" @@ -79,6 +81,7 @@ #define SUBOPT_WAL_RECEIVER_TIMEOUT 0x00010000 #define SUBOPT_LSN 0x00020000 #define SUBOPT_ORIGIN 0x00040000 +#define SUBOPT_CONFLICT_LOG_DEST 0x00080000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -107,6 +110,7 @@ typedef struct SubOpts bool retaindeadtuples; int32 maxretention; char *origin; + ConflictLogDest conflictlogdest; XLogRecPtr lsn; char *wal_receiver_timeout; } SubOpts; @@ -140,7 +144,11 @@ static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel); - +static bool alter_sub_conflictlogdestination(Subscription *sub, + ConflictLogDest logdest, + Oid *conflicttablerelid); +static void drop_sub_conflict_log_table(Oid subid, char *subname, + Oid subconflictlogrelid); /* * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. @@ -196,6 +204,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->maxretention = 0; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); + if (IsSet(supported_opts, SUBOPT_CONFLICT_LOG_DEST)) + opts->conflictlogdest = CONFLICT_LOG_DEST_LOG; /* Parse options */ foreach(lc, stmt_options) @@ -431,6 +441,18 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET, false, 0, false); } + else if (IsSet(supported_opts, SUBOPT_CONFLICT_LOG_DEST) && + strcmp(defel->defname, "conflict_log_destination") == 0) + { + char *val; + + if (IsSet(opts->specified_opts, SUBOPT_CONFLICT_LOG_DEST)) + errorConflictingDefElem(defel, pstate); + + val = defGetString(defel); + opts->conflictlogdest = GetConflictLogDest(val); + opts->specified_opts |= SUBOPT_CONFLICT_LOG_DEST; + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -629,6 +651,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, uint32 supported_opts; SubOpts opts = {0}; AclResult aclresult; + Oid logrelid = InvalidOid; /* * Parse and check options. @@ -643,7 +666,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_MAX_RETENTION_DURATION | - SUBOPT_WAL_RECEIVER_TIMEOUT | SUBOPT_ORIGIN); + SUBOPT_WAL_RECEIVER_TIMEOUT | SUBOPT_ORIGIN | + SUBOPT_CONFLICT_LOG_DEST); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -817,6 +841,20 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_suborigin - 1] = CStringGetTextDatum(opts.origin); + values[Anum_pg_subscription_subconflictlogdest - 1] = + CStringGetTextDatum(ConflictLogDestNames[opts.conflictlogdest]); + + /* + * We create the conflict log table here, if required, so that its relation + * OID can be stored when inserting the pg_subscription tuple below. + */ + if (CONFLICTS_LOGGED_TO_TABLE(opts.conflictlogdest)) + logrelid = create_conflict_log_table(subid, stmt->subname, owner); + + /* Store table OID in the catalog. */ + values[Anum_pg_subscription_subconflictlogrelid - 1] = + ObjectIdGetDatum(logrelid); + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); /* Insert tuple into catalog. */ @@ -837,6 +875,25 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); } + /* + * Establish an internal dependency between conflict log table and + * subscription. + * + * We use DEPENDENCY_INTERNAL to signify that the table's lifecycle is + * strictly tied to the subscription, similar to how a TOAST table relates + * to its main table or a sequence relates to an identity column. + * + * This ensures the conflict log table is automatically reaped during a + * DROP SUBSCRIPTION via performDeletion(). + */ + if (OidIsValid(logrelid)) + { + ObjectAddress cltaddr; + + ObjectAddressSet(cltaddr, RelationRelationId, logrelid); + recordDependencyOn(&cltaddr, &myself, DEPENDENCY_INTERNAL); + } + /* * A replication origin is currently created for all subscriptions, * including those that only contain sequences or are otherwise empty. @@ -1406,6 +1463,71 @@ CheckAlterSubOption(Subscription *sub, const char *option, } } +/* + * alter_sub_conflictlogdestination + * + * When the subscription's 'conflict_log_destination' is changed, update the + * conflict log table if required. + * + * If the new destination no longer requires a conflict log table, the existing + * conflict log table associated with the subscription is removed via internal + * dependency cleanup to prevent orphaned relations. + * + * On success, *conflicttablerelid is set to the OID of the conflict log table + * that was created or validated, or to InvalidOid if no table is required. + * + * Returns true if the subscription's conflict log table reference must be + * updated as a result of the destination change; false otherwise. + */ +static bool +alter_sub_conflictlogdestination(Subscription *sub, ConflictLogDest logdest, + Oid *conflicttablerelid) +{ + ConflictLogDest old_dest = GetConflictLogDest(sub->conflictlogdest); + bool want_table; + bool has_oldtable; + bool update_relid = false; + Oid relid = InvalidOid; + + want_table = CONFLICTS_LOGGED_TO_TABLE(logdest); + has_oldtable = CONFLICTS_LOGGED_TO_TABLE(old_dest); + + if (has_oldtable) + { + /* There is a conflict log table already. */ + if (!want_table) + { + drop_sub_conflict_log_table(sub->oid, sub->name, + sub->conflictlogrelid); + update_relid = true; + } + } + else + { + /* There was no previous conflict log table. */ + if (want_table) + { + ObjectAddress cltaddr; + ObjectAddress subobj; + + relid = create_conflict_log_table(sub->oid, sub->name, sub->owner); + update_relid = true; + + /* + * Establish an internal dependency between the conflict log table + * and the subscription. For details refer comments in + * CreateSubscription function. + */ + ObjectAddressSet(cltaddr, RelationRelationId, relid); + ObjectAddressSet(subobj, SubscriptionRelationId, sub->oid); + recordDependencyOn(&cltaddr, &subobj, DEPENDENCY_INTERNAL); + } + } + + *conflicttablerelid = relid; + return update_relid; +} + /* * Alter the existing subscription. */ @@ -1501,7 +1623,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_MAX_RETENTION_DURATION | SUBOPT_WAL_RECEIVER_TIMEOUT | - SUBOPT_ORIGIN); + SUBOPT_ORIGIN | + SUBOPT_CONFLICT_LOG_DEST); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1763,6 +1886,33 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_subwalrcvtimeout - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_CONFLICT_LOG_DEST)) + { + ConflictLogDest old_dest = + GetConflictLogDest(sub->conflictlogdest); + + if (opts.conflictlogdest != old_dest) + { + bool update_relid; + Oid relid = InvalidOid; + + values[Anum_pg_subscription_subconflictlogdest - 1] = + CStringGetTextDatum(ConflictLogDestNames[opts.conflictlogdest]); + replaces[Anum_pg_subscription_subconflictlogdest - 1] = true; + + update_relid = alter_sub_conflictlogdestination(sub, + opts.conflictlogdest, + &relid); + if (update_relid) + { + values[Anum_pg_subscription_subconflictlogrelid - 1] = + ObjectIdGetDatum(relid); + replaces[Anum_pg_subscription_subconflictlogrelid - 1] = + true; + } + } + } + update_tuple = true; break; } @@ -2178,6 +2328,51 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, return myself; } +/* + * Drop subscription's conflict log table + * + * The conflict log table is registered as an internal dependency of the + * subscription. This function removes the dependency by performing a + * cascading deletion on the subscription object, which in turn drops the + * associated conflict log table. + * + * This is used to clean up conflict log tables that are no longer required, + * preventing accumulation of stale or orphaned relations. + * + * NOTE: + * Only conflict log tables are currently managed via this internal dependency + * mechanism. + */ +static void +drop_sub_conflict_log_table(Oid subid, char *subname, Oid subconflictlogrelid) +{ + /* Drop any dependent conflict log table */ + if (OidIsValid(subconflictlogrelid)) + { + ObjectAddress object; + char *conflictrelname; + + conflictrelname = get_rel_name(subconflictlogrelid); + if (conflictrelname == NULL) + elog(ERROR, "cache lookup failed for index %u", + subconflictlogrelid); + + /* + * By using PERFORM_DELETION_SKIP_ORIGINAL, we ensure that only the + * conflict log table is deleted while the subscription remains. + */ + ObjectAddressSet(object, SubscriptionRelationId, subid); + performDeletion(&object, DROP_CASCADE, + PERFORM_DELETION_INTERNAL | + PERFORM_DELETION_SKIP_ORIGINAL); + + ereport(NOTICE, + errmsg("dropped conflict log table \"%s\" for subscription \"%s\"", + get_qualified_objname(PG_CONFLICT_NAMESPACE, conflictrelname), + subname)); + } +} + /* * Drop a subscription */ @@ -2189,6 +2384,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) HeapTuple tup; Oid subid; Oid subowner; + Oid subconflictlogrelid; Datum datum; bool isnull; char *subname; @@ -2234,6 +2430,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) form = (Form_pg_subscription) GETSTRUCT(tup); subid = form->oid; subowner = form->subowner; + subconflictlogrelid = form->subconflictlogrelid; must_use_password = !superuser_arg(subowner) && form->subpasswordrequired; /* must be owner */ @@ -2388,6 +2585,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) deleteDependencyRecordsFor(SubscriptionRelationId, subid, false); deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0); + drop_sub_conflict_log_table(subid, subname, subconflictlogrelid); + /* Remove any associated relation synchronization states. */ RemoveSubscriptionRel(subid, InvalidOid); @@ -2621,6 +2820,11 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId) form->subowner = newOwnerId; CatalogTupleUpdate(rel, &tup->t_self, tup); + /* Update owner of the conflict log table if it exists. */ + if (OidIsValid(form->subconflictlogrelid)) + ATExecChangeOwner(form->subconflictlogrelid, newOwnerId, true, + AccessExclusiveLock); + /* Update owner dependency reference */ changeDependencyOnOwner(SubscriptionRelationId, form->oid, diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index a1845240a98..0a0ca2b850e 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -2457,9 +2457,11 @@ truncate_check_rel(Oid relid, Form_pg_class reltuple) * pg_largeobject and pg_largeobject_metadata to be truncated as part of * pg_upgrade, because we need to change its relfilenode to match the old * cluster, and allowing a TRUNCATE command to be executed is the easiest - * way of doing that. + * way of doing that. We also allow TRUNCATE on the conflict log tables, + * to permit users to manually prune conflict data to manage disk space. */ - if (!allowSystemTableMods && IsSystemClass(relid, reltuple) + if (!allowSystemTableMods && IsSystemClass(relid, reltuple) && + !IsConflictLogTableClass(reltuple) && (!IsBinaryUpgrade || (relid != LargeObjectRelationId && relid != LargeObjectMetadataRelationId))) diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 4b30f768680..f87ef65f343 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -1187,6 +1187,24 @@ CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation, RelationGetRelationName(resultRel)))); break; } + + /* + * Conflict log tables are managed by the system to record logical + * replication conflicts. We allow DELETE and TRUNCATE to permit users to + * manually prune these logs, but manual data insertion or modification + * (INSERT, UPDATE, MERGE) is prohibited to maintain the integrity of the + * system-generated logs. + * + * Since TRUNCATE is handled as a separate utility command, we only need + * to explicitly permit CMD_DELETE here. + */ + if (IsConflictLogTableNamespace(RelationGetNamespace(resultRel)) && + operation != CMD_DELETE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot modify or insert data into conflict log table \"%s\"", + RelationGetRelationName(resultRel)), + errdetail("Conflict log tables are system-managed and only support cleanup via DELETE or TRUNCATE."))); } /* @@ -1258,6 +1276,16 @@ CheckValidRowMarkRel(Relation rel, RowMarkType markType) RelationGetRelationName(rel)))); break; } + + /* + * Conflict log tables are managed by the system to record logical + * replication conflicts. + */ + if (IsConflictLogTableNamespace(RelationGetNamespace(rel))) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot lock rows in the conflict log table \"%s\"", + RelationGetRelationName(rel)))); } /* diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 1f8d67fdd90..65a325194ed 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -17,6 +17,12 @@ #include "access/commit_ts.h" #include "access/genam.h" #include "access/tableam.h" +#include "catalog/dependency.h" +#include "catalog/heap.h" +#include "catalog/pg_am.h" +#include "catalog/pg_namespace.h" +#include "catalog/toasting.h" +#include "commands/subscriptioncmds.h" #include "executor/executor.h" #include "pgstat.h" #include "replication/conflict.h" @@ -24,6 +30,50 @@ #include "storage/lmgr.h" #include "utils/lsyscache.h" +/* + * String representations for the supported conflict logging destinations. + */ +const char *const ConflictLogDestNames[] = { + [CONFLICT_LOG_DEST_LOG] = "log", + [CONFLICT_LOG_DEST_TABLE] = "table", + [CONFLICT_LOG_DEST_ALL] = "all" +}; + +StaticAssertDecl(lengthof(ConflictLogDestNames) == CONFLICT_LOG_DEST_ALL + 1, + "ConflictLogDestNames length mismatch"); + + +/* Structure to hold metadata for one column of the conflict log table */ +typedef struct ConflictLogColumnDef +{ + const char *attname; /* Column name */ + Oid atttypid; /* Data type OID */ +} ConflictLogColumnDef; + +/* + * Schema definition for conflict log tables. + * + * Defines the fixed schema of the per-subscription conflict log table created + * in the pg_conflict namespace. Each entry specifies the column name and its + * type OID; the table is created in this column order by + * create_conflict_log_table(). + */ +static const ConflictLogColumnDef ConflictLogSchema[] = { + { .attname = "relid", .atttypid = OIDOID }, + { .attname = "schemaname", .atttypid = TEXTOID }, + { .attname = "relname", .atttypid = TEXTOID }, + { .attname = "conflict_type", .atttypid = TEXTOID }, + { .attname = "remote_xid", .atttypid = XIDOID }, + { .attname = "remote_commit_lsn",.atttypid = LSNOID }, + { .attname = "remote_commit_ts", .atttypid = TIMESTAMPTZOID }, + { .attname = "remote_origin", .atttypid = TEXTOID }, + { .attname = "remote_tuple", .atttypid = JSONOID }, + { .attname = "replica_identity", .atttypid = JSONOID }, + { .attname = "local_conflicts", .atttypid = JSONARRAYOID } +}; + +#define NUM_CONFLICT_ATTRS lengthof(ConflictLogSchema) + static const char *const ConflictTypeNames[] = { [CT_INSERT_EXISTS] = "insert_exists", [CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs", @@ -54,6 +104,133 @@ static void get_tuple_desc(EState *estate, ResultRelInfo *relinfo, static char *build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot, Oid indexoid); +/* + * Builds the TupleDesc for the conflict log table. + */ +static TupleDesc +create_conflict_log_table_tupdesc(void) +{ + TupleDesc tupdesc; + + tupdesc = CreateTemplateTupleDesc(NUM_CONFLICT_ATTRS); + + for (int i = 0; i < NUM_CONFLICT_ATTRS; i++) + TupleDescInitEntry(tupdesc, i + 1, + ConflictLogSchema[i].attname, + ConflictLogSchema[i].atttypid, + -1, 0); + + TupleDescFinalize(tupdesc); + + return tupdesc; +} + +/* + * Create a structured conflict log table for a subscription. + * + * The table is created within the system-managed 'pg_conflict' namespace to + * prevent users from manually dropping or altering it. This also prevents + * accidental name collisions with user-created tables with the same name. + * + * The table name is generated automatically using the subscription's OID + * (e.g., "pg_conflict_log_") to ensure uniqueness within the + * cluster and to avoid collisions during subscription renames. + */ +Oid +create_conflict_log_table(Oid subid, char *subname, Oid subowner) +{ + TupleDesc tupdesc; + Oid relid; + char relname[NAMEDATALEN]; + + snprintf(relname, NAMEDATALEN, "pg_conflict_log_%u", subid); + + /* + * Check for an existing table with the same name in the pg_conflict namespace. + * A collision should not occur under normal operation, but we must handle cases + * where a table has been created manually when allow_system_tables_mods is + * ON. + */ + if (OidIsValid(get_relname_relid(relname, PG_CONFLICT_NAMESPACE))) + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_TABLE), + errmsg("conflict log table pg_conflict.\"%s\" already exists", relname), + errhint("To proceed, drop the existing table and retry."))); + + /* Build the tuple descriptor for the new table. */ + tupdesc = create_conflict_log_table_tupdesc(); + + /* Create conflict log table. */ + relid = heap_create_with_catalog(relname, + PG_CONFLICT_NAMESPACE, + 0, /* tablespace */ + InvalidOid, /* relid */ + InvalidOid, /* reltypeid */ + InvalidOid, /* reloftypeid */ + subowner, + HEAP_TABLE_AM_OID, + tupdesc, + NIL, + RELKIND_RELATION, + RELPERSISTENCE_PERMANENT, + false, /* shared_relation */ + false, /* mapped_relation */ + ONCOMMIT_NOOP, + (Datum) 0, /* reloptions */ + false, /* use_user_acl */ + true, /* allow_system_table_mods */ + true, /* is_internal */ + InvalidOid, /* relrewrite */ + NULL); /* typaddress */ + Assert(OidIsValid(relid)); + + /* Release tuple descriptor memory. */ + FreeTupleDesc(tupdesc); + + /* + * We must bump the command counter to make the newly-created relation + * tuple visible for opening. + */ + CommandCounterIncrement(); + + /* + * Create a TOAST table for the conflict log to support out-of-line storage + * of large JSON data. + */ + NewRelationCreateToastTable(relid, (Datum) 0); + + ereport(NOTICE, + (errmsg("created conflict log table \"%s\" for subscription \"%s\"", + get_qualified_objname(PG_CONFLICT_NAMESPACE, relname), + subname))); + + return relid; +} + +/* + * Convert the string representation of a conflict logging destination to its + * corresponding enum value. + */ +ConflictLogDest +GetConflictLogDest(const char *dest) +{ + /* Empty string or NULL defaults to LOG. */ + if (dest == NULL || dest[0] == '\0' || pg_strcasecmp(dest, "log") == 0) + return CONFLICT_LOG_DEST_LOG; + + if (pg_strcasecmp(dest, "table") == 0) + return CONFLICT_LOG_DEST_TABLE; + + if (pg_strcasecmp(dest, "all") == 0) + return CONFLICT_LOG_DEST_ALL; + + /* Unrecognized string. */ + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("unrecognized conflict_log_destination value: \"%s\"", dest), + errhint("Valid values are \"log\", \"table\", and \"all\"."))); +} + /* * Get the xmin and commit timestamp data (origin and timestamp) associated * with the provided local row. diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c index 14cb79c26be..803ca4112d4 100644 --- a/src/bin/initdb/initdb.c +++ b/src/bin/initdb/initdb.c @@ -1839,6 +1839,12 @@ setup_privileges(FILE *cmdfd) " AND relacl IS NULL;\n\n", escape_quotes(username)); PG_CMD_PUTS("GRANT USAGE ON SCHEMA pg_catalog, public TO PUBLIC;\n\n"); + + /* + * Allow non-superuser subscription owners to access their associated + * conflict log tables in the pg_conflict schema. + */ + PG_CMD_PUTS("GRANT USAGE ON SCHEMA pg_conflict TO PUBLIC;\n\n"); PG_CMD_PUTS("REVOKE ALL ON pg_largeobject FROM PUBLIC;\n\n"); PG_CMD_PUTS("INSERT INTO pg_init_privs " " (objoid, classoid, objsubid, initprivs, privtype)" diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index de547a8cb37..66ee1b6c2a4 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -2376,8 +2376,8 @@ match_previous_words(int pattern_id, COMPLETE_WITH("(", "PUBLICATION"); /* ALTER SUBSCRIPTION SET ( */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SET", "(")) - COMPLETE_WITH("binary", "disable_on_error", "failover", - "max_retention_duration", "origin", + COMPLETE_WITH("binary", "conflict_log_destination", "disable_on_error", + "failover", "max_retention_duration", "origin", "password_required", "retain_dead_tuples", "run_as_owner", "slot_name", "streaming", "synchronous_commit", "two_phase"); @@ -3960,8 +3960,8 @@ match_previous_words(int pattern_id, COMPLETE_WITH("WITH ("); /* Complete "CREATE SUBSCRIPTION ... WITH ( " */ else if (Matches("CREATE", "SUBSCRIPTION", MatchAnyN, "WITH", "(")) - COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", - "disable_on_error", "enabled", "failover", + COMPLETE_WITH("binary", "conflict_log_destination", "connect", "copy_data", + "create_slot", "disable_on_error", "enabled", "failover", "max_retention_duration", "origin", "password_required", "retain_dead_tuples", "run_as_owner", "slot_name", "streaming", diff --git a/src/include/catalog/catalog.h b/src/include/catalog/catalog.h index a9d6e8ea986..feab3982cf5 100644 --- a/src/include/catalog/catalog.h +++ b/src/include/catalog/catalog.h @@ -25,6 +25,7 @@ extern bool IsInplaceUpdateRelation(Relation relation); extern bool IsSystemClass(Oid relid, Form_pg_class reltuple); extern bool IsToastClass(Form_pg_class reltuple); +extern bool IsConflictLogTableClass(Form_pg_class reltuple); extern bool IsCatalogRelationOid(Oid relid); extern bool IsCatalogTextUniqueIndexOid(Oid relid); @@ -32,6 +33,7 @@ extern bool IsInplaceUpdateOid(Oid relid); extern bool IsCatalogNamespace(Oid namespaceId); extern bool IsToastNamespace(Oid namespaceId); +extern bool IsConflictLogTableNamespace(Oid namespaceId); extern bool IsReservedName(const char *name); diff --git a/src/include/catalog/pg_namespace.dat b/src/include/catalog/pg_namespace.dat index 3075e142c73..b45cb9383a8 100644 --- a/src/include/catalog/pg_namespace.dat +++ b/src/include/catalog/pg_namespace.dat @@ -18,6 +18,9 @@ { oid => '99', oid_symbol => 'PG_TOAST_NAMESPACE', descr => 'reserved schema for TOAST tables', nspname => 'pg_toast', nspacl => '_null_' }, +{ oid => '1382', oid_symbol => 'PG_CONFLICT_NAMESPACE', + descr => 'reserved schema for subscription-specific conflict log tables', + nspname => 'pg_conflict', nspacl => '_null_' }, # update dumpNamespace() if changing this descr { oid => '2200', oid_symbol => 'PG_PUBLIC_NAMESPACE', descr => 'standard public schema', diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index a6a2ad1e49c..89d2300abe1 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -95,7 +95,16 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW Oid subserver BKI_LOOKUP_OPT(pg_foreign_server); /* If connection uses * server */ + Oid subconflictlogrelid; /* Relid of the conflict log table. */ #ifdef CATALOG_VARLEN /* variable-length fields start here */ + /* + * Strategy for logging replication conflicts: + * 'log' - server log only, + * 'table' - conflict log table only, + * 'all' - both log and table. + */ + text subconflictlogdest BKI_FORCE_NOT_NULL; + /* Connection string to the publisher */ text subconninfo; /* Set if connecting with connection string */ @@ -164,6 +173,7 @@ typedef struct Subscription * and the retention duration has not * exceeded max_retention_duration, when * defined */ + Oid conflictlogrelid; /* conflict log table Oid */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ @@ -171,6 +181,7 @@ typedef struct Subscription List *publications; /* List of publication names to subscribe to */ char *origin; /* Only publish data originating from the * specified origin */ + char *conflictlogdest; /* Conflict log destination */ } Subscription; #ifdef EXPOSE_TO_CLIENT_CODE diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index 2d9dbcf4d0d..a017e1e6cb5 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -10,6 +10,7 @@ #define CONFLICT_H #include "access/xlogdefs.h" +#include "catalog/pg_type.h" #include "datatype/timestamp.h" #include "nodes/pg_list.h" @@ -79,6 +80,32 @@ typedef struct ConflictTupleInfo * conflicting local row occurred */ } ConflictTupleInfo; +/* + * Defines where logical replication conflict details are recorded. + * + * While stored as a text-based array/string in + * pg_subscription.subconflictlogdest for user readability and extensibility, + * we map these to an internal enum to allow for efficient checks. + */ +typedef enum ConflictLogDest +{ + CONFLICT_LOG_DEST_LOG = 0, /* Emit to server logs */ + CONFLICT_LOG_DEST_TABLE, /* Insert into the conflict log table */ + CONFLICT_LOG_DEST_ALL /* Both log and table */ +} ConflictLogDest; + +#define CONFLICTS_LOGGED_TO_TABLE(dest) \ + ((dest == CONFLICT_LOG_DEST_TABLE) || (dest == CONFLICT_LOG_DEST_ALL)) +#define CONFLICTS_LOGGED_TO_LOG(dest) \ + ((dest == CONFLICT_LOG_DEST_LOG) || (dest == CONFLICT_LOG_DEST_ALL)) + +/* + * Array mapping for converting internal enum to string. + */ +extern PGDLLIMPORT const char *const ConflictLogDestNames[]; + +extern Oid create_conflict_log_table(Oid subid, char *subname, Oid subowner); +extern ConflictLogDest GetConflictLogDest(const char *dest); extern bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, ReplOriginId *localorigin, diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 7e3cabdb93f..861bccb5f37 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -576,6 +576,314 @@ COMMIT; -- ok, owning it is enough for this stuff ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- +-- CONFLICT LOG DESTINATION TESTS +-- +SET SESSION AUTHORIZATION 'regress_subscription_user'; +SET client_min_messages = WARNING; +-- fail - unrecognized parameter value +CREATE SUBSCRIPTION regress_conflict_fail CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'invalid'); +ERROR: unrecognized conflict_log_destination value: "invalid" +HINT: Valid values are "log", "table", and "all". +-- verify subconflictlogdest is 'log' and subconflictlogrelid is 0 (InvalidOid) for default case +CREATE SUBSCRIPTION regress_conflict_log_default CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. +SELECT subname, subconflictlogdest, subconflictlogrelid +FROM pg_subscription WHERE subname = 'regress_conflict_log_default'; + subname | subconflictlogdest | subconflictlogrelid +------------------------------+--------------------+--------------------- + regress_conflict_log_default | log | 0 +(1 row) + +-- verify empty string defaults to 'log' +CREATE SUBSCRIPTION regress_conflict_empty_str CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = ''); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. +SELECT subname, subconflictlogdest, subconflictlogrelid +FROM pg_subscription WHERE subname = 'regress_conflict_empty_str'; + subname | subconflictlogdest | subconflictlogrelid +----------------------------+--------------------+--------------------- + regress_conflict_empty_str | log | 0 +(1 row) + +-- this should generate an conflict log table named pg_conflict_log_$subid$ +CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'table'); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. +-- check metadata in pg_subscription: destination should be 'table' and subconflictlogrelid valid +SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid +FROM pg_subscription WHERE subname = 'regress_conflict_test1'; + subname | subconflictlogdest | has_relid +------------------------+--------------------+----------- + regress_conflict_test1 | table | t +(1 row) + +-- verify the physical table exists, its OID matches subconflictlogrelid, +-- and it is located in the 'pg_conflict' namespace +SELECT n.nspname, (c.oid = s.subconflictlogrelid) AS "oid_matches" +FROM pg_class c +JOIN pg_subscription s ON c.relname = 'pg_conflict_log_' || s.oid +JOIN pg_namespace n ON c.relnamespace = n.oid +WHERE s.subname = 'regress_conflict_test1'; + nspname | oid_matches +-------------+------------- + pg_conflict | t +(1 row) + +-- check if the conflict log table has the correct schema +SELECT a.attnum, a.attname +FROM pg_attribute a +JOIN pg_class c ON a.attrelid = c.oid +JOIN pg_subscription s ON c.relname = 'pg_conflict_log_' || s.oid +WHERE s.subname = 'regress_conflict_test1' AND a.attnum > 0 + ORDER BY a.attnum; + attnum | attname +--------+------------------- + 1 | relid + 2 | schemaname + 3 | relname + 4 | conflict_type + 5 | remote_xid + 6 | remote_commit_lsn + 7 | remote_commit_ts + 8 | remote_origin + 9 | remote_tuple + 10 | replica_identity + 11 | local_conflicts +(11 rows) + +-- Changing the subscription owner should also update the owner +-- of the associated conflict log table. +ALTER SUBSCRIPTION regress_conflict_test1 OWNER TO regress_subscription_user2; +SELECT pg_catalog.pg_get_userbyid(c.relowner) AS owner +FROM pg_catalog.pg_class c +JOIN pg_catalog.pg_subscription s + ON c.relname = 'pg_conflict_log_' || s.oid +WHERE s.subname = 'regress_conflict_test1'; + owner +---------------------------- + regress_subscription_user2 +(1 row) + +-- Verify that a non-superuser subscription owner can truncate, +-- delete from, and select from the associated conflict log table. +SET ROLE 'regress_subscription_user2'; +SELECT format('%I.%I', n.nspname, c.relname) AS conflict_log_table +FROM pg_catalog.pg_class c +JOIN pg_catalog.pg_namespace n + ON n.oid = c.relnamespace +JOIN pg_catalog.pg_subscription s + ON c.relname = 'pg_conflict_log_' || s.oid +WHERE s.subname = 'regress_conflict_test1' +\gset +TRUNCATE TABLE :conflict_log_table; +DELETE FROM :conflict_log_table; +SELECT COUNT(*) FROM :conflict_log_table; + count +------- + 0 +(1 row) + +RESET ROLE; +-- Restore the original subscription owner. +ALTER SUBSCRIPTION regress_conflict_test1 OWNER TO regress_subscription_user; +-- +-- ALTER SUBSCRIPTION - conflict_log_destination state transitions +-- +-- These tests verify the transition logic between different logging +-- destinations, ensuring conflict log tables are created or dropped as +-- expected +-- +-- transition from 'log' to 'all' +-- a new conflict log table should be created +CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'log'); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'all'); +-- verify metadata after ALTER (destination should be 'all') +SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + subname | subconflictlogdest | has_relid +------------------------+--------------------+----------- + regress_conflict_test2 | all | t +(1 row) + +-- transition from 'all' to 'table' +-- should NOT drop the table, only change destination string +SELECT subconflictlogrelid AS old_relid FROM pg_subscription WHERE subname = 'regress_conflict_test2' \gset +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'table'); +SELECT subconflictlogdest, subconflictlogrelid = :old_relid AS relid_unchanged +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + subconflictlogdest | relid_unchanged +--------------------+----------------- + table | t +(1 row) + +-- transition from 'table' to 'log' +-- should drop the table and clear subconflictlogrelid +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'log'); +SELECT subconflictlogdest, subconflictlogrelid +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + subconflictlogdest | subconflictlogrelid +--------------------+--------------------- + log | 0 +(1 row) + +-- verify the physical table is gone +SELECT count(*) +FROM pg_class c +JOIN pg_subscription s ON c.relname = 'pg_conflict_log_' || s.oid +WHERE s.subname = 'regress_conflict_test2'; + count +------- + 0 +(1 row) + +-- +-- PUBLICATION: Verify conflict log tables are not publishable +-- +-- pg_relation_is_publishable should return false for conflict log tables to +-- prevent them from being accidentally included in publications +-- +SELECT n.nspname, pg_relation_is_publishable(c.oid) +FROM pg_class c +JOIN pg_namespace n ON c.relnamespace = n.oid +JOIN pg_subscription s ON s.subconflictlogrelid = c.oid +WHERE s.subname = 'regress_conflict_test1'; + nspname | pg_relation_is_publishable +-------------+---------------------------- + pg_conflict | f +(1 row) + +-- +-- Table Protection and Lifecycle Management +-- +-- These tests verify that: +-- Manual DROP TABLE is disallowed +-- DROP SUBSCRIPTION automatically reaps the table +-- +-- re-enable table logging for verification +ALTER SUBSCRIPTION regress_conflict_test1 SET (conflict_log_destination = 'table'); +-- We use a DO block with dynamic SQL because the conflict log table +-- name contains the subscription OID, which is non-deterministic. This +-- approach allows us to attempt the DROP and capture the expected error +-- without hard-coding a specific OID in the expected output +-- fail - drop table not allowed due to internal dependency +SET client_min_messages = NOTICE; +DO $$ +BEGIN + EXECUTE 'DROP TABLE ' || (SELECT 'pg_conflict.pg_conflict_log_' || oid FROM pg_subscription WHERE subname = 'regress_conflict_test1'); +EXCEPTION WHEN insufficient_privilege THEN + RAISE NOTICE 'captured expected error: insufficient_privilege'; +END $$; +NOTICE: captured expected error: insufficient_privilege +-- CLEANUP: DROP SUBSCRIPTION reaps the table +ALTER SUBSCRIPTION regress_conflict_test1 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE); +-- Verify the table OID for reap check +SELECT 'pg_conflict.pg_conflict_log_' || oid AS internal_tablename FROM pg_subscription WHERE subname = 'regress_conflict_test1' \gset +SET client_min_messages = WARNING; +DROP SUBSCRIPTION regress_conflict_test1; +-- should return NULL, meaning the conflict log table was reaped via dependency +SELECT to_regclass(:'internal_tablename'); + to_regclass +------------- + +(1 row) + +-- +-- Additional Namespace and Table Protection Tests +-- +-- Setup: Ensure we have a subscription with a conflict log table +CREATE SUBSCRIPTION regress_conflict_protection_test CONNECTION 'dbname=regress_doesnotexist' + PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'table'); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. +-- Trying to ALTER the conflict log table +-- This should fail because the table is system-managed +-- As mentioned in previous test cases, we use a DO block to hide dynamic OIDs +SET client_min_messages = NOTICE; +DO $$ +DECLARE + tab_name text; +BEGIN + SELECT 'pg_conflict.' || relname INTO tab_name + FROM pg_class c JOIN pg_subscription s ON c.relname = 'pg_conflict_log_' || s.oid + WHERE s.subname = 'regress_conflict_protection_test'; + + RAISE NOTICE 'Attempting ALTER TABLE on conflict log table'; + EXECUTE 'ALTER TABLE ' || tab_name || ' ADD COLUMN extra_info text'; +EXCEPTION WHEN insufficient_privilege THEN + RAISE NOTICE 'captured expected error: insufficient_privilege during ALTER'; +END $$; +NOTICE: Attempting ALTER TABLE on conflict log table +NOTICE: captured expected error: insufficient_privilege during ALTER +-- Test Manual INSERT on conflict log table +-- This should fail because the table is system-managed +-- Hiding the OID in the error message by catching the exception +DO $$ +DECLARE + tab_name text; +BEGIN + SELECT 'pg_conflict.' || relname INTO tab_name + FROM pg_class c JOIN pg_subscription s ON c.relname = 'pg_conflict_log_' || s.oid + WHERE s.subname = 'regress_conflict_protection_test'; + + EXECUTE 'INSERT INTO ' || tab_name || ' (relname) VALUES (''mytest'')'; +EXCEPTION WHEN wrong_object_type THEN + RAISE NOTICE 'captured expected error: wrong_object_type during INSERT'; +END $$; +NOTICE: captured expected error: wrong_object_type during INSERT +-- Test Manual UPDATE on conflict log table +-- This should fail because the table is system-managed +-- Hiding the OID in the error message by catching the exception +DO $$ +DECLARE + tab_name text; +BEGIN + SELECT 'pg_conflict.' || relname INTO tab_name + FROM pg_class c JOIN pg_subscription s ON c.relname = 'pg_conflict_log_' || s.oid + WHERE s.subname = 'regress_conflict_protection_test'; + + EXECUTE 'UPDATE ' || tab_name || ' SET relname = ''mytest'' '; +EXCEPTION WHEN wrong_object_type THEN + RAISE NOTICE 'captured expected error: wrong_object_type during UPDATE'; +END $$; +NOTICE: captured expected error: wrong_object_type during UPDATE +-- Trying to perform TRUNCATE/DELETE on the conflict log table +-- This should be allowed so that user can perform cleanup +SELECT 'pg_conflict.' || relname AS conflict_tab +FROM pg_class c +JOIN pg_subscription s ON c.relname = 'pg_conflict_log_' || s.oid +WHERE s.subname = 'regress_conflict_protection_test' \gset +TRUNCATE :conflict_tab; +DELETE FROM :conflict_tab; +-- Trying to create a new table manually in the pg_conflict namespace +-- This should fail as the namespace is reserved for conflict logs tables +CREATE TABLE pg_conflict.manual_table (id int); +ERROR: permission denied to create "pg_conflict.manual_table" +DETAIL: Conflict schema modifications are currently disallowed. +-- Moving an existing table into the pg_conflict namespace +-- Users should not be able to move their own tables within this namespace +CREATE TABLE public.test_move (id int); +ALTER TABLE public.test_move SET SCHEMA pg_conflict; +ERROR: cannot move objects into or out of the conflict schema +DROP TABLE public.test_move; +SET client_min_messages = WARNING; +-- Clean up remaining test subscription +ALTER SUBSCRIPTION regress_conflict_log_default DISABLE; +ALTER SUBSCRIPTION regress_conflict_log_default SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_log_default; +ALTER SUBSCRIPTION regress_conflict_empty_str DISABLE; +ALTER SUBSCRIPTION regress_conflict_empty_str SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_empty_str; +ALTER SUBSCRIPTION regress_conflict_test2 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test2 SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_test2; +ALTER SUBSCRIPTION regress_conflict_protection_test DISABLE; +ALTER SUBSCRIPTION regress_conflict_protection_test SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_protection_test; RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 6c3d9632e8a..f5ffd00adc3 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -431,6 +431,262 @@ COMMIT; ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- +-- CONFLICT LOG DESTINATION TESTS +-- + +SET SESSION AUTHORIZATION 'regress_subscription_user'; + +SET client_min_messages = WARNING; + +-- fail - unrecognized parameter value +CREATE SUBSCRIPTION regress_conflict_fail CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'invalid'); + +-- verify subconflictlogdest is 'log' and subconflictlogrelid is 0 (InvalidOid) for default case +CREATE SUBSCRIPTION regress_conflict_log_default CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false); +SELECT subname, subconflictlogdest, subconflictlogrelid +FROM pg_subscription WHERE subname = 'regress_conflict_log_default'; + +-- verify empty string defaults to 'log' +CREATE SUBSCRIPTION regress_conflict_empty_str CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = ''); +SELECT subname, subconflictlogdest, subconflictlogrelid +FROM pg_subscription WHERE subname = 'regress_conflict_empty_str'; + +-- this should generate an conflict log table named pg_conflict_log_$subid$ +CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'table'); + +-- check metadata in pg_subscription: destination should be 'table' and subconflictlogrelid valid +SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid +FROM pg_subscription WHERE subname = 'regress_conflict_test1'; + +-- verify the physical table exists, its OID matches subconflictlogrelid, +-- and it is located in the 'pg_conflict' namespace +SELECT n.nspname, (c.oid = s.subconflictlogrelid) AS "oid_matches" +FROM pg_class c +JOIN pg_subscription s ON c.relname = 'pg_conflict_log_' || s.oid +JOIN pg_namespace n ON c.relnamespace = n.oid +WHERE s.subname = 'regress_conflict_test1'; + +-- check if the conflict log table has the correct schema +SELECT a.attnum, a.attname +FROM pg_attribute a +JOIN pg_class c ON a.attrelid = c.oid +JOIN pg_subscription s ON c.relname = 'pg_conflict_log_' || s.oid +WHERE s.subname = 'regress_conflict_test1' AND a.attnum > 0 + ORDER BY a.attnum; + +-- Changing the subscription owner should also update the owner +-- of the associated conflict log table. +ALTER SUBSCRIPTION regress_conflict_test1 OWNER TO regress_subscription_user2; +SELECT pg_catalog.pg_get_userbyid(c.relowner) AS owner +FROM pg_catalog.pg_class c +JOIN pg_catalog.pg_subscription s + ON c.relname = 'pg_conflict_log_' || s.oid +WHERE s.subname = 'regress_conflict_test1'; + +-- Verify that a non-superuser subscription owner can truncate, +-- delete from, and select from the associated conflict log table. +SET ROLE 'regress_subscription_user2'; + +SELECT format('%I.%I', n.nspname, c.relname) AS conflict_log_table +FROM pg_catalog.pg_class c +JOIN pg_catalog.pg_namespace n + ON n.oid = c.relnamespace +JOIN pg_catalog.pg_subscription s + ON c.relname = 'pg_conflict_log_' || s.oid +WHERE s.subname = 'regress_conflict_test1' +\gset + +TRUNCATE TABLE :conflict_log_table; +DELETE FROM :conflict_log_table; +SELECT COUNT(*) FROM :conflict_log_table; + +RESET ROLE; + +-- Restore the original subscription owner. +ALTER SUBSCRIPTION regress_conflict_test1 OWNER TO regress_subscription_user; + +-- +-- ALTER SUBSCRIPTION - conflict_log_destination state transitions +-- +-- These tests verify the transition logic between different logging +-- destinations, ensuring conflict log tables are created or dropped as +-- expected +-- +-- transition from 'log' to 'all' +-- a new conflict log table should be created +CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'log'); +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'all'); + +-- verify metadata after ALTER (destination should be 'all') +SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + +-- transition from 'all' to 'table' +-- should NOT drop the table, only change destination string +SELECT subconflictlogrelid AS old_relid FROM pg_subscription WHERE subname = 'regress_conflict_test2' \gset +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'table'); +SELECT subconflictlogdest, subconflictlogrelid = :old_relid AS relid_unchanged +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + +-- transition from 'table' to 'log' +-- should drop the table and clear subconflictlogrelid +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'log'); +SELECT subconflictlogdest, subconflictlogrelid +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + +-- verify the physical table is gone +SELECT count(*) +FROM pg_class c +JOIN pg_subscription s ON c.relname = 'pg_conflict_log_' || s.oid +WHERE s.subname = 'regress_conflict_test2'; + +-- +-- PUBLICATION: Verify conflict log tables are not publishable +-- +-- pg_relation_is_publishable should return false for conflict log tables to +-- prevent them from being accidentally included in publications +-- +SELECT n.nspname, pg_relation_is_publishable(c.oid) +FROM pg_class c +JOIN pg_namespace n ON c.relnamespace = n.oid +JOIN pg_subscription s ON s.subconflictlogrelid = c.oid +WHERE s.subname = 'regress_conflict_test1'; + +-- +-- Table Protection and Lifecycle Management +-- +-- These tests verify that: +-- Manual DROP TABLE is disallowed +-- DROP SUBSCRIPTION automatically reaps the table +-- +-- re-enable table logging for verification +ALTER SUBSCRIPTION regress_conflict_test1 SET (conflict_log_destination = 'table'); + +-- We use a DO block with dynamic SQL because the conflict log table +-- name contains the subscription OID, which is non-deterministic. This +-- approach allows us to attempt the DROP and capture the expected error +-- without hard-coding a specific OID in the expected output + +-- fail - drop table not allowed due to internal dependency +SET client_min_messages = NOTICE; +DO $$ +BEGIN + EXECUTE 'DROP TABLE ' || (SELECT 'pg_conflict.pg_conflict_log_' || oid FROM pg_subscription WHERE subname = 'regress_conflict_test1'); +EXCEPTION WHEN insufficient_privilege THEN + RAISE NOTICE 'captured expected error: insufficient_privilege'; +END $$; + +-- CLEANUP: DROP SUBSCRIPTION reaps the table +ALTER SUBSCRIPTION regress_conflict_test1 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE); + +-- Verify the table OID for reap check +SELECT 'pg_conflict.pg_conflict_log_' || oid AS internal_tablename FROM pg_subscription WHERE subname = 'regress_conflict_test1' \gset + +SET client_min_messages = WARNING; +DROP SUBSCRIPTION regress_conflict_test1; + +-- should return NULL, meaning the conflict log table was reaped via dependency +SELECT to_regclass(:'internal_tablename'); + +-- +-- Additional Namespace and Table Protection Tests +-- + +-- Setup: Ensure we have a subscription with a conflict log table +CREATE SUBSCRIPTION regress_conflict_protection_test CONNECTION 'dbname=regress_doesnotexist' + PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'table'); + +-- Trying to ALTER the conflict log table +-- This should fail because the table is system-managed +-- As mentioned in previous test cases, we use a DO block to hide dynamic OIDs + +SET client_min_messages = NOTICE; +DO $$ +DECLARE + tab_name text; +BEGIN + SELECT 'pg_conflict.' || relname INTO tab_name + FROM pg_class c JOIN pg_subscription s ON c.relname = 'pg_conflict_log_' || s.oid + WHERE s.subname = 'regress_conflict_protection_test'; + + RAISE NOTICE 'Attempting ALTER TABLE on conflict log table'; + EXECUTE 'ALTER TABLE ' || tab_name || ' ADD COLUMN extra_info text'; +EXCEPTION WHEN insufficient_privilege THEN + RAISE NOTICE 'captured expected error: insufficient_privilege during ALTER'; +END $$; + +-- Test Manual INSERT on conflict log table +-- This should fail because the table is system-managed +-- Hiding the OID in the error message by catching the exception +DO $$ +DECLARE + tab_name text; +BEGIN + SELECT 'pg_conflict.' || relname INTO tab_name + FROM pg_class c JOIN pg_subscription s ON c.relname = 'pg_conflict_log_' || s.oid + WHERE s.subname = 'regress_conflict_protection_test'; + + EXECUTE 'INSERT INTO ' || tab_name || ' (relname) VALUES (''mytest'')'; +EXCEPTION WHEN wrong_object_type THEN + RAISE NOTICE 'captured expected error: wrong_object_type during INSERT'; +END $$; + +-- Test Manual UPDATE on conflict log table +-- This should fail because the table is system-managed +-- Hiding the OID in the error message by catching the exception +DO $$ +DECLARE + tab_name text; +BEGIN + SELECT 'pg_conflict.' || relname INTO tab_name + FROM pg_class c JOIN pg_subscription s ON c.relname = 'pg_conflict_log_' || s.oid + WHERE s.subname = 'regress_conflict_protection_test'; + + EXECUTE 'UPDATE ' || tab_name || ' SET relname = ''mytest'' '; +EXCEPTION WHEN wrong_object_type THEN + RAISE NOTICE 'captured expected error: wrong_object_type during UPDATE'; +END $$; + +-- Trying to perform TRUNCATE/DELETE on the conflict log table +-- This should be allowed so that user can perform cleanup +SELECT 'pg_conflict.' || relname AS conflict_tab +FROM pg_class c +JOIN pg_subscription s ON c.relname = 'pg_conflict_log_' || s.oid +WHERE s.subname = 'regress_conflict_protection_test' \gset +TRUNCATE :conflict_tab; +DELETE FROM :conflict_tab; + +-- Trying to create a new table manually in the pg_conflict namespace +-- This should fail as the namespace is reserved for conflict logs tables +CREATE TABLE pg_conflict.manual_table (id int); + +-- Moving an existing table into the pg_conflict namespace +-- Users should not be able to move their own tables within this namespace +CREATE TABLE public.test_move (id int); +ALTER TABLE public.test_move SET SCHEMA pg_conflict; +DROP TABLE public.test_move; + +SET client_min_messages = WARNING; + +-- Clean up remaining test subscription +ALTER SUBSCRIPTION regress_conflict_log_default DISABLE; +ALTER SUBSCRIPTION regress_conflict_log_default SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_log_default; + +ALTER SUBSCRIPTION regress_conflict_empty_str DISABLE; +ALTER SUBSCRIPTION regress_conflict_empty_str SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_empty_str; + +ALTER SUBSCRIPTION regress_conflict_test2 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test2 SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_test2; + +ALTER SUBSCRIPTION regress_conflict_protection_test DISABLE; +ALTER SUBSCRIPTION regress_conflict_protection_test SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_protection_test; + RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 8cf40c87043..203959e5018 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -515,6 +515,8 @@ ConditionalStack ConditionalStackData ConfigData ConfigVariable +ConflictLogColumnDef +ConflictLogDest ConflictTupleInfo ConflictType ConnCacheEntry -- 2.49.0