ADD/DROP INHERITS

From: Greg Stark <gsstark(at)mit(dot)edu>
To: pgsql-hackers(at)postgresql(dot)org
Subject: ADD/DROP INHERITS
Date: 2006-06-07 17:33:03
Message-ID: 878xo8q3ao.fsf@stark.xeocode.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers


I've implemented most of ADD/DROP INHERITS, but it's my first significant piece
of code at this level, so I would appreciate any feedback about it. In particular,
I'm worried I may be on the wrong track about how some low-level operations
work, such as memory management, syscache lookups, heap tuple creation, etc. Also,
I'm not at all clear what kind of locks are really necessary for this
operation. I may be taking excessively strong or weak locks, or have deadlock
risks.

The main thing remaining to be done is implementing default column
expressions. Those would require an ALTER TABLE "Pass 3" operation, I believe.
Also, I haven't looked at table constraints at all yet; I'm not clear what's
supposed to happen there.

I made some decisions on some semantic issues that I believe are correct but
could stand some double-checking. Specifically, if the parent has OIDs then the
child must have OIDs, and if a column in the parent is NOT NULL then the column
in the child must be NOT NULL as well.

I can send the actual patch to pgsql-patches; it includes some other changes to
refactor StoreCatalogInheritance and add the syntax to gram.y. But it's still
not quite finished because of default values.

/*
 * ATExecAddInherits
 *
 * Make "rel" a child of the table named by "parent".
 *
 * Steps, in order:
 *	 1. Open the parent and validate it: it must be a plain table, must not
 *		be temporary unless rel is too, must be owned by the current user,
 *		and must not have OIDs unless rel has them as well.
 *	 2. Scan rel's existing pg_inherits rows to reject a duplicate parent
 *		and to find the highest inhseqno already in use.
 *	 3. Reject circular inheritance (parent appears among rel's inheritors).
 *	 4. Merge the parent's columns into every relation returned by
 *		find_all_inheritors().
 *	 5. Record the new pg_inherits row with sequence number inhseqno + 1.
 *
 * Errors are raised with ereport(ERROR), which does not return.
 */
static void
ATExecAddInherits(Relation rel, RangeVar *parent)
{
    Relation    relation,
                catalogRelation;
    SysScanDesc scan;
    ScanKeyData key;
    HeapTuple   inheritsTuple;
    int4        inhseqno = 0;
    ListCell   *child;
    List       *children;

    /*
     * AccessShareLock on the parent conflicts with the AccessExclusiveLock
     * that dropping or restructuring a table takes.  The lock is kept until
     * end of transaction (see the heap_close at the bottom).
     */
    relation = heap_openrv(parent, AccessShareLock);
    if (relation->rd_rel->relkind != RELKIND_RELATION)
        ereport(ERROR,
                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                 errmsg("inherited relation \"%s\" is not a table",
                        parent->relname)));

    /* Permanent rels cannot inherit from temporary ones */
    if (!rel->rd_istemp && isTempNamespace(RelationGetNamespace(relation)))
        ereport(ERROR,
                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                 errmsg("cannot inherit from temporary relation \"%s\"",
                        parent->relname)));

    /* The current user must own the parent */
    if (!pg_class_ownercheck(RelationGetRelid(relation), GetUserId()))
        aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
                       RelationGetRelationName(relation));

    /* If parent has OIDs then all children must have OIDs */
    if (relation->rd_rel->relhasoids && !rel->rd_rel->relhasoids)
        ereport(ERROR,
                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                 errmsg("table \"%s\" without OIDs cannot inherit from table \"%s\" with OIDs",
                        RelationGetRelationName(rel), parent->relname)));

    /*
     * Reject duplications in the list of parents. -- this is the same check
     * as when creating a table, but maybe we should check for the parent
     * anywhere higher in the inheritance structure?
     *
     * The same scan also tracks the largest inhseqno among rel's existing
     * parents so the new entry can be appended after it.
     */
    catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
    ScanKeyInit(&key,
                Anum_pg_inherits_inhrelid,
                BTEqualStrategyNumber, F_OIDEQ,
                ObjectIdGetDatum(RelationGetRelid(rel)));
    scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId,
                              true, SnapshotNow, 1, &key);
    while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
    {
        Form_pg_inherits inh = (Form_pg_inherits) GETSTRUCT(inheritsTuple);

        if (inh->inhparent == RelationGetRelid(relation))
            ereport(ERROR,
                    (errcode(ERRCODE_DUPLICATE_TABLE),
                     errmsg("inherited relation \"%s\" duplicated",
                            parent->relname)));
        if (inh->inhseqno > inhseqno)
            inhseqno = inh->inhseqno;
    }
    systable_endscan(scan);
    heap_close(catalogRelation, RowExclusiveLock);

    /*
     * Get children because we have to manually recurse and also because we
     * have to check for recursive inheritance graphs.
     *
     * This routine is actually in the planner.  NOTE(review): the loop below
     * relies on the returned list containing rel itself as well as its
     * descendants -- confirm against find_all_inheritors().
     */
    children = find_all_inheritors(RelationGetRelid(rel));

    /* The prospective parent must not already inherit from rel */
    if (list_member_oid(children, RelationGetRelid(relation)))
        ereport(ERROR,
                (errcode(ERRCODE_DUPLICATE_TABLE),
                 errmsg("Circular inheritance structure found")));

    foreach(child, children)
    {
        Oid         childrelid = lfirst_oid(child);
        Relation    childrel;

        childrel = relation_open(childrelid, AccessExclusiveLock);
        MergeAttributesIntoExisting(childrel, relation);
        /* close the relcache entry but keep the lock until commit */
        relation_close(childrel, NoLock);
    }

    /* Record the new parent after all existing ones */
    catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
    StoreCatalogInheritance1(RelationGetRelid(rel),
                             RelationGetRelid(relation),
                             inhseqno + 1,
                             catalogRelation);
    heap_close(catalogRelation, RowExclusiveLock);

    /*
     * Keep our lock on the parent until commit.  Releasing it here would let
     * another session drop or alter the parent before the pg_inherits row
     * written above becomes visible, leaving the catalogs inconsistent.
     */
    heap_close(relation, NoLock);
}

/*
 * MergeAttributesIntoExisting
 *
 * Reconcile the columns of "rel" (the child) with those of "relation" (the
 * new parent) by editing rel's pg_attribute rows directly.
 *
 * For each non-dropped parent column:
 *	 - if rel already has a column of that name, it must have the same type
 *	   and typmod, and must be NOT NULL if the parent's column is; its
 *	   attinhcount is then incremented;
 *	 - otherwise a new, non-local pg_attribute row is inserted for rel.  A
 *	   NOT NULL parent column cannot be added this way and raises an error.
 *
 * Default expressions are not handled yet (see the XXX markers).
 *
 * NOTE(review): when new columns are inserted, nothing here updates
 * pg_class.relnatts for rel -- confirm whether a caller is expected to.
 */
static void
MergeAttributesIntoExisting(Relation rel, Relation relation)
{
    Relation    attrdesc;
    AttrNumber  parent_attno,
                child_attno;
    TupleDesc   tupleDesc;
    HeapTuple   tuple;

    /* New columns are numbered after rel's current last attribute */
    child_attno = RelationGetNumberOfAttributes(rel);

    tupleDesc = RelationGetDescr(relation);

    /* Open pg_attribute once for the whole loop rather than per column */
    attrdesc = heap_open(AttributeRelationId, RowExclusiveLock);

    for (parent_attno = 1; parent_attno <= tupleDesc->natts;
         parent_attno++)
    {
        Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1];
        char       *attributeName = NameStr(attribute->attname);

        /* Ignore dropped columns in the parent. */
        if (attribute->attisdropped)
            continue;

        /* Does it conflict with an existing column? */
        tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), attributeName);
        if (HeapTupleIsValid(tuple))
        {
            /*
             * Yes, try to merge the two column definitions. They must have
             * the same type and typmod.
             */
            Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple);

            ereport(NOTICE,
                    (errmsg("merging column \"%s\" with inherited definition",
                            attributeName)));
            if (attribute->atttypid != childatt->atttypid ||
                attribute->atttypmod != childatt->atttypmod)
                ereport(ERROR,
                        (errcode(ERRCODE_DATATYPE_MISMATCH),
                         errmsg("child table \"%s\" has different type for column \"%s\"",
                                RelationGetRelationName(rel), NameStr(attribute->attname))));

            /*
             * A NOT NULL parent column requires a NOT NULL child column.
             * Report this separately so the message is not mistaken for a
             * type mismatch.
             */
            if (attribute->attnotnull && !childatt->attnotnull)
                ereport(ERROR,
                        (errcode(ERRCODE_DATATYPE_MISMATCH),
                         errmsg("column \"%s\" in child table \"%s\" must be NOT NULL",
                                NameStr(attribute->attname), RelationGetRelationName(rel))));

            /* One more parent now supplies this column */
            childatt->attinhcount++;
            simple_heap_update(attrdesc, &tuple->t_self, tuple);
            CatalogUpdateIndexes(attrdesc, tuple);	/* XXX strength reduce
                                                     * openindexes to outside
                                                     * loop? */
            heap_freetuple(tuple);

            /* XXX defaults */
        }
        else
        {
            /*
             * No, create a new inherited column
             */
            FormData_pg_attribute attributeD;
            HeapTuple   attributeTuple = heap_addheader(Natts_pg_attribute,
                                                        false,
                                                        ATTRIBUTE_TUPLE_SIZE,
                                                        (void *) &attributeD);
            Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(attributeTuple);

            if (attribute->attnotnull)
                ereport(ERROR,
                        (errcode(ERRCODE_DATATYPE_MISMATCH),
                         errmsg("Cannot add new inherited NOT NULL column \"%s\"",
                                NameStr(attribute->attname))));

            /* Copy the physical description from the parent's column */
            childatt->attrelid = RelationGetRelid(rel);
            namecpy(&childatt->attname, &attribute->attname);
            childatt->atttypid = attribute->atttypid;
            childatt->attstattarget = -1;
            childatt->attlen = attribute->attlen;
            childatt->attcacheoff = -1;
            childatt->atttypmod = attribute->atttypmod;
            childatt->attnum = ++child_attno;
            childatt->attbyval = attribute->attbyval;
            childatt->attndims = attribute->attndims;
            childatt->attstorage = attribute->attstorage;
            childatt->attalign = attribute->attalign;
            childatt->attnotnull = false;
            childatt->atthasdef = false;	/* XXX */
            childatt->attisdropped = false;
            childatt->attislocal = false;

            /*
             * The column did not exist in rel before, so this parent is its
             * only inheritance source.  The parent's own attinhcount counts
             * the parent's ancestors and must not be carried over, or a
             * later DROP INHERITS could never bring the count back to zero.
             */
            childatt->attinhcount = 1;

            simple_heap_insert(attrdesc, attributeTuple);
            CatalogUpdateIndexes(attrdesc, attributeTuple);
            heap_freetuple(attributeTuple);

            /* XXX Defaults */
        }
    }

    heap_close(attrdesc, RowExclusiveLock);
}

/*
 * ATExecDropInherits
 *
 * Detach "rel" from the parent table named by "parent".
 *
 * First the pg_inherits row linking rel to the parent is deleted; if there
 * is none, an error is raised.  Then every inherited, non-dropped column of
 * rel whose name also exists in the parent has its attinhcount decremented,
 * and is marked local once the count reaches zero.
 */
static void
ATExecDropInherits(Relation rel, RangeVar *parent)
{
    Relation    inheritsRel;
    Relation    attributeRel;
    SysScanDesc inheritsScan;
    SysScanDesc attributeScan;
    ScanKeyData key;
    HeapTuple   inhTup;
    HeapTuple   attTup;
    Oid         parentOid;
    bool        found = false;

    Assert(rel);
    Assert(parent);

    /*
     * Resolve the parent's OID.  With no schema given, the regular search
     * path is used and only the one table found that way is dropped.  We
     * could be clever and compare against each parent by name instead, but
     * that would be inconsistent with other operations.
     */
    parentOid = RangeVarGetRelid(parent, false);

    /* Look among rel's direct parents for parentOid and remove that link */
    inheritsRel = heap_open(InheritsRelationId, RowExclusiveLock);
    ScanKeyInit(&key,
                Anum_pg_inherits_inhrelid,
                BTEqualStrategyNumber, F_OIDEQ,
                ObjectIdGetDatum(RelationGetRelid(rel)));
    inheritsScan = systable_beginscan(inheritsRel, InheritsRelidSeqnoIndexId,
                                      true, SnapshotNow, 1, &key);
    while (HeapTupleIsValid(inhTup = systable_getnext(inheritsScan)))
    {
        Form_pg_inherits inhForm = (Form_pg_inherits) GETSTRUCT(inhTup);

        if (inhForm->inhparent == parentOid)
        {
            simple_heap_delete(inheritsRel, &inhTup->t_self);
            found = true;
            break;				/* at most one link is removed */
        }
    }
    systable_endscan(inheritsScan);
    heap_close(inheritsRel, RowExclusiveLock);

    if (!found)
    {
        /*
         * Would it be better to look up the actual schema of the parent and
         * have the message name the fully qualified relation explicitly?
         */
        if (parent->schemaname)
            ereport(ERROR,
                    (errcode(ERRCODE_UNDEFINED_TABLE),
                     errmsg("relation \"%s.%s\" is not a parent of relation \"%s\"",
                            parent->schemaname, parent->relname, RelationGetRelationName(rel))));
        else
            ereport(ERROR,
                    (errcode(ERRCODE_UNDEFINED_TABLE),
                     errmsg("relation \"%s\" is not a parent of relation \"%s\"",
                            parent->relname, RelationGetRelationName(rel))));
    }

    /* Walk rel's columns and adjust those that match a parent column */
    attributeRel = heap_open(AttributeRelationId, RowExclusiveLock);
    ScanKeyInit(&key,
                Anum_pg_attribute_attrelid,
                BTEqualStrategyNumber, F_OIDEQ,
                ObjectIdGetDatum(RelationGetRelid(rel)));
    attributeScan = systable_beginscan(attributeRel, AttributeRelidNumIndexId,
                                       true, SnapshotNow, 1, &key);
    while (HeapTupleIsValid(attTup = systable_getnext(attributeScan)))
    {
        Form_pg_attribute att = (Form_pg_attribute) GETSTRUCT(attTup);
        HeapTuple   newTup;
        Form_pg_attribute newAtt;

        /*
         * Skip columns that are not inherited at all (attislocal must NOT be
         * used for this test -- it can be true for inherited columns), that
         * are dropped (XXX is this right?), or that have no same-named
         * column in the parent.
         */
        if (att->attinhcount == 0 ||
            att->attisdropped ||
            !SearchSysCacheExistsAttName(parentOid, NameStr(att->attname)))
            continue;

        /* Work on a private copy: one parent fewer supplies this column */
        newTup = heap_copytuple(attTup);
        newAtt = (Form_pg_attribute) GETSTRUCT(newTup);

        newAtt->attinhcount--;
        if (newAtt->attinhcount == 0)
            newAtt->attislocal = true;

        simple_heap_update(attributeRel, &newTup->t_self, newTup);

        /*
         * XXX "Avoid using it for multiple tuples, since opening the indexes
         * and building the index info structures is moderately expensive."
         * Perhaps this, or at least CatalogOpenIndexes/CatalogCloseIndexes,
         * can be moved outside the loop -- but trying that seg faulted.
         */
        CatalogUpdateIndexes(attributeRel, newTup);
        heap_freetuple(newTup);
    }
    systable_endscan(attributeScan);
    heap_close(attributeRel, RowExclusiveLock);
}

--
greg

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Tom Lane 2006-06-07 17:42:28 Re: That EXPLAIN ANALYZE patch still needs work
Previous Message Markus Schiltknecht 2006-06-07 17:29:56 Re: Snowball and ispell in tsearch2