From 2cb151a82e8c33fe8d9ff2c8d83208a90df85c9f Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Thu, 4 Jun 2026 13:23:55 +0800 Subject: [PATCH v19 08/10] Support dependency tracking via local unique indexes Currently, logical replication's parallel apply mechanism tracks dependencies primarily based on the REPLICA IDENTITY defined on the publisher table. However, local subscriber tables might have additional unique indexes that could effectively serve as dependency keys, even if they don't correspond to the publisher's REPLICA IDENTITY. Failing to track these additional unique keys can lead to incorrect data and/or deadlocks during parallel apply. This patch extends the parallel apply's dependency tracking to consider local unique indexes on the subscriber table. This is achieved by extending the existing Replica Identity hash table to also store dependency information based on these local unique indexes. The LogicalRepRelMapEntry structure is extended to store details about these local unique indexes. This information is collected and cached when dependency checking is first performed for a remote transaction on a given relation. The collection must run inside a transaction because it reads the system catalogs. 
--- src/backend/replication/logical/relation.c | 161 ++++++++++- src/backend/replication/logical/worker.c | 252 ++++++++++++++---- src/backend/storage/lmgr/deadlock.c | 1 - src/include/replication/logicalrelation.h | 14 + src/test/subscription/t/050_parallel_apply.pl | 60 +++++ src/tools/pgindent/typedefs.list | 2 + 6 files changed, 439 insertions(+), 51 deletions(-) diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index eeb85f8cc5d..6665f5c0cc9 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -128,6 +128,21 @@ logicalrep_relmap_init(void) (Datum) 0); } +/* + * Release the list of local unique indexes cached in the given entry. + */ +static void +free_local_unique_indexes(LogicalRepRelMapEntry *entry) +{ + Assert(am_leader_apply_worker()); + + foreach_ptr(LogicalRepSubscriberIdx, idxinfo, entry->local_unique_indexes) + bms_free(idxinfo->indexkeys); + + list_free_deep(entry->local_unique_indexes); + entry->local_unique_indexes = NIL; +} + /* * Free the entry of a relation map cache. */ @@ -155,6 +170,9 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry) if (entry->attrmap) free_attrmap(entry->attrmap); + + if (entry->local_unique_indexes != NIL) + free_local_unique_indexes(entry); } /* @@ -361,6 +379,126 @@ logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry) } } +/* + * Collect all local unique indexes that can be used for dependency tracking. 
+ */ +static void +collect_local_indexes(LogicalRepRelMapEntry *entry) +{ + List *idxlist; + + if (entry->local_unique_indexes != NIL) + free_local_unique_indexes(entry); + + entry->local_unique_indexes_collected = true; + + idxlist = RelationGetIndexList(entry->localrel); + + /* Quick exit if there are no indexes */ + if (idxlist == NIL) + return; + + /* Iterate over the indexes and remember all usable ones */ + foreach_oid(idxoid, idxlist) + { + Relation idxrel; + int indnkeys; + AttrMap *attrmap; + Bitmapset *indexkeys = NULL; + bool suitable = true; + + idxrel = index_open(idxoid, AccessShareLock); + + /* + * Check whether the index can be used for the dependency tracking. + * + * For simplicity, require the same conditions as REPLICA IDENTITY FULL, + * plus the index must be unique. + */ + if (!(idxrel->rd_index->indisunique && + IsIndexUsableForReplicaIdentityFull(idxrel, entry->attrmap))) + { + index_close(idxrel, AccessShareLock); + continue; + } + + indnkeys = idxrel->rd_index->indnkeyatts; + attrmap = entry->attrmap; + + Assert(indnkeys); + + /* Look up each key attribute and add it to the bitmap */ + for (int i = 0; i < indnkeys; i++) + { + AttrNumber localcol = idxrel->rd_index->indkey.values[i]; + AttrNumber remotecol; + + /* + * XXX: Mark a relation as parallel-restricted if it has expression + * indexes because we cannot compute the hash value for the + * dependency tracking. For safety, transactions that modify such + * tables wait to be applied until the last dispatched + * transaction is committed. + */ + if (!AttributeNumberIsValid(localcol)) + { + entry->parallel_safe = LOGICALREP_PARALLEL_RESTRICTED; + suitable = false; + break; + } + + remotecol = attrmap->attnums[AttrNumberGetAttrOffset(localcol)]; + + /* + * Skip the index if the column does not exist on publisher node. In this + * case the replicated tuples always have NULL or default value. 
+ */ + if (remotecol < 0) + { + suitable = false; + break; + } + + /* Checks are passed, remember the attribute */ + indexkeys = bms_add_member(indexkeys, remotecol); + } + + index_close(idxrel, AccessShareLock); + + /* + * Skip using the index if it is not suitable. This can happen if 1) + * one of the columns does not exist on the publisher side, or 2) + * there is an expression column. + */ + if (!suitable) + { + if (indexkeys) + bms_free(indexkeys); + + continue; + } + + /* This index is usable; store it in LogicalRepRelMapContext */ + if (indexkeys) + { + MemoryContext oldctx; + LogicalRepSubscriberIdx *idxinfo; + + oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); + idxinfo = palloc(sizeof(LogicalRepSubscriberIdx)); + idxinfo->indexoid = idxoid; + idxinfo->indexkeys = bms_copy(indexkeys); + entry->local_unique_indexes = + lappend(entry->local_unique_indexes, idxinfo); + + pfree(indexkeys); + MemoryContextSwitchTo(oldctx); + } + } + + list_free(idxlist); +} + /* * Check all local triggers for the relation to see the parallelizability. * @@ -370,7 +508,16 @@ logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry) static void check_defined_triggers(LogicalRepRelMapEntry *entry) { - TriggerDesc *trigdesc = entry->localrel->trigdesc; + TriggerDesc *trigdesc; + + /* + * Skip if the parallelizability has already been checked. Possible if + * the relation has expression indexes. + */ + if (entry->parallel_safe != LOGICALREP_PARALLEL_UNKNOWN) + return; + + trigdesc = entry->localrel->trigdesc; /* Quick exit if triffer is not defined */ if (trigdesc == NULL) @@ -411,7 +558,7 @@ check_defined_triggers(LogicalRepRelMapEntry *entry) * If the key is given, the corresponding entry is first searched in the hash * table and processed as in the above case. At the end, logical replication is * closed. 
- */ + */ void logicalrep_rel_load(LogicalRepRelMapEntry *entry, LogicalRepRelId remoteid, LOCKMODE lockmode) @@ -565,7 +712,11 @@ logicalrep_rel_load(LogicalRepRelMapEntry *entry, LogicalRepRelId remoteid, * tracking. */ if (am_leader_apply_worker()) + { + entry->parallel_safe = LOGICALREP_PARALLEL_UNKNOWN; + collect_local_indexes(entry); check_defined_triggers(entry); + } entry->localrelvalid = true; } @@ -867,6 +1018,12 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, entry->localindexoid = FindLogicalRepLocalIndex(partrel, remoterel, entry->attrmap); + /* + * TODO: Local unique indexes are not collected for partitions for now. Just + * mark them as collected. + */ + entry->local_unique_indexes_collected = true; + entry->localrelvalid = true; return entry; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index de665b8d4b3..a212bc6013d 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -578,10 +578,20 @@ typedef struct ApplySubXactData static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}; +/* + * Type of key used for dependency tracking. 
+ */ +typedef enum LogicalRepKeyKind +{ + LOGICALREP_KEY_REPLICA_IDENTITY, + LOGICALREP_KEY_LOCAL_UNIQUE +} LogicalRepKeyKind; + /* Hash table key for replica_identity_table */ typedef struct ReplicaIdentityKey { Oid relid; + LogicalRepKeyKind kind; LogicalRepTupleData *data; } ReplicaIdentityKey; @@ -760,7 +770,8 @@ static bool hash_replica_identity_compare(ReplicaIdentityKey *a, ReplicaIdentityKey *b) { if (a->relid != b->relid || - a->data->ncols != b->data->ncols) + a->data->ncols != b->data->ncols || + a->kind != b->kind) return false; for (int i = 0; i < a->data->ncols; i++) @@ -768,6 +779,9 @@ hash_replica_identity_compare(ReplicaIdentityKey *a, ReplicaIdentityKey *b) if (a->data->colstatus[i] != b->data->colstatus[i]) return false; + if (a->data->colstatus[i] == LOGICALREP_COLUMN_NULL) + continue; + if (a->data->colvalues[i].len != b->data->colvalues[i].len) return false; @@ -895,6 +909,89 @@ check_and_append_xid_dependency(TransactionId *xid, List **depends_on_xids) *depends_on_xids = lappend_xid(*depends_on_xids, *xid); } +/* + * Common function for checking dependency by using the key. Used by both + * check_and_record_ri_dependency and check_and_record_local_key_dependency. + */ +static void +check_and_record_key_dependency(ReplicaIdentityKey *key, + TransactionId new_depended_xid, + List **depends_on_xids) +{ + ReplicaIdentityEntry *rientry; + bool found = false; + + /* + * The new xid could be invalid if the transaction will be applied by the + * leader itself which means all the changes will be committed before + * processing next transaction. In this case, we only need to check for + * dependencies on preceding transactions, there is no need to record a new + * dependency for subsequent transactions to wait on. 
+ */ + if (!TransactionIdIsValid(new_depended_xid)) + { + rientry = replica_identity_lookup(replica_identity_table, key); + + if (rientry) + elog(DEBUG1, + key->kind == LOGICALREP_KEY_REPLICA_IDENTITY ? + "found conflicting replica identity change from %u" : + "found conflicting local unique change from %u", + rientry->remote_xid); + free_replica_identity_key(key); /* after elog: it reads key->kind */ + + if (!rientry) + return; /* no dependency detected */ + + Assert(TransactionIdIsValid(rientry->remote_xid)); + check_and_append_xid_dependency(&rientry->remote_xid, depends_on_xids); + + /* + * Remove the entry if the remote transaction has been committed locally + * and no new dependency needs to be added. + */ + if (!TransactionIdIsValid(rientry->remote_xid)) + { + free_replica_identity_key(rientry->keydata); + replica_identity_delete_item(replica_identity_table, rientry); + } + + return; + } + + /* Record a new dependency for subsequent transactions to wait on */ + rientry = replica_identity_insert(replica_identity_table, key, + &found); + + /* + * Release the key built to search the entry, if the entry already exists. + */ + if (found) + { + Assert(TransactionIdIsValid(rientry->remote_xid)); + + elog(DEBUG1, + key->kind == LOGICALREP_KEY_REPLICA_IDENTITY ? + "found conflicting replica identity change from %u" : + "found conflicting local unique change from %u", + rientry->remote_xid); + + free_replica_identity_key(key); + + /* + * Return if the current transaction was the last one to modify the + * key. + */ + if (TransactionIdEquals(rientry->remote_xid, new_depended_xid)) + return; + + check_and_append_xid_dependency(&rientry->remote_xid, depends_on_xids); + } + + /* Update the new depended xid into the entry */ + rientry->remote_xid = new_depended_xid; +} + /* * Check for dependencies on preceding transactions that modify the same key. * Returns the dependent transactions in 'depends_on_xids'. 
@@ -911,10 +1008,8 @@ check_and_record_ri_dependency(Oid relid, LogicalRepTupleData *original_data, LogicalRepRelMapEntry *relentry; LogicalRepTupleData *ridata; ReplicaIdentityKey *rikey; - ReplicaIdentityEntry *rientry; MemoryContext oldctx; int n_ri; - bool found = false; Assert(depends_on_xids); @@ -993,73 +1088,121 @@ check_and_record_ri_dependency(Oid relid, LogicalRepTupleData *original_data, rikey = palloc0_object(ReplicaIdentityKey); rikey->relid = relid; + rikey->kind = LOGICALREP_KEY_REPLICA_IDENTITY; rikey->data = ridata; MemoryContextSwitchTo(oldctx); + check_and_record_key_dependency(rikey, new_depended_xid, depends_on_xids); +} + +/* + * Like check_and_record_ri_dependency(), but keyed on local unique indexes. + */ +static void +check_and_record_local_key_dependency(Oid relid, + LogicalRepTupleData *original_data, + TransactionId new_depended_xid, + List **depends_on_xids) +{ + LogicalRepRelMapEntry *relentry; + LogicalRepTupleData *ridata; + ReplicaIdentityKey *rikey; + MemoryContext oldctx; + + Assert(depends_on_xids); + + /* Search for existing entry */ + relentry = logicalrep_get_relentry(relid); + + Assert(relentry); /* - * The new xid could be invalid if the transaction will be applied by the - * leader itself which means all the changes will be committed before - * processing next transaction. In this case, we only need to check for - * dependencies on preceding transactions, there is no need to record a new - * dependency for subsequent transactions to wait on. + * Gather information about local unique indexes if not done yet. This must + * happen inside a transaction because system catalogs are read. 
*/ - if (!TransactionIdIsValid(new_depended_xid)) + if (!relentry->local_unique_indexes_collected) { - rientry = replica_identity_lookup(replica_identity_table, rikey); - free_replica_identity_key(rikey); - - /* No dependency detected */ - if (!rientry) - return; + bool needs_start = !IsTransactionOrTransactionBlock(); - elog(DEBUG1, "found conflicting replica identity change from %u", - rientry->remote_xid); + if (needs_start) + StartTransactionCommand(); - Assert(TransactionIdIsValid(rientry->remote_xid)); - check_and_append_xid_dependency(&rientry->remote_xid, depends_on_xids); + logicalrep_rel_load(NULL, relid, AccessShareLock); /* - * Remove the entry if the remote transaction has been committed locally - * and no new dependency needs to be added. + * Commit the transaction if we started it here. We must not abort because + * that would release all session-level locks, such as the stream lock, + * and break the deadlock detection mechanism between LA and PA. The + * outcome is the same regardless of the end status, since the + * transaction did not modify any tuples. */ - if (!TransactionIdIsValid(rientry->remote_xid)) + if (needs_start) + CommitTransactionCommand(); + + Assert(relentry->local_unique_indexes_collected); + } + + foreach_ptr(LogicalRepSubscriberIdx, idxinfo, relentry->local_unique_indexes) + { + int columns = bms_num_members(idxinfo->indexkeys); + bool suitable = true; + + Assert(columns); + + for (int i = 0; i < original_data->ncols; i++) { - free_replica_identity_key(rientry->keydata); - replica_identity_delete_item(replica_identity_table, rientry); + if (!bms_is_member(i, idxinfo->indexkeys)) + continue; + + /* + * Skip this index if one of its key columns is unchanged. + * + * XXX: NULL is allowed. 
+ */ + if (original_data->colstatus[i] == LOGICALREP_COLUMN_UNCHANGED) + { + suitable = false; + break; + } } - return; - } + if (!suitable) + continue; - /* Record a new dependency for subsequent transactions to wait on */ - rientry = replica_identity_insert(replica_identity_table, rikey, - &found); + oldctx = MemoryContextSwitchTo(ApplyContext); - /* - * Release the key built to search the entry, if the entry already exists. - */ - if (found) - { - Assert(TransactionIdIsValid(rientry->remote_xid)); + /* Allocate space for the unique key values */ + ridata = palloc0_object(LogicalRepTupleData); + ridata->colvalues = palloc0_array(StringInfoData, columns); + ridata->colstatus = palloc0_array(char, columns); + ridata->ncols = columns; - elog(DEBUG1, "found conflicting replica identity change from %u", - rientry->remote_xid); + for (int i_original = 0, i_key = 0; i_original < original_data->ncols; i_original++) + { + if (!bms_is_member(i_original, idxinfo->indexkeys)) + continue; - free_replica_identity_key(rikey); + if (original_data->colstatus[i_original] != LOGICALREP_COLUMN_NULL) + { + StringInfo original_colvalue = &original_data->colvalues[i_original]; - /* - * Return if the current transaction was the last one to modify the - * key. 
- */ - if (TransactionIdEquals(rientry->remote_xid, new_depended_xid)) - return; + initStringInfoExt(&ridata->colvalues[i_key], original_colvalue->len + 1); + appendStringInfoString(&ridata->colvalues[i_key], original_colvalue->data); + } - check_and_append_xid_dependency(&rientry->remote_xid, depends_on_xids); - } + ridata->colstatus[i_key] = original_data->colstatus[i_original]; + i_key++; + } - /* Update the new depended xid into the entry */ - rientry->remote_xid = new_depended_xid; + rikey = palloc0_object(ReplicaIdentityKey); + rikey->relid = relid; + rikey->kind = LOGICALREP_KEY_LOCAL_UNIQUE; + rikey->data = ridata; + + MemoryContextSwitchTo(oldctx); + + check_and_record_key_dependency(rikey, new_depended_xid, depends_on_xids); + } } /* @@ -1256,6 +1399,9 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, relid = logicalrep_read_insert(&change, &newtup); check_and_record_ri_dependency(relid, &newtup, new_depended_xid, &depends_on_xids); + check_and_record_local_key_dependency(relid, &newtup, + new_depended_xid, + &depends_on_xids); check_dependency_for_parallel_safety(relid, new_depended_xid, &depends_on_xids); break; @@ -1269,6 +1415,10 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, check_and_record_ri_dependency(relid, &oldtup, new_depended_xid, &depends_on_xids); + check_and_record_local_key_dependency(relid, &oldtup, + new_depended_xid, + &depends_on_xids); + check_dependency_for_parallel_safety(relid, new_depended_xid, &depends_on_xids); @@ -1284,6 +1434,9 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, check_and_record_ri_dependency(relid, &newtup, new_depended_xid, &depends_on_xids); + check_and_record_local_key_dependency(relid, &newtup, + new_depended_xid, + &depends_on_xids); check_dependency_for_parallel_safety(relid, new_depended_xid, &depends_on_xids); break; @@ -1292,6 +1445,9 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, relid = 
logicalrep_read_delete(&change, &oldtup); check_and_record_ri_dependency(relid, &oldtup, new_depended_xid, &depends_on_xids); + check_and_record_local_key_dependency(relid, &oldtup, + new_depended_xid, + &depends_on_xids); check_dependency_for_parallel_safety(relid, new_depended_xid, &depends_on_xids); break; diff --git a/src/backend/storage/lmgr/deadlock.c b/src/backend/storage/lmgr/deadlock.c index b8962d875b6..31bbfcf1971 100644 --- a/src/backend/storage/lmgr/deadlock.c +++ b/src/backend/storage/lmgr/deadlock.c @@ -33,7 +33,6 @@ #include "storage/procnumber.h" #include "utils/memutils.h" - /* * One edge in the waits-for graph. * diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index b1079d3f2fa..241c4dda13c 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -16,6 +16,12 @@ #include "catalog/index.h" #include "replication/logicalproto.h" +typedef struct LogicalRepSubscriberIdx +{ + Oid indexoid; /* OID of the local unique index */ + Bitmapset *indexkeys; /* Bitmap of key columns *on remote* */ +} LogicalRepSubscriberIdx; + typedef struct LogicalRepRelMapEntry { LogicalRepRelation remoterel; /* key is remoterel.remoteid */ @@ -46,6 +52,10 @@ typedef struct LogicalRepRelMapEntry */ TransactionId last_depended_xid; + /* Local unique indexes. Used for dependency tracking */ + List *local_unique_indexes; + bool local_unique_indexes_collected; + /* * Whether the relation can be applied in parallel or not. It is * distinglish whether defined triggers are the immutable or not. @@ -57,6 +67,10 @@ typedef struct LogicalRepRelMapEntry * Note that we do not check the user-defined constraints here. PostgreSQL * has already assumed that CHECK constraints' conditions are immutable and * here follows the rule. + * + * XXX: Additionally, the relation is not parallel-safe if it has expression + * indexes, because we cannot compute the hash value for the dependency + * tracking. 
*/ char parallel_safe; } LogicalRepRelMapEntry; diff --git a/src/test/subscription/t/050_parallel_apply.pl b/src/test/subscription/t/050_parallel_apply.pl index 04441ca1cd0..ae539f84639 100644 --- a/src/test/subscription/t/050_parallel_apply.pl +++ b/src/test/subscription/t/050_parallel_apply.pl @@ -310,4 +310,64 @@ $node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid/, $offset $h->query_safe("COMMIT;"); +################################################## +# Test that the dependency tracking works correctly for local unique indexes on +# subscriber during parallel apply. +################################################## + +# Truncate the data for upcoming tests +$node_publisher->safe_psql('postgres', "TRUNCATE TABLE regress_tab;"); +$node_publisher->wait_for_catchup('regress_sub'); + +# Define a unique index on subscriber (only unique indexes are tracked) +$node_subscriber->safe_psql('postgres', + "CREATE UNIQUE INDEX ON regress_tab (value);"); + +# Attach an injection_point. Parallel workers would wait before the commit +$node_subscriber->safe_psql('postgres', + "SELECT injection_points_attach('parallel-worker-before-commit','wait');" +); + +# Insert a tuple on publisher. Parallel worker would wait at the injection +# point +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (1, 'would conflict');"); + +# Wait until the parallel worker enters the injection point. +$node_subscriber->wait_for_event('logical replication parallel worker', + 'parallel-worker-before-commit'); + +$offset = -s $node_subscriber->logfile; + +# Insert tuples on publisher again. This transaction would wait because all +# parallel workers wait till the previously launched worker commits. 
+$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (2, 'would not conflict');"); + +# Verify the parallel worker waits for the transaction +$str = $node_subscriber->wait_for_log(qr/wait for depended xid ([1-9][0-9]+)/, $offset); +$xid = $str =~ /wait for depended xid ([1-9][0-9]+)/; + +# Insert a conflicting tuple on publisher. Leader worker would detect the conflict +# and wait for the transaction to commit. +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (3, 'would conflict');"); + +# Verify the parallel worker waits for the same transaction +$node_subscriber->wait_for_log(qr/wait for depended xid $xid/, $offset); + +# Wakeup the parallel worker +$node_subscriber->safe_psql('postgres', qq[ + SELECT injection_points_detach('parallel-worker-before-commit'); + SELECT injection_points_wakeup('parallel-worker-before-commit'); +]); + +# Verify the streamed transaction can be applied +$node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid/, $offset); + +# Cleanup +$node_subscriber->safe_psql('postgres', "DROP INDEX regress_tab_value_idx;"); +$node_publisher->safe_psql('postgres', "TRUNCATE TABLE regress_tab;"); +$node_publisher->wait_for_catchup('regress_sub'); + done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index db8bb4e1e43..34c9592e3e5 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1701,6 +1701,7 @@ LogicalRepBeginData LogicalRepCommitData LogicalRepCommitPreparedTxnData LogicalRepCtxStruct +LogicalRepKeyKind LogicalRepMsgType LogicalRepPartMapEntry LogicalRepPreparedTxnData @@ -1710,6 +1711,7 @@ LogicalRepRelation LogicalRepRollbackPreparedTxnData LogicalRepSequenceInfo LogicalRepStreamAbortData +LogicalRepSubscriberIdx LogicalRepTupleData LogicalRepTyp LogicalRepWorker -- 2.43.0