*** src/backend/commands/tablecmds.c.orig Thu Oct 2 15:24:52 2003 --- src/backend/commands/tablecmds.c Sun Oct 5 16:29:51 2003 *************** *** 3455,3460 **** --- 3455,3467 ---- int count; /* + * See if we can do it with a single LEFT JOIN query. A FALSE result + * indicates we must proceed with the fire-the-trigger method. + */ + if (RI_Initial_Check(fkconstraint, rel, pkrel)) + return; + + /* * Scan through each tuple, calling RI_FKey_check_ins (insert trigger) * as if that tuple had just been inserted. If any of those fail, it * should ereport(ERROR) and that's that. *** src/backend/utils/adt/ri_triggers.c.orig Wed Oct 1 17:30:52 2003 --- src/backend/utils/adt/ri_triggers.c Sun Oct 5 16:42:37 2003 *************** *** 40,45 **** --- 40,46 ---- #include "rewrite/rewriteHandler.h" #include "utils/lsyscache.h" #include "utils/typcache.h" + #include "utils/acl.h" #include "miscadmin.h" *************** *** 164,170 **** Datum *vals, char *nulls); static void ri_ReportViolation(RI_QueryKey *qkey, const char *constrname, Relation pk_rel, Relation fk_rel, ! HeapTuple violator, bool spi_err); /* ---------- --- 165,172 ---- Datum *vals, char *nulls); static void ri_ReportViolation(RI_QueryKey *qkey, const char *constrname, Relation pk_rel, Relation fk_rel, ! HeapTuple violator, TupleDesc tupdesc, ! bool spi_err); /* ---------- *************** *** 2540,2546 **** --- 2542,2743 ---- } + /* ---------- + * RI_Initial_Check - + * + * Check an entire table for non-matching values using a single query. + * This is not a trigger procedure, but is called during ALTER TABLE + * ADD FOREIGN KEY to validate the initial table contents. + * + * We expect that an exclusive lock has been taken on rel and pkrel; + * hence, we do not need to lock individual rows for the check. + * + * If the check fails because the current user doesn't have permissions + * to read both tables, return false to let our caller know that they will + * need to do something else to check the constraint. + * ---------- + */ + bool + RI_Initial_Check(FkConstraint *fkconstraint, Relation rel, Relation pkrel) + { + const char *constrname = fkconstraint->constr_name; + char querystr[MAX_QUOTED_REL_NAME_LEN * 2 + 250 + + (MAX_QUOTED_NAME_LEN + 32) * ((RI_MAX_NUMKEYS * 4)+1)]; + char pkrelname[MAX_QUOTED_REL_NAME_LEN]; + char relname[MAX_QUOTED_REL_NAME_LEN]; + char attname[MAX_QUOTED_NAME_LEN]; + char fkattname[MAX_QUOTED_NAME_LEN]; + const char *sep; + List *list; + List *list2; + int spi_result; + void *qplan; + + /* + * Check to make sure current user has enough permissions to do the + * test query. (If not, caller can fall back to the trigger method, + * which works because it changes user IDs on the fly.) + * + * XXX are there any other show-stopper conditions to check? + */ + if (pg_class_aclcheck(RelationGetRelid(rel), GetUserId(), ACL_SELECT) != ACLCHECK_OK) + return false; + if (pg_class_aclcheck(RelationGetRelid(pkrel), GetUserId(), ACL_SELECT) != ACLCHECK_OK) + return false; + + /*---------- + * The query string built is: + * SELECT fk.keycols FROM ONLY relname fk + * LEFT OUTER JOIN ONLY pkrelname pk + * ON (pk.pkkeycol1=fk.keycol1 [AND ...]) + * WHERE pk.pkkeycol1 IS NULL AND + * For MATCH unspecified: + * (fk.keycol1 IS NOT NULL [AND ...]) + * For MATCH FULL: + * (fk.keycol1 IS NOT NULL [OR ...]) + *---------- + */ + + sprintf(querystr, "SELECT "); + sep=""; + foreach(list, fkconstraint->fk_attrs) + { + quoteOneName(attname, strVal(lfirst(list))); + snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr), + "%sfk.%s", sep, attname); + sep = ", "; + } + + quoteRelationName(pkrelname, pkrel); + quoteRelationName(relname, rel); + snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr), + " FROM ONLY %s fk LEFT OUTER JOIN ONLY %s pk ON (", + relname, pkrelname); + + sep=""; + for (list=fkconstraint->pk_attrs, list2=fkconstraint->fk_attrs; + list != NIL && list2 != NIL; + list=lnext(list), list2=lnext(list2)) + { + quoteOneName(attname, strVal(lfirst(list))); + quoteOneName(fkattname, strVal(lfirst(list2))); + snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr), + "%spk.%s=fk.%s", + sep, attname, fkattname); + sep = " AND "; + } + /* + * It's sufficient to test any one pk attribute for null to detect a + * join failure. + */ + quoteOneName(attname, strVal(lfirst(fkconstraint->pk_attrs))); + snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr), + ") WHERE pk.%s IS NULL AND (", attname); + + sep=""; + foreach(list, fkconstraint->fk_attrs) + { + quoteOneName(attname, strVal(lfirst(list))); + snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr), + "%sfk.%s IS NOT NULL", + sep, attname); + switch (fkconstraint->fk_matchtype) + { + case FKCONSTR_MATCH_UNSPECIFIED: + sep=" AND "; + break; + case FKCONSTR_MATCH_FULL: + sep=" OR "; + break; + case FKCONSTR_MATCH_PARTIAL: + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("MATCH PARTIAL not yet implemented"))); + break; + default: + elog(ERROR, "unrecognized match type: %d", + fkconstraint->fk_matchtype); + break; + } + } + snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr), + ")"); + + if (SPI_connect() != SPI_OK_CONNECT) + elog(ERROR, "SPI_connect failed"); + + /* + * Generate the plan. We don't need to cache it, and there are no + * arguments to the plan. + */ + qplan = SPI_prepare(querystr, 0, NULL); + + /* + * Run the plan. For safety we force a current query snapshot to be + * used. (In serializable mode, this arguably violates serializability, + * but we really haven't got much choice.) We need at most one tuple + * returned, so pass limit = 1. + */ + spi_result = SPI_execp_current(qplan, NULL, NULL, true, 1); + /* Check result */ + if (spi_result != SPI_OK_SELECT) + elog(ERROR, "SPI_execp_current returned %d", spi_result); + + /* Did we find a tuple violating the constraint? */ + if (SPI_processed > 0) + { + HeapTuple tuple = SPI_tuptable->vals[0]; + TupleDesc tupdesc = SPI_tuptable->tupdesc; + int nkeys = length(fkconstraint->fk_attrs); + int i; + RI_QueryKey qkey; + + /* + * If it's MATCH FULL, and there are any nulls in the FK keys, + * complain about that rather than the lack of a match. MATCH FULL + * disallows partially-null FK rows. + */ + if (fkconstraint->fk_matchtype == FKCONSTR_MATCH_FULL) + { + bool isnull = false; + + for (i = 1; i <= nkeys; i++) + { + (void) SPI_getbinval(tuple, tupdesc, i, &isnull); + if (isnull) + break; + } + if (isnull) + ereport(ERROR, + (errcode(ERRCODE_FOREIGN_KEY_VIOLATION), + errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"", + RelationGetRelationName(rel), + constrname), + errdetail("MATCH FULL does not allow mixing of null and nonnull key values."))); + } + + /* + * Although we didn't cache the query, we need to set up a fake + * query key to pass to ri_ReportViolation. + */ + MemSet(&qkey, 0, sizeof(qkey)); + qkey.constr_queryno = RI_PLAN_CHECK_LOOKUPPK; + qkey.nkeypairs = nkeys; + for (i = 0; i < nkeys; i++) + qkey.keypair[i][RI_KEYPAIR_FK_IDX] = i + 1; + + ri_ReportViolation(&qkey, constrname, + pkrel, rel, + tuple, tupdesc, + false); + } + + if (SPI_finish() != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed"); + + return true; + } /* ---------- *************** *** 2905,2910 **** --- 3102,3108 ---- ri_ReportViolation(qkey, constrname ? constrname : "", pk_rel, fk_rel, new_tuple ? new_tuple : old_tuple, + NULL, true); /* XXX wouldn't it be clearer to do this part at the caller? */ *************** *** 2913,2918 **** --- 3111,3117 ---- ri_ReportViolation(qkey, constrname, pk_rel, fk_rel, new_tuple ? new_tuple : old_tuple, + NULL, false); return SPI_processed != 0; *************** *** 2950,2956 **** static void ri_ReportViolation(RI_QueryKey *qkey, const char *constrname, Relation pk_rel, Relation fk_rel, ! HeapTuple violator, bool spi_err) { #define BUFLENGTH 512 char key_names[BUFLENGTH]; --- 3149,3156 ---- static void ri_ReportViolation(RI_QueryKey *qkey, const char *constrname, Relation pk_rel, Relation fk_rel, ! HeapTuple violator, TupleDesc tupdesc, ! bool spi_err) { #define BUFLENGTH 512 char key_names[BUFLENGTH]; *************** *** 2958,2964 **** char *name_ptr = key_names; char *val_ptr = key_values; bool onfk; - Relation rel; int idx, key_idx; --- 3158,3163 ---- *************** *** 2972,2989 **** errhint("This is most likely due to a rule having rewritten the query."))); /* ! * rel is set to where the tuple description is coming from. */ onfk = (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK); if (onfk) { - rel = fk_rel; key_idx = RI_KEYPAIR_FK_IDX; } else { - rel = pk_rel; key_idx = RI_KEYPAIR_PK_IDX; } /* --- 3171,3191 ---- errhint("This is most likely due to a rule having rewritten the query."))); /* ! * Determine which relation to complain about. If tupdesc wasn't ! * passed by caller, assume the violator tuple came from there. */ onfk = (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK); if (onfk) { key_idx = RI_KEYPAIR_FK_IDX; + if (tupdesc == NULL) + tupdesc = fk_rel->rd_att; } else { key_idx = RI_KEYPAIR_PK_IDX; + if (tupdesc == NULL) + tupdesc = pk_rel->rd_att; } /* *************** *** 3008,3015 **** char *name, *val; ! name = SPI_fname(rel->rd_att, fnum); ! val = SPI_getvalue(violator, rel->rd_att, fnum); if (!val) val = "null"; --- 3210,3217 ---- char *name, *val; ! name = SPI_fname(tupdesc, fnum); ! val = SPI_getvalue(violator, tupdesc, fnum); if (!val) val = "null"; *** src/include/commands/trigger.h.orig Sun Aug 3 23:01:31 2003 --- src/include/commands/trigger.h Sun Oct 5 16:11:44 2003 *************** *** 197,201 **** --- 197,204 ---- * in utils/adt/ri_triggers.c */ extern bool RI_FKey_keyequal_upd(TriggerData *trigdata); + extern bool RI_Initial_Check(FkConstraint *fkconstraint, + Relation rel, + Relation pkrel); #endif /* TRIGGER_H */