diff --git a/src/backend/catalog/pg_constraint.c b/src/backend/catalog/pg_constraint.c index 3c960c9423..9016f1f093 100644 --- a/src/backend/catalog/pg_constraint.c +++ b/src/backend/catalog/pg_constraint.c @@ -448,7 +448,6 @@ clone_fk_constraints(Relation pg_constraint, Relation parentRel, { TupleDesc tupdesc; AttrNumber *attmap; - List *partFKs; List *subclone = NIL; ListCell *cell; @@ -462,8 +461,6 @@ clone_fk_constraints(Relation pg_constraint, Relation parentRel, RelationGetDescr(parentRel), gettext_noop("could not convert row type")); - partFKs = copyObject(RelationGetFKeyList(partRel)); - foreach(cell, clone) { Oid parentConstrOid = lfirst_oid(cell); @@ -476,12 +473,10 @@ clone_fk_constraints(Relation pg_constraint, Relation parentRel, Oid conppeqop[INDEX_MAX_KEYS]; Oid conffeqop[INDEX_MAX_KEYS]; Constraint *fkconstraint; - bool attach_it; Oid constrOid; ObjectAddress parentAddr, childAddr; int nelem; - ListCell *cell; int i; ArrayType *arr; Datum datum; @@ -547,20 +542,6 @@ clone_fk_constraints(Relation pg_constraint, Relation parentRel, elog(ERROR, "conpfeqop is not a 1-D OID array"); memcpy(conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid)); - datum = fastgetattr(tuple, Anum_pg_constraint_conpfeqop, - tupdesc, &isnull); - if (isnull) - elog(ERROR, "null conpfeqop"); - arr = DatumGetArrayTypeP(datum); - nelem = ARR_DIMS(arr)[0]; - if (ARR_NDIM(arr) != 1 || - nelem < 1 || - nelem > INDEX_MAX_KEYS || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != OIDOID) - elog(ERROR, "conpfeqop is not a 1-D OID array"); - memcpy(conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid)); - datum = fastgetattr(tuple, Anum_pg_constraint_conppeqop, tupdesc, &isnull); if (isnull) @@ -590,83 +571,12 @@ clone_fk_constraints(Relation pg_constraint, Relation parentRel, memcpy(conffeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid)); /* - * Before creating a new constraint, see whether any existing FKs are - * fit for the purpose. If one is, attach the parent constraint to it, - * and don't clone anything. This way we avoid the expensive - * verification step and don't end up with a duplicate FK. This also - * means we don't consider this constraint when recursing to - * partitions. - */ - attach_it = false; - foreach(cell, partFKs) - { - ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell); - Form_pg_constraint partConstr; - HeapTuple partcontup; - - attach_it = true; - - /* - * Do some quick & easy initial checks. If any of these fail, we - * cannot use this constraint, but keep looking. - */ - if (fk->confrelid != constrForm->confrelid || fk->nkeys != nelem) - { - attach_it = false; - continue; - } - for (i = 0; i < nelem; i++) - { - if (fk->conkey[i] != mapped_conkey[i] || - fk->confkey[i] != confkey[i] || - fk->conpfeqop[i] != conpfeqop[i]) - { - attach_it = false; - break; - } - } - if (!attach_it) - continue; - - /* - * Looks good so far; do some more extensive checks. Presumably - * the check for 'convalidated' could be dropped, since we don't - * really care about that, but let's be careful for now. - */ - partcontup = SearchSysCache1(CONSTROID, - ObjectIdGetDatum(fk->conoid)); - if (!partcontup) - elog(ERROR, "cache lookup failed for constraint %u", - fk->conoid); - partConstr = (Form_pg_constraint) GETSTRUCT(partcontup); - if (OidIsValid(partConstr->conparentid) || - !partConstr->convalidated || - partConstr->condeferrable != constrForm->condeferrable || - partConstr->condeferred != constrForm->condeferred || - partConstr->confupdtype != constrForm->confupdtype || - partConstr->confdeltype != constrForm->confdeltype || - partConstr->confmatchtype != constrForm->confmatchtype) - { - ReleaseSysCache(partcontup); - attach_it = false; - continue; - } - - ReleaseSysCache(partcontup); - - /* looks good! Attach this constraint */ - ConstraintSetParentConstraint(fk->conoid, constrForm->oid); - CommandCounterIncrement(); - attach_it = true; - break; - } - - /* * If we attached to an existing constraint, there is no need to * create a new one. In fact, there's no need to recurse for this * constraint to partitions, either. */ - if (attach_it) + if (attach_to_existing_FK_constraint(partRel, parentRel, + pg_constraint, tuple)) { ReleaseSysCache(tuple); continue; @@ -738,7 +648,6 @@ clone_fk_constraints(Relation pg_constraint, Relation parentRel, } pfree(attmap); - list_free_deep(partFKs); /* * If the partition is partitioned, recurse to handle any constraints that diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index d2781cbf19..e828bf600c 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -7654,9 +7654,16 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, if (recurse && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) { PartitionDesc partdesc; + Relation pg_constraint; + HeapTuple tuple; partdesc = RelationGetPartitionDesc(rel); + tuple = SearchSysCache1(CONSTROID, constrOid); + if (!tuple) + elog(ERROR, "cache lookup failed for constraint %u", constrOid); + pg_constraint = heap_open(ConstraintRelationId, RowShareLock); + for (i = 0; i < partdesc->nparts; i++) { Oid partitionId = partdesc->oids[i]; @@ -7664,6 +7671,18 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, AlteredTableInfo *childtab; ObjectAddress childAddr; + /* + * Try to check if the partition already has a FK constraint + * that can be attached to the one being added in the parent + * instead of creating a new one for the partition. + */ + if (attach_to_existing_FK_constraint(partition, rel, pg_constraint, + tuple)) + { + heap_close(partition, NoLock); + continue; + } + CheckTableNotInUse(partition, "ALTER TABLE"); /* Find or create work queue entry for this table */ @@ -7679,6 +7698,9 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, heap_close(partition, NoLock); } + + heap_close(pg_constraint, RowShareLock); + ReleaseSysCache(tuple); } /* @@ -15163,3 +15185,124 @@ update_relispartition(Relation classRel, Oid relationId, bool newval) if (opened) heap_close(classRel, RowExclusiveLock); } + +/* + * attach_to_existing_FK_constraint + * Try to attach a FK constraint described by 'constrTuple' of the parent + * relation 'parentRel' to one of the existing FK constraints of + * partition 'partRel' + * + * Returns true if it succeeds in doing so. + */ + +bool +attach_to_existing_FK_constraint(Relation partRel, Relation parentRel, + Relation pg_constraint, HeapTuple constrTuple) +{ + bool attached = false; + List *partFKs = copyObject(RelationGetFKeyList(partRel)); + Form_pg_constraint constrForm; + TupleDesc tupdesc = RelationGetDescr(pg_constraint); + int nelem, + i; + AttrNumber conkey[INDEX_MAX_KEYS]; + AttrNumber mapped_conkey[INDEX_MAX_KEYS]; + AttrNumber confkey[INDEX_MAX_KEYS]; + Oid conpfeqop[INDEX_MAX_KEYS]; + AttrNumber *attmap; + ArrayType *arr; + Datum datum; + bool isnull; + ListCell *cell; + + constrForm = (Form_pg_constraint) GETSTRUCT(constrTuple); + Assert(constrForm->contype == CONSTRAINT_FOREIGN); + + /* + * The constraint key may differ, if the columns in the partition are + * different. This map is used to convert them. + */ + attmap = convert_tuples_by_name_map(RelationGetDescr(partRel), + RelationGetDescr(parentRel), + gettext_noop("could not convert row type")); + + datum = fastgetattr(constrTuple, Anum_pg_constraint_conkey, tupdesc, + &isnull); + Assert(!isnull); + arr = DatumGetArrayTypeP(datum); + nelem = ARR_DIMS(arr)[0]; + memcpy(conkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber)); + for (i = 0; i < nelem; i++) + mapped_conkey[i] = attmap[conkey[i] - 1]; + + datum = fastgetattr(constrTuple, Anum_pg_constraint_confkey, tupdesc, &isnull); + Assert(!isnull); + arr = DatumGetArrayTypeP(datum); + memcpy(confkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber)); + + datum = fastgetattr(constrTuple, Anum_pg_constraint_conpfeqop, tupdesc, &isnull); + Assert(!isnull); + arr = DatumGetArrayTypeP(datum); + memcpy(conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid)); + + /* + * Before creating a new constraint, see whether any existing FKs are + * fit for the purpose. If one is, attach the parent constraint to it, + * and don't clone anything. This way we avoid the expensive + * verification step and don't end up with a duplicate FK. This also + * means we don't consider this constraint when recursing to + * partitions. + */ + foreach(cell, partFKs) + { + ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell); + Form_pg_constraint partConstr; + HeapTuple partcontup; + + /* + * Do some quick & easy initial checks. If any of these fail, we + * cannot use this constraint, but keep looking. + */ + if (fk->confrelid != constrForm->confrelid || fk->nkeys != nelem) + continue; + + for (i = 0; i < nelem; i++) + if (fk->conkey[i] != mapped_conkey[i] || + fk->confkey[i] != confkey[i] || + fk->conpfeqop[i] != conpfeqop[i]) + continue; + + /* + * Looks good so far; do some more extensive checks. Presumably + * the check for 'convalidated' could be dropped, since we don't + * really care about that, but let's be careful for now. + */ + partcontup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(fk->conoid)); + if (!partcontup) + elog(ERROR, "cache lookup failed for constraint %u", fk->conoid); + partConstr = (Form_pg_constraint) GETSTRUCT(partcontup); + if (OidIsValid(partConstr->conparentid) || + !partConstr->convalidated || + partConstr->condeferrable != constrForm->condeferrable || + partConstr->condeferred != constrForm->condeferred || + partConstr->confupdtype != constrForm->confupdtype || + partConstr->confdeltype != constrForm->confdeltype || + partConstr->confmatchtype != constrForm->confmatchtype) + { + ReleaseSysCache(partcontup); + continue; + } + + ReleaseSysCache(partcontup); + + /* looks good! Attach this constraint */ + ConstraintSetParentConstraint(fk->conoid, constrForm->oid); + CommandCounterIncrement(); + attached = true; + break; + } + + list_free_deep(partFKs); + + return attached; +} diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h index ec3bb90b01..9255118814 100644 --- a/src/include/commands/tablecmds.h +++ b/src/include/commands/tablecmds.h @@ -96,5 +96,9 @@ extern void RangeVarCallbackOwnsRelation(const RangeVar *relation, Oid relId, Oid oldRelId, void *noCatalogs); extern bool PartConstraintImpliedByRelConstraint(Relation scanrel, List *partConstraint); +extern bool attach_to_existing_FK_constraint(Relation partRel, + Relation parentRel, + Relation pg_constraint, + HeapTuple constrTuple); #endif /* TABLECMDS_H */