ALTER TABLE ADD/DROP INHERITS - Mailing list pgsql-patches

From Greg Stark
Subject ALTER TABLE ADD/DROP INHERITS
Date
Msg-id 87r720omsr.fsf@stark.xeocode.com
Whole thread Raw
Responses Re: ALTER TABLE ADD/DROP INHERITS  (Andrew Dunstan <andrew@dunslane.net>)
List pgsql-patches
As described on -hackers, this is my work so far on adding ADD/DROP INHERITS. It
implements the controversial "ALTER TABLE <table> ADD/DROP INHERITS <parent>"
syntax, which requires making INHERITS a reserved keyword. I haven't seen a
clear consensus yet on what the best syntax to use here would be.

Also, it doesn't handle default column values yet.

Other than that, I think it's complete. There are a number of things I'm not
completely certain I'm on the right track with, though, so it could certainly use
some more eyeballs.



Index: src/backend/commands/tablecmds.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/commands/tablecmds.c,v
retrieving revision 1.184
diff -u -p -c -r1.184 tablecmds.c
cvs diff: conflicting specifications of output style
*** src/backend/commands/tablecmds.c    10 May 2006 23:18:39 -0000    1.184
--- src/backend/commands/tablecmds.c    7 Jun 2006 18:09:56 -0000
*************** typedef struct NewColumnValue
*** 159,166 ****
--- 159,168 ----
  static void truncate_check_rel(Relation rel);
  static List *MergeAttributes(List *schema, List *supers, bool istemp,
                  List **supOids, List **supconstr, int *supOidCount);
+ static void MergeAttributesIntoExisting(Relation rel, Relation relation);
  static bool change_varattnos_of_a_node(Node *node, const AttrNumber *newattno);
  static void StoreCatalogInheritance(Oid relationId, List *supers);
+ static void StoreCatalogInheritance1(Oid relationId, Oid parentOid, int16 seqNumber, Relation catalogRelation);
  static int    findAttrByName(const char *attributeName, List *schema);
  static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass);
  static bool needs_toast_table(Relation rel);
*************** static void ATPrepSetTableSpace(AlteredT
*** 246,251 ****
--- 248,255 ----
  static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace);
  static void ATExecEnableDisableTrigger(Relation rel, char *trigname,
                             bool enable, bool skip_system);
+ static void ATExecAddInherits(Relation rel, RangeVar *parent);
+ static void ATExecDropInherits(Relation rel, RangeVar *parent);
  static void copy_relation_data(Relation rel, SMgrRelation dst);
  static void update_ri_trigger_args(Oid relid,
                         const char *oldname,
*************** static void
*** 1156,1165 ****
  StoreCatalogInheritance(Oid relationId, List *supers)
  {
      Relation    relation;
-     TupleDesc    desc;
      int16        seqNumber;
      ListCell   *entry;
-     HeapTuple    tuple;

      /*
       * sanity checks
--- 1160,1167 ----
*************** StoreCatalogInheritance(Oid relationId,
*** 1179,1194 ****
       * anymore, there's no need to look for indirect ancestors.)
       */
      relation = heap_open(InheritsRelationId, RowExclusiveLock);
-     desc = RelationGetDescr(relation);

      seqNumber = 1;
      foreach(entry, supers)
      {
!         Oid            parentOid = lfirst_oid(entry);
          Datum        datum[Natts_pg_inherits];
          char        nullarr[Natts_pg_inherits];
          ObjectAddress childobject,
                      parentobject;

          datum[0] = ObjectIdGetDatum(relationId);        /* inhrel */
          datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */
--- 1181,1206 ----
       * anymore, there's no need to look for indirect ancestors.)
       */
      relation = heap_open(InheritsRelationId, RowExclusiveLock);

      seqNumber = 1;
      foreach(entry, supers)
      {
!         StoreCatalogInheritance1(relationId, lfirst_oid(entry), seqNumber, relation);
!         seqNumber += 1;
!     }
!
!     heap_close(relation, RowExclusiveLock);
! }
!
! static void
! StoreCatalogInheritance1(Oid relationId, Oid parentOid, int16 seqNumber, Relation relation)
! {
          Datum        datum[Natts_pg_inherits];
          char        nullarr[Natts_pg_inherits];
          ObjectAddress childobject,
                      parentobject;
+         HeapTuple    tuple;
+         TupleDesc desc = RelationGetDescr(relation);

          datum[0] = ObjectIdGetDatum(relationId);        /* inhrel */
          datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */
*************** StoreCatalogInheritance(Oid relationId,
*** 1222,1234 ****
           * Mark the parent as having subclasses.
           */
          setRelhassubclassInRelation(parentOid, true);

-         seqNumber += 1;
-     }

-     heap_close(relation, RowExclusiveLock);
  }

  /*
   * Look for an existing schema entry with the given name.
   *
--- 1234,1246 ----
           * Mark the parent as having subclasses.
           */
          setRelhassubclassInRelation(parentOid, true);
+


  }

+
+
  /*
   * Look for an existing schema entry with the given name.
   *
*************** ATPrepCmd(List **wqueue, Relation rel, A
*** 2053,2063 ****
--- 2065,2078 ----
          case AT_DisableTrig:    /* DISABLE TRIGGER variants */
          case AT_DisableTrigAll:
          case AT_DisableTrigUser:
+         case AT_AddInherits:
+         case AT_DropInherits:
              ATSimplePermissions(rel, false);
              /* These commands never recurse */
              /* No command-specific prep needed */
              pass = AT_PASS_MISC;
              break;
+
          default:                /* oops */
              elog(ERROR, "unrecognized alter table type: %d",
                   (int) cmd->subtype);
*************** ATExecCmd(AlteredTableInfo *tab, Relatio
*** 2233,2238 ****
--- 2248,2259 ----
          case AT_DisableTrigUser:        /* DISABLE TRIGGER USER */
              ATExecEnableDisableTrigger(rel, NULL, false, true);
              break;
+         case AT_DropInherits:
+             ATExecDropInherits(rel, cmd->parent);
+             break;
+         case AT_AddInherits:
+             ATExecAddInherits(rel, cmd->parent);
+             break;
          default:                /* oops */
              elog(ERROR, "unrecognized alter table type: %d",
                   (int) cmd->subtype);
*************** ATExecEnableDisableTrigger(Relation rel,
*** 5880,5885 ****
--- 5901,6198 ----
      EnableDisableTrigger(rel, trigname, enable, skip_system);
  }

+ static void
+ ATExecAddInherits(Relation rel, RangeVar *parent)
+ {
+     Relation relation, catalogRelation;
+     SysScanDesc scan;
+     ScanKeyData key;
+     HeapTuple inheritsTuple;
+     int4 inhseqno = 0;
+     ListCell   *child;
+     List       *children;
+
+     relation = heap_openrv(parent, AccessShareLock); /* XXX is this enough locking? */
+     if (relation->rd_rel->relkind != RELKIND_RELATION)
+         ereport(ERROR,
+                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                  errmsg("inherited relation \"%s\" is not a table",
+                         parent->relname)));
+
+
+     /* Permanent rels cannot inherit from temporary ones */
+     if (!rel->rd_istemp && isTempNamespace(RelationGetNamespace(relation)))
+         ereport(ERROR,
+                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                  errmsg("cannot inherit from temporary relation \"%s\"",
+                         parent->relname)));
+
+     if (!pg_class_ownercheck(RelationGetRelid(relation), GetUserId()))
+         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
+                        RelationGetRelationName(relation));
+
+     /* If parent has OIDs then all children must have OIDs */
+     if (relation->rd_rel->relhasoids && !rel->rd_rel->relhasoids)
+         ereport(ERROR,
+                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                  errmsg("table \"%s\" without OIDs cannot inherit from table \"%s\" with OIDs",
+                         RelationGetRelationName(rel), parent->relname)));
+
+     /*
+      * Reject duplications in the list of parents. -- this is the same check as
+      * when creating a table, but maybe we should check for the parent anywhere
+      * higher in the inheritance structure?
+      */
+     catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
+     ScanKeyInit(&key,
+                 Anum_pg_inherits_inhrelid,
+                 BTEqualStrategyNumber, F_OIDEQ,
+                 ObjectIdGetDatum(RelationGetRelid(rel)));
+     scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId, true, SnapshotNow, 1, &key);
+     while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
+     {
+         Form_pg_inherits inh = (Form_pg_inherits) GETSTRUCT(inheritsTuple);
+         if (inh->inhparent == RelationGetRelid(relation))
+             ereport(ERROR,
+                     (errcode(ERRCODE_DUPLICATE_TABLE),
+                      errmsg("inherited relation \"%s\" duplicated",
+                             parent->relname)));
+         if (inh->inhseqno > inhseqno)
+             inhseqno = inh->inhseqno;
+     }
+     systable_endscan(scan);
+     heap_close(catalogRelation, RowExclusiveLock);
+
+     /* Get children because we have to manually recurse and also because we
+      * have to check for recursive inheritance graphs */
+
+     /* this routine is actually in the planner */
+     children = find_all_inheritors(RelationGetRelid(rel));
+
+     if (list_member_oid(children, RelationGetRelid(relation)))
+         ereport(ERROR,
+                 (errcode(ERRCODE_DUPLICATE_TABLE),
+                  errmsg("Circular inheritance structure found")));
+
+     foreach(child, children)
+         {
+             Oid            childrelid = lfirst_oid(child);
+             Relation    childrel;
+
+             childrel = relation_open(childrelid, AccessExclusiveLock);
+             MergeAttributesIntoExisting(childrel, relation);
+             relation_close(childrel, NoLock);
+         }
+
+     catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
+     StoreCatalogInheritance1(RelationGetRelid(rel), RelationGetRelid(relation), inhseqno+1, catalogRelation);
+     heap_close(catalogRelation, RowExclusiveLock);
+
+     heap_close(relation, AccessShareLock);
+ }
+
+
+ static void
+ MergeAttributesIntoExisting(Relation rel, Relation relation)
+ {
+     Relation     attrdesc;
+     AttrNumber    parent_attno, child_attno;
+     TupleDesc    tupleDesc;
+     TupleConstr *constr;
+     HeapTuple     tuple;
+
+     child_attno = RelationGetNumberOfAttributes(rel);
+
+     tupleDesc = RelationGetDescr(relation);
+     constr = tupleDesc->constr;
+
+     for (parent_attno = 1; parent_attno <= tupleDesc->natts;
+          parent_attno++)
+     {
+         Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1];
+         char       *attributeName = NameStr(attribute->attname);
+
+         /* Ignore dropped columns in the parent. */
+         if (attribute->attisdropped)
+             continue;
+
+         /* Does it conflict with an existing column? */
+         attrdesc = heap_open(AttributeRelationId, RowExclusiveLock);
+
+         tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), attributeName);
+         if (HeapTupleIsValid(tuple)) {
+             /*
+              * Yes, try to merge the two column definitions. They must
+              * have the same type and typmod.
+              */
+             Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple);
+             ereport(NOTICE,
+                     (errmsg("merging column \"%s\" with inherited definition",
+                             attributeName)));
+             if (attribute->atttypid  != childatt->atttypid ||
+                 attribute->atttypmod != childatt->atttypmod ||
+                 (attribute->attnotnull && !childatt->attnotnull))
+                 ereport(ERROR,
+                         (errcode(ERRCODE_DATATYPE_MISMATCH),
+                          errmsg("child table \"%s\" has different type for column \"%s\"",
+                                 RelationGetRelationName(rel), NameStr(attribute->attname))));
+
+             childatt->attinhcount++;
+             simple_heap_update(attrdesc, &tuple->t_self, tuple);
+             CatalogUpdateIndexes(attrdesc, tuple); /* XXX strength reduce openindexes to outside loop? */
+             heap_freetuple(tuple);
+
+             /* XXX defaults */
+
+         } else {
+             /*
+              * No, create a new inherited column
+              */
+
+             FormData_pg_attribute attributeD;
+             HeapTuple attributeTuple = heap_addheader(Natts_pg_attribute,
+                                                       false,
+                                                       ATTRIBUTE_TUPLE_SIZE,
+                                                       (void *) &attributeD);
+             Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(attributeTuple);
+
+             if (attribute->attnotnull)
+                 ereport(ERROR,
+                         (errcode(ERRCODE_DATATYPE_MISMATCH),
+                          errmsg("Cannot add new inherited NOT NULL column \"%s\"",
+                                 NameStr(attribute->attname))));
+
+             childatt->attrelid = RelationGetRelid(rel);
+             namecpy(&childatt->attname, &attribute->attname);
+             childatt->atttypid = attribute->atttypid;
+             childatt->attstattarget = -1;
+             childatt->attlen = attribute->attlen;
+             childatt->attcacheoff = -1;
+             childatt->atttypmod = attribute->atttypmod;
+             childatt->attnum = ++child_attno;
+             childatt->attbyval = attribute->attbyval;
+             childatt->attndims = attribute->attndims;
+             childatt->attstorage = attribute->attstorage;
+             childatt->attalign = attribute->attalign;
+             childatt->attnotnull = false;
+             childatt->atthasdef = false; /* XXX */
+             childatt->attisdropped = false;
+             childatt->attislocal = false;
+             childatt->attinhcount = attribute->attinhcount+1;
+
+             simple_heap_insert(attrdesc, attributeTuple);
+             CatalogUpdateIndexes(attrdesc, attributeTuple);
+             heap_freetuple(attributeTuple);
+
+             /* XXX Defaults */
+
+         }
+         heap_close(attrdesc, RowExclusiveLock);
+     }
+
+ }
+
+ static void
+ ATExecDropInherits(Relation rel, RangeVar *parent)
+ {
+
+
+     Relation    catalogRelation;
+     SysScanDesc scan;
+     ScanKeyData key;
+     HeapTuple    inheritsTuple, attributeTuple;
+     Oid            inhparent;
+     Oid            dropparent;
+     int         found = 0;
+
+     /* Get the OID of parent -- if no schema is specified use the regular
+      * search path and only drop the one table that's found. We could try to be
+      * clever and look at each parent and see if it matches but that would be
+      * inconsistent with other operations I think. */
+
+     Assert(rel);
+     Assert(parent);
+
+     dropparent = RangeVarGetRelid(parent, false);
+
+     /* Search through the direct parents of rel looking for dropparent oid */
+
+     catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
+     ScanKeyInit(&key,
+                 Anum_pg_inherits_inhrelid,
+                 BTEqualStrategyNumber, F_OIDEQ,
+                 ObjectIdGetDatum(RelationGetRelid(rel)));
+     scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId, true, SnapshotNow, 1, &key);
+     while (!found && HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
+     {
+         inhparent = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhparent;
+         if (inhparent == dropparent) {
+             simple_heap_delete(catalogRelation, &inheritsTuple->t_self);
+             found = 1;
+         }
+     }
+     systable_endscan(scan);
+     heap_close(catalogRelation, RowExclusiveLock);
+
+
+     if (!found) {
+         /* would it be better to look up the actual schema of dropparent and
+          * make the error message explicitly name the qualified name it's
+          * trying to drop ?*/
+         if (parent->schemaname)
+             ereport(ERROR,
+                     (errcode(ERRCODE_UNDEFINED_TABLE),
+                      errmsg("relation \"%s.%s\" is not a parent of relation \"%s\"",
+                             parent->schemaname, parent->relname, RelationGetRelationName(rel))));
+         else
+             ereport(ERROR,
+                     (errcode(ERRCODE_UNDEFINED_TABLE),
+                      errmsg("relation \"%s\" is not a parent of relation \"%s\"",
+                             parent->relname, RelationGetRelationName(rel))));
+     }
+
+     /* Search through columns looking for matching columns from parent table */
+
+     catalogRelation = heap_open(AttributeRelationId, RowExclusiveLock);
+     ScanKeyInit(&key,
+                 Anum_pg_attribute_attrelid,
+                 BTEqualStrategyNumber, F_OIDEQ,
+                 ObjectIdGetDatum(RelationGetRelid(rel)));
+     scan = systable_beginscan(catalogRelation, AttributeRelidNumIndexId, true, SnapshotNow, 1, &key);
+     while (HeapTupleIsValid(attributeTuple = systable_getnext(scan))) {
+         Form_pg_attribute att = ((Form_pg_attribute)GETSTRUCT(attributeTuple));
+         /* Not an inherited column at all
+          * (do NOT use islocal for this test--it can be true for inherited columns)
+          */
+         if (att->attinhcount == 0)
+             continue;
+         if (att->attisdropped) /* XXX Is this right? */
+             continue;
+         if (SearchSysCacheExistsAttName(dropparent, NameStr(att->attname))) {
+             /* Decrement inhcount and possibly set islocal to 1 */
+             HeapTuple copyTuple = heap_copytuple(attributeTuple);
+             Form_pg_attribute copy_att = ((Form_pg_attribute)GETSTRUCT(copyTuple));
+
+             copy_att->attinhcount--;
+             if (copy_att->attinhcount == 0)
+                 copy_att->attislocal = 1;
+
+             simple_heap_update(catalogRelation, &copyTuple->t_self, copyTuple);
+             /* XXX "Avoid using it for multiple tuples, since opening the
+              * indexes and building the index info structures is moderately
+              * expensive." Perhaps this can be moved outside the loop or else
+              * at least the CatalogOpenIndexes/CatalogCloseIndexes moved
+              * outside the loop but when I try that it seg faults?!*/
+             CatalogUpdateIndexes(catalogRelation, copyTuple);
+             heap_freetuple(copyTuple);
+         }
+     }
+     systable_endscan(scan);
+     heap_close(catalogRelation, RowExclusiveLock);
+ }
+
+
+
  /*
   * ALTER TABLE CREATE TOAST TABLE
   *
Index: src/backend/nodes/copyfuncs.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/nodes/copyfuncs.c,v
retrieving revision 1.335
diff -u -p -c -r1.335 copyfuncs.c
cvs diff: conflicting specifications of output style
*** src/backend/nodes/copyfuncs.c    30 Apr 2006 18:30:38 -0000    1.335
--- src/backend/nodes/copyfuncs.c    7 Jun 2006 18:09:56 -0000
*************** _copyAlterTableCmd(AlterTableCmd *from)
*** 1799,1804 ****
--- 1799,1805 ----
      COPY_SCALAR_FIELD(subtype);
      COPY_STRING_FIELD(name);
      COPY_NODE_FIELD(def);
+     COPY_NODE_FIELD(parent);
      COPY_NODE_FIELD(transform);
      COPY_SCALAR_FIELD(behavior);

Index: src/backend/parser/gram.y
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/parser/gram.y,v
retrieving revision 2.545
diff -u -p -c -r2.545 gram.y
cvs diff: conflicting specifications of output style
*** src/backend/parser/gram.y    27 May 2006 17:38:45 -0000    2.545
--- src/backend/parser/gram.y    7 Jun 2006 18:09:57 -0000
*************** alter_table_cmd:
*** 1514,1519 ****
--- 1514,1535 ----
                      n->subtype = AT_DisableTrigUser;
                      $$ = (Node *)n;
                  }
+             /* ALTER TABLE <name> ADD INHERITS <parent> */
+             | ADD_P INHERITS qualified_name
+                 {
+                     AlterTableCmd *n = makeNode(AlterTableCmd);
+                     n->subtype = AT_AddInherits;
+                     n->parent = $3;
+                     $$ = (Node *)n;
+                 }
+             /* ALTER TABLE <name> DROP INHERITS <parent> */
+             | DROP INHERITS qualified_name
+                 {
+                     AlterTableCmd *n = makeNode(AlterTableCmd);
+                     n->subtype = AT_DropInherits;
+                     n->parent = $3;
+                     $$ = (Node *)n;
+                 }
              | alter_rel_cmd
                  {
                      $$ = $1;
*************** unreserved_keyword:
*** 8422,8428 ****
              | INCREMENT
              | INDEX
              | INHERIT
-             | INHERITS
              | INPUT_P
              | INSENSITIVE
              | INSERT
--- 8438,8443 ----
*************** func_name_keyword:
*** 8640,8645 ****
--- 8655,8661 ----
   */
  reserved_keyword:
                ALL
+             | INHERITS
              | ANALYSE
              | ANALYZE
              | AND
Index: src/include/nodes/parsenodes.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/nodes/parsenodes.h,v
retrieving revision 1.310
diff -u -p -c -r1.310 parsenodes.h
cvs diff: conflicting specifications of output style
*** src/include/nodes/parsenodes.h    30 Apr 2006 18:30:40 -0000    1.310
--- src/include/nodes/parsenodes.h    7 Jun 2006 18:10:00 -0000
*************** typedef enum AlterTableType
*** 874,880 ****
      AT_EnableTrigAll,            /* ENABLE TRIGGER ALL */
      AT_DisableTrigAll,            /* DISABLE TRIGGER ALL */
      AT_EnableTrigUser,            /* ENABLE TRIGGER USER */
!     AT_DisableTrigUser            /* DISABLE TRIGGER USER */
  } AlterTableType;

  typedef struct AlterTableCmd    /* one subcommand of an ALTER TABLE */
--- 874,882 ----
      AT_EnableTrigAll,            /* ENABLE TRIGGER ALL */
      AT_DisableTrigAll,            /* DISABLE TRIGGER ALL */
      AT_EnableTrigUser,            /* ENABLE TRIGGER USER */
!     AT_DisableTrigUser,            /* DISABLE TRIGGER USER */
!     AT_AddInherits,                /* ADD INHERITS parent */
!     AT_DropInherits                /* DROP INHERITS parent */
  } AlterTableType;

  typedef struct AlterTableCmd    /* one subcommand of an ALTER TABLE */
*************** typedef struct AlterTableCmd    /* one subc
*** 883,888 ****
--- 885,891 ----
      AlterTableType subtype;        /* Type of table alteration to apply */
      char       *name;            /* column, constraint, or trigger to act on,
                                   * or new owner or tablespace */
+     RangeVar   *parent;            /* Parent table for add/drop inherits */
      Node       *def;            /* definition of new column, column type,
                                   * index, or constraint */
      Node       *transform;        /* transformation expr for ALTER TYPE */


--
greg

pgsql-patches by date:

Previous
From: "Jim C. Nasby"
Date:
Subject: Re: remove lock protection on HeapTupleSatisfiesVacuum
Next
From: Andrew Dunstan
Date:
Subject: Re: ALTER TABLE ADD/DROP INHERITS