/* * Check if changes on one relation can be applied by an apply background * worker and assign the 'parallel_apply' flag. * * There are two requirements for applying changes in an apply background * worker: 1) The unique column in the relation on the subscriber-side should * also be the unique column on the publisher-side; 2) There cannot be any * non-immutable functions used by the subscriber-side. * * We just mark the relation entry as 'PARALLEL_APPLY_UNSAFE' here if changes * on one relation can not be applied by an apply background worker and leave * it to apply_bgworker_relation_check() to throw the actual error if needed. */ static void logicalrep_rel_mark_parallel_apply(LogicalRepRelMapEntry *entry) { Bitmapset *ukey; int i; TupleDesc tupdesc; int attnum; List *fkeys = NIL; /* Fast path if 'parallel_apply' flag is already known. */ if (entry->parallel_apply != PARALLEL_APPLY_UNKNOWN) return; /* * First, check if the unique column in the relation on the subscriber-side * is also the unique column on the publisher-side. */ ukey = RelationGetIndexAttrBitmap(entry->localrel, INDEX_ATTR_BITMAP_KEY); if (ukey) { i = -1; while ((i = bms_next_member(ukey, i)) >= 0) { attnum = AttrNumberGetAttrOffset(i + FirstLowInvalidHeapAttributeNumber); if (entry->attrmap->attnums[attnum] < 0 || !bms_is_member(entry->attrmap->attnums[attnum], entry->remoterel.attunique)) goto parallel_apply_unsafe; } bms_free(ukey); } /* * Then, check if there is any non-immutable function used by the local * table. Look for functions in the following places: * a. trigger functions; * b. Column default value expressions and domain constraints; * c. Constraint expressions; * d. Foreign keys. */ /* Check the trigger functions. */ if (entry->localrel->trigdesc != NULL) { for (i = 0; i < entry->localrel->trigdesc->numtriggers; i++) { Trigger *trig = entry->localrel->trigdesc->triggers + i; if (trig->tgenabled != TRIGGER_FIRES_ALWAYS && trig->tgenabled != TRIGGER_FIRES_ON_REPLICA) continue; if (func_volatile(trig->tgfoid) != PROVOLATILE_IMMUTABLE) goto parallel_apply_unsafe; } } /* Check the columns. */ tupdesc = RelationGetDescr(entry->localrel); for (attnum = 0; attnum < tupdesc->natts; attnum++) { Form_pg_attribute att = TupleDescAttr(tupdesc, attnum); /* We don't need info for dropped or generated attributes */ if (att->attisdropped || att->attgenerated) continue; /* * We don't need to check columns that only exist on the * subscriber */ if (entry->attrmap->attnums[attnum] < 0) continue; if (att->atthasdef) { Node *defaultexpr; defaultexpr = build_column_default(entry->localrel, attnum + 1); if (contain_mutable_functions(defaultexpr)) goto parallel_apply_unsafe; } /* * If the column is of a DOMAIN type, determine whether * that domain has any CHECK expressions that are not * immutable. */ if (get_typtype(att->atttypid) == TYPTYPE_DOMAIN) { List *domain_constraints; ListCell *lc; domain_constraints = GetDomainConstraints(att->atttypid); foreach(lc, domain_constraints) { DomainConstraintState *con = (DomainConstraintState *) lfirst(lc); if (con->check_expr && contain_mutable_functions((Node *) con->check_expr)) goto parallel_apply_unsafe; } } } /* Check the constraints. */ if (tupdesc->constr) { ConstrCheck *check = tupdesc->constr->check; /* * Determine if there are any CHECK constraints which * contains non-immutable function. */ for (i = 0; i < tupdesc->constr->num_check; i++) { Expr *check_expr = stringToNode(check[i].ccbin); if (contain_mutable_functions((Node *) check_expr)) goto parallel_apply_unsafe; } } /* Check the foreign keys. */ fkeys = RelationGetFKeyList(entry->localrel); if (fkeys) goto parallel_apply_unsafe; parallel_apply_safe: entry->parallel_apply = PARALLEL_APPLY_SAFE; return; parallel_apply_unsafe: entry->parallel_apply = PARALLEL_APPLY_UNSAFE; return; }