From 90be0e5b2943aa4074a2274db45e6c7230733f0a Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Sat, 13 Jun 2026 19:23:28 +0800 Subject: [PATCH v20 9/9] Support dependency tracking via local foreign keys This patch tracks foreign key dependencies to prevent constraint violations. Consider a referencing table FK_TABLE (a INT REFERENCES PK_TABLE) and a referenced table PK_TABLE (a INT PRIMARY KEY): TX-1: INSERT row (1) INTO PK_TABLE TX-2: INSERT row (1) INTO FK_TABLE TX-2 must wait for TX-1; otherwise, a foreign key violation occurs. Similarly, for deletions: TX-1: DELETE row (1) FROM FK_TABLE TX-2: DELETE row (1) FROM PK_TABLE TX-2 must wait for TX-1 to avoid violating the foreign key constraint. Therefore, we record new tuples on the referenced table (PK_TABLE) and check for dependencies when processing new tuples on the referencing table (FK_TABLE). Conversely, we record old tuples deleted from the referencing table and require deletions on the referenced table to depend on them. Similar to unique key dependencies, for delete-delete cases we only track foreign key dependencies on columns that are part of the replica identity keys on the publisher. This may lead to false positives but is acceptable given the trade-off. --- .../replication/logical/applyparallelworker.c | 26 + src/backend/replication/logical/relation.c | 496 ++++++++++++++++++ src/backend/replication/logical/worker.c | 300 ++++++++++- src/include/replication/logicalrelation.h | 22 + src/test/subscription/t/050_parallel_apply.pl | 164 ++++++ 5 files changed, 998 insertions(+), 10 deletions(-) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 8826faf102c..32144bb57fa 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -218,6 +218,32 @@ * replica identity keys. This could be useful for users who prefer higher * parallelism and experience few conflicts. * + * Additionally, foreign key dependencies are also tracked to prevent constraint + * violations. Consider a referencing table FK_TABLE (a INT REFERENCES PK_TABLE) + * and a referenced table PK_TABLE (a INT PRIMARY KEY): + * + * TX-1: INSERT row (1) INTO PK_TABLE + * TX-2: INSERT row (1) INTO FK_TABLE + * + * TX-2 must wait for TX-1; otherwise, a foreign key violation occurs. + * + * Similarly, for deletions: + * + * TX-1: DELETE row (1) FROM FK_TABLE + * TX-2: DELETE row (1) FROM PK_TABLE + * + * TX-2 must wait for TX-1 to avoid violating the foreign key constraint. + * + * Therefore, we record new tuples on the referenced table (PK_TABLE) and check + * for dependencies when processing new tuples on the referencing table + * (FK_TABLE). Conversely, we record old tuples deleted from the referencing + * table and require deletions on the referenced table to depend on them. + * + * Similar to unique key dependencies, for delete-delete cases we only track + * foreign key dependencies on columns that are part of the replica identity + * keys on the publisher. This may lead to false positives but is acceptable + * given the trade-off. + * * Note that the local unique key could change after dependency checking and * before applying the change. However, to centralize tracking and keep it * simple, we still perform this check only in the leader apply worker. This is diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 8eef87ca910..f598ffe7117 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -20,7 +20,9 @@ #include "access/amapi.h" #include "access/genam.h" #include "access/table.h" +#include "catalog/heap.h" #include "catalog/namespace.h" +#include "catalog/pg_constraint.h" #include "catalog/pg_proc.h" #include "catalog/pg_subscription_rel.h" #include "commands/trigger.h" @@ -28,6 +30,7 @@ #include "nodes/makefuncs.h" #include "replication/logicalrelation.h" #include "replication/worker_internal.h" +#include "utils/fmgroids.h" #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/syscache.h" @@ -85,6 +88,7 @@ logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid) { entry->localrelvalid = false; entry->local_unique_indexes_collected = false; + entry->local_fkeys_collected = false; hash_seq_term(&status); break; } @@ -101,6 +105,7 @@ logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid) { entry->localrelvalid = false; entry->local_unique_indexes_collected = false; + entry->local_fkeys_collected = false; } } } @@ -147,6 +152,33 @@ free_local_unique_indexes(LogicalRepRelMapEntry *entry) entry->local_unique_indexes = NIL; } +/* + * Release local foreign key lists. + */ +static void +free_local_fkeys(LogicalRepRelMapEntry *entry) +{ + Assert(am_leader_apply_worker()); + + foreach_ptr(LogicalRepSubscriberFK, fkinfo, entry->local_fkeys) + { + list_free(fkinfo->fkattnums); + list_free(fkinfo->fkattnums_old); + } + + foreach_ptr(LogicalRepSubscriberRefFK, refinfo, entry->local_referenced_fkeys) + { + list_free(refinfo->refattnums); + list_free(refinfo->refattnums_old); + } + + list_free_deep(entry->local_fkeys); + list_free_deep(entry->local_referenced_fkeys); + + entry->local_fkeys = NIL; + entry->local_referenced_fkeys = NIL; +} + /* * Free the entry of a relation map cache. */ @@ -177,6 +209,9 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry) if (entry->local_unique_indexes != NIL) free_local_unique_indexes(entry); + + if (entry->local_fkeys != NIL || entry->local_referenced_fkeys != NIL) + free_local_fkeys(entry); } /* @@ -240,6 +275,7 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel) entry->parallel_safe = LOGICALREP_PARALLEL_UNKNOWN; entry->local_unique_indexes_collected = false; + entry->local_fkeys_collected = false; MemoryContextSwitchTo(oldctx); } @@ -532,6 +568,461 @@ collect_indexes_for_dependency_tracking(LogicalRepRelMapEntry *entry) entry->local_unique_indexes_collected = true; } +/* + * Search a relmap entry by local relation OID. + */ +static LogicalRepRelMapEntry * +logicalrep_get_relentry_by_local_oid(Oid localreloid) +{ + HASH_SEQ_STATUS status; + LogicalRepRelMapEntry *entry = NULL; + + if (LogicalRepRelMap == NULL) + return NULL; + + hash_seq_init(&status, LogicalRepRelMap); + while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) + { + if (entry->localreloid == localreloid) + { + hash_seq_term(&status); + return entry; + } + } + + return NULL; +} + +/* + * Return remote relation IDs that have FK dependency metadata tied to the + * specified relation, either as referencing-side or referenced-side entries. + */ +List * +logicalrep_get_fk_related_relids(LogicalRepRelation *remoterel) +{ + LogicalRepRelMapEntry *target; + bool found; + List *related = NIL; + Oid relnamespace; + Oid remoteid = remoterel->remoteid; + + if (LogicalRepRelMap == NULL) + return NIL; + + target = hash_search(LogicalRepRelMap, &remoteid, HASH_FIND, &found); + + /* + * If not exists yet, build a new entry so that we can collect all the + * referenced and referencing tables. + */ + if (!found) + { + logicalrep_relmap_update(remoterel); + target = hash_search(LogicalRepRelMap, &remoteid, HASH_FIND, &found); + } + + Assert(found); + + /* Collect the tables if not yet */ + if (!target->local_fkeys_collected) + { + bool needs_start = !IsTransactionOrTransactionBlock(); + + if (needs_start) + StartTransactionCommand(); + + /* Return if the relation does not exist */ + relnamespace = get_namespace_oid(remoterel->nspname, true); + if (!OidIsValid(get_relname_relid(remoterel->relname, relnamespace))) + { + if (needs_start) + CommitTransactionCommand(); + + return NIL; + } + + logicalrep_rel_load(NULL, remoteid, AccessShareLock); + + if (needs_start) + CommitTransactionCommand(); + } + + Assert(target->local_fkeys_collected); + + /* Collect all the tables referenced by the given table */ + foreach_ptr(LogicalRepSubscriberFK, fkinfo, target->local_fkeys) + related = list_append_unique_oid(related, fkinfo->ref_remoteid); + + /* Collect all the tables referencing the given table */ + foreach_ptr(LogicalRepSubscriberRefFK, refinfo, target->local_referenced_fkeys) + related = list_append_unique_oid(related, refinfo->fk_remoteid); + + /* Remove the given table itself from the list */ + related = list_delete_oid(related, remoteid); + + return related; +} + +/* + * Return true if the FK constraint is always deferred. + */ +static bool +foreign_key_is_always_deferred(Oid conoid) +{ + HeapTuple tup; + Form_pg_constraint con; + bool deferred; + + tup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conoid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for foreign key %u", conoid); + + con = (Form_pg_constraint) GETSTRUCT(tup); + deferred = con->condeferrable && con->condeferred; + ReleaseSysCache(tup); + + return deferred; +} + +/* + * Check whether the table has any referencing-side foreign key triggers (ON + * INSERT or ON UPDATE) enabled in replica mode that could cause a foreign key + * violation error. + */ +static bool +fkey_trigger_enabled_in_replica(Relation rel, Oid conoid) +{ + TriggerDesc *trigdesc = rel->trigdesc; + + if (trigdesc == NULL) + return false; + + for (int i = 0; i < trigdesc->numtriggers; i++) + { + Trigger *trig = &trigdesc->triggers[i]; + + /* Keep only FK-side check triggers that belong to this FK constraint */ + if (trig->tgconstraint != conoid || + (trig->tgfoid != F_RI_FKEY_CHECK_INS && + trig->tgfoid != F_RI_FKEY_CHECK_UPD)) + continue; + + /* In replica mode, only REPLICA/ALWAYS triggers can fire */ + if (trig->tgenabled == TRIGGER_FIRES_ON_REPLICA || + trig->tgenabled == TRIGGER_FIRES_ALWAYS) + return true; + } + + return false; +} + +/* + * Check whether the table has any referenced-side foreign key triggers (ON + * DELETE or ON UPDATE) enabled in replica mode that could cause a foreign key + * violation error. + */ +static bool +refkey_trigger_enabled_in_replica(Relation rel, Oid conoid) +{ + TriggerDesc *trigdesc = rel->trigdesc; + + if (trigdesc == NULL) + return false; + + for (int i = 0; i < trigdesc->numtriggers; i++) + { + Trigger *trig = &trigdesc->triggers[i]; + + /* Keep only PK-side action triggers for this FK constraint */ + if (trig->tgconstraint != conoid || + (trig->tgfoid != F_RI_FKEY_NOACTION_DEL && + trig->tgfoid != F_RI_FKEY_NOACTION_UPD && + trig->tgfoid != F_RI_FKEY_RESTRICT_DEL && + trig->tgfoid != F_RI_FKEY_RESTRICT_UPD)) + continue; + + /* In replica mode, only REPLICA/ALWAYS triggers can fire. */ + if (trig->tgenabled == TRIGGER_FIRES_ON_REPLICA || + trig->tgenabled == TRIGGER_FIRES_ALWAYS) + return true; + } + + return false; +} + +/* + * Build a list of foreign key remote columns in the order of the referenced + * table's remote columns. + */ +static void +build_fk_remote_attnums(LogicalRepRelMapEntry *fkentry, + LogicalRepRelMapEntry *refentry, + int nkeys, + const AttrNumber *fk_conkey, + const AttrNumber *ref_confkey, + List **fkattnums, + List **fkattnums_old, + List **refattnums, + List **refattnums_old) +{ + int pkatt = -1; + List *fkcols = NIL; + List *fkcols_old = NIL; + List *refcols = NIL; + List *refcols_old = NIL; + + /* + * Traverse the referenced table's remote replica identity columns in order, + * and for each, find the corresponding foreign key column that matches it. + */ + while ((pkatt = bms_next_member(refentry->remoterel.attkeys, pkatt)) >= 0) + { + for (int i = 0; i < nkeys; i++) + { + AttrNumber fk_local_attnum = fk_conkey[i]; + AttrNumber ref_local_attnum = ref_confkey[i]; + int fk_remote_attnum; + int ref_remote_attnum; + + fk_remote_attnum = fkentry->attrmap->attnums[AttrNumberGetAttrOffset(fk_local_attnum)]; + ref_remote_attnum = refentry->attrmap->attnums[AttrNumberGetAttrOffset(ref_local_attnum)]; + + /* Skip if not the current traversed column */ + if (ref_remote_attnum != pkatt) + continue; + + /* Skip columns that are unavailable in the remote table */ + if (ref_remote_attnum < 0 || fk_remote_attnum < 0) + continue; + + fkcols = lappend_int(fkcols, fk_remote_attnum + 1); + refcols = lappend_int(refcols, ref_remote_attnum + 1); + + /* Old tuple contains only replica identity columns. */ + if (bms_is_member(fk_remote_attnum, fkentry->remoterel.attkeys) && + bms_is_member(ref_remote_attnum, refentry->remoterel.attkeys)) + { + fkcols_old = lappend_int(fkcols_old, fk_remote_attnum + 1); + refcols_old = lappend_int(refcols_old, ref_remote_attnum + 1); + } + + break; + } + } + + *fkattnums = fkcols; + *fkattnums_old = fkcols_old; + *refattnums = refcols; + *refattnums_old = refcols_old; +} + +/* + * Collect referenced-side key projections for dependency tracking + * + * Helper for collect_fkeys_for_dependency_tracking(). See that function for + * detailed comments. + */ +static void +collect_refkeys_for_dependency_tracking(LogicalRepRelMapEntry *entry) +{ + Relation fkeyRel; + SysScanDesc fkeyScan; + HeapTuple tuple; + Oid relid = RelationGetRelid(entry->localrel); + + Assert(OidIsValid(relid)); + + fkeyRel = table_open(ConstraintRelationId, AccessShareLock); + + fkeyScan = systable_beginscan(fkeyRel, InvalidOid, false, + NULL, 0, NULL); + + while (HeapTupleIsValid(tuple = systable_getnext(fkeyScan))) + { + Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple); + LogicalRepRelMapEntry *fkentry; + LogicalRepSubscriberRefFK *refinfo; + List *fkattnums; + List *fkattnums_old; + List *refattnums; + List *refattnums_old; + AttrNumber conkey[INDEX_MAX_KEYS] = {0}; + AttrNumber confkey[INDEX_MAX_KEYS] = {0}; + int numfks; + MemoryContext oldctx; + + /* Not a foreign key */ + if (con->contype != CONSTRAINT_FOREIGN) + continue; + + /* Not referencing the given table */ + if (con->confrelid != relid) + continue; + + /* Skip if FK enforcement is disabled */ + if (!con->conenforced) + continue; + + /* Always-deferred FK does not need dependency tracking. */ + if (foreign_key_is_always_deferred(con->oid)) + continue; + + /* + * Skip when no replica-mode ON DELETE/ON UPDATE violation trigger can + * fire for this FK on the referenced table. + */ + if (!refkey_trigger_enabled_in_replica(entry->localrel, con->oid)) + continue; + + fkentry = logicalrep_get_relentry_by_local_oid(con->conrelid); + + /* + * Skip if the referencing table is not published or has not replicated + * any changes. + */ + if (!fkentry || !fkentry->attrmap) + continue; + + DeconstructFkConstraintRow(tuple, &numfks, conkey, confkey, + NULL, NULL, NULL, NULL, NULL); + + build_fk_remote_attnums(fkentry, entry, numfks, conkey, confkey, + &fkattnums, &fkattnums_old, + &refattnums, &refattnums_old); + + list_free(fkattnums); + list_free(fkattnums_old); + + oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); + refinfo = palloc_object(LogicalRepSubscriberRefFK); + refinfo->conoid = con->oid; + refinfo->fk_remoteid = fkentry->remoterel.remoteid; + refinfo->refattnums = list_copy(refattnums); + refinfo->refattnums_old = list_copy(refattnums_old); + entry->local_referenced_fkeys = lappend(entry->local_referenced_fkeys, + refinfo); + MemoryContextSwitchTo(oldctx); + + list_free(refattnums); + list_free(refattnums_old); + } + + systable_endscan(fkeyScan); + + table_close(fkeyRel, AccessShareLock); +} + +/* + * Collect foreign keys for dependency tracking. + * + * This function collects both foreign keys in the referencing table and the + * primary key in the referenced table, as both are needed for dependency + * tracking (see applyparallelworker.c for details). + * + * For a primary key, we directly record the bitmap of remote columns that the + * foreign key references. + * + * For a foreign key in the referencing table, we cannot use the columns as-is + * because their order on the remote side may differ from the local referencing + * and referenced table. To ensure consistency, we store a list of column + * numbers in the order of the referenced table's remote columns. The dependency + * tracking function will traverse this list to build the hash key. + * + * For one foreign key constraint on the table, We collect two set of column + * numbers for both the referencing and referenced tables: one for the new tuple + * of an INSERT or UPDATE, and the other for the old tuple of an UPDATE or + * DELETE. The former includes all remote columns that the foreign key + * references, while the latter includes only those that are part of the replica + * identity key. This is because the old tuple of an UPDATE or DELETE contains + * only replica identity key columns, and any other columns would be missing and + * thus unavailable for dependency tracking. + * + * If there are multiple foreign keys referencing other tables or if the primary + * key is referenced by multiple foreign keys, we will have multiple sets of + * column numbers. + * + * When recording or checking a foreign key dependency, we fill columns in the + * referencing table that are outside the replica identity as NULL, and during + * comparison, we treat NULL as equal to any value. This is safe because the + * referenced key is a primary key (no NULLs allowed), and NULL values in the + * referencing key never participate in foreign key constraint checks. + * Therefore, we will not encounter genuine NULL values in the remote columns. + */ +static void +collect_fkeys_for_dependency_tracking(LogicalRepRelMapEntry *entry) +{ + List *fkeys; + MemoryContext oldctx; + + if (entry->local_fkeys != NIL || entry->local_referenced_fkeys != NIL) + free_local_fkeys(entry); + + fkeys = copyObject(RelationGetFKeyList(entry->localrel)); + + /* Collect foreign keys where this table is the referencing side */ + foreach_ptr(ForeignKeyCacheInfo, fk, fkeys) + { + LogicalRepRelMapEntry *refentry; + List *fkattnums; + List *fkattnums_old; + List *refattnums; + List *refattnums_old; + LogicalRepSubscriberFK *fkinfo; + + /* Skip if FK enforcement is disabled */ + if (!fk->conenforced) + continue; + + /* + * Skip if this FK's check trigger is disabled in replica mode. + */ + if (!fkey_trigger_enabled_in_replica(entry->localrel, fk->conoid)) + continue; + + /* + * Deferrable foreign keys do not need tracking, as they won't cause + * constraint violations as long as commit order is preserved. + */ + if (foreign_key_is_always_deferred(fk->conoid)) + continue; + + refentry = logicalrep_get_relentry_by_local_oid(fk->confrelid); + + /* + * Skip if the referenced table is not published or has not replicated + * any changes. + */ + if (!refentry || !refentry->attrmap) + continue; + + build_fk_remote_attnums(entry, refentry, fk->nkeys, fk->conkey, + fk->confkey, &fkattnums, &fkattnums_old, + &refattnums, &refattnums_old); + + list_free(refattnums); + list_free(refattnums_old); + + oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); + fkinfo = palloc_object(LogicalRepSubscriberFK); + fkinfo->conoid = fk->conoid; + fkinfo->ref_remoteid = refentry->remoterel.remoteid; + fkinfo->fkattnums = list_copy(fkattnums); + fkinfo->fkattnums_old = list_copy(fkattnums_old); + entry->local_fkeys = lappend(entry->local_fkeys, fkinfo); + MemoryContextSwitchTo(oldctx); + + list_free(fkattnums); + list_free(fkattnums_old); + } + + list_free_deep(fkeys); + + /* Collect keys where this table is the referenced side */ + collect_refkeys_for_dependency_tracking(entry); + + entry->local_fkeys_collected = true; +} + /* * Check all local triggers for the relation to see the parallelizability. * @@ -636,6 +1127,7 @@ logicalrep_rel_load(LogicalRepRelMapEntry *entry, LogicalRepRelId remoteid, /* Table was renamed or dropped. */ entry->localrelvalid = false; entry->local_unique_indexes_collected = false; + entry->local_fkeys_collected = false; } else if (!entry->localrelvalid) { @@ -755,6 +1247,9 @@ logicalrep_rel_load(LogicalRepRelMapEntry *entry, LogicalRepRelId remoteid, entry->localrelvalid = true; } + if (am_leader_apply_worker() && !entry->local_fkeys_collected) + collect_fkeys_for_dependency_tracking(entry); + if (entry->state != SUBREL_STATE_READY) entry->state = GetSubscriptionRelState(MySubscription->oid, entry->localreloid, @@ -1058,6 +1553,7 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, * collect_indexes_for_dependency_tracking() for details.) */ entry->local_unique_indexes_collected = true; + entry->local_fkeys_collected = true; entry->localrelvalid = true; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 17eaef28039..0f8f49409b9 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -584,7 +584,9 @@ static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}; typedef enum LogicalRepKeyKind { LOGICALREP_KEY_REPLICA_IDENTITY, - LOGICALREP_KEY_LOCAL_UNIQUE + LOGICALREP_KEY_LOCAL_UNIQUE, + LOGICALREP_KEY_FOREIGN_KEY, + LOGICALREP_KEY_REFERENCED_KEY } LogicalRepKeyKind; /* Hash table key for replica_identity_table */ @@ -780,9 +782,6 @@ hash_replica_identity_compare(ReplicaIdentityKey *a, ReplicaIdentityKey *b) if (a->data->colstatus[i] != b->data->colstatus[i]) return false; - if (a->data->colstatus[i] == LOGICALREP_COLUMN_NULL) - continue; - if (a->data->colvalues[i].len != b->data->colvalues[i].len) return false; @@ -950,6 +949,24 @@ append_xid_dependency(TransactionId xid, List **depends_on_xids) *depends_on_xids = lappend_xid(*depends_on_xids, xid); } +static char * +get_dependency_type_str(LogicalRepKeyKind kind) +{ + switch (kind) + { + case LOGICALREP_KEY_REPLICA_IDENTITY: + return "replica identity"; + case LOGICALREP_KEY_LOCAL_UNIQUE: + return "local unique key"; + case LOGICALREP_KEY_FOREIGN_KEY: + return "foreign key"; + case LOGICALREP_KEY_REFERENCED_KEY: + return "referenced key"; + } + + return "???"; +} + /* * Common function for checking dependency by using the key. Used by both * check_and_record_ri_dependency and check_and_record_local_key_dependency. @@ -982,9 +999,8 @@ check_and_record_key_dependency(ReplicaIdentityKey *key, if (rientry && has_active_key_dependency(rientry, true)) { elog(DEBUG1, - key->kind == LOGICALREP_KEY_REPLICA_IDENTITY ? - "found conflicting replica identity change on table %u from %u" : - "found conflicting local unique key change on table %u from %u", + "found conflicting %s change on table %u from %u", + get_dependency_type_str(key->kind), key->relid, rientry->remote_xid); existing_xid = rientry->remote_xid; @@ -1007,9 +1023,8 @@ check_and_record_key_dependency(ReplicaIdentityKey *key, if (has_active_key_dependency(rientry, false)) { elog(DEBUG1, - key->kind == LOGICALREP_KEY_REPLICA_IDENTITY ? - "found conflicting replica identity change on table %u from %u" : - "found conflicting local unique key change on table %u from %u", + "found conflicting %s change on table %u from %u", + get_dependency_type_str(key->kind), key->relid, rientry->remote_xid); existing_xid = rientry->remote_xid; @@ -1039,6 +1054,23 @@ has_null_key_values(LogicalRepTupleData *data, Bitmapset *indexkeys) return false; } +/* + * Check if any of the attnums have NULL values. + */ +static bool +fk_tuple_has_null(LogicalRepTupleData *data, List *attnums) +{ + foreach_int(attnum, attnums) + { + int attidx = AttrNumberGetAttrOffset(attnum); + + if (data->colstatus[attidx] == LOGICALREP_COLUMN_NULL) + return true; + } + + return false; +} + /* * Build a hash key for replica_identity_table using the given relation and * tuple data, restricted to the specified key columns. @@ -1104,6 +1136,72 @@ build_replica_identity_key(Oid relid, LogicalRepTupleData *original_data, return key; } +/* + * Build a hash key from explicitly ordered columns. + */ +static ReplicaIdentityKey * +build_replica_identity_key_by_attnums(Oid relid, + LogicalRepTupleData *original_data, + List *attnums) +{ + LogicalRepTupleData *keydata; + ReplicaIdentityKey *key; + MemoryContext oldctx; + int i_key = 0; + int nattnums = list_length(attnums); + + oldctx = MemoryContextSwitchTo(ApplyContext); + + keydata = palloc0_object(LogicalRepTupleData); + + if (nattnums) + { + keydata->colvalues = palloc0_array(StringInfoData, nattnums); + keydata->colstatus = palloc0_array(char, nattnums); + } + + keydata->ncols = nattnums; + + foreach_int(attnum, attnums) + { + int attidx = AttrNumberGetAttrOffset(attnum); + + /* + * For columns not exists in remote tuple, or outside of remote replica + * identity in which case the remote column is marked as NULL, fill it + * with NULL. + */ + if (attidx < 0 || + original_data->colstatus[attidx] == LOGICALREP_COLUMN_NULL) + { + keydata->colstatus[i_key] = LOGICALREP_COLUMN_NULL; + } + else + { + StringInfo original_colvalue; + + Assert(original_data->colstatus[attidx] != LOGICALREP_COLUMN_UNCHANGED || + original_data->colvalues[attidx].len > 0); + + original_colvalue = &original_data->colvalues[attidx]; + + initStringInfoExt(&keydata->colvalues[i_key], original_colvalue->len + 1); + appendStringInfoString(&keydata->colvalues[i_key], original_colvalue->data); + keydata->colstatus[i_key] = original_data->colstatus[attidx]; + } + + i_key++; + } + + key = palloc0_object(ReplicaIdentityKey); + key->relid = relid; + key->data = keydata; + + MemoryContextSwitchTo(oldctx); + + return key; +} + /* * Mostly same as check_and_record_ri_dependency() but for local unique indexes. * @@ -1199,6 +1297,152 @@ check_and_record_local_key_dependency(Oid relid, } } +/* + * Check and record dependencies caused by foreign key constraints. + * + * A new value in the referencing table depends on the referenced table's data. + * Conversely, a deletion in the referenced table depends on whether the + * referencing table still contains matching rows. + * + * See applyparallelworker.c for details on why tracking these dependencies is + * necessary. + */ +static void +check_and_record_fkey_dependency(Oid relid, + LogicalRepTupleData *original_data, + bool old_tuple, + TransactionId new_depended_xid, + List **depends_on_xids) +{ + LogicalRepRelMapEntry *relentry; + ReplicaIdentityKey *rikey; + + Assert(depends_on_xids); + + relentry = logicalrep_get_relentry(relid); + + Assert(relentry); + + /* + * Gather foreign key information if not yet. We require to be in a + * transaction state because system catalogs are read. + */ + if (!relentry->local_fkeys_collected) + { + bool needs_start = !IsTransactionOrTransactionBlock(); + + if (needs_start) + StartTransactionCommand(); + + logicalrep_rel_load(NULL, relid, AccessShareLock); + + if (needs_start) + CommitTransactionCommand(); + + Assert(relentry->local_fkeys_collected); + } + + foreach_ptr(LogicalRepSubscriberFK, fkinfo, relentry->local_fkeys) + { + List *keyattnums = old_tuple ? + fkinfo->fkattnums_old : fkinfo->fkattnums; + + /* NULL value won't violate foreign key violation */ + if (fk_tuple_has_null(original_data, keyattnums)) + continue; + + /* + * Old tuples of foreign keys do not conflict with any preceding + * transaction (see the comments in applyparallelworker.c for details on + * conflicting cases). When we don't need to record a new dependency, we + * can skip processing this index entirely. + */ + if (old_tuple && !TransactionIdIsValid(new_depended_xid)) + continue; + + rikey = build_replica_identity_key_by_attnums(fkinfo->ref_remoteid, + original_data, + keyattnums); + + /* + * For old tuples, record a dependency that later transactions deleting + * the same key from the referenced table must wait on. No preceding + * transactions are added to the list. + * + * For new tuples, check for existing dependencies on the same key + * inserted into the referenced table and add any dependent transactions + * to the list. + */ + if (old_tuple) + { + rikey->kind = LOGICALREP_KEY_FOREIGN_KEY; + (void) check_and_record_key_dependency(rikey, new_depended_xid); + } + else + { + TransactionId dep_xid; + + rikey->kind = LOGICALREP_KEY_REFERENCED_KEY; + dep_xid = check_and_record_key_dependency(rikey, InvalidTransactionId); + + if (TransactionIdIsValid(dep_xid) && + !TransactionIdEquals(dep_xid, new_depended_xid)) + append_xid_dependency(dep_xid, depends_on_xids); + } + } + + /* Return if no referenced key exists */ + if (relentry->local_referenced_fkeys == NIL) + return; + + foreach_ptr(LogicalRepSubscriberRefFK, refinfo, relentry->local_referenced_fkeys) + { + List *keyattnums = old_tuple ? + refinfo->refattnums_old : refinfo->refattnums; + + /* Shouldn't be real NULL value in referenced key (primary key). */ + Assert(old_tuple || !fk_tuple_has_null(original_data, keyattnums)); + + /* + * New tuples of referenced keys do not conflict with any preceding + * transaction (see the comments in applyparallelworker.c for details on + * conflicting cases). When we don't need to record a new dependency, we + * can skip processing this key entirely. + */ + if (!old_tuple && !TransactionIdIsValid(new_depended_xid)) + continue; + + rikey = build_replica_identity_key_by_attnums(relid, original_data, + keyattnums); + + /* + * For old tuples, check for existing dependencies on the same key + * deleted from the referencing table and add any dependent transactions + * to the list. + * + * For new tuples, record a dependency that later transactions inserting + * the same key into the referencing table must wait on. No preceding + * transactions are added to the list. + */ + if (old_tuple) + { + TransactionId dep_xid; + + rikey->kind = LOGICALREP_KEY_FOREIGN_KEY; + dep_xid = check_and_record_key_dependency(rikey, InvalidTransactionId); + + if (TransactionIdIsValid(dep_xid) && + !TransactionIdEquals(dep_xid, new_depended_xid)) + append_xid_dependency(dep_xid, depends_on_xids); + } + else + { + rikey->kind = LOGICALREP_KEY_REFERENCED_KEY; + (void) check_and_record_key_dependency(rikey, new_depended_xid); + } + } +} + /* * Check for dependencies on preceding transactions that modify the same key. * Returns the dependent transactions in 'depends_on_xids'. @@ -1464,6 +1708,9 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, check_and_record_local_key_dependency(relid, &newtup, false, new_depended_xid, &depends_on_xids); + check_and_record_fkey_dependency(relid, &newtup, false, + new_depended_xid, + &depends_on_xids); check_dependency_for_parallel_safety(relid, new_depended_xid, &depends_on_xids); break; @@ -1481,6 +1728,10 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, new_depended_xid, &depends_on_xids); + check_and_record_fkey_dependency(relid, &oldtup, true, + new_depended_xid, + &depends_on_xids); + check_dependency_for_parallel_safety(relid, new_depended_xid, &depends_on_xids); @@ -1505,6 +1756,9 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, check_and_record_local_key_dependency(relid, &newtup, false, new_depended_xid, &depends_on_xids); + check_and_record_fkey_dependency(relid, &newtup, false, + new_depended_xid, + &depends_on_xids); check_dependency_for_parallel_safety(relid, new_depended_xid, &depends_on_xids); break; @@ -1516,6 +1770,9 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, check_and_record_local_key_dependency(relid, &oldtup, true, new_depended_xid, &depends_on_xids); + check_and_record_fkey_dependency(relid, &oldtup, true, + new_depended_xid, + &depends_on_xids); check_dependency_for_parallel_safety(relid, new_depended_xid, &depends_on_xids); break; @@ -1552,6 +1809,29 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, */ check_and_record_rel_dependency(rel->remoteid, new_depended_xid, &depends_on_xids); + + remote_relids = logicalrep_get_fk_related_relids(rel); + + /* + * Wait for all pending changes on tables that reference or are + * referenced by the current table. Invalidate their cached foreign + * key information, as it may be outdated due to schema changes + * (e.g., replica identity) on the current table. + */ + foreach_oid(related_relid, remote_relids) + { + LogicalRepRelMapEntry *fk_related_entry; + + check_and_record_rel_dependency(related_relid, + new_depended_xid, + &depends_on_xids); + + fk_related_entry = logicalrep_get_relentry(related_relid); + + if (fk_related_entry) + fk_related_entry->local_fkeys_collected = false; + } + break; case LOGICAL_REP_MSG_TYPE: diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index fe38862b300..abe15865779 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -50,6 +50,11 @@ typedef struct LogicalRepRelMapEntry List *local_unique_indexes; bool local_unique_indexes_collected; + /* Local foreign keys. Used for dependency tracking */ + List *local_fkeys; + List *local_referenced_fkeys; + bool local_fkeys_collected; + /* * Whether the relation can be applied in parallel or not. It is * distinglish whether defined triggers are the immutable or not. @@ -73,6 +78,22 @@ typedef struct LogicalRepSubscriberIdx bool nulls_distinct; /* Whether NULLs are considered distinct */ } LogicalRepSubscriberIdx; +typedef struct LogicalRepSubscriberFK +{ + Oid conoid; /* OID of the FK constraint */ + LogicalRepRelId ref_remoteid; /* referenced remote relation */ + List *fkattnums; /* FK remote attnums ordered by referenced key */ + List *fkattnums_old; /* old-tuple-safe FK remote attnums */ +} LogicalRepSubscriberFK; + +typedef struct LogicalRepSubscriberRefFK +{ + Oid conoid; /* OID of the FK constraint */ + LogicalRepRelId fk_remoteid; /* referencing remote relation */ + List *refattnums; /* referenced remote attnums */ + List *refattnums_old; /* old-tuple-safe referenced remote attnums */ +} LogicalRepSubscriberRefFK; + extern void logicalrep_relmap_update(LogicalRepRelation *remoterel); extern void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel); @@ -89,6 +110,7 @@ extern Oid GetRelationIdentityOrPK(Relation rel); extern int logicalrep_get_num_rels(void); extern void logicalrep_write_all_rels(StringInfo out); extern LogicalRepRelMapEntry *logicalrep_get_relentry(LogicalRepRelId remoteid); +extern List *logicalrep_get_fk_related_relids(LogicalRepRelation *remoteid); #define LOGICALREP_PARALLEL_SAFE 's' #define LOGICALREP_PARALLEL_RESTRICTED 'r' diff --git a/src/test/subscription/t/050_parallel_apply.pl b/src/test/subscription/t/050_parallel_apply.pl index 6b0d5c7e383..81550c71ba0 100644 --- a/src/test/subscription/t/050_parallel_apply.pl +++ b/src/test/subscription/t/050_parallel_apply.pl @@ -33,6 +33,9 @@ $node_publisher->safe_psql( CREATE UNIQUE INDEX tab_toast_ri_index on tab_toast (a, b); ALTER TABLE tab_toast REPLICA IDENTITY USING INDEX tab_toast_ri_index; INSERT INTO tab_toast(a, b) VALUES(repeat('1234567890', 200), '1234567890'); + + CREATE TABLE regress_tab_pk (id int PRIMARY KEY); + CREATE TABLE regress_tab_fk (id int PRIMARY KEY, fk int REFERENCES regress_tab_pk (id)); )); $node_publisher->safe_psql('postgres', "INSERT INTO regress_tab VALUES (generate_series(1, 10), 'test');"); @@ -71,6 +74,9 @@ $node_subscriber->safe_psql( ALTER TABLE tab_toast ALTER COLUMN a SET STORAGE EXTERNAL; CREATE UNIQUE INDEX tab_toast_ri_index on tab_toast (a, b); ALTER TABLE tab_toast REPLICA IDENTITY USING INDEX tab_toast_ri_index; + + CREATE TABLE regress_tab_pk (id int PRIMARY KEY); + CREATE TABLE regress_tab_fk (id int PRIMARY KEY, fk int REFERENCES regress_tab_pk (id)); )); $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION regress_sub CONNECTION '$publisher_connstr' PUBLICATION regress_pub;"); @@ -527,4 +533,162 @@ $node_subscriber->safe_psql('postgres', "DROP INDEX local_unique_idx_expr;"); $node_publisher->safe_psql('postgres', "TRUNCATE TABLE regress_tab;"); $node_publisher->wait_for_catchup('regress_sub'); +################################################## +# Test that the dependency tracking works correctly for foreign keys on +# subscriber during parallel apply. +################################################## + +# Test that when receiving table schema information for a referencing table, the +# subscriber correctly checks for dependencies on the referenced table and waits +# for its changes to complete. +$node_subscriber->safe_psql('postgres', + "SELECT injection_points_attach('parallel-worker-before-commit','wait');" +); + +# Enable foreign triggers on subscriber +my $pk_fk_trigger = $node_subscriber->safe_psql('postgres', qq[ + SELECT tgname + FROM pg_trigger + WHERE tgrelid = 'regress_tab_pk'::regclass + AND tgconstraint > 0 + LIMIT 1 +]); +chomp($pk_fk_trigger); + +my $fk_fk_trigger = $node_subscriber->safe_psql('postgres', qq[ + SELECT tgname + FROM pg_trigger + WHERE tgrelid = 'regress_tab_fk'::regclass + AND tgconstraint > 0 + LIMIT 1 +]); +chomp($fk_fk_trigger); + +$node_subscriber->safe_psql('postgres', + qq[ALTER TABLE regress_tab_pk ENABLE REPLICA TRIGGER "$pk_fk_trigger"; + ALTER TABLE regress_tab_fk ENABLE REPLICA TRIGGER "$fk_fk_trigger";]); + +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab_pk VALUES (2);"); + +$node_subscriber->wait_for_event('logical replication parallel worker', + 'parallel-worker-before-commit'); + +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab_fk VALUES (1, 2);"); + +# Verify the dependency is detected on referenced table schema information +$str = $node_subscriber->wait_for_log(qr/found conflicting change on table [1-9][0-9]+ from ([1-9][0-9]+)/, $offset); +$xid = $str =~ /found conflicting change on table [1-9][0-9]+ from ([1-9][0-9]+)/; + +# Verify the parallel worker waits for the same transaction +$node_subscriber->wait_for_log(qr/wait for depended xid $xid/, $offset); + +ok(1, "referenced table dependency detected for parallel apply"); + +# Wakeup the parallel worker +$node_subscriber->safe_psql('postgres', qq[ + SELECT injection_points_detach('parallel-worker-before-commit'); + SELECT injection_points_wakeup('parallel-worker-before-commit'); +]); + +# Verify the streamed transaction can be applied +$node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid/, $offset); +$node_publisher->wait_for_catchup('regress_sub'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(1) FROM regress_tab_pk"); +is ($result, 1, 'insert is replicated to referenced table on subscriber'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(1) FROM regress_tab_fk"); +is ($result, 1, 'insert is replicated to referencing table on subscriber'); + +# INSERT - INSERT case: Tx-1 inserts referenced tuple and Tx-2 inserts +# referencing tuple. + +$node_subscriber->safe_psql('postgres', + "SELECT injection_points_attach('parallel-worker-before-commit','wait');" +); + +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab_pk VALUES (3);"); + +$node_subscriber->wait_for_event('logical replication parallel worker', + 'parallel-worker-before-commit'); + +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab_fk VALUES (2, 3);"); + +# Verify the dependency for referenced key change is detected +$str = $node_subscriber->wait_for_log(qr/found conflicting referenced key change on table [1-9][0-9]+ from ([1-9][0-9]+)/, $offset); +$xid = $str =~ /found conflicting referenced key change on table [1-9][0-9]+ from ([1-9][0-9]+)/; + +# Verify the parallel worker waits for the same transaction +$node_subscriber->wait_for_log(qr/wait for depended xid $xid/, $offset); + +ok(1, "referenced key dependency detected for parallel apply"); + +# Wakeup the parallel worker +$node_subscriber->safe_psql('postgres', qq[ + SELECT injection_points_detach('parallel-worker-before-commit'); + SELECT injection_points_wakeup('parallel-worker-before-commit'); +]); + +# Verify the streamed transaction can be applied +$node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid/, $offset); +$node_publisher->wait_for_catchup('regress_sub'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(1) FROM regress_tab_pk"); +is ($result, 2, 'insert is replicated to referenced table on subscriber'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(1) FROM regress_tab_fk"); +is ($result, 2, 'insert is replicated to referencing table on subscriber'); + +# DELETE - DELETE case: Tx-1 deletes referencing tuple and Tx-2 deletes +# referenced tuple. +$node_subscriber->safe_psql('postgres', + "SELECT injection_points_attach('parallel-worker-before-commit','wait');" +); + +$node_publisher->safe_psql('postgres', + "DELETE FROM regress_tab_fk WHERE id = 1;"); + +$node_subscriber->wait_for_event('logical replication parallel worker', + 'parallel-worker-before-commit'); + +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "DELETE FROM regress_tab_pk WHERE id = 2;"); + +$str = $node_subscriber->wait_for_log(qr/found conflicting foreign key change on table [1-9][0-9]+ from ([1-9][0-9]+)/, $offset); +$xid = $str =~ /found conflicting foreign key change on table [1-9][0-9]+ from ([1-9][0-9]+)/; + +$node_subscriber->wait_for_log(qr/wait for depended xid $xid/, $offset); + +ok(1, "foreign key dependency detected for parallel apply"); + +$node_subscriber->safe_psql('postgres', qq[ + SELECT injection_points_detach('parallel-worker-before-commit'); + SELECT injection_points_wakeup('parallel-worker-before-commit'); +]); + +$node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid/, $offset); +$node_publisher->wait_for_catchup('regress_sub'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(1) FROM regress_tab_pk"); +is ($result, 1, 'delete is replicated to referenced table on subscriber'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(1) FROM regress_tab_fk"); +is ($result, 1, 'delete is replicated to referencing table on subscriber'); + done_testing(); -- 2.43.0