I wrote:
> change_varattnos_of_a_node(), which is used to adjust expressions
> referencing a parent relation or LIKE source relation to refer to the
> child relation, ignores whole-row Vars (those with attnum zero).
> ...
> My inclination, especially in the back branches, is to just throw error
> if we find a whole-row Var.
> ...
> There are a bunch of other things I don't like about
> change_varattnos_of_a_node, starting with the name, but those gripes are
> in the nature of cosmetics or inadequate error checking and wouldn't
> have any user-visible impact if changed.
Attached is a draft patch that adds error reports for whole-row Vars
and replaces change_varattnos_of_a_node with a more robust function.
The latter makes it kind of a large patch, but it's difficult to make
it a whole lot simpler unless we are willing to have the error messages
say only "cannot convert whole-row table reference", without any
indication of where the problem is. That seems a tad unfriendly to me.
Comments?
regards, tom lane
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 70753e33e430237a66991bac0740b8761c1b4e4e..d69809a2f866de511e7ce278b8dca9bf9db33d40 100644
*** a/src/backend/commands/tablecmds.c
--- b/src/backend/commands/tablecmds.c
***************
*** 68,73 ****
--- 68,74 ----
#include "parser/parser.h"
#include "rewrite/rewriteDefine.h"
#include "rewrite/rewriteHandler.h"
+ #include "rewrite/rewriteManip.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "storage/lock.h"
*************** static void truncate_check_rel(Relation
*** 253,259 ****
static List *MergeAttributes(List *schema, List *supers, char relpersistence,
List **supOids, List **supconstr, int *supOidCount);
static bool MergeCheckConstraint(List *constraints, char *name, Node *expr);
- static bool change_varattnos_walker(Node *node, const AttrNumber *newattno);
static void MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel);
static void MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel);
static void StoreCatalogInheritance(Oid relationId, List *supers);
--- 254,259 ----
*************** MergeAttributes(List *schema, List *supe
*** 1496,1502 ****
* parents after the first one, nor if we have dropped columns.)
*/
newattno = (AttrNumber *)
! palloc(tupleDesc->natts * sizeof(AttrNumber));
for (parent_attno = 1; parent_attno <= tupleDesc->natts;
parent_attno++)
--- 1496,1502 ----
* parents after the first one, nor if we have dropped columns.)
*/
newattno = (AttrNumber *)
! palloc0(tupleDesc->natts * sizeof(AttrNumber));
for (parent_attno = 1; parent_attno <= tupleDesc->natts;
parent_attno++)
*************** MergeAttributes(List *schema, List *supe
*** 1510,1524 ****
* Ignore dropped columns in the parent.
*/
if (attribute->attisdropped)
! {
! /*
! * change_varattnos_of_a_node asserts that this is greater
! * than zero, so if anything tries to use it, we should find
! * out.
! */
! newattno[parent_attno - 1] = 0;
! continue;
! }
/*
* Does it conflict with some previously inherited column?
--- 1510,1516 ----
* Ignore dropped columns in the parent.
*/
if (attribute->attisdropped)
! continue; /* leave newattno entry as zero */
/*
* Does it conflict with some previously inherited column?
*************** MergeAttributes(List *schema, List *supe
*** 1656,1669 ****
{
char *name = check[i].ccname;
Node *expr;
/* ignore if the constraint is non-inheritable */
if (check[i].ccnoinherit)
continue;
! /* adjust varattnos of ccbin here */
! expr = stringToNode(check[i].ccbin);
! change_varattnos_of_a_node(expr, newattno);
/* check for duplicate */
if (!MergeCheckConstraint(constraints, name, expr))
--- 1648,1677 ----
{
char *name = check[i].ccname;
Node *expr;
+ bool found_whole_row;
/* ignore if the constraint is non-inheritable */
if (check[i].ccnoinherit)
continue;
! /* Adjust Vars to match new table's column numbering */
! expr = map_variable_attnos(stringToNode(check[i].ccbin),
! 1, 0,
! newattno, tupleDesc->natts,
! &found_whole_row);
!
! /*
! * For the moment we have to reject whole-row variables.
! * We could convert them, if we knew the new table's rowtype
! * OID, but that hasn't been assigned yet.
! */
! if (found_whole_row)
! ereport(ERROR,
! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! errmsg("cannot convert whole-row table reference"),
! errdetail("Constraint \"%s\" contains a whole-row reference to table \"%s\".",
! name,
! RelationGetRelationName(relation))));
/* check for duplicate */
if (!MergeCheckConstraint(constraints, name, expr))
*************** MergeCheckConstraint(List *constraints,
*** 1867,1967 ****
/*
- * Replace varattno values in an expression tree according to the given
- * map array, that is, varattno N is replaced by newattno[N-1]. It is
- * caller's responsibility to ensure that the array is long enough to
- * define values for all user varattnos present in the tree. System column
- * attnos remain unchanged.
- *
- * Note that the passed node tree is modified in-place!
- */
- void
- change_varattnos_of_a_node(Node *node, const AttrNumber *newattno)
- {
- /* no setup needed, so away we go */
- (void) change_varattnos_walker(node, newattno);
- }
-
- static bool
- change_varattnos_walker(Node *node, const AttrNumber *newattno)
- {
- if (node == NULL)
- return false;
- if (IsA(node, Var))
- {
- Var *var = (Var *) node;
-
- if (var->varlevelsup == 0 && var->varno == 1 &&
- var->varattno > 0)
- {
- /*
- * ??? the following may be a problem when the node is multiply
- * referenced though stringToNode() doesn't create such a node
- * currently.
- */
- Assert(newattno[var->varattno - 1] > 0);
- var->varattno = var->varoattno = newattno[var->varattno - 1];
- }
- return false;
- }
- return expression_tree_walker(node, change_varattnos_walker,
- (void *) newattno);
- }
-
- /*
- * Generate a map for change_varattnos_of_a_node from old and new TupleDesc's,
- * matching according to column name.
- */
- AttrNumber *
- varattnos_map(TupleDesc olddesc, TupleDesc newdesc)
- {
- AttrNumber *attmap;
- int i,
- j;
-
- attmap = (AttrNumber *) palloc0(sizeof(AttrNumber) * olddesc->natts);
- for (i = 1; i <= olddesc->natts; i++)
- {
- if (olddesc->attrs[i - 1]->attisdropped)
- continue; /* leave the entry as zero */
-
- for (j = 1; j <= newdesc->natts; j++)
- {
- if (strcmp(NameStr(olddesc->attrs[i - 1]->attname),
- NameStr(newdesc->attrs[j - 1]->attname)) == 0)
- {
- attmap[i - 1] = j;
- break;
- }
- }
- }
- return attmap;
- }
-
- /*
- * Generate a map for change_varattnos_of_a_node from a TupleDesc and a list
- * of ColumnDefs
- */
- AttrNumber *
- varattnos_map_schema(TupleDesc old, List *schema)
- {
- AttrNumber *attmap;
- int i;
-
- attmap = (AttrNumber *) palloc0(sizeof(AttrNumber) * old->natts);
- for (i = 1; i <= old->natts; i++)
- {
- if (old->attrs[i - 1]->attisdropped)
- continue; /* leave the entry as zero */
-
- attmap[i - 1] = findAttrByName(NameStr(old->attrs[i - 1]->attname),
- schema);
- }
- return attmap;
- }
-
-
- /*
* StoreCatalogInheritance
* Updates the system catalogs with proper inheritance information.
*
--- 1875,1880 ----
diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c
index adc0d5b775e9754941d880722ea2fb7f4bf704fd..55a74eeaf448b825a091e50d8717082c89a71c5f 100644
*** a/src/backend/parser/parse_utilcmd.c
--- b/src/backend/parser/parse_utilcmd.c
*************** static void transformOfType(CreateStmtCo
*** 108,114 ****
TypeName *ofTypename);
static char *chooseIndexName(const RangeVar *relation, IndexStmt *index_stmt);
static IndexStmt *generateClonedIndexStmt(CreateStmtContext *cxt,
! Relation parent_index, AttrNumber *attmap);
static List *get_collation(Oid collation, Oid actual_datatype);
static List *get_opclass(Oid opclass, Oid actual_datatype);
static void transformIndexConstraints(CreateStmtContext *cxt);
--- 108,115 ----
TypeName *ofTypename);
static char *chooseIndexName(const RangeVar *relation, IndexStmt *index_stmt);
static IndexStmt *generateClonedIndexStmt(CreateStmtContext *cxt,
! Relation source_idx,
! const AttrNumber *attmap, int attmap_length);
static List *get_collation(Oid collation, Oid actual_datatype);
static List *get_opclass(Oid opclass, Oid actual_datatype);
static void transformIndexConstraints(CreateStmtContext *cxt);
*************** transformTableLikeClause(CreateStmtConte
*** 634,639 ****
--- 635,641 ----
Relation relation;
TupleDesc tupleDesc;
TupleConstr *constr;
+ AttrNumber *attmap;
AclResult aclresult;
char *comment;
ParseCallbackState pcbstate;
*************** transformTableLikeClause(CreateStmtConte
*** 677,682 ****
--- 679,691 ----
constr = tupleDesc->constr;
/*
+ * Initialize column number map for map_variable_attnos(). We need this
+ * since dropped columns in the source table aren't copied, so the new
+ * table can have different column numbers.
+ */
+ attmap = (AttrNumber *) palloc0(sizeof(AttrNumber) * tupleDesc->natts);
+
+ /*
* Insert the copied attributes into the cxt for the new table definition.
*/
for (parent_attno = 1; parent_attno <= tupleDesc->natts;
*************** transformTableLikeClause(CreateStmtConte
*** 687,693 ****
ColumnDef *def;
/*
! * Ignore dropped columns in the parent.
*/
if (attribute->attisdropped)
continue;
--- 696,702 ----
ColumnDef *def;
/*
! * Ignore dropped columns in the parent. attmap entry is left zero.
*/
if (attribute->attisdropped)
continue;
*************** transformTableLikeClause(CreateStmtConte
*** 718,723 ****
--- 727,734 ----
*/
cxt->columns = lappend(cxt->columns, def);
+ attmap[parent_attno - 1] = list_length(cxt->columns);
+
/*
* Copy default, if present and the default has been requested
*/
*************** transformTableLikeClause(CreateStmtConte
*** 776,797 ****
/*
* Copy CHECK constraints if requested, being careful to adjust attribute
! * numbers
*/
if ((table_like_clause->options & CREATE_TABLE_LIKE_CONSTRAINTS) &&
tupleDesc->constr)
{
- AttrNumber *attmap = varattnos_map_schema(tupleDesc, cxt->columns);
int ccnum;
for (ccnum = 0; ccnum < tupleDesc->constr->num_check; ccnum++)
{
char *ccname = tupleDesc->constr->check[ccnum].ccname;
char *ccbin = tupleDesc->constr->check[ccnum].ccbin;
- Node *ccbin_node = stringToNode(ccbin);
Constraint *n = makeNode(Constraint);
! change_varattnos_of_a_node(ccbin_node, attmap);
n->contype = CONSTR_CHECK;
n->location = -1;
--- 787,825 ----
/*
* Copy CHECK constraints if requested, being careful to adjust attribute
! * numbers so they match the new table.
*/
if ((table_like_clause->options & CREATE_TABLE_LIKE_CONSTRAINTS) &&
tupleDesc->constr)
{
int ccnum;
for (ccnum = 0; ccnum < tupleDesc->constr->num_check; ccnum++)
{
char *ccname = tupleDesc->constr->check[ccnum].ccname;
char *ccbin = tupleDesc->constr->check[ccnum].ccbin;
Constraint *n = makeNode(Constraint);
+ Node *ccbin_node;
+ bool found_whole_row;
! ccbin_node = map_variable_attnos(stringToNode(ccbin),
! 1, 0,
! attmap, tupleDesc->natts,
! &found_whole_row);
!
! /*
! * We reject whole-row variables because the whole point of LIKE
! * is that the new table's rowtype might later diverge from the
! * parent's. So, while translation might be possible right now,
! * it wouldn't be possible to guarantee it would work in future.
! */
! if (found_whole_row)
! ereport(ERROR,
! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! errmsg("cannot convert whole-row table reference"),
! errdetail("Constraint \"%s\" contains a whole-row reference to table \"%s\".",
! ccname,
! RelationGetRelationName(relation))));
n->contype = CONSTR_CHECK;
n->location = -1;
*************** transformTableLikeClause(CreateStmtConte
*** 827,833 ****
if ((table_like_clause->options & CREATE_TABLE_LIKE_INDEXES) &&
relation->rd_rel->relhasindex)
{
- AttrNumber *attmap = varattnos_map_schema(tupleDesc, cxt->columns);
List *parent_indexes;
ListCell *l;
--- 855,860 ----
*************** transformTableLikeClause(CreateStmtConte
*** 842,848 ****
parent_index = index_open(parent_index_oid, AccessShareLock);
/* Build CREATE INDEX statement to recreate the parent_index */
! index_stmt = generateClonedIndexStmt(cxt, parent_index, attmap);
/* Copy comment on index */
if (table_like_clause->options & CREATE_TABLE_LIKE_COMMENTS)
--- 869,876 ----
parent_index = index_open(parent_index_oid, AccessShareLock);
/* Build CREATE INDEX statement to recreate the parent_index */
! index_stmt = generateClonedIndexStmt(cxt, parent_index,
! attmap, tupleDesc->natts);
/* Copy comment on index */
if (table_like_clause->options & CREATE_TABLE_LIKE_COMMENTS)
*************** chooseIndexName(const RangeVar *relation
*** 961,967 ****
*/
static IndexStmt *
generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
! AttrNumber *attmap)
{
Oid source_relid = RelationGetRelid(source_idx);
Form_pg_attribute *attrs = RelationGetDescr(source_idx)->attrs;
--- 989,995 ----
*/
static IndexStmt *
generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
! const AttrNumber *attmap, int attmap_length)
{
Oid source_relid = RelationGetRelid(source_idx);
Form_pg_attribute *attrs = RelationGetDescr(source_idx)->attrs;
*************** generateClonedIndexStmt(CreateStmtContex
*** 1149,1162 ****
{
/* Expressional index */
Node *indexkey;
if (indexpr_item == NULL)
elog(ERROR, "too few entries in indexprs list");
indexkey = (Node *) lfirst(indexpr_item);
indexpr_item = lnext(indexpr_item);
! /* OK to modify indexkey since we are working on a private copy */
! change_varattnos_of_a_node(indexkey, attmap);
iparam->name = NULL;
iparam->expr = indexkey;
--- 1177,1202 ----
{
/* Expressional index */
Node *indexkey;
+ bool found_whole_row;
if (indexpr_item == NULL)
elog(ERROR, "too few entries in indexprs list");
indexkey = (Node *) lfirst(indexpr_item);
indexpr_item = lnext(indexpr_item);
! /* Adjust Vars to match new table's column numbering */
! indexkey = map_variable_attnos(indexkey,
! 1, 0,
! attmap, attmap_length,
! &found_whole_row);
!
! /* As in transformTableLikeClause, reject whole-row variables */
! if (found_whole_row)
! ereport(ERROR,
! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! errmsg("cannot convert whole-row table reference"),
! errdetail("Index \"%s\" contains a whole-row table reference.",
! RelationGetRelationName(source_idx))));
iparam->name = NULL;
iparam->expr = indexkey;
*************** generateClonedIndexStmt(CreateStmtContex
*** 1213,1224 ****
if (!isnull)
{
char *pred_str;
/* Convert text string to node tree */
pred_str = TextDatumGetCString(datum);
! index->whereClause = (Node *) stringToNode(pred_str);
! /* Adjust attribute numbers */
! change_varattnos_of_a_node(index->whereClause, attmap);
}
/* Clean up */
--- 1253,1280 ----
if (!isnull)
{
char *pred_str;
+ Node *pred_tree;
+ bool found_whole_row;
/* Convert text string to node tree */
pred_str = TextDatumGetCString(datum);
! pred_tree = (Node *) stringToNode(pred_str);
!
! /* Adjust Vars to match new table's column numbering */
! pred_tree = map_variable_attnos(pred_tree,
! 1, 0,
! attmap, attmap_length,
! &found_whole_row);
!
! /* As in transformTableLikeClause, reject whole-row variables */
! if (found_whole_row)
! ereport(ERROR,
! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
! errmsg("cannot convert whole-row table reference"),
! errdetail("Index \"%s\" contains a whole-row table reference.",
! RelationGetRelationName(source_idx))));
!
! index->whereClause = pred_tree;
}
/* Clean up */
diff --git a/src/backend/rewrite/rewriteManip.c b/src/backend/rewrite/rewriteManip.c
index ec6b5bf5a924a8655227be1eeaffeefa88927611..9c778efd1c39849726c775aab55e3bdf3d97487f 100644
*** a/src/backend/rewrite/rewriteManip.c
--- b/src/backend/rewrite/rewriteManip.c
*************** replace_rte_variables_mutator(Node *node
*** 1218,1223 ****
--- 1218,1336 ----
/*
+ * map_variable_attnos() finds all user-column Vars in an expression tree
+ * that reference a particular RTE, and adjusts their varattnos according
+ * to the given mapping array (varattno n is replaced by attno_map[n-1]).
+ * Vars for system columns are not modified.
+ *
+ * A zero in the mapping array represents a dropped column, which should not
+ * appear in the expression.
+ *
+ * If the expression tree contains a whole-row Var for the target RTE,
+ * the Var is not changed but *found_whole_row is returned as TRUE.
+ * For most callers this is an error condition, but we leave it to the caller
+ * to report the error so that useful context can be provided. (In some
+ * usages it would be appropriate to modify the Var's vartype and insert a
+ * ConvertRowtypeExpr node to map back to the original vartype. We might
+ * someday extend this function's API to support that. For now, the only
+ * concession to that future need is that this function is a tree mutator
+ * not just a walker.)
+ *
+ * This could be built using replace_rte_variables and a callback function,
+ * but since we don't ever need to insert sublinks, replace_rte_variables is
+ * overly complicated.
+ */
+
+ typedef struct
+ {
+ int target_varno; /* RTE index to search for */
+ int sublevels_up; /* (current) nesting depth */
+ const AttrNumber *attno_map; /* map array for user attnos */
+ int map_length; /* number of entries in attno_map[] */
+ bool *found_whole_row; /* output flag */
+ } map_variable_attnos_context;
+
+ static Node *
+ map_variable_attnos_mutator(Node *node,
+ map_variable_attnos_context *context)
+ {
+ if (node == NULL)
+ return NULL;
+ if (IsA(node, Var))
+ {
+ Var *var = (Var *) node;
+
+ if (var->varno == context->target_varno &&
+ var->varlevelsup == context->sublevels_up)
+ {
+ /* Found a matching variable, make the substitution */
+ Var *newvar = (Var *) palloc(sizeof(Var));
+ int attno = var->varattno;
+
+ *newvar = *var;
+ if (attno > 0)
+ {
+ /* user-defined column, replace attno */
+ if (attno > context->map_length ||
+ context->attno_map[attno - 1] == 0)
+ elog(ERROR, "unexpected varattno %d in expression to be mapped",
+ attno);
+ newvar->varattno = newvar->varoattno = context->attno_map[attno - 1];
+ }
+ else if (attno == 0)
+ {
+ /* whole-row variable, warn caller */
+ *(context->found_whole_row) = true;
+ }
+ return (Node *) newvar;
+ }
+ /* otherwise fall through to copy the var normally */
+ }
+ else if (IsA(node, Query))
+ {
+ /* Recurse into RTE subquery or not-yet-planned sublink subquery */
+ Query *newnode;
+
+ context->sublevels_up++;
+ newnode = query_tree_mutator((Query *) node,
+ map_variable_attnos_mutator,
+ (void *) context,
+ 0);
+ context->sublevels_up--;
+ return (Node *) newnode;
+ }
+ return expression_tree_mutator(node, map_variable_attnos_mutator,
+ (void *) context);
+ }
+
+ Node *
+ map_variable_attnos(Node *node,
+ int target_varno, int sublevels_up,
+ const AttrNumber *attno_map, int map_length,
+ bool *found_whole_row)
+ {
+ map_variable_attnos_context context;
+
+ context.target_varno = target_varno;
+ context.sublevels_up = sublevels_up;
+ context.attno_map = attno_map;
+ context.map_length = map_length;
+ context.found_whole_row = found_whole_row;
+
+ *found_whole_row = false;
+
+ /*
+ * Must be prepared to start with a Query or a bare expression tree; if
+ * it's a Query, we don't want to increment sublevels_up.
+ */
+ return query_or_expression_tree_mutator(node,
+ map_variable_attnos_mutator,
+ (void *) &context,
+ 0);
+ }
+
+
+ /*
* ResolveNew - replace Vars with corresponding items from a targetlist
*
* Vars matching target_varno and sublevels_up are replaced by the
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index 9ceb086f681420be255ae8790988fb074b6495ff..15d4713cec16ff9e2c4366deedf6aca807516cf5 100644
*** a/src/include/commands/tablecmds.h
--- b/src/include/commands/tablecmds.h
*************** extern void find_composite_type_dependen
*** 61,70 ****
extern void check_of_type(HeapTuple typetuple);
- extern AttrNumber *varattnos_map(TupleDesc olddesc, TupleDesc newdesc);
- extern AttrNumber *varattnos_map_schema(TupleDesc old, List *schema);
- extern void change_varattnos_of_a_node(Node *node, const AttrNumber *newattno);
-
extern void register_on_commit_action(Oid relid, OnCommitAction action);
extern void remove_on_commit_action(Oid relid);
--- 61,66 ----
diff --git a/src/include/rewrite/rewriteManip.h b/src/include/rewrite/rewriteManip.h
index 7226187849232caf9a6c1dc30f2334c7e2656283..6f57b37b81443ba1a5b7a450b2cb005b99831187 100644
*** a/src/include/rewrite/rewriteManip.h
--- b/src/include/rewrite/rewriteManip.h
*************** extern Node *replace_rte_variables(Node
*** 65,70 ****
--- 65,75 ----
extern Node *replace_rte_variables_mutator(Node *node,
replace_rte_variables_context *context);
+ extern Node *map_variable_attnos(Node *node,
+ int target_varno, int sublevels_up,
+ const AttrNumber *attno_map, int map_length,
+ bool *found_whole_row);
+
extern Node *ResolveNew(Node *node, int target_varno, int sublevels_up,
RangeTblEntry *target_rte,
List *targetlist, int event, int update_varno,