*** a/doc/src/sgml/catalogs.sgml --- b/doc/src/sgml/catalogs.sgml *************** *** 1986,1992 **** --- 1986,1994 ---- a = no action, r = restrict, c = cascade, + C = EACH cascade, n = set null, + N = EACH set null, d = set default *************** *** 1999,2005 **** --- 2001,2009 ---- a = no action, r = restrict, c = cascade, + C = EACH cascade, n = set null, + N = EACH set null, d = set default *************** *** 2102,2107 **** --- 2106,2118 ---- If a check constraint, a human-readable representation of the expression + + + confiseach + bool + + If a foreign key constraint, is it an EACH foreign key? + *** a/doc/src/sgml/ddl.sgml --- b/doc/src/sgml/ddl.sgml *************** *** 866,871 **** CREATE TABLE order_items ( --- 866,1006 ---- + + EACH Foreign Key Constraints + + + EACH foreign key + + + + constraint + EACH foreign key + + + + referential integrity + + + + Another option you have with foreign keys is to use a + referencing column which is an array of elements with + the same type (or a compatible one) as the referenced + column in the related table. This feature is called + EACH foreign key and is implemented + in PostgreSQL with EACH foreign key constraints, + as described in the following example: + + + CREATE TABLE drivers ( + driver_id integer PRIMARY KEY, + first_name text, + last_name text, + ... + ); + + CREATE TABLE races ( + race_id integer PRIMARY KEY, + title text, + race_day DATE, + ... + final_positions integer[] EACH REFERENCES drivers + ); + + + The above example uses an array (final_positions) + to store the results of a race: for each of its elements + a referential integrity check is enforced on the + drivers table. + Note that EACH REFERENCES is an extension + of PostgreSQL and it is not included in the SQL standard. + + + + As far as referential actions are concerned, + when working with EACH foreign keys, you have two more + options that can be used with your + EACH REFERENCES definition: + EACH CASCADE and + EACH SET NULL. 
Depending on + the triggering action (DELETE or + UPDATE) on the referenced table, + every element in the referencing array will be either + deleted/updated or set to the null value. + + + + For instance, you can change the definition of the + races table so that a DELETE + on the drivers table will remove + the referencing elements from the final_positions + array: + + + CREATE TABLE races ( + ... + final_positions integer[] EACH REFERENCES drivers + ON DELETE EACH CASCADE ON UPDATE EACH CASCADE + ); + + + Consequently, an UPDATE of + the driver_id column will be propagated + to any element of the final_positions + field in the races table. + + + + Even though the most common use case for EACH foreign keys + is on a single column key, you can define an EACH foreign + key constraint on a group of columns. As the following + example shows, it must be written in table constraint form: + + CREATE TABLE available_moves ( + kind text, + move text, + description text, + PRIMARY KEY (kind, move) + ); + + CREATE TABLE paths ( + description text, + kind text, + moves text[], + FOREIGN KEY (kind, EACH moves) REFERENCES available_moves (kind, move) + ); + + INSERT INTO available_moves VALUES ('relative', 'LN', 'look north'); + INSERT INTO available_moves VALUES ('relative', 'RL', 'rotate left'); + INSERT INTO available_moves VALUES ('relative', 'RR', 'rotate right'); + INSERT INTO available_moves VALUES ('relative', 'MF', 'move forward'); + INSERT INTO available_moves VALUES ('absolute', 'N', 'move north'); + INSERT INTO available_moves VALUES ('absolute', 'S', 'move south'); + INSERT INTO available_moves VALUES ('absolute', 'E', 'move east'); + INSERT INTO available_moves VALUES ('absolute', 'W', 'move west'); + + INSERT INTO paths VALUES ('L-shaped path', 'relative', '{LN, RL, MF, RR, MF, MF}'); + INSERT INTO paths VALUES ('L-shaped path', 'absolute', '{W, N, N}'); + + + On top of standard foreign key + requirements, EACH foreign key constraints + require that the 
referencing column is an array of a type compatible + with that of the corresponding referenced column. The current + implementation forbids EACH CASCADE + and EACH SET NULL actions on multi-column + EACH foreign key constraints. + + + + For more detailed information on EACH foreign key + options and special cases, please refer to the documentation + for and + . + + + + Exclusion Constraints *** a/doc/src/sgml/func.sgml --- b/doc/src/sgml/func.sgml *************** *** 10309,10314 **** SELECT NULLIF(value, '(none)') ... --- 10309,10320 ---- array_prepend + array_remove + + + array_replace + + array_to_string *************** *** 10427,10432 **** SELECT NULLIF(value, '(none)') ... --- 10433,10461 ---- + array_remove(anyarray, anyelement) + + + anyarray + remove from the array all the elements equal to a given value + (requires a one-dimensional array) + array_remove(ARRAY[1,2,2,3], 2) + {1,3} + + + + + array_replace(anyarray, anyelement, anyelement) + + + anyarray + replace each element in the array equal to a given value with a new value + array_replace(ARRAY[1,2,5,4], 5, 3) + {1,2,3,4} + + + + array_to_string(anyarray, text , text) *************** *** 10504,10509 **** SELECT NULLIF(value, '(none)') ... --- 10533,10543 ---- + array_remove can only be used with + one-dimensional arrays. + + + See also about the aggregate function array_agg for use with arrays. *** a/doc/src/sgml/ref/create_table.sgml --- b/doc/src/sgml/ref/create_table.sgml *************** *** 51,57 **** CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI DEFAULT default_expr | UNIQUE index_parameters | PRIMARY KEY index_parameters | ! REFERENCES reftable [ ( refcolumn ) ] [ MATCH FULL | MATCH PARTIAL | MATCH SIMPLE ] [ ON DELETE action ] [ ON UPDATE action ] } [ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ] --- 51,57 ---- DEFAULT default_expr | UNIQUE index_parameters | PRIMARY KEY index_parameters | ! 
[EACH] REFERENCES reftable [ ( refcolumn ) ] [ MATCH FULL | MATCH PARTIAL | MATCH SIMPLE ] [ ON DELETE action ] [ ON UPDATE action ] } [ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ] *************** *** 62,68 **** CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI UNIQUE ( column_name [, ... ] ) index_parameters | PRIMARY KEY ( column_name [, ... ] ) index_parameters | EXCLUDE [ USING index_method ] ( exclude_element WITH operator [, ... ] ) index_parameters [ WHERE ( predicate ) ] | ! FOREIGN KEY ( column_name [, ... ] ) REFERENCES reftable [ ( refcolumn [, ... ] ) ] [ MATCH FULL | MATCH PARTIAL | MATCH SIMPLE ] [ ON DELETE action ] [ ON UPDATE action ] } [ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ] --- 62,68 ---- UNIQUE ( column_name [, ... ] ) index_parameters | PRIMARY KEY ( column_name [, ... ] ) index_parameters | EXCLUDE [ USING index_method ] ( exclude_element WITH operator [, ... ] ) index_parameters [ WHERE ( predicate ) ] | ! FOREIGN KEY ( [EACH] column_name [, ... ] ) REFERENCES reftable [ ( refcolumn [, ... ] ) ] [ MATCH FULL | MATCH PARTIAL | MATCH SIMPLE ] [ ON DELETE action ] [ ON UPDATE action ] } [ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ] *************** *** 563,572 **** CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI ! REFERENCES reftable [ ( refcolumn ) ] [ MATCH matchtype ] [ ON DELETE action ] [ ON UPDATE action ] (column constraint) ! FOREIGN KEY ( column [, ... ] ) REFERENCES reftable [ ( refcolumn [, ... ] ) ] [ MATCH matchtype ] [ ON DELETE action ] --- 563,573 ---- ! REFERENCES reftable [ ( refcolumn ) ] [ MATCH matchtype ] [ ON DELETE action ] [ ON UPDATE action ] (column constraint) ! ! FOREIGN KEY ( [EACH] column [, ... ] ) REFERENCES reftable [ ( refcolumn [, ... 
] ) ] [ MATCH matchtype ] [ ON DELETE action ] *************** *** 588,593 **** CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI --- 589,606 ---- + If the column name column + is preceded by the EACH keyword and column is an array of elements compatible + with the corresponding refcolumn + in reftable, an + EACH foreign key constraint is put in place (see for more information). + Multi-column keys with more than one EACH column + are currently not allowed. + + + A value inserted into the referencing column(s) is matched against the values of the referenced table and referenced columns using the given match type. There are three match types: MATCH *************** *** 647,652 **** CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI --- 660,667 ---- Delete any rows referencing the deleted row, or update the value of the referencing column to the new value of the referenced column, respectively. + With EACH foreign key constraints, CASCADE is limited + to the ON DELETE action only. *************** *** 668,673 **** CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI --- 683,719 ---- + + + EACH CASCADE + + + EACH CASCADE can be used only with + EACH foreign key constraints. + Delete any element in the array that is referencing + the deleted row, or update the value of every + referencing element in the array to the new value of the + referenced column, respectively. + If the referencing array is multi-dimensional, then + the ON DELETE action behaves in the same + way as under the RESTRICT clause. + The current implementation is limited to single-column foreign keys. + + + + + + EACH SET NULL + + + EACH SET NULL can be used only with + EACH foreign key constraints. + Set any referencing element in the array to null. + The current implementation is limited to single-column foreign keys. 
+ + + + *************** *** 680,685 **** CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI --- 726,835 ---- + + EACH REFERENCES reftable [ ( refcolumn ) ] [ MATCH matchtype ] [ ON DELETE action ] [ ON UPDATE action ] (column constraint) + + + + The EACH REFERENCES definition specifies + an EACH foreign key, a special kind of foreign key + constraint requiring the referencing column to be an array of elements + of the same type (or a compatible one) as the referenced column + in the referenced table. The value of each element of the + referencing array + will be matched against refcolumn in some row of reftable. + + + + EACH foreign keys are an extension of PostgreSQL + and are not included in the SQL standard. + + + + Even with EACH foreign keys, modifications in the referenced + column can trigger actions to be performed on the referencing array. + Similarly to standard foreign keys, you can specify these + actions using the ON DELETE and + ON UPDATE clauses. + The following actions are possible for each clause: + + + + NO ACTION + + + Same as standard foreign key constraints. This is the default action. + + + + + + RESTRICT + + + Same as standard foreign key constraints. + + + + + + CASCADE + + + Can be used with ON DELETE only. + Delete any rows referencing any of the deleted rows. + + + + + + SET NULL + + + Set the referencing array column to null. + + + + + + SET DEFAULT + + + Set the referencing array column to the default value. + + + + + + EACH CASCADE + + + Delete any element in the array that is referencing + the deleted row, or update the value of every + referencing element in the array to the new value of the + referenced column, respectively. + If the referencing array is multi-dimensional, then + the ON DELETE action behaves in the same + way as under the RESTRICT clause. + + + + + + EACH SET NULL + + + Set any referencing element in the array to null. 
+ + + + + + + + + DEFERRABLE NOT DEFERRABLE *************** *** 1426,1431 **** CREATE TABLE employees OF employee_type ( --- 1576,1592 ---- effect can be had using the OID feature. + + + EACH Foreign Keys + + + EACH foreign keys, the EACH REFERENCES, + EACH CASCADE and EACH SET + NULL clauses are a PostgreSQL + extension. + + *** a/src/backend/catalog/heap.c --- b/src/backend/catalog/heap.c *************** *** 1943,1948 **** StoreRelCheck(Relation rel, char *ccname, Node *expr, --- 1943,1950 ---- NULL, NULL, 0, + false, + NULL, ' ', ' ', ' ', *** a/src/backend/catalog/index.c --- b/src/backend/catalog/index.c *************** *** 1147,1152 **** index_constraint_create(Relation heapRelation, --- 1147,1154 ---- NULL, NULL, 0, + false, + NULL, ' ', ' ', ' ', *** a/src/backend/catalog/information_schema.sql --- b/src/backend/catalog/information_schema.sql *************** *** 1165,1171 **** CREATE VIEW referential_constraints AS --- 1165,1173 ---- CAST( CASE con.confupdtype WHEN 'c' THEN 'CASCADE' + WHEN 'C' THEN 'EACH CASCADE' WHEN 'n' THEN 'SET NULL' + WHEN 'N' THEN 'EACH SET NULL' WHEN 'd' THEN 'SET DEFAULT' WHEN 'r' THEN 'RESTRICT' WHEN 'a' THEN 'NO ACTION' END *************** *** 1173,1183 **** CREATE VIEW referential_constraints AS CAST( CASE con.confdeltype WHEN 'c' THEN 'CASCADE' WHEN 'n' THEN 'SET NULL' WHEN 'd' THEN 'SET DEFAULT' WHEN 'r' THEN 'RESTRICT' WHEN 'a' THEN 'NO ACTION' END ! AS character_data) AS delete_rule FROM (pg_namespace ncon INNER JOIN pg_constraint con ON ncon.oid = con.connamespace --- 1175,1189 ---- CAST( CASE con.confdeltype WHEN 'c' THEN 'CASCADE' + WHEN 'C' THEN 'EACH CASCADE' WHEN 'n' THEN 'SET NULL' + WHEN 'N' THEN 'EACH SET NULL' WHEN 'd' THEN 'SET DEFAULT' WHEN 'r' THEN 'RESTRICT' WHEN 'a' THEN 'NO ACTION' END ! AS character_data) AS delete_rule, ! ! 
con.confiseach AS is_each FROM (pg_namespace ncon INNER JOIN pg_constraint con ON ncon.oid = con.connamespace *** a/src/backend/catalog/pg_constraint.c --- b/src/backend/catalog/pg_constraint.c *************** *** 58,63 **** CreateConstraintEntry(const char *constraintName, --- 58,65 ---- const Oid *ppEqOp, const Oid *ffEqOp, int foreignNKeys, + bool confisEach, + const bool *foreignEach, char foreignUpdateType, char foreignDeleteType, char foreignMatchType, *************** *** 76,81 **** CreateConstraintEntry(const char *constraintName, --- 78,84 ---- Datum values[Natts_pg_constraint]; ArrayType *conkeyArray; ArrayType *confkeyArray; + ArrayType *confeachArray; ArrayType *conpfeqopArray; ArrayType *conppeqopArray; ArrayType *conffeqopArray; *************** *** 126,135 **** CreateConstraintEntry(const char *constraintName, --- 129,144 ---- fkdatums[i] = ObjectIdGetDatum(ffEqOp[i]); conffeqopArray = construct_array(fkdatums, foreignNKeys, OIDOID, sizeof(Oid), true, 'i'); + for (i = 0; i < foreignNKeys; i++) { + fkdatums[i] = BoolGetDatum(foreignEach[i]); + } + confeachArray = construct_array(fkdatums, foreignNKeys, + BOOLOID, 1, true, 'c'); } else { confkeyArray = NULL; + confeachArray = NULL; conpfeqopArray = NULL; conppeqopArray = NULL; conffeqopArray = NULL; *************** *** 171,176 **** CreateConstraintEntry(const char *constraintName, --- 180,186 ---- values[Anum_pg_constraint_conislocal - 1] = BoolGetDatum(conIsLocal); values[Anum_pg_constraint_coninhcount - 1] = Int32GetDatum(conInhCount); values[Anum_pg_constraint_conisonly - 1] = BoolGetDatum(conIsOnly); + values[Anum_pg_constraint_confiseach - 1] = BoolGetDatum(confisEach); if (conkeyArray) values[Anum_pg_constraint_conkey - 1] = PointerGetDatum(conkeyArray); *************** *** 182,187 **** CreateConstraintEntry(const char *constraintName, --- 192,202 ---- else nulls[Anum_pg_constraint_confkey - 1] = true; + if (confeachArray) + values[Anum_pg_constraint_confeach - 1] = PointerGetDatum(confeachArray); + 
else + nulls[Anum_pg_constraint_confeach - 1] = true; + if (conpfeqopArray) values[Anum_pg_constraint_conpfeqop - 1] = PointerGetDatum(conpfeqopArray); else *** a/src/backend/commands/tablecmds.c --- b/src/backend/commands/tablecmds.c *************** *** 35,40 **** --- 35,41 ---- #include "catalog/pg_inherits_fn.h" #include "catalog/pg_namespace.h" #include "catalog/pg_opclass.h" + #include "catalog/pg_operator.h" #include "catalog/pg_tablespace.h" #include "catalog/pg_trigger.h" #include "catalog/pg_type.h" *************** *** 5714,5719 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, --- 5715,5721 ---- Relation pkrel; int16 pkattnum[INDEX_MAX_KEYS]; int16 fkattnum[INDEX_MAX_KEYS]; + bool fkatteach[INDEX_MAX_KEYS]; Oid pktypoid[INDEX_MAX_KEYS]; Oid fktypoid[INDEX_MAX_KEYS]; Oid opclasses[INDEX_MAX_KEYS]; *************** *** 5790,5795 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, --- 5792,5798 ---- */ MemSet(pkattnum, 0, sizeof(pkattnum)); MemSet(fkattnum, 0, sizeof(fkattnum)); + MemSet(fkatteach, 0, sizeof(fkatteach)); MemSet(pktypoid, 0, sizeof(pktypoid)); MemSet(fktypoid, 0, sizeof(fktypoid)); MemSet(opclasses, 0, sizeof(opclasses)); *************** *** 5802,5807 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, --- 5805,5843 ---- fkattnum, fktypoid); /* + * If an EACH FK, decode the content of the fk_each_attrs array. + */ + if (fkconstraint->fk_is_each) + { + ListCell *l; + int attnum; + bool each_found = false; + + attnum = 0; + foreach(l, fkconstraint->fk_each_attrs) + { + if (intVal(lfirst(l))) { + + /* + * Currently, the EACH flag cannot be set on more than + * one column. 
+ */ + if (each_found) { + ereport(ERROR, + (errcode(ERRCODE_INVALID_FOREIGN_KEY), + errmsg("EACH foreign keys support only " + "one EACH column"))); + } + + fkatteach[attnum] = true; + each_found = true; + } + attnum++; + } + + } + + /* * If the attribute list for the referenced table was omitted, lookup the * definition of the primary key and use it. Otherwise, validate the * supplied attribute list. In either case, discover the index OID and *************** *** 5850,5855 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, --- 5886,5935 ---- old_check_ok = (fkconstraint->old_conpfeqop != NIL); Assert(!old_check_ok || numfks == list_length(fkconstraint->old_conpfeqop)); + /* Enforce each foreign key restrictions */ + if (fkconstraint->fk_is_each) + { + /* + * ON UPDATE CASCADE action is not supported on EACH foreign keys + */ + if (fkconstraint->fk_upd_action == FKCONSTR_ACTION_CASCADE) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FOREIGN_KEY), + errmsg("ON UPDATE CASCADE action is not supported " + "on EACH foreign keys"), + errhint("Perhaps you meant to use the " + "ON UPDATE EACH CASCADE action"))); + + /* + * EACH CASCADE and EACH SET NULL actions are only available + * on single-column EACH foreign keys + */ + if (numpks > 1 && + (fkconstraint->fk_upd_action == FKCONSTR_ACTION_EACHCASCADE + || fkconstraint->fk_upd_action == FKCONSTR_ACTION_EACHSETNULL + || fkconstraint->fk_del_action == FKCONSTR_ACTION_EACHCASCADE + || fkconstraint->fk_del_action == FKCONSTR_ACTION_EACHSETNULL)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FOREIGN_KEY), + errmsg("EACH CASCADE and EACH SET NULL actions are only " + "available on single-column EACH foreign keys"))); + } + else + { + /* + * EACH CASCADE and EACH SET NULL actions are only available + * on single-column EACH foreign keys + */ + if (fkconstraint->fk_upd_action == FKCONSTR_ACTION_EACHCASCADE + || fkconstraint->fk_upd_action == FKCONSTR_ACTION_EACHSETNULL + || fkconstraint->fk_del_action == 
FKCONSTR_ACTION_EACHCASCADE + || fkconstraint->fk_del_action == FKCONSTR_ACTION_EACHSETNULL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FOREIGN_KEY), + errmsg("EACH CASCADE and EACH SET NULL actions are only " + "available on single-column EACH foreign keys"))); + } + for (i = 0; i < numpks; i++) { Oid pktype = pktypoid[i]; *************** *** 5898,5922 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, elog(ERROR, "missing operator %d(%u,%u) in opfamily %u", eqstrategy, opcintype, opcintype, opfamily); ! /* ! * Are there equality operators that take exactly the FK type? Assume ! * we should look through any domain here. ! */ ! fktyped = getBaseType(fktype); ! ! pfeqop = get_opfamily_member(opfamily, opcintype, fktyped, ! eqstrategy); ! if (OidIsValid(pfeqop)) { ! pfeqop_right = fktyped; ! ffeqop = get_opfamily_member(opfamily, fktyped, fktyped, eqstrategy); } else { ! /* keep compiler quiet */ ! pfeqop_right = InvalidOid; ! ffeqop = InvalidOid; } if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop))) --- 5978,6028 ---- elog(ERROR, "missing operator %d(%u,%u) in opfamily %u", eqstrategy, opcintype, opcintype, opfamily); ! if (fkatteach[i]) { ! /* ! * For every EACH FK, look if an equality operator that takes ! * exactly the FK element type exists. Assume we should look ! * through any domain here. ! */ ! pfeqop_right = get_base_element_type(fktype); ! if (!OidIsValid(pfeqop_right)) ! ereport(ERROR, ! (errcode(ERRCODE_DATATYPE_MISMATCH), ! errmsg("foreign key constraint \"%s\" " ! "cannot be implemented", ! fkconstraint->conname), ! errdetail("key column \"%s\" has type %s " ! "which is not an array type", ! strVal(list_nth(fkconstraint->fk_attrs, i)), ! format_type_be(fktype)))); ! ! ffeqop = ARRAY_EQ_OP; ! pfeqop = get_opfamily_member(opfamily, opcintype, pfeqop_right, eqstrategy); } else { ! /* ! * Are there equality operators that take exactly the FK type? ! * Assume we should look through any domain here. ! */ ! fktyped = getBaseType(fktype); ! ! 
pfeqop = get_opfamily_member(opfamily, opcintype, fktyped, ! eqstrategy); ! if (OidIsValid(pfeqop)) ! { ! pfeqop_right = fktyped; ! ffeqop = get_opfamily_member(opfamily, fktyped, fktyped, ! eqstrategy); ! } ! else ! { ! /* keep compiler quiet */ ! pfeqop_right = InvalidOid; ! ffeqop = InvalidOid; ! } } if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop))) *************** *** 5934,5950 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, Oid target_typeids[2]; input_typeids[0] = pktype; ! input_typeids[1] = fktype; target_typeids[0] = opcintype; target_typeids[1] = opcintype; if (can_coerce_type(2, input_typeids, target_typeids, COERCION_IMPLICIT)) { ! pfeqop = ffeqop = ppeqop; pfeqop_right = opcintype; } } if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop))) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), --- 6040,6080 ---- Oid target_typeids[2]; input_typeids[0] = pktype; ! if (fkatteach[i]) ! input_typeids[1] = get_element_type(fktype); ! else ! input_typeids[1] = fktype; target_typeids[0] = opcintype; target_typeids[1] = opcintype; if (can_coerce_type(2, input_typeids, target_typeids, COERCION_IMPLICIT)) { ! pfeqop = ppeqop; pfeqop_right = opcintype; + /* + * In case of an EACH FK, the ffeqop must be left untouched; + * otherwise we use the primary equality operator. 
+ */ + if (!fkatteach[i]) + ffeqop = ppeqop; } } + /* + * In case of an EACH FK, make sure TYPECACHE_EQ_OPR exists for + * the FK element_type and it is compatible with pfeqop + */ + if (fkatteach[i]) + { + Oid fk_element_type = get_element_type(fktype); + TypeCacheEntry *typentry = lookup_type_cache(fk_element_type, + TYPECACHE_EQ_OPR); + if (!OidIsValid(typentry->eq_opr) + || !equality_ops_are_compatible(typentry->eq_opr, pfeqop)) + /* Error: incompatible operators */ + pfeqop = InvalidOid; + } + if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop))) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), *************** *** 6055,6060 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, --- 6185,6192 ---- ppeqoperators, ffeqoperators, numpks, + fkconstraint->fk_is_each, + fkatteach, fkconstraint->fk_upd_action, fkconstraint->fk_del_action, fkconstraint->fk_matchtype, *************** *** 6833,6843 **** createForeignKeyTriggers(Relation rel, Constraint *fkconstraint, --- 6965,6985 ---- fk_trigger->initdeferred = false; fk_trigger->funcname = SystemFuncName("RI_FKey_cascade_del"); break; + case FKCONSTR_ACTION_EACHCASCADE: + fk_trigger->deferrable = false; + fk_trigger->initdeferred = false; + fk_trigger->funcname = SystemFuncName("RI_FKey_eachcascade_del"); + break; case FKCONSTR_ACTION_SETNULL: fk_trigger->deferrable = false; fk_trigger->initdeferred = false; fk_trigger->funcname = SystemFuncName("RI_FKey_setnull_del"); break; + case FKCONSTR_ACTION_EACHSETNULL: + fk_trigger->deferrable = false; + fk_trigger->initdeferred = false; + fk_trigger->funcname = SystemFuncName("RI_FKey_eachsetnull_del"); + break; case FKCONSTR_ACTION_SETDEFAULT: fk_trigger->deferrable = false; fk_trigger->initdeferred = false; *************** *** 6886,6896 **** createForeignKeyTriggers(Relation rel, Constraint *fkconstraint, --- 7028,7048 ---- fk_trigger->initdeferred = false; fk_trigger->funcname = SystemFuncName("RI_FKey_cascade_upd"); break; + case FKCONSTR_ACTION_EACHCASCADE: + 
fk_trigger->deferrable = false; + fk_trigger->initdeferred = false; + fk_trigger->funcname = SystemFuncName("RI_FKey_eachcascade_upd"); + break; case FKCONSTR_ACTION_SETNULL: fk_trigger->deferrable = false; fk_trigger->initdeferred = false; fk_trigger->funcname = SystemFuncName("RI_FKey_setnull_upd"); break; + case FKCONSTR_ACTION_EACHSETNULL: + fk_trigger->deferrable = false; + fk_trigger->initdeferred = false; + fk_trigger->funcname = SystemFuncName("RI_FKey_eachsetnull_upd"); + break; case FKCONSTR_ACTION_SETDEFAULT: fk_trigger->deferrable = false; fk_trigger->initdeferred = false; *** a/src/backend/commands/trigger.c --- b/src/backend/commands/trigger.c *************** *** 450,455 **** CreateTrigger(CreateTrigStmt *stmt, const char *queryString, --- 450,457 ---- NULL, NULL, 0, + false, + NULL, ' ', ' ', ' ', *************** *** 870,885 **** ConvertTriggerToFK(CreateTrigStmt *stmt, Oid funcoid) --- 872,891 ---- switch (funcoid) { case F_RI_FKEY_CASCADE_UPD: + case F_RI_FKEY_EACHCASCADE_UPD: case F_RI_FKEY_RESTRICT_UPD: case F_RI_FKEY_SETNULL_UPD: + case F_RI_FKEY_EACHSETNULL_UPD: case F_RI_FKEY_SETDEFAULT_UPD: case F_RI_FKEY_NOACTION_UPD: funcnum = 0; break; case F_RI_FKEY_CASCADE_DEL: + case F_RI_FKEY_EACHCASCADE_DEL: case F_RI_FKEY_RESTRICT_DEL: case F_RI_FKEY_SETNULL_DEL: + case F_RI_FKEY_EACHSETNULL_DEL: case F_RI_FKEY_SETDEFAULT_DEL: case F_RI_FKEY_NOACTION_DEL: funcnum = 1; *************** *** 984,995 **** ConvertTriggerToFK(CreateTrigStmt *stmt, Oid funcoid) --- 990,1007 ---- case F_RI_FKEY_CASCADE_UPD: fkcon->fk_upd_action = FKCONSTR_ACTION_CASCADE; break; + case F_RI_FKEY_EACHCASCADE_UPD: + fkcon->fk_upd_action = FKCONSTR_ACTION_EACHCASCADE; + break; case F_RI_FKEY_RESTRICT_UPD: fkcon->fk_upd_action = FKCONSTR_ACTION_RESTRICT; break; case F_RI_FKEY_SETNULL_UPD: fkcon->fk_upd_action = FKCONSTR_ACTION_SETNULL; break; + case F_RI_FKEY_EACHSETNULL_UPD: + fkcon->fk_upd_action = FKCONSTR_ACTION_EACHSETNULL; + break; case F_RI_FKEY_SETDEFAULT_UPD: 
fkcon->fk_upd_action = FKCONSTR_ACTION_SETDEFAULT; break; *************** *** 1005,1016 **** ConvertTriggerToFK(CreateTrigStmt *stmt, Oid funcoid) --- 1017,1034 ---- case F_RI_FKEY_CASCADE_DEL: fkcon->fk_del_action = FKCONSTR_ACTION_CASCADE; break; + case F_RI_FKEY_EACHCASCADE_DEL: + fkcon->fk_del_action = FKCONSTR_ACTION_EACHCASCADE; + break; case F_RI_FKEY_RESTRICT_DEL: fkcon->fk_del_action = FKCONSTR_ACTION_RESTRICT; break; case F_RI_FKEY_SETNULL_DEL: fkcon->fk_del_action = FKCONSTR_ACTION_SETNULL; break; + case F_RI_FKEY_EACHSETNULL_DEL: + fkcon->fk_del_action = FKCONSTR_ACTION_EACHSETNULL; + break; case F_RI_FKEY_SETDEFAULT_DEL: fkcon->fk_del_action = FKCONSTR_ACTION_SETDEFAULT; break; *** a/src/backend/commands/typecmds.c --- b/src/backend/commands/typecmds.c *************** *** 2948,2953 **** domainAddConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid, --- 2948,2955 ---- NULL, NULL, 0, + false, + NULL, ' ', ' ', ' ', *** a/src/backend/nodes/copyfuncs.c --- b/src/backend/nodes/copyfuncs.c *************** *** 2352,2357 **** _copyConstraint(const Constraint *from) --- 2352,2359 ---- COPY_SCALAR_FIELD(fk_upd_action); COPY_SCALAR_FIELD(fk_del_action); COPY_NODE_FIELD(old_conpfeqop); + COPY_SCALAR_FIELD(fk_is_each); + COPY_NODE_FIELD(fk_each_attrs); COPY_SCALAR_FIELD(skip_validation); COPY_SCALAR_FIELD(initially_valid); *************** *** 2371,2376 **** _copyDefElem(const DefElem *from) --- 2373,2389 ---- return newnode; } + static ForeignKeyColumnElem * + _copyForeignKeyColumnElem(const ForeignKeyColumnElem *from) + { + ForeignKeyColumnElem *newnode = makeNode(ForeignKeyColumnElem); + + COPY_NODE_FIELD(name); + COPY_SCALAR_FIELD(each); + + return newnode; + } + static LockingClause * _copyLockingClause(const LockingClause *from) { *************** *** 4415,4420 **** copyObject(const void *from) --- 4428,4436 ---- case T_DefElem: retval = _copyDefElem(from); break; + case T_ForeignKeyColumnElem: + retval = _copyForeignKeyColumnElem(from); + break; case 
T_LockingClause: retval = _copyLockingClause(from); break; *** a/src/backend/nodes/equalfuncs.c --- b/src/backend/nodes/equalfuncs.c *************** *** 2200,2205 **** _equalConstraint(const Constraint *a, const Constraint *b) --- 2200,2207 ---- COMPARE_SCALAR_FIELD(fk_upd_action); COMPARE_SCALAR_FIELD(fk_del_action); COMPARE_NODE_FIELD(old_conpfeqop); + COMPARE_SCALAR_FIELD(fk_is_each); + COMPARE_NODE_FIELD(fk_each_attrs); COMPARE_SCALAR_FIELD(skip_validation); COMPARE_SCALAR_FIELD(initially_valid); *************** *** 2218,2223 **** _equalDefElem(const DefElem *a, const DefElem *b) --- 2220,2235 ---- } static bool + _equalForeignKeyColumnElem(const ForeignKeyColumnElem *a, + const ForeignKeyColumnElem *b) + { + COMPARE_NODE_FIELD(name); + COMPARE_SCALAR_FIELD(each); + + return true; + } + + static bool _equalLockingClause(const LockingClause *a, const LockingClause *b) { COMPARE_NODE_FIELD(lockedRels); *************** *** 2974,2979 **** equal(const void *a, const void *b) --- 2986,2994 ---- case T_DefElem: retval = _equalDefElem(a, b); break; + case T_ForeignKeyColumnElem: + retval = _equalForeignKeyColumnElem(a, b); + break; case T_LockingClause: retval = _equalLockingClause(a, b); break; *** a/src/backend/nodes/outfuncs.c --- b/src/backend/nodes/outfuncs.c *************** *** 2057,2062 **** _outTableLikeClause(StringInfo str, const TableLikeClause *node) --- 2057,2071 ---- } static void + _outForeignKeyColumnElem(StringInfo str, const ForeignKeyColumnElem *node) + { + WRITE_NODE_TYPE("FOREIGNKEYCOLUMNELEM"); + + WRITE_NODE_FIELD(name); + WRITE_BOOL_FIELD(each); + } + + static void _outLockingClause(StringInfo str, const LockingClause *node) { WRITE_NODE_TYPE("LOCKINGCLAUSE"); *************** *** 2621,2626 **** _outConstraint(StringInfo str, const Constraint *node) --- 2630,2637 ---- WRITE_CHAR_FIELD(fk_upd_action); WRITE_CHAR_FIELD(fk_del_action); WRITE_NODE_FIELD(old_conpfeqop); + WRITE_BOOL_FIELD(fk_is_each); + WRITE_NODE_FIELD(fk_each_attrs); 
WRITE_BOOL_FIELD(skip_validation); WRITE_BOOL_FIELD(initially_valid); break; *************** *** 3122,3127 **** _outNode(StringInfo str, const void *obj) --- 3133,3141 ---- case T_TableLikeClause: _outTableLikeClause(str, obj); break; + case T_ForeignKeyColumnElem: + _outForeignKeyColumnElem(str, obj); + break; case T_LockingClause: _outLockingClause(str, obj); break; *** a/src/backend/parser/gram.y --- b/src/backend/parser/gram.y *************** *** 320,326 **** static void processCASbits(int cas_bits, int location, const char *constrType, execute_param_clause using_clause returning_clause opt_enum_val_list enum_val_list table_func_column_list create_generic_options alter_generic_options ! relation_expr_list dostmt_opt_list %type opt_fdw_options fdw_options %type fdw_option --- 320,326 ---- execute_param_clause using_clause returning_clause opt_enum_val_list enum_val_list table_func_column_list create_generic_options alter_generic_options ! relation_expr_list dostmt_opt_list foreignKeyColumnList %type opt_fdw_options fdw_options %type fdw_option *************** *** 379,385 **** static void processCASbits(int cas_bits, int location, const char *constrType, %type def_arg columnElem where_clause where_or_current_clause a_expr b_expr c_expr func_expr AexprConst indirection_el columnref in_expr having_clause func_table array_expr ! ExclusionWhereClause %type ExclusionConstraintList ExclusionConstraintElem %type func_arg_list %type func_arg_expr --- 379,385 ---- %type def_arg columnElem where_clause where_or_current_clause a_expr b_expr c_expr func_expr AexprConst indirection_el columnref in_expr having_clause func_table array_expr ! 
ExclusionWhereClause foreignKeyColumnElem %type ExclusionConstraintList ExclusionConstraintElem %type func_arg_list %type func_arg_expr *************** *** 645,650 **** static void processCASbits(int cas_bits, int location, const char *constrType, --- 645,651 ---- %left JOIN CROSS LEFT FULL RIGHT INNER_P NATURAL /* kluge to keep xml_whitespace_option from causing shift/reduce conflicts */ %right PRESERVE STRIP_P + %nonassoc EACH %% *************** *** 2715,2720 **** ColConstraintElem: --- 2716,2739 ---- n->fk_matchtype = $4; n->fk_upd_action = (char) ($5 >> 8); n->fk_del_action = (char) ($5 & 0xFF); + n->fk_is_each = false; + n->skip_validation = false; + n->initially_valid = true; + $$ = (Node *)n; + } + | EACH REFERENCES qualified_name opt_column_list + key_match key_actions + { + Constraint *n = makeNode(Constraint); + n->contype = CONSTR_FOREIGN; + n->location = @1; + n->pktable = $3; + n->fk_attrs = NIL; + n->pk_attrs = $4; + n->fk_matchtype = $5; + n->fk_upd_action = (char) ($6 >> 8); + n->fk_del_action = (char) ($6 & 0xFF); + n->fk_is_each = true; n->skip_validation = false; n->initially_valid = true; $$ = (Node *)n; *************** *** 2900,2907 **** ConstraintElem: yyscanner); $$ = (Node *)n; } ! | FOREIGN KEY '(' columnList ')' REFERENCES qualified_name ! opt_column_list key_match key_actions ConstraintAttributeSpec { Constraint *n = makeNode(Constraint); n->contype = CONSTR_FOREIGN; --- 2919,2927 ---- yyscanner); $$ = (Node *)n; } ! | FOREIGN KEY '(' foreignKeyColumnList ')' REFERENCES ! qualified_name opt_column_list key_match key_actions ! 
/*
 * Column list of a table-level FOREIGN KEY constraint:
 *		FOREIGN KEY '(' foreignKeyColumnList ')' REFERENCES ...
 *
 * Produces a List of ForeignKeyColumnElem nodes, one per listed column,
 * in the order written.
 */
foreignKeyColumnList:
			foreignKeyColumnElem
				{ $$ = list_make1($1); }
			| foreignKeyColumnList ',' foreignKeyColumnElem
				{ $$ = lappend($1, $3); }
		;

/*
 * One referencing column, optionally prefixed by EACH.
 *
 * The column identifier is wrapped in a String value node and stored in
 * "name"; "each" records whether the EACH keyword preceded the column.
 */
foreignKeyColumnElem:
			EACH ColId
				{
					/* EACH column: flagged so it is treated as an EACH key column */
					ForeignKeyColumnElem *n = makeNode(ForeignKeyColumnElem);
					n->name = (Node *) makeString($2);
					n->each = true;
					$$ = (Node *) n;
				}
			| ColId
				{
					/* plain column: ordinary foreign key column */
					ForeignKeyColumnElem *n = makeNode(ForeignKeyColumnElem);
					n->name = (Node *) makeString($1);
					n->each = false;
					$$ = (Node *) n;
				}
		;
+ */ + { + ListCell *i; + List *old_fk_attrs = constraint->fk_attrs; + + constraint->fk_attrs = NIL; + constraint->fk_is_each = false; + constraint->fk_each_attrs = NIL; + foreach (i, old_fk_attrs) + { + ForeignKeyColumnElem *elem = + (ForeignKeyColumnElem *)lfirst(i); + + Assert(IsA(elem, ForeignKeyColumnElem)); + constraint->fk_attrs = + lappend(constraint->fk_attrs, elem->name); + constraint->fk_is_each |= elem->each; + constraint->fk_each_attrs = + lappend(constraint->fk_each_attrs, + makeInteger(elem->each)); + } + } + cxt->fkconstraints = lappend(cxt->fkconstraints, constraint); break; *** a/src/backend/utils/adt/arrayfuncs.c --- b/src/backend/utils/adt/arrayfuncs.c *************** *** 5174,5176 **** array_unnest(PG_FUNCTION_ARGS) --- 5174,5606 ---- SRF_RETURN_DONE(funcctx); } } + + /* + * Remove any occurrence of an element from an array + * + * If used on a multi-dimensional array it will raise an error. + * + */ + Datum + array_remove(PG_FUNCTION_ARGS) + { + ArrayType *v; + Datum old_value = PG_GETARG_DATUM(1); + bool old_value_isnull = PG_ARGISNULL(1); + Oid element_type; + ArrayType *result; + Datum *values; + bool *nulls; + Datum elt; + int ndim; + int *dim; + int nitems; + int nresult; + int i; + int32 nbytes = 0; + int32 dataoffset; + bool hasnulls; + int typlen; + bool typbyval; + char typalign; + char *s; + bits8 *bitmap; + int bitmask; + Oid collation = PG_GET_COLLATION(); + bool changed = false; + TypeCacheEntry *typentry; + FunctionCallInfoData locfcinfo; + + /* + * If the first argument is null + * return NULL + */ + if (PG_ARGISNULL(0)) + PG_RETURN_NULL(); + + v = PG_GETARG_ARRAYTYPE_P(0); + + ndim = ARR_NDIM(v); + + /* + * If used on a multi-dimensional array the matching elements + * will be replaced with NULLs as fallback. 
+ */ + if (ndim > 1) { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("removing elements from multidimensional arrays" + " is not supported"))); + } + + dim = ARR_DIMS(v); + element_type = ARR_ELEMTYPE(v); + nitems = ArrayGetNItems(ndim, dim); + + /* Check for empty array */ + if (nitems <= 0) + { + /* Return empty array */ + PG_RETURN_ARRAYTYPE_P(construct_empty_array(element_type)); + } + + /* + * We arrange to look up the equality function only once per series of + * calls, assuming the element type doesn't change underneath us. + */ + typentry = (TypeCacheEntry *) fcinfo->flinfo->fn_extra; + if (typentry == NULL || + typentry->type_id != element_type) + { + typentry = lookup_type_cache(element_type, + TYPECACHE_EQ_OPR_FINFO); + if (!OidIsValid(typentry->eq_opr_finfo.fn_oid)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_FUNCTION), + errmsg("could not identify an equality operator for type %s", + format_type_be(element_type)))); + fcinfo->flinfo->fn_extra = (void *) typentry; + } + typlen = typentry->typlen; + typbyval = typentry->typbyval; + typalign = typentry->typalign; + + /* + * apply the operator to each pair of array elements. 
+ */ + InitFunctionCallInfoData(locfcinfo, &typentry->eq_opr_finfo, 2, + collation, NULL, NULL); + + /* Allocate temporary arrays for new values */ + values = (Datum *) palloc(nitems * sizeof(Datum)); + nulls = (bool *) palloc(nitems * sizeof(bool)); + + /* Loop over source data */ + s = ARR_DATA_PTR(v); + bitmap = ARR_NULLBITMAP(v); + bitmask = 1; + hasnulls = false; + nresult=0; + + for (i = 0; i < nitems; i++) + { + bool isNull; + bool oprresult; + bool skip; + + /* Get source element, checking for NULL */ + if (bitmap && (*bitmap & bitmask) == 0) + { + isNull = true; + skip = old_value_isnull; + } + else + { + elt = fetch_att(s, typbyval, typlen); + s = att_addlength_datum(s, typlen, elt); + s = (char *) att_align_nominal(s, typalign); + isNull = false; + + /* + * Apply the operator to the element pair + */ + locfcinfo.arg[0] = elt; + locfcinfo.arg[1] = old_value; + locfcinfo.argnull[0] = false; + locfcinfo.argnull[1] = false; + locfcinfo.isnull = false; + oprresult = DatumGetBool(FunctionCallInvoke(&locfcinfo)); + if (!oprresult) + { + values[nresult] = elt; + skip = false; + } + else + skip = true; + } + + if (!skip) + { + nulls[nresult] = isNull; + if (isNull) + hasnulls = true; + else + { + /* Update total result size */ + nbytes = att_addlength_datum(nbytes, typlen, values[nresult]); + nbytes = att_align_nominal(nbytes, typalign); + /* This should never overflow */ + Assert(AllocSizeIsValid(nbytes)); + } + nresult++; + } + else + changed = true; + + /* advance bitmap pointer if any */ + if (bitmap) + { + bitmask <<= 1; + if (bitmask == 0x100) + { + bitmap++; + bitmask = 1; + } + } + } + + /* + * If not changed just return the original array + */ + if (!changed) + { + pfree(values); + pfree(nulls); + PG_RETURN_ARRAYTYPE_P(v); + } + + /* Allocate and initialize the result array */ + if (hasnulls) + { + dataoffset = ARR_OVERHEAD_WITHNULLS(ndim, nresult); + nbytes += dataoffset; + } + else + { + dataoffset = 0; /* marker for no null bitmap */ + nbytes += 
ARR_OVERHEAD_NONULLS(ndim); + } + result = (ArrayType *) palloc0(nbytes); + SET_VARSIZE(result, nbytes); + result->ndim = ndim; + result->dataoffset = dataoffset; + result->elemtype = element_type; + memcpy(ARR_DIMS(result), ARR_DIMS(v), 2 * ndim * sizeof(int)); + + /* Adjust the final length */ + ARR_DIMS(result)[0] = nresult; + + CopyArrayEls(result, + values, nulls, nresult, + typlen, typbyval, typalign, + false); + + pfree(values); + pfree(nulls); + + PG_RETURN_ARRAYTYPE_P(result); + } + + /* + * Replace any occurrence of an element in an array + */ + Datum + array_replace(PG_FUNCTION_ARGS) + { + ArrayType *v; + Datum old_value = PG_GETARG_DATUM(1); + bool old_value_isnull = PG_ARGISNULL(1); + Datum new_value = PG_GETARG_DATUM(2); + bool new_value_isnull = PG_ARGISNULL(2); + Oid element_type; + ArrayType *result; + Datum *values; + bool *nulls; + Datum elt; + int *dim; + int ndim; + int nitems; + int i; + int32 nbytes = 0; + int32 dataoffset; + bool hasnulls; + int typlen; + bool typbyval; + char typalign; + char *s; + bits8 *bitmap; + int bitmask; + bool changed = false; + Oid collation = PG_GET_COLLATION(); + + TypeCacheEntry *typentry; + FunctionCallInfoData locfcinfo; + + /* + * If the first argument is null + * return NULL + */ + if (PG_ARGISNULL(0)) + PG_RETURN_NULL(); + + v = PG_GETARG_ARRAYTYPE_P(0); + + ndim = ARR_NDIM(v); + dim = ARR_DIMS(v); + element_type = ARR_ELEMTYPE(v); + nitems = ArrayGetNItems(ndim, dim); + + /* Check for empty array */ + if (nitems <= 0) + { + /* Return empty array */ + PG_RETURN_ARRAYTYPE_P(construct_empty_array(element_type)); + } + + /* + * We arrange to look up the equality function only once per series of + * calls, assuming the element type doesn't change underneath us. 
+ */ + typentry = (TypeCacheEntry *) fcinfo->flinfo->fn_extra; + if (typentry == NULL || + typentry->type_id != element_type) + { + typentry = lookup_type_cache(element_type, + TYPECACHE_EQ_OPR_FINFO); + if (!OidIsValid(typentry->eq_opr_finfo.fn_oid)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_FUNCTION), + errmsg("could not identify an equality operator for type %s", + format_type_be(element_type)))); + fcinfo->flinfo->fn_extra = (void *) typentry; + } + typlen = typentry->typlen; + typbyval = typentry->typbyval; + typalign = typentry->typalign; + + /* detoast new_value if necessary */ + if (typlen == -1 && !new_value_isnull) + new_value = PointerGetDatum(PG_DETOAST_DATUM(new_value)); + + /* + * apply the operator to each pair of array elements. + */ + InitFunctionCallInfoData(locfcinfo, &typentry->eq_opr_finfo, 2, + collation, NULL, NULL); + + /* Allocate temporary arrays for new values */ + values = (Datum *) palloc(nitems * sizeof(Datum)); + nulls = (bool *) palloc(nitems * sizeof(bool)); + + /* Loop over source data */ + s = ARR_DATA_PTR(v); + bitmap = ARR_NULLBITMAP(v); + bitmask = 1; + hasnulls = false; + + for (i = 0; i < nitems; i++) + { + bool isNull; + bool oprresult; + + /* Get source element, checking for NULL */ + if (bitmap && (*bitmap & bitmask) == 0) + { + if (old_value_isnull && !new_value_isnull) + { + values[i] = new_value; + isNull = false; + changed = true; + } + else + isNull = true; + } + else + { + elt = fetch_att(s, typbyval, typlen); + s = att_addlength_datum(s, typlen, elt); + s = (char *) att_align_nominal(s, typalign); + isNull = false; + + if (!old_value_isnull) + { + /* + * Apply the operator to the element pair + */ + locfcinfo.arg[0] = elt; + locfcinfo.arg[1] = old_value; + locfcinfo.argnull[0] = false; + locfcinfo.argnull[1] = false; + locfcinfo.isnull = false; + oprresult = DatumGetBool(FunctionCallInvoke(&locfcinfo)); + if (!oprresult) + values[i] = elt; + else + { + changed = true; + if (!new_value_isnull) + values[i] = 
new_value; + else + isNull = true; + } + } + else + values[i] = elt; + } + + nulls[i] = isNull; + if (isNull) + hasnulls = true; + else + { + /* Update total result size */ + nbytes = att_addlength_datum(nbytes, typlen, values[i]); + nbytes = att_align_nominal(nbytes, typalign); + /* check for overflow of total request */ + if (!AllocSizeIsValid(nbytes)) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("array size exceeds the maximum allowed (%d)", + (int) MaxAllocSize))); + } + + /* advance bitmap pointer if any */ + if (bitmap) + { + bitmask <<= 1; + if (bitmask == 0x100) + { + bitmap++; + bitmask = 1; + } + } + } + + /* + * If not changed just return the original array + */ + if (!changed) + { + pfree(values); + pfree(nulls); + PG_RETURN_ARRAYTYPE_P(v); + } + + /* Allocate and initialize the result array */ + if (hasnulls) + { + dataoffset = ARR_OVERHEAD_WITHNULLS(ndim, nitems); + nbytes += dataoffset; + } + else + { + dataoffset = 0; /* marker for no null bitmap */ + nbytes += ARR_OVERHEAD_NONULLS(ndim); + } + result = (ArrayType *) palloc0(nbytes); + SET_VARSIZE(result, nbytes); + result->ndim = ndim; + result->dataoffset = dataoffset; + result->elemtype = element_type; + memcpy(ARR_DIMS(result), ARR_DIMS(v), 2 * ndim * sizeof(int)); + + CopyArrayEls(result, + values, nulls, nitems, + typlen, typbyval, typalign, + false); + + pfree(values); + pfree(nulls); + + PG_RETURN_ARRAYTYPE_P(result); + } *** a/src/backend/utils/adt/ri_triggers.c --- b/src/backend/utils/adt/ri_triggers.c *************** *** 67,82 **** #define RI_KEYS_NONE_NULL 2 /* queryno values must be distinct for the convenience of ri_PerformCheck */ ! #define RI_PLAN_CHECK_LOOKUPPK_NOCOLS 1 ! #define RI_PLAN_CHECK_LOOKUPPK 2 ! #define RI_PLAN_CASCADE_DEL_DODELETE 3 ! #define RI_PLAN_CASCADE_UPD_DOUPDATE 4 ! #define RI_PLAN_NOACTION_DEL_CHECKREF 5 ! #define RI_PLAN_NOACTION_UPD_CHECKREF 6 ! #define RI_PLAN_RESTRICT_DEL_CHECKREF 7 ! #define RI_PLAN_RESTRICT_UPD_CHECKREF 8 ! 
#define RI_PLAN_SETNULL_DEL_DOUPDATE 9 ! #define RI_PLAN_SETNULL_UPD_DOUPDATE 10 #define MAX_QUOTED_NAME_LEN (NAMEDATALEN*2+3) #define MAX_QUOTED_REL_NAME_LEN (MAX_QUOTED_NAME_LEN*2) --- 67,87 ---- #define RI_KEYS_NONE_NULL 2 /* queryno values must be distinct for the convenience of ri_PerformCheck */ ! #define RI_PLAN_CHECK_LOOKUPPK_NOCOLS 1 ! #define RI_PLAN_CHECK_LOOKUPPK 2 ! #define RI_PLAN_CASCADE_DEL_DODELETE 3 ! #define RI_PLAN_CASCADE_UPD_DOUPDATE 4 ! #define RI_PLAN_NOACTION_DEL_CHECKREF 5 ! #define RI_PLAN_NOACTION_UPD_CHECKREF 6 ! #define RI_PLAN_RESTRICT_DEL_CHECKREF 7 ! #define RI_PLAN_RESTRICT_UPD_CHECKREF 8 ! #define RI_PLAN_SETNULL_DEL_DOUPDATE 9 ! #define RI_PLAN_SETNULL_UPD_DOUPDATE 10 ! #define RI_PLAN_EACHCASCADE_DEL_CHECKREF 11 ! #define RI_PLAN_EACHCASCADE_DEL_DOUPDATE 12 ! #define RI_PLAN_EACHCASCADE_UPD_DOUPDATE 13 ! #define RI_PLAN_EACHSETNULL_DEL_DOUPDATE 14 ! #define RI_PLAN_EACHSETNULL_UPD_DOUPDATE 15 #define MAX_QUOTED_NAME_LEN (NAMEDATALEN*2+3) #define MAX_QUOTED_REL_NAME_LEN (MAX_QUOTED_NAME_LEN*2) *************** *** 106,117 **** typedef struct RI_ConstraintInfo --- 111,125 ---- NameData conname; /* name of the FK constraint */ Oid pk_relid; /* referenced relation */ Oid fk_relid; /* referencing relation */ + bool confiseach; /* is an EACH FK */ char confupdtype; /* foreign key's ON UPDATE action */ char confdeltype; /* foreign key's ON DELETE action */ char confmatchtype; /* foreign key's match type */ int nkeys; /* number of key columns */ int16 pk_attnums[RI_MAX_NUMKEYS]; /* attnums of referenced cols */ int16 fk_attnums[RI_MAX_NUMKEYS]; /* attnums of referencing cols */ + bool fk_each_atts[RI_MAX_NUMKEYS]; /* referencing cols is + * an EACH FK */ Oid pf_eq_oprs[RI_MAX_NUMKEYS]; /* equality operators (PK = * FK) */ Oid pp_eq_oprs[RI_MAX_NUMKEYS]; /* equality operators (PK = *************** *** 194,200 **** static void ri_GenerateQual(StringInfo buf, const char *sep, const char *leftop, Oid leftoptype, Oid opoid, ! 
const char *rightop, Oid rightoptype); static void ri_add_cast_to(StringInfo buf, Oid typid); static void ri_GenerateQualCollation(StringInfo buf, Oid collation); static int ri_NullCheck(Relation rel, HeapTuple tup, --- 202,209 ---- const char *sep, const char *leftop, Oid leftoptype, Oid opoid, ! const char *rightop, Oid rightoptype, ! bool is_array); static void ri_add_cast_to(StringInfo buf, Oid typid); static void ri_GenerateQualCollation(StringInfo buf, Oid collation); static int ri_NullCheck(Relation rel, HeapTuple tup, *************** *** 455,460 **** RI_FKey_check(PG_FUNCTION_ARGS) --- 464,470 ---- if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL) { StringInfoData querybuf; + StringInfoData countbuf; char pkrelname[MAX_QUOTED_REL_NAME_LEN]; char attname[MAX_QUOTED_NAME_LEN]; char paramname[16]; *************** *** 466,477 **** RI_FKey_check(PG_FUNCTION_ARGS) --- 476,499 ---- * SELECT 1 FROM ONLY WHERE pkatt1 = $1 [AND ...] FOR SHARE * The type id's for the $ parameters are those of the * corresponding FK attributes. + * + * In case of an EACH foreign key, the previous query is used to count + * the number of matching rows and see if every combination is + * actually referenced. + * The wrapping query is + * SELECT 1 WHERE 1 * + * (SELECT count(DISTINCT y) FROM UNNEST($1) y WHERE y IS NOT NULL) + * [ * ...] = (SELECT count(*) FROM () z) * ---------- */ initStringInfo(&querybuf); quoteRelationName(pkrelname, pk_rel); appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname); querysep = "WHERE"; + if (riinfo.confiseach) { + initStringInfo(&countbuf); + appendStringInfo(&countbuf, "SELECT 1 WHERE 1"); + } for (i = 0; i < riinfo.nkeys; i++) { Oid pk_type = RIAttType(pk_rel, riinfo.pk_attnums[i]); *************** *** 480,497 **** RI_FKey_check(PG_FUNCTION_ARGS) quoteOneName(attname, RIAttName(pk_rel, riinfo.pk_attnums[i])); sprintf(paramname, "$%d", i + 1); ri_GenerateQual(&querybuf, querysep, attname, pk_type, riinfo.pf_eq_oprs[i], ! 
paramname, fk_type); querysep = "AND"; queryoids[i] = fk_type; } appendStringInfo(&querybuf, " FOR SHARE OF x"); ! /* Prepare and save the plan */ ! qplan = ri_PlanCheck(querybuf.data, riinfo.nkeys, queryoids, ! &qkey, fk_rel, pk_rel, true); } /* --- 502,540 ---- quoteOneName(attname, RIAttName(pk_rel, riinfo.pk_attnums[i])); sprintf(paramname, "$%d", i + 1); + /* + * In case of an EACH foreign key, we check that every + * DISTINCT NOT NULL value in the array is present in the PK table. + */ + if (riinfo.fk_each_atts[i]) + { + appendStringInfo(&countbuf, + " * (SELECT count(DISTINCT y) " + "FROM UNNEST(%s) y WHERE y IS NOT NULL)" + , paramname); + } ri_GenerateQual(&querybuf, querysep, attname, pk_type, riinfo.pf_eq_oprs[i], ! paramname, fk_type, ! riinfo.fk_each_atts[i]); querysep = "AND"; queryoids[i] = fk_type; } appendStringInfo(&querybuf, " FOR SHARE OF x"); ! if (riinfo.confiseach) { ! appendStringInfo(&countbuf, " = " ! "(SELECT count(*) FROM (%s) z)", querybuf.data); ! ! /* Prepare and save the plan for each foreign keys */ ! qplan = ri_PlanCheck(countbuf.data, riinfo.nkeys, queryoids, ! &qkey, fk_rel, pk_rel, true); ! } ! else ! /* Prepare and save the plan for standard foreign keys */ ! qplan = ri_PlanCheck(querybuf.data, riinfo.nkeys, queryoids, ! &qkey, fk_rel, pk_rel, true); } /* *************** *** 644,650 **** ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel, ri_GenerateQual(&querybuf, querysep, attname, pk_type, riinfo->pp_eq_oprs[i], ! paramname, pk_type); querysep = "AND"; queryoids[i] = pk_type; } --- 687,694 ---- ri_GenerateQual(&querybuf, querysep, attname, pk_type, riinfo->pp_eq_oprs[i], ! paramname, pk_type, ! false); querysep = "AND"; queryoids[i] = pk_type; } *************** *** 801,807 **** RI_FKey_noaction_del(PG_FUNCTION_ARGS) ri_GenerateQual(&querybuf, querysep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! 
attname, fk_type); querysep = "AND"; queryoids[i] = pk_type; } --- 845,852 ---- ri_GenerateQual(&querybuf, querysep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type, ! riinfo.fk_each_atts[i]); querysep = "AND"; queryoids[i] = pk_type; } *************** *** 989,995 **** RI_FKey_noaction_upd(PG_FUNCTION_ARGS) ri_GenerateQual(&querybuf, querysep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type); querysep = "AND"; queryoids[i] = pk_type; } --- 1034,1041 ---- ri_GenerateQual(&querybuf, querysep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type, ! riinfo.fk_each_atts[i]); querysep = "AND"; queryoids[i] = pk_type; } *************** *** 1151,1157 **** RI_FKey_cascade_del(PG_FUNCTION_ARGS) ri_GenerateQual(&querybuf, querysep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type); querysep = "AND"; queryoids[i] = pk_type; } --- 1197,1204 ---- ri_GenerateQual(&querybuf, querysep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type, ! riinfo.fk_each_atts[i]); querysep = "AND"; queryoids[i] = pk_type; } *************** *** 1337,1343 **** RI_FKey_cascade_upd(PG_FUNCTION_ARGS) ri_GenerateQual(&qualbuf, qualsep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type); querysep = ","; qualsep = "AND"; queryoids[i] = pk_type; --- 1384,1391 ---- ri_GenerateQual(&qualbuf, qualsep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type, ! false); querysep = ","; qualsep = "AND"; queryoids[i] = pk_type; *************** *** 1510,1516 **** RI_FKey_restrict_del(PG_FUNCTION_ARGS) ri_GenerateQual(&querybuf, querysep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type); querysep = "AND"; queryoids[i] = pk_type; } --- 1558,1565 ---- ri_GenerateQual(&querybuf, querysep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type, ! 
riinfo.fk_each_atts[i]); querysep = "AND"; queryoids[i] = pk_type; } *************** *** 1693,1699 **** RI_FKey_restrict_upd(PG_FUNCTION_ARGS) ri_GenerateQual(&querybuf, querysep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type); querysep = "AND"; queryoids[i] = pk_type; } --- 1742,1749 ---- ri_GenerateQual(&querybuf, querysep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type, ! riinfo.fk_each_atts[i]); querysep = "AND"; queryoids[i] = pk_type; } *************** *** 1863,1869 **** RI_FKey_setnull_del(PG_FUNCTION_ARGS) ri_GenerateQual(&qualbuf, qualsep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type); querysep = ","; qualsep = "AND"; queryoids[i] = pk_type; --- 1913,1920 ---- ri_GenerateQual(&qualbuf, qualsep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type, ! riinfo.fk_each_atts[i]); querysep = ","; qualsep = "AND"; queryoids[i] = pk_type; *************** *** 2076,2082 **** RI_FKey_setnull_upd(PG_FUNCTION_ARGS) ri_GenerateQual(&qualbuf, qualsep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type); qualsep = "AND"; queryoids[i] = pk_type; } --- 2127,2134 ---- ri_GenerateQual(&qualbuf, qualsep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type, ! riinfo.fk_each_atts[i]); qualsep = "AND"; queryoids[i] = pk_type; } *************** *** 2251,2257 **** RI_FKey_setdefault_del(PG_FUNCTION_ARGS) ri_GenerateQual(&qualbuf, qualsep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type); querysep = ","; qualsep = "AND"; queryoids[i] = pk_type; --- 2303,2310 ---- ri_GenerateQual(&qualbuf, qualsep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type, ! riinfo.fk_each_atts[i]); querysep = ","; qualsep = "AND"; queryoids[i] = pk_type; *************** *** 2455,2461 **** RI_FKey_setdefault_upd(PG_FUNCTION_ARGS) ri_GenerateQual(&qualbuf, qualsep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! 
attname, fk_type); qualsep = "AND"; queryoids[i] = pk_type; } --- 2508,2515 ---- ri_GenerateQual(&qualbuf, qualsep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type, ! riinfo.fk_each_atts[i]); qualsep = "AND"; queryoids[i] = pk_type; } *************** *** 2512,2596 **** RI_FKey_setdefault_upd(PG_FUNCTION_ARGS) /* ---------- ! * RI_FKey_keyequal_upd_pk - * ! * Check if we have a key change on an update to a PK relation. This is ! * used by the AFTER trigger queue manager to see if it can skip queuing ! * an instance of an RI trigger. * ---------- */ ! bool ! RI_FKey_keyequal_upd_pk(Trigger *trigger, Relation pk_rel, ! HeapTuple old_row, HeapTuple new_row) { RI_ConstraintInfo riinfo; /* * Get arguments. */ ! ri_FetchConstraintInfo(&riinfo, trigger, pk_rel, true); /* * Nothing to do if no column names to compare given */ if (riinfo.nkeys == 0) ! return true; switch (riinfo.confmatchtype) { case FKCONSTR_MATCH_UNSPECIFIED: case FKCONSTR_MATCH_FULL: ! /* Return true if keys are equal */ ! return ri_KeysEqual(pk_rel, old_row, new_row, &riinfo, true); ! /* Handle MATCH PARTIAL set null delete. */ case FKCONSTR_MATCH_PARTIAL: ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("MATCH PARTIAL not yet implemented"))); ! break; } ! /* Never reached */ elog(ERROR, "invalid confmatchtype"); ! return false; } /* ---------- ! * RI_FKey_keyequal_upd_fk - * ! * Check if we have a key change on an update to an FK relation. This is ! * used by the AFTER trigger queue manager to see if it can skip queuing ! * an instance of an RI trigger. * ---------- */ ! bool ! RI_FKey_keyequal_upd_fk(Trigger *trigger, Relation fk_rel, ! HeapTuple old_row, HeapTuple new_row) { RI_ConstraintInfo riinfo; /* * Get arguments. */ ! ri_FetchConstraintInfo(&riinfo, trigger, fk_rel, false); /* * Nothing to do if no column names to compare given */ if (riinfo.nkeys == 0) ! return true; ! switch (riinfo.confmatchtype) ! { ! case FKCONSTR_MATCH_UNSPECIFIED: ! case FKCONSTR_MATCH_FULL: ! 
/* ----------
 * RI_FKey_eachcascade_del -
 *
 *	Cascaded delete of array elements in "each" foreign key
 *	references at delete event on PK table.
 *
 *	Trigger function fired for a DELETE on the referenced (PK) table.
 *	It runs two SPI queries against the referencing (FK) table:
 *	  1. a SELECT looking for referencing rows whose array has more than
 *		 one dimension (array_ndims(fkatt) > 1);
 *	  2. an UPDATE that rewrites the referencing array column through
 *		 array_remove(), passing the deleted key as the value to remove.
 *
 *	Always returns a NULL pointer datum.  Raises
 *	ERRCODE_FEATURE_NOT_SUPPORTED for multi-column keys and for
 *	MATCH PARTIAL constraints.
 * ----------
 */
Datum
RI_FKey_eachcascade_del(PG_FUNCTION_ARGS)
{
	TriggerData *trigdata = (TriggerData *) fcinfo->context;
	RI_ConstraintInfo riinfo;
	Relation	fk_rel;
	Relation	pk_rel;
	HeapTuple	old_row;
	RI_QueryKey qkey;
	SPIPlanPtr	qplan;
	int			i;

	/*
	 * Check that this is a valid trigger call on the right time and event.
	 */
	ri_CheckTrigger(fcinfo, "RI_FKey_eachcascade_del", RI_TRIGTYPE_DELETE);

	/*
	 * Get arguments.
	 */
	ri_FetchConstraintInfo(&riinfo,
						   trigdata->tg_trigger, trigdata->tg_relation, true);

	/*
	 * Nothing to do if no column names to compare given
	 */
	if (riinfo.nkeys == 0)
		return PointerGetDatum(NULL);

	/*
	 * EACH CASCADE actions are only available
	 * on single-column EACH foreign keys
	 */
	if (riinfo.nkeys > 1)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("EACH CASCADE and EACH SET NULL actions are only "
						"available on single-column EACH foreign keys")));

	/*
	 * Get the relation descriptors of the FK and PK tables and the old tuple.
	 *
	 * fk_rel is opened in RowExclusiveLock mode since that's what our
	 * eventual UPDATE will get on it.
	 */
	fk_rel = heap_open(riinfo.fk_relid, RowExclusiveLock);
	pk_rel = trigdata->tg_relation;
	old_row = trigdata->tg_trigtuple;

	switch (riinfo.confmatchtype)
	{
			/* ----------
			 * SQL3 11.9 <referential constraint definition>
			 *	General rules 6) a) i):
			 *		MATCH <unspecified> or MATCH FULL
			 *			... ON DELETE CASCADE
			 * ----------
			 */
		case FKCONSTR_MATCH_UNSPECIFIED:
		case FKCONSTR_MATCH_FULL:
			/* Key for the cached plan of the first (SELECT) query */
			ri_BuildQueryKeyFull(&qkey, &riinfo,
								 RI_PLAN_EACHCASCADE_DEL_CHECKREF);
			switch (ri_NullCheck(pk_rel, old_row, &qkey, RI_KEYPAIR_PK_IDX))
			{
				case RI_KEYS_ALL_NULL:
				case RI_KEYS_SOME_NULL:

					/*
					 * No update - MATCH FULL means there cannot be any
					 * reference to old key if it contains NULL
					 */
					heap_close(fk_rel, RowExclusiveLock);
					return PointerGetDatum(NULL);

				case RI_KEYS_NONE_NULL:

					/*
					 * Have a full qualified key - continue below
					 */
					break;
			}

			if (SPI_connect() != SPI_OK_CONNECT)
				elog(ERROR, "SPI_connect failed");

			/*
			 * Fetch or prepare a saved plan for lookup if
			 * foreign references exist and have ndims > 1
			 */
			if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
			{
				StringInfoData querybuf;
				char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
				char		attname[MAX_QUOTED_NAME_LEN];
				char		paramname[16];
				const char *querysep;
				Oid			queryoids[RI_MAX_NUMKEYS];

				/* ----------
				 * The query string built is
				 *	SELECT 1 FROM ONLY <fktable> x
				 *		WHERE <qual on $1 and fkatt1>
				 *		  AND array_ndims(fkatt1) > 1 [AND ...]
				 *		FOR SHARE OF x
				 * The type id's for the $ parameters are those of the
				 * corresponding PK attributes.
				 *
				 * The qual text itself comes from ri_GenerateQual(), called
				 * with the column's fk_each_atts flag.
				 * ----------
				 *
				 * NOTE: The following loop is currently limited to one
				 * iteration, as only single-column EACH foreign keys are
				 * currently supported. It has been kept for consistency
				 * with other RI triggers and to prepare future
				 * implementations on multiple column EACH foreign keys.
				 */
				initStringInfo(&querybuf);
				quoteRelationName(fkrelname, fk_rel);
				appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x",
								 fkrelname);
				querysep = "WHERE";
				for (i = 0; i < riinfo.nkeys; i++)
				{
					Oid			pk_type = RIAttType(pk_rel, riinfo.pk_attnums[i]);
					Oid			fk_type = RIAttType(fk_rel, riinfo.fk_attnums[i]);

					quoteOneName(attname,
								 RIAttName(fk_rel, riinfo.fk_attnums[i]));
					sprintf(paramname, "$%d", i + 1);
					ri_GenerateQual(&querybuf, querysep,
									paramname, pk_type,
									riinfo.pf_eq_oprs[i],
									attname, fk_type,
									riinfo.fk_each_atts[i]);
					/* restrict the lookup to multidimensional arrays */
					appendStringInfo(&querybuf, " AND array_ndims(%s) > 1", attname);
					querysep = "AND";
					queryoids[i] = pk_type;
				}
				appendStringInfo(&querybuf, " FOR SHARE OF x");

				/* Prepare and save the plan */
				qplan = ri_PlanCheck(querybuf.data, riinfo.nkeys, queryoids,
									 &qkey, fk_rel, pk_rel, true);
			}

			/*
			 * We have a plan now. Run it to check for existing references.
			 *
			 * NOTE(review): presumably ri_PerformCheck reports a constraint
			 * violation when this SELECT finds rows, since array_remove
			 * cannot operate on multidimensional arrays -- confirm against
			 * ri_PerformCheck.
			 */
			ri_PerformCheck(&qkey, qplan,
							fk_rel, pk_rel,
							old_row, NULL,
							true,		/* must detect new rows */
							SPI_OK_SELECT,
							NameStr(riinfo.conname));

			/* Rebuild the key: the UPDATE plan is cached separately */
			ri_BuildQueryKeyFull(&qkey, &riinfo,
								 RI_PLAN_EACHCASCADE_DEL_DOUPDATE);

			/*
			 * Fetch or prepare a saved plan for the cascade delete operation
			 */
			if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
			{
				StringInfoData querybuf;
				StringInfoData qualbuf;
				char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
				char		attname[MAX_QUOTED_NAME_LEN];
				char		paramname[16];
				const char *querysep;
				const char *qualsep;
				Oid			queryoids[RI_MAX_NUMKEYS];

				/* ----------
				 * The query string built is
				 *	UPDATE ONLY <fktable>
				 *		SET fkatt1 = array_remove(fkatt1, $1)[, ...]
				 *		WHERE $1 = ANY(fkatt1) [AND ...]
				 * The type id's for the $ parameters are those of the
				 * corresponding PK attributes.
				 *
				 * $1 is cast to the array's base element type when that
				 * differs from the PK column type.
				 * ----------
				 *
				 * NOTE: The following loop is currently limited to one
				 * iteration, as only single-column EACH foreign keys are
				 * currently supported. It has been kept for consistency
				 * with other RI triggers and to prepare future
				 * implementations on multiple column EACH foreign keys.
				 */
				initStringInfo(&querybuf);
				initStringInfo(&qualbuf);
				quoteRelationName(fkrelname, fk_rel);
				appendStringInfo(&querybuf, "UPDATE ONLY %s SET", fkrelname);
				querysep = "";
				qualsep = "WHERE";
				for (i = 0; i < riinfo.nkeys; i++)
				{
					Oid			pk_type = RIAttType(pk_rel, riinfo.pk_attnums[i]);
					Oid			fk_type = RIAttType(fk_rel, riinfo.fk_attnums[i]);
					Oid			fk_element_type = get_base_element_type(fk_type);

					quoteOneName(attname,
								 RIAttName(fk_rel, riinfo.fk_attnums[i]));
					/* SET clause: fkatt = array_remove(fkatt, $n[::elemtype]) */
					appendStringInfo(&querybuf,
									 "%s %s = array_remove(%s, $%d",
									 querysep, attname, attname, i + 1);
					if (pk_type != fk_element_type)
						ri_add_cast_to(&querybuf, fk_element_type);
					appendStringInfo(&querybuf,
									 ")");
					/* WHERE clause is accumulated separately in qualbuf */
					sprintf(paramname, "$%d", i + 1);
					ri_GenerateQual(&qualbuf, qualsep,
									paramname, pk_type,
									riinfo.pf_eq_oprs[i],
									attname, fk_type,
									riinfo.fk_each_atts[i]);
					querysep = ",";
					qualsep = "AND";
					queryoids[i] = pk_type;
				}
				appendStringInfoString(&querybuf, qualbuf.data);

				/* Prepare and save the plan */
				qplan = ri_PlanCheck(querybuf.data, riinfo.nkeys, queryoids,
									 &qkey, fk_rel, pk_rel, true);
			}

			/*
			 * We have a plan now. Run it to update the existing references.
			 */
			ri_PerformCheck(&qkey, qplan,
							fk_rel, pk_rel,
							old_row, NULL,
							true,		/* must detect new rows */
							SPI_OK_UPDATE,
							NameStr(riinfo.conname));

			if (SPI_finish() != SPI_OK_FINISH)
				elog(ERROR, "SPI_finish failed");

			heap_close(fk_rel, RowExclusiveLock);

			return PointerGetDatum(NULL);

			/*
			 * Handle MATCH PARTIAL cascade delete.
			 */
		case FKCONSTR_MATCH_PARTIAL:
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("MATCH PARTIAL not yet implemented")));
			return PointerGetDatum(NULL);
	}

	/*
	 * Never reached
	 */
	elog(ERROR, "invalid confmatchtype");
	return PointerGetDatum(NULL);
}
trigdata->tg_trigger, trigdata->tg_relation, true); /* * Nothing to do if no column names to compare given */ if (riinfo.nkeys == 0) ! return PointerGetDatum(NULL); ! /* ! * EACH CASCADE actions are only available ! * on single-column EACH foreign keys ! */ ! if (riinfo.nkeys > 1) ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("EACH CASCADE and EACH SET NULL actions are only " ! "available on single-column EACH foreign keys"))); ! /* ! * Get the relation descriptors of the FK and PK tables and the new and ! * old tuple. ! * ! * fk_rel is opened in RowExclusiveLock mode since that's what our ! * eventual UPDATE will get on it. ! */ ! fk_rel = heap_open(riinfo.fk_relid, RowExclusiveLock); ! pk_rel = trigdata->tg_relation; ! new_row = trigdata->tg_newtuple; ! old_row = trigdata->tg_trigtuple; ! ! switch (riinfo.confmatchtype) ! { ! /* ---------- ! * SQL3 11.9 ! * Gereral rules 7) a) i): ! * MATCH or MATCH FULL ! * ... ON UPDATE CASCADE ! * ---------- ! */ ! case FKCONSTR_MATCH_UNSPECIFIED: ! case FKCONSTR_MATCH_FULL: ! ri_BuildQueryKeyFull(&qkey, &riinfo, ! RI_PLAN_EACHCASCADE_UPD_DOUPDATE); ! ! switch (ri_NullCheck(pk_rel, old_row, &qkey, RI_KEYPAIR_PK_IDX)) ! { ! case RI_KEYS_ALL_NULL: ! case RI_KEYS_SOME_NULL: ! ! /* ! * No update - MATCH FULL means there cannot be any ! * reference to old key if it contains NULL ! */ ! heap_close(fk_rel, RowExclusiveLock); ! return PointerGetDatum(NULL); ! ! case RI_KEYS_NONE_NULL: ! ! /* ! * Have a full qualified key - continue below ! */ ! break; ! } ! ! /* ! * No need to do anything if old and new keys are equal ! */ ! if (ri_KeysEqual(pk_rel, old_row, new_row, &riinfo, true)) ! { ! heap_close(fk_rel, RowExclusiveLock); ! return PointerGetDatum(NULL); ! } ! ! if (SPI_connect() != SPI_OK_CONNECT) ! elog(ERROR, "SPI_connect failed"); ! ! /* ! * Fetch or prepare a saved plan for the cascaded update of ! * foreign references ! */ ! if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL) ! { ! 
StringInfoData querybuf; ! StringInfoData qualbuf; ! char fkrelname[MAX_QUOTED_REL_NAME_LEN]; ! char attname[MAX_QUOTED_NAME_LEN]; ! char paramname[16]; ! const char *querysep; ! const char *qualsep; ! Oid queryoids[RI_MAX_NUMKEYS * 2]; ! ! /* ---------- ! * The query string built is ! * UPDATE ONLY fktable ! * SET fkatt1 = array_replace(fkatt1, $n, $1) [, ...] ! * WHERE $n = ANY(fkatt1) [AND ...] ! * The type id's for the $ parameters are those of the ! * corresponding PK attributes. ! * ---------- ! * ! * NOTE: The following loop is currently limited to one ! * iteration, as only single-column EACH foreign keys are ! * currently supported. It has been kept for consistency ! * with other RI triggers and to prepare future ! * implementations on multiple column EACH foreign keys. ! */ ! initStringInfo(&querybuf); ! initStringInfo(&qualbuf); ! quoteRelationName(fkrelname, fk_rel); ! appendStringInfo(&querybuf, "UPDATE ONLY %s SET", fkrelname); ! querysep = ""; ! qualsep = "WHERE"; ! for (i = 0, j = riinfo.nkeys; i < riinfo.nkeys; i++, j++) ! { ! Oid pk_type = RIAttType(pk_rel, riinfo.pk_attnums[i]); ! Oid fk_type = RIAttType(fk_rel, riinfo.fk_attnums[i]); ! Oid fk_element_type = get_base_element_type(fk_type); ! ! quoteOneName(attname, ! RIAttName(fk_rel, riinfo.fk_attnums[i])); ! appendStringInfo(&querybuf, ! "%s %s = array_replace(%s, $%d", ! querysep, attname, attname, j + 1); ! if (pk_type != fk_element_type) ! ri_add_cast_to(&querybuf, fk_element_type); ! appendStringInfo(&querybuf, ", $%d", i + 1); ! if (pk_type != fk_element_type) ! ri_add_cast_to(&querybuf, fk_element_type); ! appendStringInfo(&querybuf,")"); ! sprintf(paramname, "$%d", j + 1); ! ri_GenerateQual(&qualbuf, qualsep, ! paramname, pk_type, ! riinfo.pf_eq_oprs[i], ! attname, fk_type, ! riinfo.fk_each_atts[i]); ! querysep = ","; ! qualsep = "AND"; ! queryoids[i] = pk_type; ! queryoids[j] = pk_type; ! } ! appendStringInfoString(&querybuf, qualbuf.data); ! ! /* Prepare and save the plan */ ! 
qplan = ri_PlanCheck(querybuf.data, riinfo.nkeys * 2, queryoids, ! &qkey, fk_rel, pk_rel, true); ! } ! ! /* ! * We have a plan now. Run it to update the existing references. ! */ ! ri_PerformCheck(&qkey, qplan, ! fk_rel, pk_rel, ! old_row, new_row, ! true, /* must detect new rows */ ! SPI_OK_UPDATE, ! NameStr(riinfo.conname)); ! ! if (SPI_finish() != SPI_OK_FINISH) ! elog(ERROR, "SPI_finish failed"); ! ! heap_close(fk_rel, RowExclusiveLock); ! ! return PointerGetDatum(NULL); ! ! /* ! * Handle MATCH PARTIAL cascade update. ! */ ! case FKCONSTR_MATCH_PARTIAL: ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("MATCH PARTIAL not yet implemented"))); ! return PointerGetDatum(NULL); ! } ! ! /* ! * Never reached ! */ ! elog(ERROR, "invalid confmatchtype"); ! return PointerGetDatum(NULL); ! } ! ! ! /* ---------- ! * RI_FKey_eachsetnull_del - ! * ! * Set EACH foreign key element references to NULL values ! * at delete event on PK table. ! * ---------- ! */ ! Datum ! RI_FKey_eachsetnull_del(PG_FUNCTION_ARGS) ! { ! TriggerData *trigdata = (TriggerData *) fcinfo->context; ! RI_ConstraintInfo riinfo; ! Relation fk_rel; ! Relation pk_rel; ! HeapTuple old_row; ! RI_QueryKey qkey; ! SPIPlanPtr qplan; ! int i; ! ! /* ! * Check that this is a valid trigger call on the right time and event. ! */ ! ri_CheckTrigger(fcinfo, "RI_FKey_eachsetnull_del", RI_TRIGTYPE_DELETE); ! ! /* ! * Get arguments. ! */ ! ri_FetchConstraintInfo(&riinfo, ! trigdata->tg_trigger, trigdata->tg_relation, true); ! ! /* ! * Nothing to do if no column names to compare given ! */ ! if (riinfo.nkeys == 0) ! return PointerGetDatum(NULL); ! ! /* ! * EACH SET NULL actions are only available ! * on single-column EACH foreign keys ! */ ! if (riinfo.nkeys > 1) ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("EACH CASCADE and EACH SET NULL actions are only " ! "available on single-column EACH foreign keys"))); ! ! /* ! 
* Get the relation descriptors of the FK and PK tables and the old tuple. ! * ! * fk_rel is opened in RowExclusiveLock mode since that's what our ! * eventual UPDATE will get on it. ! */ ! fk_rel = heap_open(riinfo.fk_relid, RowExclusiveLock); ! pk_rel = trigdata->tg_relation; ! old_row = trigdata->tg_trigtuple; ! ! switch (riinfo.confmatchtype) ! { ! /* ---------- ! * SQL3 11.9 ! * Gereral rules 6) a) ii): ! * MATCH or MATCH FULL ! * ... ON DELETE SET NULL ! * ---------- ! */ ! case FKCONSTR_MATCH_UNSPECIFIED: ! case FKCONSTR_MATCH_FULL: ! ri_BuildQueryKeyFull(&qkey, &riinfo, ! RI_PLAN_EACHSETNULL_DEL_DOUPDATE); ! ! switch (ri_NullCheck(pk_rel, old_row, &qkey, RI_KEYPAIR_PK_IDX)) ! { ! case RI_KEYS_ALL_NULL: ! case RI_KEYS_SOME_NULL: ! ! /* ! * No update - MATCH FULL means there cannot be any ! * reference to old key if it contains NULL ! */ ! heap_close(fk_rel, RowExclusiveLock); ! return PointerGetDatum(NULL); ! ! case RI_KEYS_NONE_NULL: ! ! /* ! * Have a full qualified key - continue below ! */ ! break; ! } ! ! if (SPI_connect() != SPI_OK_CONNECT) ! elog(ERROR, "SPI_connect failed"); ! ! /* ! * Fetch or prepare a saved plan for the set null delete operation ! */ ! if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL) ! { ! StringInfoData querybuf; ! StringInfoData qualbuf; ! char fkrelname[MAX_QUOTED_REL_NAME_LEN]; ! char attname[MAX_QUOTED_NAME_LEN]; ! char paramname[16]; ! const char *querysep; ! const char *qualsep; ! Oid queryoids[RI_MAX_NUMKEYS]; ! ! /* ---------- ! * The query string built is ! * UPDATE ONLY ! * SET fkatt1 = array_replace(fkatt1, $1, NULL) [, ...] ! * WHERE $1 = ANY(fkatt1) [AND ...] ! * The type id's for the $ parameters are those of the ! * corresponding PK attributes. ! * ---------- ! * ! * NOTE: The following loop is currently limited to one ! * iteration, as only single-column EACH foreign keys are ! * currently supported. It has been kept for consistency ! * with other RI triggers and to prepare future ! 
* implementations on multiple column EACH foreign keys. ! */ ! initStringInfo(&querybuf); ! initStringInfo(&qualbuf); ! quoteRelationName(fkrelname, fk_rel); ! appendStringInfo(&querybuf, "UPDATE ONLY %s SET", fkrelname); ! querysep = ""; ! qualsep = "WHERE"; ! for (i = 0; i < riinfo.nkeys; i++) ! { ! Oid pk_type = RIAttType(pk_rel, riinfo.pk_attnums[i]); ! Oid fk_type = RIAttType(fk_rel, riinfo.fk_attnums[i]); ! Oid fk_element_type = get_base_element_type(fk_type); ! ! quoteOneName(attname, ! RIAttName(fk_rel, riinfo.fk_attnums[i])); ! appendStringInfo(&querybuf, ! "%s %s = array_replace(%s, $%d", ! querysep, attname, attname, i + 1); ! if (pk_type != fk_element_type) ! ri_add_cast_to(&querybuf, fk_element_type); ! appendStringInfo(&querybuf, ", NULL"); ! if (pk_type != fk_element_type) ! ri_add_cast_to(&querybuf, fk_element_type); ! appendStringInfo(&querybuf,")"); ! sprintf(paramname, "$%d", i + 1); ! ri_GenerateQual(&qualbuf, qualsep, ! paramname, pk_type, ! riinfo.pf_eq_oprs[i], ! attname, fk_type, ! riinfo.fk_each_atts[i]); ! querysep = ","; ! qualsep = "AND"; ! queryoids[i] = pk_type; ! } ! appendStringInfoString(&querybuf, qualbuf.data); ! ! /* Prepare and save the plan */ ! qplan = ri_PlanCheck(querybuf.data, riinfo.nkeys, queryoids, ! &qkey, fk_rel, pk_rel, true); ! } ! ! /* ! * We have a plan now. Run it to update the existing references. ! */ ! ri_PerformCheck(&qkey, qplan, ! fk_rel, pk_rel, ! old_row, NULL, ! true, /* must detect new rows */ ! SPI_OK_UPDATE, ! NameStr(riinfo.conname)); ! ! if (SPI_finish() != SPI_OK_FINISH) ! elog(ERROR, "SPI_finish failed"); ! ! heap_close(fk_rel, RowExclusiveLock); ! ! return PointerGetDatum(NULL); ! ! /* ! * Handle MATCH PARTIAL set null delete. ! */ ! case FKCONSTR_MATCH_PARTIAL: ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("MATCH PARTIAL not yet implemented"))); ! return PointerGetDatum(NULL); ! } ! ! /* ! * Never reached ! */ ! elog(ERROR, "invalid confmatchtype"); ! 
return PointerGetDatum(NULL); ! } ! ! ! /* ---------- ! * RI_FKey_eachsetnull_upd - ! * ! * Set EACH foreign key element references to NULL ! * at update event on PK table. ! * ---------- ! */ ! Datum ! RI_FKey_eachsetnull_upd(PG_FUNCTION_ARGS) ! { ! TriggerData *trigdata = (TriggerData *) fcinfo->context; ! RI_ConstraintInfo riinfo; ! Relation fk_rel; ! Relation pk_rel; ! HeapTuple new_row; ! HeapTuple old_row; ! RI_QueryKey qkey; ! SPIPlanPtr qplan; ! int i; ! bool use_cached_query; ! ! /* ! * Check that this is a valid trigger call on the right time and event. ! */ ! ri_CheckTrigger(fcinfo, "RI_FKey_eachsetnull_upd", RI_TRIGTYPE_UPDATE); ! ! /* ! * Get arguments. ! */ ! ri_FetchConstraintInfo(&riinfo, ! trigdata->tg_trigger, trigdata->tg_relation, true); ! ! /* ! * Nothing to do if no column names to compare given ! */ ! if (riinfo.nkeys == 0) ! return PointerGetDatum(NULL); ! ! /* ! * EACH SET NULL actions are only available ! * on single-column EACH foreign keys ! */ ! if (riinfo.nkeys > 1) ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("EACH CASCADE and EACH SET NULL actions are only " ! "available on single-column EACH foreign keys"))); ! ! /* ! * Get the relation descriptors of the FK and PK tables and the new and ! * old tuple. ! * ! * fk_rel is opened in RowExclusiveLock mode since that's what our ! * eventual UPDATE will get on it. ! */ ! fk_rel = heap_open(riinfo.fk_relid, RowExclusiveLock); ! pk_rel = trigdata->tg_relation; ! new_row = trigdata->tg_newtuple; ! old_row = trigdata->tg_trigtuple; ! ! switch (riinfo.confmatchtype) ! { ! /* ---------- ! * SQL3 11.9 ! * General rules 7) a) ii) 2): ! * MATCH FULL ! * ... ON UPDATE SET NULL ! * ---------- ! */ ! case FKCONSTR_MATCH_UNSPECIFIED: ! case FKCONSTR_MATCH_FULL: ! ri_BuildQueryKeyFull(&qkey, &riinfo, ! RI_PLAN_EACHSETNULL_UPD_DOUPDATE); ! ! switch (ri_NullCheck(pk_rel, old_row, &qkey, RI_KEYPAIR_PK_IDX)) ! { ! case RI_KEYS_ALL_NULL: ! case RI_KEYS_SOME_NULL: ! ! /* ! 
* No update - MATCH FULL means there cannot be any ! * reference to old key if it contains NULL ! */ ! heap_close(fk_rel, RowExclusiveLock); ! return PointerGetDatum(NULL); ! ! case RI_KEYS_NONE_NULL: ! ! /* ! * Have a full qualified key - continue below ! */ ! break; ! } ! ! /* ! * No need to do anything if old and new keys are equal ! */ ! if (ri_KeysEqual(pk_rel, old_row, new_row, &riinfo, true)) ! { ! heap_close(fk_rel, RowExclusiveLock); ! return PointerGetDatum(NULL); ! } ! ! if (SPI_connect() != SPI_OK_CONNECT) ! elog(ERROR, "SPI_connect failed"); ! ! /* ! * "MATCH unspecified" only changes columns corresponding to the ! * referenced columns that have changed in pk_rel. This means the ! * "SET attrn=NULL [, attrn=NULL]" string will change as well. ! * In this case, we need to build a temporary plan rather than use ! * our cached plan, unless the update happens to change all ! * columns in the key. Fortunately, for the most common case of a ! * single-column foreign key, this will be true. ! * ! * In case you're wondering, the inequality check works because we ! * know that the old key value has no NULLs (see above). ! */ ! ! use_cached_query = (riinfo.confmatchtype == FKCONSTR_MATCH_FULL) || ! ri_AllKeysUnequal(pk_rel, old_row, new_row, ! &riinfo, true); ! ! /* ! * Fetch or prepare a saved plan for the set null update operation ! * if possible, or build a temporary plan if not. ! */ ! if (!use_cached_query || ! (qplan = ri_FetchPreparedPlan(&qkey)) == NULL) ! { ! StringInfoData querybuf; ! StringInfoData qualbuf; ! char fkrelname[MAX_QUOTED_REL_NAME_LEN]; ! char attname[MAX_QUOTED_NAME_LEN]; ! char paramname[16]; ! const char *querysep; ! const char *qualsep; ! Oid queryoids[RI_MAX_NUMKEYS]; ! ! /* ---------- ! * The query string built is ! * UPDATE ONLY fktable ! * SET fkatt1 = array_replace(fkatt1, $1, NULL) [, ...] ! * WHERE $1 = ANY(fkatt1) [AND ...] ! * The type id's for the $ parameters are those of the ! * corresponding PK attributes. ! * ---------- ! * ! 
* NOTE: The following loop is currently limited to one ! * iteration, as only single-column EACH foreign keys are ! * currently supported. It has been kept for consistency ! * with other RI triggers and to prepare future ! * implementations on multiple column EACH foreign keys. ! */ ! initStringInfo(&querybuf); ! initStringInfo(&qualbuf); ! quoteRelationName(fkrelname, fk_rel); ! appendStringInfo(&querybuf, "UPDATE ONLY %s SET", fkrelname); ! querysep = ""; ! qualsep = "WHERE"; ! for (i = 0; i < riinfo.nkeys; i++) ! { ! Oid pk_type = RIAttType(pk_rel, riinfo.pk_attnums[i]); ! Oid fk_type = RIAttType(fk_rel, riinfo.fk_attnums[i]); ! ! quoteOneName(attname, ! RIAttName(fk_rel, riinfo.fk_attnums[i])); ! ! /* ! * MATCH - only change columns corresponding ! * to changed columns in pk_rel's key ! */ ! if (riinfo.confmatchtype == FKCONSTR_MATCH_FULL || ! !ri_OneKeyEqual(pk_rel, i, old_row, new_row, ! &riinfo, true)) ! { ! Oid fk_element_type = get_base_element_type(fk_type); ! appendStringInfo(&querybuf, ! "%s %s = array_replace(%s, $%d", ! querysep, attname, attname, i + 1); ! if (pk_type != fk_element_type) ! ri_add_cast_to(&querybuf, fk_element_type); ! appendStringInfo(&querybuf, ", NULL"); ! if (pk_type != fk_element_type) ! ri_add_cast_to(&querybuf, fk_element_type); ! appendStringInfo(&querybuf,")"); ! querysep = ","; ! } ! sprintf(paramname, "$%d", i + 1); ! ri_GenerateQual(&qualbuf, qualsep, ! paramname, pk_type, ! riinfo.pf_eq_oprs[i], ! attname, fk_type, ! riinfo.fk_each_atts[i]); ! qualsep = "AND"; ! queryoids[i] = pk_type; ! } ! appendStringInfoString(&querybuf, qualbuf.data); ! ! /* ! * Prepare the plan. Save it only if we're building the ! * "standard" plan. ! */ ! qplan = ri_PlanCheck(querybuf.data, riinfo.nkeys, queryoids, ! &qkey, fk_rel, pk_rel, ! use_cached_query); ! } ! ! /* ! * We have a plan now. Run it to update the existing references. ! */ ! ri_PerformCheck(&qkey, qplan, ! fk_rel, pk_rel, ! old_row, NULL, ! true, /* must detect new rows */ ! 
SPI_OK_UPDATE, ! NameStr(riinfo.conname)); ! ! if (SPI_finish() != SPI_OK_FINISH) ! elog(ERROR, "SPI_finish failed"); ! ! heap_close(fk_rel, RowExclusiveLock); ! ! return PointerGetDatum(NULL); ! ! /* ! * Handle MATCH PARTIAL set null update. ! */ ! case FKCONSTR_MATCH_PARTIAL: ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("MATCH PARTIAL not yet implemented"))); ! return PointerGetDatum(NULL); ! } ! ! /* ! * Never reached ! */ ! elog(ERROR, "invalid confmatchtype"); ! return PointerGetDatum(NULL); ! } ! ! ! /* ---------- ! * RI_FKey_keyequal_upd_pk - ! * ! * Check if we have a key change on an update to a PK relation. This is ! * used by the AFTER trigger queue manager to see if it can skip queuing ! * an instance of an RI trigger. ! * ---------- ! */ ! bool ! RI_FKey_keyequal_upd_pk(Trigger *trigger, Relation pk_rel, ! HeapTuple old_row, HeapTuple new_row) ! { ! RI_ConstraintInfo riinfo; ! ! /* ! * Get arguments. ! */ ! ri_FetchConstraintInfo(&riinfo, trigger, pk_rel, true); ! ! /* ! * Nothing to do if no column names to compare given ! */ ! if (riinfo.nkeys == 0) ! return true; ! ! switch (riinfo.confmatchtype) ! { ! case FKCONSTR_MATCH_UNSPECIFIED: ! case FKCONSTR_MATCH_FULL: ! /* Return true if keys are equal */ ! return ri_KeysEqual(pk_rel, old_row, new_row, &riinfo, true); ! ! /* Handle MATCH PARTIAL set null delete. */ ! case FKCONSTR_MATCH_PARTIAL: ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("MATCH PARTIAL not yet implemented"))); ! break; ! } ! ! /* Never reached */ ! elog(ERROR, "invalid confmatchtype"); ! return false; ! } ! ! /* ---------- ! * RI_FKey_keyequal_upd_fk - ! * ! * Check if we have a key change on an update to an FK relation. This is ! * used by the AFTER trigger queue manager to see if it can skip queuing ! * an instance of an RI trigger. ! * ---------- ! */ ! bool ! RI_FKey_keyequal_upd_fk(Trigger *trigger, Relation fk_rel, ! HeapTuple old_row, HeapTuple new_row) ! { ! 
RI_ConstraintInfo riinfo; ! ! /* ! * Get arguments. ! */ ! ri_FetchConstraintInfo(&riinfo, trigger, fk_rel, false); ! ! /* ! * Nothing to do if no column names to compare given ! */ ! if (riinfo.nkeys == 0) ! return true; ! ! switch (riinfo.confmatchtype) ! { ! case FKCONSTR_MATCH_UNSPECIFIED: ! case FKCONSTR_MATCH_FULL: ! /* Return true if keys are equal */ ! return ri_KeysEqual(fk_rel, old_row, new_row, &riinfo, false); ! ! /* Handle MATCH PARTIAL set null delete. */ ! case FKCONSTR_MATCH_PARTIAL: ! ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("MATCH PARTIAL not yet implemented"))); break; *************** *** 2625,2637 **** RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) RI_ConstraintInfo riinfo; const char *constrname = trigger->tgname; StringInfoData querybuf; char pkrelname[MAX_QUOTED_REL_NAME_LEN]; char fkrelname[MAX_QUOTED_REL_NAME_LEN]; char pkattname[MAX_QUOTED_NAME_LEN + 3]; char fkattname[MAX_QUOTED_NAME_LEN + 3]; RangeTblEntry *pkrte; RangeTblEntry *fkrte; ! const char *sep; int i; int save_nestlevel; char workmembuf[32]; --- 3594,3608 ---- RI_ConstraintInfo riinfo; const char *constrname = trigger->tgname; StringInfoData querybuf; + StringInfoData qualbuf; + StringInfoData recheckbuf; char pkrelname[MAX_QUOTED_REL_NAME_LEN]; char fkrelname[MAX_QUOTED_REL_NAME_LEN]; char pkattname[MAX_QUOTED_NAME_LEN + 3]; char fkattname[MAX_QUOTED_NAME_LEN + 3]; RangeTblEntry *pkrte; RangeTblEntry *fkrte; ! const char *sep, *qual_sep, *recheck_sep; int i; int save_nestlevel; char workmembuf[32]; *************** *** 2678,2695 **** RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) * The query string built is: * SELECT fk.keycols FROM ONLY relname fk * LEFT OUTER JOIN ONLY pkrelname pk ! 
* ON (pk.pkkeycol1=fk.keycol1 [AND ...]) * WHERE pk.pkkeycol1 IS NULL AND * For MATCH unspecified: * (fk.keycol1 IS NOT NULL [AND ...]) * For MATCH FULL: * (fk.keycol1 IS NOT NULL [OR ...]) * * We attach COLLATE clauses to the operators when comparing columns * that have different collations. *---------- */ initStringInfo(&querybuf); appendStringInfo(&querybuf, "SELECT "); sep = ""; for (i = 0; i < riinfo.nkeys; i++) --- 3649,3679 ---- * The query string built is: * SELECT fk.keycols FROM ONLY relname fk * LEFT OUTER JOIN ONLY pkrelname pk ! * ON (pk.pkkeycol1=fk.keycol1 [AND ...] ! * [AND NOT EXISTS()]) * WHERE pk.pkkeycol1 IS NULL AND * For MATCH unspecified: * (fk.keycol1 IS NOT NULL [AND ...]) * For MATCH FULL: * (fk.keycol1 IS NOT NULL [OR ...]) * + * In case of an EACH foreign key, a recheck subquery is added to + * the join condition in order to check that every combination of keys + * is actually referenced. + * The RECHECK_SUBQUERY is + * SELECT 1 FROM + * unnest(fk.keycol1) x1(x1) [CROSS JOIN ...] + * LEFT OUTER JOIN ONLY pkrelname pk + * ON (pk.pkkeycol1=x1.x1 [AND ...]) + * WHERE pk.pkkeycol1 IS NULL AND + * (fk.keycol1 IS NOT NULL [AND ...]) + * * We attach COLLATE clauses to the operators when comparing columns * that have different collations. 
*---------- */ initStringInfo(&querybuf); + initStringInfo(&qualbuf); appendStringInfo(&querybuf, "SELECT "); sep = ""; for (i = 0; i < riinfo.nkeys; i++) *************** *** 2706,2711 **** RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) --- 3690,3706 ---- " FROM ONLY %s fk LEFT OUTER JOIN ONLY %s pk ON", fkrelname, pkrelname); + if (riinfo.confiseach) + { + initStringInfo(&recheckbuf); + appendStringInfo(&recheckbuf, + "SELECT 1 FROM"); + appendStringInfo(&qualbuf, + "LEFT OUTER JOIN ONLY %s pk ON", + pkrelname); + recheck_sep = ""; + qual_sep = "("; + } strcpy(pkattname, "pk."); strcpy(fkattname, "fk."); sep = "("; *************** *** 2723,2748 **** RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) ri_GenerateQual(&querybuf, sep, pkattname, pk_type, riinfo.pf_eq_oprs[i], ! fkattname, fk_type); if (pk_coll != fk_coll) ri_GenerateQualCollation(&querybuf, pk_coll); sep = "AND"; } /* * It's sufficient to test any one pk attribute for null to detect a join ! * failure. */ quoteOneName(pkattname, RIAttName(pk_rel, riinfo.pk_attnums[0])); ! appendStringInfo(&querybuf, ") WHERE pk.%s IS NULL AND (", pkattname); sep = ""; for (i = 0; i < riinfo.nkeys; i++) { quoteOneName(fkattname, RIAttName(fk_rel, riinfo.fk_attnums[i])); ! appendStringInfo(&querybuf, "%sfk.%s IS NOT NULL", sep, fkattname); switch (riinfo.confmatchtype) { case FKCONSTR_MATCH_UNSPECIFIED: --- 3718,3786 ---- ri_GenerateQual(&querybuf, sep, pkattname, pk_type, riinfo.pf_eq_oprs[i], ! fkattname, fk_type, ! riinfo.fk_each_atts[i]); if (pk_coll != fk_coll) ri_GenerateQualCollation(&querybuf, pk_coll); sep = "AND"; + + /* + * In case of an EACH foreign key, we check if there is at least + * a value in the array that is not present in the PK table. 
+ */ + if (riinfo.fk_each_atts[i]) { + Oid fk_element_type = get_base_element_type(fk_type); + char unnest_name[16]; + + sprintf(unnest_name, "x%d.x%d", i + 1, i + 1); + appendStringInfo(&recheckbuf, + " %s unnest(%s) x%d(x%d)", + recheck_sep, fkattname, i + 1, i + 1); + ri_GenerateQual(&qualbuf, qual_sep, + pkattname, pk_type, + riinfo.pf_eq_oprs[i], + unnest_name, fk_element_type, + false); + if (pk_coll != fk_coll) + ri_GenerateQualCollation(&qualbuf, pk_coll); + recheck_sep = "CROSS JOIN"; + qual_sep = "AND"; + } } /* * It's sufficient to test any one pk attribute for null to detect a join ! * failure in the recheck subquery. */ quoteOneName(pkattname, RIAttName(pk_rel, riinfo.pk_attnums[0])); ! if (riinfo.confiseach) ! { ! appendStringInfo(&recheckbuf, " %s) WHERE pk.%s IS NULL AND (", ! qualbuf.data, pkattname); ! resetStringInfo(&qualbuf); ! recheck_sep = ""; ! } sep = ""; for (i = 0; i < riinfo.nkeys; i++) { quoteOneName(fkattname, RIAttName(fk_rel, riinfo.fk_attnums[i])); ! appendStringInfo(&qualbuf, "%sfk.%s IS NOT NULL", sep, fkattname); + + /* + * In the recheck subquery we always ignore any NULL value + * inside the array. + */ + if (riinfo.fk_each_atts[i]) + { + appendStringInfo(&recheckbuf, + "%sx%d.x%d IS NOT NULL", + recheck_sep, i + 1, i + 1); + recheck_sep = " AND "; + } + switch (riinfo.confmatchtype) { case FKCONSTR_MATCH_UNSPECIFIED: *************** *** 2762,2768 **** RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) break; } } ! appendStringInfo(&querybuf, ")"); /* * Temporarily increase work_mem so that the check query can be executed --- 3800,3814 ---- break; } } ! if (riinfo.confiseach) ! appendStringInfo(&querybuf, " AND NOT EXISTS(%s))", recheckbuf.data); ! ! /* ! * It's sufficient to test any one pk attribute for null to detect a join ! * failure. ! */ ! appendStringInfo(&querybuf, ") WHERE pk.%s IS NULL AND (%s)", ! 
pkattname, qualbuf.data); /* * Temporarily increase work_mem so that the check query can be executed *************** *** 2928,2934 **** ri_GenerateQual(StringInfo buf, const char *sep, const char *leftop, Oid leftoptype, Oid opoid, ! const char *rightop, Oid rightoptype) { HeapTuple opertup; Form_pg_operator operform; --- 3974,3981 ---- const char *sep, const char *leftop, Oid leftoptype, Oid opoid, ! const char *rightop, Oid rightoptype, ! bool is_array) { HeapTuple opertup; Form_pg_operator operform; *************** *** 2949,2957 **** ri_GenerateQual(StringInfo buf, ri_add_cast_to(buf, operform->oprleft); appendStringInfo(buf, " OPERATOR(%s.", quote_identifier(nspname)); appendStringInfoString(buf, oprname); ! appendStringInfo(buf, ") %s", rightop); ! if (rightoptype != operform->oprright) ! ri_add_cast_to(buf, operform->oprright); ReleaseSysCache(opertup); } --- 3996,4018 ---- ri_add_cast_to(buf, operform->oprleft); appendStringInfo(buf, " OPERATOR(%s.", quote_identifier(nspname)); appendStringInfoString(buf, oprname); ! /* ! * If rightoptype is an array of leftoptype check equality using ANY(). ! * Needed for array support in foreign keys. ! */ ! if (is_array) ! { ! appendStringInfo(buf, ") ANY (%s", rightop); ! if (rightoptype != get_array_type (operform->oprright)) ! ri_add_cast_to(buf, get_array_type (operform->oprright)); ! appendStringInfo(buf, ")"); ! } ! else ! { ! appendStringInfo(buf, ") %s", rightop); ! if (rightoptype != operform->oprright) ! ri_add_cast_to(buf, operform->oprright); ! } ReleaseSysCache(opertup); } *************** *** 3179,3184 **** ri_FetchConstraintInfo(RI_ConstraintInfo *riinfo, --- 4240,4246 ---- riinfo->confupdtype = conForm->confupdtype; riinfo->confdeltype = conForm->confdeltype; riinfo->confmatchtype = conForm->confmatchtype; + riinfo->confiseach = conForm->confiseach; /* * We expect the arrays to be 1-D arrays of the right types; verify that. 
*************** *** 3219,3224 **** ri_FetchConstraintInfo(RI_ConstraintInfo *riinfo, --- 4281,4302 ---- pfree(arr); /* free de-toasted copy, if any */ adatum = SysCacheGetAttr(CONSTROID, tup, + Anum_pg_constraint_confeach, &isNull); + if (isNull) + elog(ERROR, "null confeach for constraint %u", constraintOid); + arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */ + numkeys = ARR_DIMS(arr)[0]; + if (ARR_NDIM(arr) != 1 || + numkeys != riinfo->nkeys || + numkeys > RI_MAX_NUMKEYS || + ARR_HASNULL(arr) || + ARR_ELEMTYPE(arr) != BOOLOID) + elog(ERROR, "confeach is not a 1-D boolean array"); + memcpy(riinfo->fk_each_atts, ARR_DATA_PTR(arr), numkeys * sizeof(bool)); /* confeach is bool[]: one byte per key, not int16 */ + if ((Pointer) arr != DatumGetPointer(adatum)) + pfree(arr); /* free de-toasted copy, if any */ + + adatum = SysCacheGetAttr(CONSTROID, tup, Anum_pg_constraint_conpfeqop, &isNull); if (isNull) elog(ERROR, "null conpfeqop for constraint %u", constraintOid); *************** *** 4078,4087 **** RI_FKey_trigger_type(Oid tgfoid) --- 5156,5169 ---- { case F_RI_FKEY_CASCADE_DEL: case F_RI_FKEY_CASCADE_UPD: + case F_RI_FKEY_EACHCASCADE_DEL: + case F_RI_FKEY_EACHCASCADE_UPD: case F_RI_FKEY_RESTRICT_DEL: case F_RI_FKEY_RESTRICT_UPD: case F_RI_FKEY_SETNULL_DEL: case F_RI_FKEY_SETNULL_UPD: + case F_RI_FKEY_EACHSETNULL_DEL: + case F_RI_FKEY_EACHSETNULL_UPD: case F_RI_FKEY_SETDEFAULT_DEL: case F_RI_FKEY_SETDEFAULT_UPD: case F_RI_FKEY_NOACTION_DEL: *** a/src/backend/utils/adt/ruleutils.c --- b/src/backend/utils/adt/ruleutils.c *************** *** 159,164 **** static char *pg_get_viewdef_worker(Oid viewoid, int prettyFlags); --- 159,167 ---- static char *pg_get_triggerdef_worker(Oid trigid, bool pretty); static void decompile_column_index_array(Datum column_index_array, Oid relId, StringInfo buf); + static void decompile_fk_column_index_array(Datum column_index_array, + Datum each_array, + Oid relId, StringInfo buf); static char *pg_get_ruledef_worker(Oid ruleoid, int prettyFlags); static char 
*pg_get_indexdef_worker(Oid indexrelid, int colno, const Oid *excludeOps, *************** *** 1144,1149 **** pg_get_constraintdef_worker(Oid constraintId, bool fullCommand, --- 1147,1153 ---- case CONSTRAINT_FOREIGN: { Datum val; + Datum each; bool isnull; const char *string; *************** *** 1156,1166 **** pg_get_constraintdef_worker(Oid constraintId, bool fullCommand, if (isnull) elog(ERROR, "null conkey for constraint %u", constraintId); ! decompile_column_index_array(val, conForm->conrelid, &buf); /* add foreign relation name */ ! appendStringInfo(&buf, ") REFERENCES %s(", generate_relation_name(conForm->confrelid, NIL)); --- 1160,1177 ---- if (isnull) elog(ERROR, "null conkey for constraint %u", constraintId); + each = SysCacheGetAttr(CONSTROID, tup, + Anum_pg_constraint_confeach, &isnull); + if (isnull) + elog(ERROR, "null confeach for constraint %u", + constraintId); ! decompile_fk_column_index_array(val, each, conForm->conrelid, &buf); ! ! appendStringInfo(&buf, ") REFERENCES "); /* add foreign relation name */ ! 
appendStringInfo(&buf, "%s(", generate_relation_name(conForm->confrelid, NIL)); *************** *** 1207,1215 **** pg_get_constraintdef_worker(Oid constraintId, bool fullCommand, --- 1218,1232 ---- case FKCONSTR_ACTION_CASCADE: string = "CASCADE"; break; + case FKCONSTR_ACTION_EACHCASCADE: + string = "EACH CASCADE"; + break; case FKCONSTR_ACTION_SETNULL: string = "SET NULL"; break; + case FKCONSTR_ACTION_EACHSETNULL: + string = "EACH SET NULL"; + break; case FKCONSTR_ACTION_SETDEFAULT: string = "SET DEFAULT"; break; *************** *** 1233,1241 **** pg_get_constraintdef_worker(Oid constraintId, bool fullCommand, --- 1250,1264 ---- case FKCONSTR_ACTION_CASCADE: string = "CASCADE"; break; + case FKCONSTR_ACTION_EACHCASCADE: + string = "EACH CASCADE"; + break; case FKCONSTR_ACTION_SETNULL: string = "SET NULL"; break; + case FKCONSTR_ACTION_EACHSETNULL: + string = "EACH SET NULL"; + break; case FKCONSTR_ACTION_SETDEFAULT: string = "SET DEFAULT"; break; *************** *** 1440,1445 **** decompile_column_index_array(Datum column_index_array, Oid relId, --- 1463,1513 ---- } } + /* + * Convert an int16[] Datum and an bool[] Datum into a comma-separated + * list of column names for the indicated relation prefixed by + * an optional EACH keyword; append the list to buf. + * + * The two arrays must have the same cardinality. 
+ */ + static void + decompile_fk_column_index_array(Datum column_index_array, + Datum each_array, + Oid relId, StringInfo buf) + { + Datum *keys; + int nKeys; + Datum *bools; + int nBools; + int j; + + /* Extract data from array of int16 */ + deconstruct_array(DatumGetArrayTypeP(column_index_array), + INT2OID, 2, true, 's', + &keys, NULL, &nKeys); + + /* Extract data from array of bool */ + deconstruct_array(DatumGetArrayTypeP(each_array), + BOOLOID, 1, true, 'c', + &bools, NULL, &nBools); + + if (nKeys != nBools) + elog(ERROR, "wrong confeach cardinality"); + + for (j = 0; j < nKeys; j++) + { + char *colName; + char *each; + + colName = get_relid_attribute_name(relId, DatumGetInt16(keys[j])); + each = DatumGetBool(bools[j])?"EACH ":""; + + if (j == 0) + appendStringInfo(buf, "%s%s", each, quote_identifier(colName)); + else + appendStringInfo(buf, ", %s%s", each, quote_identifier(colName)); + } + } /* ---------- * get_expr - Decompile an expression tree *** a/src/include/catalog/pg_constraint.h --- b/src/include/catalog/pg_constraint.h *************** *** 91,96 **** CATALOG(pg_constraint,2606) --- 91,99 ---- /* Has a local definition and cannot be inherited */ bool conisonly; + /* true if an EACH REFERENCE foreign key */ + bool confiseach; + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* * Columns of conrelid that the constraint applies to, if known (this is *************** *** 104,109 **** CATALOG(pg_constraint,2606) --- 107,118 ---- int2 confkey[1]; /* + * If a foreign key, true if EACH foreign key for each column of + * the constraint + */ + bool confeach[1]; + + /* * If a foreign key, the OIDs of the PK = FK equality operators for each * column of the constraint */ *************** *** 150,156 **** typedef FormData_pg_constraint *Form_pg_constraint; * compiler constants for pg_constraint * ---------------- */ ! 
#define Natts_pg_constraint 24 #define Anum_pg_constraint_conname 1 #define Anum_pg_constraint_connamespace 2 #define Anum_pg_constraint_contype 3 --- 159,165 ---- * compiler constants for pg_constraint * ---------------- */ ! #define Natts_pg_constraint 26 #define Anum_pg_constraint_conname 1 #define Anum_pg_constraint_connamespace 2 #define Anum_pg_constraint_contype 3 *************** *** 167,180 **** typedef FormData_pg_constraint *Form_pg_constraint; #define Anum_pg_constraint_conislocal 14 #define Anum_pg_constraint_coninhcount 15 #define Anum_pg_constraint_conisonly 16 ! #define Anum_pg_constraint_conkey 17 ! #define Anum_pg_constraint_confkey 18 ! #define Anum_pg_constraint_conpfeqop 19 ! #define Anum_pg_constraint_conppeqop 20 ! #define Anum_pg_constraint_conffeqop 21 ! #define Anum_pg_constraint_conexclop 22 ! #define Anum_pg_constraint_conbin 23 ! #define Anum_pg_constraint_consrc 24 /* Valid values for contype */ --- 176,191 ---- #define Anum_pg_constraint_conislocal 14 #define Anum_pg_constraint_coninhcount 15 #define Anum_pg_constraint_conisonly 16 ! #define Anum_pg_constraint_coniseach 17 /* NOTE(review): the catalog column is named confiseach; confirm this macro spelling is intended */ ! #define Anum_pg_constraint_conkey 18 ! #define Anum_pg_constraint_confkey 19 ! #define Anum_pg_constraint_confeach 20 ! #define Anum_pg_constraint_conpfeqop 21 ! #define Anum_pg_constraint_conppeqop 22 ! #define Anum_pg_constraint_conffeqop 23 ! #define Anum_pg_constraint_conexclop 24 ! #define Anum_pg_constraint_conbin 25 ! 
#define Anum_pg_constraint_consrc 26 /* Valid values for contype */ *************** *** 221,226 **** extern Oid CreateConstraintEntry(const char *constraintName, --- 232,239 ---- const Oid *ppEqOp, const Oid *ffEqOp, int foreignNKeys, + bool confisEach, + const bool *foreignEach, char foreignUpdateType, char foreignDeleteType, char foreignMatchType, *** a/src/include/catalog/pg_proc.h --- b/src/include/catalog/pg_proc.h *************** *** 876,881 **** DESCR("restriction selectivity for array-containment operators"); --- 876,886 ---- DATA(insert OID = 3818 ( arraycontjoinsel PGNSP PGUID 12 1 0 0 0 f f f f t f s 5 0 701 "2281 26 2281 21 2281" _null_ _null_ _null_ _null_ arraycontjoinsel _null_ _null_ _null_ )); DESCR("join selectivity for array-containment operators"); + DATA(insert OID = 3160 ( array_remove PGNSP PGUID 12 1 0 0 0 f f f f f f i 2 0 2277 "2277 2283" _null_ _null_ _null_ _null_ array_remove _null_ _null_ _null_ )); + DESCR("remove any occurrence of an element from an array"); + DATA(insert OID = 3161 ( array_replace PGNSP PGUID 12 1 0 0 0 f f f f f f i 3 0 2277 "2277 2283 2283" _null_ _null_ _null_ _null_ array_replace _null_ _null_ _null_ )); + DESCR("replace any occurrence of an element in an array"); + DATA(insert OID = 760 ( smgrin PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 210 "2275" _null_ _null_ _null_ _null_ smgrin _null_ _null_ _null_ )); DESCR("I/O"); DATA(insert OID = 761 ( smgrout PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 2275 "210" _null_ _null_ _null_ _null_ smgrout _null_ _null_ _null_ )); *************** *** 1992,1997 **** DESCR("referential integrity ON DELETE NO ACTION"); --- 1997,2011 ---- DATA(insert OID = 1655 ( RI_FKey_noaction_upd PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ RI_FKey_noaction_upd _null_ _null_ _null_ )); DESCR("referential integrity ON UPDATE NO ACTION"); + DATA(insert OID = 3166 ( RI_FKey_eachcascade_del PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ 
RI_FKey_eachcascade_del _null_ _null_ _null_ )); + DESCR("referential integrity ON DELETE EACH CASCADE"); + DATA(insert OID = 3167 ( RI_FKey_eachcascade_upd PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ RI_FKey_eachcascade_upd _null_ _null_ _null_ )); + DESCR("referential integrity ON UPDATE EACH CASCADE"); + DATA(insert OID = 3168 ( RI_FKey_eachsetnull_del PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ RI_FKey_eachsetnull_del _null_ _null_ _null_ )); + DESCR("referential integrity ON DELETE EACH SET NULL"); + DATA(insert OID = 3169 ( RI_FKey_eachsetnull_upd PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ RI_FKey_eachsetnull_upd _null_ _null_ _null_ )); + DESCR("referential integrity ON UPDATE EACH SET NULL"); + DATA(insert OID = 1666 ( varbiteq PGNSP PGUID 12 1 0 0 0 f f f t t f i 2 0 16 "1562 1562" _null_ _null_ _null_ _null_ biteq _null_ _null_ _null_ )); DATA(insert OID = 1667 ( varbitne PGNSP PGUID 12 1 0 0 0 f f f t t f i 2 0 16 "1562 1562" _null_ _null_ _null_ _null_ bitne _null_ _null_ _null_ )); DATA(insert OID = 1668 ( varbitge PGNSP PGUID 12 1 0 0 0 f f f t t f i 2 0 16 "1562 1562" _null_ _null_ _null_ _null_ bitge _null_ _null_ _null_ )); *** a/src/include/nodes/nodes.h --- b/src/include/nodes/nodes.h *************** *** 394,399 **** typedef enum NodeTag --- 394,400 ---- T_XmlSerialize, T_WithClause, T_CommonTableExpr, + T_ForeignKeyColumnElem, /* * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h) *** a/src/include/nodes/parsenodes.h --- b/src/include/nodes/parsenodes.h *************** *** 570,575 **** typedef struct DefElem --- 570,589 ---- } DefElem; /* + * ForeignKeyColumnElem - foreign key column (used in foreign key constraint) + * + * For a foreign key attribute, 'name' is the name of the table column, + * and 'each' is true if it is an EACH fk.
+ */ + typedef struct ForeignKeyColumnElem + { + NodeTag type; + Node *name; /* name of the column, or NULL */ + bool each; /* true if an EACH foreign key */ + + } ForeignKeyColumnElem; + + /* * LockingClause - raw representation of FOR UPDATE/SHARE options * * Note: lockedRels == NIL means "all relations in query". Otherwise it *************** *** 1510,1515 **** typedef enum ConstrType /* types of constraints */ --- 1524,1531 ---- #define FKCONSTR_ACTION_CASCADE 'c' #define FKCONSTR_ACTION_SETNULL 'n' #define FKCONSTR_ACTION_SETDEFAULT 'd' + #define FKCONSTR_ACTION_EACHCASCADE 'C' + #define FKCONSTR_ACTION_EACHSETNULL 'N' /* Foreign key matchtype codes */ #define FKCONSTR_MATCH_FULL 'f' *************** *** 1553,1558 **** typedef struct Constraint --- 1569,1576 ---- char fk_upd_action; /* ON UPDATE action */ char fk_del_action; /* ON DELETE action */ List *old_conpfeqop; /* pg_constraint.conpfeqop of my former self */ + bool fk_is_each; /* is EACH REFERENCE foreign key? */ + List *fk_each_attrs; /* EACH REFERENCE attrs */ /* Fields used for constraints that allow a NOT VALID specification */ bool skip_validation; /* skip validation of existing rows? 
*/ *** a/src/include/utils/array.h --- b/src/include/utils/array.h *************** *** 211,216 **** extern Datum generate_subscripts_nodir(PG_FUNCTION_ARGS); --- 211,218 ---- extern Datum array_fill(PG_FUNCTION_ARGS); extern Datum array_fill_with_lower_bounds(PG_FUNCTION_ARGS); extern Datum array_unnest(PG_FUNCTION_ARGS); + extern Datum array_remove(PG_FUNCTION_ARGS); + extern Datum array_replace(PG_FUNCTION_ARGS); extern Datum array_ref(ArrayType *array, int nSubscripts, int *indx, int arraytyplen, int elmlen, bool elmbyval, char elmalign, *** a/src/include/utils/builtins.h --- b/src/include/utils/builtins.h *************** *** 985,994 **** extern Datum RI_FKey_noaction_del(PG_FUNCTION_ARGS); --- 985,998 ---- extern Datum RI_FKey_noaction_upd(PG_FUNCTION_ARGS); extern Datum RI_FKey_cascade_del(PG_FUNCTION_ARGS); extern Datum RI_FKey_cascade_upd(PG_FUNCTION_ARGS); + extern Datum RI_FKey_eachcascade_del(PG_FUNCTION_ARGS); + extern Datum RI_FKey_eachcascade_upd(PG_FUNCTION_ARGS); extern Datum RI_FKey_restrict_del(PG_FUNCTION_ARGS); extern Datum RI_FKey_restrict_upd(PG_FUNCTION_ARGS); extern Datum RI_FKey_setnull_del(PG_FUNCTION_ARGS); extern Datum RI_FKey_setnull_upd(PG_FUNCTION_ARGS); + extern Datum RI_FKey_eachsetnull_del(PG_FUNCTION_ARGS); + extern Datum RI_FKey_eachsetnull_upd(PG_FUNCTION_ARGS); extern Datum RI_FKey_setdefault_del(PG_FUNCTION_ARGS); extern Datum RI_FKey_setdefault_upd(PG_FUNCTION_ARGS); *** a/src/test/regress/expected/arrays.out --- b/src/test/regress/expected/arrays.out *************** *** 1543,1548 **** select unnest(array[1,2,3,null,4,null,null,5,6]::text[]); --- 1543,1628 ---- 6 (9 rows) + select array_remove(array[1,2,2,3], 2); + array_remove + -------------- + {1,3} + (1 row) + + select array_remove(array[1,2,2,3], 5); + array_remove + -------------- + {1,2,2,3} + (1 row) + + select array_remove(array[1,NULL,NULL,3], NULL); + array_remove + -------------- + {1,3} + (1 row) + + select array_remove(array['A','C','D','C','R'], 'R'); + 
array_remove + -------------- + {A,C,D,C} + (1 row) + + select array_remove(array[1,3,4,5,6,10], 5); + array_remove + -------------- + {1,3,4,6,10} + (1 row) + + select array_remove('{{1,2,2},{1,4,3}}', 2); -- fails + ERROR: removing elements from multidimensional arrays is not supported + select array_replace(array[1,2,5,4],5,3); + array_replace + --------------- + {1,2,3,4} + (1 row) + + select array_replace(array[1,2,5,4],5,NULL); + array_replace + --------------- + {1,2,NULL,4} + (1 row) + + select array_replace(array[1,2,NULL,4,NULL],NULL,5); + array_replace + --------------- + {1,2,5,4,5} + (1 row) + + select array_replace(array['A','B','D','B'],'B','C'); + array_replace + --------------- + {A,C,D,C} + (1 row) + + select array_replace(array[1,NULL,3],NULL,NULL); + array_replace + --------------- + {1,NULL,3} + (1 row) + + select array_replace(array[1,NULL,3],NULL,2); + array_replace + --------------- + {1,2,3} + (1 row) + + select array_replace(array['1'::text,NULL::text,'3'::text],NULL::text,NULL::text); + array_replace + --------------- + {1,NULL,3} + (1 row) + + select array_replace(array['1'::text,NULL::text,'3'::text],NULL::text,'2'::text); + array_replace + --------------- + {1,2,3} + (1 row) + -- Insert/update on a column that is array of composite create temp table t1 (f1 int8_tbl[]); insert into t1 (f1[5].q1) values(42); *** a/src/test/regress/expected/btree_index.out --- b/src/test/regress/expected/btree_index.out *************** *** 106,129 **** set enable_seqscan to false; set enable_indexscan to true; set enable_bitmapscan to false; select proname from pg_proc where proname like E'RI\\_FKey%del' order by 1; ! proname ! ------------------------ RI_FKey_cascade_del RI_FKey_noaction_del RI_FKey_restrict_del RI_FKey_setdefault_del RI_FKey_setnull_del ! (5 rows) set enable_indexscan to false; set enable_bitmapscan to true; select proname from pg_proc where proname like E'RI\\_FKey%del' order by 1; ! proname ! 
------------------------ RI_FKey_cascade_del RI_FKey_noaction_del RI_FKey_restrict_del RI_FKey_setdefault_del RI_FKey_setnull_del ! (5 rows) --- 106,133 ---- set enable_indexscan to true; set enable_bitmapscan to false; select proname from pg_proc where proname like E'RI\\_FKey%del' order by 1; ! proname ! ------------------------- RI_FKey_cascade_del + RI_FKey_eachcascade_del + RI_FKey_eachsetnull_del RI_FKey_noaction_del RI_FKey_restrict_del RI_FKey_setdefault_del RI_FKey_setnull_del ! (7 rows) set enable_indexscan to false; set enable_bitmapscan to true; select proname from pg_proc where proname like E'RI\\_FKey%del' order by 1; ! proname ! ------------------------- RI_FKey_cascade_del + RI_FKey_eachcascade_del + RI_FKey_eachsetnull_del RI_FKey_noaction_del RI_FKey_restrict_del RI_FKey_setdefault_del RI_FKey_setnull_del ! (7 rows) *** /dev/null --- b/src/test/regress/expected/each_foreign_key.out *************** *** 0 **** --- 1,1087 ---- + -- EACH FK CONSTRAINTS + -- + CREATE TABLE PKTABLEFORARRAY ( ptest1 int PRIMARY KEY, ptest2 text ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + -- Insert test data into PKTABLEFORARRAY + INSERT INTO PKTABLEFORARRAY VALUES (1, 'Test1'); + INSERT INTO PKTABLEFORARRAY VALUES (2, 'Test2'); + INSERT INTO PKTABLEFORARRAY VALUES (3, 'Test3'); + INSERT INTO PKTABLEFORARRAY VALUES (4, 'Test4'); + INSERT INTO PKTABLEFORARRAY VALUES (5, 'Test5'); + -- Check alter table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[], ftest2 int ); + ALTER TABLE FKTABLEFORARRAY ADD CONSTRAINT FKARRAY FOREIGN KEY (EACH ftest1) REFERENCES PKTABLEFORARRAY; + DROP TABLE FKTABLEFORARRAY; + -- Check alter table with rows + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[], ftest2 int ); + INSERT INTO FKTABLEFORARRAY VALUES ('{1}', 1); + ALTER TABLE FKTABLEFORARRAY ADD CONSTRAINT FKARRAY FOREIGN KEY (EACH ftest1) REFERENCES PKTABLEFORARRAY; + DROP TABLE FKTABLEFORARRAY; + -- Check alter table with 
failing rows + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[], ftest2 int ); + INSERT INTO FKTABLEFORARRAY VALUES ('{10,1}', 2); + ALTER TABLE FKTABLEFORARRAY ADD CONSTRAINT FKARRAY FOREIGN KEY (EACH ftest1) REFERENCES PKTABLEFORARRAY; + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fkarray" + DETAIL: Key (ftest1)=({10,1}) is not present in table "pktableforarray". + DROP TABLE FKTABLEFORARRAY; + -- Check create table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY, ftest2 int ); + CREATE TABLE FKTABLEFORARRAYMDIM ( ftest1 int[][] EACH REFERENCES PKTABLEFORARRAY, ftest2 int ); + CREATE TABLE FKTABLEFORARRAYNOTNULL ( ftest1 int[] NOT NULL EACH REFERENCES PKTABLEFORARRAY, ftest2 int ); + -- Insert successful rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES ('{1}', 3); + INSERT INTO FKTABLEFORARRAY VALUES ('{2}', 4); + INSERT INTO FKTABLEFORARRAY VALUES ('{1}', 5); + INSERT INTO FKTABLEFORARRAY VALUES ('{3}', 6); + INSERT INTO FKTABLEFORARRAY VALUES ('{1}', 7); + INSERT INTO FKTABLEFORARRAY VALUES ('{4,5}', 8); + INSERT INTO FKTABLEFORARRAY VALUES ('{4,4}', 9); + INSERT INTO FKTABLEFORARRAY VALUES (NULL, 10); + INSERT INTO FKTABLEFORARRAY VALUES ('{}', 11); + INSERT INTO FKTABLEFORARRAY VALUES ('{1,NULL}', 12); + INSERT INTO FKTABLEFORARRAY VALUES ('{NULL}', 13); + INSERT INTO FKTABLEFORARRAYMDIM VALUES ('{{4,5},{1,2},{1,3}}', 14); + INSERT INTO FKTABLEFORARRAYMDIM VALUES ('{{4,5},{NULL,2},{NULL,3}}', 15); + -- Insert failed rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES ('{6}', 16); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" + DETAIL: Key (ftest1)=({6}) is not present in table "pktableforarray". 
+ INSERT INTO FKTABLEFORARRAY VALUES ('{4,6}', 17); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" + DETAIL: Key (ftest1)=({4,6}) is not present in table "pktableforarray". + INSERT INTO FKTABLEFORARRAY VALUES ('{6,NULL}', 18); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" + DETAIL: Key (ftest1)=({6,NULL}) is not present in table "pktableforarray". + INSERT INTO FKTABLEFORARRAY VALUES ('{6,NULL,4,NULL}', 19); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" + DETAIL: Key (ftest1)=({6,NULL,4,NULL}) is not present in table "pktableforarray". + INSERT INTO FKTABLEFORARRAYMDIM VALUES ('{{1,2},{6,NULL}}', 20); + ERROR: insert or update on table "fktableforarraymdim" violates foreign key constraint "fktableforarraymdim_ftest1_fkey" + DETAIL: Key (ftest1)=({{1,2},{6,NULL}}) is not present in table "pktableforarray". + INSERT INTO FKTABLEFORARRAYNOTNULL VALUES (NULL, 21); + ERROR: null value in column "ftest1" violates not-null constraint + DETAIL: Failing row contains (null, 21). + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + ----------+-------- + {1} | 3 + {2} | 4 + {1} | 5 + {3} | 6 + {1} | 7 + {4,5} | 8 + {4,4} | 9 + | 10 + {} | 11 + {1,NULL} | 12 + {NULL} | 13 + (11 rows) + + -- Delete a row from PK TABLE (must fail due to ON DELETE NO ACTION) + DELETE FROM PKTABLEFORARRAY WHERE ptest1=1; + ERROR: update or delete on table "pktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" on table "fktableforarray" + DETAIL: Key (ptest1)=(1) is still referenced from table "fktableforarray". 
+ -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + ----------+-------- + {1} | 3 + {2} | 4 + {1} | 5 + {3} | 6 + {1} | 7 + {4,5} | 8 + {4,4} | 9 + | 10 + {} | 11 + {1,NULL} | 12 + {NULL} | 13 + (11 rows) + + -- Update a row from PK TABLE (must fail due to ON UPDATE NO ACTION) + UPDATE PKTABLEFORARRAY SET ptest1=7 WHERE ptest1=1; + ERROR: update or delete on table "pktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" on table "fktableforarray" + DETAIL: Key (ptest1)=(1) is still referenced from table "fktableforarray". + -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + ----------+-------- + {1} | 3 + {2} | 4 + {1} | 5 + {3} | 6 + {1} | 7 + {4,5} | 8 + {4,4} | 9 + | 10 + {} | 11 + {1,NULL} | 12 + {NULL} | 13 + (11 rows) + + -- Check UPDATE on FKTABLE + UPDATE FKTABLEFORARRAY SET ftest1=ARRAY[1] WHERE ftest2=4; + -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + ----------+-------- + {1} | 3 + {1} | 5 + {3} | 6 + {1} | 7 + {4,5} | 8 + {4,4} | 9 + | 10 + {} | 11 + {1,NULL} | 12 + {NULL} | 13 + {1} | 4 + (11 rows) + + DROP TABLE FKTABLEFORARRAY; + DROP TABLE FKTABLEFORARRAYNOTNULL; + DROP TABLE FKTABLEFORARRAYMDIM; + -- Allowed references with actions + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE NO ACTION, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE SET NULL, ftest2 int ); + INSERT INTO PKTABLEFORARRAY VALUES (100, 
'TO BE UPDATED'); + INSERT INTO FKTABLEFORARRAY VALUES ('{1,100}', 100); + SELECT COUNT(*) FROM FKTABLEFORARRAY WHERE ftest1 IS NULL; -- Should return 0 + count + ------- + 0 + (1 row) + + UPDATE PKTABLEFORARRAY SET ptest1 = 1000, ptest2 = 'TO BE REMOVED' WHERE ptest1 = 100; + SELECT COUNT(*) FROM FKTABLEFORARRAY WHERE ftest1 IS NULL; -- Should return 1 + count + ------- + 1 + (1 row) + + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE NO ACTION, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE NO ACTION, ftest2 int ); + INSERT INTO FKTABLEFORARRAY VALUES ('{1,1000}', 100); + SELECT COUNT(*) FROM FKTABLEFORARRAY; -- Should return 1 + count + ------- + 1 
+ (1 row) + + DELETE FROM PKTABLEFORARRAY WHERE ptest1 = 1000; -- From the previous example + SELECT COUNT(*) FROM FKTABLEFORARRAY; -- Should return 0 + count + ------- + 0 + (1 row) + + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE NO ACTION, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE EACH CASCADE, ftest2 int ); + DROP 
TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE NO ACTION, ftest2 int ); + INSERT INTO PKTABLEFORARRAY VALUES (100, 'TO BE REMOVED'); + INSERT INTO FKTABLEFORARRAY VALUES ('{1,100}', 100); + SELECT COUNT(*) FROM FKTABLEFORARRAY; -- Should return 1 + count + ------- + 1 + (1 row) + + DELETE FROM PKTABLEFORARRAY WHERE ptest1 = 100; + SELECT COUNT(*) FROM FKTABLEFORARRAY WHERE ftest1 IS NULL; -- Should return 1 + count + ------- + 1 + (1 row) + + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE NO ACTION, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE SET DEFAULT, 
ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE NO ACTION, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + -- Not allowed references (ON UPDATE CASCADE) + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE CASCADE, ftest2 int ); + ERROR: ON UPDATE CASCADE action is not supported on EACH foreign keys + HINT: Perhaps you meant to use the ON UPDATE EACH CASCADE action + DROP TABLE FKTABLEFORARRAY; + ERROR: table "fktableforarray" does not exist + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] 
EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE CASCADE, ftest2 int ); + ERROR: ON UPDATE CASCADE action is not supported on EACH foreign keys + HINT: Perhaps you meant to use the ON UPDATE EACH CASCADE action + DROP TABLE FKTABLEFORARRAY; + ERROR: table "fktableforarray" does not exist + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE CASCADE, ftest2 int ); + ERROR: ON UPDATE CASCADE action is not supported on EACH foreign keys + HINT: Perhaps you meant to use the ON UPDATE EACH CASCADE action + DROP TABLE FKTABLEFORARRAY; + ERROR: table "fktableforarray" does not exist + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE CASCADE, ftest2 int ); + ERROR: ON UPDATE CASCADE action is not supported on EACH foreign keys + HINT: Perhaps you meant to use the ON UPDATE EACH CASCADE action + DROP TABLE FKTABLEFORARRAY; + ERROR: table "fktableforarray" does not exist + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE CASCADE, ftest2 int ); + ERROR: ON UPDATE CASCADE action is not supported on EACH foreign keys + HINT: Perhaps you meant to use the ON UPDATE EACH CASCADE action + DROP TABLE FKTABLEFORARRAY; + ERROR: table "fktableforarray" does not exist + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE CASCADE, ftest2 int ); + ERROR: ON UPDATE CASCADE action is not supported on EACH foreign keys + HINT: Perhaps you meant to use the ON UPDATE EACH CASCADE action + DROP TABLE FKTABLEFORARRAY; + ERROR: table "fktableforarray" does not exist + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE CASCADE, ftest2 int ); + ERROR: ON UPDATE CASCADE action is not supported on EACH foreign keys + HINT: Perhaps you meant to use the ON UPDATE EACH CASCADE action + DROP TABLE FKTABLEFORARRAY; 
+ ERROR: table "fktableforarray" does not exist + -- Cleanup + DROP TABLE PKTABLEFORARRAY; + -- Check reference on empty table + CREATE TABLE PKTABLEFORARRAY (ptest1 int PRIMARY KEY); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + CREATE TABLE FKTABLEFORARRAY (ftest1 int[] EACH REFERENCES PKTABLEFORARRAY); + INSERT INTO FKTABLEFORARRAY VALUES ('{}'); + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + -- Repeat a similar test using CHAR(1) keys rather than INTEGER + CREATE TABLE PKTABLEFORARRAY ( ptest1 CHAR(1) PRIMARY KEY, ptest2 text ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES ('A', 'Test A'); + INSERT INTO PKTABLEFORARRAY VALUES ('B', 'Test B'); + INSERT INTO PKTABLEFORARRAY VALUES ('C', 'Test C'); + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( ftest1 char(1)[] EACH REFERENCES PKTABLEFORARRAY ON UPDATE RESTRICT ON DELETE RESTRICT, ftest2 int ); + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES ('{"A"}', 1); + INSERT INTO FKTABLEFORARRAY VALUES ('{"B"}', 2); + INSERT INTO FKTABLEFORARRAY VALUES ('{"C"}', 3); + INSERT INTO FKTABLEFORARRAY VALUES ('{"A","B","C"}', 4); + -- Insert invalid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES ('{"D"}', 5); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" + DETAIL: Key (ftest1)=({D}) is not present in table "pktableforarray". + INSERT INTO FKTABLEFORARRAY VALUES ('{"A","B","D"}', 6); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" + DETAIL: Key (ftest1)=({A,B,D}) is not present in table "pktableforarray". 
+ -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + ---------+-------- + {A} | 1 + {B} | 2 + {C} | 3 + {A,B,C} | 4 + (4 rows) + + -- Delete a row from PK TABLE (must fail due to ON DELETE RESTRICT) + DELETE FROM PKTABLEFORARRAY WHERE ptest1='A'; + ERROR: update or delete on table "pktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" on table "fktableforarray" + DETAIL: Key (ptest1)=(A) is still referenced from table "fktableforarray". + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + ---------+-------- + {A} | 1 + {B} | 2 + {C} | 3 + {A,B,C} | 4 + (4 rows) + + -- Update a row from PK TABLE (must fail due to ON UPDATE RESTRICT) + UPDATE PKTABLEFORARRAY SET ptest1='D' WHERE ptest1='B'; + ERROR: update or delete on table "pktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" on table "fktableforarray" + DETAIL: Key (ptest1)=(B) is still referenced from table "fktableforarray". 
+ -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + ---------+-------- + {A} | 1 + {B} | 2 + {C} | 3 + {A,B,C} | 4 + (4 rows) + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + -- Repeat a similar test using INT4 keys coerced from INT2 + CREATE TABLE PKTABLEFORARRAY ( ptest1 int4 PRIMARY KEY, ptest2 text ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES (1, 'Test 1'); + INSERT INTO PKTABLEFORARRAY VALUES (2, 'Test 2'); + INSERT INTO PKTABLEFORARRAY VALUES (3, 'Test 3'); + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int2[] EACH REFERENCES PKTABLEFORARRAY, ftest2 int ); + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1], 1); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2], 2); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[3], 3); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1,2,3], 4); + -- Insert invalid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[4], 5); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" + DETAIL: Key (ftest1)=({4}) is not present in table "pktableforarray". + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1,2,5], 6); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" + DETAIL: Key (ftest1)=({1,2,5}) is not present in table "pktableforarray". 
+ -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + -- Repeat a similar test using FLOAT keys coerced from INTEGER + CREATE TABLE PKTABLEFORARRAY ( ptest1 float PRIMARY KEY, ptest2 text ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + -- FAILS because equality operator are incompatible + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY, ftest2 int ); + ERROR: foreign key constraint "fktableforarray_ftest1_fkey" cannot be implemented + DETAIL: Key columns "ftest1" and "ptest1" are of incompatible types: integer[] and double precision. + -- Cleanup + DROP TABLE PKTABLEFORARRAY; + -- Composite primary keys + CREATE TABLE PKTABLEFORARRAY ( id1 CHAR(1), id2 CHAR(1), ptest2 text, PRIMARY KEY (id1, id2) ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES ('A', 'A', 'Test A'); + INSERT INTO PKTABLEFORARRAY VALUES ('A', 'B', 'Test B'); + INSERT INTO PKTABLEFORARRAY VALUES ('B', 'C', 'Test B'); + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( fid1 CHAR(1), fid2 CHAR(1)[], ftest2 text, FOREIGN KEY (fid1, EACH fid2) REFERENCES PKTABLEFORARRAY); + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES ('A', ARRAY['A','B'], '1'); + INSERT INTO FKTABLEFORARRAY VALUES ('B', ARRAY['C'], '2'); + -- Insert invalid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES ('A', ARRAY['A','B', 'C'], '3'); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_fid1_fkey" + DETAIL: Key (fid1, fid2)=(A, {A,B,C}) is not present in table "pktableforarray". 
+ INSERT INTO FKTABLEFORARRAY VALUES ('B', ARRAY['A'], '4'); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_fid1_fkey" + DETAIL: Key (fid1, fid2)=(B, {A}) is not present in table "pktableforarray". + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + -- Test EACH foreign keys with composite type + CREATE TYPE INVOICEID AS (year_part INTEGER, progressive_part INTEGER); + CREATE TABLE PKTABLEFORARRAY ( id INVOICEID PRIMARY KEY, ptest2 text); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES (ROW(2010, 99), 'Last invoice for 2010'); + INSERT INTO PKTABLEFORARRAY VALUES (ROW(2011, 1), 'First invoice for 2011'); + INSERT INTO PKTABLEFORARRAY VALUES (ROW(2011, 2), 'Second invoice for 2011'); + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, invoice_ids INVOICEID[] EACH REFERENCES PKTABLEFORARRAY ON UPDATE SET NULL ON DELETE SET NULL, ftest2 TEXT ); + NOTICE: CREATE TABLE will create implicit sequence "fktableforarray_id_seq" for serial column "fktableforarray.id" + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "fktableforarray_pkey" for table "fktableforarray" + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY(invoice_ids, ftest2) VALUES (ARRAY['(2010,99)']::INVOICEID[], 'Product A'); + INSERT INTO FKTABLEFORARRAY(invoice_ids, ftest2) VALUES (ARRAY['(2011,1)','(2011,2)']::INVOICEID[], 'Product B'); + INSERT INTO FKTABLEFORARRAY(invoice_ids, ftest2) VALUES (ARRAY['(2011,2)']::INVOICEID[], 'Product C'); + -- Insert invalid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY(invoice_ids, ftest2) VALUES (ARRAY['(2011,99)']::INVOICEID[], 'Product A'); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_invoice_ids_fkey" + DETAIL: Key 
(invoice_ids)=({"(2011,99)"}) is not present in table "pktableforarray". + INSERT INTO FKTABLEFORARRAY(invoice_ids, ftest2) VALUES (ARRAY['(2011,1)','(2010,1)']::INVOICEID[], 'Product B'); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_invoice_ids_fkey" + DETAIL: Key (invoice_ids)=({"(2011,1)","(2010,1)"}) is not present in table "pktableforarray". + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + id | invoice_ids | ftest2 + ----+-------------------------+----------- + 1 | {"(2010,99)"} | Product A + 2 | {"(2011,1)","(2011,2)"} | Product B + 3 | {"(2011,2)"} | Product C + (3 rows) + + -- Delete a row from PK TABLE (ON DELETE SET NULL) + DELETE FROM PKTABLEFORARRAY WHERE id=ROW(2010,99); + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + id | invoice_ids | ftest2 + ----+-------------------------+----------- + 2 | {"(2011,1)","(2011,2)"} | Product B + 3 | {"(2011,2)"} | Product C + 1 | | Product A + (3 rows) + + -- Update a row from PK TABLE (ON UPDATE SET NULL) + UPDATE PKTABLEFORARRAY SET id=ROW(2011,99) WHERE id=ROW(2011,1); + -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + id | invoice_ids | ftest2 + ----+--------------+----------- + 3 | {"(2011,2)"} | Product C + 1 | | Product A + 2 | | Product B + (3 rows) + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + DROP TYPE INVOICEID; + -- Repeat a similar test for SET DEFAULT actions + CREATE TABLE PKTABLEFORARRAY ( ptest1 int PRIMARY KEY, ptest2 int ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES (1, 1); + INSERT INTO PKTABLEFORARRAY VALUES (2, 2); + INSERT INTO PKTABLEFORARRAY VALUES (3, 3); + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] NOT NULL DEFAULT ARRAY[]::int[] EACH REFERENCES PKTABLEFORARRAY ON 
UPDATE SET DEFAULT ON DELETE SET DEFAULT, ftest2 int ); + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1], 1); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2], 2); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[3], 3); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2,3], 4); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1,2], 5); + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + --------+-------- + {1} | 1 + {2} | 2 + {3} | 3 + {2,3} | 4 + {1,2} | 5 + (5 rows) + + -- Delete a row from PK TABLE (ON DELETE SET DEFAULT) + DELETE FROM PKTABLEFORARRAY WHERE ptest1=1; + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + --------+-------- + {2} | 2 + {3} | 3 + {2,3} | 4 + {} | 1 + {} | 5 + (5 rows) + + -- Update a row from PK TABLE (ON UPDATE SET DEFAULT) + UPDATE PKTABLEFORARRAY SET ptest1=5 WHERE ptest1=3; + -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + --------+-------- + {2} | 2 + {} | 1 + {} | 5 + {} | 3 + {} | 4 + (5 rows) + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + -- Repeat a similar test for CASCADE actions + CREATE TABLE PKTABLEFORARRAY ( ptest1 int PRIMARY KEY, ptest2 int ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES (1, 1); + INSERT INTO PKTABLEFORARRAY VALUES (2, 2); + INSERT INTO PKTABLEFORARRAY VALUES (3, 3); + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] NOT NULL DEFAULT ARRAY[]::int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE, ftest2 int ); + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1], 1); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2], 2); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[3], 3); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2,3], 4); + INSERT INTO 
FKTABLEFORARRAY VALUES (ARRAY[1,2], 5); + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + --------+-------- + {1} | 1 + {2} | 2 + {3} | 3 + {2,3} | 4 + {1,2} | 5 + (5 rows) + + -- Delete a row from PK TABLE (ON DELETE CASCADE) + DELETE FROM PKTABLEFORARRAY WHERE ptest1=1; + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + --------+-------- + {2} | 2 + {3} | 3 + {2,3} | 4 + (3 rows) + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + -- Test for EACH SET NULL actions + CREATE TABLE PKTABLEFORARRAY ( ptest1 int PRIMARY KEY, ptest2 int ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES (1, 1); + INSERT INTO PKTABLEFORARRAY VALUES (2, 2); + INSERT INTO PKTABLEFORARRAY VALUES (3, 3); + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON UPDATE EACH SET NULL ON DELETE EACH SET NULL, ftest2 int ); + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1], 1); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2], 2); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[3], 3); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2,2,1], 4); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1,3,3], 5); + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + ---------+-------- + {1} | 1 + {2} | 2 + {3} | 3 + {2,2,1} | 4 + {1,3,3} | 5 + (5 rows) + + -- Delete a row from PK TABLE (ON DELETE EACH SET NULL) + DELETE FROM PKTABLEFORARRAY WHERE ptest1=1; + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + ------------+-------- + {2} | 2 + {3} | 3 + {NULL} | 1 + {2,2,NULL} | 4 + {NULL,3,3} | 5 + (5 rows) + + -- Update a row from PK TABLE (ON UPDATE EACH SET NULL) + UPDATE PKTABLEFORARRAY SET ptest1=5 WHERE ptest1=3; + -- Check FKTABLE for 
update of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + ------------------+-------- + {2} | 2 + {NULL} | 1 + {2,2,NULL} | 4 + {NULL} | 3 + {NULL,NULL,NULL} | 5 + (5 rows) + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + -- Test for EACH CASCADE actions + CREATE TABLE PKTABLEFORARRAY ( ptest1 int PRIMARY KEY, ptest2 int ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES (1, 1); + INSERT INTO PKTABLEFORARRAY VALUES (2, 2); + INSERT INTO PKTABLEFORARRAY VALUES (3, 3); + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON UPDATE EACH CASCADE ON DELETE EACH CASCADE, ftest2 int ); + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1], 1); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2], 2); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[3], 3); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2,2,1], 4); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1,3,3], 5); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[ARRAY[2,3],ARRAY[3,3]], 6); + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + ---------------+-------- + {1} | 1 + {2} | 2 + {3} | 3 + {2,2,1} | 4 + {1,3,3} | 5 + {{2,3},{3,3}} | 6 + (6 rows) + + -- Delete a row from PK TABLE (ON DELETE EACH CASCADE) + DELETE FROM PKTABLEFORARRAY WHERE ptest1=1; + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + ---------------+-------- + {2} | 2 + {3} | 3 + {{2,3},{3,3}} | 6 + {} | 1 + {2,2} | 4 + {3,3} | 5 + (6 rows) + + -- Delete a row from PK TABLE (ON DELETE EACH CASCADE fallback to ON DELETE RESTRICT) + DELETE FROM PKTABLEFORARRAY WHERE ptest1=2; + ERROR: update or delete on table "pktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" on table "fktableforarray" + DETAIL: Key 
(ptest1)=(2) is still referenced from table "fktableforarray". + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + ---------------+-------- + {2} | 2 + {3} | 3 + {{2,3},{3,3}} | 6 + {} | 1 + {2,2} | 4 + {3,3} | 5 + (6 rows) + + -- Update a row from PK TABLE (ON UPDATE EACH CASCADE) + UPDATE PKTABLEFORARRAY SET ptest1=5 WHERE ptest1=3; + -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + ---------------+-------- + {2} | 2 + {} | 1 + {2,2} | 4 + {5} | 3 + {{2,5},{5,5}} | 6 + {5,5} | 5 + (6 rows) + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + -- Check EACH CASCADE actions with cast + CREATE TABLE PKTABLEFORARRAY (c smallint PRIMARY KEY); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + CREATE TABLE FKTABLEFORARRAY (c int[] EACH REFERENCES PKTABLEFORARRAY + ON UPDATE EACH CASCADE + ON DELETE EACH CASCADE); + INSERT INTO PKTABLEFORARRAY VALUES (1), (2); + INSERT INTO FKTABLEFORARRAY VALUES ('{1,2}'); + UPDATE PKTABLEFORARRAY SET c = 3 WHERE c = 2; + DELETE FROM PKTABLEFORARRAY WHERE c = 1; + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + -- Check EACH SET NULL actions with cast + CREATE TABLE PKTABLEFORARRAY (c smallint PRIMARY KEY); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + CREATE TABLE FKTABLEFORARRAY (c int[] EACH REFERENCES PKTABLEFORARRAY + ON UPDATE EACH SET NULL + ON DELETE EACH SET NULL); + INSERT INTO PKTABLEFORARRAY VALUES (1), (2); + INSERT INTO FKTABLEFORARRAY VALUES ('{1,2}'); + UPDATE PKTABLEFORARRAY SET c = 3 WHERE c = 2; + DELETE FROM PKTABLEFORARRAY WHERE c = 1; + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + -- Check for an array column referencing another array column (NOT EACH FOREIGN KEY) + -- Create primary table with an array primary 
key + CREATE TABLE PKTABLEFORARRAY ( id INT[] PRIMARY KEY, ptest2 text); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, fids INT[] REFERENCES PKTABLEFORARRAY, ftest2 TEXT ); + NOTICE: CREATE TABLE will create implicit sequence "fktableforarray_id_seq" for serial column "fktableforarray.id" + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "fktableforarray_pkey" for table "fktableforarray" + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES ('{1,1}', 'A'); + INSERT INTO PKTABLEFORARRAY VALUES ('{1,2}', 'B'); + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY (fids, ftest2) VALUES ('{1,1}', 'Product A'); + INSERT INTO FKTABLEFORARRAY (fids, ftest2) VALUES ('{1,2}', 'Product B'); + -- Insert invalid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY (fids, ftest2) VALUES ('{0,1}', 'Product C'); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_fids_fkey" + DETAIL: Key (fids)=({0,1}) is not present in table "pktableforarray". + INSERT INTO FKTABLEFORARRAY (fids, ftest2) VALUES ('{2,1}', 'Product D'); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_fids_fkey" + DETAIL: Key (fids)=({2,1}) is not present in table "pktableforarray". 
+ -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + -- Check ARRAY actions on normal FK + -- Create primary table with an EACH primary key + CREATE TABLE PKTABLEFORARRAY ( id INT PRIMARY KEY, ptest2 text); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + -- Create an EACH foreign key without an array (ERROR) + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, ftest2 INT EACH REFERENCES PKTABLEFORARRAY ); + NOTICE: CREATE TABLE will create implicit sequence "fktableforarray_id_seq" for serial column "fktableforarray.id" + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "fktableforarray_pkey" for table "fktableforarray" + ERROR: foreign key constraint "fktableforarray_ftest2_fkey" cannot be implemented + DETAIL: key column "ftest2" has type integer which is not an array type + -- Create the foreign table with forbidden actions (ERROR) + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, ftest2 INT[] REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ); + NOTICE: CREATE TABLE will create implicit sequence "fktableforarray_id_seq" for serial column "fktableforarray.id" + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "fktableforarray_pkey" for table "fktableforarray" + ERROR: EACH CASCADE and EACH SET NULL actions are only available on single-column EACH foreign keys + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, ftest2 INT[] REFERENCES PKTABLEFORARRAY ON UPDATE EACH CASCADE ); + NOTICE: CREATE TABLE will create implicit sequence "fktableforarray_id_seq" for serial column "fktableforarray.id" + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "fktableforarray_pkey" for table "fktableforarray" + ERROR: EACH CASCADE and EACH SET NULL actions are only available on single-column EACH foreign keys + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, ftest2 INT[] REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ); + NOTICE: 
CREATE TABLE will create implicit sequence "fktableforarray_id_seq" for serial column "fktableforarray.id" + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "fktableforarray_pkey" for table "fktableforarray" + ERROR: EACH CASCADE and EACH SET NULL actions are only available on single-column EACH foreign keys + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, ftest2 INT[] REFERENCES PKTABLEFORARRAY ON UPDATE EACH SET NULL ); + NOTICE: CREATE TABLE will create implicit sequence "fktableforarray_id_seq" for serial column "fktableforarray.id" + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "fktableforarray_pkey" for table "fktableforarray" + ERROR: EACH CASCADE and EACH SET NULL actions are only available on single-column EACH foreign keys + -- Cleanup + DROP TABLE PKTABLEFORARRAY; + -- --------------------------------------- + -- Multi-column "EACH" foreign key tests + -- --------------------------------------- + -- Create DIM1 table with two-column primary key + CREATE TABLE DIM1 (X INTEGER NOT NULL, Y INTEGER NOT NULL, PRIMARY KEY (X, Y)); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "dim1_pkey" for table "dim1" + -- Populate DIM1 table pairs + INSERT INTO DIM1 SELECT x.t, x.t * y.t + FROM (SELECT generate_series(1, 10) AS t) x, + (SELECT generate_series(0, 10) AS t) y; + -- Test with TABLE declaration of an each foreign key constraint (NO ACTION) + CREATE TABLE F1 ( + x INTEGER PRIMARY KEY, y INTEGER[], + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) + ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f1_pkey" for table "f1" + -- Insert facts + INSERT INTO F1 VALUES (1, '{0,1,2,3,4,5}'); -- OK + INSERT INTO F1 VALUES (2, '{0,2,4,6}'); -- OK + INSERT INTO F1 VALUES (3, '{0,3,6,9,0,3,6,9,0,0,0,0,9,9}'); -- OK (multiple occurrences) + INSERT INTO F1 VALUES (4, '{0,2,4}'); -- FAILS (2 is not present) + ERROR: insert or update on table "f1" violates foreign key constraint "f1_x_fkey" + DETAIL: Key 
(x, y)=(4, {0,2,4}) is not present in table "dim1". + INSERT INTO F1 VALUES (4, '{0,NULL,4}'); -- OK + INSERT INTO F1 VALUES (5, '{0,NULL,5}'); -- OK + -- Try updates + UPDATE F1 SET y = '{0,2,4,6}' WHERE x = 2; -- OK + UPDATE F1 SET y = '{0,2,3,4,6}' WHERE x = 2; -- FAILS + ERROR: insert or update on table "f1" violates foreign key constraint "f1_x_fkey" + DETAIL: Key (x, y)=(2, {0,2,3,4,6}) is not present in table "dim1". + UPDATE F1 SET x = 20, y = '{0,2,3,4,6}' WHERE x = 2; -- FAILS (20 does not exist) + ERROR: insert or update on table "f1" violates foreign key constraint "f1_x_fkey" + DETAIL: Key (x, y)=(20, {0,2,3,4,6}) is not present in table "dim1". + UPDATE F1 SET y = '{0,4,8}' WHERE x = 4; -- OK + UPDATE F1 SET y = '{0,5,NULL,10}' WHERE x = 5; -- OK + DROP TABLE F1; + -- Test with FOREIGN KEY after TABLE population + CREATE TABLE F1 ( + x INTEGER PRIMARY KEY, y INTEGER[] + ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f1_pkey" for table "f1" + -- Insert facts + INSERT INTO F1 VALUES (1, '{0,1,2,3,4,5}'); -- OK + INSERT INTO F1 VALUES (2, '{0,2,4,6}'); -- OK + INSERT INTO F1 VALUES (3, '{0,3,6,9,0,3,6,9,0,0,0,0,9,9}'); -- OK (multiple occurrences) + INSERT INTO F1 VALUES (4, '{0,2,4}'); -- OK + INSERT INTO F1 VALUES (5, '{0,NULL,5}'); -- OK + -- Try updates + UPDATE F1 SET y = '{0,2,4,6}' WHERE x = 2; -- OK + UPDATE F1 SET y = '{0,2,3,4,6}' WHERE x = 2; -- OK + UPDATE F1 SET x = 20, y = '{0,2,3,4,6}' WHERE x = 2; -- OK + -- Add foreign key (FAILS) + ALTER TABLE F1 ADD FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y); + ERROR: insert or update on table "f1" violates foreign key constraint "f1_x_fkey" + DETAIL: Key (x, y)=(20, {0,2,3,4,6}) is not present in table "dim1". 
+ DROP TABLE F1; + -- Limitation on EACH CASCADE an EACH SET NULL on multi-column keys + CREATE TABLE F2 ( + x INTEGER PRIMARY KEY, y INTEGER[], + -- FAILS (multi-column foreign key forbids EACH CASCADE + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON DELETE EACH CASCADE ON UPDATE EACH CASCADE + ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f2_pkey" for table "f2" + ERROR: EACH CASCADE and EACH SET NULL actions are only available on single-column EACH foreign keys + CREATE TABLE F2 ( + x INTEGER PRIMARY KEY, y INTEGER[], + -- FAILS (multi-column foreign key forbids EACH CASCADE + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON DELETE EACH CASCADE + ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f2_pkey" for table "f2" + ERROR: EACH CASCADE and EACH SET NULL actions are only available on single-column EACH foreign keys + CREATE TABLE F2 ( + x INTEGER PRIMARY KEY, y INTEGER[], + -- FAILS (multi-column foreign key forbids EACH CASCADE + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON UPDATE EACH CASCADE + ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f2_pkey" for table "f2" + ERROR: EACH CASCADE and EACH SET NULL actions are only available on single-column EACH foreign keys + CREATE TABLE F2 ( + x INTEGER PRIMARY KEY, y INTEGER[], + -- FAILS (multi-column foreign key forbids EACH SET NULL + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON DELETE EACH SET NULL ON UPDATE EACH SET NULL + ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f2_pkey" for table "f2" + ERROR: EACH CASCADE and EACH SET NULL actions are only available on single-column EACH foreign keys + CREATE TABLE F2 ( + x INTEGER PRIMARY KEY, y INTEGER[], + -- FAILS (multi-column foreign key forbids EACH SET NULL + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON DELETE EACH SET NULL + ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f2_pkey" for table "f2" + ERROR: EACH CASCADE and EACH SET NULL actions 
are only available on single-column EACH foreign keys + CREATE TABLE F2 ( + x INTEGER PRIMARY KEY, y INTEGER[], + -- FAILS (multi-column foreign key forbids EACH SET NULL + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON UPDATE EACH SET NULL + ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f2_pkey" for table "f2" + ERROR: EACH CASCADE and EACH SET NULL actions are only available on single-column EACH foreign keys + -- Test of ON DELETE CASCADE and ON UPDATE SET NULL with multi-columns + CREATE TABLE F1 ( + x INTEGER PRIMARY KEY, y INTEGER[], + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON DELETE CASCADE ON UPDATE SET NULL + ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f1_pkey" for table "f1" + -- Insert dummy dimension record + INSERT INTO DIM1 VALUES (199, 199); + INSERT INTO DIM1 VALUES (199, 200); + INSERT INTO DIM1 VALUES (199, 201); + -- Insert facts + INSERT INTO F1 VALUES (1, '{0,1,2,3,4,5}'); -- OK + INSERT INTO F1 VALUES (2, '{0,2,4,6}'); -- OK + INSERT INTO F1 VALUES (3, '{0,3,6,9,0,3,6,9,0,0,0,0,9,9}'); -- OK (multiple occurrences) + INSERT INTO F1 VALUES (4, '{0,NULL,4}'); -- OK + INSERT INTO F1 VALUES (5, '{0,NULL,5}'); -- OK + INSERT INTO F1 VALUES (199, '{199, 200}'); -- OK + SELECT COUNT(*) FROM F1 WHERE x = 199; -- should return 1 + count + ------- + 1 + (1 row) + + DELETE FROM DIM1 WHERE x = 199 AND y = 200; -- should remove the whole row in F1 + SELECT COUNT(*) FROM F1 WHERE x = 199; -- should return 0 + count + ------- + 0 + (1 row) + + INSERT INTO F1 VALUES (199, '{199, 201}'); -- OK + SELECT COUNT(*) FROM F1 WHERE x = 199 AND y IS NULL; -- should return 0 + count + ------- + 0 + (1 row) + + UPDATE DIM1 SET y = 200 WHERE x = 199 AND y = 201; -- should set the whole row to NULL in F1 + SELECT COUNT(*) FROM F1 WHERE x = 199 AND y IS NULL; -- should return 1 + count + ------- + 1 + (1 row) + + DROP TABLE F1; + -- Test with TABLE declaration of a two-dim EACH foreign key constraint (FAILS) + CREATE 
TABLE F1 ( + x INTEGER[] PRIMARY KEY, y INTEGER[], + FOREIGN KEY (EACH x, EACH y) REFERENCES DIM1(x, y) + ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f1_pkey" for table "f1" + ERROR: EACH foreign keys support only one EACH column + -- Test with two-dim EACH foreign key after TABLE population + CREATE TABLE F1 ( + x INTEGER[] PRIMARY KEY, y INTEGER[] + ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f1_pkey" for table "f1" + INSERT INTO F1 VALUES ('{1}', '{0,1,2,3,4,5}'); -- OK + INSERT INTO F1 VALUES ('{1,2}', '{0,2,4,6}'); -- OK + -- Add foreign key (FAILS) + ALTER TABLE F1 ADD FOREIGN KEY (EACH x, EACH y) REFERENCES DIM1(x, y); + ERROR: EACH foreign keys support only one EACH column + DROP TABLE F1; + -- Cleanup + DROP TABLE DIM1; + -- Check for potential name conflicts (with internal integrity checks) + CREATE TABLE x1(x1 int, x2 int, PRIMARY KEY(x1,x2)); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "x1_pkey" for table "x1" + INSERT INTO x1 VALUES + (1,4), + (1,5), + (2,4), + (2,5), + (3,6), + (3,7) + ; + CREATE TABLE x2(x1 int[], x2 int, FOREIGN KEY(EACH x1, x2) REFERENCES x1); + INSERT INTO x2 VALUES ('{1,2}',4); + INSERT INTO x2 VALUES ('{1,3}',6); -- FAILS + ERROR: insert or update on table "x2" violates foreign key constraint "x2_x1_fkey" + DETAIL: Key (x1, x2)=({1,3}, 6) is not present in table "x1". 
+ DROP TABLE x2; + CREATE TABLE x2(x1 int[], x2 int); + INSERT INTO x2 VALUES ('{1,2}',4); + INSERT INTO x2 VALUES ('{1,3}',6); + ALTER TABLE x2 ADD CONSTRAINT fk_const FOREIGN KEY(EACH x1, x2) REFERENCES x1; -- FAILS + DROP TABLE x2; + DROP TABLE x1; + -- --------------------------------------- + -- Multi-dimensional "EACH" foreign key tests + -- --------------------------------------- + -- Create DIM1 table with two-column primary key + CREATE TABLE DIM1 (X INTEGER NOT NULL PRIMARY KEY, + CODE TEXT NOT NULL UNIQUE); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "dim1_pkey" for table "dim1" + NOTICE: CREATE TABLE / UNIQUE will create implicit index "dim1_code_key" for table "dim1" + -- Populate DIM1 table pairs + INSERT INTO DIM1 SELECT t, 'DIM1-' || lpad(t::TEXT, 2, '0') + FROM (SELECT generate_series(1, 10)) x(t); + -- Test with TABLE declaration of an each foreign key constraint (NO ACTION) + CREATE TABLE F1 ( + ID SERIAL PRIMARY KEY, + SLOTS INTEGER[3][3] EACH REFERENCES DIM1 + ); + NOTICE: CREATE TABLE will create implicit sequence "f1_id_seq" for serial column "f1.id" + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f1_pkey" for table "f1" + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 3}, {NULL, NULL, 6}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 11}, {NULL, NULL, 6}}'); -- FAILS + ERROR: insert or update on table "f1" violates foreign key constraint "f1_slots_fkey" + DETAIL: Key (slots)=({{NULL,1,NULL},{NULL,NULL,11},{NULL,NULL,6}}) is not present in table "dim1". 
+ INSERT INTO F1(SLOTS) VALUES ('{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{1, 2, 3, 4, 5, 6, 7, 8, 9}'); -- OK + UPDATE F1 SET SLOTS = '{{NULL, 1, NULL}, {NULL, NULL, 3}, {7, 8, 10}}' WHERE ID = 1; -- OK + UPDATE F1 SET SLOTS = '{{100, 100, 100}, {NULL, NULL, 20}, {7, 8, 10}}' WHERE ID = 1; -- FAILS + ERROR: insert or update on table "f1" violates foreign key constraint "f1_slots_fkey" + DETAIL: Key (slots)=({{100,100,100},{NULL,NULL,20},{7,8,10}}) is not present in table "dim1". + DROP TABLE F1; + -- Test with postponed foreign key + CREATE TABLE F1 ( + ID SERIAL PRIMARY KEY, + SLOTS INTEGER[3][3] + ); + NOTICE: CREATE TABLE will create implicit sequence "f1_id_seq" for serial column "f1.id" + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f1_pkey" for table "f1" + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 3}, {NULL, NULL, 6}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 11}, {NULL, NULL, 6}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{1, 2, 3, 4, 5, 6, 7, 8, 9}'); -- OK + ALTER TABLE F1 ADD FOREIGN KEY (EACH SLOTS) REFERENCES DIM1; -- FAILS + ERROR: insert or update on table "f1" violates foreign key constraint "f1_slots_fkey" + DETAIL: Key (slots)=({{NULL,1,NULL},{NULL,NULL,11},{NULL,NULL,6}}) is not present in table "dim1". + DELETE FROM F1 WHERE ID = 2; -- REMOVE ISSUE + ALTER TABLE F1 ADD FOREIGN KEY (EACH SLOTS) REFERENCES DIM1; -- NOW OK + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 11}, {NULL, NULL, 6}}'); -- FAILS + ERROR: insert or update on table "f1" violates foreign key constraint "f1_slots_fkey" + DETAIL: Key (slots)=({{NULL,1,NULL},{NULL,NULL,11},{NULL,NULL,6}}) is not present in table "dim1". 
+ DROP TABLE F1; + -- Test with postponed foreign key, on delete each cascade + -- and a multi-dimensional array + CREATE TABLE F1 ( + ID SERIAL PRIMARY KEY, + SLOTS INTEGER[3][3] + ); + NOTICE: CREATE TABLE will create implicit sequence "f1_id_seq" for serial column "f1.id" + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f1_pkey" for table "f1" + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 3}, {NULL, NULL, 6}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{1, 2, 3, 4, 5, 6, 7, 8, 9}'); -- OK + ALTER TABLE F1 ADD FOREIGN KEY (EACH SLOTS) REFERENCES DIM1 ON DELETE EACH CASCADE; -- FAILS + DROP TABLE F1; + -- Test with EACH foreign key, on delete each cascade + -- and a multi-dimensional array + CREATE TABLE F1 ( + ID SERIAL PRIMARY KEY, + SLOTS INTEGER[3][3] EACH REFERENCES DIM1 ON DELETE EACH CASCADE + ); + NOTICE: CREATE TABLE will create implicit sequence "f1_id_seq" for serial column "f1.id" + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f1_pkey" for table "f1" + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 3}, {NULL, NULL, 6}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{1, 2, 3, 4, 5, 6, 7, 8, 9}'); -- OK + -- Fails (behaves like RESTRICT) + DELETE FROM DIM1 WHERE x = 2; + ERROR: update or delete on table "dim1" violates foreign key constraint "f1_slots_fkey" on table "f1" + DETAIL: Key (x)=(2) is still referenced from table "f1". 
+ DROP TABLE F1; + -- Test with postponed foreign key, on delete each cascade + -- and a multi-dimensional array + CREATE TABLE F1 ( + ID SERIAL PRIMARY KEY, + SLOTS INTEGER[3][3] + ); + NOTICE: CREATE TABLE will create implicit sequence "f1_id_seq" for serial column "f1.id" + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f1_pkey" for table "f1" + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 3}, {NULL, NULL, 6}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{1, 2, 3, 4, 5, 6, 7, 8, 9}'); -- OK + ALTER TABLE F1 ADD FOREIGN KEY (EACH SLOTS) REFERENCES DIM1 ON DELETE EACH CASCADE; + -- Fails (behaves like RESTRICT) + DELETE FROM DIM1 WHERE x = 2; + ERROR: update or delete on table "dim1" violates foreign key constraint "f1_slots_fkey" on table "f1" + DETAIL: Key (x)=(2) is still referenced from table "f1". + DROP TABLE F1; + -- Cleanup + DROP TABLE DIM1; + -- Leave tables in the database + CREATE TABLE PKTABLEFOREACHFK ( ptest1 int PRIMARY KEY, ptest2 text ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforeachfk_pkey" for table "pktableforeachfk" + CREATE TABLE FKTABLEFOREACHFK ( ftest1 int[] EACH REFERENCES PKTABLEFOREACHFK, ftest2 int ); *** a/src/test/regress/parallel_schedule --- b/src/test/regress/parallel_schedule *************** *** 92,98 **** test: rules # ---------- # Another group of parallel tests # ---------- ! test: select_views portals_p2 foreign_key cluster dependency guc bitmapops combocid tsearch tsdicts foreign_data window xmlmap functional_deps advisory_lock json # ---------- # Another group of parallel tests --- 92,98 ---- # ---------- # Another group of parallel tests # ---------- ! 
test: select_views portals_p2 foreign_key cluster dependency guc bitmapops combocid tsearch tsdicts foreign_data window xmlmap functional_deps advisory_lock json each_foreign_key # ---------- # Another group of parallel tests *** a/src/test/regress/serial_schedule --- b/src/test/regress/serial_schedule *************** *** 99,104 **** test: rules --- 99,105 ---- test: select_views test: portals_p2 test: foreign_key + test: each_foreign_key test: cluster test: dependency test: guc *** a/src/test/regress/sql/arrays.sql --- b/src/test/regress/sql/arrays.sql *************** *** 432,437 **** select unnest(array[1,2,3,4.5]::float8[]); --- 432,451 ---- select unnest(array[1,2,3,4.5]::numeric[]); select unnest(array[1,2,3,null,4,null,null,5,6]); select unnest(array[1,2,3,null,4,null,null,5,6]::text[]); + select array_remove(array[1,2,2,3], 2); + select array_remove(array[1,2,2,3], 5); + select array_remove(array[1,NULL,NULL,3], NULL); + select array_remove(array['A','C','D','C','R'], 'R'); + select array_remove(array[1,3,4,5,6,10], 5); + select array_remove('{{1,2,2},{1,4,3}}', 2); -- fails + select array_replace(array[1,2,5,4],5,3); + select array_replace(array[1,2,5,4],5,NULL); + select array_replace(array[1,2,NULL,4,NULL],NULL,5); + select array_replace(array['A','B','D','B'],'B','C'); + select array_replace(array[1,NULL,3],NULL,NULL); + select array_replace(array[1,NULL,3],NULL,2); + select array_replace(array['1'::text,NULL::text,'3'::text],NULL::text,NULL::text); + select array_replace(array['1'::text,NULL::text,'3'::text],NULL::text,'2'::text); -- Insert/update on a column that is array of composite *** /dev/null --- b/src/test/regress/sql/each_foreign_key.sql *************** *** 0 **** --- 1,798 ---- + -- EACH FK CONSTRAINTS + -- + CREATE TABLE PKTABLEFORARRAY ( ptest1 int PRIMARY KEY, ptest2 text ); + + -- Insert test data into PKTABLEFORARRAY + INSERT INTO PKTABLEFORARRAY VALUES (1, 'Test1'); + INSERT INTO PKTABLEFORARRAY VALUES (2, 'Test2'); + INSERT INTO
PKTABLEFORARRAY VALUES (3, 'Test3'); + INSERT INTO PKTABLEFORARRAY VALUES (4, 'Test4'); + INSERT INTO PKTABLEFORARRAY VALUES (5, 'Test5'); + + -- Check alter table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[], ftest2 int ); + ALTER TABLE FKTABLEFORARRAY ADD CONSTRAINT FKARRAY FOREIGN KEY (EACH ftest1) REFERENCES PKTABLEFORARRAY; + DROP TABLE FKTABLEFORARRAY; + + -- Check alter table with rows + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[], ftest2 int ); + INSERT INTO FKTABLEFORARRAY VALUES ('{1}', 1); + ALTER TABLE FKTABLEFORARRAY ADD CONSTRAINT FKARRAY FOREIGN KEY (EACH ftest1) REFERENCES PKTABLEFORARRAY; + DROP TABLE FKTABLEFORARRAY; + + -- Check alter table with failing rows + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[], ftest2 int ); + INSERT INTO FKTABLEFORARRAY VALUES ('{10,1}', 2); + ALTER TABLE FKTABLEFORARRAY ADD CONSTRAINT FKARRAY FOREIGN KEY (EACH ftest1) REFERENCES PKTABLEFORARRAY; + DROP TABLE FKTABLEFORARRAY; + + -- Check create table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY, ftest2 int ); + CREATE TABLE FKTABLEFORARRAYMDIM ( ftest1 int[][] EACH REFERENCES PKTABLEFORARRAY, ftest2 int ); + CREATE TABLE FKTABLEFORARRAYNOTNULL ( ftest1 int[] NOT NULL EACH REFERENCES PKTABLEFORARRAY, ftest2 int ); + + -- Insert successful rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES ('{1}', 3); + INSERT INTO FKTABLEFORARRAY VALUES ('{2}', 4); + INSERT INTO FKTABLEFORARRAY VALUES ('{1}', 5); + INSERT INTO FKTABLEFORARRAY VALUES ('{3}', 6); + INSERT INTO FKTABLEFORARRAY VALUES ('{1}', 7); + INSERT INTO FKTABLEFORARRAY VALUES ('{4,5}', 8); + INSERT INTO FKTABLEFORARRAY VALUES ('{4,4}', 9); + INSERT INTO FKTABLEFORARRAY VALUES (NULL, 10); + INSERT INTO FKTABLEFORARRAY VALUES ('{}', 11); + INSERT INTO FKTABLEFORARRAY VALUES ('{1,NULL}', 12); + INSERT INTO FKTABLEFORARRAY VALUES ('{NULL}', 13); + INSERT INTO FKTABLEFORARRAYMDIM VALUES ('{{4,5},{1,2},{1,3}}', 14); + INSERT INTO FKTABLEFORARRAYMDIM VALUES 
('{{4,5},{NULL,2},{NULL,3}}', 15); + + -- Insert failed rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES ('{6}', 16); + INSERT INTO FKTABLEFORARRAY VALUES ('{4,6}', 17); + INSERT INTO FKTABLEFORARRAY VALUES ('{6,NULL}', 18); + INSERT INTO FKTABLEFORARRAY VALUES ('{6,NULL,4,NULL}', 19); + INSERT INTO FKTABLEFORARRAYMDIM VALUES ('{{1,2},{6,NULL}}', 20); + INSERT INTO FKTABLEFORARRAYNOTNULL VALUES (NULL, 21); + + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + + -- Delete a row from PK TABLE (must fail due to ON DELETE NO ACTION) + DELETE FROM PKTABLEFORARRAY WHERE ptest1=1; + + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Update a row from PK TABLE (must fail due to ON UPDATE NO ACTION) + UPDATE PKTABLEFORARRAY SET ptest1=7 WHERE ptest1=1; + + -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Check UPDATE on FKTABLE + UPDATE FKTABLEFORARRAY SET ftest1=ARRAY[1] WHERE ftest2=4; + + -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + + DROP TABLE FKTABLEFORARRAY; + DROP TABLE FKTABLEFORARRAYNOTNULL; + DROP TABLE FKTABLEFORARRAYMDIM; + + -- Allowed references with actions + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE NO ACTION, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE SET NULL, ftest2 int ); + INSERT INTO PKTABLEFORARRAY VALUES (100, 'TO BE UPDATED'); + INSERT INTO FKTABLEFORARRAY VALUES ('{1,100}', 100); + SELECT COUNT(*) FROM FKTABLEFORARRAY WHERE ftest1 IS NULL; -- 
Should return 0 + UPDATE PKTABLEFORARRAY SET ptest1 = 1000, ptest2 = 'TO BE REMOVED' WHERE ptest1 = 100; + SELECT COUNT(*) FROM FKTABLEFORARRAY WHERE ftest1 IS NULL; -- Should return 1 + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE NO ACTION, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE NO ACTION, ftest2 int ); + INSERT INTO FKTABLEFORARRAY VALUES ('{1,1000}', 100); + SELECT COUNT(*) FROM FKTABLEFORARRAY; -- Should return 1 + DELETE FROM PKTABLEFORARRAY WHERE ptest1 = 1000; -- From the previous example + SELECT COUNT(*) FROM FKTABLEFORARRAY; -- Should return 0 + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH 
REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE NO ACTION, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES 
PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE NO ACTION, ftest2 int ); + INSERT INTO PKTABLEFORARRAY VALUES (100, 'TO BE REMOVED'); + INSERT INTO FKTABLEFORARRAY VALUES ('{1,100}', 100); + SELECT COUNT(*) FROM FKTABLEFORARRAY; -- Should return 1 + DELETE FROM PKTABLEFORARRAY WHERE ptest1 = 100; + SELECT COUNT(*) FROM FKTABLEFORARRAY WHERE ftest1 IS NULL; -- Should return 1 + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE NO ACTION, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE 
EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE NO ACTION, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + + -- Not allowed references (ON UPDATE CASCADE) + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES 
PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + + -- Cleanup + DROP TABLE PKTABLEFORARRAY; + + -- Check reference on empty table + CREATE TABLE PKTABLEFORARRAY (ptest1 int PRIMARY KEY); + CREATE TABLE FKTABLEFORARRAY (ftest1 int[] EACH REFERENCES PKTABLEFORARRAY); + INSERT INTO FKTABLEFORARRAY VALUES ('{}'); + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + + -- Repeat a similar test using CHAR(1) keys rather than INTEGER + CREATE TABLE PKTABLEFORARRAY ( ptest1 CHAR(1) PRIMARY KEY, ptest2 text ); + + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES ('A', 'Test A'); + INSERT INTO PKTABLEFORARRAY VALUES ('B', 'Test B'); + INSERT INTO PKTABLEFORARRAY VALUES ('C', 'Test C'); + + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( ftest1 char(1)[] EACH REFERENCES PKTABLEFORARRAY ON UPDATE RESTRICT ON DELETE RESTRICT, ftest2 int ); + + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES ('{"A"}', 1); + INSERT INTO FKTABLEFORARRAY VALUES ('{"B"}', 2); + INSERT INTO FKTABLEFORARRAY VALUES ('{"C"}', 3); + INSERT INTO FKTABLEFORARRAY VALUES ('{"A","B","C"}', 4); + + -- Insert invalid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES ('{"D"}', 5); + INSERT INTO FKTABLEFORARRAY VALUES ('{"A","B","D"}', 6); + + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + + -- Delete a row from PK TABLE (must fail due to ON DELETE RESTRICT) + DELETE FROM PKTABLEFORARRAY WHERE ptest1='A'; + + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Update a row from PK TABLE (must fail due to ON UPDATE RESTRICT) + UPDATE 
PKTABLEFORARRAY SET ptest1='D' WHERE ptest1='B'; + + -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + + -- Repeat a similar test using INT4 keys coerced from INT2 + CREATE TABLE PKTABLEFORARRAY ( ptest1 int4 PRIMARY KEY, ptest2 text ); + + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES (1, 'Test 1'); + INSERT INTO PKTABLEFORARRAY VALUES (2, 'Test 2'); + INSERT INTO PKTABLEFORARRAY VALUES (3, 'Test 3'); + + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int2[] EACH REFERENCES PKTABLEFORARRAY, ftest2 int ); + + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1], 1); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2], 2); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[3], 3); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1,2,3], 4); + + -- Insert invalid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[4], 5); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1,2,5], 6); + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + + -- Repeat a similar test using FLOAT keys coerced from INTEGER + CREATE TABLE PKTABLEFORARRAY ( ptest1 float PRIMARY KEY, ptest2 text ); + -- FAILS because equality operator are incompatible + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY, ftest2 int ); + + -- Cleanup + DROP TABLE PKTABLEFORARRAY; + + -- Composite primary keys + CREATE TABLE PKTABLEFORARRAY ( id1 CHAR(1), id2 CHAR(1), ptest2 text, PRIMARY KEY (id1, id2) ); + + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES ('A', 'A', 'Test A'); + INSERT INTO PKTABLEFORARRAY VALUES ('A', 'B', 'Test B'); + INSERT INTO PKTABLEFORARRAY VALUES ('B', 'C', 'Test B'); + + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( fid1 CHAR(1), fid2 CHAR(1)[], ftest2 text, FOREIGN KEY (fid1, EACH fid2) REFERENCES PKTABLEFORARRAY); + + -- Insert valid rows into 
FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES ('A', ARRAY['A','B'], '1'); + INSERT INTO FKTABLEFORARRAY VALUES ('B', ARRAY['C'], '2'); + + -- Insert invalid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES ('A', ARRAY['A','B', 'C'], '3'); + INSERT INTO FKTABLEFORARRAY VALUES ('B', ARRAY['A'], '4'); + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + + -- Test EACH foreign keys with composite type + CREATE TYPE INVOICEID AS (year_part INTEGER, progressive_part INTEGER); + CREATE TABLE PKTABLEFORARRAY ( id INVOICEID PRIMARY KEY, ptest2 text); + + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES (ROW(2010, 99), 'Last invoice for 2010'); + INSERT INTO PKTABLEFORARRAY VALUES (ROW(2011, 1), 'First invoice for 2011'); + INSERT INTO PKTABLEFORARRAY VALUES (ROW(2011, 2), 'Second invoice for 2011'); + + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, invoice_ids INVOICEID[] EACH REFERENCES PKTABLEFORARRAY ON UPDATE SET NULL ON DELETE SET NULL, ftest2 TEXT ); + + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY(invoice_ids, ftest2) VALUES (ARRAY['(2010,99)']::INVOICEID[], 'Product A'); + INSERT INTO FKTABLEFORARRAY(invoice_ids, ftest2) VALUES (ARRAY['(2011,1)','(2011,2)']::INVOICEID[], 'Product B'); + INSERT INTO FKTABLEFORARRAY(invoice_ids, ftest2) VALUES (ARRAY['(2011,2)']::INVOICEID[], 'Product C'); + + -- Insert invalid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY(invoice_ids, ftest2) VALUES (ARRAY['(2011,99)']::INVOICEID[], 'Product A'); + INSERT INTO FKTABLEFORARRAY(invoice_ids, ftest2) VALUES (ARRAY['(2011,1)','(2010,1)']::INVOICEID[], 'Product B'); + + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + + -- Delete a row from PK TABLE (ON DELETE SET NULL) + DELETE FROM PKTABLEFORARRAY WHERE id=ROW(2010,99); + + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Update a row from PK TABLE (ON UPDATE SET NULL) + UPDATE 
PKTABLEFORARRAY SET id=ROW(2011,99) WHERE id=ROW(2011,1); + + -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + DROP TYPE INVOICEID; + + -- Repeat a similar test for SET DEFAULT actions + CREATE TABLE PKTABLEFORARRAY ( ptest1 int PRIMARY KEY, ptest2 int ); + + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES (1, 1); + INSERT INTO PKTABLEFORARRAY VALUES (2, 2); + INSERT INTO PKTABLEFORARRAY VALUES (3, 3); + + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] NOT NULL DEFAULT ARRAY[]::int[] EACH REFERENCES PKTABLEFORARRAY ON UPDATE SET DEFAULT ON DELETE SET DEFAULT, ftest2 int ); + + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1], 1); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2], 2); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[3], 3); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2,3], 4); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1,2], 5); + + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + + -- Delete a row from PK TABLE (ON DELETE SET DEFAULT) + DELETE FROM PKTABLEFORARRAY WHERE ptest1=1; + + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Update a row from PK TABLE (ON UPDATE SET DEFAULT) + UPDATE PKTABLEFORARRAY SET ptest1=5 WHERE ptest1=3; + + -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + + -- Repeat a similar test for CASCADE actions + CREATE TABLE PKTABLEFORARRAY ( ptest1 int PRIMARY KEY, ptest2 int ); + + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES (1, 1); + INSERT INTO PKTABLEFORARRAY VALUES (2, 2); + INSERT INTO PKTABLEFORARRAY VALUES (3, 3); + + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] NOT NULL DEFAULT ARRAY[]::int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE, ftest2 int 
); + + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1], 1); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2], 2); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[3], 3); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2,3], 4); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1,2], 5); + + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + + -- Delete a row from PK TABLE (ON DELETE CASCADE) + DELETE FROM PKTABLEFORARRAY WHERE ptest1=1; + + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + + -- Test for EACH SET NULL actions + CREATE TABLE PKTABLEFORARRAY ( ptest1 int PRIMARY KEY, ptest2 int ); + + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES (1, 1); + INSERT INTO PKTABLEFORARRAY VALUES (2, 2); + INSERT INTO PKTABLEFORARRAY VALUES (3, 3); + + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON UPDATE EACH SET NULL ON DELETE EACH SET NULL, ftest2 int ); + + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1], 1); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2], 2); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[3], 3); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2,2,1], 4); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1,3,3], 5); + + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + + -- Delete a row from PK TABLE (ON DELETE EACH SET NULL) + DELETE FROM PKTABLEFORARRAY WHERE ptest1=1; + + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Update a row from PK TABLE (ON UPDATE EACH SET NULL) + UPDATE PKTABLEFORARRAY SET ptest1=5 WHERE ptest1=3; + + -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + + -- Test for EACH CASCADE actions + CREATE TABLE PKTABLEFORARRAY ( ptest1 int PRIMARY KEY, ptest2 int ); + + -- 
Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES (1, 1); + INSERT INTO PKTABLEFORARRAY VALUES (2, 2); + INSERT INTO PKTABLEFORARRAY VALUES (3, 3); + + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON UPDATE EACH CASCADE ON DELETE EACH CASCADE, ftest2 int ); + + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1], 1); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2], 2); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[3], 3); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2,2,1], 4); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1,3,3], 5); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[ARRAY[2,3],ARRAY[3,3]], 6); + + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + + -- Delete a row from PK TABLE (ON DELETE EACH CASCADE) + DELETE FROM PKTABLEFORARRAY WHERE ptest1=1; + + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Delete a row from PK TABLE (ON DELETE EACH CASCADE fallback to ON DELETE RESTRICT) + DELETE FROM PKTABLEFORARRAY WHERE ptest1=2; + + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Update a row from PK TABLE (ON UPDATE EACH CASCADE) + UPDATE PKTABLEFORARRAY SET ptest1=5 WHERE ptest1=3; + + -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + + -- Check EACH CASCADE actions with cast + CREATE TABLE PKTABLEFORARRAY (c smallint PRIMARY KEY); + CREATE TABLE FKTABLEFORARRAY (c int[] EACH REFERENCES PKTABLEFORARRAY + ON UPDATE EACH CASCADE + ON DELETE EACH CASCADE); + INSERT INTO PKTABLEFORARRAY VALUES (1), (2); + INSERT INTO FKTABLEFORARRAY VALUES ('{1,2}'); + UPDATE PKTABLEFORARRAY SET c = 3 WHERE c = 2; + DELETE FROM PKTABLEFORARRAY WHERE c = 1; + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + + -- Check EACH SET NULL actions with cast + CREATE TABLE PKTABLEFORARRAY 
(c smallint PRIMARY KEY); + CREATE TABLE FKTABLEFORARRAY (c int[] EACH REFERENCES PKTABLEFORARRAY + ON UPDATE EACH SET NULL + ON DELETE EACH SET NULL); + INSERT INTO PKTABLEFORARRAY VALUES (1), (2); + INSERT INTO FKTABLEFORARRAY VALUES ('{1,2}'); + UPDATE PKTABLEFORARRAY SET c = 3 WHERE c = 2; + DELETE FROM PKTABLEFORARRAY WHERE c = 1; + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + + -- Check for an array column referencing another array column (NOT EACH FOREIGN KEY) + -- Create primary table with an array primary key + CREATE TABLE PKTABLEFORARRAY ( id INT[] PRIMARY KEY, ptest2 text); + + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, fids INT[] REFERENCES PKTABLEFORARRAY, ftest2 TEXT ); + + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES ('{1,1}', 'A'); + INSERT INTO PKTABLEFORARRAY VALUES ('{1,2}', 'B'); + + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY (fids, ftest2) VALUES ('{1,1}', 'Product A'); + INSERT INTO FKTABLEFORARRAY (fids, ftest2) VALUES ('{1,2}', 'Product B'); + + -- Insert invalid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY (fids, ftest2) VALUES ('{0,1}', 'Product C'); + INSERT INTO FKTABLEFORARRAY (fids, ftest2) VALUES ('{2,1}', 'Product D'); + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + + -- Check ARRAY actions on normal FK + -- Create primary table with an EACH primary key + CREATE TABLE PKTABLEFORARRAY ( id INT PRIMARY KEY, ptest2 text); + + -- Create an EACH foreign key without an array (ERROR) + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, ftest2 INT EACH REFERENCES PKTABLEFORARRAY ); + + -- Create the foreign table with forbidden actions (ERROR) + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, ftest2 INT[] REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ); + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, ftest2 INT[] REFERENCES PKTABLEFORARRAY ON UPDATE EACH CASCADE ); + CREATE 
TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, ftest2 INT[] REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ); + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, ftest2 INT[] REFERENCES PKTABLEFORARRAY ON UPDATE EACH SET NULL ); + + -- Cleanup + DROP TABLE PKTABLEFORARRAY; + + + -- --------------------------------------- + -- Multi-column "EACH" foreign key tests + -- --------------------------------------- + + -- Create DIM1 table with two-column primary key + CREATE TABLE DIM1 (X INTEGER NOT NULL, Y INTEGER NOT NULL, PRIMARY KEY (X, Y)); + -- Populate DIM1 table pairs + INSERT INTO DIM1 SELECT x.t, x.t * y.t + FROM (SELECT generate_series(1, 10) AS t) x, + (SELECT generate_series(0, 10) AS t) y; + + + -- Test with TABLE declaration of an each foreign key constraint (NO ACTION) + CREATE TABLE F1 ( + x INTEGER PRIMARY KEY, y INTEGER[], + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) + ); + -- Insert facts + INSERT INTO F1 VALUES (1, '{0,1,2,3,4,5}'); -- OK + INSERT INTO F1 VALUES (2, '{0,2,4,6}'); -- OK + INSERT INTO F1 VALUES (3, '{0,3,6,9,0,3,6,9,0,0,0,0,9,9}'); -- OK (multiple occurrences) + INSERT INTO F1 VALUES (4, '{0,2,4}'); -- FAILS (2 is not present) + INSERT INTO F1 VALUES (4, '{0,NULL,4}'); -- OK + INSERT INTO F1 VALUES (5, '{0,NULL,5}'); -- OK + -- Try updates + UPDATE F1 SET y = '{0,2,4,6}' WHERE x = 2; -- OK + UPDATE F1 SET y = '{0,2,3,4,6}' WHERE x = 2; -- FAILS + UPDATE F1 SET x = 20, y = '{0,2,3,4,6}' WHERE x = 2; -- FAILS (20 does not exist) + UPDATE F1 SET y = '{0,4,8}' WHERE x = 4; -- OK + UPDATE F1 SET y = '{0,5,NULL,10}' WHERE x = 5; -- OK + DROP TABLE F1; + + + -- Test with FOREIGN KEY after TABLE population + CREATE TABLE F1 ( + x INTEGER PRIMARY KEY, y INTEGER[] + ); + -- Insert facts + INSERT INTO F1 VALUES (1, '{0,1,2,3,4,5}'); -- OK + INSERT INTO F1 VALUES (2, '{0,2,4,6}'); -- OK + INSERT INTO F1 VALUES (3, '{0,3,6,9,0,3,6,9,0,0,0,0,9,9}'); -- OK (multiple occurrences) + INSERT INTO F1 VALUES (4, '{0,2,4}'); -- OK + INSERT INTO 
F1 VALUES (5, '{0,NULL,5}'); -- OK + -- Try updates + UPDATE F1 SET y = '{0,2,4,6}' WHERE x = 2; -- OK + UPDATE F1 SET y = '{0,2,3,4,6}' WHERE x = 2; -- OK + UPDATE F1 SET x = 20, y = '{0,2,3,4,6}' WHERE x = 2; -- OK + -- Add foreign key (FAILS) + ALTER TABLE F1 ADD FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y); + DROP TABLE F1; + + -- Limitation on EACH CASCADE an EACH SET NULL on multi-column keys + CREATE TABLE F2 ( + x INTEGER PRIMARY KEY, y INTEGER[], + -- FAILS (multi-column foreign key forbids EACH CASCADE + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON DELETE EACH CASCADE ON UPDATE EACH CASCADE + ); + CREATE TABLE F2 ( + x INTEGER PRIMARY KEY, y INTEGER[], + -- FAILS (multi-column foreign key forbids EACH CASCADE + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON DELETE EACH CASCADE + ); + CREATE TABLE F2 ( + x INTEGER PRIMARY KEY, y INTEGER[], + -- FAILS (multi-column foreign key forbids EACH CASCADE + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON UPDATE EACH CASCADE + ); + CREATE TABLE F2 ( + x INTEGER PRIMARY KEY, y INTEGER[], + -- FAILS (multi-column foreign key forbids EACH SET NULL + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON DELETE EACH SET NULL ON UPDATE EACH SET NULL + ); + CREATE TABLE F2 ( + x INTEGER PRIMARY KEY, y INTEGER[], + -- FAILS (multi-column foreign key forbids EACH SET NULL + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON DELETE EACH SET NULL + ); + CREATE TABLE F2 ( + x INTEGER PRIMARY KEY, y INTEGER[], + -- FAILS (multi-column foreign key forbids EACH SET NULL + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON UPDATE EACH SET NULL + ); + + -- Test of ON DELETE CASCADE and ON UPDATE SET NULL with multi-columns + CREATE TABLE F1 ( + x INTEGER PRIMARY KEY, y INTEGER[], + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON DELETE CASCADE ON UPDATE SET NULL + ); + -- Insert dummy dimension record + INSERT INTO DIM1 VALUES (199, 199); + INSERT INTO DIM1 VALUES (199, 200); + INSERT INTO DIM1 VALUES (199, 201); + -- Insert facts + 
INSERT INTO F1 VALUES (1, '{0,1,2,3,4,5}'); -- OK + INSERT INTO F1 VALUES (2, '{0,2,4,6}'); -- OK + INSERT INTO F1 VALUES (3, '{0,3,6,9,0,3,6,9,0,0,0,0,9,9}'); -- OK (multiple occurrences) + INSERT INTO F1 VALUES (4, '{0,NULL,4}'); -- OK + INSERT INTO F1 VALUES (5, '{0,NULL,5}'); -- OK + INSERT INTO F1 VALUES (199, '{199, 200}'); -- OK + SELECT COUNT(*) FROM F1 WHERE x = 199; -- should return 1 + DELETE FROM DIM1 WHERE x = 199 AND y = 200; -- should remove the whole row in F1 + SELECT COUNT(*) FROM F1 WHERE x = 199; -- should return 0 + INSERT INTO F1 VALUES (199, '{199, 201}'); -- OK + SELECT COUNT(*) FROM F1 WHERE x = 199 AND y IS NULL; -- should return 0 + UPDATE DIM1 SET y = 200 WHERE x = 199 AND y = 201; -- should set y to NULL in the matching F1 row + SELECT COUNT(*) FROM F1 WHERE x = 199 AND y IS NULL; -- should return 1 + DROP TABLE F1; + + + -- Test with TABLE declaration of a two-dim EACH foreign key constraint (FAILS) + CREATE TABLE F1 ( + x INTEGER[] PRIMARY KEY, y INTEGER[], + FOREIGN KEY (EACH x, EACH y) REFERENCES DIM1(x, y) + ); + + + -- Test with two-dim EACH foreign key after TABLE population + CREATE TABLE F1 ( + x INTEGER[] PRIMARY KEY, y INTEGER[] + ); + INSERT INTO F1 VALUES ('{1}', '{0,1,2,3,4,5}'); -- OK + INSERT INTO F1 VALUES ('{1,2}', '{0,2,4,6}'); -- OK + -- Add foreign key (FAILS) + ALTER TABLE F1 ADD FOREIGN KEY (EACH x, EACH y) REFERENCES DIM1(x, y); + DROP TABLE F1; + + -- Cleanup + DROP TABLE DIM1; + + -- Check for potential name conflicts (with internal integrity checks) + CREATE TABLE x1(x1 int, x2 int, PRIMARY KEY(x1,x2)); + INSERT INTO x1 VALUES + (1,4), + (1,5), + (2,4), + (2,5), + (3,6), + (3,7) + ; + CREATE TABLE x2(x1 int[], x2 int, FOREIGN KEY(EACH x1, x2) REFERENCES x1); + INSERT INTO x2 VALUES ('{1,2}',4); + INSERT INTO x2 VALUES ('{1,3}',6); -- FAILS + DROP TABLE x2; + CREATE TABLE x2(x1 int[], x2 int); + INSERT INTO x2 VALUES ('{1,2}',4); + INSERT INTO x2 VALUES ('{1,3}',6); + ALTER TABLE x2 ADD CONSTRAINT fk_const FOREIGN 
KEY(EACH x1, x2) REFERENCES x1; -- FAILS + DROP TABLE x2; + DROP TABLE x1; + + + -- --------------------------------------- + -- Multi-dimensional "EACH" foreign key tests + -- --------------------------------------- + + -- Create DIM1 table with a single-column primary key (X) and a unique CODE + CREATE TABLE DIM1 (X INTEGER NOT NULL PRIMARY KEY, + CODE TEXT NOT NULL UNIQUE); + -- Populate DIM1 table with (X, CODE) pairs + INSERT INTO DIM1 SELECT t, 'DIM1-' || lpad(t::TEXT, 2, '0') + FROM (SELECT generate_series(1, 10)) x(t); + + -- Test with TABLE declaration of an EACH foreign key constraint (NO ACTION) + CREATE TABLE F1 ( + ID SERIAL PRIMARY KEY, + SLOTS INTEGER[3][3] EACH REFERENCES DIM1 + ); + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 3}, {NULL, NULL, 6}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 11}, {NULL, NULL, 6}}'); -- FAILS + INSERT INTO F1(SLOTS) VALUES ('{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{1, 2, 3, 4, 5, 6, 7, 8, 9}'); -- OK + UPDATE F1 SET SLOTS = '{{NULL, 1, NULL}, {NULL, NULL, 3}, {7, 8, 10}}' WHERE ID = 1; -- OK + UPDATE F1 SET SLOTS = '{{100, 100, 100}, {NULL, NULL, 20}, {7, 8, 10}}' WHERE ID = 1; -- FAILS + DROP TABLE F1; + + -- Test with postponed foreign key + CREATE TABLE F1 ( + ID SERIAL PRIMARY KEY, + SLOTS INTEGER[3][3] + ); + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 3}, {NULL, NULL, 6}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 11}, {NULL, NULL, 6}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{1, 2, 3, 4, 5, 6, 7, 8, 9}'); -- OK + ALTER TABLE F1 ADD FOREIGN KEY (EACH SLOTS) REFERENCES DIM1; -- FAILS + DELETE FROM F1 WHERE ID = 2; -- REMOVE ISSUE (presumably the row containing 11; confirm ID assignment) + ALTER TABLE F1 ADD FOREIGN KEY (EACH SLOTS) REFERENCES DIM1; -- NOW OK + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 11}, {NULL, NULL, 6}}'); -- FAILS + DROP TABLE F1; + + -- Test with postponed foreign 
key, on delete each cascade + -- and a multi-dimensional array + CREATE TABLE F1 ( + ID SERIAL PRIMARY KEY, + SLOTS INTEGER[3][3] + ); + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 3}, {NULL, NULL, 6}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{1, 2, 3, 4, 5, 6, 7, 8, 9}'); -- OK + ALTER TABLE F1 ADD FOREIGN KEY (EACH SLOTS) REFERENCES DIM1 ON DELETE EACH CASCADE; -- FAILS + DROP TABLE F1; + + -- Test with EACH foreign key, on delete each cascade + -- and a multi-dimensional array + CREATE TABLE F1 ( + ID SERIAL PRIMARY KEY, + SLOTS INTEGER[3][3] EACH REFERENCES DIM1 ON DELETE EACH CASCADE + ); + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 3}, {NULL, NULL, 6}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{1, 2, 3, 4, 5, 6, 7, 8, 9}'); -- OK + -- Fails (behaves like RESTRICT) + DELETE FROM DIM1 WHERE x = 2; + DROP TABLE F1; + + -- Test with postponed foreign key, on delete each cascade + -- and a multi-dimensional array + CREATE TABLE F1 ( + ID SERIAL PRIMARY KEY, + SLOTS INTEGER[3][3] + ); + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 3}, {NULL, NULL, 6}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{1, 2, 3, 4, 5, 6, 7, 8, 9}'); -- OK + ALTER TABLE F1 ADD FOREIGN KEY (EACH SLOTS) REFERENCES DIM1 ON DELETE EACH CASCADE; -- NOTE(review): an identical setup and ALTER earlier in this section is marked FAILS; confirm which outcome is expected + -- Fails (behaves like RESTRICT) + DELETE FROM DIM1 WHERE x = 2; + DROP TABLE F1; + + -- Cleanup + DROP TABLE DIM1; + + -- Leave tables in the database + CREATE TABLE PKTABLEFOREACHFK ( ptest1 int PRIMARY KEY, ptest2 text ); + CREATE TABLE FKTABLEFOREACHFK ( ftest1 int[] EACH REFERENCES PKTABLEFOREACHFK, ftest2 int );