From c05f0b4a0b18feb4d71a01690572fedcbd4f1406 Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Thu, 4 Jun 2026 13:29:18 +0800 Subject: [PATCH v19 09/10] Support dependency tracking via foreign keys Logical replication's parallel apply mechanism did not account for foreign-key dependencies. This can cause a failure because the reference column may be committed before the referenced column. This patch extends the parallel apply's dependency tracking to account for foreign keys in the subscriber table. One assumption here is that referenced columns can be registered to the identity hash table. The leader apply worker checks to determine whether values in referencing columns have already been registered or not, and regards that there is a dependency if found. --- src/backend/replication/logical/relation.c | 185 ++++++++++++++++++ src/backend/replication/logical/worker.c | 145 ++++++++++++++ src/include/replication/logicalrelation.h | 11 ++ src/test/subscription/t/050_parallel_apply.pl | 53 ++++- 4 files changed, 391 insertions(+), 3 deletions(-) diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 6665f5c0cc9..83602dcd4cf 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -21,6 +21,7 @@ #include "access/genam.h" #include "access/table.h" #include "catalog/namespace.h" +#include "catalog/pg_constraint.h" #include "catalog/pg_proc.h" #include "catalog/pg_subscription_rel.h" #include "commands/trigger.h" @@ -143,6 +144,21 @@ free_local_unique_indexes(LogicalRepRelMapEntry *entry) entry->local_unique_indexes = NIL; } +/* + * Release foreign key list + */ +static void +free_local_fkeys(LogicalRepRelMapEntry *entry) +{ + Assert(am_leader_apply_worker()); + + foreach_ptr(LogicalRepSubscriberFK, fkinfo, entry->local_fkeys) + bms_free(fkinfo->conkeys); + + list_free_deep(entry->local_fkeys); + entry->local_fkeys = NIL; +} + /* * Free the entry of a relation map cache. */ @@ -173,6 +189,9 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry) if (entry->local_unique_indexes != NIL) free_local_unique_indexes(entry); + + if (entry->local_fkeys != NIL) + free_local_fkeys(entry); } /* @@ -499,6 +518,171 @@ collect_local_indexes(LogicalRepRelMapEntry *entry) list_free(idxlist); } +/* + * Search a relmap entry by local relation OID. + */ +static LogicalRepRelMapEntry * +logicalrep_get_relentry_by_local_oid(Oid localreloid) +{ + HASH_SEQ_STATUS status; + LogicalRepRelMapEntry *entry = NULL; + + if (LogicalRepRelMap == NULL) + return NULL; + + /* + * Each entry must be checked individually. Because the key of + * LogicalRepRelMap is the "remote" relid but we only have the local one. + * + * Note: This iteration ignores relations that have not yet been + * registered in LogicalRepRelMap. It is OK because such relations have + * not been modified yet since the subscriber started receiving changes. + */ + hash_seq_init(&status, LogicalRepRelMap); + while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) + { + if (entry->localreloid == localreloid) + { + hash_seq_term(&status); + return entry; + } + } + + return NULL; +} + +/* + * Return true if the FK constraint is always deferred. + */ +static bool +foreign_key_is_always_deferred(Oid conoid) +{ + HeapTuple tup; + Form_pg_constraint con; + bool deferred; + + tup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conoid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for foreign key %u", conoid); + + con = (Form_pg_constraint) GETSTRUCT(tup); + deferred = con->condeferrable && con->condeferred; + ReleaseSysCache(tup); + + return deferred; +} + +/* + * Collect all local foreign keys that can be used for dependency tracking. + */ +static void +collect_local_fkeys(LogicalRepRelMapEntry *entry) +{ + List *fkeys; + + if (entry->local_fkeys != NIL) + free_local_fkeys(entry); + + entry->local_fkeys_collected = true; + + /* + * Get the list of foreign keys for the relation. + * + * XXX: Apart from RelationGetIndexList(), the returned list is a part of + * relcache and must be copied before doing anything. See comments atop + * RelationGetFKeyList(). + */ + fkeys = copyObject(RelationGetFKeyList(entry->localrel)); + + /* Quick exit if there are no foreign keys */ + if (fkeys == NIL) + return; + + foreach_ptr(ForeignKeyCacheInfo, fk, fkeys) + { + LogicalRepRelMapEntry *refentry; + Bitmapset *fkkeys = NULL; + bool suitable = true; + + /* + * Skip NOT ENFORCED constraints because they won't be checked at any + * times. + */ + if (!fk->conenforced) + continue; + + /* + * Skip if the foreign key constraint is always deferred. The commit + * ordering is always preserved thus the constraint can be checked in + * correct order. + */ + if (foreign_key_is_always_deferred(fk->conoid)) + continue; + + /* Find the referenced relation by the local OID */ + refentry = logicalrep_get_relentry_by_local_oid(fk->confrelid); + + /* + * Skip if the referenced relation is not the target of this + * subscription. + */ + if (!refentry) + continue; + + /* Seek each attributes and add to a Bitmap */ + for (int i = 0; i < fk->nkeys; i++) + { + AttrNumber localcol = fk->conkey[i]; + int remotecol = + entry->attrmap->attnums[AttrNumberGetAttrOffset(localcol)]; + + /* Skip if the column does not exist on publisher node */ + if (remotecol < 0) + { + suitable = false; + break; + } + + /* + * XXX: What if the FK column is specified with different order? + * E.g., there is a table (a, b) and other table refers like + * REFERENCES (b, a). Will it work correctly? + */ + fkkeys = bms_add_member(fkkeys, remotecol); + } + + /* + * One of the columns does not exist on the publisher side, skip such + * a constraint. + */ + if (!suitable) + { + if (fkkeys) + bms_free(fkkeys); + + continue; + } + + if (fkkeys) + { + MemoryContext oldctx; + LogicalRepSubscriberFK *fkinfo; + + oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); + fkinfo = palloc(sizeof(LogicalRepSubscriberFK)); + fkinfo->conoid = fk->conoid; + fkinfo->ref_remoteid = refentry->remoterel.remoteid; + fkinfo->conkeys = bms_copy(fkkeys); + + bms_free(fkkeys); + entry->local_fkeys = lappend(entry->local_fkeys, fkinfo); + MemoryContextSwitchTo(oldctx); + } + } + + list_free_deep(fkeys); +} + /* * Check all local triggers for the relation to see the parallelizability. * @@ -715,6 +899,7 @@ logicalrep_rel_load(LogicalRepRelMapEntry *entry, LogicalRepRelId remoteid, { entry->parallel_safe = LOGICALREP_PARALLEL_UNKNOWN; collect_local_indexes(entry); + collect_local_fkeys(entry); check_defined_triggers(entry); } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index a212bc6013d..6f8d83c6b7b 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1205,6 +1205,138 @@ check_and_record_local_key_dependency(Oid relid, } } +/* + * Lookup-only dependency check by key. Does not register/update entries. + */ +static void +find_dependencies_by_key(ReplicaIdentityKey *key, + TransactionId new_depended_xid, + List **depends_on_xids) +{ + ReplicaIdentityEntry *rientry; + MemoryContext oldctx; + + Assert(replica_identity_table); + + oldctx = MemoryContextSwitchTo(ApplyContext); + + rientry = replica_identity_lookup(replica_identity_table, key); + free_replica_identity_key(key); + + MemoryContextSwitchTo(oldctx); + + /* + * Return if no entry exists, or if the current transaction was the last one + * to modify the key. + */ + if (!rientry || TransactionIdEquals(rientry->remote_xid, new_depended_xid)) + return; + + /* + * Clean up committed entries while on it. + */ + if (pa_transaction_committed(rientry->remote_xid)) + { + free_replica_identity_key(rientry->keydata); + replica_identity_delete_item(replica_identity_table, rientry); + return; + } + + check_and_append_xid_dependency(&rientry->remote_xid, depends_on_xids); +} + +/* + * Check for dependencies on preceding transactions that modify the referenced + * unique key of foreign keys. + */ +static void +check_and_record_fkey_dependency(Oid relid, + LogicalRepTupleData *original_data, + TransactionId new_depended_xid, + List **depends_on_xids) +{ + LogicalRepRelMapEntry *relentry; + LogicalRepTupleData *ridata; + ReplicaIdentityKey *rikey; + MemoryContext oldctx; + + Assert(depends_on_xids); + + relentry = logicalrep_get_relentry(relid); + + Assert(relentry); + + /* + * We have already checked the local unique indexes and also gathered the + * FK info at that time. + * + * XXX: Should we check and load again? + */ + Assert(relentry->local_fkeys_collected); + + foreach_ptr(LogicalRepSubscriberFK, fkinfo, relentry->local_fkeys) + { + int columns = bms_num_members(fkinfo->conkeys); + bool suitable = true; + + Assert(columns); + + for (int i = 0; i < original_data->ncols; i++) + { + if (!bms_is_member(i, fkinfo->conkeys)) + continue; + + /* Skip if the column is NULL or not changed */ + if (original_data->colstatus[i] == LOGICALREP_COLUMN_NULL || + original_data->colstatus[i] == LOGICALREP_COLUMN_UNCHANGED) + { + suitable = false; + break; + } + } + + if (!suitable) + continue; + + oldctx = MemoryContextSwitchTo(ApplyContext); + + /* Allocate space for replica identity values */ + ridata = palloc0_object(LogicalRepTupleData); + ridata->colvalues = palloc0_array(StringInfoData, columns); + ridata->colstatus = palloc0_array(char, columns); + ridata->ncols = columns; + + for (int i_original = 0, i_key = 0; i_original < original_data->ncols; i_original++) + { + StringInfo original_colvalue; + + if (!bms_is_member(i_original, fkinfo->conkeys)) + continue; + + original_colvalue = &original_data->colvalues[i_original]; + initStringInfoExt(&ridata->colvalues[i_key], original_colvalue->len + 1); + appendStringInfoString(&ridata->colvalues[i_key], original_colvalue->data); + + ridata->colstatus[i_key] = original_data->colstatus[i_original]; + i_key++; + } + + rikey = palloc0_object(ReplicaIdentityKey); + rikey->relid = fkinfo->ref_remoteid; + + /* + * XXX: use LOGICALREP_KEY_LOCAL_UNIQUE to compare with local unique + * indexes. + */ + rikey->kind = LOGICALREP_KEY_LOCAL_UNIQUE; + rikey->data = ridata; + + MemoryContextSwitchTo(oldctx); + + find_dependencies_by_key(rikey, new_depended_xid, depends_on_xids); + } +} + /* * Check for preceding transactions (other than current one 'new_depended_xid') * that involve insert, delete, or update operations on the specified table, and @@ -1402,6 +1534,9 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, check_and_record_local_key_dependency(relid, &newtup, new_depended_xid, &depends_on_xids); + check_and_record_fkey_dependency(relid, &newtup, + new_depended_xid, + &depends_on_xids); check_dependency_for_parallel_safety(relid, new_depended_xid, &depends_on_xids); break; @@ -1419,6 +1554,10 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, new_depended_xid, &depends_on_xids); + check_and_record_fkey_dependency(relid, &oldtup, + new_depended_xid, + &depends_on_xids); + check_dependency_for_parallel_safety(relid, new_depended_xid, &depends_on_xids); @@ -1437,6 +1576,9 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, check_and_record_local_key_dependency(relid, &newtup, new_depended_xid, &depends_on_xids); + check_and_record_fkey_dependency(relid, &newtup, + new_depended_xid, + &depends_on_xids); check_dependency_for_parallel_safety(relid, new_depended_xid, &depends_on_xids); break; @@ -1448,6 +1590,9 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, check_and_record_local_key_dependency(relid, &oldtup, new_depended_xid, &depends_on_xids); + check_and_record_fkey_dependency(relid, &oldtup, + new_depended_xid, + &depends_on_xids); check_dependency_for_parallel_safety(relid, new_depended_xid, &depends_on_xids); break; diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index 241c4dda13c..ff25cd3ce4f 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -22,6 +22,13 @@ typedef struct LogicalRepSubscriberIdx Bitmapset *indexkeys; /* Bitmap of key columns *on remote* */ } LogicalRepSubscriberIdx; +typedef struct LogicalRepSubscriberFK +{ + Oid conoid; /* OID of the FK constraint */ + LogicalRepRelId ref_remoteid; /* referenced remote relid */ + Bitmapset *conkeys; /* Bitmap of key columns *on remote* */ +} LogicalRepSubscriberFK; + typedef struct LogicalRepRelMapEntry { LogicalRepRelation remoterel; /* key is remoterel.remoteid */ @@ -56,6 +63,10 @@ typedef struct LogicalRepRelMapEntry List *local_unique_indexes; bool local_unique_indexes_collected; + /* Local foreign keys. Used for dependency tracking */ + List *local_fkeys; + bool local_fkeys_collected; + /* * Whether the relation can be applied in parallel or not. It is * distinglish whether defined triggers are the immutable or not. diff --git a/src/test/subscription/t/050_parallel_apply.pl b/src/test/subscription/t/050_parallel_apply.pl index ae539f84639..896244a42fd 100644 --- a/src/test/subscription/t/050_parallel_apply.pl +++ b/src/test/subscription/t/050_parallel_apply.pl @@ -33,6 +33,8 @@ $node_publisher->safe_psql( CREATE UNIQUE INDEX tab_toast_ri_index on tab_toast (a, b); ALTER TABLE tab_toast REPLICA IDENTITY USING INDEX tab_toast_ri_index; INSERT INTO tab_toast(a, b) VALUES(repeat('1234567890', 200), '1234567890'); + + CREATE TABLE regress_tab_fk (id int PRIMARY KEY, fk int); )); $node_publisher->safe_psql('postgres', "INSERT INTO regress_tab VALUES (generate_series(1, 10), 'test');"); @@ -71,6 +73,8 @@ $node_subscriber->safe_psql( ALTER TABLE tab_toast ALTER COLUMN a SET STORAGE EXTERNAL; CREATE UNIQUE INDEX tab_toast_ri_index on tab_toast (a, b); ALTER TABLE tab_toast REPLICA IDENTITY USING INDEX tab_toast_ri_index; + + CREATE TABLE regress_tab_fk (id int PRIMARY KEY, fk int REFERENCES regress_tab (id)); )); $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION regress_sub CONNECTION '$publisher_connstr' PUBLICATION regress_pub;"); @@ -315,8 +319,8 @@ $h->query_safe("COMMIT;"); # subscriber during parallel apply. ################################################## -# Truncate the data for upcoming tests -$node_publisher->safe_psql('postgres', "TRUNCATE TABLE regress_tab;"); +# delete the data for upcoming tests +$node_publisher->safe_psql('postgres', "DELETE FROM regress_tab;"); $node_publisher->wait_for_catchup('regress_sub'); # Define an unique index on subscriber @@ -367,7 +371,50 @@ $node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid/, $offset # Cleanup $node_subscriber->safe_psql('postgres', "DROP INDEX regress_tab_value_idx;"); -$node_publisher->safe_psql('postgres', "TRUNCATE TABLE regress_tab;"); +$node_publisher->safe_psql('postgres', "DELETE FROM regress_tab;"); +$node_publisher->wait_for_catchup('regress_sub'); + +################################################## +# Test that the dependency tracking works correctly for local foreign key on +# subscriber during parallel apply. +################################################## + +# Insert an initial tuple +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (1, 'initial tuple');"); $node_publisher->wait_for_catchup('regress_sub'); +# Attach an injection_point. Parallel workers would wait before the commit +$node_subscriber->safe_psql('postgres', + "SELECT injection_points_attach('parallel-worker-before-commit','wait');" +); + +# Insert a tuple on publisher. Parallel worker would wait at the injection +# point +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (2, 'tmp');"); + +# Wait until the parallel worker enters the injection point. +$node_subscriber->wait_for_event('logical replication parallel worker', + 'parallel-worker-before-commit'); + +$offset = -s $node_subscriber->logfile; + +# Insert tuples on publisher again. This transaction waits for referring tuple +# is committed. +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab_fk VALUES (1, 1);"); + +# Verify the parallel worker waits for the transaction +$str = $node_subscriber->wait_for_log(qr/wait for depended xid ([1-9][0-9]+)/, $offset); +$xid = $str =~ /wait for depended xid ([1-9][0-9]+)/; + +# Wakeup the parallel worker +$node_subscriber->safe_psql('postgres', qq[ + SELECT injection_points_detach('parallel-worker-before-commit'); + SELECT injection_points_wakeup('parallel-worker-before-commit'); +]); + +$node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid/, $offset); + done_testing(); -- 2.43.0