ADD/DROP INHERITS - Mailing list pgsql-hackers

From Greg Stark
Subject ADD/DROP INHERITS
Date
Msg-id 878xo8q3ao.fsf@stark.xeocode.com
Whole thread Raw
Responses Re: ADD/DROP INHERITS  (Andrew Dunstan <andrew@dunslane.net>)
Re: ADD/DROP INHERITS  (Tom Lane <tgl@sss.pgh.pa.us>)
Re: ADD/DROP INHERITS  (Greg Stark <gsstark@mit.edu>)
List pgsql-hackers
I've implemented most of ADD/DROP INHERITS but it's my first significant piece
of code at this level. I would appreciate any feedback about it. In particular
I'm worried I may be on the wrong track about how some low level operations
work like memory management, syscache lookups, heap tuple creation etc. Also,
I'm not at all clear what kind of locks are really necessary for this
operation. I may be taking excessively strong or weak locks or have deadlock
risks.

The main thing remaining to be done is implementing default column
expressions. Those would require an Alter Table "Pass 3" operation I believe.
Also I haven't looked at table constraints at all yet, I'm not clear what's
supposed to happen there.

I made some decisions on some semantic issues that I believe are correct but
could stand some double checking. Specifically, if the parent has OIDs then the
child must have OIDs, and if a column in the parent is NOT NULL then the column
in the child must be NOT NULL as well.

I can send the actual patch to pgsql-patches; it includes some other changes to
refactor StoreCatalogInheritance and add the syntax to gram.y. But it's still
not quite finished because of default values.




/*
 * ATExecAddInherits
 *
 * Make "rel" inherit from the table named by "parent".
 *
 * rel:    the already-opened relation that is to become a child.
 * parent: name of the table that is to become its new parent.
 *
 * The parent is validated (must be a plain table, must not be a temporary
 * table unless rel is temporary too, must be owned by the current user, and
 * must not have OIDs unless rel has them as well).  The parent must not
 * already be a direct parent of rel, and must not appear among the relations
 * returned by find_all_inheritors() for rel.  The parent's columns are then
 * merged into each of those relations and a new pg_inherits entry is stored
 * with a sequence number one higher than the highest existing one.
 *
 * All failures are reported through ereport(ERROR) / aclcheck_error().
 */
static void
ATExecAddInherits(Relation rel, RangeVar *parent)
{
	Relation	relation;
	Relation	catalogRelation;
	SysScanDesc scan;
	ScanKeyData key;
	HeapTuple	inheritsTuple;
	int4		inhseqno = 0;
	ListCell   *child;
	List	   *children;

	/* XXX is this enough locking? */
	relation = heap_openrv(parent, AccessShareLock);

	if (relation->rd_rel->relkind != RELKIND_RELATION)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("inherited relation \"%s\" is not a table",
						parent->relname)));

	/* Permanent rels cannot inherit from temporary ones */
	if (!rel->rd_istemp &&
		isTempNamespace(RelationGetNamespace(relation)))
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("cannot inherit from temporary relation \"%s\"",
						parent->relname)));

	/* The caller must own the prospective parent */
	if (!pg_class_ownercheck(RelationGetRelid(relation), GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
					   RelationGetRelationName(relation));

	/* If parent has OIDs then all children must have OIDs */
	if (relation->rd_rel->relhasoids && !rel->rd_rel->relhasoids)
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("table \"%s\" without OIDs cannot inherit from table \"%s\" with OIDs",
						RelationGetRelationName(rel), parent->relname)));

	/*
	 * Reject duplications in the list of parents. -- this is the same check
	 * as when creating a table, but maybe we should check for the parent
	 * anywhere higher in the inheritance structure?
	 *
	 * While scanning rel's existing pg_inherits rows, also remember the
	 * highest inhseqno seen so the new entry can be numbered after it.
	 */
	catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
	ScanKeyInit(&key,
				Anum_pg_inherits_inhrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(rel)));
	scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId,
							  true, SnapshotNow, 1, &key);
	while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
	{
		Form_pg_inherits inh = (Form_pg_inherits) GETSTRUCT(inheritsTuple);

		if (inh->inhparent == RelationGetRelid(relation))
			ereport(ERROR,
					(errcode(ERRCODE_DUPLICATE_TABLE),
					 errmsg("inherited relation \"%s\" duplicated",
							parent->relname)));
		if (inh->inhseqno > inhseqno)
			inhseqno = inh->inhseqno;
	}
	systable_endscan(scan);
	heap_close(catalogRelation, RowExclusiveLock);

	/*
	 * Get children because we have to manually recurse and also because we
	 * have to check for recursive inheritance graphs.
	 *
	 * (find_all_inheritors is actually in the planner.)
	 */
	children = find_all_inheritors(RelationGetRelid(rel));

	if (list_member_oid(children, RelationGetRelid(relation)))
		ereport(ERROR,
				(errcode(ERRCODE_DUPLICATE_TABLE),
				 errmsg("Circular inheritance structure found")));

	/* Merge the parent's columns into every relation in the list */
	foreach(child, children)
	{
		Oid			childrelid = lfirst_oid(child);
		Relation	childrel;

		childrel = relation_open(childrelid, AccessExclusiveLock);
		MergeAttributesIntoExisting(childrel, relation);
		/* close the relcache entry but keep the lock */
		relation_close(childrel, NoLock);
	}

	/* Record the new parent/child link in pg_inherits */
	catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
	StoreCatalogInheritance1(RelationGetRelid(rel),
							 RelationGetRelid(relation),
							 inhseqno + 1,
							 catalogRelation);
	heap_close(catalogRelation, RowExclusiveLock);

	/*
	 * Keep our lock on the parent until commit: releasing it here would let
	 * another session drop or alter the parent before the pg_inherits row
	 * just written becomes visible.
	 */
	heap_close(relation, NoLock);
}


/*
 * MergeAttributesIntoExisting
 *
 * Merge the column definitions of a new parent into an existing table.
 *
 * rel:      the existing table whose pg_attribute rows are updated.
 * relation: the new parent whose (non-dropped) columns are merged in.
 *
 * For each parent column, if rel already has a column of the same name the
 * two must agree on type and typmod, and the child column must be NOT NULL
 * if the parent column is; the child's attinhcount is then incremented.
 * Otherwise a new pg_attribute row is inserted for rel, which is refused if
 * the parent column is NOT NULL.
 *
 * Defaults are not handled yet (see the XXX markers).
 *
 * NOTE(review): the "new column" branch only inserts a pg_attribute row; it
 * does not appear to update the child's column count in pg_class or touch
 * existing rows -- confirm whether callers are expected to do that.
 */
static void
MergeAttributesIntoExisting(Relation rel, Relation relation)
{
	Relation	attrdesc;
	AttrNumber	parent_attno;
	AttrNumber	child_attno;
	TupleDesc	tupleDesc;
	HeapTuple	tuple;

	/* New columns are numbered after rel's current last column */
	child_attno = RelationGetNumberOfAttributes(rel);

	tupleDesc = RelationGetDescr(relation);

	/* Open pg_attribute once for the whole loop rather than per column */
	attrdesc = heap_open(AttributeRelationId, RowExclusiveLock);

	for (parent_attno = 1; parent_attno <= tupleDesc->natts; parent_attno++)
	{
		Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1];
		char	   *attributeName = NameStr(attribute->attname);

		/* Ignore dropped columns in the parent. */
		if (attribute->attisdropped)
			continue;

		/* Does it conflict with an existing column? */
		tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), attributeName);
		if (HeapTupleIsValid(tuple))
		{
			/*
			 * Yes, try to merge the two column definitions. They must have
			 * the same type and typmod.
			 */
			Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple);

			ereport(NOTICE,
					(errmsg("merging column \"%s\" with inherited definition",
							attributeName)));

			if (attribute->atttypid != childatt->atttypid ||
				attribute->atttypmod != childatt->atttypmod ||
				(attribute->attnotnull && !childatt->attnotnull))
				ereport(ERROR,
						(errcode(ERRCODE_DATATYPE_MISMATCH),
						 errmsg("child table \"%s\" has different type for column \"%s\"",
								RelationGetRelationName(rel),
								NameStr(attribute->attname))));

			/* One more parent now supplies this column */
			childatt->attinhcount++;
			simple_heap_update(attrdesc, &tuple->t_self, tuple);
			/* XXX strength reduce openindexes to outside loop? */
			CatalogUpdateIndexes(attrdesc, tuple);
			heap_freetuple(tuple);

			/* XXX defaults */
		}
		else
		{
			/*
			 * No, create a new inherited column
			 *
			 * NOTE(review): attributeD is handed to heap_addheader without
			 * being initialized; this relies on every field being assigned
			 * through childatt below -- confirm against FormData_pg_attribute.
			 */
			FormData_pg_attribute attributeD;
			HeapTuple	attributeTuple = heap_addheader(Natts_pg_attribute,
														false,
														ATTRIBUTE_TUPLE_SIZE,
														(void *) &attributeD);
			Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(attributeTuple);

			if (attribute->attnotnull)
				ereport(ERROR,
						(errcode(ERRCODE_DATATYPE_MISMATCH),
						 errmsg("Cannot add new inherited NOT NULL column \"%s\"",
								NameStr(attribute->attname))));

			childatt->attrelid = RelationGetRelid(rel);
			namecpy(&childatt->attname, &attribute->attname);
			childatt->atttypid = attribute->atttypid;
			childatt->attstattarget = -1;
			childatt->attlen = attribute->attlen;
			childatt->attcacheoff = -1;
			childatt->atttypmod = attribute->atttypmod;
			childatt->attnum = ++child_attno;
			childatt->attbyval = attribute->attbyval;
			childatt->attndims = attribute->attndims;
			childatt->attstorage = attribute->attstorage;
			childatt->attalign = attribute->attalign;
			childatt->attnotnull = false;
			childatt->atthasdef = false;	/* XXX */
			childatt->attisdropped = false;
			childatt->attislocal = false;

			/*
			 * The new column is inherited from exactly one direct parent
			 * (the one being added), regardless of how many ancestors the
			 * parent's own column has.
			 */
			childatt->attinhcount = 1;

			simple_heap_insert(attrdesc, attributeTuple);
			CatalogUpdateIndexes(attrdesc, attributeTuple);
			heap_freetuple(attributeTuple);

			/* XXX Defaults */
		}
	}

	heap_close(attrdesc, RowExclusiveLock);
}

/*
 * ATExecDropInherits
 *
 * Remove "parent" from the list of direct parents of "rel".
 *
 * rel:    the already-opened child relation.
 * parent: name of the parent table to detach from.
 *
 * The matching pg_inherits row is deleted; if there is none, an error is
 * raised.  Then, for every inherited, non-dropped column of rel whose name
 * also exists in the former parent, attinhcount is decremented, and the
 * column is marked local once the count reaches zero.
 */
static void
ATExecDropInherits(Relation rel, RangeVar *parent)
{
	Relation	catalogRelation;
	SysScanDesc scan;
	ScanKeyData key;
	HeapTuple	inheritsTuple;
	HeapTuple	attributeTuple;
	Oid			inhparent;
	Oid			dropparent;
	bool		found = false;

	/*
	 * Get the OID of parent -- if no schema is specified use the regular
	 * search path and only drop the one table that's found. We could try to
	 * be clever and look at each parent and see if it matches but that would
	 * be inconsistent with other operations I think.
	 */
	Assert(rel);
	Assert(parent);

	dropparent = RangeVarGetRelid(parent, false);

	/* Search through the direct parents of rel looking for dropparent oid */
	catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
	ScanKeyInit(&key,
				Anum_pg_inherits_inhrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(rel)));
	scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId,
							  true, SnapshotNow, 1, &key);
	while (!found &&
		   HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
	{
		inhparent = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhparent;
		if (inhparent == dropparent)
		{
			simple_heap_delete(catalogRelation, &inheritsTuple->t_self);
			found = true;
		}
	}
	systable_endscan(scan);
	heap_close(catalogRelation, RowExclusiveLock);

	if (!found)
	{
		/*
		 * would it be better to look up the actual schema of dropparent and
		 * make the error message explicitly name the qualified name it's
		 * trying to drop ?
		 */
		if (parent->schemaname)
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_TABLE),
					 errmsg("relation \"%s.%s\" is not a parent of relation \"%s\"",
							parent->schemaname, parent->relname,
							RelationGetRelationName(rel))));
		else
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_TABLE),
					 errmsg("relation \"%s\" is not a parent of relation \"%s\"",
							parent->relname,
							RelationGetRelationName(rel))));
	}

	/* Search through columns looking for matching columns from parent table */
	catalogRelation = heap_open(AttributeRelationId, RowExclusiveLock);
	ScanKeyInit(&key,
				Anum_pg_attribute_attrelid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(RelationGetRelid(rel)));
	scan = systable_beginscan(catalogRelation, AttributeRelidNumIndexId,
							  true, SnapshotNow, 1, &key);
	while (HeapTupleIsValid(attributeTuple = systable_getnext(scan)))
	{
		Form_pg_attribute att = (Form_pg_attribute) GETSTRUCT(attributeTuple);

		/*
		 * Not an inherited column at all (do NOT use islocal for this test
		 * -- it can be true for inherited columns)
		 */
		if (att->attinhcount == 0)
			continue;

		/* XXX Is this right? */
		if (att->attisdropped)
			continue;

		if (SearchSysCacheExistsAttName(dropparent, NameStr(att->attname)))
		{
			/*
			 * Decrement inhcount and possibly set islocal to true.  Work on
			 * a copy so the tuple returned by the scan is not modified.
			 */
			HeapTuple	copyTuple = heap_copytuple(attributeTuple);
			Form_pg_attribute copy_att = (Form_pg_attribute) GETSTRUCT(copyTuple);

			copy_att->attinhcount--;
			if (copy_att->attinhcount == 0)
				copy_att->attislocal = true;

			simple_heap_update(catalogRelation, &copyTuple->t_self, copyTuple);

			/*
			 * XXX "Avoid using it for multiple tuples, since opening the
			 * indexes and building the index info structures is moderately
			 * expensive." Perhaps this can be moved outside the loop or else
			 * at least the CatalogOpenIndexes/CatalogCloseIndexes moved
			 * outside the loop but when I try that it seg faults?!
			 */
			CatalogUpdateIndexes(catalogRelation, copyTuple);
			heap_freetuple(copyTuple);
		}
	}
	systable_endscan(scan);
	heap_close(catalogRelation, RowExclusiveLock);
}




-- 
greg



pgsql-hackers by date:

Previous
From: "Jim C. Nasby"
Date:
Subject: Re: Compression and on-disk sorting
Next
From: Markus Schiltknecht
Date:
Subject: Re: Snowball and ispell in tsearch2