Re: [PATCH] Support for foreign keys with arrays - Mailing list pgsql-hackers
From | Noah Misch |
---|---|
Subject | Re: [PATCH] Support for foreign keys with arrays |
Date | |
Msg-id | 20120121204223.GA2388@tornado.leadboat.com |
In response to | Re: [PATCH] Support for foreign keys with arrays (Marco Nenciarini <marco.nenciarini@2ndQuadrant.it>) |
Responses |
Re: [PATCH] Support for foreign keys with arrays
Re: [PATCH] Support for foreign keys with arrays |
List | pgsql-hackers |
On Sat, Jan 14, 2012 at 08:18:48PM +0100, Marco Nenciarini wrote:
> This is our latest version of the patch. Gabriele, Gianni and I have
> discussed a lot and decided to send an initial patch which uses EACH
> REFERENCES instead of ARRAY REFERENCES. The reason behind this is that
> ARRAY REFERENCES generates an ambiguous grammar, and we all agreed that
> EACH REFERENCES makes sense (and the same time does not introduce any
> new keyword). This is however open for discussion.

I greatly like that name; it would still make sense for other aggregate types,
should we ever expand its use.  Please complete the name change: the
documentation, catalog entries, etc should all call them something like "each
foreign key constraints" (I don't particularly like that exact wording).

You currently forbid multi-column EACH FKs.  I agree that we should allow only
one array column per FK; with more, the set of required PK rows would be
something like the Cartesian product of the elements of array columns.
However, there are no definitional problems, at least for NO ACTION, around a
FK constraint having one array column and N scalar columns.  Whether or not
you implement that now, let's choose a table_constraint syntax leaving that
opportunity open.  How about:

    FOREIGN KEY(col_a, EACH col_b, col_c) REFERENCES pktable (a, b, c)

You've identified that we cannot generally implement the ON DELETE ARRAY
CASCADE action on multidimensional arrays.  This patch chooses to downgrade to
ON DELETE ARRAY SET NULL in that case.  My initial reaction is that it would
be better to forbid multidimensional arrays in the column when the delete
action is ON DELETE ARRAY CASCADE.  That's not satisfying, either, because it
makes the definition of conforming data depend on the ON DELETE action.  Do we
have other options?

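
To make the multidimensional problem concrete, here is a minimal standalone C
sketch; it is not code from the patch.  It assumes a rectangular array whose
trailing dimensions must stay fixed, and the helper names (dims_product,
removal_can_keep_shape) are hypothetical.  It checks only a necessary
condition: the number of removed elements must be a whole multiple of the
slice size.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Product of dims[0..ndim); the empty product is 1. */
static size_t
dims_product(const int *dims, int ndim)
{
    size_t      n = 1;

    for (int i = 0; i < ndim; i++)
        n *= (size_t) dims[i];
    return n;
}

/*
 * Necessary (not sufficient) condition for dropping nremoved elements while
 * keeping a rectangular array with unchanged trailing dimensions: the count
 * must be a whole number of first-dimension slices.  Hypothetical helper,
 * for illustration only.
 */
static bool
removal_can_keep_shape(const int *dims, int ndim, size_t nremoved)
{
    size_t      slice;

    assert(dims != NULL && ndim >= 1);
    assert(nremoved <= dims_product(dims, ndim));

    /* For a 1-D array the slice size is 1, so any removal is fine. */
    slice = dims_product(dims + 1, ndim - 1);
    return slice != 0 && nremoved % slice == 0;
}
```

Under these assumptions a one-dimensional array can always lose a single
element, while a 2x2 array cannot lose exactly one element and remain
rectangular, which is the case where the patch falls back to setting the
element to NULL.
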
> --------------- --------- ---------
>                |   ON    |   ON    |
> Action         | DELETE  | UPDATE  |
> --------------- --------- ---------
> CASCADE        |   Row   |  Error  |
> SET NULL       |   Row   |   Row   |
> SET DEFAULT    |   Row   |   Row   |
> ARRAY CASCADE  | Element | Element |
> ARRAY SET NULL | Element | Element |
> NO ACTION      |    -    |    -    |
> RESTRICT       |    -    |    -    |
> --------------- --------- ---------

To complete the ARRAY -> EACH transition, I would suggest names like CASCADE
EACH/SET EACH NULL.

I like the extensive test cases you have included.  There's one more thing
they should do: leave tables having EACH REFERENCES relationships in the
regression database.  This way, pg_dump tests of the regression database will
exercise pg_dump with respect to this feature.

The patch emits several warnings:

heap.c: In function `StoreRelCheck':
heap.c:1947: warning: passing argument 17 of `CreateConstraintEntry' makes integer from pointer without a cast
index.c: In function `index_constraint_create':
index.c:1160: warning: passing argument 17 of `CreateConstraintEntry' makes integer from pointer without a cast
In file included from gram.y:13051:
scan.c: In function `yy_try_NUL_trans':
scan.c:16243: warning: unused variable `yyg'
trigger.c: In function `CreateTrigger':
trigger.c:454: warning: passing argument 17 of `CreateConstraintEntry' makes integer from pointer without a cast
typecmds.c: In function `domainAddConstraint':
typecmds.c:2960: warning: passing argument 17 of `CreateConstraintEntry' makes integer from pointer without a cast
arrayfuncs.c: In function `array_remove':
arrayfuncs.c:5197: warning: unused variable `dimresult'
ri_triggers.c: In function `RI_FKey_check':
ri_triggers.c:484: warning: too many arguments for format

This test case, copied from my previous review except for updating the syntax,
still fails:

BEGIN;
CREATE TABLE parent (c int PRIMARY KEY);
CREATE TABLE child  (c int[]);
INSERT INTO parent VALUES (1);
INSERT INTO child VALUES ('{3,1,2}');
ALTER TABLE child ADD FOREIGN KEY (c) EACH REFERENCES parent; -- should error
INSERT INTO child VALUES ('{3,1,2}'); -- does error, as expected
ROLLBACK;

Most of my code comments concern minor matters:

> *** a/doc/src/sgml/ddl.sgml
> --- b/doc/src/sgml/ddl.sgml
> *************** CREATE TABLE order_items (
> *** 852,857 ****
> --- 882,931 ----
>      </para>
>
>      <para>
> +     When working with foreign key arrays, you have two more
> +     options that can be used with your
> +     <literal>EACH REFERENCES</literal> definition:
> +     <literal>ARRAY CASCADE</literal> and
> +     <literal>ARRAY SET NULL</literal>. Depending on
> +     the triggering action (<command>DELETE</command> or
> +     <command>UPDATE</command>) on the referenced table,
> +     every element in the referencing array will be either
> +     deleted/updated or set to NULL.
> +     For more detailed information on foreign key arrays
> +     options and special cases, please refer to the
> +     documentation for <xref linkend="sql-createtable">.
> +    </para>
> +
> +    <para>
> +     For instance, in the example below, a <command>DELETE</command>
> +     from the <literal>countries</literal> table will remove
> +     the referencing elements in the <literal>citizenship_ids</literal>
> +     array.
> +
> + <programlisting>
> + CREATE TABLE countries (
> +     country_id integer PRIMARY KEY,
> +     name text,
> +     ...
> + );
> +
> + CREATE TABLE people (
> +     person_id integer PRIMARY KEY,
> +     first_name text,
> +     last_name text,
> +     ...
> +     citizenship_ids integer[] <emphasis>EACH REFERENCES countries
> +         ON DELETE ARRAY CASCADE ON UPDATE ARRAY CASCADE</emphasis>
> + );
> + </programlisting>
> +
> +     Consequently, an <command>UPDATE</command> of
> +     the <literal>country_id</literal> column will be propagated
> +     to any element of the <literal>citizenship_ids</literal>
> +     field in the <literal>people</literal> table.
> +    </para>
> +
> +    <para>

I would leave off this second example.

> *************** SELECT NULLIF(value, '(none)') ...
> *** 10452,10457 ****
> --- 10480,10490 ----
>      </note>
>
>      <para>
> +     When using <function>array_remove</function> with multi-dimensional
> +     arrays, elements will be set to NULL as fallback measure.
> +    </para>

If we do keep this behavior, I would give this function a less-natural name
and not document it.

> *** a/doc/src/sgml/ref/create_table.sgml
> --- b/doc/src/sgml/ref/create_table.sgml
> *************** CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY
> *** 51,57 ****
>     DEFAULT <replaceable>default_expr</replaceable> |
>     UNIQUE <replaceable class="PARAMETER">index_parameters</replaceable> |
>     PRIMARY KEY <replaceable class="PARAMETER">index_parameters</replaceable> |
> !   REFERENCES <replaceable class="PARAMETER">reftable</replaceable> [ ( <replaceable class="PARAMETER">refcolumn</replaceable> ) ] [ MATCH FULL | MATCH PARTIAL | MATCH SIMPLE ]
>       [ ON DELETE <replaceable class="parameter">action</replaceable> ] [ ON UPDATE <replaceable class="parameter">action</replaceable> ] }
>   [ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]
>
> --- 51,57 ----
>     DEFAULT <replaceable>default_expr</replaceable> |
>     UNIQUE <replaceable class="PARAMETER">index_parameters</replaceable> |
>     PRIMARY KEY <replaceable class="PARAMETER">index_parameters</replaceable> |
> !   {EACH} REFERENCES <replaceable class="PARAMETER">reftable</replaceable> [ ( <replaceable class="PARAMETER">refcolumn</replaceable> ) ] [ MATCH FULL | MATCH PARTIAL | MATCH SIMPLE ]
>       [ ON DELETE <replaceable class="parameter">action</replaceable> ] [ ON UPDATE <replaceable class="parameter">action</replaceable> ] }
>   [ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]

Use square brackets, not curly brackets, around optional terms.

>
> *************** CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY
> *** 62,68 ****
>     UNIQUE ( <replaceable class="PARAMETER">column_name</replaceable> [, ... ] ) <replaceable class="PARAMETER">index_parameters</replaceable> |
>     PRIMARY KEY ( <replaceable class="PARAMETER">column_name</replaceable> [, ... ] ) <replaceable class="PARAMETER">index_parameters</replaceable> |
>     EXCLUDE [ USING <replaceable class="parameter">index_method</replaceable> ] ( <replaceable class="parameter">exclude_element</replaceable> WITH <replaceable class="parameter">operator</replaceable> [, ... ] ) <replaceable class="parameter">index_parameters</replaceable> [ WHERE ( <replaceable class="parameter">predicate</replaceable> ) ] |
> !   FOREIGN KEY ( <replaceable class="PARAMETER">column_name</replaceable> [, ... ] ) REFERENCES <replaceable class="PARAMETER">reftable</replaceable> [ ( <replaceable class="PARAMETER">refcolumn</replaceable> [, ... ] ) ]
>       [ MATCH FULL | MATCH PARTIAL | MATCH SIMPLE ] [ ON DELETE <replaceable class="parameter">action</replaceable> ] [ ON UPDATE <replaceable class="parameter">action</replaceable> ] }
>   [ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]
>
> --- 62,68 ----
>     UNIQUE ( <replaceable class="PARAMETER">column_name</replaceable> [, ... ] ) <replaceable class="PARAMETER">index_parameters</replaceable> |
>     PRIMARY KEY ( <replaceable class="PARAMETER">column_name</replaceable> [, ... ] ) <replaceable class="PARAMETER">index_parameters</replaceable> |
>     EXCLUDE [ USING <replaceable class="parameter">index_method</replaceable> ] ( <replaceable class="parameter">exclude_element</replaceable> WITH <replaceable class="parameter">operator</replaceable> [, ... ] ) <replaceable class="parameter">index_parameters</replaceable> [ WHERE ( <replaceable class="parameter">predicate</replaceable> ) ] |
> !   FOREIGN KEY ( <replaceable class="PARAMETER">column_name</replaceable> [, ... ] ) {EACH} REFERENCES <replaceable class="PARAMETER">reftable</replaceable> [ ( <replaceable class="PARAMETER">refcolumn</replaceable> [, ... ] ) ]

Likewise.

> *** a/src/backend/catalog/information_schema.sql
> --- b/src/backend/catalog/information_schema.sql
> *************** CREATE VIEW referential_constraints AS
> *** 1173,1183 ****
>
>              CAST(
>                CASE con.confdeltype WHEN 'c' THEN 'CASCADE'
>                                     WHEN 'n' THEN 'SET NULL'
>                                     WHEN 'd' THEN 'SET DEFAULT'
>                                     WHEN 'r' THEN 'RESTRICT'
>                                     WHEN 'a' THEN 'NO ACTION' END
> !              AS character_data) AS delete_rule
>
>       FROM (pg_namespace ncon
>             INNER JOIN pg_constraint con ON ncon.oid = con.connamespace
> --- 1175,1189 ----
>
>              CAST(
>                CASE con.confdeltype WHEN 'c' THEN 'CASCADE'
> +                                   WHEN 'C' THEN 'ARRAY CASCADE'
>                                     WHEN 'n' THEN 'SET NULL'
> +                                   WHEN 'N' THEN 'ARRAY SET NULL'
>                                     WHEN 'd' THEN 'SET DEFAULT'
>                                     WHEN 'r' THEN 'RESTRICT'
>                                     WHEN 'a' THEN 'NO ACTION' END
> !              AS character_data) AS delete_rule,
> !
> !            CAST(con.confisarray AS boolean) AS is_array

No need for that cast.

> *** a/src/backend/commands/tablecmds.c
> --- b/src/backend/commands/tablecmds.c
> *************** ATAddForeignKeyConstraint(AlteredTableIn
> *** 5688,5693 ****
> --- 5689,5728 ----
>                 (errcode(ERRCODE_INVALID_FOREIGN_KEY),
>                  errmsg("number of referencing and referenced columns for foreign key disagree")));
>
> +     /* Enforce foreign key array restrictions */
> +     if (fkconstraint->fk_array)
> +     {
> +         /*
> +          * Foreign key array must not be part of a multi-column foreign key
> +          */
> +         if (numpks > 1)
> +             ereport(ERROR,
> +                     (errcode(ERRCODE_INVALID_FOREIGN_KEY),
> +                      errmsg("foreign key arrays must not be part of a multi-column foreign key")));
> +
> +         /*
> +          * ON UPDATE CASCADE action is not supported on FKA
> +          */
> +         if (fkconstraint->fk_upd_action == FKCONSTR_ACTION_CASCADE)
> +             ereport(ERROR,
> +                     (errcode(ERRCODE_INVALID_FOREIGN_KEY),
> +                      errmsg("ON UPDATE CASCADE action is not supported on foreign key arrays")));

Add a HINT about using ARRAY CASCADE.

> *************** ATAddForeignKeyConstraint(AlteredTableIn
> *** 5736,5775 ****
>                      eqstrategy, opcintype, opcintype, opfamily);
>
>           /*
> !          * Are there equality operators that take exactly the FK type? Assume
> !          * we should look through any domain here.
>            */
> !         fktyped = getBaseType(fktype);
>
> !         pfeqop = get_opfamily_member(opfamily, opcintype, fktyped,
> !                                      eqstrategy);
> !         if (OidIsValid(pfeqop))
> !             ffeqop = get_opfamily_member(opfamily, fktyped, fktyped,
>                                            eqstrategy);
> -         else
> -             ffeqop = InvalidOid; /* keep compiler quiet */
>
> !         if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop)))
>           {
>               /*
> !              * Otherwise, look for an implicit cast from the FK type to the
> !              * opcintype, and if found, use the primary equality operator.
> !              * This is a bit tricky because opcintype might be a polymorphic
> !              * type such as ANYARRAY or ANYENUM; so what we have to test is
> !              * whether the two actual column types can be concurrently cast to
> !              * that type. (Otherwise, we'd fail to reject combinations such
> !              * as int[] and point[].)
>                */
> !             Oid input_typeids[2];
> !             Oid target_typeids[2];
>
> !             input_typeids[0] = pktype;
> !             input_typeids[1] = fktype;
> !             target_typeids[0] = opcintype;
> !             target_typeids[1] = opcintype;
> !             if (can_coerce_type(2, input_typeids, target_typeids,
> !                                 COERCION_IMPLICIT))
> !                 pfeqop = ffeqop = ppeqop;
>           }
>
>           if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop)))
> --- 5771,5861 ----
>                      eqstrategy, opcintype, opcintype, opfamily);
>
>           /*
> !          * Discover the equality operators
>            */
> !         if (fkconstraint->fk_array)
> !         {
> !             /*
> !              * Are there equality operators that take exactly the FK element type?
> !              * Assume we should look through any domain here.
> !              */
> !             Oid fk_element_type=get_base_element_type(fktype);
> !             if (!OidIsValid(fk_element_type))
> !                 ereport(ERROR,
> !                         (errcode(ERRCODE_DATATYPE_MISMATCH),
> !                          errmsg("foreign key constraint \"%s\" "
> !                                 "cannot be implemented",
> !                                 fkconstraint->conname),
> !                          errdetail("Key columns \"%s\" is not an array type: %s",
> !                                    strVal(list_nth(fkconstraint->fk_attrs, i)),
> !                                    format_type_be(fktype))));

Use a detail message like "Type of key column "%s" is not an array type: %s".

>
> !             ffeqop = ARRAY_EQ_OP;
> !
> !             pfeqop = get_opfamily_member(opfamily, opcintype, fk_element_type,
>                                            eqstrategy);
>
> !             if (!(OidIsValid(pfeqop)))
> !             {
> !                 /*
> !                  * Otherwise, look for an implicit cast from the FK type to the
> !                  * opcintype, and if found, use the primary equality operator.
> !                  * This is a bit tricky because opcintype might be a polymorphic
> !                  * type such as ANYARRAY or ANYENUM; so what we have to test is
> !                  * whether the two actual column types can be concurrently cast to
> !                  * that type. (Otherwise, we'd fail to reject combinations such
> !                  * as int[] and point[].)
> !                  */
> !                 Oid input_typeids[2];
> !                 Oid target_typeids[2];
> !
> !                 input_typeids[0] = pktype;
> !                 input_typeids[1] = fk_element_type;
> !                 target_typeids[0] = opcintype;
> !                 target_typeids[1] = opcintype;
> !                 if (can_coerce_type(2, input_typeids, target_typeids,
> !                                     COERCION_IMPLICIT))
> !                     pfeqop = ppeqop;
> !             }
> !         }
> !         else
>           {
>               /*
> !              * Are there equality operators that take exactly the FK type? Assume
> !              * we should look through any domain here.
>                */
> !             fktyped = getBaseType(fktype);
>
> !             pfeqop = get_opfamily_member(opfamily, opcintype, fktyped,
> !                                          eqstrategy);
> !             if (OidIsValid(pfeqop))
> !                 ffeqop = get_opfamily_member(opfamily, fktyped, fktyped,
> !                                              eqstrategy);
> !             else
> !                 ffeqop = InvalidOid; /* keep compiler quiet */
> !
> !             if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop)))
> !             {
> !                 /*
> !                  * Otherwise, look for an implicit cast from the FK type to the
> !                  * opcintype, and if found, use the primary equality operator.
> !                  * This is a bit tricky because opcintype might be a polymorphic
> !                  * type such as ANYARRAY or ANYENUM; so what we have to test is
> !                  * whether the two actual column types can be concurrently cast to
> !                  * that type. (Otherwise, we'd fail to reject combinations such
> !                  * as int[] and point[].)
> !                  */
> !                 Oid input_typeids[2];
> !                 Oid target_typeids[2];
> !
> !                 input_typeids[0] = pktype;
> !                 input_typeids[1] = fktype;
> !                 target_typeids[0] = opcintype;
> !                 target_typeids[1] = opcintype;
> !                 if (can_coerce_type(2, input_typeids, target_typeids,
> !                                     COERCION_IMPLICIT))
> !                     pfeqop = ffeqop = ppeqop;
> !             }
>           }

Please reduce the level of textual code duplication here.

> *** a/src/backend/commands/trigger.c
> --- b/src/backend/commands/trigger.c
> *************** ConvertTriggerToFK(CreateTrigStmt *stmt,
> *** 861,876 ****
> --- 862,881 ----
>       switch (funcoid)
>       {
>           case F_RI_FKEY_CASCADE_UPD:
> +         case F_RI_FKEY_ARRCASCADE_UPD:
>           case F_RI_FKEY_RESTRICT_UPD:
>           case F_RI_FKEY_SETNULL_UPD:
> +         case F_RI_FKEY_ARRSETNULL_UPD:
>           case F_RI_FKEY_SETDEFAULT_UPD:
>           case F_RI_FKEY_NOACTION_UPD:
>               funcnum = 0;
>               break;
>
>           case F_RI_FKEY_CASCADE_DEL:
> +         case F_RI_FKEY_ARRCASCADE_DEL:
>           case F_RI_FKEY_RESTRICT_DEL:
>           case F_RI_FKEY_SETNULL_DEL:
> +         case F_RI_FKEY_ARRSETNULL_DEL:
>           case F_RI_FKEY_SETDEFAULT_DEL:
>           case F_RI_FKEY_NOACTION_DEL:
>               funcnum = 1;

We don't need to support these clauses in ConvertTriggerToFK(); no ancient
dumps will bear them.  A comment would be enough.  On the other hand, your
changes here are simple enough.  Maybe it's better to add the support as you
have than to explain why it's absent.

> *** a/src/backend/utils/adt/arrayfuncs.c
> --- b/src/backend/utils/adt/arrayfuncs.c
> *************** array_unnest(PG_FUNCTION_ARGS)
> *** 5174,5176 ****
> --- 5174,5599 ----
>           SRF_RETURN_DONE(funcctx);
>       }
>   }
> +
> + /*
> +  * Remove any occurrence of an element from an array
> +  *
> +  * If used on a multi-dimensional array the matching elements will be replaced with NULLs as fallback.
> +  *
> +  */
> + Datum
> + array_remove(PG_FUNCTION_ARGS)
> + {
> +     ArrayType *v;
> +     Datum old_value = PG_GETARG_DATUM(1);
> +     Oid element_type;
> +     ArrayType *result;
> +     Datum *values;
> +     bool *nulls;
> +     Datum elt;
> +     int ndim;
> +     int *dim;
> +     int nitems;
> +     int *dimresult;
> +     int nresult;
> +     int i;
> +     int32 nbytes = 0;
> +     int32 dataoffset;
> +     bool hasnulls;
> +     int typlen;
> +     bool typbyval;
> +     char typalign;
> +     char *s;
> +     bits8 *bitmap;
> +     int bitmask;
> +     Oid collation = PG_GET_COLLATION();
> +     TypeCacheEntry *typentry;
> +     FunctionCallInfoData locfcinfo;
> +
> +     /*
> +      * If the first argument is null
> +      * return NULL
> +      */
> +     if (PG_ARGISNULL(0))
> +         PG_RETURN_NULL();
> +
> +     v = PG_GETARG_ARRAYTYPE_P(0);
> +
> +     /*
> +      * If second argument is NULL, no match is possible
> +      * so return the first argument unchanged
> +      */
> +     if (PG_ARGISNULL(1))
> +         PG_RETURN_ARRAYTYPE_P(v);
> +
> +     ndim = ARR_NDIM(v);
> +
> +     /*
> +      * If used on a multi-dimensional array the matching elements
> +      * will be replaced with NULLs as fallback.
> +      */
> +     if (ndim > 1) {
> +         fcinfo->nargs = 3;
> +         fcinfo->argnull[2]=true;
> +         return array_replace(fcinfo);
> +     }
> +
> +     dim = ARR_DIMS(v);
> +     element_type = ARR_ELEMTYPE(v);
> +     nitems = ArrayGetNItems(ndim, dim);
> +
> +     /* Check for empty array */
> +     if (nitems <= 0)
> +     {
> +         /* Return empty array */
> +         PG_RETURN_ARRAYTYPE_P(construct_empty_array(element_type));
> +     }
> +
> +     /*
> +      * We arrange to look up the equality function only once per series of
> +      * calls, assuming the element type doesn't change underneath us. The
> +      * typcache is used so that we have no memory leakage when being used
> +      * as an index support function.
> +      */

The second sentence of the comment does not apply in this function.

> +     typentry = (TypeCacheEntry *) fcinfo->flinfo->fn_extra;
> +     if (typentry == NULL ||
> +         typentry->type_id != element_type)
> +     {
> +         typentry = lookup_type_cache(element_type,
> +                                      TYPECACHE_EQ_OPR_FINFO);
> +         if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
> +             ereport(ERROR,
> +                     (errcode(ERRCODE_UNDEFINED_FUNCTION),
> +                      errmsg("could not identify an equality operator for type %s",
> +                             format_type_be(element_type))));
> +         fcinfo->flinfo->fn_extra = (void *) typentry;
> +     }
> +     typlen = typentry->typlen;
> +     typbyval = typentry->typbyval;
> +     typalign = typentry->typalign;
> +
> +     /*
> +      * apply the operator to each pair of array elements.
> +      */
> +     InitFunctionCallInfoData(locfcinfo, &typentry->eq_opr_finfo, 2,
> +                              collation, NULL, NULL);
> +
> +     /* Allocate temporary arrays for new values */
> +     values = (Datum *) palloc(nitems * sizeof(Datum));
> +     nulls = (bool *) palloc(nitems * sizeof(bool));
> +
> +     /* Loop over source data */
> +     s = ARR_DATA_PTR(v);
> +     bitmap = ARR_NULLBITMAP(v);
> +     bitmask = 1;
> +     hasnulls = false;
> +     nresult=0;
> +
> +     for (i = 0; i < nitems; i++)
> +     {
> +         bool isNull;
> +         bool oprresult;
> +         bool skip;
> +
> +         /* Get source element, checking for NULL */
> +         if (bitmap && (*bitmap & bitmask) == 0)
> +         {
> +             isNull = true;
> +             skip = false;
> +         }
> +         else
> +         {
> +             elt = fetch_att(s, typbyval, typlen);
> +             s = att_addlength_datum(s, typlen, elt);
> +             s = (char *) att_align_nominal(s, typalign);
> +             isNull = false;
> +
> +             /*
> +              * Apply the operator to the element pair
> +              */
> +             locfcinfo.arg[0] = elt;
> +             locfcinfo.arg[1] = old_value;
> +             locfcinfo.argnull[0] = false;
> +             locfcinfo.argnull[1] = false;
> +             locfcinfo.isnull = false;
> +             oprresult = DatumGetBool(FunctionCallInvoke(&locfcinfo));
> +             if (!oprresult)
> +             {
> +                 values[nresult] = elt;
> +                 skip = false;
> +             }
> +             else {
> +                 skip = true;
> +             }

Remove braces.

> +         }
> +
> +         if (!skip)
> +         {
> +             nulls[nresult] = isNull;
> +             if (isNull)
> +                 hasnulls = true;
> +             else
> +             {
> +                 /* Ensure data is not toasted */
> +                 if (typlen == -1)
> +                     values[nresult] = PointerGetDatum(PG_DETOAST_DATUM(values[nresult]));

Shouldn't be needed; we just pulled the values from an array.

> +                 /* Update total result size */
> +                 nbytes = att_addlength_datum(nbytes, typlen, values[nresult]);
> +                 nbytes = att_align_nominal(nbytes, typalign);
> +                 /* check for overflow of total request */
> +                 if (!AllocSizeIsValid(nbytes))
> +                     ereport(ERROR,
> +                             (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
> +                              errmsg("array size exceeds the maximum allowed (%d)",
> +                                     (int) MaxAllocSize)));

This test should never fail.  Either convert it to an Assert or just add a
comment to that effect.

> +             }
> +             nresult++;
> +         }
> +
> +         /* advance bitmap pointer if any */
> +         if (bitmap)
> +         {
> +             bitmask <<= 1;
> +             if (bitmask == 0x100)
> +             {
> +                 bitmap++;
> +                 bitmask = 1;
> +             }
> +         }
> +     }
> +
> +     /* Allocate and initialize the result array */

Is it worth tracking whether we didn't find anything to remove and just
returning the old array in that case?

> +     if (hasnulls)
> +     {
> +         dataoffset = ARR_OVERHEAD_WITHNULLS(ndim, nresult);
> +         nbytes += dataoffset;
> +     }
> +     else
> +     {
> +         dataoffset = 0; /* marker for no null bitmap */
> +         nbytes += ARR_OVERHEAD_NONULLS(ndim);
> +     }
> +     result = (ArrayType *) palloc0(nbytes);
> +     SET_VARSIZE(result, nbytes);
> +     result->ndim = ndim;
> +     result->dataoffset = dataoffset;
> +     result->elemtype = element_type;
> +     memcpy(ARR_DIMS(result), ARR_DIMS(v), 2 * ndim * sizeof(int));
> +
> +     /* Adjust the final length */
> +     ARR_DIMS(result)[0] = nresult;
> +
> +     /*
> +      * Note: do not risk trying to pfree the results of the called function
> +      */

The comment does not apply here; a comparison function has no result to free.

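
The "nothing to remove" shortcut asked about above can be sketched on a plain
int array.  This is a standalone illustration, not backend code: the function
name remove_matches and the changed flag are hypothetical, and a real
array_remove() would still have to handle NULL elements and varlena data.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Copy src[0..n) into dst, skipping every element equal to target.
 * Returns the number of elements kept.  When nothing matched, *changed is
 * set to false, so the caller can hand back the original array instead of
 * building a new one.  Hypothetical helper, for illustration only.
 */
static size_t
remove_matches(const int *src, size_t n, int target, int *dst, bool *changed)
{
    size_t      nresult = 0;

    assert(src != NULL && dst != NULL && changed != NULL);

    for (size_t i = 0; i < n; i++)
    {
        if (src[i] != target)
            dst[nresult++] = src[i];
    }

    *changed = (nresult != n);
    return nresult;
}
```

A caller would test the flag after the loop and return the input unchanged
when it is false, skipping the allocation and copy of the result.
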
> +     CopyArrayEls(result,
> +                  values, nulls, nresult,
> +                  typlen, typbyval, typalign,
> +                  false);
> +
> +     pfree(values);
> +     pfree(nulls);
> +
> +     PG_RETURN_ARRAYTYPE_P(result);
> + }
> +
> + /*
> +  * Replace any occurrence of an element in an array
> +  */
> + Datum
> + array_replace(PG_FUNCTION_ARGS)
> + {
> +     ArrayType *v;
> +     Datum old_value = PG_GETARG_DATUM(1);
> +     Datum new_value = PG_GETARG_DATUM(2);
> +     bool new_value_isnull = PG_ARGISNULL(2);
> +     Oid element_type;
> +     ArrayType *result;
> +     Datum *values;
> +     bool *nulls;
> +     Datum elt;
> +     int *dim;
> +     int ndim;
> +     int nitems;
> +     int i;
> +     int32 nbytes = 0;
> +     int32 dataoffset;
> +     bool hasnulls;
> +     int typlen;
> +     bool typbyval;
> +     char typalign;
> +     char *s;
> +     bits8 *bitmap;
> +     int bitmask;
> +     Oid collation = PG_GET_COLLATION();
> +     TypeCacheEntry *typentry;
> +     FunctionCallInfoData locfcinfo;
> +
> +     /*
> +      * If the first argument is null
> +      * return NULL
> +      */
> +     if (PG_ARGISNULL(0))
> +         PG_RETURN_NULL();
> +
> +     v = PG_GETARG_ARRAYTYPE_P(0);
> +
> +     /*
> +      * If second argument is NULL, no match is possible
> +      * so return the first argument unchanged
> +      */
> +     if (PG_ARGISNULL(1))
> +         PG_RETURN_ARRAYTYPE_P(v);
> +
> +     ndim = ARR_NDIM(v);
> +     dim = ARR_DIMS(v);
> +     element_type = ARR_ELEMTYPE(v);
> +     nitems = ArrayGetNItems(ndim, dim);
> +
> +     /* Check for empty array */
> +     if (nitems <= 0)
> +     {
> +         /* Return empty array */
> +         PG_RETURN_ARRAYTYPE_P(construct_empty_array(element_type));
> +     }
> +
> +     /*
> +      * We arrange to look up the equality function only once per series of
> +      * calls, assuming the element type doesn't change underneath us. The
> +      * typcache is used so that we have no memory leakage when being used
> +      * as an index support function.
> +      */

The second sentence does not apply here, either.

> +     typentry = (TypeCacheEntry *) fcinfo->flinfo->fn_extra;
> +     if (typentry == NULL ||
> +         typentry->type_id != element_type)
> +     {
> +         typentry = lookup_type_cache(element_type,
> +                                      TYPECACHE_EQ_OPR_FINFO);
> +         if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
> +             ereport(ERROR,
> +                     (errcode(ERRCODE_UNDEFINED_FUNCTION),
> +                      errmsg("could not identify an equality operator for type %s",
> +                             format_type_be(element_type))));
> +         fcinfo->flinfo->fn_extra = (void *) typentry;
> +     }
> +     typlen = typentry->typlen;
> +     typbyval = typentry->typbyval;
> +     typalign = typentry->typalign;
> +
> +     /*
> +      * apply the operator to each pair of array elements.
> +      */
> +     InitFunctionCallInfoData(locfcinfo, &typentry->eq_opr_finfo, 2,
> +                              collation, NULL, NULL);
> +
> +     /* Allocate temporary arrays for new values */
> +     values = (Datum *) palloc(nitems * sizeof(Datum));
> +     nulls = (bool *) palloc(nitems * sizeof(bool));
> +
> +     /* Loop over source data */
> +     s = ARR_DATA_PTR(v);
> +     bitmap = ARR_NULLBITMAP(v);
> +     bitmask = 1;
> +     hasnulls = false;
> +
> +     for (i = 0; i < nitems; i++)
> +     {
> +         bool isNull;
> +         bool oprresult;
> +
> +         /* Get source element, checking for NULL */
> +         if (bitmap && (*bitmap & bitmask) == 0)
> +         {
> +             isNull = true;
> +         }
> +         else
> +         {
> +             elt = fetch_att(s, typbyval, typlen);
> +             s = att_addlength_datum(s, typlen, elt);
> +             s = (char *) att_align_nominal(s, typalign);
> +             isNull = false;
> +
> +             /*
> +              * Apply the operator to the element pair
> +              */
> +             locfcinfo.arg[0] = elt;
> +             locfcinfo.arg[1] = old_value;
> +             locfcinfo.argnull[0] = false;
> +             locfcinfo.argnull[1] = false;
> +             locfcinfo.isnull = false;
> +             oprresult = DatumGetBool(FunctionCallInvoke(&locfcinfo));
> +             if (!oprresult)
> +             {
> +                 values[i] = elt;
> +             }
> +             else if (!new_value_isnull)
> +             {
> +                 values[i] = new_value;
> +             }
> +             else {
> +                 isNull = true;
> +             }

Remove the braces around single-statement blocks.

> +         }
> +
> +         nulls[i] = isNull;
> +         if (isNull)
> +             hasnulls = true;
> +         else
> +         {
> +             /* Ensure data is not toasted */
> +             if (typlen == -1)
> +                 values[i] = PointerGetDatum(PG_DETOAST_DATUM(values[i]));

The only value that might need a detoast is new_value.  Do so once at the top
of the function.

> +             /* Update total result size */
> +             nbytes = att_addlength_datum(nbytes, typlen, values[i]);
> +             nbytes = att_align_nominal(nbytes, typalign);
> +             /* check for overflow of total request */
> +             if (!AllocSizeIsValid(nbytes))
> +                 ereport(ERROR,
> +                         (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
> +                          errmsg("array size exceeds the maximum allowed (%d)",
> +                                 (int) MaxAllocSize)));
> +         }
> +
> +         /* advance bitmap pointer if any */
> +         if (bitmap)
> +         {
> +             bitmask <<= 1;
> +             if (bitmask == 0x100)
> +             {
> +                 bitmap++;
> +                 bitmask = 1;
> +             }
> +         }
> +     }
> +
> +     /* Allocate and initialize the result array */
> +     if (hasnulls)
> +     {
> +         dataoffset = ARR_OVERHEAD_WITHNULLS(ndim, nitems);
> +         nbytes += dataoffset;
> +     }
> +     else
> +     {
> +         dataoffset = 0; /* marker for no null bitmap */
> +         nbytes += ARR_OVERHEAD_NONULLS(ndim);
> +     }
> +     result = (ArrayType *) palloc0(nbytes);
> +     SET_VARSIZE(result, nbytes);
> +     result->ndim = ndim;
> +     result->dataoffset = dataoffset;
> +     result->elemtype = element_type;
> +     memcpy(ARR_DIMS(result), ARR_DIMS(v), 2 * ndim * sizeof(int));
> +
> +     /*
> +      * Note: do not risk trying to pfree the results of the called function
> +      */

The comment does not apply here; a comparison function has no result to free.

> +     CopyArrayEls(result,
> +                  values, nulls, nitems,
> +                  typlen, typbyval, typalign,
> +                  false);
> +
> +     pfree(values);
> +     pfree(nulls);
> +
> +     PG_RETURN_ARRAYTYPE_P(result);
> + }
> diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
> index 03a974a..58a92e1 100644
> *** a/src/backend/utils/adt/ri_triggers.c
> --- b/src/backend/utils/adt/ri_triggers.c
> *************** typedef struct RI_ConstraintInfo
> *** 106,111 ****
> --- 110,116 ----
>       NameData conname; /* name of the FK constraint */
>       Oid pk_relid; /* referenced relation */
>       Oid fk_relid; /* referencing relation */
> +     bool confisarray;

Comment that member.

> + Datum
> + RI_FKey_arrcascade_upd(PG_FUNCTION_ARGS)
> + {

These belong earlier in the file, adjacent to the other PK trigger functions.

> +         /* ----------
> +          * The query string built is
> +          * UPDATE ONLY <fktable> SET fkatt1 = array_replace(fkatt1, $n, $1) [, ...]
> +          * WHERE $n = fkatt1 [AND ...]
> +          * The type id's for the $ parameters are those of the
> +          * corresponding PK attributes.
> +          * ----------
> +          */

RI_FKey_cascade_upd() has this to say at the corresponding juncture:

    /* ----------
     * The query string built is
     * UPDATE ONLY <fktable> SET fkatt1 = $1[, ...]
     * WHERE $n = fkatt1 [AND ...]
     * The type id's for the $ parameters are those of the
     * corresponding PK attributes. Note that we are assuming
     * there is an assignment cast from the PK to the FK type;
     * else the parser will fail.
     * ----------
     */

Since we're matching a polymorphic function here, the types of the second and
third arguments must precisely match the element type of the first argument.
Even when an implicit cast exists, we must insert cast syntax.

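
As a rough standalone sketch of what "insert cast syntax" could mean for the
generated query text, the snippet below formats one SET item with both
parameters cast to the array's element type.  It is not the patch's code: the
function name format_array_replace is hypothetical, the element type is
passed in as a plain string, and identifier quoting is left out for brevity.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Format "att = array_replace(att, $old::type, $new::type)" into buf.
 * Returns the number of characters written, or -1 if buf is too small.
 * Hypothetical helper, for illustration only; real code would also quote
 * the attribute name and derive the type name from the catalog.
 */
static int
format_array_replace(char *buf, size_t buflen, const char *attname,
                     const char *elem_type, int old_param, int new_param)
{
    int         n;

    assert(buf != NULL && attname != NULL && elem_type != NULL);

    n = snprintf(buf, buflen,
                 "%s = array_replace(%s, $%d::%s, $%d::%s)",
                 attname, attname,
                 old_param, elem_type,
                 new_param, elem_type);
    if (n < 0 || (size_t) n >= buflen)
        return -1;
    return n;
}
```

With attname "c", element type "integer", and parameters 2 and 1, this yields
a SET item whose arguments all agree on the element type even when the PK
column is smallint.
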
Test case:

BEGIN;
CREATE TABLE parent (c smallint PRIMARY KEY);
CREATE TABLE child  (c int[] EACH REFERENCES parent
    ON UPDATE ARRAY CASCADE
    ON DELETE ARRAY CASCADE);
INSERT INTO parent VALUES (1), (2);
INSERT INTO child VALUES ('{1,2}');
UPDATE parent SET c = 3 WHERE c = 2;
DELETE FROM parent WHERE c = 1;
ROLLBACK;

> +         initStringInfo(&querybuf);
> +         initStringInfo(&qualbuf);
> +         quoteRelationName(fkrelname, fk_rel);
> +         appendStringInfo(&querybuf, "UPDATE ONLY %s SET", fkrelname);
> +         querysep = "";
> +         qualsep = "WHERE";
> +         for (i = 0, j = riinfo.nkeys; i < riinfo.nkeys; i++, j++)
> +         {
> +             Oid pk_type = RIAttType(pk_rel, riinfo.pk_attnums[i]);
> +             Oid fk_type = RIAttType(fk_rel, riinfo.fk_attnums[i]);
> +
> +             quoteOneName(attname,
> +                          RIAttName(fk_rel, riinfo.fk_attnums[i]));
> +             appendStringInfo(&querybuf,
> +                              "%s %s = array_replace(%s, $%d, $%d)",
> +                              querysep, attname, attname, j + 1, i + 1);

Break this line to keep it within 78 columns.  Otherwise, pgindent will
reverse-indent it.  There are some other examples in your patch.  Please run
your patch through this command, inspect the output, and fix any that don't
belong:

    <patchfile expand -t4 | awk 'length > 78'

> *** a/src/include/catalog/pg_constraint.h
> --- b/src/include/catalog/pg_constraint.h
> *************** CATALOG(pg_constraint,2606)
> *** 78,83 ****
> --- 78,84 ----
>        * constraint. Otherwise confrelid is 0 and the char fields are spaces.
>        */
>       Oid confrelid; /* relation referenced by foreign key */
> +     bool confisarray; /* true if an EACH REFERENCE foreign key */
>       char confupdtype; /* foreign key's ON UPDATE action */
>       char confdeltype; /* foreign key's ON DELETE action */
>       char confmatchtype; /* foreign key's match type */

Putting the field at this location adds three bytes of padding.  Please locate
it elsewhere to avoid that.

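
The padding effect can be reproduced with a small standalone C sketch.  The
two structs below are simplified stand-ins, not the real pg_constraint
definition; in particular, the fields following confmatchtype (a bool and
then a 4-byte integer) are an assumption made for illustration.  On common
ABIs, where int32_t is 4-byte aligned and bool is one byte, the extra bool
pushes the integer onto the next aligned slot.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-in for the layout before the patch. */
struct fk_fields_before
{
    uint32_t    confrelid;
    char        confupdtype;
    char        confdeltype;
    char        confmatchtype;
    bool        conislocal;     /* assumed neighbor, for illustration */
    int32_t     coninhcount;    /* assumed neighbor, for illustration */
};

/* Same fields with the new bool placed right after confrelid. */
struct fk_fields_patched
{
    uint32_t    confrelid;
    bool        confisarray;
    char        confupdtype;
    char        confdeltype;
    char        confmatchtype;
    bool        conislocal;
    int32_t     coninhcount;
};

/* Bytes of compiler-inserted padding in front of coninhcount. */
static size_t
padding_before(void)
{
    return offsetof(struct fk_fields_before, coninhcount) -
        (offsetof(struct fk_fields_before, conislocal) + sizeof(bool));
}

static size_t
padding_patched(void)
{
    size_t      end_of_bytes;

    end_of_bytes = offsetof(struct fk_fields_patched, conislocal) + sizeof(bool);
    assert(offsetof(struct fk_fields_patched, coninhcount) >= end_of_bytes);
    return offsetof(struct fk_fields_patched, coninhcount) - end_of_bytes;
}
```

Moving the new bool next to other one-byte fields that already leave a hole,
or to the end of the fixed-width part, would avoid the wasted bytes; which
spot is best depends on the real struct, which this sketch does not model.
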
> *** a/src/include/catalog/pg_proc.h
> --- b/src/include/catalog/pg_proc.h
> *************** DESCR("referential integrity ON DELETE N
> *** 1976,1981 ****
> --- 1981,1995 ----
>   DATA(insert OID = 1655 ( RI_FKey_noaction_upd PGNSP PGUID 12 1 0 0 0 f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ RI_FKey_noaction_upd _null_ _null_ _null_ ));
>   DESCR("referential integrity ON UPDATE NO ACTION");
>
> + DATA(insert OID = 3144 ( RI_FKey_arrcascade_del PGNSP PGUID 12 1 0 0 0 f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ RI_FKey_arrcascade_del _null_ _null_ _null_ ));
> + DESCR("referential integrity ON DELETE CASCADE");
> + DATA(insert OID = 3145 ( RI_FKey_arrcascade_upd PGNSP PGUID 12 1 0 0 0 f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ RI_FKey_arrcascade_upd _null_ _null_ _null_ ));
> + DESCR("referential integrity ON UPDATE CASCADE");
> + DATA(insert OID = 3146 ( RI_FKey_arrsetnull_del PGNSP PGUID 12 1 0 0 0 f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ RI_FKey_arrsetnull_del _null_ _null_ _null_ ));
> + DESCR("referential integrity ON DELETE SET NULL");
> + DATA(insert OID = 3147 ( RI_FKey_arrsetnull_upd PGNSP PGUID 12 1 0 0 0 f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ RI_FKey_arrsetnull_upd _null_ _null_ _null_ ));
> + DESCR("referential integrity ON UPDATE SET NULL");

Those descriptions need to reflect the actual clauses involved.

Thanks,
nm