diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml index 7518c84..3aa9aab 100644 --- a/doc/src/sgml/ref/allfiles.sgml +++ b/doc/src/sgml/ref/allfiles.sgml @@ -119,6 +119,7 @@ Complete list of usable sgml source files in this directory. + diff --git a/doc/src/sgml/ref/merge.sgml b/doc/src/sgml/ref/merge.sgml new file mode 100644 index 0000000..7c73623 --- /dev/null +++ b/doc/src/sgml/ref/merge.sgml @@ -0,0 +1,409 @@ + + + + + MERGE + SQL - Language Statements + + + + MERGE + update, insert or delete rows of a table based upon source data + + + + MERGE + + + + +MERGE INTO table [ [ AS ] alias ] +USING source-query +ON join_condition +[when_clause [...]] + +where when_clause is + +{ WHEN MATCHED [ AND condition ] THEN { merge_update | DELETE | DO NOTHING | RAISE ERROR} + WHEN NOT MATCHED [ AND condition ] THEN { merge_insert | DO NOTHING | RAISE ERROR} } + +where merge_update is + +UPDATE SET { column = { expression | DEFAULT } | + ( column [, ...] ) = ( { expression | DEFAULT } [, ...] ) } [, ...] + +and merge_insert is + +INSERT [( column [, ...] )] { VALUES ( { expression | DEFAULT } [, ...] ) | DEFAULT VALUES } + + + + + Description + + + MERGE performs at most one action on each row from + the target table, driven by the rows from the source query. This + provides a way to specify a single SQL statement that can conditionally + UPDATE or INSERT rows, a task + that would otherwise require multiple procedural language statements. + + + + First, the MERGE command performs a left outer join + from source query to target table, producing zero or more merged rows. For + each merged row, WHEN clauses are evaluated in the + specified order until one of them is activated. The corresponding action + is then applied and processing continues for the next row. + + + + MERGE actions have the same effect as + regular UPDATE, INSERT, or + DELETE commands of the same names, though the syntax + is slightly different. + + + + If no WHEN clause activates then an implicit action of + RAISE ERROR is performed for that row. If that + implicit action is not desirable an explicit action of + DO NOTHING may be specified instead. + + + + MERGE will only affect rows only in the specified table. + + + + There is no RETURNING clause with MERGE. + + + + There is no MERGE privilege. + You must have the UPDATE privilege on the table + if you specify an update action, the INSERT privilege if + you specify an insert action and/or the DELETE privilege + if you wish to delete. You will also require the + SELECT privilege to any table whose values are read + in the expressions or + condition. + + + + + Parameters + + + + table + + + The name (optionally schema-qualified) of the table to merge into. + + + + + + alias + + + A substitute name for the target table. When an alias is + provided, it completely hides the actual name of the table. For + example, given MERGE foo AS f, the remainder of the + MERGE statement must refer to this table as + f not foo. + + + + + + source-query + + + A query (SELECT statement or VALUES + statement) that supplies the rows to be merged into the target table. + Refer to the + statement or + statement for a description of the syntax. + + + + + + join_condition + + + join_condition is + an expression resulting in a value of type + boolean (similar to a WHERE + clause) that specifies which rows in the join are considered to + match. You should ensure that the join produces at most one output + row for each row to be modified. An attempt to modify any row of the + target table more than once will result in an error. This behaviour + requires the user to take greater care in using MERGE, + though is required explicitly by the SQL Standard. + + + + + + condition + + + An expression that returns a value of type boolean. + If this expression returns true then the WHEN + clause will be activated and the corresponding action will occur for + that row. + + + + + + merge_update + + + The specification of an UPDATE action. Do not include + the table name, as you would normally do with an + command. + For example, UPDATE tab SET col = 1 is invalid. Also, + do not include a WHERE clause, since only the current + can be updated. For example, + UPDATE SET col = 1 WHERE key = 57 is invalid. + + + + + + merge_insert + + + The specification of an INSERT action. Do not include + the table name, as you would normally do with an + command. + For example, INSERT INTO tab VALUES (1, 50) is invalid. + + + + + + column + + + The name of a column in table. + The column name can be qualified with a subfield name or array + subscript, if needed. Do not include the table's name in the + specification of a target column — for example, + UPDATE SET tab.col = 1 is invalid. + + + + + + expression + + + An expression to assign to the column. The expression can use the + old values of this and other columns in the table. + + + + + + DEFAULT + + + Set the column to its default value (which will be NULL if no + specific default expression has been assigned to it). + + + + + + + + + Outputs + + + On successful completion, a MERGE command returns a command + tag of the form + +MERGE total-count + + The total-count is the number + of rows changed (either updated, inserted or deleted). + If total-count is 0, no rows + were changed (this is not considered an error). + + + + The number of rows updated, inserted or deleted is not available as part + of the command tag. An optional NOTIFY message can be generated to + present this information, if desired. + +NOTIFY: 34 rows processed: 11 updated, 5 deleted, 15 inserted, 3 default inserts, 0 no action + + + + + + + Notes + + + What essentially happens is that the target table is left outer-joined to + the tables mentioned in the source-query, and + each output row of the join may then activate at most one when-clause. + The row will be matched only once per statement, so the status of + MATCHED or NOT MATCHED cannot change once testing + of WHEN clauses has begun. MERGE will not + invoke Rules. + + + + The following steps take place during the execution of + MERGE. + + + + Perform any BEFORE STATEMENT triggers for actions specified, whether or + not they actually occur. + + + + + Perform left outer join from source to target table. Then for each row: + + + + Evaluate whether each row is MATCHED or NOT MATCHED. + + + + + Test each WHEN condition in the order specified until one activates. + Identify the action and its event type. + + + + + Perform any BEFORE ROW triggers that fire for the action's event type. + + + + + Apply the action specified. + + + + + Perform any AFTER ROW triggers that fire for the action's event type. + + + + + + + + Perform any AFTER STATEMENT triggers for actions specified, whether or + not they actually occur. + + + + In summary, statement triggers for an event type (say, INSERT) will + be fired whenever we specify an action of that kind. Row-level + triggers will fire only for event type activated. + So a MERGE might fire statement triggers for both + UPDATE and INSERT, even though only + UPDATE row triggers were fired. + + + + + + Examples + + + Attempt to insert a new stock item along with the quantity of stock. If + the item already exists, instead update the stock count of the existing + item. + +MERGE INTO wines w +USING (VALUES('Chateau Lafite 2003', '24')) v +ON v.column1 = w.winename +WHEN NOT MATCHED THEN + INSERT VALUES(v.column1, v.column2) +WHEN MATCHED THEN + UPDATE SET stock = stock + v.column2; + + + + + Perform maintenance on CustomerAccounts based upon new Transactions. + The following statement will fail if any accounts have had more than + one transaction + + +MERGE CustomerAccount CA + +USING (SELECT CustomerId, TransactionValue, + FROM Transactions + WHERE TransactionId > 35345678) AS T + +ON T.CustomerId = CA.CustomerId + +WHEN MATCHED THEN + UPDATE SET Balance = Balance - TransactionValue + +WHEN NOT MATCHED THEN + INSERT (CustomerId, Balance) + VALUES (T.CustomerId, T.TransactionValue) +; + + + so the right way to do this is to pre-aggregate the data + + +MERGE CustomerAccount CA + +USING (SELECT CustomerId, Sum(TransactionValue) As TransactionSum + FROM Transactions + WHERE TransactionId > 35345678 + GROUP BY CustomerId) AS T + +ON T.CustomerId = CA.CustomerId + +WHEN MATCHED THEN + UPDATE SET Balance = Balance - TransactionSum + +WHEN NOT MATCHED THEN + INSERT (CustomerId, Balance) + VALUES (T.CustomerId, T.TransactionSum) +; + + + + + + + Compatibility + + + This command conforms to the SQL standard, except + that the DELETE and DO NOTHING actions + are PostgreSQL extensions. + + + + According to the standard, the column-list syntax for an UPDATE + action should allow a list of columns to be assigned from a single + row-valued expression. + This is not currently implemented — the source must be a list + of independent expressions. + + + diff --git a/doc/src/sgml/ref/update.sgml b/doc/src/sgml/ref/update.sgml index 5268794..677390c 100644 --- a/doc/src/sgml/ref/update.sgml +++ b/doc/src/sgml/ref/update.sgml @@ -327,6 +327,9 @@ UPDATE wines SET stock = stock + 24 WHERE winename = 'Chateau Lafite 2003'; -- continue with other operations, and eventually COMMIT; + + This operation can be executed in a single statement using + . diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml index c33d883..5068235 100644 --- a/doc/src/sgml/reference.sgml +++ b/doc/src/sgml/reference.sgml @@ -147,6 +147,7 @@ &listen; &load; &lock; + &merge; &move; ¬ify; &prepare; diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index fcb20d6..f84e9dd 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -76,6 +76,8 @@ static void show_sort_keys(SortState *sortstate, List *ancestors, static void show_sort_info(SortState *sortstate, ExplainState *es); static void show_hash_info(HashState *hashstate, ExplainState *es); static const char *explain_get_index_name(Oid indexId); +static void ExplainMergeActions(ModifyTableState *mt_planstate, + List *ancestors, ExplainState *es); static void ExplainScanTarget(Scan *plan, ExplainState *es); static void ExplainMemberNodes(List *plans, PlanState **planstates, List *ancestors, ExplainState *es); @@ -639,6 +641,9 @@ ExplainNode(PlanState *planstate, List *ancestors, case CMD_DELETE: pname = operation = "Delete"; break; + case CMD_MERGE: + pname = operation = "Merge"; + break; default: pname = "???"; break; @@ -1194,6 +1199,11 @@ ExplainNode(PlanState *planstate, List *ancestors, if (innerPlanState(planstate)) ExplainNode(innerPlanState(planstate), ancestors, "Inner", NULL, es); + + if (IsA(plan, ModifyTable) && + ((ModifyTable *)plan)->operation == CMD_MERGE) + ExplainMergeActions((ModifyTableState *) planstate, + ancestors, es); /* special child plans */ switch (nodeTag(plan)) @@ -1504,6 +1514,89 @@ explain_get_index_name(Oid indexId) return result; } +static void +ExplainMergeActions(ModifyTableState *mt_planstate, List *ancestors, + ExplainState *es) +{ + ListCell *l; + int actno = 1; + StringInfo buf = makeStringInfo(); + StringInfo acttitle = makeStringInfo(); + MergeActionSet *actset; + + if (mt_planstate->operation != CMD_MERGE || + mt_planstate->mt_mergeActPstates == NIL) + return; + + actset = mt_planstate->mt_mergeActPstates[0]; + + foreach(l, actset->actions) + { + ModifyTableState *mt_state = (ModifyTableState *) lfirst(l); + MergeActionState *act_pstate = (MergeActionState *) mt_state->mt_plans[0]; + MergeAction *act_plan = (MergeAction *) act_pstate->ps.plan; + + /*prepare the title*/ + resetStringInfo(acttitle); + appendStringInfo(acttitle, "Action %d", actno); + + /*prepare the string for printing*/ + resetStringInfo(buf); + switch(act_pstate->operation) + { + case CMD_INSERT: + appendStringInfoString(buf, "Insert When "); + break; + case CMD_UPDATE: + appendStringInfoString(buf, "Update When "); + break; + case CMD_DELETE: + appendStringInfoString(buf, "Delete When "); + break; + case CMD_DONOTHING: + appendStringInfoString(buf, "Do Nothing When "); + break; + default: + elog(ERROR, "unknown merge action"); + } + + if (act_plan->matched) + appendStringInfoString(buf, "Matched "); + else + appendStringInfoString(buf, "Not Mactched "); + + if (act_plan->flattenedqual) + appendStringInfoString(buf, "And "); + + /*print the action type*/ + ExplainPropertyText(acttitle->data, buf->data, es); + + /*print the action qual*/ + show_qual(act_plan->flattenedqual, "Qual", + &act_pstate->ps, ancestors, true, es); + + /*print the target list of action*/ + if (es->verbose && + (act_plan->operation == CMD_INSERT || + act_plan->operation == CMD_UPDATE)) + { + List *orignialtlist; + + orignialtlist = act_plan->plan.targetlist; + act_plan->plan.targetlist = act_plan->flattenedtlist; + show_plan_tlist((PlanState *) act_pstate, ancestors, es); + act_plan->plan.targetlist = orignialtlist; + } + + if (act_pstate->ps.subPlan) + ExplainSubPlans(act_pstate->ps.subPlan, ancestors, "SubPlan", es); + + actno++; + } + + ExplainPropertyText("MainPlan", "", es); +} + /* * Show the target of a Scan node */ diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c index e3230d4..890fa82 100644 --- a/src/backend/commands/trigger.c +++ b/src/backend/commands/trigger.c @@ -2342,6 +2342,87 @@ ExecASTruncateTriggers(EState *estate, ResultRelInfo *relinfo) false, NULL, NULL, NIL, NULL); } +void +ExecBSMergeTriggers(ModifyTableState *mt_state) +{ + ListCell *l; + MergeActionSet *actset; + bool doUpdateTriggers = false; + bool doInsertTriggers = false; + bool doDeleteTriggers = false; + + /* Scan the actions to see what kind of statements there is */ + actset = mt_state->mt_mergeActPstates[0]; + foreach(l, actset->actions) + { + ModifyTableState *actmtstate; + MergeActionState *actPstate; + MergeAction *actplan; + + actmtstate = (ModifyTableState *) lfirst(l); + actPstate = (MergeActionState *) actmtstate->mt_plans[0]; + actplan = (MergeAction *) actPstate->ps.plan; + + if (actplan->operation == CMD_UPDATE) + doUpdateTriggers = true; + else if (actplan->operation == CMD_INSERT) + doInsertTriggers = true; + else if (actplan->operation == CMD_DELETE) + doDeleteTriggers = true; + } + + /* And fire the triggers */ + if (doUpdateTriggers) + ExecBSUpdateTriggers(mt_state->ps.state, + mt_state->ps.state->es_result_relations); + if (doInsertTriggers) + ExecBSInsertTriggers(mt_state->ps.state, + mt_state->ps.state->es_result_relations); + if (doDeleteTriggers) + ExecBSDeleteTriggers(mt_state->ps.state, + mt_state->ps.state->es_result_relations); +} + +void +ExecASMergeTriggers(ModifyTableState *mt_state) +{ + ListCell *l; + MergeActionSet *actset; + bool doUpdateTriggers = false; + bool doInsertTriggers = false; + bool doDeleteTriggers = false; + + /* Scan the actions to see what kind of statements there is */ + actset = mt_state->mt_mergeActPstates[0]; + foreach(l, actset->actions) + { + ModifyTableState *actmtstate; + MergeActionState *actPstate; + MergeAction *actplan; + + actmtstate = (ModifyTableState *)lfirst(l); + actPstate = (MergeActionState *)actmtstate->mt_plans[0]; + actplan = (MergeAction *)actPstate->ps.plan; + + if(actplan->operation == CMD_UPDATE) + doUpdateTriggers = true; + else if(actplan->operation == CMD_INSERT) + doInsertTriggers = true; + else if(actplan->operation == CMD_DELETE) + doDeleteTriggers = true; + } + + /* And fire the triggers */ + if (doUpdateTriggers) + ExecASUpdateTriggers(mt_state->ps.state, + mt_state->ps.state->es_result_relations); + if (doInsertTriggers) + ExecASInsertTriggers(mt_state->ps.state, + mt_state->ps.state->es_result_relations); + if (doDeleteTriggers) + ExecASDeleteTriggers(mt_state->ps.state, + mt_state->ps.state->es_result_relations); +} static HeapTuple GetTupleForTrigger(EState *estate, diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index b34a154..2b7ceb1 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -170,6 +170,7 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) case CMD_INSERT: case CMD_DELETE: case CMD_UPDATE: + case CMD_MERGE: estate->es_output_cid = GetCurrentCommandId(true); break; diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index f4cc7d9..ff691c7 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -153,6 +153,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags) estate, eflags); break; + case T_MergeAction: + result = (PlanState *) ExecInitMergeAction((MergeAction *) node, + estate, eflags); + break; + case T_Append: result = (PlanState *) ExecInitAppend((Append *) node, estate, eflags); diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 562c419..bede096 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -582,6 +582,103 @@ lreplace:; return NULL; } +static TupleTableSlot * +ExecMerge(ItemPointer tupleid, + TupleTableSlot *slot, + TupleTableSlot *planSlot, + MergeActionSet *actset, + EState *estate) +{ + + TupleTableSlot *actslot = NULL; + ListCell *each; + + /* + * Try the merge actions one by one until we have a match. + */ + foreach(each, actset->actions) + { + ModifyTableState *mt_pstate; + MergeActionState *action_pstate; + ExprContext *econtext; + bool matched; + + mt_pstate = (ModifyTableState *) lfirst(each); + Assert(IsA(mt_pstate, ModifyTableState)); + + /* + * mt_pstate is supposed to have only ONE mt_plans, + * which is a MergeActionState + */ + action_pstate = (MergeActionState *) mt_pstate->mt_plans[0]; + matched = ((MergeAction *)action_pstate->ps.plan)->matched; + + /* + * If tupleid == NULL, it is a NOT MATCHED case, + * else, it is a MATCHED case, + */ + if ((tupleid == NULL && matched) || + (tupleid != NULL && !matched)) + continue; + + /* Setup the expression context. */ + econtext = action_pstate->ps.ps_ExprContext; + + /* + * Check that additional quals match, if any. + */ + if (action_pstate->ps.qual) + { + ResetExprContext(econtext); + + econtext->ecxt_scantuple = slot; + econtext->ecxt_outertuple = planSlot; + + if (!ExecQual(action_pstate->ps.qual, econtext, false)) + continue; + } + + /* Ok, we have a match. Perform the action */ + + /* First project any RETURNING result tuple slot, if needed */ + if (action_pstate->operation == CMD_INSERT || + action_pstate->operation == CMD_UPDATE) + actslot = ExecProcessReturning(action_pstate->ps.ps_ProjInfo, + slot, planSlot); + + switch (action_pstate->operation) + { + case CMD_INSERT: + return ExecInsert(actslot, planSlot, estate); + + case CMD_UPDATE: + return ExecUpdate(tupleid, + actslot, + planSlot, + &mt_pstate->mt_epqstate, + estate); + + case CMD_DELETE: + return ExecDelete(tupleid, + planSlot, + &mt_pstate->mt_epqstate, + estate); + + case CMD_DONOTHING: + return NULL; + + default: + elog(ERROR, "unknown merge action type for execute"); + break; + } + } + + /* + * No matching action found. Perform the default action, which is + * DO NOTHING. + */ + return NULL; +} /* * Process BEFORE EACH STATEMENT triggers @@ -603,6 +700,9 @@ fireBSTriggers(ModifyTableState *node) ExecBSDeleteTriggers(node->ps.state, node->ps.state->es_result_relations); break; + case CMD_MERGE: + ExecBSMergeTriggers(node); + break; default: elog(ERROR, "unknown operation"); break; @@ -629,6 +729,9 @@ fireASTriggers(ModifyTableState *node) ExecASDeleteTriggers(node->ps.state, node->ps.state->es_result_relations); break; + case CMD_MERGE: + ExecASMergeTriggers(node); + break; default: elog(ERROR, "unknown operation"); break; @@ -654,6 +757,7 @@ ExecModifyTable(ModifyTableState *node) TupleTableSlot *planSlot; ItemPointer tupleid = NULL; ItemPointerData tuple_ctid; + MergeActionSet *mergeActSet = NULL; /* * On first call, fire BEFORE STATEMENT triggers before proceeding. @@ -675,6 +779,8 @@ ExecModifyTable(ModifyTableState *node) /* Preload local variables */ subplanstate = node->mt_plans[node->mt_whichplan]; junkfilter = estate->es_result_relation_info->ri_junkFilter; + if(node->mt_mergeActPstates) + mergeActSet = node->mt_mergeActPstates[node->mt_whichplan]; /* * Fetch rows from subplan(s), and execute the required table modification @@ -701,6 +807,8 @@ ExecModifyTable(ModifyTableState *node) estate->es_result_relation_info++; subplanstate = node->mt_plans[node->mt_whichplan]; junkfilter = estate->es_result_relation_info->ri_junkFilter; + if(node->mt_mergeActPstates) + mergeActSet = node->mt_mergeActPstates[node->mt_whichplan]; EvalPlanQualSetPlan(&node->mt_epqstate, subplanstate->plan); continue; } @@ -716,20 +824,32 @@ ExecModifyTable(ModifyTableState *node) /* * extract the 'ctid' junk attribute. */ - if (operation == CMD_UPDATE || operation == CMD_DELETE) + if (operation == CMD_UPDATE || operation == CMD_DELETE || operation == CMD_MERGE) { Datum datum; bool isNull; datum = ExecGetJunkAttribute(slot, junkfilter->jf_junkAttNo, &isNull); - /* shouldn't ever get a null result... */ + if (isNull) - elog(ERROR, "ctid is NULL"); + { + /* + * Shouldn't ever get a null result for UPDATE or DELETE. + * MERGE gets a null ctid in "NOT MATCHED" case + */ + if (operation != CMD_MERGE) + elog(ERROR, "ctid is NULL"); + else + tupleid = NULL; + } + else + { - tupleid = (ItemPointer) DatumGetPointer(datum); - tuple_ctid = *tupleid; /* be sure we don't free the ctid!! */ - tupleid = &tuple_ctid; + tupleid = (ItemPointer) DatumGetPointer(datum); + tuple_ctid = *tupleid; /* be sure we don't free the ctid!! */ + tupleid = &tuple_ctid; + } } /* @@ -752,6 +872,10 @@ ExecModifyTable(ModifyTableState *node) slot = ExecDelete(tupleid, planSlot, &node->mt_epqstate, estate); break; + case CMD_MERGE: + slot = ExecMerge(tupleid, slot, planSlot, + mergeActSet, estate); + break; default: elog(ERROR, "unknown operation"); break; @@ -779,6 +903,69 @@ ExecModifyTable(ModifyTableState *node) return NULL; } +/* + * When init a merge plan, we also need init its action plans. + * These action plans are "MergeAction" plans. + * + * This function mainly handles the tlist and qual in the plan. + * The returning result is a "MergeActionState". + */ +MergeActionState * +ExecInitMergeAction(MergeAction *node, EState *estate, int eflags) +{ + MergeActionState *result; + + /* + * do nothing when we get to the end of a leaf on tree. + */ + if (node == NULL) + return NULL; + + /* + * create state structure + */ + result = makeNode(MergeActionState); + result->operation = node->operation; + result->ps.plan = (Plan *)node; + result->ps.state = estate; + + /* + * tuple table initialization + */ + ExecInitResultTupleSlot(estate, &result->ps); + + /* + * initialize tuple type + */ + ExecAssignResultTypeFromTL(&result->ps); + + /* + * create expression context for node + */ + ExecAssignExprContext(estate, &result->ps); + + /* + * initialize child expressions + */ + result->ps.targetlist = (List *) + ExecInitExpr((Expr *) node->plan.targetlist, &result->ps); + + result->ps.qual = (List *) + ExecInitExpr((Expr *) node->plan.qual, &result->ps); + + /* + * init the projection information + */ + ExecAssignProjectionInfo(&result->ps, NULL); + + /* + * XXX: do we need a check for the plan output here ? + * (by calling the ExecCheckPlanOutput() function + */ + + return result; +} + /* ---------------------------------------------------------------- * ExecInitModifyTable * ---------------------------------------------------------------- @@ -794,6 +981,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) Plan *subplan; ListCell *l; int i; + bool isMergeAction = false; /* check for unsupported flags */ Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); @@ -834,6 +1022,16 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) foreach(l, node->plans) { subplan = (Plan *) lfirst(l); + + /* + * test if this subplan node is a MergeAction. + * We need this information for setting the junkfilter. + * junkfilter is necessary for an ordinary UPDATE/DELETE plan, + * but not for an UPDATE/DELETE merge action + */ + if (IsA(subplan, MergeAction)) + isMergeAction = true; + mtstate->mt_plans[i] = ExecInitNode(subplan, estate, eflags); estate->es_result_relation_info++; i++; @@ -963,7 +1161,11 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) break; case CMD_UPDATE: case CMD_DELETE: - junk_filter_needed = true; + case CMD_MERGE: + if(!isMergeAction) + junk_filter_needed = true; + break; + case CMD_DONOTHING: break; default: elog(ERROR, "unknown operation"); @@ -986,9 +1188,10 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) resultRelInfo->ri_RelationDesc->rd_att->tdhasoid, ExecInitExtraTupleSlot(estate)); - if (operation == CMD_UPDATE || operation == CMD_DELETE) + if (operation == CMD_UPDATE || operation == CMD_DELETE || + operation == CMD_MERGE) { - /* For UPDATE/DELETE, find the ctid junk attr now */ + /* For UPDATE/DELETE/MERGE, find the ctid junk attr now */ j->jf_junkAttNo = ExecFindJunkAttribute(j, "ctid"); if (!AttributeNumberIsValid(j->jf_junkAttNo)) elog(ERROR, "could not find junk ctid column"); @@ -1014,6 +1217,36 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) if (estate->es_trig_tuple_slot == NULL) estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate); + /* + * for the merge actions, we need to do similar things as above. + *Each action in each action set should be init by the ExecInitNode. + *The returned planstates will take the place of the original plan nodes. + *These new action set will be put in the mt_mergeActPstates array. + */ + if(node->operation == CMD_MERGE) + { + /*we have one action set for each result relation (main plan)*/ + mtstate->mt_mergeActPstates = + (MergeActionSet **) palloc0(sizeof(MergeActionSet*) * nplans); + + estate->es_result_relation_info = estate->es_result_relations; + i = 0; + foreach(l, node->mergeActPlan) + { + ListCell *e; + MergeActionSet *actset = (MergeActionSet *) lfirst(l); + + foreach(e, actset->actions) + { + lfirst(e) = ExecInitNode((Plan *)lfirst(e), estate, 0); + } + mtstate->mt_mergeActPstates[i] = actset; + estate->es_result_relation_info++; + i++; + } + estate->es_result_relation_info = NULL; + } + return mtstate; } diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 87de0c5..3ff2dce 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -177,6 +177,42 @@ _copyModifyTable(ModifyTable *from) COPY_NODE_FIELD(returningLists); COPY_NODE_FIELD(rowMarks); COPY_SCALAR_FIELD(epqParam); + COPY_NODE_FIELD(mergeActPlan); + + return newnode; +} + +/* + * _copyMergeAction + */ +static MergeAction * +_copyMergeAction(MergeAction *from) +{ + MergeAction *newnode = makeNode(MergeAction); + + /* + * copy node superclass fields + */ + CopyPlanFields((Plan *) from, (Plan *) newnode); + + COPY_SCALAR_FIELD(operation); + COPY_SCALAR_FIELD(matched); + COPY_NODE_FIELD(flattenedqual); + COPY_NODE_FIELD(flattenedtlist); + + return newnode; +} + +/* + * _copyMergeActionSet + */ +static MergeActionSet * +_copyMergeActionSet(MergeActionSet *from) +{ + MergeActionSet *newnode = makeNode(MergeActionSet); + + COPY_SCALAR_FIELD(result_relation); + COPY_NODE_FIELD(actions); return newnode; } @@ -2274,6 +2310,10 @@ _copyQuery(Query *from) COPY_NODE_FIELD(rowMarks); COPY_NODE_FIELD(setOperations); COPY_NODE_FIELD(constraintDeps); + COPY_SCALAR_FIELD(isMergeAction); + COPY_SCALAR_FIELD(matched); + COPY_SCALAR_FIELD(sourceAttrNo); + COPY_NODE_FIELD(mergeActQry); return newnode; } @@ -2283,6 +2323,7 @@ _copyInsertStmt(InsertStmt *from) { InsertStmt *newnode = makeNode(InsertStmt); + COPY_SCALAR_FIELD(isMergeAction); COPY_NODE_FIELD(relation); COPY_NODE_FIELD(cols); COPY_NODE_FIELD(selectStmt); @@ -2296,6 +2337,7 @@ _copyDeleteStmt(DeleteStmt *from) { DeleteStmt *newnode = makeNode(DeleteStmt); + COPY_SCALAR_FIELD(isMergeAction); COPY_NODE_FIELD(relation); COPY_NODE_FIELD(usingClause); COPY_NODE_FIELD(whereClause); @@ -2309,6 +2351,7 @@ _copyUpdateStmt(UpdateStmt *from) { UpdateStmt *newnode = makeNode(UpdateStmt); + COPY_SCALAR_FIELD(isMergeAction); COPY_NODE_FIELD(relation); COPY_NODE_FIELD(targetList); COPY_NODE_FIELD(whereClause); @@ -2345,6 +2388,47 @@ _copySelectStmt(SelectStmt *from) return newnode; } +static MergeStmt * +_copyMergeStmt(MergeStmt *from) +{ + MergeStmt *newnode = makeNode(MergeStmt); + + COPY_NODE_FIELD(relation); + COPY_NODE_FIELD(source); + COPY_NODE_FIELD(matchCondition); + COPY_NODE_FIELD(actions); + + return newnode; +} + +static MergeConditionAction * +_copyMergeConditionAction(MergeConditionAction *from) +{ + MergeConditionAction *newnode = makeNode(MergeConditionAction); + + COPY_SCALAR_FIELD(match); + COPY_NODE_FIELD(condition); + COPY_NODE_FIELD(action); + + return newnode; +} + +static MergeDoNothing * +_copyMergeDoNothing(MergeDoNothing *from) +{ + MergeDoNothing *newnode = makeNode(MergeDoNothing); + + return newnode; +} + +static MergeError* +_copyMergeError(MergeError *from) +{ + MergeError *newnode = makeNode(MergeError); + + return newnode; +} + static SetOperationStmt * _copySetOperationStmt(SetOperationStmt *from) { @@ -3607,6 +3691,12 @@ copyObject(void *from) case T_ModifyTable: retval = _copyModifyTable(from); break; + case T_MergeAction: + retval = _copyMergeAction(from); + break; + case T_MergeActionSet: + retval = _copyMergeActionSet(from); + break; case T_Append: retval = _copyAppend(from); break; @@ -3907,6 +3997,18 @@ copyObject(void *from) case T_SelectStmt: retval = _copySelectStmt(from); break; + case T_MergeStmt: + retval = _copyMergeStmt(from); + break; + case T_MergeConditionAction: + retval = _copyMergeConditionAction(from); + break; + case T_MergeDoNothing: + retval = _copyMergeDoNothing(from); + break; + case T_MergeError: + retval = _copyMergeError(from); + break; case T_SetOperationStmt: retval = _copySetOperationStmt(from); break; diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 6f6e9e4..4d0d7c3 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -878,6 +878,10 @@ _equalQuery(Query *a, Query *b) COMPARE_NODE_FIELD(rowMarks); COMPARE_NODE_FIELD(setOperations); COMPARE_NODE_FIELD(constraintDeps); + COMPARE_SCALAR_FIELD(isMergeAction); + COMPARE_SCALAR_FIELD(matched); + COMPARE_SCALAR_FIELD(sourceAttrNo); + COMPARE_NODE_FIELD(mergeActQry); return true; } @@ -885,6 +889,7 @@ _equalQuery(Query *a, Query *b) static bool _equalInsertStmt(InsertStmt *a, InsertStmt *b) { + COMPARE_SCALAR_FIELD(isMergeAction); COMPARE_NODE_FIELD(relation); COMPARE_NODE_FIELD(cols); COMPARE_NODE_FIELD(selectStmt); @@ -896,6 +901,7 @@ _equalInsertStmt(InsertStmt *a, InsertStmt *b) static bool _equalDeleteStmt(DeleteStmt *a, DeleteStmt *b) { + COMPARE_SCALAR_FIELD(isMergeAction); COMPARE_NODE_FIELD(relation); COMPARE_NODE_FIELD(usingClause); COMPARE_NODE_FIELD(whereClause); @@ -907,6 +913,7 @@ _equalDeleteStmt(DeleteStmt *a, DeleteStmt *b) static bool _equalUpdateStmt(UpdateStmt *a, UpdateStmt *b) { + COMPARE_SCALAR_FIELD(isMergeAction); COMPARE_NODE_FIELD(relation); COMPARE_NODE_FIELD(targetList); COMPARE_NODE_FIELD(whereClause); diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index d0ba12c..65b5347 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -332,6 +332,7 @@ _outModifyTable(StringInfo str, ModifyTable *node) WRITE_NODE_FIELD(returningLists); WRITE_NODE_FIELD(rowMarks); WRITE_INT_FIELD(epqParam); + WRITE_NODE_FIELD(mergeActPlan); } static void @@ -2022,6 +2023,53 @@ _outQuery(StringInfo str, Query *node) WRITE_NODE_FIELD(rowMarks); WRITE_NODE_FIELD(setOperations); WRITE_NODE_FIELD(constraintDeps); + WRITE_BOOL_FIELD(isMergeAction); + WRITE_BOOL_FIELD(matched); + WRITE_INT_FIELD(sourceAttrNo); + WRITE_NODE_FIELD(mergeActQry); +} + +static void +_outMergeConditionAction(StringInfo str, MergeConditionAction *node) +{ + WRITE_NODE_TYPE("MERGECONDITIONACTION"); + + WRITE_BOOL_FIELD(match); + WRITE_NODE_FIELD(condition); + WRITE_NODE_FIELD(action); +} + +static void +_outMergeStmt(StringInfo str, MergeStmt *node) +{ + WRITE_NODE_TYPE("MERGESTMT"); + + WRITE_NODE_FIELD(relation); + WRITE_NODE_FIELD(source); + WRITE_NODE_FIELD(matchCondition); + WRITE_NODE_FIELD(actions); +} + +static void +_outMergeAction(StringInfo str, MergeAction *node) +{ + WRITE_NODE_TYPE("MERGEACTION"); + + _outPlanInfo(str, (Plan *)node); + + WRITE_ENUM_FIELD(operation, CmdType); + WRITE_BOOL_FIELD(matched); + WRITE_NODE_FIELD(flattenedqual); + WRITE_NODE_FIELD(flattenedtlist); +} + +static void +_outMergeActionSet(StringInfo str, MergeActionSet *node) +{ + WRITE_NODE_TYPE("MERGEACTIONSET"); + + WRITE_INT_FIELD(result_relation); + WRITE_NODE_FIELD(actions); } static void @@ -2907,6 +2955,18 @@ _outNode(StringInfo str, void *obj) case T_XmlSerialize: _outXmlSerialize(str, obj); break; + case T_MergeAction: + _outMergeAction(str, obj); + break; + case T_MergeActionSet: + _outMergeActionSet(str, obj); + break; + case T_MergeStmt: + _outMergeStmt(str, obj); + break; + case T_MergeConditionAction: + _outMergeConditionAction(str,obj); + break; default: diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 0a2edcb..0aeab17 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -219,6 +219,10 @@ _readQuery(void) READ_NODE_FIELD(rowMarks); READ_NODE_FIELD(setOperations); READ_NODE_FIELD(constraintDeps); + READ_BOOL_FIELD(isMergeAction); + READ_BOOL_FIELD(matched); + READ_INT_FIELD(sourceAttrNo); + READ_NODE_FIELD(mergeActQry); READ_DONE(); } diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 2115cc0..f5df243 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -3919,7 +3919,7 @@ make_result(PlannerInfo *root, ModifyTable * make_modifytable(CmdType operation, List *resultRelations, List *subplans, List *returningLists, - List *rowMarks, int epqParam) + List *rowMarks, List *mergeActPlans, int epqParam) { ModifyTable *node = makeNode(ModifyTable); Plan *plan = &node->plan; @@ -3972,6 +3972,8 @@ make_modifytable(CmdType operation, List *resultRelations, node->returningLists = returningLists; node->rowMarks = rowMarks; node->epqParam = epqParam; + if (operation == CMD_MERGE) + node->mergeActPlan = mergeActPlans; return node; } diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 3950ab4..c627e6e 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -102,7 +102,8 @@ static void get_column_info_for_window(PlannerInfo *root, WindowClause *wc, int *ordNumCols, AttrNumber **ordColIdx, Oid **ordOperators); - +static ModifyTable *merge_action_planner(PlannerInfo *root, Plan *mainPlan); +static MergeActionSet *merge_action_set_planner(PlannerInfo *root, Plan *mainPlan); /***************************************************************************** * @@ -452,6 +453,27 @@ subquery_planner(PlannerGlobal *glob, Query *parse, } /* + * for MERGE command, we need to preprocess + * the expressions for merge actions too. + */ + if(parse->commandType == CMD_MERGE) + { + ListCell *e; + + foreach(e, parse->mergeActQry) + { + Query *actqry = (Query *)lfirst(e); + + actqry->targetList = (List *) preprocess_expression(root, + (Node *) actqry->targetList, + EXPRKIND_TARGET); + actqry->jointree->quals = preprocess_expression(root, + (Node *) actqry->jointree->quals, + EXPRKIND_QUAL); + } + } + + /* * In some cases we may want to transfer a HAVING clause into WHERE. We * cannot do so if the HAVING clause contains aggregates (obviously) or * volatile functions (since a HAVING clause is supposed to be executed @@ -528,6 +550,7 @@ subquery_planner(PlannerGlobal *glob, Query *parse, { List *returningLists; List *rowMarks; + List *mergeAction; /* * Deal with the RETURNING clause if any. It's convenient to pass @@ -559,12 +582,19 @@ subquery_planner(PlannerGlobal *glob, Query *parse, else rowMarks = root->rowMarks; + /*if here is a MERGE command, we need to plan the actions too*/ + if(parse->commandType == CMD_MERGE) + mergeAction = list_make1(merge_action_set_planner(root, plan)); + else + mergeAction = NIL; + plan = (Plan *) make_modifytable(parse->commandType, copyObject(root->resultRelations), list_make1(plan), returningLists, rowMarks, - SS_assign_special_param(root)); + mergeAction, + SS_assign_special_param(root)); } } @@ -585,6 +615,130 @@ subquery_planner(PlannerGlobal *glob, Query *parse, } /* +to generate the merge action set for the main plan. +Call this function after the grouping_planner() + +Only works for MERGE command +*/ +static MergeActionSet * +merge_action_set_planner(PlannerInfo *root, Plan *mainPlan) +{ + MergeActionSet *result; + Query *mainQry = root->parse; + ListCell *l; + PlannerInfo subroot; + + /*for non-merge command, no need to plan the merge actions*/ + if(mainQry->commandType != CMD_MERGE || + mainQry->mergeActQry == NIL) + return NULL; + + /*do a copy of the root info*/ + memcpy(&subroot, root, sizeof(PlannerInfo)); + + /*create the result node*/ + result = makeNode(MergeActionSet); + result->result_relation = mainQry->resultRelation; + result->actions = NIL; + + /*plan the actions one by one*/ + foreach(l, mainQry->mergeActQry) + { + ModifyTable *actplan; + + /*put the action query into the subroot*/ + subroot.parse = (Query *) lfirst(l); + actplan = merge_action_planner(&subroot, mainPlan); + result->actions = lappend(result->actions, actplan); + } + return result; +} + +/* create plan for a single merge action */ +static ModifyTable * +merge_action_planner(PlannerInfo *root, Plan *mainPlan) +{ + Query *parse = root->parse; + MergeAction *actplan; + ModifyTable *result; + + List *returningLists; + List *rowMarks; + + /* + * no having clause in a merge action + */ + Assert(parse->havingQual == NULL); + + /* + * Create the action plan node + */ + actplan = makeNode(MergeAction); + actplan->operation = parse->commandType; + actplan->matched = parse->matched; + + /* copy the cost from the top_plan */ + actplan->plan.startup_cost = mainPlan->startup_cost; + actplan->plan.total_cost = mainPlan->total_cost; + actplan->plan.plan_rows = mainPlan->plan_rows; + actplan->plan.plan_width = mainPlan->plan_width; + + /* + * Here, the quals expressions are flattened, which is accepted + * by deparse functions in EXPLAIN. + * But, these expressions will be processed by push_up_vars + * latterly and become not flat again. + * So, we need to keep a copy of current quals for explaining. + */ + actplan->flattenedqual = (List *) copyObject(parse->jointree->quals); + actplan->plan.qual = (List *)parse->jointree->quals; + + /* prepare the target list */ + if (parse->targetList) + { + actplan->plan.targetlist = preprocess_targetlist(root, + parse->targetList); + /*the target list should also be copied for EXPLAIN*/ + actplan->flattenedtlist = (List *) copyObject(actplan->plan.targetlist); + } + + /* + *In general situation, all the vars in target list and quals are flattened. + *But, we want them to point to the attributes of the top join plan, not to + *the subplans. So push them up again here. + */ + push_up_merge_action_vars(actplan, parse); + + if (parse->returningList) + { + List *rlist; + + Assert(parse->resultRelation); + rlist = set_returning_clause_references(root->glob, + parse->returningList, + &actplan->plan, + parse->resultRelation); + returningLists = list_make1(rlist); + } + else + returningLists = NIL; + + if (parse->rowMarks) + rowMarks = NIL; + else + rowMarks = root->rowMarks; + + result = make_modifytable(parse->commandType, + list_make1_int(parse->resultRelation), + list_make1(actplan), + returningLists, + rowMarks, + NIL, + SS_assign_special_param(root)); + return result; +} + +/* * preprocess_expression * Do subquery_planner's preprocessing work for an expression, * which can be a targetlist, a WHERE clause (including JOIN/ON @@ -729,6 +883,7 @@ inheritance_planner(PlannerInfo *root) List *returningLists = NIL; List *rtable = NIL; List *rowMarks; + List *mergeActSets = NIL; List *tlist; PlannerInfo subroot; ListCell *l; @@ -790,6 +945,66 @@ inheritance_planner(PlannerInfo *root) appinfo->child_relid); returningLists = lappend(returningLists, rlist); } + + /* + *For a merge command, we need to generate a set of action plans corresponding + *to current result relations. + *The adjust_appendrel_attrs() will not process the merge action list of + *the main query. And, We need to do this here for a MERGE command. + *In fact, no need to do a full query tree mutator on the action queries. + *only adjust the target list and quals. + */ + if (parse->commandType == CMD_MERGE) + { + ListCell *e; + MergeActionSet *maset; + + /* + *the parse in subroot now is a copy of the main query of current result relation + *Here we need to generate a copy of the action queries and shift their target table + *to current result relation + */ + subroot.parse->mergeActQry = NIL; + + foreach(e, parse->mergeActQry) + { + Query *actqry = (Query *)lfirst(e); + Query *newactqry = makeNode(Query); + + /*copy most of the common fields from original query*/ + *newactqry = *actqry; + + /*reset the result relation to current child table*/ + newactqry->resultRelation = subroot.parse->resultRelation; + + /*make the range table to be consistent with current main query*/ + newactqry->rtable = subroot.parse->rtable; + + /*adjust the target list*/ + newactqry->targetList = (List *) adjust_appendrel_attrs( + (Node *) actqry->targetList, + appinfo); + newactqry->targetList = adjust_inherited_tlist(newactqry->targetList, + appinfo); + /*and qual*/ + newactqry->jointree = makeNode(FromExpr); + newactqry->jointree->fromlist = subroot.parse->jointree->fromlist; + newactqry->jointree->quals = adjust_appendrel_attrs( + actqry->jointree->quals, + appinfo); + + /*put this new action query in to the action list of current main query*/ + subroot.parse->mergeActQry = lappend(subroot.parse->mergeActQry, newactqry); + } + + /* + * now we have a complete query (main query + action queries) that has been + *shifted to current result relation. Plan these actions here. + */ + maset = merge_action_set_planner(&subroot, subplan); + Assert(maset != NULL); + mergeActSets = lappend(mergeActSets, maset); + } } root->resultRelations = resultRelations; @@ -842,6 +1057,7 @@ inheritance_planner(PlannerInfo *root) subplans, returningLists, rowMarks, + mergeActSets, SS_assign_special_param(root)); } diff --git a/src/backend/optimizer/prep/preptlist.c b/src/backend/optimizer/prep/preptlist.c index 59d3518..6faa234 100644 --- a/src/backend/optimizer/prep/preptlist.c +++ b/src/backend/optimizer/prep/preptlist.c @@ -78,25 +78,64 @@ preprocess_targetlist(PlannerInfo *root, List *tlist) result_relation, range_table); /* - * for "update" and "delete" queries, add ctid of the result relation into + for MERGE command, we also need to expend the target list of the main query. + Note that, the target list of main query is the combination of attrs + from Source table and target table. We only want to expend the part + of target table. + + We do this in an aggressive way: + 1. Truncate the old target list, keep only the entries for source table + 2. expend the target list of result relation from an NIL list. + 3. Append this new list at the end of old target list. + */ + if (command_type == CMD_MERGE) + { + ListCell *l; + List *TLforResultRelatoin = NIL; + int new_resno; + + Assert(parse->sourceAttrNo > 0); + + tlist = list_truncate(tlist, parse->sourceAttrNo); + + TLforResultRelatoin = expand_targetlist(TLforResultRelatoin, command_type, + result_relation, range_table); + new_resno = parse->sourceAttrNo + 1; + foreach(l, TLforResultRelatoin) + { + TargetEntry *te = (TargetEntry *)lfirst(l); + te->resno = new_resno++; + } + + tlist = list_concat(tlist, TLforResultRelatoin); + } + + /* + * for "update" , "delete" and "merge" queries, add ctid of the result relation into * the target list so that the ctid will propagate through execution and * ExecutePlan() will be able to identify the right tuple to replace or * delete. This extra field is marked "junk" so that it is not stored * back into the tuple. + * + * BUT, if the query node is a merge action, + * we don't need to expand the ctid attribute in tlist. + * The tlist of the merge top level plan already contains + * a "ctid" junk attr of the target relation. */ - if (command_type == CMD_UPDATE || command_type == CMD_DELETE) + if (!parse->isMergeAction && + (command_type == CMD_UPDATE || + command_type == CMD_DELETE || + command_type == CMD_MERGE)) { TargetEntry *tle; Var *var; var = makeVar(result_relation, SelfItemPointerAttributeNumber, TIDOID, -1, 0); - tle = makeTargetEntry((Expr *) var, list_length(tlist) + 1, pstrdup("ctid"), true); - /* * For an UPDATE, expand_targetlist already created a fresh tlist. For * DELETE, better do a listCopy so that we don't destructively modify @@ -339,6 +378,7 @@ expand_targetlist(List *tlist, int command_type, } break; case CMD_UPDATE: + case CMD_MERGE: if (!att_tup->attisdropped) { new_expr = (Node *) makeVar(result_relation, diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c index 68fe92f..1690a1f 100644 --- a/src/backend/optimizer/prep/prepunion.c +++ b/src/backend/optimizer/prep/prepunion.c @@ -101,9 +101,6 @@ static Bitmapset *translate_col_privs(const Bitmapset *parent_privs, static Node *adjust_appendrel_attrs_mutator(Node *node, AppendRelInfo *context); static Relids adjust_relid_set(Relids relids, Index oldrelid, Index newrelid); -static List *adjust_inherited_tlist(List *tlist, - AppendRelInfo *context); - /* * plan_set_operations @@ -1740,8 +1737,9 @@ adjust_relid_set(Relids relids, Index oldrelid, Index newrelid) * scribble on. * * Note that this is not needed for INSERT because INSERT isn't inheritable. + * But the INSERT actions in MERGE need this function */ -static List * +List * adjust_inherited_tlist(List *tlist, AppendRelInfo *context) { bool changed_it = false; diff --git a/src/backend/optimizer/util/var.c b/src/backend/optimizer/util/var.c index 92c2208..0059c3c 100644 --- a/src/backend/optimizer/util/var.c +++ b/src/backend/optimizer/util/var.c @@ -67,6 +67,16 @@ typedef struct bool inserted_sublink; /* have we inserted a SubLink? */ } flatten_join_alias_vars_context; +typedef struct +{ + int varno_source; + int varno_target; + int varno_join; + + int offset_source; + int offset_target; +} push_up_merge_action_vars_context; + static bool pull_varnos_walker(Node *node, pull_varnos_context *context); static bool pull_varattnos_walker(Node *node, Bitmapset **varattnos); @@ -83,6 +93,8 @@ static bool pull_var_clause_walker(Node *node, static Node *flatten_join_alias_vars_mutator(Node *node, flatten_join_alias_vars_context *context); static Relids alias_relid_set(PlannerInfo *root, Relids relids); +static bool push_up_merge_action_vars_walker(Node *node, + push_up_merge_action_vars_context *context); /* @@ -677,6 +689,79 @@ pull_var_clause_walker(Node *node, pull_var_clause_context *context) (void *) context); } +/* + * When prepare for the MERGE command, we have made a + * left join between the Source table and target table as the + * main plan. + * + * In this case, the range table contains ONLY THREE range table entries: + * 1. the source table, which may be a subquery or a plain table + * 2. the entry of the target table, which is a plain table + * 3. join expression with the source table and target table as its parameters. + * + * Each merge action of the command has its own query and + * plan nodes as well. And, the vars in its target list and qual + * expressions may refers to the attribute in any one of the above 3 + * range table entries. + * + * However, since the result tuple slots of merge actions are + * projected from the returned tuple of the join, we need to + * mapping the vars of source table and target table to their + * corresponding attributes in the third range table entry. + * + * This function does the opposite of the flatten_join_alias_vars() + * function. It walks through the target list and qual of a + * MergeAction plan, changes the vars' varno and varattno to the + * corresponding position in the upper level join RTE. + */ +void +push_up_merge_action_vars(MergeAction *actplan, Query *actqry) +{ + push_up_merge_action_vars_context context; + + context.varno_source = Merge_SourceTableRTindex; + context.varno_target = actqry->resultRelation; + context.varno_join = Merge_TopJoinTableRTindex; + context.offset_source = 0; + context.offset_target = actqry->sourceAttrNo; + + push_up_merge_action_vars_walker((Node *)actplan->plan.targetlist, + &context); + push_up_merge_action_vars_walker((Node *)actplan->plan.qual, + &context); +} + +static bool +push_up_merge_action_vars_walker(Node *node, + push_up_merge_action_vars_context *context) +{ + if (node == NULL) + return false; + if (IsA(node, Var)) + { + Var *var = (Var *)node; + + if(var->varno == context->varno_source) + { + var->varno = context->varno_join; + var->varattno += context->offset_source; + return false; + } + else if(var->varno == context->varno_target) + { + var->varno = context->varno_join; + var->varattno += context->offset_target; + return false; + } + else if(var->varno == context->varno_join) + return false; + else + elog(ERROR, "the vars in merge action tlist of qual should only belongs to the source table or target table"); + } + + return expression_tree_walker(node, push_up_merge_action_vars_walker, + (void *) context); +} /* * flatten_join_alias_vars diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c index a1320f5..d94446e 100644 --- a/src/backend/parser/analyze.c +++ b/src/backend/parser/analyze.c @@ -47,6 +47,7 @@ static Query *transformDeleteStmt(ParseState *pstate, DeleteStmt *stmt); static Query *transformInsertStmt(ParseState *pstate, InsertStmt *stmt); static List *transformInsertRow(ParseState *pstate, List *exprlist, List *stmtcols, List *icolumns, List *attrnos); +static Query *transformMergeStmt(ParseState *pstate, MergeStmt *stmt); static Query *transformSelectStmt(ParseState *pstate, SelectStmt *stmt); static Query *transformValuesClause(ParseState *pstate, SelectStmt *stmt); static Query *transformSetOperationStmt(ParseState *pstate, SelectStmt *stmt); @@ -175,6 +176,10 @@ transformStmt(ParseState *pstate, Node *parseTree) result = transformUpdateStmt(pstate, (UpdateStmt *) parseTree); break; + case T_MergeStmt: + result = transformMergeStmt(pstate, (MergeStmt *)parseTree); + break; + case T_SelectStmt: { SelectStmt *n = (SelectStmt *) parseTree; @@ -245,6 +250,7 @@ analyze_requires_snapshot(Node *parseTree) case T_DeleteStmt: case T_UpdateStmt: case T_SelectStmt: + case T_MergeStmt: result = true; break; @@ -282,21 +288,27 @@ transformDeleteStmt(ParseState *pstate, DeleteStmt *stmt) qry->commandType = CMD_DELETE; - /* set up range table with just the result rel */ - qry->resultRelation = setTargetTable(pstate, stmt->relation, - interpretInhOption(stmt->relation->inhOpt), - true, - ACL_DELETE); - qry->distinctClause = NIL; /* - * The USING clause is non-standard SQL syntax, and is equivalent in - * functionality to the FROM list that can be specified for UPDATE. The - * USING keyword is used rather than FROM because FROM is already a - * keyword in the DELETE syntax. + * The input stmt could be a MergeDelete node. + * In this case, we don't need the process on range table. */ - transformFromClause(pstate, stmt->usingClause); + if (!stmt->isMergeAction) + { + /* set up range table with just the result rel */ + qry->resultRelation = setTargetTable(pstate, stmt->relation, + interpretInhOption(stmt->relation->inhOpt), + true, + ACL_DELETE); + /* + * The USING clause is non-standard SQL syntax, and is equivalent in + * functionality to the FROM list that can be specified for UPDATE. The + * USING keyword is used rather than FROM because FROM is already a + * keyword in the DELETE syntax. + */ + transformFromClause(pstate, stmt->usingClause); + } qual = transformWhereClause(pstate, stmt->whereClause, "WHERE"); @@ -347,6 +359,8 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt) * VALUES list, or general SELECT input. We special-case VALUES, both for * efficiency and so we can handle DEFAULT specifications. */ + + /* a MergeInsert statement is always a VALUES clause*/ isGeneralSelect = (selectStmt && selectStmt->valuesLists == NIL); /* @@ -382,7 +396,8 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt) * mentioned in the SELECT part. Note that the target table is not added * to the joinlist or namespace. */ - qry->resultRelation = setTargetTable(pstate, stmt->relation, + if (!stmt->isMergeAction) /* for MergeInsert, no need to do this */ + qry->resultRelation = setTargetTable(pstate, stmt->relation, false, false, ACL_INSERT); /* Validate stmt->cols list, or build default list if no list given */ @@ -1726,16 +1741,18 @@ transformUpdateStmt(ParseState *pstate, UpdateStmt *stmt) qry->commandType = CMD_UPDATE; pstate->p_is_update = true; - qry->resultRelation = setTargetTable(pstate, stmt->relation, - interpretInhOption(stmt->relation->inhOpt), - true, - ACL_UPDATE); - - /* - * the FROM clause is non-standard SQL syntax. We used to be able to do - * this with REPLACE in POSTQUEL so we keep the feature. - */ - transformFromClause(pstate, stmt->fromClause); + if (!stmt->isMergeAction) /* for MergeUpdate, no need to do this */ + { + qry->resultRelation = setTargetTable(pstate, stmt->relation, + interpretInhOption(stmt->relation->inhOpt), + true, + ACL_UPDATE); + /* + * the FROM clause is non-standard SQL syntax. We used to be able to do + * this with REPLACE in POSTQUEL so we keep the feature. + */ + transformFromClause(pstate, stmt->fromClause); + } qry->targetList = transformTargetList(pstate, stmt->targetList); @@ -2237,3 +2254,329 @@ applyLockingClause(Query *qry, Index rtindex, rc->pushedDown = pushedDown; qry->rowMarks = lappend(qry->rowMarks, rc); } + +/* + * transform an action of merge command into a query. + * No change of the pstate range table is allowed in this function. + */ +static Query * +transformMergeActions(ParseState *pstate, MergeStmt *stmt, + MergeConditionAction *condact) +{ + Query *actqry; + + /* + * First, we need to make sure that DELETE and UPDATE + * actions are only taken in MATCHED condition, + * and INSERTs are only taken when not MATCHED + */ + switch(condact->action->type) + { + case T_DeleteStmt:/*a delete action*/ + { + DeleteStmt *deleteact = (DeleteStmt *) condact->action; + Assert(deleteact->isMergeAction); + + if (!condact->match) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("The DELETE action in MERGE command is not allowed when NOT MATCHED"))); + + /*put new right code to the result relation. + This line chages the RTE in range table directly*/ + pstate->p_target_rangetblentry->requiredPerms |= ACL_DELETE; + + deleteact->relation = stmt->relation; + deleteact->usingClause = list_make1(stmt->source); + deleteact->whereClause = condact->condition; + + /*parse the action query*/ + actqry = transformStmt(pstate, (Node *) deleteact); + + if (!IsA(actqry, Query) || + actqry->commandType != CMD_DELETE || + actqry->utilityStmt != NULL) + elog(ERROR, "improper DELETE action in merge stmt"); + + actqry->isMergeAction = true; + actqry->matched = condact->match; + + return actqry; + } + break; + case T_UpdateStmt:/*an update action*/ + { + UpdateStmt *updateact = (UpdateStmt *) condact->action; + Assert(updateact->isMergeAction); + + if (!condact->match) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("The UPDATE action in MERGE command is not allowed when NOT MATCHED"))); + + pstate->p_target_rangetblentry->requiredPerms |= ACL_UPDATE; + + /*the "targetlist" of the updateact is filled in the parser */ + updateact->relation = stmt->relation; + updateact->fromClause = list_make1(stmt->source); + updateact->whereClause = condact->condition; + + /*parse the action query*/ + actqry = transformStmt(pstate, (Node *)updateact); + + if (!IsA(actqry, Query) || + actqry->commandType != CMD_UPDATE|| + actqry->utilityStmt != NULL) + elog(ERROR, "improper UPDATE action in merge stmt"); + + actqry->isMergeAction = true; + actqry->matched = condact->match; + + return actqry; + } + break; + case T_InsertStmt:/*an insert action*/ + { + InsertStmt *insertact = (InsertStmt *) condact->action; + Assert(insertact->isMergeAction); + + if(condact->match) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("The INSERT action in MERGE command is not allowed when MATCHED"))); + + pstate->p_target_rangetblentry->requiredPerms |= ACL_INSERT; + + /*the "cols" and "selectStmt" of the insertact is filled in the parser */ + insertact->relation = stmt->relation; + + /* + the merge insert action has a strange feature. + In an ordinary INSERT, the VALUES list can only + contains constants and DEFAULT. (am I right??) + But in the INSERT action of MERGE command, + the VALUES list can have expressions with + variables(attributes of the target and source tables). + Besides, in the ordinary INSERT, a VALUES list can + never be followed by a WHERE clause. + But in MERGE INSERT action, there are matching conditions. + + Thus, the output qry of this function is an INSERT + query in the style of "INSERT...VALUES...", + except that we have other range tables and a WHERE clause. + Note that it is also different from the "INSERT ... SELECT..." + query, in which the whole SELECT is a subquery. + (We don't have subquery here). + + We construct this novel query structure in order + to keep consistency with other merge action types + (DELETE, UPDATE). In this way, all the merge action + queries are in fact share the very same Range Table, + They only differs in their target lists and join trees + */ + + /*parse the action query, this will call + transformInsertStmt() which analyzes the VALUES list.*/ + actqry = transformStmt(pstate, (Node *)insertact); + + /*do the WHERE clause here, Since the + transformInsertStmt() function only analyzes + the VALUES list but not the WHERE clause*/ + actqry->jointree = makeFromExpr(pstate->p_joinlist, + transformWhereClause(pstate, + condact->condition, + "WHERE")); + if(!IsA(actqry, Query) || + actqry->commandType != CMD_INSERT|| + actqry->utilityStmt != NULL) + elog(ERROR, "improper INSERT action in merge stmt"); + + actqry->isMergeAction = true; + actqry->matched = condact->match; + + return actqry; + } + break; + case T_MergeDoNothing: + { + MergeDoNothing *nothingact; + + nothingact = (MergeDoNothing *)(condact->action); + + Assert(IsA(nothingact,MergeDoNothing)); + + actqry = makeNode(Query); + + actqry->jointree = makeFromExpr(pstate->p_joinlist, + transformWhereClause(pstate, + condact->condition, + "WHERE")); + actqry->rtable = pstate->p_rtable; + + actqry->commandType = CMD_DONOTHING; + actqry->isMergeAction = true; + actqry->matched = condact->match; + + return actqry; + } + break; + default: + elog(ERROR, "unknown MERGE action type %d", condact->action->type); + break; + } + + /*never comes here*/ + return NULL; +} + +static Query * +transformMergeStmt(ParseState *pstate, MergeStmt *stmt) +{ + Query *qry; + + ColumnRef *starRef; + ResTarget *starResTarget; + ListCell *act; + ListCell *l; + JoinExpr *joinexp; + RangeTblEntry *resultRTE = NULL; + RangeTblEntry *topjoinRTE = NULL; + RangeTblEntry *sourceRTE = NULL; + + /*firstly, create the output node structure*/ + qry = makeNode(Query); + qry->commandType = CMD_MERGE; + + /* + * What we are doing here is to create a query like + * "SELECT * FROM LEFT JOIN ON ;" + * + * Note: + * 1. we set the "match condition" as the join qualification. + * The left join will scan both the matched and non-matched tuples. + * + * 2. a normal SELECT query has no "target relation". + * But here we need to set the target relation in query, + * like the UPDATE/DELETE/INSERT queries. + * So this is a left join SELECT with a "target table" in its range table. + * + * 3. We don't have a specific ACL level for Merge, here we just use + * ACL_SELECT. + * But we will add other ACL levels when handle each merge actions. + */ + + /* + * Before transforming the FROM clause, acquire write lock on the + * target relation. We don't want to add it to the range table yet, + * so we use setTargetTableLock() instead of setTargetTable(). + */ + setTargetTableLock(pstate, stmt->relation); + + /* + * Make the join expression on source table and target table, + * as the only element in FROM list + */ + joinexp = makeNode(JoinExpr); + joinexp->jointype = JOIN_LEFT; + joinexp->isNatural = FALSE; + joinexp->larg = stmt->source; + joinexp->rarg = (Node *)stmt->relation; + joinexp->quals = stmt->matchCondition; + + /* + * transform the FROM clause. The target relation and + * source relation will be added to the range table here. + */ + transformFromClause(pstate, list_make1(joinexp)); + + /* the targetList of the main query is "*" */ + starRef = makeNode(ColumnRef); + starRef->fields = list_make1(makeNode(A_Star)); + starRef->location = 1; + + starResTarget = makeNode(ResTarget); + starResTarget->name = NULL; + starResTarget->indirection = NIL; + starResTarget->val = (Node *)starRef; + starResTarget->location = 1; + + qry->targetList = transformTargetList(pstate, list_make1(starResTarget)); + + /* we don't need a WHERE clause here. Set it null. */ + qry->jointree = makeFromExpr(pstate->p_joinlist, NULL); + + /* + *The range table of a MERGE command query has only 3 RTE. + *RTE 1 is the source table, RTE 2 is the target table, RTE 3 is the left join + *between source and target table. + */ + qry->rtable = pstate->p_rtable; + + /*Set RTE 2 as the result relation of query*/ + qry->resultRelation = 2; + resultRTE = rt_fetch(qry->resultRelation, pstate->p_rtable); + if(resultRTE->relid != pstate->p_target_relation->rd_id) + elog(ERROR, "The target relation entry should be the second one in range table"); + + resultRTE->requiredPerms = ACL_SELECT; + resultRTE->inh = interpretInhOption(stmt->relation->inhOpt); + pstate->p_target_rangetblentry = resultRTE; + + /* + *we also need to find out how many attributes are there in the source table. + *There are many ways to do this. Here, I choose to scan the join var list of the + *top join table entry. And count how many vars belong to source table. + */ + topjoinRTE = rt_fetch(Merge_TopJoinTableRTindex, pstate->p_rtable); + Assert(topjoinRTE->jointype == JOIN_LEFT); + + qry->sourceAttrNo = 0; + foreach(l, topjoinRTE->joinaliasvars) + { + Var *jv = (Var *)lfirst(l); + + if(jv->varno == Merge_SourceTableRTindex) + qry->sourceAttrNo++; + } + /*lets do a simple check here*/ + sourceRTE = rt_fetch(Merge_SourceTableRTindex, pstate->p_rtable); + Assert(qry->sourceAttrNo == list_length(sourceRTE->eref->colnames)); + + /* + * For each action, transform it to a separate query. + * The action queries share the range table with the main query. + * + * In other words, in the extra conditions of the sub actions, + * we don't allow involvement of new tables + */ + qry->mergeActQry = NIL; + + foreach(act,stmt->actions) + { + MergeConditionAction *mca = (MergeConditionAction *) lfirst(act); + Query *actqry; + + /* transform the act (and its condition) as a single query. */ + actqry = transformMergeActions(pstate, stmt, mca); + + /* + * since we don't invoke setTargetTable() in transformMergeActions(), + * we need to set actqry->resultRelation here + */ + actqry->resultRelation = qry->resultRelation; + + /*record the source attr number in each action query, for latter use*/ + actqry->sourceAttrNo = qry->sourceAttrNo; + + /* put it into the list */ + qry->mergeActQry = lappend(qry->mergeActQry, actqry); + } + + /* + * set the sublink mark at the last. + * Thus, the sublink in actions will be counted in. + */ + qry->hasSubLinks = pstate->p_hasSubLinks; + + return qry; +} diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index b38b2b1..fb792d3 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -211,6 +211,11 @@ static TypeName *TableFuncTypeName(List *columns); DeallocateStmt PrepareStmt ExecuteStmt DropOwnedStmt ReassignOwnedStmt AlterTSConfigurationStmt AlterTSDictionaryStmt + MergeStmt + +%type opt_and_condition merge_condition_action merge_action +%type opt_not +%type merge_condition_action_list %type select_no_parens select_with_parens select_clause simple_select values_clause @@ -480,7 +485,7 @@ static TypeName *TableFuncTypeName(List *columns); DEFERRABLE DEFERRED DEFINER DELETE_P DELIMITER DELIMITERS DESC DICTIONARY DISABLE_P DISCARD DISTINCT DO DOCUMENT_P DOMAIN_P DOUBLE_P DROP - EACH ELSE ENABLE_P ENCODING ENCRYPTED END_P ENUM_P ESCAPE EXCEPT + EACH ELSE ENABLE_P ENCODING ENCRYPTED END_P ENUM_P ERROR_P ESCAPE EXCEPT EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN EXTERNAL EXTRACT FALSE_P FAMILY FETCH FIRST_P FLOAT_P FOLLOWING FOR FORCE FOREIGN FORWARD @@ -503,7 +508,7 @@ static TypeName *TableFuncTypeName(List *columns); LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOGIN_P - MAPPING MATCH MAXVALUE MINUTE_P MINVALUE MODE MONTH_P MOVE + MAPPING MATCH MATCHED MAXVALUE MERGE MINUTE_P MINVALUE MODE MONTH_P MOVE NAME_P NAMES NATIONAL NATURAL NCHAR NEXT NO NOCREATEDB NOCREATEROLE NOCREATEUSER NOINHERIT NOLOGIN_P NONE NOSUPERUSER @@ -518,7 +523,7 @@ static TypeName *TableFuncTypeName(List *columns); QUOTE - RANGE READ REAL REASSIGN RECHECK RECURSIVE REF REFERENCES REINDEX + RAISE RANGE READ REAL REASSIGN RECHECK RECURSIVE REF REFERENCES REINDEX RELATIVE_P RELEASE RENAME REPEATABLE REPLACE REPLICA RESET RESTART RESTRICT RETURNING RETURNS REVOKE RIGHT ROLE ROLLBACK ROW ROWS RULE @@ -726,6 +731,7 @@ stmt : | ListenStmt | LoadStmt | LockStmt + | MergeStmt | NotifyStmt | PrepareStmt | ReassignOwnedStmt @@ -6988,6 +6994,7 @@ ExplainableStmt: | InsertStmt | UpdateStmt | DeleteStmt + | MergeStmt | DeclareCursorStmt | CreateAsStmt | ExecuteStmt /* by default all are $$=$1 */ @@ -7333,6 +7340,112 @@ set_target_list: /***************************************************************************** * * QUERY: + * MERGE STATEMENT + * + *****************************************************************************/ + + +MergeStmt: + MERGE INTO relation_expr_opt_alias + USING table_ref + ON a_expr + merge_condition_action_list + { + MergeStmt *m = makeNode(MergeStmt); + + m->relation = $3; + m->source = $5; + m->matchCondition = $7; + m->actions = $8; + + $$ = (Node *)m; + } + ; + +merge_condition_action_list: + merge_condition_action + { $$ = list_make1($1); } + | merge_condition_action_list merge_condition_action + { $$ = lappend($1,$2); } + ; + +merge_condition_action: + WHEN opt_not MATCHED opt_and_condition THEN merge_action + { + MergeConditionAction *m = makeNode(MergeConditionAction); + + m->match = $2; + m->condition = $4; + m->action = $6; + + $$ = (Node *)m; + } + ; + +opt_and_condition: + AND a_expr { $$ = $2; } + | /*EMPTY*/ { $$ = NULL; } + ; + +opt_not: + NOT { $$ = false; } + | /*EMPTY*/ { $$ = true; } + ; + +merge_action: + DELETE_P + { + DeleteStmt *n = makeNode(DeleteStmt); + n->isMergeAction = true; + $$ = (Node *) n; + } + | UPDATE SET set_clause_list + { + UpdateStmt *n = makeNode(UpdateStmt); + n->targetList = $3; + n->isMergeAction = true; + + $$ = (Node *) n; + } + | INSERT values_clause + { + InsertStmt *n = makeNode(InsertStmt); + n->cols = NIL; + n->selectStmt = $2; + n->isMergeAction = true; + + $$ = (Node *) n; + } + + | INSERT '(' insert_column_list ')' values_clause + { + InsertStmt *n = makeNode(InsertStmt); + n->cols = $3; + n->selectStmt = $5; + n->isMergeAction = true; + + $$ = (Node *) n; + } + | INSERT DEFAULT VALUES + { + InsertStmt *n = makeNode(InsertStmt); + n->cols = NIL; + n->selectStmt = NULL; + n->isMergeAction = true; + + $$ = (Node *) n; + } + | DO NOTHING + { + $$ = (Node *) makeNode(MergeDoNothing); + } + ; + + + +/***************************************************************************** + * + * QUERY: * CURSOR STATEMENTS * *****************************************************************************/ @@ -10954,6 +11067,7 @@ unreserved_keyword: | ENCODING | ENCRYPTED | ENUM_P + | ERROR_P | ESCAPE | EXCLUDE | EXCLUDING @@ -11007,7 +11121,9 @@ unreserved_keyword: | LOGIN_P | MAPPING | MATCH + | MATCHED | MAXVALUE + | MERGE | MINUTE_P | MINVALUE | MODE @@ -11050,6 +11166,7 @@ unreserved_keyword: | PROCEDURAL | PROCEDURE | QUOTE + | RAISE | RANGE | READ | REASSIGN diff --git a/src/backend/parser/parse_clause.c b/src/backend/parser/parse_clause.c index 16ca583..c3e8038 100644 --- a/src/backend/parser/parse_clause.c +++ b/src/backend/parser/parse_clause.c @@ -214,6 +214,25 @@ setTargetTable(ParseState *pstate, RangeVar *relation, return rtindex; } +void +setTargetTableLock(ParseState *pstate, RangeVar *relation) +{ + + /* Close old target; this could only happen for multi-action rules */ + if (pstate->p_target_relation != NULL) + heap_close(pstate->p_target_relation, NoLock); + + /* + * Open target rel and grab suitable lock (which we will hold till end of + * transaction). + * + * free_parsestate() will eventually do the corresponding heap_close(), + * but *not* release the lock. + */ + pstate->p_target_relation = parserOpenTable(pstate, relation, + RowExclusiveLock); +} + /* * Simplify InhOption (yes/no/default) into boolean yes/no. * diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c index 25b44dd..63f963a 100644 --- a/src/backend/rewrite/rewriteHandler.c +++ b/src/backend/rewrite/rewriteHandler.c @@ -1836,6 +1836,41 @@ RewriteQuery(Query *parsetree, List *rewrite_events) return rewritten; } +/*if the merge action type has already been processed by rewriter*/ +#define insert_rewrite (1<<0) +#define delete_rewrite (1<<1) +#define update_rewrite (1<<2) + +/*if the merge action type is fully replace by rules.*/ +#define insert_instead (1<<3) +#define delete_instead (1<<4) +#define update_instead (1<<5) + +#define merge_action_already_rewrite(acttype, flag) \ + ((acttype == CMD_INSERT && (flag & insert_rewrite)) || \ + (acttype == CMD_UPDATE && (flag & update_rewrite)) || \ + (acttype == CMD_DELETE && (flag & delete_rewrite))) + +#define set_action_rewrite(acttype, flag) \ + if(acttype == CMD_INSERT) \ + {flag |= insert_rewrite;}\ + else if(acttype == CMD_UPDATE) \ + {flag |= update_rewrite;}\ + else if(acttype == CMD_DELETE) \ + {flag |= delete_rewrite;} + +#define merge_action_instead(acttype, flag) \ + ((acttype == CMD_INSERT && (flag & insert_instead)) || \ + (acttype == CMD_UPDATE && (flag & update_instead)) || \ + (acttype == CMD_DELETE && (flag & delete_instead))) + +#define set_action_instead(acttype, flag)\ + if(acttype == CMD_INSERT) \ + {flag |= insert_instead;}\ + else if(acttype == CMD_UPDATE) \ + {flag |= update_instead;}\ + else if(acttype == CMD_DELETE) \ + {flag |= delete_instead;} /* * QueryRewrite - @@ -1861,7 +1896,148 @@ QueryRewrite(Query *parsetree) * * Apply all non-SELECT rules possibly getting 0 or many queries */ - querylist = RewriteQuery(parsetree, NIL); + if (parsetree->commandType == CMD_MERGE) + { + /* + * for MERGE, we have a set of action queries (not subquery). + * each of these action queries should rewritten with RewriteQuery(). + */ + ListCell *l; + int flag = 0; + List *pre_qry = NIL; + List *post_qry = NIL; + + querylist = NIL; + + /*rewrite the merge action queries one by one.*/ + foreach(l, parsetree->mergeActQry) + { + List *queryList4action = NIL; + Query *actionqry; + Query *q; + + actionqry = lfirst(l); + + /* no rewriting for DO NOTHING action*/ + if (actionqry->commandType == CMD_DONOTHING) + continue; + + /* + * if this kind of actions are fully replaced by rules, + * we change it into a DO NOTHING action + */ + if (merge_action_instead(actionqry->commandType, flag)) + { + /* + * Still need to call RewriteQuery(), + * since we need the process on target list and so on. + * BUT, the returned list is discarded + */ + RewriteQuery(actionqry, NIL); + actionqry->commandType = CMD_DONOTHING; + actionqry->targetList = NIL; + continue; + } + + /* if this kind of actions are already processed by rewriter, skip it.*/ + if (merge_action_already_rewrite(actionqry->commandType, flag)) + { + RewriteQuery(actionqry, NIL); + continue; + } + + /* ok this action has not been processed before, let's do it now. */ + queryList4action = RewriteQuery(actionqry, NIL); + + /* this kind of actions has been processed, set the flag */ + set_action_rewrite(actionqry->commandType, flag); + + /* + * if the returning list is empty, this merge action + * is replaced by a do-nothing rule + */ + if (queryList4action == NIL) + { + /* set the flag for other merge actions of the same type */ + set_action_instead(actionqry->commandType, flag); + actionqry->commandType = CMD_DONOTHING; + actionqry->targetList = NIL; + continue; + } + + /* + * if the rewriter return a non-NIL list, the merge action query + * could be one element in it. + * if so, it must be the head (for INSERT action) + * or tail (for UPDATE/DELETE action). + */ + + /* test the list head */ + q = (Query *) linitial(queryList4action); + if (q->querySource == QSRC_ORIGINAL) + { + /* + * the merge action is the head, the remaining part + * of the list are the queries generated by rules + * we put them in the post_qry list. + */ + if (querylist == NIL) + querylist = list_make1(parsetree); + + queryList4action = list_delete_first(queryList4action); + post_qry = list_concat(post_qry,queryList4action); + + continue; + } + + /*test the list tail*/ + q = (Query *) llast(queryList4action); + if (q->querySource == QSRC_ORIGINAL) + { + /* + * the merge action is the tail. + * Put the rule queries in pre_qry list + */ + if (querylist == NIL) + querylist = list_make1(parsetree); + + queryList4action = list_truncate(queryList4action, + list_length(queryList4action) - 1); + + pre_qry = list_concat(pre_qry,queryList4action); + continue; + } + + /* + * here, the merge action query is not in the rewritten query list, + * which means the action is replaced by INSTEAD rule(s). + * We need to change it into do noting action. + * + * For a INSERT action, we put the rule queries in the post list + * otherwise, in the pre list + */ + if(actionqry->commandType == CMD_INSERT) + post_qry = list_concat(post_qry,queryList4action); + else + pre_qry = list_concat(pre_qry,queryList4action); + + set_action_instead(actionqry->commandType, flag); + actionqry->commandType = CMD_DONOTHING; + actionqry->targetList = NIL; + } + + /* + * finally, put the 3 lists into one. + * If all the merge actions are replaced by rules, + * the original merge query + * will not be involved in the querylist. + */ + querylist = list_concat(pre_qry,querylist); + querylist = list_concat(querylist, post_qry); + + } + else + querylist = RewriteQuery(parsetree, NIL); /* * Step 2 diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 48b77d8..12628a4 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -979,7 +979,7 @@ exec_simple_query(const char *query_string) NULL, 0); plantree_list = pg_plan_queries(querytree_list, 0, NULL); - +//printf("the plann is \n%s\n", nodeToString(plantree_list)); /* Done with the snapshot used for parsing/planning */ if (snapshot_set) PopActiveSnapshot(); diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index 8ad4915..0dc3117 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -225,6 +225,10 @@ ProcessQuery(PlannedStmt *plan, snprintf(completionTag, COMPLETION_TAG_BUFSIZE, "DELETE %u", queryDesc->estate->es_processed); break; + case CMD_MERGE: + snprintf(completionTag, COMPLETION_TAG_BUFSIZE, + "MERGE %u", queryDesc->estate->es_processed); + break; default: strcpy(completionTag, "???"); break; diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 44cc401..3ac1ab0 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -125,6 +125,7 @@ CommandIsReadOnly(Node *parsetree) case CMD_UPDATE: case CMD_INSERT: case CMD_DELETE: + case CMD_MERGE: return false; default: elog(WARNING, "unrecognized commandType: %d", @@ -1406,6 +1407,10 @@ CreateCommandTag(Node *parsetree) tag = "SELECT"; break; + case T_MergeStmt: + tag = "MERGE"; + break; + /* utility statements --- same whether raw or cooked */ case T_TransactionStmt: { @@ -2243,6 +2248,7 @@ GetCommandLogLevel(Node *parsetree) case T_InsertStmt: case T_DeleteStmt: case T_UpdateStmt: + case T_MergeStmt: lev = LOGSTMT_MOD; break; diff --git a/src/include/commands/trigger.h b/src/include/commands/trigger.h index 267a08e..51d0f11 100644 --- a/src/include/commands/trigger.h +++ b/src/include/commands/trigger.h @@ -164,6 +164,8 @@ extern void ExecBSTruncateTriggers(EState *estate, ResultRelInfo *relinfo); extern void ExecASTruncateTriggers(EState *estate, ResultRelInfo *relinfo); +extern void ExecBSMergeTriggers(ModifyTableState *mt_state); +extern void ExecASMergeTriggers(ModifyTableState *mt_state); extern void AfterTriggerBeginXact(void); extern void AfterTriggerBeginQuery(void); diff --git a/src/include/executor/nodeModifyTable.h b/src/include/executor/nodeModifyTable.h index 67ba3e8..422e3ce 100644 --- a/src/include/executor/nodeModifyTable.h +++ b/src/include/executor/nodeModifyTable.h @@ -16,6 +16,7 @@ #include "nodes/execnodes.h" extern ModifyTableState *ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags); +extern MergeActionState *ExecInitMergeAction(MergeAction *node, EState *estate, int eflags); extern TupleTableSlot *ExecModifyTable(ModifyTableState *node); extern void ExecEndModifyTable(ModifyTableState *node); extern void ExecReScanModifyTable(ModifyTableState *node); diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 7442d2d..d55375c 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1032,9 +1032,23 @@ typedef struct ModifyTableState int mt_whichplan; /* which one is being executed (0..n-1) */ EPQState mt_epqstate; /* for evaluating EvalPlanQual rechecks */ bool fireBSTriggers; /* do we need to fire stmt triggers? */ + MergeActionSet **mt_mergeActPstates; /*the list of the planstate of merge command actions. + NULL if this is not a merge command. + The elements if it are still MergeActionSet nodes. + But the action list in these nodes are ModifyTableState */ } ModifyTableState; /* ---------------- + * MergeActionState information + * ---------------- + */ +typedef struct MergeActionState +{ + PlanState ps; /* its first field is NodeTag */ + CmdType operation; +} MergeActionState; + +/* ---------------- * AppendState information * * nplans how many plans are in the array diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index a5f5df5..5bb5706 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -44,6 +44,8 @@ typedef enum NodeTag T_Plan = 100, T_Result, T_ModifyTable, + T_MergeAction, + T_MergeActionSet, T_Append, T_RecursiveUnion, T_BitmapAnd, @@ -86,6 +88,7 @@ typedef enum NodeTag T_PlanState = 200, T_ResultState, T_ModifyTableState, + T_MergeActionState, T_AppendState, T_RecursiveUnionState, T_BitmapAndState, @@ -347,6 +350,10 @@ typedef enum NodeTag T_AlterUserMappingStmt, T_DropUserMappingStmt, T_AlterTableSpaceOptionsStmt, + T_MergeStmt, + T_MergeConditionAction, + T_MergeDoNothing, + T_MergeError, /* * TAGS FOR PARSE TREE NODES (parsenodes.h) @@ -511,6 +518,8 @@ typedef enum CmdType CMD_UPDATE, /* update stmt */ CMD_INSERT, /* insert stmt */ CMD_DELETE, + CMD_MERGE, /* merge stmt */ + CMD_DONOTHING, /*DO NOTHING action in MERGE command*/ CMD_UTILITY, /* cmds like create, destroy, copy, vacuum, * etc. */ CMD_NOTHING /* dummy command for instead nothing rules diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index e896dc7..696036d 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -149,8 +149,24 @@ typedef struct Query List *constraintDeps; /* a list of pg_constraint OIDs that the query * depends on to be semantically valid */ + + /* fields for MERGE command */ + bool isMergeAction; /* if this query is a merge action. */ + bool matched; /* this is a MATCHED action or NOT */ + int sourceAttrNo; /*the number of attributes in source table*/ + List *mergeActQry; /* the list of all the merge actions. + * used only for merge query statement */ } Query; +/* +* In MERGE command, the initial MERGE query has only three range table entry +*The first one is the source table; The second one is the target table; and The +*third one is the left join entry of them. +*During the whole process of a MEREGE command, the rt_index for the source table +*entry and the join table entry will never change. We set them as constant here. +*/ +#define Merge_SourceTableRTindex 1 +#define Merge_TopJoinTableRTindex 3 /**************************************************************************** * Supporting data structures for Parse Trees @@ -892,6 +908,7 @@ typedef struct CommonTableExpr typedef struct InsertStmt { NodeTag type; + bool isMergeAction; /*if this is a merge insert action*/ RangeVar *relation; /* relation to insert into */ List *cols; /* optional: names of the target columns */ Node *selectStmt; /* the source SELECT/VALUES, or NULL */ @@ -905,6 +922,7 @@ typedef struct InsertStmt typedef struct DeleteStmt { NodeTag type; + bool isMergeAction; /*if this is a merge delete action*/ RangeVar *relation; /* relation to delete from */ List *usingClause; /* optional using clause for more tables */ Node *whereClause; /* qualifications */ @@ -918,6 +936,7 @@ typedef struct DeleteStmt typedef struct UpdateStmt { NodeTag type; + bool isMergeAction; /*if this is a merge delete action*/ RangeVar *relation; /* relation to update */ List *targetList; /* the target list (of ResTarget) */ Node *whereClause; /* qualifications */ @@ -993,6 +1012,54 @@ typedef struct SelectStmt /* Eventually add fields for CORRESPONDING spec here */ } SelectStmt; +/* ---------------------- + * Merge Statement + * ---------------------- + */ +typedef struct MergeStmt +{ + NodeTag type; + RangeVar *relation; /* target relation for merge */ + + /* + * source relations for the merge. + * Currently, we only allow single-source merge, + * so the length of this list should always be 1. + */ + Node *source; + Node *matchCondition; /* qualifications of the merge */ + + /* list of MergeConditionAction structure. + * It stores all the matched / not-matched + * conditions and the corresponding actions + * The elements of this list are MergeConditionAction + * nodes. + */ + List *actions; +} MergeStmt; + +/* + * the structure for the actions of MERGE command. + * Holds info of the clauses like + * "WHEN MATCHED AND ... THEN UPDATE/DELETE/INSERT" + */ +typedef struct MergeConditionAction +{ + NodeTag type; + bool match; /* WHEN MATCHED or WHEN NOT MATCHED? */ + Node *condition; /* the AND condition for this action */ + Node *action; /* the actions: delete, insert or update */ +} MergeConditionAction; + +typedef struct MergeDoNothing +{ + NodeTag type; +} MergeDoNothing; + +typedef struct MergeError +{ + NodeTag type; +} MergeError; /* ---------------------- * Set Operation node for post-analysis query trees diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 037bc0b..5583f59 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -169,9 +169,38 @@ typedef struct ModifyTable List *returningLists; /* per-target-table RETURNING tlists */ List *rowMarks; /* PlanRowMarks (non-locking only) */ int epqParam; /* ID of Param for EvalPlanQual re-eval */ + List *mergeActPlan; /* the plans for merge actions, + * which are MergeActionSet nodes. + * one set for one target relation */ } ModifyTable; /* ---------------- + * MergeActionSet node - + * The node contains all actions of MERGE command for one specific target relation + * ---------------- + */ +typedef struct MergeActionSet +{ + NodeTag type; + int result_relation; + List *actions; +}MergeActionSet; + +/* ---------------- + * MergeAction node - + * The plan node for the actions of MERGE command + * ---------------- + */ +typedef struct MergeAction +{ + Plan plan; + CmdType operation; /* INSERT, UPDATE, DELETE, DO_NOTHING or RAISE_ERR */ + bool matched; /* is this a MATCHED or NOT MATCHED rule? */ + List *flattenedqual; /* the flattened qual expression of action */ + List *flattenedtlist;/*the fattened target list*/ +} MergeAction; + +/* ---------------- * Append node - * Generate the concatenation of the results of sub-plans. * ---------------- diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h index 35cdd2c..3660e57 100644 --- a/src/include/optimizer/planmain.h +++ b/src/include/optimizer/planmain.h @@ -80,7 +80,7 @@ extern Result *make_result(PlannerInfo *root, List *tlist, Node *resconstantqual, Plan *subplan); extern ModifyTable *make_modifytable(CmdType operation, List *resultRelations, List *subplans, List *returningLists, - List *rowMarks, int epqParam); + List *rowMarks, List *mergeActPlans, int epqParam); extern bool is_projection_capable_plan(Plan *plan); /* diff --git a/src/include/optimizer/prep.h b/src/include/optimizer/prep.h index 9df6390..4e8117f 100644 --- a/src/include/optimizer/prep.h +++ b/src/include/optimizer/prep.h @@ -52,4 +52,6 @@ extern void expand_inherited_tables(PlannerInfo *root); extern Node *adjust_appendrel_attrs(Node *node, AppendRelInfo *appinfo); +extern List *adjust_inherited_tlist(List * tlist, AppendRelInfo * context); + #endif /* PREP_H */ diff --git a/src/include/optimizer/var.h b/src/include/optimizer/var.h index b0e04a0..ec773f4 100644 --- a/src/include/optimizer/var.h +++ b/src/include/optimizer/var.h @@ -15,6 +15,7 @@ #define VAR_H #include "nodes/relation.h" +#include "nodes/plannodes.h" typedef enum { @@ -32,5 +33,6 @@ extern int locate_var_of_relation(Node *node, int relid, int levelsup); extern int find_minimum_var_level(Node *node); extern List *pull_var_clause(Node *node, PVCPlaceHolderBehavior behavior); extern Node *flatten_join_alias_vars(PlannerInfo *root, Node *node); +extern void push_up_merge_action_vars(MergeAction *actplan, Query *actqry); #endif /* VAR_H */ diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index 974bb7a..208c3e4 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -141,6 +141,7 @@ PG_KEYWORD("encoding", ENCODING, UNRESERVED_KEYWORD) PG_KEYWORD("encrypted", ENCRYPTED, UNRESERVED_KEYWORD) PG_KEYWORD("end", END_P, RESERVED_KEYWORD) PG_KEYWORD("enum", ENUM_P, UNRESERVED_KEYWORD) +PG_KEYWORD("error", ERROR_P, UNRESERVED_KEYWORD) PG_KEYWORD("escape", ESCAPE, UNRESERVED_KEYWORD) PG_KEYWORD("except", EXCEPT, RESERVED_KEYWORD) PG_KEYWORD("exclude", EXCLUDE, UNRESERVED_KEYWORD) @@ -229,7 +230,9 @@ PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD) PG_KEYWORD("login", LOGIN_P, UNRESERVED_KEYWORD) PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD) PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD) +PG_KEYWORD("matched", MATCHED, UNRESERVED_KEYWORD) PG_KEYWORD("maxvalue", MAXVALUE, UNRESERVED_KEYWORD) +PG_KEYWORD("merge", MERGE, UNRESERVED_KEYWORD) PG_KEYWORD("minute", MINUTE_P, UNRESERVED_KEYWORD) PG_KEYWORD("minvalue", MINVALUE, UNRESERVED_KEYWORD) PG_KEYWORD("mode", MODE, UNRESERVED_KEYWORD) @@ -296,6 +299,7 @@ PG_KEYWORD("privileges", PRIVILEGES, UNRESERVED_KEYWORD) PG_KEYWORD("procedural", PROCEDURAL, UNRESERVED_KEYWORD) PG_KEYWORD("procedure", PROCEDURE, UNRESERVED_KEYWORD) PG_KEYWORD("quote", QUOTE, UNRESERVED_KEYWORD) +PG_KEYWORD("raise", RAISE, UNRESERVED_KEYWORD) PG_KEYWORD("range", RANGE, UNRESERVED_KEYWORD) PG_KEYWORD("read", READ, UNRESERVED_KEYWORD) PG_KEYWORD("real", REAL, COL_NAME_KEYWORD) diff --git a/src/include/parser/parse_clause.h b/src/include/parser/parse_clause.h index f3d3ee9..b54f530 100644 --- a/src/include/parser/parse_clause.h +++ b/src/include/parser/parse_clause.h @@ -19,6 +19,7 @@ extern void transformFromClause(ParseState *pstate, List *frmList); extern int setTargetTable(ParseState *pstate, RangeVar *relation, bool inh, bool alsoSource, AclMode requiredPerms); +extern void setTargetTableLock(ParseState *pstate, RangeVar *relation); extern bool interpretInhOption(InhOption inhOpt); extern bool interpretOidsOption(List *defList); diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out new file mode 100644 index 0000000..7ee155a --- /dev/null +++ b/src/test/regress/expected/merge.out @@ -0,0 +1,202 @@ +-- MERGE -- +-- +-- create basic tables for test, and insert some source rows to work from +-- +--The Stock table, which records the amount of each item on hand. +CREATE TABLE Stock(item_id int UNIQUE, balance int); +NOTICE: CREATE TABLE / UNIQUE will create implicit index "stock_item_id_key" for table "stock" +INSERT INTO Stock VALUES (10, 2200); +INSERT INTO Stock VALUES (20, 1900); +SELECT * FROM Stock; + item_id | balance +---------+--------- + 10 | 2200 + 20 | 1900 +(2 rows) + +--The Buy table, which records the amount we bought today for each item. +CREATE TABLE Buy(item_id int, volume int); +INSERT INTO Buy values(10, 1000); +INSERT INTO Buy values(30, 300); +SELECT * FROM Buy; + item_id | volume +---------+-------- + 10 | 1000 + 30 | 300 +(2 rows) + +--The Sale table, which records the amount we sold today for each item. +CREATE TABLE Sale(item_id int, volume int); +INSERT INTO Sale VALUES (10, 2200); +INSERT INTO Sale VALUES (20, 1000); +SELECT * FROM Sale; + item_id | volume +---------+-------- + 10 | 2200 + 20 | 1000 +(2 rows) + +-- +-- initial queries +-- +-- do a simple equivalent of an UPDATE join +BEGIN; +MERGE INTO Stock USING Buy ON Stock.item_id = Buy.item_id + WHEN MATCHED THEN UPDATE SET balance = balance + Buy.volume +; +SELECT * FROM Stock ORDER BY item_id; + item_id | balance +---------+--------- + 10 | 3200 + 20 | 1900 +(2 rows) + +ROLLBACK; +-- do a simple equivalent of an INSERT SELECT +BEGIN; +MERGE INTO Stock USING Buy ON Stock.item_id = Buy.item_id + WHEN NOT MATCHED THEN INSERT VALUES (Buy.item_id, Buy.volume) +; +SELECT * FROM Stock ORDER BY item_id; + item_id | balance +---------+--------- + 10 | 2200 + 20 | 1900 + 30 | 300 +(3 rows) + +ROLLBACK; +-- now the classic UPSERT +BEGIN; +MERGE INTO Stock USING Buy ON Stock.item_id = Buy.item_id + WHEN MATCHED THEN UPDATE SET balance = balance + Buy.volume + WHEN NOT MATCHED THEN INSERT VALUES (Buy.item_id, Buy.volume) +; +SELECT * FROM Stock ORDER BY item_id; + item_id | balance +---------+--------- + 10 | 3200 + 20 | 1900 + 30 | 300 +(3 rows) + +ROLLBACK; +-- +-- Non-standard functionality +-- +-- a MERGE with DELETE action, which is not allowed in Standard. +-- Extra qualifications are allowed in each WHEN clause. +BEGIN; +MERGE INTO Stock USING Sale ON Stock.item_id = Sale.item_id + WHEN MATCHED AND balance - volume > 0 THEN UPDATE SET balance = balance - volume + WHEN MATCHED THEN DELETE; +; +SELECT * FROM Stock ORDER BY item_id; + item_id | balance +---------+--------- + 20 | 900 +(1 row) + +ROLLBACK; +-- The DO NOTHING action is another extension for MERGE. +-- rows specified by DO NOTHING are ignored +BEGIN; +MERGE INTO Stock USING Sale ON Stock.item_id = Sale.item_id + WHEN MATCHED AND balance - volume > 0 THEN DO NOTHING + WHEN MATCHED THEN DELETE +; +SELECT * FROM Stock ORDER BY item_id; + item_id | balance +---------+--------- + 20 | 1900 +(1 row) + +ROLLBACK; +-- DO NOTHING is the default action for non-matching rows that do not +-- activate any WHEN clause. They are just ignored +BEGIN; +MERGE INTO Stock USING Sale ON Stock.item_id = Sale.item_id + WHEN MATCHED AND balance - volume > 100000 THEN UPDATE SET balance = balance - volume +; +SELECT * FROM Stock ORDER BY item_id; + item_id | balance +---------+--------- + 10 | 2200 + 20 | 1900 +(2 rows) + +ROLLBACK; +-- Prepare the test data to generate multiple matching rows for a single target +INSERT INTO Buy values(10, 400); +SELECT * FROM Buy ORDER BY item_id; + item_id | volume +---------+-------- + 10 | 1000 + 10 | 400 + 30 | 300 +(3 rows) + +-- we now have a duplicate key in Buy, so when we join to +-- Stock we will generate 2 matching rows, not one. +-- According to standard this command should fail. +-- But it succeeds in PostgreSQL implementation by simply ignoring the second +BEGIN; +MERGE INTO Stock USING Buy ON Stock.item_id = Buy.item_id + WHEN MATCHED THEN UPDATE SET balance = balance + Buy.volume +; +SELECT * FROM Stock ORDER BY item_id; + item_id | balance +---------+--------- + 10 | 3200 + 20 | 1900 +(2 rows) + +ROLLBACK; +-- +-- More complicated query +-- +-- The source table of MERGE could be a SELECT clause +BEGIN; +MERGE INTO Stock USING + (SELECT Buy.item_id, (Buy.volume - Sale.volume) as v + FROM Buy, Sale + WHERE Buy.item_id = Sale.item_id) + AS BS + ON Stock.item_id = BS.item_id + WHEN MATCHED THEN UPDATE SET balance = balance + BS.v +; +SELECT * FROM Stock ORDER BY item_id; + item_id | balance +---------+--------- + 10 | 1000 + 20 | 1900 +(2 rows) + +ROLLBACK; +-- Subplan/sublinks can be used in MERGE actions +-- Create a table for sublink. +CREATE TABLE Extra (item_id int, volume int); +INSERT INTO Extra VALUES (10, 20); +INSERT INTO Extra VALUES (10, -7); +INSERT INTO Extra VALUES (20, 16); +INSERT INTO Extra VALUES (20, 5); +INSERT INTO Extra VALUES (30, 9); +-- The following query sum-up the volumes in Extra table and upinsert the Stock. +BEGIN; +MERGE INTO Stock USING Buy ON Stock.item_id = Buy.item_id + WHEN MATCHED THEN UPDATE SET + balance = balance + Buy.volume + + (SELECT sum(volume) FROM Extra WHERE Extra.item_id = Buy.item_id) + WHEN NOT MATCHED THEN INSERT VALUES + (Buy.item_id, Buy.volume + + (SELECT sum(volume) FROM Extra WHERE Extra.item_id = Buy.item_id)) +; +SELECT * FROM Stock ORDER BY item_id; + item_id | balance +---------+--------- + 10 | 3213 + 20 | 1900 + 30 | 309 +(3 rows) + +ROLLBACK; diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule index 191d1fe..2551b2a 100644 --- a/src/test/regress/parallel_schedule +++ b/src/test/regress/parallel_schedule @@ -91,7 +91,7 @@ test: select_views portals_p2 foreign_key cluster dependency guc bitmapops combo # NB: temp.sql does a reconnect which transiently uses 2 connections, # so keep this parallel group to at most 19 tests # ---------- -test: plancache limit plpgsql copy2 temp domain rangefuncs prepare without_oid conversion truncate alter_table sequence polymorphism rowtypes returning largeobject with xml +test: plancache limit plpgsql copy2 temp domain rangefuncs prepare without_oid conversion truncate alter_table sequence polymorphism rowtypes returning largeobject with xml merge # run stats by itself because its delay may be insufficient under heavy load test: stats diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule index 80a9881..e7d7fae 100644 --- a/src/test/regress/serial_schedule +++ b/src/test/regress/serial_schedule @@ -123,4 +123,5 @@ test: returning test: largeobject test: with test: xml +test: merge test: stats diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql new file mode 100644 index 0000000..5cdb5db --- /dev/null +++ b/src/test/regress/sql/merge.sql @@ -0,0 +1,135 @@ +-- MERGE -- + +-- +-- create basic tables for test, and insert some source rows to work from +-- +--The Stock table, which records the amount of each item on hand. +CREATE TABLE Stock(item_id int UNIQUE, balance int); +INSERT INTO Stock VALUES (10, 2200); +INSERT INTO Stock VALUES (20, 1900); +SELECT * FROM Stock; + +--The Buy table, which records the amount we bought today for each item. +CREATE TABLE Buy(item_id int, volume int); +INSERT INTO Buy values(10, 1000); +INSERT INTO Buy values(30, 300); +SELECT * FROM Buy; + +--The Sale table, which records the amount we sold today for each item. +CREATE TABLE Sale(item_id int, volume int); +INSERT INTO Sale VALUES (10, 2200); +INSERT INTO Sale VALUES (20, 1000); +SELECT * FROM Sale; + +-- +-- initial queries +-- +-- do a simple equivalent of an UPDATE join +BEGIN; +MERGE INTO Stock USING Buy ON Stock.item_id = Buy.item_id + WHEN MATCHED THEN UPDATE SET balance = balance + Buy.volume +; +SELECT * FROM Stock ORDER BY item_id; +ROLLBACK; + +-- do a simple equivalent of an INSERT SELECT +BEGIN; +MERGE INTO Stock USING Buy ON Stock.item_id = Buy.item_id + WHEN NOT MATCHED THEN INSERT VALUES (Buy.item_id, Buy.volume) +; +SELECT * FROM Stock ORDER BY item_id; +ROLLBACK; + +-- now the classic UPSERT +BEGIN; +MERGE INTO Stock USING Buy ON Stock.item_id = Buy.item_id + WHEN MATCHED THEN UPDATE SET balance = balance + Buy.volume + WHEN NOT MATCHED THEN INSERT VALUES (Buy.item_id, Buy.volume) +; +SELECT * FROM Stock ORDER BY item_id; +ROLLBACK; + +-- +-- Non-standard functionality +-- +-- a MERGE with DELETE action, which is not allowed in Standard. +-- Extra qualifications are allowed in each WHEN clause. +BEGIN; +MERGE INTO Stock USING Sale ON Stock.item_id = Sale.item_id + WHEN MATCHED AND balance - volume > 0 THEN UPDATE SET balance = balance - volume + WHEN MATCHED THEN DELETE; +; +SELECT * FROM Stock ORDER BY item_id; +ROLLBACK; + +-- The DO NOTHING action is another extension for MERGE. +-- rows specified by DO NOTHING are ignored +BEGIN; +MERGE INTO Stock USING Sale ON Stock.item_id = Sale.item_id + WHEN MATCHED AND balance - volume > 0 THEN DO NOTHING + WHEN MATCHED THEN DELETE +; +SELECT * FROM Stock ORDER BY item_id; +ROLLBACK; + +-- DO NOTHING is the default action for non-matching rows that do not +-- activate any WHEN clause. They are just ignored +BEGIN; +MERGE INTO Stock USING Sale ON Stock.item_id = Sale.item_id + WHEN MATCHED AND balance - volume > 100000 THEN UPDATE SET balance = balance - volume +; +SELECT * FROM Stock ORDER BY item_id; +ROLLBACK; + +-- Prepare the test data to generate multiple matching rows for a single target +INSERT INTO Buy values(10, 400); +SELECT * FROM Buy ORDER BY item_id; + +-- we now have a duplicate key in Buy, so when we join to +-- Stock we will generate 2 matching rows, not one. +-- According to standard this command should fail. +-- But it succeeds in PostgreSQL implementation by simply ignoring the second +BEGIN; +MERGE INTO Stock USING Buy ON Stock.item_id = Buy.item_id + WHEN MATCHED THEN UPDATE SET balance = balance + Buy.volume +; +SELECT * FROM Stock ORDER BY item_id; +ROLLBACK; + +-- +-- More complicated query +-- +-- The source table of MERGE could be a SELECT clause +BEGIN; +MERGE INTO Stock USING + (SELECT Buy.item_id, (Buy.volume - Sale.volume) as v + FROM Buy, Sale + WHERE Buy.item_id = Sale.item_id) + AS BS + ON Stock.item_id = BS.item_id + WHEN MATCHED THEN UPDATE SET balance = balance + BS.v +; +SELECT * FROM Stock ORDER BY item_id; +ROLLBACK; + +-- Subplan/sublinks can be used in MERGE actions +-- Create a table for sublink. +CREATE TABLE Extra (item_id int, volume int); +INSERT INTO Extra VALUES (10, 20); +INSERT INTO Extra VALUES (10, -7); +INSERT INTO Extra VALUES (20, 16); +INSERT INTO Extra VALUES (20, 5); +INSERT INTO Extra VALUES (30, 9); + +-- The following query sum-up the volumes in Extra table and upinsert the Stock. +BEGIN; +MERGE INTO Stock USING Buy ON Stock.item_id = Buy.item_id + WHEN MATCHED THEN UPDATE SET + balance = balance + Buy.volume + + (SELECT sum(volume) FROM Extra WHERE Extra.item_id = Buy.item_id) + WHEN NOT MATCHED THEN INSERT VALUES + (Buy.item_id, Buy.volume + + (SELECT sum(volume) FROM Extra WHERE Extra.item_id = Buy.item_id)) +; +SELECT * FROM Stock ORDER BY item_id; +ROLLBACK;