From 239b5645fdbe0d5d6eb9431be0feb5d198a8ab89 Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Wed, 10 Jun 2026 13:03:16 +0800 Subject: [PATCH v20 8/9] Support dependency tracking via local unique indexes Currently, logical replication's parallel apply mechanism tracks dependencies primarily based on the REPLICA IDENTITY defined on the publisher table. However, local subscriber tables might have additional unique indexes that could effectively serve as dependency keys, even if they don't correspond to the publisher's REPLICA IDENTITY. Failing to track these additional unique keys can lead to incorrect data and/or deadlocks during parallel application. This patch extends the parallel apply's dependency tracking to consider local unique indexes on the subscriber table. This is achieved by extending the existing Replica Identity hash table to also store dependency information based on these local unique indexes. The LogicalRepRelMapEntry structure is extended to store details about these local unique indexes. This information is collected and cached when dependency checking is first performed for a remote transaction on a given relation. This collection process requires to be in a transaction to access system catalog information. --- .../replication/logical/applyparallelworker.c | 50 +++ src/backend/replication/logical/relation.c | 196 ++++++++- src/backend/replication/logical/worker.c | 387 +++++++++++++----- src/backend/storage/lmgr/deadlock.c | 1 - src/include/replication/logicalrelation.h | 12 + src/test/subscription/t/050_parallel_apply.pl | 124 ++++++ src/tools/pgindent/typedefs.list | 2 + 7 files changed, 667 insertions(+), 105 deletions(-) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 58bd7a69992..8826faf102c 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -174,6 +174,56 @@ * both are allowed to apply in parallel, TX-2's DELETE could be applied before * TX-1's INSERT, resulting in a delete_missing conflict. * + * Beyond replica identity keys, we also track dependencies on transactions that + * modify the same local unique key. Even if the replica identity keys differ, + * unique indexes can still cause conflicts. This is necessary to prevent + * unexpected errors. For example: + * + * TX-1: DELETE row (1,2) with replica identity key (1,2) and unique key (2) + * TX-2: INSERT row (3,2) with replica identity key (3,2) and unique key (2) + * + * If applied in parallel, TX-2's INSERT could be applied before TX-1's DELETE, + * leading to a unique index violation error. + * + * We do not track dependencies for INSERT and UPDATE that conflict on a new + * unique key value, since such conflicts would cause an error even in serial + * mode. Instead, we only track dependencies involving old tuples (from DELETE + * or UPDATE) and require INSERT and UPDATE transactions that target the same + * unique key to wait for them. + * + * Note that the old tuple of an UPDATE or DELETE may not include the unique key + * column if that column is not part of the replica identity columns on the + * publisher. In such cases, we only use the unique columns that are part of the + * replica identity keys for dependency tracking, which may lead to false + * positives. For example, consider a unique index defined as UNIQUE (a, b), + * where only b is part of the replica identity keys: + * + * TX-1: DELETE row (1,2) TX-2: INSERT row (3,2) + * + * If applied in parallel, both transactions will be treated as dependent + * because they modify the same unique key value (b=2), even though they + * actually modify different unique keys. This is acceptable because it is still + * better than completely disallowing parallelism for these transactions. + * + * In the worst case, if none of the unique index columns are part of the + * replica identity keys, we treat all transactions that modify the same table + * as dependent and disallow parallelism for that table. + * + * XXX We could consider requesting the publisher to include unique key columns + * in the old tuple of UPDATE or DELETE when they are not part of the replica + * identity keys. This would reduce false positives, but would require changes + * on the publisher side and increase disk (WAL size) and network data. For now, + * we choose not to implement this. An alternative approach is to provide an + * option to skip tracking dependencies on unique keys that are not part of the + * replica identity keys. This could be useful for users who prefer higher + * parallelism and experience few conflicts. + * + * Note that the local unique key could change after dependency checking and + * before applying the change. However, to centralize tracking and keep it + * simple, we still perform this check only in the leader apply worker. This is + * acceptable because in the worst case, the parallel worker will report an + * error and restart the transaction using the latest index information. + * * Commit order * ------------ * We preserve publisher commit order for all transactions for two reasons: diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index eeb85f8cc5d..8eef87ca910 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -84,6 +84,7 @@ logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid) if (entry->localreloid == reloid) { entry->localrelvalid = false; + entry->local_unique_indexes_collected = false; hash_seq_term(&status); break; } @@ -97,7 +98,10 @@ logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid) hash_seq_init(&status, LogicalRepRelMap); while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL) + { entry->localrelvalid = false; + entry->local_unique_indexes_collected = false; + } } } @@ -128,6 +132,21 @@ logicalrep_relmap_init(void) (Datum) 0); } +/* + * Release local index list + */ +static void +free_local_unique_indexes(LogicalRepRelMapEntry *entry) +{ + Assert(am_leader_apply_worker()); + + foreach_ptr(LogicalRepSubscriberIdx, idxinfo, entry->local_unique_indexes) + bms_free(idxinfo->indexkeys); + + list_free_deep(entry->local_unique_indexes); + entry->local_unique_indexes = NIL; +} + /* * Free the entry of a relation map cache. */ @@ -155,6 +174,9 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry) if (entry->attrmap) free_attrmap(entry->attrmap); + + if (entry->local_unique_indexes != NIL) + free_local_unique_indexes(entry); } /* @@ -217,6 +239,7 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel) entry->remoterel.attkeys = bms_copy(remoterel->attkeys); entry->parallel_safe = LOGICALREP_PARALLEL_UNKNOWN; + entry->local_unique_indexes_collected = false; MemoryContextSwitchTo(oldctx); } @@ -361,6 +384,154 @@ logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry) } } +/* + * Collect all local unique indexes that can be used for dependency tracking + * + * This function collects all types of unique indexes, including those with + * index expressions and partial indexes. However, to avoid the overhead and + * complexity of executing expressions, we do not evaluate them during + * dependency tracking. + * + * For indexes with expressions, only the non-expression columns are recorded in + * the bitmap. The dependency tracking function will use only these columns, + * which may lead to false dependency detection. For example, consider a unique + * index defined as UNIQUE (a, func(b)), where b is an expression column. Rows + * (1, 2) and (1, 3) will be treated as dependent even though they are not. This + * is acceptable, as it is still better than disabling parallelism for all + * relations that have expression indexes. + * + * Similarly, partial indexes may also cause false dependencies due to predicate + * expressions. For the same reason, we consider this acceptable as well. + * + * To avoid redundant dependency tracking, indexes whose key columns are the + * same as, or a superset of, the replica identity key columns are skipped, + * since tracking the replica identity keys already covers their scope. + * + * Columns not in the replica identity key are excluded from the unique column + * set. Since the old tuple of an UPDATE or DELETE contains only replica + * identity key columns, any other columns would be missing and thus unavailable + * for dependency tracking. + */ +static void +collect_indexes_for_dependency_tracking(LogicalRepRelMapEntry *entry) +{ + List *idxlist; + + free_local_unique_indexes(entry); + + /* + * XXX For partitioned tables, we must collect unique indexes from leaf + * partitions, which are the actual replication targets. This is because + * leaf partitions can have unique indexes that are not present on the + * partitioned table, and those indexes can be used for dependency tracking. + * However, collecting unique indexes from leaf partitions requires building + * the tuple, and executing partition pruning expressions, which could be + * expensive for each change on a partitioned table. For now, we skip + * collecting local unique indexes for partitioned tables and create a dummy + * entry, ensuring that changes on partitioned tables are not applied in + * parallel. + */ + if (entry->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + MemoryContext oldctx; + LogicalRepSubscriberIdx *indexinfo; + + oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); + indexinfo = palloc(sizeof(LogicalRepSubscriberIdx)); + indexinfo->indexoid = InvalidOid; + indexinfo->indexkeys = NULL; + indexinfo->nulls_distinct = false; + entry->local_unique_indexes = lappend(entry->local_unique_indexes, + indexinfo); + MemoryContextSwitchTo(oldctx); + + entry->local_unique_indexes_collected = true; + + return; + } + + idxlist = RelationGetIndexList(entry->localrel); + + /* Iterate indexes to list all usable indexes */ + foreach_oid(idxoid, idxlist) + { + Relation idxrel; + int indnkeys; + AttrMap *attrmap; + MemoryContext oldctx; + LogicalRepSubscriberIdx *indexinfo; + Bitmapset *indexkeys = NULL; + bool nulls_distinct; + + idxrel = index_open(idxoid, AccessShareLock); + + /* Only unique indexes are considered */ + if (!idxrel->rd_index->indisunique) + { + index_close(idxrel, AccessShareLock); + continue; + } + + indnkeys = idxrel->rd_index->indnkeyatts; + nulls_distinct = !idxrel->rd_index->indnullsnotdistinct; + attrmap = entry->attrmap; + + Assert(indnkeys); + + /* Seek each attributes and add to a Bitmap */ + for (int i = 0; i < indnkeys; i++) + { + AttrNumber localcol = idxrel->rd_index->indkey.values[i]; + AttrNumber remotecol; + + /* Skip expression */ + if (!AttributeNumberIsValid(localcol)) + continue; + + remotecol = attrmap->attnums[AttrNumberGetAttrOffset(localcol)]; + + /* Skip if the column does not exist on publisher node */ + if (remotecol < 0) + continue; + + /* Skip columns that are not part of the replica identity key */ + if (!bms_is_member(remotecol, entry->remoterel.attkeys)) + continue; + + /* Checks are passed, remember the attribute */ + indexkeys = bms_add_member(indexkeys, remotecol); + } + + index_close(idxrel, AccessShareLock); + + /* + * Skip indexes whose key columns are a superset of the replica identity + * key. + */ + if (bms_equal(entry->remoterel.attkeys, indexkeys) || + bms_is_subset(entry->remoterel.attkeys, indexkeys)) + { + bms_free(indexkeys); + continue; + } + + oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext); + indexinfo = palloc(sizeof(LogicalRepSubscriberIdx)); + indexinfo->indexoid = idxoid; + indexinfo->indexkeys = bms_copy(indexkeys); + indexinfo->nulls_distinct = nulls_distinct; + entry->local_unique_indexes = lappend(entry->local_unique_indexes, + indexinfo); + MemoryContextSwitchTo(oldctx); + + bms_free(indexkeys); + } + + list_free(idxlist); + + entry->local_unique_indexes_collected = true; +} + /* * Check all local triggers for the relation to see the parallelizability. * @@ -370,7 +541,16 @@ logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry) static void check_defined_triggers(LogicalRepRelMapEntry *entry) { - TriggerDesc *trigdesc = entry->localrel->trigdesc; + TriggerDesc *trigdesc; + + /* + * Skip if the parallelizability has already been checked. Possilble if + * the relation has expression indexes. + */ + if (entry->parallel_safe != LOGICALREP_PARALLEL_UNKNOWN) + return; + + trigdesc = entry->localrel->trigdesc; /* Quick exit if triffer is not defined */ if (trigdesc == NULL) @@ -411,7 +591,7 @@ check_defined_triggers(LogicalRepRelMapEntry *entry) * If the key is given, the corresponding entry is first searched in the hash * table and processed as in the above case. At the end, logical replication is * closed. - */ + */ void logicalrep_rel_load(LogicalRepRelMapEntry *entry, LogicalRepRelId remoteid, LOCKMODE lockmode) @@ -455,6 +635,7 @@ logicalrep_rel_load(LogicalRepRelMapEntry *entry, LogicalRepRelId remoteid, { /* Table was renamed or dropped. */ entry->localrelvalid = false; + entry->local_unique_indexes_collected = false; } else if (!entry->localrelvalid) { @@ -565,7 +746,11 @@ logicalrep_rel_load(LogicalRepRelMapEntry *entry, LogicalRepRelId remoteid, * tracking. */ if (am_leader_apply_worker()) + { + entry->parallel_safe = LOGICALREP_PARALLEL_UNKNOWN; + collect_indexes_for_dependency_tracking(entry); check_defined_triggers(entry); + } entry->localrelvalid = true; } @@ -867,6 +1052,13 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, entry->localindexoid = FindLogicalRepLocalIndex(partrel, remoterel, entry->attrmap); + /* + * TODO: Parallel apply cannot collect indexes from leaf partition for now. + * Just mark local indexes are collected. (See + * collect_indexes_for_dependency_tracking() for details.) + */ + entry->local_unique_indexes_collected = true; + entry->localrelvalid = true; return entry; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 717cf74f0ac..17eaef28039 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -578,10 +578,20 @@ typedef struct ApplySubXactData static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL}; +/* + * Type of key used for dependency tracking. + */ +typedef enum LogicalRepKeyKind +{ + LOGICALREP_KEY_REPLICA_IDENTITY, + LOGICALREP_KEY_LOCAL_UNIQUE +} LogicalRepKeyKind; + /* Hash table key for replica_identity_table */ typedef struct ReplicaIdentityKey { Oid relid; + LogicalRepKeyKind kind; LogicalRepTupleData *data; } ReplicaIdentityKey; @@ -761,7 +771,8 @@ static bool hash_replica_identity_compare(ReplicaIdentityKey *a, ReplicaIdentityKey *b) { if (a->relid != b->relid || - a->data->ncols != b->data->ncols) + a->data->ncols != b->data->ncols || + a->kind != b->kind) return false; for (int i = 0; i < a->data->ncols; i++) @@ -769,6 +780,9 @@ hash_replica_identity_compare(ReplicaIdentityKey *a, ReplicaIdentityKey *b) if (a->data->colstatus[i] != b->data->colstatus[i]) return false; + if (a->data->colstatus[i] == LOGICALREP_COLUMN_NULL) + continue; + if (a->data->colvalues[i].len != b->data->colvalues[i].len) return false; @@ -789,8 +803,12 @@ free_replica_identity_key(ReplicaIdentityKey *key) { Assert(key); - pfree(key->data->colvalues); - pfree(key->data->colstatus); + if (key->data->colvalues) + pfree(key->data->colvalues); + + if (key->data->colstatus) + pfree(key->data->colstatus); + pfree(key->data); pfree(key); } @@ -933,71 +951,124 @@ append_xid_dependency(TransactionId xid, List **depends_on_xids) } /* - * Check for dependencies on preceding transactions that modify the same key as - * the given tuple. Returns the dependent transactions in 'depends_on_xids'. + * Common function for checking dependency by using the key. Used by both + * check_and_record_ri_dependency and check_and_record_local_key_dependency. + * + * Check whether the given key has an active dependency. If new_depended_xid is + * valid, also records a new dependency for that transaction. * - * Additionally, if new_depended_xid is valid, record the current change and the - * transaction as a new dependency for the replica identity key modification, - * allowing subsequent transactions that modify the same key to be dependent on - * it. + * Return the existing transaction ID if an active dependency exists for the + * key; otherwise returns InvalidTransactionId. */ -static void -check_and_record_ri_dependency(Oid relid, LogicalRepTupleData *original_data, - TransactionId new_depended_xid, - List **depends_on_xids) +static TransactionId +check_and_record_key_dependency(ReplicaIdentityKey *key, + TransactionId new_depended_xid) { - LogicalRepRelMapEntry *relentry; - LogicalRepTupleData *ridata; - ReplicaIdentityKey *rikey; + TransactionId existing_xid = InvalidTransactionId; ReplicaIdentityEntry *rientry; - MemoryContext oldctx; - int n_ri; bool found = false; - Assert(depends_on_xids); + /* + * The new xid could be invalid if the transaction will be applied by the + * leader itself which means all the changes will be committed before + * processing next transaction. In this case, we only need to check for + * dependencies on preceding transactions, there is no need to record a new + * dependency for subsequent transactions to wait on. + */ + if (!TransactionIdIsValid(new_depended_xid)) + { + rientry = replica_identity_lookup(replica_identity_table, key); - /* Search for existing entry */ - relentry = logicalrep_get_relentry(relid); + if (rientry && has_active_key_dependency(rientry, true)) + { + elog(DEBUG1, + key->kind == LOGICALREP_KEY_REPLICA_IDENTITY ? + "found conflicting replica identity change on table %u from %u" : + "found conflicting local unique key change on table %u from %u", + key->relid, rientry->remote_xid); - Assert(relentry); + existing_xid = rientry->remote_xid; + } + + free_replica_identity_key(key); + + return existing_xid; + } + + /* Record a new dependency for subsequent transactions to wait on */ + rientry = replica_identity_insert(replica_identity_table, key, + &found); /* - * First check whether any previous transaction (other than the current one) - * has affected the whole table e.g., truncate or schema change from - * publisher. + * Release the key built to search the entry, if the entry already exists. */ - if (has_active_rel_dependency(relentry) && - !TransactionIdEquals(relentry->last_depended_xid, new_depended_xid)) + if (found) { - elog(DEBUG1, "found table-wide change affecting %u from %u", - relid, relentry->last_depended_xid); + if (has_active_key_dependency(rientry, false)) + { + elog(DEBUG1, + key->kind == LOGICALREP_KEY_REPLICA_IDENTITY ? + "found conflicting replica identity change on table %u from %u" : + "found conflicting local unique key change on table %u from %u", + key->relid, rientry->remote_xid); - append_xid_dependency(relentry->last_depended_xid, depends_on_xids); + existing_xid = rientry->remote_xid; + } + + free_replica_identity_key(key); } - n_ri = bms_num_members(relentry->remoterel.attkeys); + rientry->remote_xid = new_depended_xid; - /* - * Return if there are no replica identity columns, indicating that the - * remote relation has neither a replica identity key nor is marked as - * replica identity full. - */ - if (!n_ri) - return; + return existing_xid; +} - /* Check if the RI key value of the tuple is invalid */ - for (int i = 0; i < original_data->ncols; i++) +/* + * Check if any of the key columns have NULL values. + */ +static bool +has_null_key_values(LogicalRepTupleData *data, Bitmapset *indexkeys) +{ + for (int i = 0; i < data->ncols; i++) { - if (!bms_is_member(i, relentry->remoterel.attkeys)) - continue; + if (bms_is_member(i, indexkeys) && + data->colstatus[i] == LOGICALREP_COLUMN_NULL) + return true; + } - /* - * NULL in the new tuple means the replica identity key hasn't changed, - * so no new dependency needs to be recorded. The dependency should have - * been recorded when processing the old tuple. - */ - if (original_data->colstatus[i] == LOGICALREP_COLUMN_NULL) - return; + return false; +} + +/* + * Build a hash key for replica_identity_table using the given relation and + * tuple data, restricted to the specified key columns. + */ +static ReplicaIdentityKey * +build_replica_identity_key(Oid relid, LogicalRepTupleData *original_data, + Bitmapset *keycols) +{ + LogicalRepTupleData *keydata; + ReplicaIdentityKey *key; + MemoryContext oldctx; + int nkeycols = bms_num_members(keycols); + + oldctx = MemoryContextSwitchTo(ApplyContext); + + /* Allocate space for replica identity values */ + keydata = palloc0_object(LogicalRepTupleData); + + if (nkeycols) + { + keydata->colvalues = palloc0_array(StringInfoData, nkeycols); + keydata->colstatus = palloc0_array(char, nkeycols); + } + + keydata->ncols = nkeycols; + + for (int i = 0, i_key = 0; i < original_data->ncols; i++) + { + if (!bms_is_member(i, keycols)) + continue; /* * LOGICALREP_COLUMN_UNCHANGED only indicates that a TOAST column in the @@ -1011,85 +1082,184 @@ check_and_record_ri_dependency(Oid relid, LogicalRepTupleData *original_data, */ Assert(original_data->colstatus[i] != LOGICALREP_COLUMN_UNCHANGED || original_data->colvalues[i].len > 0); + + if (original_data->colstatus[i] != LOGICALREP_COLUMN_NULL) + { + StringInfo original_colvalue = &original_data->colvalues[i]; + + initStringInfoExt(&keydata->colvalues[i_key], original_colvalue->len + 1); + appendStringInfoString(&keydata->colvalues[i_key], original_colvalue->data); + } + + keydata->colstatus[i_key] = original_data->colstatus[i]; + i_key++; } - oldctx = MemoryContextSwitchTo(ApplyContext); + key = palloc0_object(ReplicaIdentityKey); + key->relid = relid; + key->data = keydata; - /* Allocate space for replica identity values */ - ridata = palloc0_object(LogicalRepTupleData); - ridata->colvalues = palloc0_array(StringInfoData, n_ri); - ridata->colstatus = palloc0_array(char, n_ri); - ridata->ncols = n_ri; + MemoryContextSwitchTo(oldctx); - for (int i_original = 0, i_ri = 0; i_original < original_data->ncols; i_original++) - { - StringInfo original_colvalue = &original_data->colvalues[i_original]; + return key; +} - if (!bms_is_member(i_original, relentry->remoterel.attkeys)) - continue; +/* + * Mostly same as check_and_record_ri_dependency() but for local unique indexes. + * + * See the comments in applyparallelworker.c for details on why tracking these + * dependencies is necessary. + */ +static void +check_and_record_local_key_dependency(Oid relid, + LogicalRepTupleData *original_data, + bool old_tuple, + TransactionId new_depended_xid, + List **depends_on_xids) +{ + LogicalRepRelMapEntry *relentry; + ReplicaIdentityKey *rikey; - initStringInfoExt(&ridata->colvalues[i_ri], original_colvalue->len + 1); - appendStringInfoString(&ridata->colvalues[i_ri], original_colvalue->data); - ridata->colstatus[i_ri] = original_data->colstatus[i_original]; - i_ri++; - } + Assert(depends_on_xids); - rikey = palloc0_object(ReplicaIdentityKey); - rikey->relid = relid; - rikey->data = ridata; + /* Search for existing entry */ + relentry = logicalrep_get_relentry(relid); - MemoryContextSwitchTo(oldctx); + Assert(relentry); /* - * The new xid could be invalid if the transaction will be applied by the - * leader itself which means all the changes will be committed before - * processing next transaction. In this case, we only need to check for - * dependencies on preceding transactions, there is no need to record a new - * dependency for subsequent transactions to wait on. + * Gather information for local indexes if not yet. We require to be in a + * transaction state because system catalogs are read. */ - if (!TransactionIdIsValid(new_depended_xid)) + if (!relentry->local_unique_indexes_collected) { - rientry = replica_identity_lookup(replica_identity_table, rikey); - free_replica_identity_key(rikey); + bool needs_start = !IsTransactionOrTransactionBlock(); - if (rientry && has_active_key_dependency(rientry, true)) - { - elog(DEBUG1, "found conflicting replica identity change on table %u from %u", - relid, rientry->remote_xid); + if (needs_start) + StartTransactionCommand(); - append_xid_dependency(rientry->remote_xid, depends_on_xids); - } + logicalrep_rel_load(NULL, relid, AccessShareLock); - return; - } + if (needs_start) + CommitTransactionCommand(); - /* Record a new dependency for subsequent transactions to wait on */ - rientry = replica_identity_insert(replica_identity_table, rikey, - &found); + Assert(relentry->local_unique_indexes_collected); + } - /* - * Release the key built to search the entry, if the entry already exists. - */ - if (found) + foreach_ptr(LogicalRepSubscriberIdx, idxinfo, relentry->local_unique_indexes) { - free_replica_identity_key(rikey); + /* + * NULL values in the new tuple represent true NULLs in a unique index. + * If NULLs are treated as distinct (nulls_distinct = true), they never + * cause conflicts. Therefore, we can skip dependency checking if any + * key column is NULL in this case. + * + * However, for old tuples in UPDATE or DELETE operations, a NULL key + * simply indicate the column lies outside the replica identity key + * rather than a true NULL. In such cases, the remote old tuple could + * still conflict with a local tuple, so we must not skip the check. + */ + if (!old_tuple && idxinfo->nulls_distinct && + has_null_key_values(original_data, idxinfo->indexkeys)) + continue; /* - * Append the dependency to the list if the current transaction was not - * the lastest one to modify the key. + * Old tuples of unique keys do not conflict with any preceding + * transaction (see the comments in applyparallelworker.c for details on + * conflicting cases). When we don't need to record a new dependency, we + * can skip processing this index entirely. */ - if (has_active_key_dependency(rientry, false) && - !TransactionIdEquals(rientry->remote_xid, new_depended_xid)) + if (old_tuple && !TransactionIdIsValid(new_depended_xid)) + continue; + + rikey = build_replica_identity_key(relid, original_data, idxinfo->indexkeys); + rikey->kind = LOGICALREP_KEY_LOCAL_UNIQUE; + + /* + * For old tuples, record a dependency for subsequent transactions to + * wait on; no preceding transactions are added to the list. + * + * For new tuples in INSERT or UPDATE, check for existing key + * dependencies and add any dependent transactions to the list. + */ + if (old_tuple) + { + (void) check_and_record_key_dependency(rikey, new_depended_xid); + } + else { - elog(DEBUG1, "found conflicting replica identity change on table %u from %u", - relid, rientry->remote_xid); + TransactionId xid; - append_xid_dependency(rientry->remote_xid, depends_on_xids); + xid = check_and_record_key_dependency(rikey, InvalidTransactionId); + + if (TransactionIdIsValid(xid) && + !TransactionIdEquals(xid, new_depended_xid)) + append_xid_dependency(xid, depends_on_xids); } } +} - /* Update the new depended xid into the entry */ - rientry->remote_xid = new_depended_xid; +/* + * Check for dependencies on preceding transactions that modify the same key. + * Returns the dependent transactions in 'depends_on_xids'. + * + * Additionally, if new_depended_xid is valid, record it as a dependency for the + * replica identity key modification, allowing subsequent transactions that + * modify the same key to be dependent on it. + */ +static void +check_and_record_ri_dependency(Oid relid, LogicalRepTupleData *original_data, + TransactionId new_depended_xid, + List **depends_on_xids) +{ + LogicalRepRelMapEntry *relentry; + ReplicaIdentityKey *rikey; + TransactionId xid; + + Assert(depends_on_xids); + + /* Search for existing entry */ + relentry = logicalrep_get_relentry(relid); + + Assert(relentry); + + /* + * First check whether any previous transaction (other than the current one) + * has affected the whole table e.g., truncate or schema change from + * publisher. + */ + if (has_active_rel_dependency(relentry) && + !TransactionIdEquals(relentry->last_depended_xid, new_depended_xid)) + append_xid_dependency(relentry->last_depended_xid, depends_on_xids); + + /* + * Return if there are no replica identity columns, indicating that the + * remote relation has neither a replica identity key nor is marked as + * replica identity full. + */ + if (!bms_num_members(relentry->remoterel.attkeys)) + return; + + /* + * NULL in the new tuple means the replica identity key hasn't changed, so + * no new dependency needs to be recorded. The dependency should have been + * recorded when processing the old tuple. + */ + if (has_null_key_values(original_data, relentry->remoterel.attkeys)) + return; + + rikey = build_replica_identity_key(relid, original_data, relentry->remoterel.attkeys); + rikey->kind = LOGICALREP_KEY_REPLICA_IDENTITY; + + xid = check_and_record_key_dependency(rikey, new_depended_xid); + + /* + * Append the dependency to the list if the current transaction was not the + * lastest one to modify the key. + */ + if (TransactionIdIsValid(xid) && + !TransactionIdEquals(xid, new_depended_xid)) + append_xid_dependency(xid, depends_on_xids); } /* @@ -1291,6 +1461,9 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, relid = logicalrep_read_insert(&change, &newtup); check_and_record_ri_dependency(relid, &newtup, new_depended_xid, &depends_on_xids); + check_and_record_local_key_dependency(relid, &newtup, false, + new_depended_xid, + &depends_on_xids); check_dependency_for_parallel_safety(relid, new_depended_xid, &depends_on_xids); break; @@ -1304,6 +1477,10 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, check_and_record_ri_dependency(relid, &oldtup, new_depended_xid, &depends_on_xids); + check_and_record_local_key_dependency(relid, &oldtup, true, + new_depended_xid, + &depends_on_xids); + check_dependency_for_parallel_safety(relid, new_depended_xid, &depends_on_xids); @@ -1325,6 +1502,9 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, check_and_record_ri_dependency(relid, &newtup, new_depended_xid, &depends_on_xids); + check_and_record_local_key_dependency(relid, &newtup, false, + new_depended_xid, + &depends_on_xids); check_dependency_for_parallel_safety(relid, new_depended_xid, &depends_on_xids); break; @@ -1333,6 +1513,9 @@ handle_dependency_on_change(LogicalRepMsgType action, StringInfo s, relid = logicalrep_read_delete(&change, &oldtup); check_and_record_ri_dependency(relid, &oldtup, new_depended_xid, &depends_on_xids); + check_and_record_local_key_dependency(relid, &oldtup, true, + new_depended_xid, + &depends_on_xids); check_dependency_for_parallel_safety(relid, new_depended_xid, &depends_on_xids); break; diff --git a/src/backend/storage/lmgr/deadlock.c b/src/backend/storage/lmgr/deadlock.c index b8962d875b6..31bbfcf1971 100644 --- a/src/backend/storage/lmgr/deadlock.c +++ b/src/backend/storage/lmgr/deadlock.c @@ -33,7 +33,6 @@ #include "storage/procnumber.h" #include "utils/memutils.h" - /* * One edge in the waits-for graph. * diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index b1079d3f2fa..fe38862b300 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -46,6 +46,10 @@ typedef struct LogicalRepRelMapEntry */ TransactionId last_depended_xid; + /* Local unique indexes. Used for dependency tracking */ + List *local_unique_indexes; + bool local_unique_indexes_collected; + /* * Whether the relation can be applied in parallel or not. It is * distinglish whether defined triggers are the immutable or not. @@ -61,6 +65,14 @@ typedef struct LogicalRepRelMapEntry char parallel_safe; } LogicalRepRelMapEntry; + +typedef struct LogicalRepSubscriberIdx +{ + Oid indexoid; /* OID of the local key */ + Bitmapset *indexkeys; /* Bitmap of key columns *on remote* */ + bool nulls_distinct; /* Whether NULLs are considered distinct */ +} LogicalRepSubscriberIdx; + extern void logicalrep_relmap_update(LogicalRepRelation *remoterel); extern void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel); diff --git a/src/test/subscription/t/050_parallel_apply.pl b/src/test/subscription/t/050_parallel_apply.pl index 358c7b7b7e1..6b0d5c7e383 100644 --- a/src/test/subscription/t/050_parallel_apply.pl +++ b/src/test/subscription/t/050_parallel_apply.pl @@ -403,4 +403,128 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(1) FROM regress_tab"); is ($result, 5011, 'inserts are replicated to subscriber'); +################################################## +# Test that the dependency tracking works correctly for local unique indexes on +# subscriber during parallel apply. +################################################## + +# Truncate the data for upcoming tests +$node_publisher->safe_psql('postgres', "TRUNCATE TABLE regress_tab;"); +$node_publisher->wait_for_catchup('regress_sub'); + +# Define an unique index on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE UNIQUE INDEX local_unique_idx ON regress_tab (value);"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (1, 'would conflict');"); + +$node_publisher->wait_for_catchup('regress_sub'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(1) FROM regress_tab"); +is ($result, 1, 'the insert is replicated to subscriber'); + +# Attach an injection_point. Parallel workers would wait before the commit +$node_subscriber->safe_psql('postgres', + "SELECT injection_points_attach('parallel-worker-before-commit','wait');" +); + +# Delete the tuple on publisher. +$node_publisher->safe_psql('postgres', + "DELETE FROM regress_tab WHERE id = 1;"); + +# Wait until the parallel worker enters the injection point. +$node_subscriber->wait_for_event('logical replication parallel worker', + 'parallel-worker-before-commit'); + +$offset = -s $node_subscriber->logfile; + +# Insert tuples. This should conflict with the DELETE transaction, as both +# transactions modify the same key. The parallel worker will wait for the +# preceding transaction to finish. +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (2, 'would conflict');"); + +# Verify the dependency is detected for the insert +$str = $node_subscriber->wait_for_log(qr/found conflicting local unique key change on table [1-9][0-9]+ from ([1-9][0-9]+)/, $offset); +$xid = $str =~ /found conflicting local unique key change on table [1-9][0-9]+ from ([1-9][0-9]+)/; + +# Verify the parallel worker waits for the same transaction +$node_subscriber->wait_for_log(qr/wait for depended xid $xid/, $offset); + +ok(1, "local unique key dependency detected for parallel apply"); + +# Wakeup the parallel worker +$node_subscriber->safe_psql('postgres', qq[ + SELECT injection_points_detach('parallel-worker-before-commit'); + SELECT injection_points_wakeup('parallel-worker-before-commit'); +]); + +# Verify the streamed transaction can be applied +$node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid/, $offset); + +$node_publisher->wait_for_catchup('regress_sub'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(1) FROM regress_tab"); +is ($result, 1, 'inserts are replicated to subscriber'); + +# Test that the dependency tracking works correctly for local unique indexes on +# subscriber during parallel apply when the unique index has expression. +$node_subscriber->safe_psql('postgres', "DROP INDEX local_unique_idx;"); +$node_subscriber->safe_psql('postgres', + "CREATE UNIQUE INDEX local_unique_idx_expr ON regress_tab ((LOWER(value)));"); + +# Attach an injection_point. Parallel workers would wait before the commit +$node_subscriber->safe_psql('postgres', + "SELECT injection_points_attach('parallel-worker-before-commit','wait');" +); + +# Insert a tuple on publisher. Parallel worker would wait at the injection +# point +$node_publisher->safe_psql('postgres', + "DELETE FROM regress_tab WHERE id = 2;"); + +# Wait until the parallel worker enters the injection point. +$node_subscriber->wait_for_event('logical replication parallel worker', + 'parallel-worker-before-commit'); + +$offset = -s $node_subscriber->logfile; + +# Insert tuples. This should conflict with the DELETE transaction, as both +# transactions modify the same key value. The parallel worker will wait for the +# preceding transaction to finish. +$node_publisher->safe_psql('postgres', + "INSERT INTO regress_tab VALUES (3, 'WOULD CONFLICT');"); + +# Verify the dependency is detected for the insert +$str = $node_subscriber->wait_for_log(qr/found conflicting local unique key change on table [1-9][0-9]+ from ([1-9][0-9]+)/, $offset); +$xid = $str =~ /found conflicting local unique key change on table [1-9][0-9]+ from ([1-9][0-9]+)/; + +# Verify the parallel worker waits for the same transaction +$node_subscriber->wait_for_log(qr/wait for depended xid $xid/, $offset); + +ok(1, "local unique key dependency from index expression detected for parallel apply"); + +# Wakeup the parallel worker +$node_subscriber->safe_psql('postgres', qq[ + SELECT injection_points_detach('parallel-worker-before-commit'); + SELECT injection_points_wakeup('parallel-worker-before-commit'); +]); + +# Verify the streamed transaction can be applied +$node_subscriber->wait_for_log(qr/finish waiting for depended xid $xid/, $offset); + +$node_publisher->wait_for_catchup('regress_sub'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(1) FROM regress_tab"); +is ($result, 1, 'inserts are replicated to subscriber'); + +# Cleanup +$node_subscriber->safe_psql('postgres', "DROP INDEX local_unique_idx_expr;"); +$node_publisher->safe_psql('postgres', "TRUNCATE TABLE regress_tab;"); +$node_publisher->wait_for_catchup('regress_sub'); + done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index db8bb4e1e43..34c9592e3e5 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1701,6 +1701,7 @@ LogicalRepBeginData LogicalRepCommitData LogicalRepCommitPreparedTxnData LogicalRepCtxStruct +LogicalRepKeyKind LogicalRepMsgType LogicalRepPartMapEntry LogicalRepPreparedTxnData @@ -1710,6 +1711,7 @@ LogicalRepRelation LogicalRepRollbackPreparedTxnData LogicalRepSequenceInfo LogicalRepStreamAbortData +LogicalRepSubscriberIdx LogicalRepTupleData LogicalRepTyp LogicalRepWorker -- 2.43.0