Re: ALTER TABLE ADD COLUMN fast default - Mailing list pgsql-hackers

From Tom Lane
Subject Re: ALTER TABLE ADD COLUMN fast default
Date
Msg-id 554470.1617639048@sss.pgh.pa.us
Whole thread Raw
In response to Re: ALTER TABLE ADD COLUMN fast default  (Andrew Dunstan <andrew@dunslane.net>)
List pgsql-hackers
Andrew Dunstan <andrew@dunslane.net> writes:
> On 4/4/21 6:50 PM, Tom Lane wrote:
>> Meh.  "pg_class.relchecks is inconsistent with the number of entries
>> in pg_constraint" does not seem to me like a scary enough situation
>> to justify a panic response.  Maybe there's an argument for failing
>> at the point where we'd need to actually apply the CHECK constraints
>> (similarly to what my patch is doing for missing defaults).
>> But preventing the user from, say, dumping the data in the table
>> seems to me to be making the situation worse not better.

> OK, fair argument.

Here's a v2 that applies the same principles to struct ConstrCheck
as AttrDefault, ie create only valid entries (no NULL strings), and
if there's not the right number, complain at point of use rather
than failing the relcache load.

There is a hole in this, which is that if pg_class.relchecks = 0
then we won't even look in pg_constraint, so we won't realize if
there is an inconsistency.  But that was true before, and I don't
want to expend an almost-always-useless catalog search to see if
relchecks = 0 is a lie.

I also tried to bring the related message texts up to something
approaching project standards, though I didn't convert them into
translatable ereports.

            regards, tom lane

diff --git a/src/backend/access/common/tupdesc.c b/src/backend/access/common/tupdesc.c
index cb76465050..ffb275afbe 100644
--- a/src/backend/access/common/tupdesc.c
+++ b/src/backend/access/common/tupdesc.c
@@ -174,10 +174,7 @@ CreateTupleDescCopyConstr(TupleDesc tupdesc)
             cpy->defval = (AttrDefault *) palloc(cpy->num_defval * sizeof(AttrDefault));
             memcpy(cpy->defval, constr->defval, cpy->num_defval * sizeof(AttrDefault));
             for (i = cpy->num_defval - 1; i >= 0; i--)
-            {
-                if (constr->defval[i].adbin)
-                    cpy->defval[i].adbin = pstrdup(constr->defval[i].adbin);
-            }
+                cpy->defval[i].adbin = pstrdup(constr->defval[i].adbin);
         }

         if (constr->missing)
@@ -203,10 +200,8 @@ CreateTupleDescCopyConstr(TupleDesc tupdesc)
             memcpy(cpy->check, constr->check, cpy->num_check * sizeof(ConstrCheck));
             for (i = cpy->num_check - 1; i >= 0; i--)
             {
-                if (constr->check[i].ccname)
-                    cpy->check[i].ccname = pstrdup(constr->check[i].ccname);
-                if (constr->check[i].ccbin)
-                    cpy->check[i].ccbin = pstrdup(constr->check[i].ccbin);
+                cpy->check[i].ccname = pstrdup(constr->check[i].ccname);
+                cpy->check[i].ccbin = pstrdup(constr->check[i].ccbin);
                 cpy->check[i].ccvalid = constr->check[i].ccvalid;
                 cpy->check[i].ccnoinherit = constr->check[i].ccnoinherit;
             }
@@ -328,10 +323,7 @@ FreeTupleDesc(TupleDesc tupdesc)
             AttrDefault *attrdef = tupdesc->constr->defval;

             for (i = tupdesc->constr->num_defval - 1; i >= 0; i--)
-            {
-                if (attrdef[i].adbin)
-                    pfree(attrdef[i].adbin);
-            }
+                pfree(attrdef[i].adbin);
             pfree(attrdef);
         }
         if (tupdesc->constr->missing)
@@ -352,10 +344,8 @@ FreeTupleDesc(TupleDesc tupdesc)

             for (i = tupdesc->constr->num_check - 1; i >= 0; i--)
             {
-                if (check[i].ccname)
-                    pfree(check[i].ccname);
-                if (check[i].ccbin)
-                    pfree(check[i].ccbin);
+                pfree(check[i].ccname);
+                pfree(check[i].ccbin);
             }
             pfree(check);
         }
@@ -412,7 +402,6 @@ bool
 equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)
 {
     int            i,
-                j,
                 n;

     if (tupdesc1->natts != tupdesc2->natts)
@@ -488,22 +477,13 @@ equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)
         n = constr1->num_defval;
         if (n != (int) constr2->num_defval)
             return false;
+        /* We assume here that both AttrDefault arrays are in adnum order */
         for (i = 0; i < n; i++)
         {
             AttrDefault *defval1 = constr1->defval + i;
-            AttrDefault *defval2 = constr2->defval;
-
-            /*
-             * We can't assume that the items are always read from the system
-             * catalogs in the same order; so use the adnum field to identify
-             * the matching item to compare.
-             */
-            for (j = 0; j < n; defval2++, j++)
-            {
-                if (defval1->adnum == defval2->adnum)
-                    break;
-            }
-            if (j >= n)
+            AttrDefault *defval2 = constr2->defval + i;
+
+            if (defval1->adnum != defval2->adnum)
                 return false;
             if (strcmp(defval1->adbin, defval2->adbin) != 0)
                 return false;
@@ -534,25 +514,21 @@ equalTupleDescs(TupleDesc tupdesc1, TupleDesc tupdesc2)
         n = constr1->num_check;
         if (n != (int) constr2->num_check)
             return false;
+
+        /*
+         * Similarly, we rely here on the ConstrCheck entries being sorted by
+         * name.  If there are duplicate names, the outcome of the comparison
+         * is uncertain, but that should not happen.
+         */
         for (i = 0; i < n; i++)
         {
             ConstrCheck *check1 = constr1->check + i;
-            ConstrCheck *check2 = constr2->check;
-
-            /*
-             * Similarly, don't assume that the checks are always read in the
-             * same order; match them up by name and contents. (The name
-             * *should* be unique, but...)
-             */
-            for (j = 0; j < n; check2++, j++)
-            {
-                if (strcmp(check1->ccname, check2->ccname) == 0 &&
-                    strcmp(check1->ccbin, check2->ccbin) == 0 &&
-                    check1->ccvalid == check2->ccvalid &&
-                    check1->ccnoinherit == check2->ccnoinherit)
-                    break;
-            }
-            if (j >= n)
+            ConstrCheck *check2 = constr2->check + i;
+
+            if (!(strcmp(check1->ccname, check2->ccname) == 0 &&
+                  strcmp(check1->ccbin, check2->ccbin) == 0 &&
+                  check1->ccvalid == check2->ccvalid &&
+                  check1->ccnoinherit == check2->ccnoinherit))
                 return false;
         }
     }
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 88a68a4697..87f9bdaef0 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -2501,21 +2501,24 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
             if (attribute->atthasdef)
             {
                 Node       *this_default = NULL;
-                AttrDefault *attrdef;
-                int            i;

                 /* Find default in constraint structure */
-                Assert(constr != NULL);
-                attrdef = constr->defval;
-                for (i = 0; i < constr->num_defval; i++)
+                if (constr != NULL)
                 {
-                    if (attrdef[i].adnum == parent_attno)
+                    AttrDefault *attrdef = constr->defval;
+
+                    for (int i = 0; i < constr->num_defval; i++)
                     {
-                        this_default = stringToNode(attrdef[i].adbin);
-                        break;
+                        if (attrdef[i].adnum == parent_attno)
+                        {
+                            this_default = stringToNode(attrdef[i].adbin);
+                            break;
+                        }
                     }
                 }
-                Assert(this_default != NULL);
+                if (this_default == NULL)
+                    elog(ERROR, "default expression not found for attribute %d of relation \"%s\"",
+                         parent_attno, RelationGetRelationName(relation));

                 /*
                  * If it's a GENERATED default, it might contain Vars that
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 163242f54e..9f86910a6b 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1609,6 +1609,15 @@ ExecRelCheck(ResultRelInfo *resultRelInfo,
     MemoryContext oldContext;
     int            i;

+    /*
+     * CheckConstraintFetch let this pass with only a warning, but now we
+     * should fail rather than possibly failing to enforce an important
+     * constraint.
+     */
+    if (ncheck != rel->rd_rel->relchecks)
+        elog(ERROR, "%d pg_constraint record(s) missing for relation \"%s\"",
+             rel->rd_rel->relchecks - ncheck, RelationGetRelationName(rel));
+
     /*
      * If first time through for this result relation, build expression
      * nodetrees for rel's constraint expressions.  Keep them in the per-query
@@ -1862,7 +1871,7 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
         }
     }

-    if (constr->num_check > 0)
+    if (rel->rd_rel->relchecks > 0)
     {
         const char *failed;

diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c
index b968c25dd6..9dd30370da 100644
--- a/src/backend/parser/parse_utilcmd.c
+++ b/src/backend/parser/parse_utilcmd.c
@@ -1239,8 +1239,6 @@ expandTableLikeClause(RangeVar *heapRel, TableLikeClause *table_like_clause)
          (CREATE_TABLE_LIKE_DEFAULTS | CREATE_TABLE_LIKE_GENERATED)) &&
         constr != NULL)
     {
-        AttrDefault *attrdef = constr->defval;
-
         for (parent_attno = 1; parent_attno <= tupleDesc->natts;
              parent_attno++)
         {
@@ -1264,6 +1262,7 @@ expandTableLikeClause(RangeVar *heapRel, TableLikeClause *table_like_clause)
                  (table_like_clause->options & CREATE_TABLE_LIKE_DEFAULTS)))
             {
                 Node       *this_default = NULL;
+                AttrDefault *attrdef = constr->defval;
                 AlterTableCmd *atsubcmd;
                 bool        found_whole_row;

@@ -1276,7 +1275,9 @@ expandTableLikeClause(RangeVar *heapRel, TableLikeClause *table_like_clause)
                         break;
                     }
                 }
-                Assert(this_default != NULL);
+                if (this_default == NULL)
+                    elog(ERROR, "default expression not found for attribute %d of relation \"%s\"",
+                         parent_attno, RelationGetRelationName(relation));

                 atsubcmd = makeNode(AlterTableCmd);
                 atsubcmd->subtype = AT_CookedColumnDefault;
diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c
index 92661abae2..da78f02775 100644
--- a/src/backend/rewrite/rewriteHandler.c
+++ b/src/backend/rewrite/rewriteHandler.c
@@ -1228,25 +1228,28 @@ build_column_default(Relation rel, int attrno)
     }

     /*
-     * Scan to see if relation has a default for this column.
+     * If relation has a default for this column, fetch that expression.
      */
-    if (att_tup->atthasdef && rd_att->constr &&
-        rd_att->constr->num_defval > 0)
+    if (att_tup->atthasdef)
     {
-        AttrDefault *defval = rd_att->constr->defval;
-        int            ndef = rd_att->constr->num_defval;
-
-        while (--ndef >= 0)
+        if (rd_att->constr && rd_att->constr->num_defval > 0)
         {
-            if (attrno == defval[ndef].adnum)
+            AttrDefault *defval = rd_att->constr->defval;
+            int            ndef = rd_att->constr->num_defval;
+
+            while (--ndef >= 0)
             {
-                /*
-                 * Found it, convert string representation to node tree.
-                 */
-                expr = stringToNode(defval[ndef].adbin);
-                break;
+                if (attrno == defval[ndef].adnum)
+                {
+                    /* Found it, convert string representation to node tree. */
+                    expr = stringToNode(defval[ndef].adbin);
+                    break;
+                }
             }
         }
+        if (expr == NULL)
+            elog(ERROR, "default expression not found for attribute %d of relation \"%s\"",
+                 attrno, RelationGetRelationName(rel));
     }

     /*
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index ff7395c85b..29702d6eab 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -282,7 +282,8 @@ static void RelationInitPhysicalAddr(Relation relation);
 static void load_critical_index(Oid indexoid, Oid heapoid);
 static TupleDesc GetPgClassDescriptor(void);
 static TupleDesc GetPgIndexDescriptor(void);
-static void AttrDefaultFetch(Relation relation);
+static void AttrDefaultFetch(Relation relation, int ndef);
+static int    AttrDefaultCmp(const void *a, const void *b);
 static void CheckConstraintFetch(Relation relation);
 static int    CheckConstraintCmp(const void *a, const void *b);
 static void InitIndexAmRoutine(Relation relation);
@@ -503,7 +504,6 @@ RelationBuildTupleDesc(Relation relation)
     ScanKeyData skey[2];
     int            need;
     TupleConstr *constr;
-    AttrDefault *attrdef = NULL;
     AttrMissing *attrmiss = NULL;
     int            ndef = 0;

@@ -512,8 +512,8 @@ RelationBuildTupleDesc(Relation relation)
         relation->rd_rel->reltype ? relation->rd_rel->reltype : RECORDOID;
     relation->rd_att->tdtypmod = -1;    /* just to be sure */

-    constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext,
-                                                sizeof(TupleConstr));
+    constr = (TupleConstr *) MemoryContextAllocZero(CacheMemoryContext,
+                                                    sizeof(TupleConstr));
     constr->has_not_null = false;
     constr->has_generated_stored = false;

@@ -557,10 +557,9 @@ RelationBuildTupleDesc(Relation relation)

         attnum = attp->attnum;
         if (attnum <= 0 || attnum > RelationGetNumberOfAttributes(relation))
-            elog(ERROR, "invalid attribute number %d for %s",
+            elog(ERROR, "invalid attribute number %d for relation \"%s\"",
                  attp->attnum, RelationGetRelationName(relation));

-
         memcpy(TupleDescAttr(relation->rd_att, attnum - 1),
                attp,
                ATTRIBUTE_FIXED_PART_SIZE);
@@ -570,22 +569,10 @@ RelationBuildTupleDesc(Relation relation)
             constr->has_not_null = true;
         if (attp->attgenerated == ATTRIBUTE_GENERATED_STORED)
             constr->has_generated_stored = true;
-
-        /* If the column has a default, fill it into the attrdef array */
         if (attp->atthasdef)
-        {
-            if (attrdef == NULL)
-                attrdef = (AttrDefault *)
-                    MemoryContextAllocZero(CacheMemoryContext,
-                                           RelationGetNumberOfAttributes(relation) *
-                                           sizeof(AttrDefault));
-            attrdef[ndef].adnum = attnum;
-            attrdef[ndef].adbin = NULL;
-
             ndef++;
-        }

-        /* Likewise for a missing value */
+        /* If the column has a "missing" value, put it in the attrmiss array */
         if (attp->atthasmissing)
         {
             Datum        missingval;
@@ -648,7 +635,7 @@ RelationBuildTupleDesc(Relation relation)
     table_close(pg_attribute_desc, AccessShareLock);

     if (need != 0)
-        elog(ERROR, "catalog is missing %d attribute(s) for relid %u",
+        elog(ERROR, "pg_attribute catalog is missing %d attribute(s) for relation OID %u",
              need, RelationGetRelid(relation));

     /*
@@ -680,33 +667,19 @@ RelationBuildTupleDesc(Relation relation)
         constr->has_generated_stored ||
         ndef > 0 ||
         attrmiss ||
-        relation->rd_rel->relchecks)
+        relation->rd_rel->relchecks > 0)
     {
         relation->rd_att->constr = constr;

         if (ndef > 0)            /* DEFAULTs */
-        {
-            if (ndef < RelationGetNumberOfAttributes(relation))
-                constr->defval = (AttrDefault *)
-                    repalloc(attrdef, ndef * sizeof(AttrDefault));
-            else
-                constr->defval = attrdef;
-            constr->num_defval = ndef;
-            AttrDefaultFetch(relation);
-        }
+            AttrDefaultFetch(relation, ndef);
         else
             constr->num_defval = 0;

         constr->missing = attrmiss;

         if (relation->rd_rel->relchecks > 0)    /* CHECKs */
-        {
-            constr->num_check = relation->rd_rel->relchecks;
-            constr->check = (ConstrCheck *)
-                MemoryContextAllocZero(CacheMemoryContext,
-                                       constr->num_check * sizeof(ConstrCheck));
             CheckConstraintFetch(relation);
-        }
         else
             constr->num_check = 0;
     }
@@ -4251,21 +4224,29 @@ GetPgIndexDescriptor(void)

 /*
  * Load any default attribute value definitions for the relation.
+ *
+ * ndef is the number of attributes that were marked atthasdef.
+ *
+ * Note: we don't make it a hard error to be missing some pg_attrdef records.
+ * We can limp along as long as nothing needs to use the default value.  Code
+ * that fails to find an expected AttrDefault record should throw an error.
  */
 static void
-AttrDefaultFetch(Relation relation)
+AttrDefaultFetch(Relation relation, int ndef)
 {
-    AttrDefault *attrdef = relation->rd_att->constr->defval;
-    int            ndef = relation->rd_att->constr->num_defval;
+    AttrDefault *attrdef;
     Relation    adrel;
     SysScanDesc adscan;
     ScanKeyData skey;
     HeapTuple    htup;
-    Datum        val;
-    bool        isnull;
-    int            found;
-    int            i;
+    int            found = 0;
+
+    /* Allocate array with room for as many entries as expected */
+    attrdef = (AttrDefault *)
+        MemoryContextAllocZero(CacheMemoryContext,
+                               ndef * sizeof(AttrDefault));

+    /* Search pg_attrdef for relevant entries */
     ScanKeyInit(&skey,
                 Anum_pg_attrdef_adrelid,
                 BTEqualStrategyNumber, F_OIDEQ,
@@ -4274,65 +4255,94 @@ AttrDefaultFetch(Relation relation)
     adrel = table_open(AttrDefaultRelationId, AccessShareLock);
     adscan = systable_beginscan(adrel, AttrDefaultIndexId, true,
                                 NULL, 1, &skey);
-    found = 0;

     while (HeapTupleIsValid(htup = systable_getnext(adscan)))
     {
         Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
-        Form_pg_attribute attr = TupleDescAttr(relation->rd_att, adform->adnum - 1);
+        Datum        val;
+        bool        isnull;

-        for (i = 0; i < ndef; i++)
+        /* protect limited size of array */
+        if (found >= ndef)
         {
-            if (adform->adnum != attrdef[i].adnum)
-                continue;
-            if (attrdef[i].adbin != NULL)
-                elog(WARNING, "multiple attrdef records found for attr %s of rel %s",
-                     NameStr(attr->attname),
-                     RelationGetRelationName(relation));
-            else
-                found++;
-
-            val = fastgetattr(htup,
-                              Anum_pg_attrdef_adbin,
-                              adrel->rd_att, &isnull);
-            if (isnull)
-                elog(WARNING, "null adbin for attr %s of rel %s",
-                     NameStr(attr->attname),
-                     RelationGetRelationName(relation));
-            else
-            {
-                /* detoast and convert to cstring in caller's context */
-                char       *s = TextDatumGetCString(val);
-
-                attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext, s);
-                pfree(s);
-            }
+            elog(WARNING, "unexpected pg_attrdef record found for attribute %d of relation \"%s\"",
+                 adform->adnum, RelationGetRelationName(relation));
             break;
         }

-        if (i >= ndef)
-            elog(WARNING, "unexpected attrdef record found for attr %d of rel %s",
+        val = fastgetattr(htup,
+                          Anum_pg_attrdef_adbin,
+                          adrel->rd_att, &isnull);
+        if (isnull)
+            elog(WARNING, "null adbin for attribute %d of relation \"%s\"",
                  adform->adnum, RelationGetRelationName(relation));
+        else
+        {
+            /* detoast and convert to cstring in caller's context */
+            char       *s = TextDatumGetCString(val);
+
+            attrdef[found].adnum = adform->adnum;
+            attrdef[found].adbin = MemoryContextStrdup(CacheMemoryContext, s);
+            pfree(s);
+            found++;
+        }
     }

     systable_endscan(adscan);
     table_close(adrel, AccessShareLock);
+
+    if (found != ndef)
+        elog(WARNING, "%d pg_attrdef record(s) missing for relation \"%s\"",
+             ndef - found, RelationGetRelationName(relation));
+
+    /*
+     * Sort the AttrDefault entries by adnum, for the convenience of
+     * equalTupleDescs().  (Usually, they already will be in order, but this
+     * might not be so if systable_getnext isn't using an index.)
+     */
+    if (found > 1)
+        qsort(attrdef, found, sizeof(AttrDefault), AttrDefaultCmp);
+
+    /* Install array only after it's fully valid */
+    relation->rd_att->constr->defval = attrdef;
+    relation->rd_att->constr->num_defval = found;
+}
+
+/*
+ * qsort comparator to sort AttrDefault entries by adnum
+ */
+static int
+AttrDefaultCmp(const void *a, const void *b)
+{
+    const AttrDefault *ada = (const AttrDefault *) a;
+    const AttrDefault *adb = (const AttrDefault *) b;
+
+    return ada->adnum - adb->adnum;
 }

 /*
  * Load any check constraints for the relation.
+ *
+ * As with defaults, if we don't find the expected number of them, just warn
+ * here.  The executor should throw an error if an INSERT/UPDATE is attempted.
  */
 static void
 CheckConstraintFetch(Relation relation)
 {
-    ConstrCheck *check = relation->rd_att->constr->check;
-    int            ncheck = relation->rd_att->constr->num_check;
+    ConstrCheck *check;
+    int            ncheck = relation->rd_rel->relchecks;
     Relation    conrel;
     SysScanDesc conscan;
     ScanKeyData skey[1];
     HeapTuple    htup;
     int            found = 0;

+    /* Allocate array with room for as many entries as expected */
+    check = (ConstrCheck *)
+        MemoryContextAllocZero(CacheMemoryContext,
+                               ncheck * sizeof(ConstrCheck));
+
+    /* Search pg_constraint for relevant entries */
     ScanKeyInit(&skey[0],
                 Anum_pg_constraint_conrelid,
                 BTEqualStrategyNumber, F_OIDEQ,
@@ -4347,15 +4357,18 @@ CheckConstraintFetch(Relation relation)
         Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
         Datum        val;
         bool        isnull;
-        char       *s;

         /* We want check constraints only */
         if (conform->contype != CONSTRAINT_CHECK)
             continue;

+        /* protect limited size of array */
         if (found >= ncheck)
-            elog(ERROR, "unexpected constraint record found for rel %s",
+        {
+            elog(WARNING, "unexpected pg_constraint record found for relation \"%s\"",
                  RelationGetRelationName(relation));
+            break;
+        }

         check[found].ccvalid = conform->convalidated;
         check[found].ccnoinherit = conform->connoinherit;
@@ -4367,27 +4380,36 @@ CheckConstraintFetch(Relation relation)
                           Anum_pg_constraint_conbin,
                           conrel->rd_att, &isnull);
         if (isnull)
-            elog(ERROR, "null conbin for rel %s",
+            elog(WARNING, "null conbin for relation \"%s\"",
                  RelationGetRelationName(relation));
+        else
+        {
+            /* detoast and convert to cstring in caller's context */
+            char       *s = TextDatumGetCString(val);

-        /* detoast and convert to cstring in caller's context */
-        s = TextDatumGetCString(val);
-        check[found].ccbin = MemoryContextStrdup(CacheMemoryContext, s);
-        pfree(s);
-
-        found++;
+            check[found].ccbin = MemoryContextStrdup(CacheMemoryContext, s);
+            pfree(s);
+            found++;
+        }
     }

     systable_endscan(conscan);
     table_close(conrel, AccessShareLock);

     if (found != ncheck)
-        elog(ERROR, "%d constraint record(s) missing for rel %s",
+        elog(WARNING, "%d pg_constraint record(s) missing for relation \"%s\"",
              ncheck - found, RelationGetRelationName(relation));

-    /* Sort the records so that CHECKs are applied in a deterministic order */
-    if (ncheck > 1)
-        qsort(check, ncheck, sizeof(ConstrCheck), CheckConstraintCmp);
+    /*
+     * Sort the records by name.  This ensures that CHECKs are applied in a
+     * deterministic order, and it also makes equalTupleDescs() faster.
+     */
+    if (found > 1)
+        qsort(check, found, sizeof(ConstrCheck), CheckConstraintCmp);
+
+    /* Install array only after it's fully valid */
+    relation->rd_att->constr->check = check;
+    relation->rd_att->constr->num_check = found;
 }

 /*

pgsql-hackers by date:

Previous
From: Alvaro Herrera
Date:
Subject: Re: Additional Chapter for Tutorial - arch-dev.sgml
Next
From: Tom Lane
Date:
Subject: Re: ALTER TABLE ADD COLUMN fast default