From ca928f54ea47a51d9031bc559d2065680bfacb85 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Thu, 25 Jun 2026 17:56:27 +0530 Subject: [PATCH v60 1/5] Add configurable conflict log table for Logical Replication MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch adds a feature to provide a structured, queryable record of all logical replication conflicts. The current approach of logging conflicts as plain text in the server logs makes it difficult to query, analyze, and use for external monitoring and automation. This patch addresses these limitations by introducing a configurable conflict_log_destination=('log'/'table'/'all') option in the CREATE SUBSCRIPTION command. If the user chooses to enable logging to a table (by selecting 'table' or 'all'), an internal logging table named pg_conflict_log_ is automatically created within a dedicated, system-managed 'pg_conflict' namespace to prevent users from manually dropping or altering it. This also prevents accidental name collisions with user-created tables. This table is linked to the subscription via an internal dependency, ensuring it is automatically dropped when the subscription is removed The per-subscription table model was chosen over a single global log to ensure superior isolation and administrative flexibility by directly aligning table ownership with the subscription’s lifecycle. This approach allows for granular permission management, enabling the subscription owner to perform necessary maintenance tasks like SELECT, DELETE, and TRUNCATE without the security risks or complex Row-Level Security required by a shared global table. The conflict details, including the original and remote tuples, are stored in JSON columns, providing a flexible format to accommodate different table schemas. The log table captures essential attributes such as local and remote transaction IDs, LSNs, commit timestamps, and conflict type, providing a complete record for post-mortem analysis. This feature will make logical replication conflicts easier to monitor and manage, significantly improving the overall resilience and operability of replication setups. The conflict log tables will not be included in a publication, even if the publication is configured to include ALL TABLES or ALL TABLES IN SCHEMA. Note: Bump catalog version as this add new column to pg_subscription catalog --- src/backend/catalog/aclchk.c | 58 ++-- src/backend/catalog/catalog.c | 29 +- src/backend/catalog/pg_publication.c | 14 +- src/backend/catalog/pg_subscription.c | 7 + src/backend/catalog/system_views.sql | 4 +- src/backend/commands/lockcmds.c | 24 +- src/backend/commands/policy.c | 12 + src/backend/commands/statscmds.c | 14 + src/backend/commands/subscriptioncmds.c | 213 ++++++++++++++- src/backend/commands/tablecmds.c | 92 ++++++- src/backend/commands/trigger.c | 25 ++ src/backend/executor/execMain.c | 28 ++ src/backend/replication/logical/conflict.c | 172 ++++++++++++ src/backend/rewrite/rewriteDefine.c | 22 ++ src/backend/utils/cache/lsyscache.c | 2 +- src/bin/initdb/initdb.c | 6 + src/bin/psql/tab-complete.in.c | 8 +- src/include/catalog/catalog.h | 2 + src/include/catalog/pg_namespace.dat | 3 + src/include/catalog/pg_subscription.h | 11 + src/include/replication/conflict.h | 27 ++ src/include/utils/lsyscache.h | 2 +- src/test/regress/expected/subscription.out | 295 +++++++++++++++++++++ src/test/regress/sql/subscription.sql | 234 ++++++++++++++++ src/test/subscription/t/035_conflicts.pl | 67 ++++- src/tools/pgindent/typedefs.list | 2 + 26 files changed, 1335 insertions(+), 38 deletions(-) diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c index 007ede997c5..66aca583c85 100644 --- a/src/backend/catalog/aclchk.c +++ b/src/backend/catalog/aclchk.c @@ -3337,25 +3337,39 @@ pg_class_aclmask_ext(Oid table_oid, Oid roleid, AclMode mask, classForm = (Form_pg_class) GETSTRUCT(tuple); - /* - * Deny anyone permission to update a system catalog unless - * pg_authid.rolsuper is set. - * - * As of 7.4 we have some updatable system views; those shouldn't be - * protected in this way. Assume the view rules can take care of - * themselves. ACL_USAGE is if we ever have system sequences. - */ - if ((mask & (ACL_INSERT | ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE | ACL_USAGE)) && - IsSystemClass(table_oid, classForm) && - classForm->relkind != RELKIND_VIEW && - !superuser_arg(roleid)) - mask &= ~(ACL_INSERT | ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE | ACL_USAGE); - - /* - * Otherwise, superusers bypass all permission-checking. - */ - if (superuser_arg(roleid)) + if (!superuser_arg(roleid)) { + if (mask & (ACL_INSERT | ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE | ACL_USAGE)) + { + if (IsConflictLogTableClass(classForm)) + { + /* + * For conflict log tables, allow non-superusers to perform + * DELETE and TRUNCATE for cleanup and maintenance, while still + * restricting INSERT, UPDATE, and USAGE. + */ + mask &= ~(ACL_INSERT | ACL_UPDATE | ACL_USAGE); + } + else if (IsSystemClass(table_oid, classForm) && + classForm->relkind != RELKIND_VIEW) + { + /* + * Deny anyone permission to update a system catalog unless + * pg_authid.rolsuper is set. + * + * As of 7.4 we have some updatable system views; those + * shouldn't be protected in this way. Assume the view rules + * can take care of themselves. ACL_USAGE is if we ever have + * system sequences. + */ + mask &= ~(ACL_INSERT | ACL_UPDATE | ACL_DELETE | ACL_TRUNCATE | + ACL_USAGE); + } + } + } + else + { + /* Superusers bypass all permission-checking. */ ReleaseSysCache(tuple); return mask; } @@ -3660,6 +3674,14 @@ pg_namespace_aclmask_ext(Oid nsp_oid, Oid roleid, Acl *acl; Oid ownerId; + /* + * Disallow creation in the conflict schema for everyone, including + * superusers, unless in binary-upgrade mode. + */ + if (!IsBinaryUpgrade && (mask & ACL_CREATE) && + IsConflictLogTableNamespace(nsp_oid)) + return mask & ~ACL_CREATE; + /* Superusers bypass all permission checking. */ if (superuser_arg(roleid)) return mask; diff --git a/src/backend/catalog/catalog.c b/src/backend/catalog/catalog.c index 7be49032934..be8791af875 100644 --- a/src/backend/catalog/catalog.c +++ b/src/backend/catalog/catalog.c @@ -86,7 +86,9 @@ bool IsSystemClass(Oid relid, Form_pg_class reltuple) { /* IsCatalogRelationOid is a bit faster, so test that first */ - return (IsCatalogRelationOid(relid) || IsToastClass(reltuple)); + return (IsCatalogRelationOid(relid) || + IsToastClass(reltuple) || + IsConflictLogTableClass(reltuple)); } /* @@ -230,6 +232,20 @@ IsToastClass(Form_pg_class reltuple) return IsToastNamespace(relnamespace); } +/* + * IsConflictLogTableClass + * True iff pg_class tuple represents a Conflict Log Table. + * + * Does not perform any catalog accesses. + */ +bool +IsConflictLogTableClass(Form_pg_class reltuple) +{ + Oid relnamespace = reltuple->relnamespace; + + return IsConflictLogTableNamespace(relnamespace); +} + /* * IsCatalogNamespace * True iff namespace is pg_catalog. @@ -264,6 +280,17 @@ IsToastNamespace(Oid namespaceId) isTempToastNamespace(namespaceId); } +/* + * IsConflictLogTableNamespace + * True iff namespace is pg_conflict. + * + * Does not perform any catalog accesses. + */ +bool +IsConflictLogTableNamespace(Oid namespaceId) +{ + return namespaceId == PG_CONFLICT_NAMESPACE; +} /* * IsReservedName diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 5c457d9aca8..1ec94c851b2 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -92,6 +92,13 @@ check_publication_add_relation(PublicationRelInfo *pri) errmsg(errormsg, relname), errdetail("This operation is not supported for system tables."))); + /* Can't be conflict log table */ + if (IsConflictLogTableNamespace(RelationGetNamespace(targetrel))) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg(errormsg, relname), + errdetail("This operation is not supported for conflict log tables."))); + /* UNLOGGED and TEMP relations cannot be part of publication. */ if (targetrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP) ereport(ERROR, @@ -113,7 +120,8 @@ static void check_publication_add_schema(Oid schemaid) { /* Can't be system namespace */ - if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid)) + if (IsCatalogNamespace(schemaid) || IsToastNamespace(schemaid) || + IsConflictLogTableNamespace(schemaid)) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("cannot add schema \"%s\" to publication", @@ -148,7 +156,8 @@ check_publication_add_schema(Oid schemaid) * is really inadequate for that, since the information_schema could be * dropped and reloaded and then it'll be considered publishable. The best * long-term solution may be to add a "relispublishable" bool to pg_class, - * and depend on that instead of OID checks. + * and depend on that instead of OID checks. IsConflictLogTableClass() + * excludes tables in conflict schema. */ static bool is_publishable_class(Oid relid, Form_pg_class reltuple) @@ -157,6 +166,7 @@ is_publishable_class(Oid relid, Form_pg_class reltuple) reltuple->relkind == RELKIND_PARTITIONED_TABLE || reltuple->relkind == RELKIND_SEQUENCE) && !IsCatalogRelationOid(relid) && + !IsConflictLogTableClass(reltuple) && reltuple->relpersistence == RELPERSISTENCE_PERMANENT && relid >= FirstNormalObjectId; } diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index b5cb301db88..2068e03c571 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -132,6 +132,7 @@ GetSubscription(Oid subid, bool missing_ok, bool conninfo_needed, sub->retaindeadtuples = subform->subretaindeadtuples; sub->maxretention = subform->submaxretention; sub->retentionactive = subform->subretentionactive; + sub->conflictlogrelid = subform->subconflictlogrelid; if (conninfo_needed) { @@ -203,6 +204,12 @@ GetSubscription(Oid subid, bool missing_ok, bool conninfo_needed, Anum_pg_subscription_suborigin); sub->origin = TextDatumGetCString(datum); + /* Get conflict log destination */ + datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, + tup, + Anum_pg_subscription_subconflictlogdest); + sub->conflictlogdest = TextDatumGetCString(datum); + /* Is the subscription owner a superuser? */ sub->ownersuperuser = superuser_arg(sub->owner); diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 8f129baec90..5967762e603 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1527,8 +1527,8 @@ GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, subpasswordrequired, subrunasowner, subfailover, subretaindeadtuples, submaxretention, subretentionactive, - subserver, subslotname, subsynccommit, subwalrcvtimeout, - subpublications, suborigin) + subserver, subconflictlogrelid, subconflictlogdest, subslotname, + subsynccommit, subwalrcvtimeout, subpublications, suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/lockcmds.c b/src/backend/commands/lockcmds.c index f66b8f17b9b..85ffe7092ce 100644 --- a/src/backend/commands/lockcmds.c +++ b/src/backend/commands/lockcmds.c @@ -16,6 +16,7 @@ #include "access/table.h" #include "access/xact.h" +#include "catalog/catalog.h" #include "catalog/namespace.h" #include "catalog/pg_inherits.h" #include "commands/lockcmds.h" @@ -83,7 +84,18 @@ RangeVarCallbackForLockTable(const RangeVar *rv, Oid relid, Oid oldrelid, return; /* woops, concurrently dropped; no permissions * check */ - /* Currently, we only allow plain tables or views to be locked */ + /* + * Note: Conflict log tables are deliberately NOT blocked here, even though + * other direct DDL on them is rejected elsewhere. pg_dump relies on being + * able to take an ACCESS SHARE lock on these tables to safely dump their + * definitions during a binary upgrade, so we permit LOCK commands on them + * and treat them like ordinary tables here. It's true that a strong lock + * (ShareLock or above) on such a table would conflict with the + * RowExclusiveLock taken by the apply worker's inserts and could stall + * conflict logging as well as the apply worker for as long as it is held. + * But locking a system-managed conflict log table is an unusual thing to + * do, and it doesn't seem worth the trouble of filtering by lock mode here. + */ if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE && relkind != RELKIND_VIEW) ereport(ERROR, @@ -198,6 +210,16 @@ LockViewRecurse_walker(Node *node, LockViewRecurse_context *context) relkind != RELKIND_VIEW) continue; + /* + * Conflict log tables only support SELECT, DELETE, and TRUNCATE. + * A direct LOCK on them is permitted solely so that pg_dump can + * lock them during a binary upgrade; locking one indirectly by + * locking a view over it is not needed for that, so skip it here + * rather than locking it. + */ + if (IsConflictLogTableNamespace(get_rel_namespace(relid))) + continue; + /* * We might be dealing with a self-referential view. If so, we * can just stop recursing, since we already locked it. diff --git a/src/backend/commands/policy.c b/src/backend/commands/policy.c index 21b8eebe32d..ebab2edde92 100644 --- a/src/backend/commands/policy.c +++ b/src/backend/commands/policy.c @@ -79,6 +79,18 @@ RangeVarCallbackForPolicy(const RangeVar *rv, Oid relid, Oid oldrelid, if (!object_ownercheck(RelationRelationId, relid, GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(relid)), rv->relname); + /* + * Conflict log tables are used internally for logical replication conflict + * logging and should not be modified directly, as it could disrupt + * conflict logging. + */ + if (IsConflictLogTableClass(classform)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot create policy on conflict log table \"%s\"", + rv->relname), + errdetail("Conflict log tables are system-managed tables for logical replication conflicts."))); + /* No system table modifications unless explicitly allowed. */ if (!allowSystemTableMods && IsSystemClass(relid, classform)) ereport(ERROR, diff --git a/src/backend/commands/statscmds.c b/src/backend/commands/statscmds.c index b354723be44..ce98ebb35ea 100644 --- a/src/backend/commands/statscmds.c +++ b/src/backend/commands/statscmds.c @@ -147,6 +147,20 @@ CreateStatistics(CreateStatsStmt *stmt, bool check_rights) aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind), RelationGetRelationName(rel)); + /* + * Conflict log tables are system-managed tables used internally for + * logical replication conflict logging. Unlike user tables, they are + * not expected to have complex query usage, so to keep things simple, + * user-defined extended statistics are not required or supported at + * present. + */ + if (IsConflictLogTableClass(rel->rd_rel)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot create statistics on conflict log table \"%s\"", + RelationGetRelationName(rel)), + errdetail("Conflict log tables are system-managed tables for logical replication conflicts."))); + /* Creating statistics on system catalogs is not allowed */ if (!allowSystemTableMods && IsSystemRelation(rel)) ereport(ERROR, diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index ee06a726f42..e30e0e0dfe7 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -28,6 +28,7 @@ #include "catalog/pg_authid_d.h" #include "catalog/pg_database_d.h" #include "catalog/pg_foreign_server.h" +#include "catalog/pg_namespace.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" @@ -35,6 +36,7 @@ #include "commands/defrem.h" #include "commands/event_trigger.h" #include "commands/subscriptioncmds.h" +#include "commands/tablecmds.h" #include "executor/executor.h" #include "foreign/foreign.h" #include "miscadmin.h" @@ -79,6 +81,7 @@ #define SUBOPT_WAL_RECEIVER_TIMEOUT 0x00010000 #define SUBOPT_LSN 0x00020000 #define SUBOPT_ORIGIN 0x00040000 +#define SUBOPT_CONFLICT_LOG_DEST 0x00080000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -107,6 +110,7 @@ typedef struct SubOpts bool retaindeadtuples; int32 maxretention; char *origin; + ConflictLogDest conflictlogdest; XLogRecPtr lsn; char *wal_receiver_timeout; } SubOpts; @@ -140,7 +144,12 @@ static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel); - +static bool alter_sub_conflict_log_dest(Subscription *sub, + ConflictLogDest oldlogdest, + ConflictLogDest newlogdest, + Oid *conflicttablerelid); +static void drop_sub_conflict_log_table(Oid subid, char *subname, + Oid subconflictlogrelid); /* * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. @@ -196,6 +205,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->maxretention = 0; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); + if (IsSet(supported_opts, SUBOPT_CONFLICT_LOG_DEST)) + opts->conflictlogdest = CONFLICT_LOG_DEST_LOG; /* Parse options */ foreach(lc, stmt_options) @@ -436,6 +447,18 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, PGC_BACKEND, PGC_S_TEST, GUC_ACTION_SET, false, 0, false); } + else if (IsSet(supported_opts, SUBOPT_CONFLICT_LOG_DEST) && + strcmp(defel->defname, "conflict_log_destination") == 0) + { + char *val; + + if (IsSet(opts->specified_opts, SUBOPT_CONFLICT_LOG_DEST)) + errorConflictingDefElem(defel, pstate); + + val = defGetString(defel); + opts->conflictlogdest = GetConflictLogDest(val); + opts->specified_opts |= SUBOPT_CONFLICT_LOG_DEST; + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -660,6 +683,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, uint32 supported_opts; SubOpts opts = {0}; AclResult aclresult; + Oid logrelid = InvalidOid; /* * Parse and check options. @@ -674,7 +698,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_MAX_RETENTION_DURATION | - SUBOPT_WAL_RECEIVER_TIMEOUT | SUBOPT_ORIGIN); + SUBOPT_WAL_RECEIVER_TIMEOUT | SUBOPT_ORIGIN | + SUBOPT_CONFLICT_LOG_DEST); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -848,6 +873,20 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_suborigin - 1] = CStringGetTextDatum(opts.origin); + values[Anum_pg_subscription_subconflictlogdest - 1] = + CStringGetTextDatum(ConflictLogDestNames[opts.conflictlogdest]); + + /* + * We create the conflict log table here, if required, so that its relation + * OID can be stored when inserting the pg_subscription tuple below. + */ + if (CONFLICTS_LOGGED_TO_TABLE(opts.conflictlogdest)) + logrelid = create_conflict_log_table(subid, stmt->subname, owner); + + /* Store table OID in the catalog. */ + values[Anum_pg_subscription_subconflictlogrelid - 1] = + ObjectIdGetDatum(logrelid); + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); /* Insert tuple into catalog. */ @@ -868,6 +907,25 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); } + /* + * Establish an internal dependency between the conflict log table and + * the subscription. + * + * We use DEPENDENCY_INTERNAL to signify that the table's lifecycle is + * strictly tied to the subscription, similar to how a TOAST table relates + * to its main table or a sequence relates to an identity column. + * + * This ensures the conflict log table is automatically reaped during a + * DROP SUBSCRIPTION via performDeletion(). + */ + if (OidIsValid(logrelid)) + { + ObjectAddress cltaddr; + + ObjectAddressSet(cltaddr, RelationRelationId, logrelid); + recordDependencyOn(&cltaddr, &myself, DEPENDENCY_INTERNAL); + } + /* * A replication origin is currently created for all subscriptions, * including those that only contain sequences or are otherwise empty. @@ -1437,6 +1495,71 @@ CheckAlterSubOption(Subscription *sub, const char *option, } } +/* + * alter_sub_conflict_log_dest + * + * When the subscription's 'conflict_log_destination' is changed, update the + * conflict log table if required. + * + * If the new destination no longer requires a conflict log table, the existing + * conflict log table associated with the subscription is removed via internal + * dependency cleanup to prevent orphaned relations. + * + * On success, *conflicttablerelid is set to the OID of the conflict log table + * that was created or validated, or to InvalidOid if no table is required. + * + * Returns true if the subscription's conflict log table reference must be + * updated as a result of the destination change; false otherwise. + */ +static bool +alter_sub_conflict_log_dest(Subscription *sub, ConflictLogDest oldlogdest, + ConflictLogDest newlogdest, + Oid *conflicttablerelid) +{ + bool want_table; + bool has_oldtable; + bool update_relid = false; + Oid relid = InvalidOid; + + want_table = CONFLICTS_LOGGED_TO_TABLE(newlogdest); + has_oldtable = CONFLICTS_LOGGED_TO_TABLE(oldlogdest); + + if (has_oldtable) + { + /* There is a conflict log table already. */ + if (!want_table) + { + drop_sub_conflict_log_table(sub->oid, sub->name, + sub->conflictlogrelid); + update_relid = true; + } + } + else + { + /* There was no previous conflict log table. */ + if (want_table) + { + ObjectAddress cltaddr; + ObjectAddress subobj; + + relid = create_conflict_log_table(sub->oid, sub->name, sub->owner); + update_relid = true; + + /* + * Establish an internal dependency between the conflict log table + * and the subscription. For details refer comments in + * CreateSubscription function. + */ + ObjectAddressSet(cltaddr, RelationRelationId, relid); + ObjectAddressSet(subobj, SubscriptionRelationId, sub->oid); + recordDependencyOn(&cltaddr, &subobj, DEPENDENCY_INTERNAL); + } + } + + *conflicttablerelid = relid; + return update_relid; +} + /* * Alter the existing subscription. */ @@ -1499,7 +1622,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_MAX_RETENTION_DURATION | SUBOPT_WAL_RECEIVER_TIMEOUT | - SUBOPT_ORIGIN); + SUBOPT_ORIGIN | + SUBOPT_CONFLICT_LOG_DEST); break; case ALTER_SUBSCRIPTION_ENABLED: @@ -1858,6 +1982,34 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_subwalrcvtimeout - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_CONFLICT_LOG_DEST)) + { + ConflictLogDest old_dest = + GetConflictLogDest(sub->conflictlogdest); + + if (opts.conflictlogdest != old_dest) + { + bool update_relid; + Oid relid = InvalidOid; + + values[Anum_pg_subscription_subconflictlogdest - 1] = + CStringGetTextDatum(ConflictLogDestNames[opts.conflictlogdest]); + replaces[Anum_pg_subscription_subconflictlogdest - 1] = true; + + update_relid = alter_sub_conflict_log_dest(sub, + old_dest, + opts.conflictlogdest, + &relid); + if (update_relid) + { + values[Anum_pg_subscription_subconflictlogrelid - 1] = + ObjectIdGetDatum(relid); + replaces[Anum_pg_subscription_subconflictlogrelid - 1] = + true; + } + } + } + update_tuple = true; break; } @@ -2297,6 +2449,51 @@ construct_subserver_conninfo(Oid subserver, Oid subowner, char **err) return ForeignServerConnectionString(subowner, server); } +/* + * Drop subscription's conflict log table + * + * The conflict log table is registered as an internal dependency of the + * subscription. This function removes the dependency by performing a + * cascading deletion on the subscription object, which in turn drops the + * associated conflict log table. + * + * This is used to clean up conflict log tables that are no longer required, + * preventing accumulation of stale or orphaned relations. + * + * NOTE: + * Only conflict log tables are currently managed via this internal dependency + * mechanism. + */ +static void +drop_sub_conflict_log_table(Oid subid, char *subname, Oid subconflictlogrelid) +{ + /* Drop any dependent conflict log table */ + if (OidIsValid(subconflictlogrelid)) + { + ObjectAddress object; + char *conflictrelname; + + conflictrelname = get_rel_name(subconflictlogrelid); + if (conflictrelname == NULL) + elog(ERROR, "cache lookup failed for relation %u", + subconflictlogrelid); + + /* + * By using PERFORM_DELETION_SKIP_ORIGINAL, we ensure that only the + * conflict log table is deleted while the subscription remains. + */ + ObjectAddressSet(object, SubscriptionRelationId, subid); + performDeletion(&object, DROP_CASCADE, + PERFORM_DELETION_INTERNAL | + PERFORM_DELETION_SKIP_ORIGINAL); + + ereport(NOTICE, + errmsg("dropped conflict log table \"%s\" for subscription \"%s\"", + get_qualified_objname(PG_CONFLICT_NAMESPACE, conflictrelname), + subname)); + } +} + /* * Drop a subscription */ @@ -2309,6 +2506,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) Oid subid; Oid subowner; Oid subserver; + Oid subconflictlogrelid; char *subconninfo = NULL; Datum datum; bool isnull; @@ -2361,6 +2559,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) subid = form->oid; subowner = form->subowner; subserver = form->subserver; + subconflictlogrelid = form->subconflictlogrelid; must_use_password = !superuser_arg(subowner) && form->subpasswordrequired; /* must be owner */ @@ -2478,6 +2677,9 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) replorigin_drop_by_name(originname, true, false); } + /* Drop subscription's conflict log table */ + drop_sub_conflict_log_table(subid, subname, subconflictlogrelid); + /* Clean up dependencies */ deleteDependencyRecordsFor(SubscriptionRelationId, subid, false); deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0); @@ -2722,6 +2924,11 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId) form->subowner = newOwnerId; CatalogTupleUpdate(rel, &tup->t_self, tup); + /* Update owner of the conflict log table if it exists. */ + if (OidIsValid(form->subconflictlogrelid)) + ATExecChangeOwner(form->subconflictlogrelid, newOwnerId, true, + AccessExclusiveLock); + /* Update owner dependency reference */ changeDependencyOnOwner(SubscriptionRelationId, form->oid, diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 472db112fa7..c945a44943a 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -2464,9 +2464,11 @@ truncate_check_rel(Oid relid, Form_pg_class reltuple) * pg_largeobject and pg_largeobject_metadata to be truncated as part of * pg_upgrade, because we need to change its relfilenode to match the old * cluster, and allowing a TRUNCATE command to be executed is the easiest - * way of doing that. + * way of doing that. We also allow TRUNCATE on the conflict log tables, + * to permit users to manually prune conflict data to manage disk space. */ - if (!allowSystemTableMods && IsSystemClass(relid, reltuple) + if (!allowSystemTableMods && IsSystemClass(relid, reltuple) && + !IsConflictLogTableClass(reltuple) && (!IsBinaryUpgrade || (relid != LargeObjectRelationId && relid != LargeObjectMetadataRelationId))) @@ -2755,6 +2757,18 @@ MergeAttributes(List *columns, const List *supers, char relpersistence, errmsg("cannot inherit from partition \"%s\"", RelationGetRelationName(relation)))); + /* + * Conflict log tables are managed by the system for logical + * replication and should not be used as parent tables, as + * inheritance could interfere with the logging behavior. + */ + if (IsConflictLogTableNamespace(relation->rd_rel->relnamespace)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot inherit from conflict log table \"%s\"", + RelationGetRelationName(relation)), + errdetail("Conflict log tables are system-managed tables for logical replication conflicts."))); + if (relation->rd_rel->relkind != RELKIND_RELATION && relation->rd_rel->relkind != RELKIND_FOREIGN_TABLE && relation->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) @@ -3892,6 +3906,19 @@ renameatt_check(Oid myrelid, Form_pg_class classform, bool recursing) if (!object_ownercheck(RelationRelationId, myrelid, GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(myrelid)), NameStr(classform->relname)); + + /* + * Conflict log tables are used internally for logical replication conflict + * logging and should not be modified directly, as it could disrupt + * conflict logging. + */ + if (IsConflictLogTableClass(classform)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot rename columns of conflict log table \"%s\"", + NameStr(classform->relname)), + errdetail("Conflict log tables are system-managed tables for logical replication conflicts."))); + if (!allowSystemTableMods && IsSystemClass(myrelid, classform)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), @@ -6894,6 +6921,22 @@ ATSimplePermissions(AlterTableType cmdtype, Relation rel, int allowed_targets) aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind), RelationGetRelationName(rel)); + /* + * Conflict log tables are used internally for logical replication conflict + * logging and should not be altered directly, as it could disrupt conflict + * logging. Direct ALTER commands are already rejected during relation + * lookup in RangeVarCallbackForAlterRelation(), and AlterTableMoveAll() + * skips these tables, so a conflict log table does not normally reach here; + * this check guards any internal caller that arrives via + * AlterTableInternal(). + */ + if (IsConflictLogTableClass(rel->rd_rel)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot alter conflict log table \"%s\"", + RelationGetRelationName(rel)), + errdetail("Conflict log tables are system-managed tables for logical replication conflicts."))); + if (!allowSystemTableMods && IsSystemRelation(rel)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), @@ -10203,6 +10246,18 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, errmsg("referenced relation \"%s\" is not a table", RelationGetRelationName(pkrel)))); + /* + * Conflict log tables are used internally for logical replication conflict + * logging and should not be referenced by foreign keys, as it could + * disrupt conflict logging. + */ + if (IsConflictLogTableClass(pkrel->rd_rel)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot reference conflict log table \"%s\"", + RelationGetRelationName(pkrel)), + errdetail("Conflict log tables are system-managed tables for logical replication conflicts."))); + if (!allowSystemTableMods && IsSystemRelation(pkrel)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), @@ -17539,13 +17594,18 @@ AlterTableMoveAll(AlterTableMoveAllStmt *stmt) * really wishes to do so, they can issue the individual ALTER * commands directly. * - * Also, explicitly avoid any shared tables, temp tables, or TOAST + * Also, explicitly avoid any shared tables, temp tables, TOAST * (TOAST will be moved with the main table). + * + * Conflict log tables are system-managed for logical replication and + * cannot be altered directly, so skip them as well; otherwise a single + * such table in the source tablespace would abort the whole bulk move. */ if (IsCatalogNamespace(relForm->relnamespace) || relForm->relisshared || isAnyTempNamespace(relForm->relnamespace) || - IsToastNamespace(relForm->relnamespace)) + IsToastNamespace(relForm->relnamespace) || + IsConflictLogTableNamespace(relForm->relnamespace)) continue; /* Only move the object type requested */ @@ -20034,6 +20094,18 @@ RangeVarCallbackOwnsRelation(const RangeVar *relation, aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(relId)), relation->relname); + /* + * Conflict log tables are used internally for logical replication conflict + * logging and should not be modified directly, as it could disrupt + * conflict logging. + */ + if (IsConflictLogTableClass((Form_pg_class) GETSTRUCT(tuple))) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot change conflict log table \"%s\"", + relation->relname), + errdetail("Conflict log tables are system-managed tables for logical replication conflicts."))); + if (!allowSystemTableMods && IsSystemClass(relId, (Form_pg_class) GETSTRUCT(tuple))) ereport(ERROR, @@ -20069,6 +20141,18 @@ RangeVarCallbackForAlterRelation(const RangeVar *rv, Oid relid, Oid oldrelid, if (!object_ownercheck(RelationRelationId, relid, GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(relid)), rv->relname); + /* + * Conflict log tables are used internally for logical replication conflict + * logging and should not be altered directly, as it could disrupt conflict + * logging. + */ + if (IsConflictLogTableClass(classform)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot alter conflict log table \"%s\"", + rv->relname), + errdetail("Conflict log tables are system-managed tables for logical replication conflicts."))); + /* No system table modifications unless explicitly allowed. */ if (!allowSystemTableMods && IsSystemClass(relid, classform)) ereport(ERROR, diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c index b87b4b40d07..00d3c90b599 100644 --- a/src/backend/commands/trigger.c +++ b/src/backend/commands/trigger.c @@ -314,6 +314,18 @@ CreateTriggerFiringOn(const CreateTrigStmt *stmt, const char *queryString, RelationGetRelationName(rel)), errdetail_relkind_not_supported(rel->rd_rel->relkind))); + /* + * Conflict log tables are used internally for logical replication conflict + * logging and should not have triggers, as it could disrupt conflict + * logging. + */ + if (IsConflictLogTableClass(rel->rd_rel)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot create trigger on conflict log table \"%s\"", + RelationGetRelationName(rel)), + errdetail("Conflict log tables are system-managed tables for logical replication conflicts."))); + if (!allowSystemTableMods && IsSystemRelation(rel)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), @@ -1443,6 +1455,19 @@ RangeVarCallbackForRenameTrigger(const RangeVar *rv, Oid relid, Oid oldrelid, /* you must own the table to rename one of its triggers */ if (!object_ownercheck(RelationRelationId, relid, GetUserId())) aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(relid)), rv->relname); + + /* + * Conflict log tables are used internally for logical replication conflict + * logging and should not have triggers, as it could disrupt conflict + * logging. + */ + if (IsConflictLogTableClass(form)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot rename trigger on conflict log table \"%s\"", + rv->relname), + errdetail("Conflict log tables are system-managed tables for logical replication conflicts."))); + if (!allowSystemTableMods && IsSystemClass(relid, form)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index c3af96989ba..fde502efd38 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -1196,6 +1196,24 @@ CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation, RelationGetRelationName(resultRel)))); break; } + + /* + * Conflict log tables are managed by the system to record logical + * replication conflicts. We allow DELETE and TRUNCATE to permit users to + * manually prune these logs, but manual data insertion or modification + * (INSERT, UPDATE, MERGE) is prohibited to maintain the integrity of the + * system-generated logs. + * + * Since TRUNCATE is handled as a separate utility command, we only need + * to explicitly permit CMD_DELETE here. + */ + if (IsConflictLogTableNamespace(RelationGetNamespace(resultRel)) && + operation != CMD_DELETE) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot modify or insert data into conflict log table \"%s\"", + RelationGetRelationName(resultRel)), + errdetail("Conflict log tables are system-managed and only support cleanup using DELETE or TRUNCATE."))); } /* @@ -1267,6 +1285,16 @@ CheckValidRowMarkRel(Relation rel, RowMarkType markType) RelationGetRelationName(rel)))); break; } + + /* + * Conflict log tables are managed by the system to record logical + * replication conflicts. + */ + if (IsConflictLogTableNamespace(RelationGetNamespace(rel))) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("cannot lock rows in the conflict log table \"%s\"", + RelationGetRelationName(rel)))); } /* diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 1f8d67fdd90..5b17ed41f4b 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -17,6 +17,10 @@ #include "access/commit_ts.h" #include "access/genam.h" #include "access/tableam.h" +#include "catalog/heap.h" +#include "catalog/pg_am.h" +#include "catalog/pg_namespace.h" +#include "catalog/toasting.h" #include "executor/executor.h" #include "pgstat.h" #include "replication/conflict.h" @@ -24,6 +28,59 @@ #include "storage/lmgr.h" #include "utils/lsyscache.h" +/* + * String representations for the supported conflict logging destinations. + */ +const char *const ConflictLogDestNames[] = { + [CONFLICT_LOG_DEST_LOG] = "log", + [CONFLICT_LOG_DEST_TABLE] = "table", + [CONFLICT_LOG_DEST_ALL] = "all" +}; + +StaticAssertDecl(lengthof(ConflictLogDestNames) == CONFLICT_LOG_DEST_ALL + 1, + "ConflictLogDestNames length mismatch"); + + +/* Structure to hold metadata for one column of the conflict log table */ +typedef struct ConflictLogColumnDef +{ + const char *attname; /* Column name */ + Oid atttypid; /* Data type OID */ +} ConflictLogColumnDef; + +/* + * Schema definition for conflict log tables. + * + * Defines the fixed schema of the per-subscription conflict log table created + * in the pg_conflict namespace. Each entry specifies the column name and its + * type OID; the table is created in this column order by + * create_conflict_log_table(). + * + * The tuple/key columns (replica_identity, remote_tuple, local_conflicts) are + * typed json rather than jsonb on purpose: they hold an exact audit snapshot + * of the applied tuples and replica identity, and json preserves the verbatim + * representation whereas jsonb would normalize it. Indexing them (jsonb's main + * advantage) wouldn't help anyway, as the conflict log is looked up by its + * scalar columns (relid, conflict_type, commit timestamp) while these json + * columns are per-conflict payload to inspect, not search keys. + */ +static const ConflictLogColumnDef ConflictLogSchema[] = { + { .attname = "relid", .atttypid = OIDOID }, + { .attname = "schemaname", .atttypid = TEXTOID }, + { .attname = "relname", .atttypid = TEXTOID }, + { .attname = "conflict_type", .atttypid = TEXTOID }, + { .attname = "remote_xid", .atttypid = XIDOID }, + { .attname = "remote_commit_lsn",.atttypid = LSNOID }, + { .attname = "remote_commit_ts", .atttypid = TIMESTAMPTZOID }, + { .attname = "remote_origin", .atttypid = TEXTOID }, + { .attname = "replica_identity_full", .atttypid = BOOLOID }, + { .attname = "replica_identity", .atttypid = JSONOID }, + { .attname = "remote_tuple", .atttypid = JSONOID }, + { .attname = "local_conflicts", .atttypid = JSONARRAYOID } +}; + +#define NUM_CONFLICT_ATTRS lengthof(ConflictLogSchema) + static const char *const ConflictTypeNames[] = { [CT_INSERT_EXISTS] = "insert_exists", [CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs", @@ -54,6 +111,121 @@ static void get_tuple_desc(EState *estate, ResultRelInfo *relinfo, static char *build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot, Oid indexoid); +/* + * Builds the TupleDesc for the conflict log table. + */ +static TupleDesc +create_conflict_log_table_tupdesc(void) +{ + TupleDesc tupdesc; + + tupdesc = CreateTemplateTupleDesc(NUM_CONFLICT_ATTRS); + + for (int i = 0; i < NUM_CONFLICT_ATTRS; i++) + TupleDescInitEntry(tupdesc, i + 1, + ConflictLogSchema[i].attname, + ConflictLogSchema[i].atttypid, + -1, 0); + + TupleDescFinalize(tupdesc); + + return tupdesc; +} + +/* + * Create a structured conflict log table for a subscription. + * + * The table is created within the system-managed 'pg_conflict' namespace to + * prevent users from manually dropping or altering it. This also prevents + * accidental name collisions with user-created tables with the same name. + * + * The table name is generated automatically using the subscription's OID + * (e.g., "pg_conflict_log_") to ensure uniqueness within the + * cluster and to avoid collisions during subscription renames. + */ +Oid +create_conflict_log_table(Oid subid, char *subname, Oid subowner) +{ + TupleDesc tupdesc; + Oid relid; + char relname[NAMEDATALEN]; + + snprintf(relname, NAMEDATALEN, "pg_conflict_log_%u", subid); + + /* Build the tuple descriptor for the new table. */ + tupdesc = create_conflict_log_table_tupdesc(); + + /* Create conflict log table. */ + relid = heap_create_with_catalog(relname, + PG_CONFLICT_NAMESPACE, + 0, /* tablespace */ + InvalidOid, /* relid */ + InvalidOid, /* reltypeid */ + InvalidOid, /* reloftypeid */ + subowner, + HEAP_TABLE_AM_OID, + tupdesc, + NIL, + RELKIND_RELATION, + RELPERSISTENCE_PERMANENT, + false, /* shared_relation */ + false, /* mapped_relation */ + ONCOMMIT_NOOP, + (Datum) 0, /* reloptions */ + false, /* use_user_acl */ + true, /* allow_system_table_mods */ + true, /* is_internal */ + InvalidOid, /* relrewrite */ + NULL); /* typaddress */ + Assert(OidIsValid(relid)); + + /* Release tuple descriptor memory. */ + FreeTupleDesc(tupdesc); + + /* + * We must bump the command counter to make the newly-created relation + * tuple visible for opening. + */ + CommandCounterIncrement(); + + /* + * Create a TOAST table for the conflict log to support out-of-line storage + * of large json data. + */ + NewRelationCreateToastTable(relid, (Datum) 0); + + ereport(NOTICE, + (errmsg("created conflict log table \"%s\" for subscription \"%s\"", + get_qualified_objname(PG_CONFLICT_NAMESPACE, relname), + subname))); + + return relid; +} + +/* + * Convert the string representation of a conflict logging destination to its + * corresponding enum value. + */ +ConflictLogDest +GetConflictLogDest(const char *dest) +{ + /* NULL defaults to LOG. */ + if (dest == NULL || pg_strcasecmp(dest, "log") == 0) + return CONFLICT_LOG_DEST_LOG; + + if (pg_strcasecmp(dest, "table") == 0) + return CONFLICT_LOG_DEST_TABLE; + + if (pg_strcasecmp(dest, "all") == 0) + return CONFLICT_LOG_DEST_ALL; + + /* Unrecognized string. */ + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("unrecognized conflict_log_destination value: \"%s\"", dest), + errhint("Valid values are \"log\", \"table\", and \"all\"."))); +} + /* * Get the xmin and commit timestamp data (origin and timestamp) associated * with the provided local row. diff --git a/src/backend/rewrite/rewriteDefine.c b/src/backend/rewrite/rewriteDefine.c index 6a223fbeaa4..968ed8ccfce 100644 --- a/src/backend/rewrite/rewriteDefine.c +++ b/src/backend/rewrite/rewriteDefine.c @@ -262,6 +262,17 @@ DefineQueryRewrite(const char *rulename, RelationGetRelationName(event_relation)), errdetail_relkind_not_supported(event_relation->rd_rel->relkind))); + /* + * Conflict log tables are used internally for logical replication conflict + * logging and should not have rules, as it could disrupt conflict logging. + */ + if (IsConflictLogTableClass(event_relation->rd_rel)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("conflict log table \"%s\" cannot have rules", + RelationGetRelationName(event_relation)), + errdetail("Conflict log tables are system-managed tables for logical replication conflicts."))); + if (!allowSystemTableMods && IsSystemRelation(event_relation)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), @@ -772,6 +783,17 @@ RangeVarCallbackForRenameRule(const RangeVar *rv, Oid relid, Oid oldrelid, errmsg("relation \"%s\" cannot have rules", rv->relname), errdetail_relkind_not_supported(form->relkind))); + /* + * Conflict log tables are used internally for logical replication conflict + * logging and should not have rules, as it could disrupt conflict logging. + */ + if (IsConflictLogTableClass(form)) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("conflict log table \"%s\" cannot have rules", + rv->relname), + errdetail("Conflict log tables are system-managed tables for logical replication conflicts."))); + if (!allowSystemTableMods && IsSystemClass(relid, form)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c index 036de5f79ef..e52d4ccd089 100644 --- a/src/backend/utils/cache/lsyscache.c +++ b/src/backend/utils/cache/lsyscache.c @@ -3709,7 +3709,7 @@ get_namespace_name_or_temp(Oid nspid) * object for the given namespace ID and object name. */ char * -get_qualified_objname(Oid nspid, char *objname) +get_qualified_objname(Oid nspid, const char *objname) { char *nspname; char *result; diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c index 14cb79c26be..803ca4112d4 100644 --- a/src/bin/initdb/initdb.c +++ b/src/bin/initdb/initdb.c @@ -1839,6 +1839,12 @@ setup_privileges(FILE *cmdfd) " AND relacl IS NULL;\n\n", escape_quotes(username)); PG_CMD_PUTS("GRANT USAGE ON SCHEMA pg_catalog, public TO PUBLIC;\n\n"); + + /* + * Allow non-superuser subscription owners to access their associated + * conflict log tables in the pg_conflict schema. + */ + PG_CMD_PUTS("GRANT USAGE ON SCHEMA pg_conflict TO PUBLIC;\n\n"); PG_CMD_PUTS("REVOKE ALL ON pg_largeobject FROM PUBLIC;\n\n"); PG_CMD_PUTS("INSERT INTO pg_init_privs " " (objoid, classoid, objsubid, initprivs, privtype)" diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index 46b9add0604..f331464f2c1 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -2376,8 +2376,8 @@ match_previous_words(int pattern_id, COMPLETE_WITH("(", "PUBLICATION"); /* ALTER SUBSCRIPTION SET ( */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SET", "(")) - COMPLETE_WITH("binary", "disable_on_error", "failover", - "max_retention_duration", "origin", + COMPLETE_WITH("binary", "conflict_log_destination", "disable_on_error", + "failover", "max_retention_duration", "origin", "password_required", "retain_dead_tuples", "run_as_owner", "slot_name", "streaming", "synchronous_commit", "two_phase", @@ -3961,8 +3961,8 @@ match_previous_words(int pattern_id, COMPLETE_WITH("WITH ("); /* Complete "CREATE SUBSCRIPTION ... WITH ( " */ else if (Matches("CREATE", "SUBSCRIPTION", MatchAnyN, "WITH", "(")) - COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", - "disable_on_error", "enabled", "failover", + COMPLETE_WITH("binary", "conflict_log_destination", "connect", "copy_data", + "create_slot", "disable_on_error", "enabled", "failover", "max_retention_duration", "origin", "password_required", "retain_dead_tuples", "run_as_owner", "slot_name", "streaming", diff --git a/src/include/catalog/catalog.h b/src/include/catalog/catalog.h index a9d6e8ea986..feab3982cf5 100644 --- a/src/include/catalog/catalog.h +++ b/src/include/catalog/catalog.h @@ -25,6 +25,7 @@ extern bool IsInplaceUpdateRelation(Relation relation); extern bool IsSystemClass(Oid relid, Form_pg_class reltuple); extern bool IsToastClass(Form_pg_class reltuple); +extern bool IsConflictLogTableClass(Form_pg_class reltuple); extern bool IsCatalogRelationOid(Oid relid); extern bool IsCatalogTextUniqueIndexOid(Oid relid); @@ -32,6 +33,7 @@ extern bool IsInplaceUpdateOid(Oid relid); extern bool IsCatalogNamespace(Oid namespaceId); extern bool IsToastNamespace(Oid namespaceId); +extern bool IsConflictLogTableNamespace(Oid namespaceId); extern bool IsReservedName(const char *name); diff --git a/src/include/catalog/pg_namespace.dat b/src/include/catalog/pg_namespace.dat index 3075e142c73..de8316e40df 100644 --- a/src/include/catalog/pg_namespace.dat +++ b/src/include/catalog/pg_namespace.dat @@ -18,6 +18,9 @@ { oid => '99', oid_symbol => 'PG_TOAST_NAMESPACE', descr => 'reserved schema for TOAST tables', nspname => 'pg_toast', nspacl => '_null_' }, +{ oid => '8988', oid_symbol => 'PG_CONFLICT_NAMESPACE', + descr => 'reserved schema for subscription-specific conflict log tables', + nspname => 'pg_conflict', nspacl => '_null_' }, # update dumpNamespace() if changing this descr { oid => '2200', oid_symbol => 'PG_PUBLIC_NAMESPACE', descr => 'standard public schema', diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 48944201889..4338e6620b4 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -95,7 +95,16 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW Oid subserver BKI_LOOKUP_OPT(pg_foreign_server); /* If connection uses * server */ + Oid subconflictlogrelid; /* Relid of the conflict log table. */ #ifdef CATALOG_VARLEN /* variable-length fields start here */ + /* + * Strategy for logging replication conflicts: + * 'log' - server log only, + * 'table' - conflict log table only, + * 'all' - both log and table. + */ + text subconflictlogdest BKI_FORCE_NOT_NULL; + /* Connection string to the publisher */ text subconninfo; /* Set if connecting with connection string */ @@ -164,6 +173,7 @@ typedef struct Subscription * and the retention duration has not * exceeded max_retention_duration, when * defined */ + Oid conflictlogrelid; /* conflict log table Oid */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ @@ -171,6 +181,7 @@ typedef struct Subscription List *publications; /* List of publication names to subscribe to */ char *origin; /* Only publish data originating from the * specified origin */ + char *conflictlogdest; /* Conflict log destination */ } Subscription; #ifdef EXPOSE_TO_CLIENT_CODE diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index 2d9dbcf4d0d..a017e1e6cb5 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -10,6 +10,7 @@ #define CONFLICT_H #include "access/xlogdefs.h" +#include "catalog/pg_type.h" #include "datatype/timestamp.h" #include "nodes/pg_list.h" @@ -79,6 +80,32 @@ typedef struct ConflictTupleInfo * conflicting local row occurred */ } ConflictTupleInfo; +/* + * Defines where logical replication conflict details are recorded. + * + * While stored as a text-based array/string in + * pg_subscription.subconflictlogdest for user readability and extensibility, + * we map these to an internal enum to allow for efficient checks. + */ +typedef enum ConflictLogDest +{ + CONFLICT_LOG_DEST_LOG = 0, /* Emit to server logs */ + CONFLICT_LOG_DEST_TABLE, /* Insert into the conflict log table */ + CONFLICT_LOG_DEST_ALL /* Both log and table */ +} ConflictLogDest; + +#define CONFLICTS_LOGGED_TO_TABLE(dest) \ + ((dest == CONFLICT_LOG_DEST_TABLE) || (dest == CONFLICT_LOG_DEST_ALL)) +#define CONFLICTS_LOGGED_TO_LOG(dest) \ + ((dest == CONFLICT_LOG_DEST_LOG) || (dest == CONFLICT_LOG_DEST_ALL)) + +/* + * Array mapping for converting internal enum to string. + */ +extern PGDLLIMPORT const char *const ConflictLogDestNames[]; + +extern Oid create_conflict_log_table(Oid subid, char *subname, Oid subowner); +extern ConflictLogDest GetConflictLogDest(const char *dest); extern bool GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin, ReplOriginId *localorigin, diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h index 865980cb0f1..53c2a9e8673 100644 --- a/src/include/utils/lsyscache.h +++ b/src/include/utils/lsyscache.h @@ -202,7 +202,7 @@ extern bool get_attstatsslot(AttStatsSlot *sslot, HeapTuple statstuple, extern void free_attstatsslot(AttStatsSlot *sslot); extern char *get_namespace_name(Oid nspid); extern char *get_namespace_name_or_temp(Oid nspid); -extern char *get_qualified_objname(Oid nspid, char *objname); +extern char *get_qualified_objname(Oid nspid, const char *objname); extern Oid get_range_subtype(Oid rangeOid); extern Oid get_range_collation(Oid rangeOid); extern Oid get_range_constructor2(Oid rangeOid); diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 8dbfac66326..d201ad764f0 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -619,6 +619,301 @@ COMMIT; -- ok, owning it is enough for this stuff ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- +-- CONFLICT LOG DESTINATION TESTS +-- +SET SESSION AUTHORIZATION 'regress_subscription_user'; +SET client_min_messages = WARNING; +-- fail - unrecognized parameter value +CREATE SUBSCRIPTION regress_conflict_fail CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'invalid'); +ERROR: unrecognized conflict_log_destination value: "invalid" +HINT: Valid values are "log", "table", and "all". +-- verify subconflictlogdest is 'log' and subconflictlogrelid is 0 (InvalidOid) for default case +CREATE SUBSCRIPTION regress_conflict_log_default CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. +SELECT subname, subconflictlogdest, subconflictlogrelid +FROM pg_subscription WHERE subname = 'regress_conflict_log_default'; + subname | subconflictlogdest | subconflictlogrelid +------------------------------+--------------------+--------------------- + regress_conflict_log_default | log | 0 +(1 row) + +-- fail - empty string parameter value +CREATE SUBSCRIPTION regress_conflict_empty_str CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = ''); +ERROR: unrecognized conflict_log_destination value: "" +HINT: Valid values are "log", "table", and "all". +-- this should generate a conflict log table named pg_conflict_log_$subid$ +CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'table'); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. +-- check metadata in pg_subscription: destination should be 'table' and subconflictlogrelid valid +SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid +FROM pg_subscription WHERE subname = 'regress_conflict_test1'; + subname | subconflictlogdest | has_relid +------------------------+--------------------+----------- + regress_conflict_test1 | table | t +(1 row) + +-- verify the physical table exists, its OID matches subconflictlogrelid, +-- and it is located in the 'pg_conflict' namespace +SELECT n.nspname, (c.oid = s.subconflictlogrelid) AS "oid_matches" +FROM pg_class c +JOIN pg_subscription s ON c.relname = 'pg_conflict_log_' || s.oid +JOIN pg_namespace n ON c.relnamespace = n.oid +WHERE s.subname = 'regress_conflict_test1'; + nspname | oid_matches +-------------+------------- + pg_conflict | t +(1 row) + +-- check if the conflict log table has the correct schema +SELECT a.attnum, a.attname +FROM pg_attribute a +JOIN pg_class c ON a.attrelid = c.oid +JOIN pg_subscription s ON c.relname = 'pg_conflict_log_' || s.oid +WHERE s.subname = 'regress_conflict_test1' AND a.attnum > 0 + ORDER BY a.attnum; + attnum | attname +--------+----------------------- + 1 | relid + 2 | schemaname + 3 | relname + 4 | conflict_type + 5 | remote_xid + 6 | remote_commit_lsn + 7 | remote_commit_ts + 8 | remote_origin + 9 | replica_identity_full + 10 | replica_identity + 11 | remote_tuple + 12 | local_conflicts +(12 rows) + +-- Changing the subscription owner should also update the owner +-- of the associated conflict log table. +ALTER SUBSCRIPTION regress_conflict_test1 OWNER TO regress_subscription_user2; +SELECT pg_catalog.pg_get_userbyid(c.relowner) AS owner +FROM pg_catalog.pg_class c +JOIN pg_catalog.pg_subscription s + ON c.relname = 'pg_conflict_log_' || s.oid +WHERE s.subname = 'regress_conflict_test1'; + owner +---------------------------- + regress_subscription_user2 +(1 row) + +-- Verify that a non-superuser subscription owner can truncate, +-- delete from, and select from the associated conflict log table. +SET ROLE 'regress_subscription_user2'; +SELECT format('%I.%I', n.nspname, c.relname) AS conflict_log_table +FROM pg_catalog.pg_class c +JOIN pg_catalog.pg_namespace n + ON n.oid = c.relnamespace +JOIN pg_catalog.pg_subscription s + ON c.relname = 'pg_conflict_log_' || s.oid +WHERE s.subname = 'regress_conflict_test1' +\gset +TRUNCATE TABLE :conflict_log_table; +DELETE FROM :conflict_log_table; +SELECT COUNT(*) FROM :conflict_log_table; + count +------- + 0 +(1 row) + +RESET ROLE; +-- Restore the original subscription owner. +ALTER SUBSCRIPTION regress_conflict_test1 OWNER TO regress_subscription_user; +-- +-- ALTER SUBSCRIPTION - conflict_log_destination state transitions +-- +-- These tests verify the transition logic between different logging +-- destinations, ensuring conflict log tables are created or dropped as +-- expected +-- +-- transition from 'log' to 'all' +-- a new conflict log table should be created +CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'log'); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'all'); +-- verify metadata after ALTER (destination should be 'all') +SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + subname | subconflictlogdest | has_relid +------------------------+--------------------+----------- + regress_conflict_test2 | all | t +(1 row) + +-- transition from 'all' to 'table' +-- should NOT drop the table, only change destination string +SELECT subconflictlogrelid AS old_relid FROM pg_subscription WHERE subname = 'regress_conflict_test2' \gset +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'table'); +SELECT subconflictlogdest, subconflictlogrelid = :old_relid AS relid_unchanged +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + subconflictlogdest | relid_unchanged +--------------------+----------------- + table | t +(1 row) + +-- transition from 'table' to 'log' +-- should drop the table and clear subconflictlogrelid +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'log'); +SELECT subconflictlogdest, subconflictlogrelid +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + subconflictlogdest | subconflictlogrelid +--------------------+--------------------- + log | 0 +(1 row) + +-- verify the physical table is gone +SELECT count(*) +FROM pg_class c +JOIN pg_subscription s ON c.relname = 'pg_conflict_log_' || s.oid +WHERE s.subname = 'regress_conflict_test2'; + count +------- + 0 +(1 row) + +-- +-- PUBLICATION: Verify conflict log tables are not publishable +-- +-- pg_relation_is_publishable should return false for conflict log tables to +-- prevent them from being accidentally included in publications +-- +SELECT n.nspname, pg_relation_is_publishable(c.oid) +FROM pg_class c +JOIN pg_namespace n ON c.relnamespace = n.oid +JOIN pg_subscription s ON s.subconflictlogrelid = c.oid +WHERE s.subname = 'regress_conflict_test1'; + nspname | pg_relation_is_publishable +-------------+---------------------------- + pg_conflict | f +(1 row) + +-- +-- Table Protection and Lifecycle Management +-- +-- These tests verify that: +-- Manual DROP TABLE is disallowed +-- DROP SUBSCRIPTION automatically reaps the table +-- +-- re-enable table logging for verification +ALTER SUBSCRIPTION regress_conflict_test1 SET (conflict_log_destination = 'table'); +-- The conflict log table name contains the subscription OID, which is +-- non-deterministic. Capture it into a psql variable and report only the +-- SQLSTATE, so the expected output does not depend on the OID. +-- fail - drop table not allowed due to internal dependency +SET client_min_messages = NOTICE; +SELECT 'pg_conflict.pg_conflict_log_' || oid AS clt1 + FROM pg_subscription WHERE subname = 'regress_conflict_test1' \gset +\set VERBOSITY sqlstate +DROP TABLE :clt1; +ERROR: 42501 +\set VERBOSITY default +-- CLEANUP: DROP SUBSCRIPTION reaps the table +ALTER SUBSCRIPTION regress_conflict_test1 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE); +-- Verify the table OID for reap check +SELECT 'pg_conflict.pg_conflict_log_' || oid AS internal_tablename FROM pg_subscription WHERE subname = 'regress_conflict_test1' \gset +SET client_min_messages = WARNING; +DROP SUBSCRIPTION regress_conflict_test1; +-- should return NULL, meaning the conflict log table was reaped via dependency +SELECT to_regclass(:'internal_tablename'); + to_regclass +------------- + +(1 row) + +-- +-- Additional Namespace and Table Protection Tests +-- +-- Setup: Ensure we have a subscription with a conflict log table +CREATE SUBSCRIPTION regress_conflict_protection_test CONNECTION 'dbname=regress_doesnotexist' + PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'table'); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. +-- The conflict log table is system-managed; its name contains the +-- subscription OID, which is non-deterministic. Capture the name into a psql +-- variable and report only the SQLSTATE for the operations that must be +-- rejected, so the expected output stays free of the dynamic OID. Every +-- statement in the VERBOSITY-sqlstate block below must fail with 42809 +-- (wrong_object_type), except adding the table to a publication, which fails +-- with 22023 (invalid_parameter_value). +SELECT 'pg_conflict.' || relname AS clt + FROM pg_class c JOIN pg_subscription s ON c.relname = 'pg_conflict_log_' || s.oid + WHERE s.subname = 'regress_conflict_protection_test' \gset +-- Trigger function used by the CREATE TRIGGER check below. +CREATE FUNCTION public.dummy_trigger_func() RETURNS trigger AS $$ +BEGIN + RETURN NEW; +END; +$$ LANGUAGE plpgsql; +\set VERBOSITY sqlstate +ALTER TABLE :clt ADD COLUMN extra_info text; +ERROR: 42809 +INSERT INTO :clt (relname) VALUES ('mytest'); +ERROR: 42809 +UPDATE :clt SET relname = 'mytest'; +ERROR: 42809 +CREATE POLICY p1 ON :clt USING (true); +ERROR: 42809 +CREATE STATISTICS s1 ON relname, schemaname FROM :clt; +ERROR: 42809 +CREATE TABLE public.conflict_child () INHERITS (:clt); +ERROR: 42809 +ALTER TABLE :clt RENAME COLUMN relname TO new_relname; +ERROR: 42809 +CREATE TABLE public.conflict_fk (relname text REFERENCES :clt(relname)); +ERROR: 42809 +ALTER TABLE :clt OWNER TO regress_subscription_user_dummy; +ERROR: 42809 +ALTER TABLE :clt SET SCHEMA public; +ERROR: 42809 +CREATE TRIGGER t1 BEFORE INSERT ON :clt FOR EACH ROW EXECUTE FUNCTION public.dummy_trigger_func(); +ERROR: 42809 +ALTER TRIGGER non_existent_trigger ON :clt RENAME TO new_trigger; +ERROR: 42809 +CREATE RULE r1 AS ON INSERT TO :clt DO INSTEAD NOTHING; +ERROR: 42809 +ALTER RULE non_existent_rule ON :clt RENAME TO new_rule; +ERROR: 42809 +CREATE INDEX idx1 ON :clt (relname); +ERROR: 42809 +SELECT 1 FROM :clt FOR UPDATE; +ERROR: 42809 +CREATE PUBLICATION testpub_for_clt FOR TABLE :clt; +ERROR: 22023 +\set VERBOSITY default +-- Clean up the trigger function used above. +DROP FUNCTION public.dummy_trigger_func(); +-- TRUNCATE and DELETE are allowed so that users can prune the conflict log. +TRUNCATE :clt; +DELETE FROM :clt; +-- Creating a table directly in the pg_conflict namespace is rejected for +-- everyone (the schema is reserved for conflict log tables). +CREATE TABLE pg_conflict.manual_table (id int); +ERROR: permission denied for schema pg_conflict +LINE 1: CREATE TABLE pg_conflict.manual_table (id int); + ^ +-- Moving a user table into the pg_conflict namespace is likewise rejected. +CREATE TABLE public.test_move (id int); +ALTER TABLE public.test_move SET SCHEMA pg_conflict; +ERROR: permission denied for schema pg_conflict +DROP TABLE public.test_move; +SET client_min_messages = WARNING; +-- Clean up remaining test subscription +ALTER SUBSCRIPTION regress_conflict_log_default DISABLE; +ALTER SUBSCRIPTION regress_conflict_log_default SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_log_default; +ALTER SUBSCRIPTION regress_conflict_test2 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test2 SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_test2; +ALTER SUBSCRIPTION regress_conflict_protection_test DISABLE; +ALTER SUBSCRIPTION regress_conflict_protection_test SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_protection_test; RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 05533d66675..86c402c59aa 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -474,6 +474,240 @@ COMMIT; ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- +-- CONFLICT LOG DESTINATION TESTS +-- + +SET SESSION AUTHORIZATION 'regress_subscription_user'; + +SET client_min_messages = WARNING; + +-- fail - unrecognized parameter value +CREATE SUBSCRIPTION regress_conflict_fail CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'invalid'); + +-- verify subconflictlogdest is 'log' and subconflictlogrelid is 0 (InvalidOid) for default case +CREATE SUBSCRIPTION regress_conflict_log_default CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false); +SELECT subname, subconflictlogdest, subconflictlogrelid +FROM pg_subscription WHERE subname = 'regress_conflict_log_default'; + +-- fail - empty string parameter value +CREATE SUBSCRIPTION regress_conflict_empty_str CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = ''); + +-- this should generate a conflict log table named pg_conflict_log_$subid$ +CREATE SUBSCRIPTION regress_conflict_test1 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'table'); + +-- check metadata in pg_subscription: destination should be 'table' and subconflictlogrelid valid +SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid +FROM pg_subscription WHERE subname = 'regress_conflict_test1'; + +-- verify the physical table exists, its OID matches subconflictlogrelid, +-- and it is located in the 'pg_conflict' namespace +SELECT n.nspname, (c.oid = s.subconflictlogrelid) AS "oid_matches" +FROM pg_class c +JOIN pg_subscription s ON c.relname = 'pg_conflict_log_' || s.oid +JOIN pg_namespace n ON c.relnamespace = n.oid +WHERE s.subname = 'regress_conflict_test1'; + +-- check if the conflict log table has the correct schema +SELECT a.attnum, a.attname +FROM pg_attribute a +JOIN pg_class c ON a.attrelid = c.oid +JOIN pg_subscription s ON c.relname = 'pg_conflict_log_' || s.oid +WHERE s.subname = 'regress_conflict_test1' AND a.attnum > 0 + ORDER BY a.attnum; + +-- Changing the subscription owner should also update the owner +-- of the associated conflict log table. +ALTER SUBSCRIPTION regress_conflict_test1 OWNER TO regress_subscription_user2; +SELECT pg_catalog.pg_get_userbyid(c.relowner) AS owner +FROM pg_catalog.pg_class c +JOIN pg_catalog.pg_subscription s + ON c.relname = 'pg_conflict_log_' || s.oid +WHERE s.subname = 'regress_conflict_test1'; + +-- Verify that a non-superuser subscription owner can truncate, +-- delete from, and select from the associated conflict log table. +SET ROLE 'regress_subscription_user2'; + +SELECT format('%I.%I', n.nspname, c.relname) AS conflict_log_table +FROM pg_catalog.pg_class c +JOIN pg_catalog.pg_namespace n + ON n.oid = c.relnamespace +JOIN pg_catalog.pg_subscription s + ON c.relname = 'pg_conflict_log_' || s.oid +WHERE s.subname = 'regress_conflict_test1' +\gset + +TRUNCATE TABLE :conflict_log_table; +DELETE FROM :conflict_log_table; +SELECT COUNT(*) FROM :conflict_log_table; + +RESET ROLE; + +-- Restore the original subscription owner. +ALTER SUBSCRIPTION regress_conflict_test1 OWNER TO regress_subscription_user; + +-- +-- ALTER SUBSCRIPTION - conflict_log_destination state transitions +-- +-- These tests verify the transition logic between different logging +-- destinations, ensuring conflict log tables are created or dropped as +-- expected +-- +-- transition from 'log' to 'all' +-- a new conflict log table should be created +CREATE SUBSCRIPTION regress_conflict_test2 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'log'); +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'all'); + +-- verify metadata after ALTER (destination should be 'all') +SELECT subname, subconflictlogdest, subconflictlogrelid > 0 AS has_relid +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + +-- transition from 'all' to 'table' +-- should NOT drop the table, only change destination string +SELECT subconflictlogrelid AS old_relid FROM pg_subscription WHERE subname = 'regress_conflict_test2' \gset +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'table'); +SELECT subconflictlogdest, subconflictlogrelid = :old_relid AS relid_unchanged +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + +-- transition from 'table' to 'log' +-- should drop the table and clear subconflictlogrelid +ALTER SUBSCRIPTION regress_conflict_test2 SET (conflict_log_destination = 'log'); +SELECT subconflictlogdest, subconflictlogrelid +FROM pg_subscription WHERE subname = 'regress_conflict_test2'; + +-- verify the physical table is gone +SELECT count(*) +FROM pg_class c +JOIN pg_subscription s ON c.relname = 'pg_conflict_log_' || s.oid +WHERE s.subname = 'regress_conflict_test2'; + +-- +-- PUBLICATION: Verify conflict log tables are not publishable +-- +-- pg_relation_is_publishable should return false for conflict log tables to +-- prevent them from being accidentally included in publications +-- +SELECT n.nspname, pg_relation_is_publishable(c.oid) +FROM pg_class c +JOIN pg_namespace n ON c.relnamespace = n.oid +JOIN pg_subscription s ON s.subconflictlogrelid = c.oid +WHERE s.subname = 'regress_conflict_test1'; + +-- +-- Table Protection and Lifecycle Management +-- +-- These tests verify that: +-- Manual DROP TABLE is disallowed +-- DROP SUBSCRIPTION automatically reaps the table +-- +-- re-enable table logging for verification +ALTER SUBSCRIPTION regress_conflict_test1 SET (conflict_log_destination = 'table'); + +-- The conflict log table name contains the subscription OID, which is +-- non-deterministic. Capture it into a psql variable and report only the +-- SQLSTATE, so the expected output does not depend on the OID. + +-- fail - drop table not allowed due to internal dependency +SET client_min_messages = NOTICE; +SELECT 'pg_conflict.pg_conflict_log_' || oid AS clt1 + FROM pg_subscription WHERE subname = 'regress_conflict_test1' \gset +\set VERBOSITY sqlstate +DROP TABLE :clt1; +\set VERBOSITY default + +-- CLEANUP: DROP SUBSCRIPTION reaps the table +ALTER SUBSCRIPTION regress_conflict_test1 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test1 SET (slot_name = NONE); + +-- Verify the table OID for reap check +SELECT 'pg_conflict.pg_conflict_log_' || oid AS internal_tablename FROM pg_subscription WHERE subname = 'regress_conflict_test1' \gset + +SET client_min_messages = WARNING; +DROP SUBSCRIPTION regress_conflict_test1; + +-- should return NULL, meaning the conflict log table was reaped via dependency +SELECT to_regclass(:'internal_tablename'); + +-- +-- Additional Namespace and Table Protection Tests +-- + +-- Setup: Ensure we have a subscription with a conflict log table +CREATE SUBSCRIPTION regress_conflict_protection_test CONNECTION 'dbname=regress_doesnotexist' + PUBLICATION testpub WITH (connect = false, conflict_log_destination = 'table'); + +-- The conflict log table is system-managed; its name contains the +-- subscription OID, which is non-deterministic. Capture the name into a psql +-- variable and report only the SQLSTATE for the operations that must be +-- rejected, so the expected output stays free of the dynamic OID. Every +-- statement in the VERBOSITY-sqlstate block below must fail with 42809 +-- (wrong_object_type), except adding the table to a publication, which fails +-- with 22023 (invalid_parameter_value). +SELECT 'pg_conflict.' || relname AS clt + FROM pg_class c JOIN pg_subscription s ON c.relname = 'pg_conflict_log_' || s.oid + WHERE s.subname = 'regress_conflict_protection_test' \gset + +-- Trigger function used by the CREATE TRIGGER check below. +CREATE FUNCTION public.dummy_trigger_func() RETURNS trigger AS $$ +BEGIN + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +\set VERBOSITY sqlstate +ALTER TABLE :clt ADD COLUMN extra_info text; +INSERT INTO :clt (relname) VALUES ('mytest'); +UPDATE :clt SET relname = 'mytest'; +CREATE POLICY p1 ON :clt USING (true); +CREATE STATISTICS s1 ON relname, schemaname FROM :clt; +CREATE TABLE public.conflict_child () INHERITS (:clt); +ALTER TABLE :clt RENAME COLUMN relname TO new_relname; +CREATE TABLE public.conflict_fk (relname text REFERENCES :clt(relname)); +ALTER TABLE :clt OWNER TO regress_subscription_user_dummy; +ALTER TABLE :clt SET SCHEMA public; +CREATE TRIGGER t1 BEFORE INSERT ON :clt FOR EACH ROW EXECUTE FUNCTION public.dummy_trigger_func(); +ALTER TRIGGER non_existent_trigger ON :clt RENAME TO new_trigger; +CREATE RULE r1 AS ON INSERT TO :clt DO INSTEAD NOTHING; +ALTER RULE non_existent_rule ON :clt RENAME TO new_rule; +CREATE INDEX idx1 ON :clt (relname); +SELECT 1 FROM :clt FOR UPDATE; +CREATE PUBLICATION testpub_for_clt FOR TABLE :clt; +\set VERBOSITY default + +-- Clean up the trigger function used above. +DROP FUNCTION public.dummy_trigger_func(); + +-- TRUNCATE and DELETE are allowed so that users can prune the conflict log. +TRUNCATE :clt; +DELETE FROM :clt; + +-- Creating a table directly in the pg_conflict namespace is rejected for +-- everyone (the schema is reserved for conflict log tables). +CREATE TABLE pg_conflict.manual_table (id int); + +-- Moving a user table into the pg_conflict namespace is likewise rejected. +CREATE TABLE public.test_move (id int); +ALTER TABLE public.test_move SET SCHEMA pg_conflict; +DROP TABLE public.test_move; + + +SET client_min_messages = WARNING; + +-- Clean up remaining test subscription +ALTER SUBSCRIPTION regress_conflict_log_default DISABLE; +ALTER SUBSCRIPTION regress_conflict_log_default SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_log_default; + + +ALTER SUBSCRIPTION regress_conflict_test2 DISABLE; +ALTER SUBSCRIPTION regress_conflict_test2 SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_test2; + +ALTER SUBSCRIPTION regress_conflict_protection_test DISABLE; +ALTER SUBSCRIPTION regress_conflict_protection_test SET (slot_name = NONE); +DROP SUBSCRIPTION regress_conflict_protection_test; + RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl index f23fe6af2a5..3910a49c0aa 100644 --- a/src/test/subscription/t/035_conflicts.pl +++ b/src/test/subscription/t/035_conflicts.pl @@ -50,7 +50,7 @@ $node_subscriber->safe_psql( 'postgres', "CREATE SUBSCRIPTION sub_tab CONNECTION '$publisher_connstr application_name=$appname' - PUBLICATION pub_tab;"); + PUBLICATION pub_tab WITH (conflict_log_destination=all)"); # Wait for initial table sync to finish $node_subscriber->wait_for_subscription_sync($node_publisher, $appname); @@ -670,4 +670,69 @@ ok( $node_A->poll_query_until( ), "the slot 'pg_conflict_detection' has been dropped on Node A"); +############################################################################### +# A conflict log table is system-managed and cannot be altered directly, so +# moving it to another tablespace must be rejected. +############################################################################### +my $subid = $node_subscriber->safe_psql('postgres', + "SELECT oid FROM pg_subscription WHERE subname = 'sub_tab';"); +my $clt = "pg_conflict.pg_conflict_log_$subid"; + +(undef, undef, $stderr) = $node_subscriber->psql('postgres', + "ALTER TABLE $clt SET TABLESPACE pg_default"); +like( + $stderr, + qr/cannot alter conflict log table "pg_conflict_log_\d+"/, + "moving a conflict log table with ALTER TABLE SET TABLESPACE is rejected"); + +############################################################################### +# ALTER TABLE ALL IN TABLESPACE must skip conflict log tables, the same way it +# skips catalog and TOAST tables, instead of failing. Use an isolated database +# so the bulk move only touches the objects created here. +############################################################################### +$node_subscriber->safe_psql('postgres', "CREATE DATABASE clt_ts_test"); +$node_subscriber->safe_psql('clt_ts_test', + "CREATE SUBSCRIPTION sub_ts_test + CONNECTION 'dbname=nonexistent' + PUBLICATION pub + WITH (connect=false, conflict_log_destination='table')"); + +# A plain user table that should be moved, alongside the CLT that must not be. +$node_subscriber->safe_psql('clt_ts_test', "CREATE TABLE user_tbl (i int)"); + +# Create a tablespace backed by a directory inside the data dir. +my $ts_dir = $node_subscriber->data_dir . '/backup_space'; +mkdir($ts_dir) + or die "could not create tablespace directory $ts_dir: $!"; +$node_subscriber->safe_psql('postgres', + "CREATE TABLESPACE backup_space LOCATION '$ts_dir'"); + +# The bulk move succeeds: the user table is relocated while the CLT is skipped. +$node_subscriber->safe_psql('clt_ts_test', + "ALTER TABLE ALL IN TABLESPACE pg_default SET TABLESPACE backup_space"); + +is( $node_subscriber->safe_psql( + 'clt_ts_test', + "SELECT reltablespace <> 0 FROM pg_class WHERE relname = 'user_tbl'"), + 't', + "ALTER TABLE ALL IN TABLESPACE moves an ordinary user table"); + +is( $node_subscriber->safe_psql( + 'clt_ts_test', + "SELECT count(*) FROM pg_class c JOIN pg_subscription s + ON c.relname = 'pg_conflict_log_' || s.oid + WHERE s.subname = 'sub_ts_test' AND c.reltablespace <> 0"), + '0', + "ALTER TABLE ALL IN TABLESPACE skips the conflict log table"); + +# Cleanup. The subscription has no real publisher connection, so detach its +# slot before dropping it. +$node_subscriber->safe_psql('clt_ts_test', + "ALTER SUBSCRIPTION sub_ts_test DISABLE"); +$node_subscriber->safe_psql('clt_ts_test', + "ALTER SUBSCRIPTION sub_ts_test SET (slot_name = NONE)"); +$node_subscriber->safe_psql('clt_ts_test', "DROP SUBSCRIPTION sub_ts_test"); +$node_subscriber->safe_psql('postgres', "DROP DATABASE clt_ts_test"); +$node_subscriber->safe_psql('postgres', "DROP TABLESPACE backup_space"); + done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 3a2720fb5f9..3d6d1c16132 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -515,6 +515,8 @@ ConditionalStack ConditionalStackData ConfigData ConfigVariable +ConflictLogColumnDef +ConflictLogDest ConflictTupleInfo ConflictType ConnCacheEntry -- 2.49.0