Re: ask for review of MERGE - Mailing list pgsql-hackers
From | Greg Smith |
---|---|
Subject | Re: ask for review of MERGE |
Date | |
Msg-id | 4CBB92BE.2030703@2ndquadrant.com Whole thread Raw |
In response to | Re: ask for review of MERGE (Boxuan Zhai <bxzhai2010@gmail.com>) |
Responses |
Re: ask for review of MERGE
|
List | pgsql-hackers |
Boxuan Zhai wrote: > Yes, it should be NULL instead of NIL. OK, I applied that patch to my local copy and pushed it out to github: http://github.com/greg2ndQuadrant/postgres/commit/9013ba9e81490e3623add1b029760817021297c0 That represents what I tested against. However, when I tried to merge against HEAD once I was finished, I discovered this patch has bit-rotted significantly. If you have a local copy that works for you, I would not recommend pulling in the PostgreSQL repo updates done in the last couple of weeks yet. Friday's "Allow WITH clauses to be attached to INSERT, UPDATE, DELETE statements" commit in particular conflicts quite a bit with your changes. Attached is a rebased patch that applies to HEAD now after a round of fixes to resolve those. But it doesn't work yet, because of recent change to the ExecUpdate and ExecDelete functions you're calling from src/backend/executor/nodeModifyTable.c inside ExecMerge. If you can continue working on the patch without merging recent repo work, I can hack away at fixing that once I figure out what got changed there recently. It's taken some painful git work to sort out what I've done so far, there's more left to do, and I know that's not an area you specialize in. Back to the feature review...I dove into how I expected this to work, relative to what it actually does at the moment. That didn't really go too well so far, but I don't know that this represents any fundamental issue with the patch. Just situations the existing code didn't really anticipate we have to flush out. As a general community FYI here, while it's taken me a while to get up to speed on this whole feature, I expect to keep chugging away on this regardless of the CommitFest boundaries. This feature is both too big and too important to just stop working on it because a date has passed. Onto the test cases. The examples that Boxuan has been working with, and that the regression tests included with the patch exercise, all involve two tables being joined together using MERGE. The use case I decided to test instead was when you're trying to simulate an UPSERT where only a single table is involved. I couldn't get to this to work correctly. Maybe I'm just using MERGE wrong here, but I tried so many different variations without success (including one that's basically copied from Simon's original regression test set suggestions) that I suspect there may be a subtle problem with the implementation instead. To replicate the most straightforward variations of what I ran into, you can start with the same data that's populated by the regression test set: CREATE TABLE Stock(item_id int UNIQUE, balance int); INSERT INTO Stock VALUES (10, 2200); INSERT INTO Stock VALUES (20, 1900); SELECT * FROM Stock; item_id | balance ---------+--------- 10 | 2200 20 | 1900 If you now execute the following: MERGE INTO Stock t USING (SELECT * FROM Stock WHERE item_id=10) AS s ON s.item_id=t.item_id WHEN MATCHED THEN UPDATE SET balance=s.balance + 1 WHEN NOT MATCHED THEN INSERT VALUES (10,1) ; This works fine, and updates the matching row: item_id | balance ---------+--------- 20 | 1900 10 | 2201 But if I give it a key that doesn't exist instead: MERGE INTO Stock t USING (SELECT * FROM Stock WHERE item_id=30) AS s ON s.item_id=t.item_id WHEN MATCHED THEN UPDATE SET balance=s.balance + 1 WHEN NOT MATCHED THEN INSERT VALUES (30,1) ; This doesn't execute the NOT MATCHED case and INSERT the way I expected it to. It just gives back "MERGE 0". Since I wasn't sure if the whole "subquery in the USING clause" case was really implemented fully, I then tried to do this with something more like the working regression test examples. I expected this to do the same thing as the first example: MERGE INTO Stock t USING Stock s ON s.item_id=10 AND s.item_id=t.item_id WHEN MATCHED THEN UPDATE SET balance=s.balance + 1 WHEN NOT MATCHED THEN INSERT VALUES (10,1) ; But it gives back this: ERROR: duplicate key value violates unique constraint "stock_item_id_key" DETAIL: Key (item_id)=(10) already exists. Can't tell from that whether it's hitting the MATCHED or NOT MATCHED side of things to generate that. But it doesn't work any better if you give it an example that doesn't exist: MERGE INTO Stock t USING Stock s ON s.item_id=30 AND s.item_id=t.item_id WHEN MATCHED THEN UPDATE SET balance=s.balance + 1 WHEN NOT MATCHED THEN INSERT VALUES (30,1) ; ERROR: duplicate key value violates unique constraint "stock_item_id_key" DETAIL: Key (item_id)=(30) already exists. Now that one is really weird, because that key value certainly doesn't exist yet in the table. There seem to be a couple of issues in the area of joining a table with itself here. Feel free to tell me I just don't know how to use MERGE if that's the case in any of these. The other thing I noticed that may take some work to sort out is that I haven't had any luck getting MERGE to execute from within a plpgsql function. I was hoping I could use this to update the pgbench tables: CREATE OR REPLACE FUNCTION merge_account_balance(key INT, delta NUMERIC) RETURNS VOID AS $$ BEGIN MERGE INTO pgbench_accounts t USING (SELECT * FROM pgbench_accounts WHERE aid = key) AS s ON t.aid=s.aid WHEN MATCHED THEN UPDATE SET abalance = s.abalance + delta WHEN NOT MATCHED THEN INSERT VALUES (key,1+(key / 100000)::integer,delta,''); END; $$ LANGUAGE plpgsql; But I just get this instead: ERROR: "pgbench_accounts" is not a known variable LINE 4: MERGE INTO pgbench_accounts t USING (SELECT * FROM p... The other way I wrote the MERGE statement above (not using a subquery) does the same thing. I know that error messages is coming from the changes introduced in http://archives.postgresql.org/pgsql-committers/2010-01/msg00161.php but I'm not familiar enough with the whole grammar implementation to know what that means yet. That's what I found so far in my second pass over this. Once the first problem here is sorted out, I've already worked out how to test the performance of the code using pgbench. Have all the scripts ready to go once the correct MERGE statement is plugged into them, just ran into this same class of problems when I tried them. So far I was only able to see how fast the UPDATE path worked though, which isn't very helpful yet. My hope here is to test the MERGE implementation vs. the classic pl/pgsql implementation of UPSERT, calling both within a function so it's a fair comparison, and see how that goes. This may flush out concurrency bugs that are in the MERGE code as well. -- Greg Smith, 2ndQuadrant US greg@2ndQuadrant.com Baltimore, MD PostgreSQL Training, Services and Support www.2ndQuadrant.us diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml index f5d67a2..29652ac 100644 --- a/doc/src/sgml/ref/allfiles.sgml +++ b/doc/src/sgml/ref/allfiles.sgml @@ -119,6 +119,7 @@ Complete list of usable sgml source files in this directory. <!entity listen system "listen.sgml"> <!entity load system "load.sgml"> <!entity lock system "lock.sgml"> +<!entity merge system "merge.sgml"> <!entity move system "move.sgml"> <!entity notify system "notify.sgml"> <!entity prepare system "prepare.sgml"> diff --git a/doc/src/sgml/ref/merge.sgml b/doc/src/sgml/ref/merge.sgml new file mode 100644 index 0000000..7c73623 --- /dev/null +++ b/doc/src/sgml/ref/merge.sgml @@ -0,0 +1,409 @@ +<!-- +$PostgreSQL$ +--> + +<refentry id="SQL-MERGE"> + <refmeta> + <refentrytitle id="SQL-MERGE-TITLE">MERGE</refentrytitle> + <refmiscinfo>SQL - Language Statements</refmiscinfo> + </refmeta> + + <refnamediv> + <refname>MERGE</refname> + <refpurpose>update, insert or delete rows of a table based upon source data</refpurpose> + </refnamediv> + + <indexterm zone="sql-merge"> + <primary>MERGE</primary> + </indexterm> + + <refsynopsisdiv> +<synopsis> +MERGE INTO <replaceable class="PARAMETER">table</replaceable> [ [ AS ] <replaceable class="parameter">alias</replaceable>] +USING <replaceable class="PARAMETER">source-query</replaceable> +ON <replaceable class="PARAMETER">join_condition</replaceable> +[<replaceable class="PARAMETER">when_clause</replaceable> [...]] + +where <replaceable class="PARAMETER">when_clause</replaceable> is + +{ WHEN MATCHED [ AND <replaceable class="PARAMETER">condition</replaceable> ] THEN { <replaceable class="PARAMETER">merge_update</replaceable>| DELETE | DO NOTHING | RAISE ERROR} + WHEN NOT MATCHED [ AND <replaceable class="PARAMETER">condition</replaceable> ] THEN { <replaceable class="PARAMETER">merge_insert</replaceable>| DO NOTHING | RAISE ERROR} } + +where <replaceable class="PARAMETER">merge_update</replaceable> is + +UPDATE SET { <replaceable class="PARAMETER">column</replaceable> = { <replaceable class="PARAMETER">expression</replaceable>| DEFAULT } | + ( <replaceable class="PARAMETER">column</replaceable> [, ...] ) = ( { <replaceable class="PARAMETER">expression</replaceable>| DEFAULT } [, ...] ) } [, ...] + +and <replaceable class="PARAMETER">merge_insert</replaceable> is + +INSERT [( <replaceable class="PARAMETER">column</replaceable> [, ...] )] { VALUES ( { <replaceable class="PARAMETER">expression</replaceable>| DEFAULT } [, ...] ) | DEFAULT VALUES } +</synopsis> + </refsynopsisdiv> + + <refsect1> + <title>Description</title> + + <para> + <command>MERGE</command> performs at most one action on each row from + the target table, driven by the rows from the source query. This + provides a way to specify a single SQL statement that can conditionally + <command>UPDATE</command> or <command>INSERT</command> rows, a task + that would otherwise require multiple procedural language statements. + </para> + + <para> + First, the <command>MERGE</command> command performs a left outer join + from source query to target table, producing zero or more merged rows. For + each merged row, <literal>WHEN</> clauses are evaluated in the + specified order until one of them is activated. The corresponding action + is then applied and processing continues for the next row. + </para> + + <para> + <command>MERGE</command> actions have the same effect as + regular <command>UPDATE</command>, <command>INSERT</command>, or + <command>DELETE</command> commands of the same names, though the syntax + is slightly different. + </para> + + <para> + If no <literal>WHEN</> clause activates then an implicit action of + <literal>RAISE ERROR</> is performed for that row. If that + implicit action is not desirable an explicit action of + <literal>DO NOTHING</> may be specified instead. + </para> + + <para> + <command>MERGE</command> will only affect rows only in the specified table. + </para> + + <para> + There is no <literal>RETURNING</> clause with <command>MERGE</command>. + </para> + + <para> + There is no MERGE privilege. + You must have the <literal>UPDATE</literal> privilege on the table + if you specify an update action, the <literal>INSERT</literal> privilege if + you specify an insert action and/or the <literal>DELETE</literal> privilege + if you wish to delete. You will also require the + <literal>SELECT</literal> privilege to any table whose values are read + in the <replaceable class="parameter">expressions</replaceable> or + <replaceable class="parameter">condition</replaceable>. + </para> + </refsect1> + + <refsect1> + <title>Parameters</title> + + <variablelist> + <varlistentry> + <term><replaceable class="PARAMETER">table</replaceable></term> + <listitem> + <para> + The name (optionally schema-qualified) of the table to merge into. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><replaceable class="parameter">alias</replaceable></term> + <listitem> + <para> + A substitute name for the target table. When an alias is + provided, it completely hides the actual name of the table. For + example, given <literal>MERGE foo AS f</>, the remainder of the + <command>MERGE</command> statement must refer to this table as + <literal>f</> not <literal>foo</>. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><replaceable class="PARAMETER">source-query</replaceable></term> + <listitem> + <para> + A query (<command>SELECT</command> statement or <command>VALUES</command> + statement) that supplies the rows to be merged into the target table. + Refer to the <xref linkend="sql-select"> + statement or <xref linkend="sql-values"> + statement for a description of the syntax. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><replaceable class="PARAMETER">join_condition</replaceable></term> + <listitem> + <para> + <replaceable class="parameter">join_condition</replaceable> is + an expression resulting in a value of type + <type>boolean</type> (similar to a <literal>WHERE</literal> + clause) that specifies which rows in the join are considered to + match. You should ensure that the join produces at most one output + row for each row to be modified. An attempt to modify any row of the + target table more than once will result in an error. This behaviour + requires the user to take greater care in using <command>MERGE</command>, + though is required explicitly by the SQL Standard. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><replaceable class="PARAMETER">condition</replaceable></term> + <listitem> + <para> + An expression that returns a value of type <type>boolean</type>. + If this expression returns <literal>true</> then the <literal>WHEN</> + clause will be activated and the corresponding action will occur for + that row. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><replaceable class="PARAMETER">merge_update</replaceable></term> + <listitem> + <para> + The specification of an <literal>UPDATE</> action. Do not include + the table name, as you would normally do with an + <xref linkend="sql-update"> command. + For example, <literal>UPDATE tab SET col = 1</> is invalid. Also, + do not include a <literal>WHERE</> clause, since only the current + can be updated. For example, + <literal>UPDATE SET col = 1 WHERE key = 57</> is invalid. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><replaceable class="PARAMETER">merge_insert</replaceable></term> + <listitem> + <para> + The specification of an <literal>INSERT</> action. Do not include + the table name, as you would normally do with an + <xref linkend="sql-insert"> command. + For example, <literal>INSERT INTO tab VALUES (1, 50)</> is invalid. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><replaceable class="PARAMETER">column</replaceable></term> + <listitem> + <para> + The name of a column in <replaceable + class="PARAMETER">table</replaceable>. + The column name can be qualified with a subfield name or array + subscript, if needed. Do not include the table's name in the + specification of a target column — for example, + <literal>UPDATE SET tab.col = 1</> is invalid. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><replaceable class="PARAMETER">expression</replaceable></term> + <listitem> + <para> + An expression to assign to the column. The expression can use the + old values of this and other columns in the table. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><literal>DEFAULT</literal></term> + <listitem> + <para> + Set the column to its default value (which will be NULL if no + specific default expression has been assigned to it). + </para> + </listitem> + </varlistentry> + + </variablelist> + </refsect1> + + <refsect1> + <title>Outputs</title> + + <para> + On successful completion, a <command>MERGE</> command returns a command + tag of the form +<screen> +MERGE <replaceable class="parameter">total-count</replaceable> +</screen> + The <replaceable class="parameter">total-count</replaceable> is the number + of rows changed (either updated, inserted or deleted). + If <replaceable class="parameter">total-count</replaceable> is 0, no rows + were changed (this is not considered an error). + </para> + + <para> + The number of rows updated, inserted or deleted is not available as part + of the command tag. An optional NOTIFY message can be generated to + present this information, if desired. +<screen> +NOTIFY: 34 rows processed: 11 updated, 5 deleted, 15 inserted, 3 default inserts, 0 no action +</screen> + </para> + + </refsect1> + + <refsect1> + <title>Notes</title> + + <para> + What essentially happens is that the target table is left outer-joined to + the tables mentioned in the <replaceable>source-query</replaceable>, and + each output row of the join may then activate at most one when-clause. + The row will be matched only once per statement, so the status of + <literal>MATCHED</> or <literal>NOT MATCHED</> cannot change once testing + of <literal>WHEN</> clauses has begun. <command>MERGE</command> will not + invoke Rules. + </para> + + <para> + The following steps take place during the execution of + <command>MERGE</command>. + <orderedlist> + <listitem> + <para> + Perform any BEFORE STATEMENT triggers for actions specified, whether or + not they actually occur. + </para> + </listitem> + <listitem> + <para> + Perform left outer join from source to target table. Then for each row: + <orderedlist> + <listitem> + <para> + Evaluate whether each row is MATCHED or NOT MATCHED. + </para> + </listitem> + <listitem> + <para> + Test each WHEN condition in the order specified until one activates. + Identify the action and its event type. + </para> + </listitem> + <listitem> + <para> + Perform any BEFORE ROW triggers that fire for the action's event type. + </para> + </listitem> + <listitem> + <para> + Apply the action specified. + </para> + </listitem> + <listitem> + <para> + Perform any AFTER ROW triggers that fire for the action's event type. + </para> + </listitem> + </orderedlist> + </para> + </listitem> + <listitem> + <para> + Perform any AFTER STATEMENT triggers for actions specified, whether or + not they actually occur. + </para> + </listitem> + </orderedlist> + In summary, statement triggers for an event type (say, INSERT) will + be fired whenever we <emphasis>specify</> an action of that kind. Row-level + triggers will fire only for event type <emphasis>activated</>. + So a <command>MERGE</command> might fire statement triggers for both + <literal>UPDATE</> and <literal>INSERT</>, even though only + <literal>UPDATE</> row triggers were fired. + </para> + + </refsect1> + + <refsect1> + <title>Examples</title> + + <para> + Attempt to insert a new stock item along with the quantity of stock. If + the item already exists, instead update the stock count of the existing + item. +<programlisting> +MERGE INTO wines w +USING (VALUES('Chateau Lafite 2003', '24')) v +ON v.column1 = w.winename +WHEN NOT MATCHED THEN + INSERT VALUES(v.column1, v.column2) +WHEN MATCHED THEN + UPDATE SET stock = stock + v.column2; +</programlisting> + </para> + + <para> + Perform maintenance on CustomerAccounts based upon new Transactions. + The following statement will fail if any accounts have had more than + one transaction + +<programlisting> +MERGE CustomerAccount CA + +USING (SELECT CustomerId, TransactionValue, + FROM Transactions + WHERE TransactionId > 35345678) AS T + +ON T.CustomerId = CA.CustomerId + +WHEN MATCHED THEN + UPDATE SET Balance = Balance - TransactionValue + +WHEN NOT MATCHED THEN + INSERT (CustomerId, Balance) + VALUES (T.CustomerId, T.TransactionValue) +; +</programlisting> + + so the right way to do this is to pre-aggregate the data + +<programlisting> +MERGE CustomerAccount CA + +USING (SELECT CustomerId, Sum(TransactionValue) As TransactionSum + FROM Transactions + WHERE TransactionId > 35345678 + GROUP BY CustomerId) AS T + +ON T.CustomerId = CA.CustomerId + +WHEN MATCHED THEN + UPDATE SET Balance = Balance - TransactionSum + +WHEN NOT MATCHED THEN + INSERT (CustomerId, Balance) + VALUES (T.CustomerId, T.TransactionSum) +; +</programlisting> + </para> + + </refsect1> + + <refsect1> + <title>Compatibility</title> + + <para> + This command conforms to the <acronym>SQL</acronym> standard, except + that the <literal>DELETE</literal> and <literal>DO NOTHING</> actions + are <productname>PostgreSQL</productname> extensions. + </para> + + <para> + According to the standard, the column-list syntax for an <literal>UPDATE</> + action should allow a list of columns to be assigned from a single + row-valued expression. + This is not currently implemented — the source must be a list + of independent expressions. + </para> + </refsect1> +</refentry> diff --git a/doc/src/sgml/ref/update.sgml b/doc/src/sgml/ref/update.sgml index 5968db1..4b681bb 100644 --- a/doc/src/sgml/ref/update.sgml +++ b/doc/src/sgml/ref/update.sgml @@ -340,6 +340,9 @@ UPDATE wines SET stock = stock + 24 WHERE winename = 'Chateau Lafite 2003'; -- continue with other operations, and eventually COMMIT; </programlisting> + + This operation can be executed in a single statement using + <xref linkend="sql-merge" endterm="sql-merge-title">. </para> <para> diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml index 463746c..cf9d678 100644 --- a/doc/src/sgml/reference.sgml +++ b/doc/src/sgml/reference.sgml @@ -147,6 +147,7 @@ &listen; &load; &lock; + &merge; &move; ¬ify; &prepare; diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index f494ec9..a5814a7 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -81,6 +81,8 @@ static void show_sort_keys_common(PlanState *planstate, static void show_sort_info(SortState *sortstate, ExplainState *es); static void show_hash_info(HashState *hashstate, ExplainState *es); static const char *explain_get_index_name(Oid indexId); +static void ExplainMergeActions(ModifyTableState *mt_planstate, + List *ancestors, ExplainState *es); static void ExplainScanTarget(Scan *plan, ExplainState *es); static void ExplainMemberNodes(List *plans, PlanState **planstates, List *ancestors, ExplainState *es); @@ -644,6 +646,9 @@ ExplainNode(PlanState *planstate, List *ancestors, case CMD_DELETE: pname = operation = "Delete"; break; + case CMD_MERGE: + pname = operation = "Merge"; + break; default: pname = "???"; break; @@ -1207,6 +1212,11 @@ ExplainNode(PlanState *planstate, List *ancestors, if (innerPlanState(planstate)) ExplainNode(innerPlanState(planstate), ancestors, "Inner", NULL, es); + + if (IsA(plan, ModifyTable) && + ((ModifyTable *)plan)->operation == CMD_MERGE) + ExplainMergeActions((ModifyTableState *) planstate, + ancestors, es); /* special child plans */ switch (nodeTag(plan)) @@ -1547,6 +1557,89 @@ explain_get_index_name(Oid indexId) return result; } +static void +ExplainMergeActions(ModifyTableState *mt_planstate, List *ancestors, + ExplainState *es) +{ + ListCell *l; + int actno = 1; + StringInfo buf = makeStringInfo(); + StringInfo acttitle = makeStringInfo(); + MergeActionSet *actset; + + if (mt_planstate->operation != CMD_MERGE || + mt_planstate->mt_mergeActPstates == NULL) + return; + + actset = mt_planstate->mt_mergeActPstates[0]; + + foreach(l, actset->actions) + { + ModifyTableState *mt_state = (ModifyTableState *) lfirst(l); + MergeActionState *act_pstate = (MergeActionState *) mt_state->mt_plans[0]; + MergeAction *act_plan = (MergeAction *) act_pstate->ps.plan; + + /*prepare the title*/ + resetStringInfo(acttitle); + appendStringInfo(acttitle, "Action %d", actno); + + /*prepare the string for printing*/ + resetStringInfo(buf); + switch(act_pstate->operation) + { + case CMD_INSERT: + appendStringInfoString(buf, "Insert When "); + break; + case CMD_UPDATE: + appendStringInfoString(buf, "Update When "); + break; + case CMD_DELETE: + appendStringInfoString(buf, "Delete When "); + break; + case CMD_DONOTHING: + appendStringInfoString(buf, "Do Nothing When "); + break; + default: + elog(ERROR, "unknown merge action"); + } + + if (act_plan->matched) + appendStringInfoString(buf, "Matched "); + else + appendStringInfoString(buf, "Not Mactched "); + + if (act_plan->flattenedqual) + appendStringInfoString(buf, "And "); + + /*print the action type*/ + ExplainPropertyText(acttitle->data, buf->data, es); + + /*print the action qual*/ + show_qual(act_plan->flattenedqual, "Qual", + &act_pstate->ps, ancestors, true, es); + + /*print the target list of action*/ + if (es->verbose && + (act_plan->operation == CMD_INSERT || + act_plan->operation == CMD_UPDATE)) + { + List *orignialtlist; + + orignialtlist = act_plan->plan.targetlist; + act_plan->plan.targetlist = act_plan->flattenedtlist; + show_plan_tlist((PlanState *) act_pstate, ancestors, es); + act_plan->plan.targetlist = orignialtlist; + } + + if (act_pstate->ps.subPlan) + ExplainSubPlans(act_pstate->ps.subPlan, ancestors, "SubPlan", es); + + actno++; + } + + ExplainPropertyText("MainPlan", "", es); +} + /* * Show the target of a Scan node */ diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c index d69fdcf..0bd0b68 100644 --- a/src/backend/commands/trigger.c +++ b/src/backend/commands/trigger.c @@ -2443,6 +2443,87 @@ ExecASTruncateTriggers(EState *estate, ResultRelInfo *relinfo) false, NULL, NULL, NIL, NULL); } +void +ExecBSMergeTriggers(ModifyTableState *mt_state) +{ + ListCell *l; + MergeActionSet *actset; + bool doUpdateTriggers = false; + bool doInsertTriggers = false; + bool doDeleteTriggers = false; + + /* Scan the actions to see what kind of statements there is */ + actset = mt_state->mt_mergeActPstates[0]; + foreach(l, actset->actions) + { + ModifyTableState *actmtstate; + MergeActionState *actPstate; + MergeAction *actplan; + + actmtstate = (ModifyTableState *) lfirst(l); + actPstate = (MergeActionState *) actmtstate->mt_plans[0]; + actplan = (MergeAction *) actPstate->ps.plan; + + if (actplan->operation == CMD_UPDATE) + doUpdateTriggers = true; + else if (actplan->operation == CMD_INSERT) + doInsertTriggers = true; + else if (actplan->operation == CMD_DELETE) + doDeleteTriggers = true; + } + + /* And fire the triggers */ + if (doUpdateTriggers) + ExecBSUpdateTriggers(mt_state->ps.state, + mt_state->ps.state->es_result_relations); + if (doInsertTriggers) + ExecBSInsertTriggers(mt_state->ps.state, + mt_state->ps.state->es_result_relations); + if (doDeleteTriggers) + ExecBSDeleteTriggers(mt_state->ps.state, + mt_state->ps.state->es_result_relations); +} + +void +ExecASMergeTriggers(ModifyTableState *mt_state) +{ + ListCell *l; + MergeActionSet *actset; + bool doUpdateTriggers = false; + bool doInsertTriggers = false; + bool doDeleteTriggers = false; + + /* Scan the actions to see what kind of statements there is */ + actset = mt_state->mt_mergeActPstates[0]; + foreach(l, actset->actions) + { + ModifyTableState *actmtstate; + MergeActionState *actPstate; + MergeAction *actplan; + + actmtstate = (ModifyTableState *)lfirst(l); + actPstate = (MergeActionState *)actmtstate->mt_plans[0]; + actplan = (MergeAction *)actPstate->ps.plan; + + if(actplan->operation == CMD_UPDATE) + doUpdateTriggers = true; + else if(actplan->operation == CMD_INSERT) + doInsertTriggers = true; + else if(actplan->operation == CMD_DELETE) + doDeleteTriggers = true; + } + + /* And fire the triggers */ + if (doUpdateTriggers) + ExecASUpdateTriggers(mt_state->ps.state, + mt_state->ps.state->es_result_relations); + if (doInsertTriggers) + ExecASInsertTriggers(mt_state->ps.state, + mt_state->ps.state->es_result_relations); + if (doDeleteTriggers) + ExecASDeleteTriggers(mt_state->ps.state, + mt_state->ps.state->es_result_relations); +} static HeapTuple GetTupleForTrigger(EState *estate, diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 69f3a28..8637fa8 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -170,6 +170,7 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) case CMD_INSERT: case CMD_DELETE: case CMD_UPDATE: + case CMD_MERGE: estate->es_output_cid = GetCurrentCommandId(true); break; diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index edd175e..2be0491 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -154,6 +154,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags) estate, eflags); break; + case T_MergeAction: + result = (PlanState *) ExecInitMergeAction((MergeAction *) node, + estate, eflags); + break; + case T_Append: result = (PlanState *) ExecInitAppend((Append *) node, estate, eflags); diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 541adaf..893794f 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -704,6 +704,103 @@ lreplace:; return NULL; } +static TupleTableSlot * +ExecMerge(ItemPointer tupleid, + TupleTableSlot *slot, + TupleTableSlot *planSlot, + MergeActionSet *actset, + EState *estate) +{ + + TupleTableSlot *actslot = NULL; + ListCell *each; + + /* + * Try the merge actions one by one until we have a match. + */ + foreach(each, actset->actions) + { + ModifyTableState *mt_pstate; + MergeActionState *action_pstate; + ExprContext *econtext; + bool matched; + + mt_pstate = (ModifyTableState *) lfirst(each); + Assert(IsA(mt_pstate, ModifyTableState)); + + /* + * mt_pstate is supposed to have only ONE mt_plans, + * which is a MergeActionState + */ + action_pstate = (MergeActionState *) mt_pstate->mt_plans[0]; + matched = ((MergeAction *)action_pstate->ps.plan)->matched; + + /* + * If tupleid == NULL, it is a NOT MATCHED case, + * else, it is a MATCHED case, + */ + if ((tupleid == NULL && matched) || + (tupleid != NULL && !matched)) + continue; + + /* Setup the expression context. */ + econtext = action_pstate->ps.ps_ExprContext; + + /* + * Check that additional quals match, if any. + */ + if (action_pstate->ps.qual) + { + ResetExprContext(econtext); + + econtext->ecxt_scantuple = slot; + econtext->ecxt_outertuple = planSlot; + + if (!ExecQual(action_pstate->ps.qual, econtext, false)) + continue; + } + + /* Ok, we have a match. Perform the action */ + + /* First project any RETURNING result tuple slot, if needed */ + if (action_pstate->operation == CMD_INSERT || + action_pstate->operation == CMD_UPDATE) + actslot = ExecProcessReturning(action_pstate->ps.ps_ProjInfo, + slot, planSlot); + + switch (action_pstate->operation) + { + case CMD_INSERT: + return ExecInsert(actslot, planSlot, estate); + + case CMD_UPDATE: + return ExecUpdate(tupleid, + actslot, + planSlot, + &mt_pstate->mt_epqstate, + estate); + + case CMD_DELETE: + return ExecDelete(tupleid, + planSlot, + &mt_pstate->mt_epqstate, + estate); + + case CMD_DONOTHING: + return NULL; + + default: + elog(ERROR, "unknown merge action type for execute"); + break; + } + } + + /* + * No matching action found. Perform the default action, which is + * DO NOTHING. + */ + return NULL; +} /* * Process BEFORE EACH STATEMENT triggers @@ -725,6 +822,9 @@ fireBSTriggers(ModifyTableState *node) ExecBSDeleteTriggers(node->ps.state, node->ps.state->es_result_relations); break; + case CMD_MERGE: + ExecBSMergeTriggers(node); + break; default: elog(ERROR, "unknown operation"); break; @@ -751,6 +851,9 @@ fireASTriggers(ModifyTableState *node) ExecASDeleteTriggers(node->ps.state, node->ps.state->es_result_relations); break; + case CMD_MERGE: + ExecASMergeTriggers(node); + break; default: elog(ERROR, "unknown operation"); break; @@ -777,6 +880,7 @@ ExecModifyTable(ModifyTableState *node) ItemPointer tupleid = NULL; ItemPointerData tuple_ctid; HeapTupleHeader oldtuple = NULL; + MergeActionSet *mergeActSet = NULL; /* * On first call, fire BEFORE STATEMENT triggers before proceeding. @@ -798,6 +902,8 @@ ExecModifyTable(ModifyTableState *node) /* Preload local variables */ subplanstate = node->mt_plans[node->mt_whichplan]; junkfilter = estate->es_result_relation_info->ri_junkFilter; + if(node->mt_mergeActPstates) + mergeActSet = node->mt_mergeActPstates[node->mt_whichplan]; /* * Fetch rows from subplan(s), and execute the required table modification @@ -824,6 +930,8 @@ ExecModifyTable(ModifyTableState *node) estate->es_result_relation_info++; subplanstate = node->mt_plans[node->mt_whichplan]; junkfilter = estate->es_result_relation_info->ri_junkFilter; + if(node->mt_mergeActPstates) + mergeActSet = node->mt_mergeActPstates[node->mt_whichplan]; EvalPlanQualSetPlan(&node->mt_epqstate, subplanstate->plan); continue; } @@ -839,7 +947,7 @@ ExecModifyTable(ModifyTableState *node) /* * extract the 'ctid' or 'wholerow' junk attribute. */ - if (operation == CMD_UPDATE || operation == CMD_DELETE) + if (operation == CMD_UPDATE || operation == CMD_DELETE || operation == CMD_MERGE) { Datum datum; bool isNull; @@ -849,13 +957,24 @@ ExecModifyTable(ModifyTableState *node) datum = ExecGetJunkAttribute(slot, junkfilter->jf_junkAttNo, &isNull); - /* shouldn't ever get a null result... */ if (isNull) - elog(ERROR, "ctid is NULL"); + { + /* + * Shouldn't ever get a null result for UPDATE or DELETE. + * MERGE gets a null ctid in "NOT MATCHED" case + */ + if (operation != CMD_MERGE) + elog(ERROR, "ctid is NULL"); + else + tupleid = NULL; + } + else + { - tupleid = (ItemPointer) DatumGetPointer(datum); - tuple_ctid = *tupleid; /* be sure we don't free ctid!! */ - tupleid = &tuple_ctid; + tupleid = (ItemPointer) DatumGetPointer(datum); + tuple_ctid = *tupleid; /* be sure we don't free the ctid!! */ + tupleid = &tuple_ctid; + } } else { @@ -890,6 +1009,10 @@ ExecModifyTable(ModifyTableState *node) slot = ExecDelete(tupleid, oldtuple, planSlot, &node->mt_epqstate, estate); break; + case CMD_MERGE: + slot = ExecMerge(tupleid, slot, planSlot, + mergeActSet, estate); + break; default: elog(ERROR, "unknown operation"); break; @@ -917,6 +1040,69 @@ ExecModifyTable(ModifyTableState *node) return NULL; } +/* + * When init a merge plan, we also need init its action plans. + * These action plans are "MergeAction" plans. + * + * This function mainly handles the tlist and qual in the plan. + * The returning result is a "MergeActionState". + */ +MergeActionState * +ExecInitMergeAction(MergeAction *node, EState *estate, int eflags) +{ + MergeActionState *result; + + /* + * do nothing when we get to the end of a leaf on tree. + */ + if (node == NULL) + return NULL; + + /* + * create state structure + */ + result = makeNode(MergeActionState); + result->operation = node->operation; + result->ps.plan = (Plan *)node; + result->ps.state = estate; + + /* + * tuple table initialization + */ + ExecInitResultTupleSlot(estate, &result->ps); + + /* + * initialize tuple type + */ + ExecAssignResultTypeFromTL(&result->ps); + + /* + * create expression context for node + */ + ExecAssignExprContext(estate, &result->ps); + + /* + * initialize child expressions + */ + result->ps.targetlist = (List *) + ExecInitExpr((Expr *) node->plan.targetlist, &result->ps); + + result->ps.qual = (List *) + ExecInitExpr((Expr *) node->plan.qual, &result->ps); + + /* + * init the projection information + */ + ExecAssignProjectionInfo(&result->ps, NULL); + + /* + * XXX: do we need a check for the plan output here ? + * (by calling the ExecCheckPlanOutput() function + */ + + return result; +} + /* ---------------------------------------------------------------- * ExecInitModifyTable * ---------------------------------------------------------------- @@ -932,6 +1118,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) Plan *subplan; ListCell *l; int i; + bool isMergeAction = false; /* check for unsupported flags */ Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); @@ -972,6 +1159,16 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) foreach(l, node->plans) { subplan = (Plan *) lfirst(l); + + /* + * test if this subplan node is a MergeAction. + * We need this information for setting the junkfilter. + * junkfilter is necessary for an ordinary UPDATE/DELETE plan, + * but not for an UPDATE/DELETE merge action + */ + if (IsA(subplan, MergeAction)) + isMergeAction = true; + mtstate->mt_plans[i] = ExecInitNode(subplan, estate, eflags); estate->es_result_relation_info++; i++; @@ -1101,7 +1298,11 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) break; case CMD_UPDATE: case CMD_DELETE: - junk_filter_needed = true; + case CMD_MERGE: + if(!isMergeAction) + junk_filter_needed = true; + break; + case CMD_DONOTHING: break; default: elog(ERROR, "unknown operation"); @@ -1124,9 +1325,10 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) resultRelInfo->ri_RelationDesc->rd_att->tdhasoid, ExecInitExtraTupleSlot(estate)); - if (operation == CMD_UPDATE || operation == CMD_DELETE) + if (operation == CMD_UPDATE || operation == CMD_DELETE || + operation == CMD_MERGE) { - /* For UPDATE/DELETE, find the appropriate junk attr now */ + /* For UPDATE/DELETE/MERGE, find the appropriate junk attr now */ if (resultRelInfo->ri_RelationDesc->rd_rel->relkind == RELKIND_RELATION) { j->jf_junkAttNo = ExecFindJunkAttribute(j, "ctid"); @@ -1161,6 +1363,36 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) if (estate->es_trig_tuple_slot == NULL) estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate); + /* + * for the merge actions, we need to do similar things as above. + *Each action in each action set should be init by the ExecInitNode. + *The returned planstates will take the place of the original plan nodes. + *These new action set will be put in the mt_mergeActPstates array. + */ + if(node->operation == CMD_MERGE) + { + /*we have one action set for each result relation (main plan)*/ + mtstate->mt_mergeActPstates = + (MergeActionSet **) palloc0(sizeof(MergeActionSet*) * nplans); + + estate->es_result_relation_info = estate->es_result_relations; + i = 0; + foreach(l, node->mergeActPlan) + { + ListCell *e; + MergeActionSet *actset = (MergeActionSet *) lfirst(l); + + foreach(e, actset->actions) + { + lfirst(e) = ExecInitNode((Plan *)lfirst(e), estate, 0); + } + mtstate->mt_mergeActPstates[i] = actset; + estate->es_result_relation_info++; + i++; + } + estate->es_result_relation_info = NULL; + } + return mtstate; } diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 508d7c7..4a57aa7 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -177,6 +177,42 @@ _copyModifyTable(ModifyTable *from) COPY_NODE_FIELD(returningLists); COPY_NODE_FIELD(rowMarks); COPY_SCALAR_FIELD(epqParam); + COPY_NODE_FIELD(mergeActPlan); + + return newnode; +} + +/* + * _copyMergeAction + */ +static MergeAction * +_copyMergeAction(MergeAction *from) +{ + MergeAction *newnode = makeNode(MergeAction); + + /* + * copy node superclass fields + */ + CopyPlanFields((Plan *) from, (Plan *) newnode); + + COPY_SCALAR_FIELD(operation); + COPY_SCALAR_FIELD(matched); + COPY_NODE_FIELD(flattenedqual); + COPY_NODE_FIELD(flattenedtlist); + + return newnode; +} + +/* + * _copyMergeActionSet + */ +static MergeActionSet * +_copyMergeActionSet(MergeActionSet *from) +{ + MergeActionSet *newnode = makeNode(MergeActionSet); + + COPY_SCALAR_FIELD(result_relation); + COPY_NODE_FIELD(actions); return newnode; } @@ -2300,6 +2336,10 @@ _copyQuery(Query *from) COPY_NODE_FIELD(rowMarks); COPY_NODE_FIELD(setOperations); COPY_NODE_FIELD(constraintDeps); + COPY_SCALAR_FIELD(isMergeAction); + COPY_SCALAR_FIELD(matched); + COPY_SCALAR_FIELD(sourceAttrNo); + COPY_NODE_FIELD(mergeActQry); return newnode; } @@ -2309,6 +2349,7 @@ _copyInsertStmt(InsertStmt *from) { InsertStmt *newnode = makeNode(InsertStmt); + COPY_SCALAR_FIELD(isMergeAction); COPY_NODE_FIELD(relation); COPY_NODE_FIELD(cols); COPY_NODE_FIELD(selectStmt); @@ -2323,6 +2364,7 @@ _copyDeleteStmt(DeleteStmt *from) { DeleteStmt *newnode = makeNode(DeleteStmt); + COPY_SCALAR_FIELD(isMergeAction); COPY_NODE_FIELD(relation); COPY_NODE_FIELD(usingClause); COPY_NODE_FIELD(whereClause); @@ -2337,6 +2379,7 @@ _copyUpdateStmt(UpdateStmt *from) { UpdateStmt *newnode = makeNode(UpdateStmt); + COPY_SCALAR_FIELD(isMergeAction); COPY_NODE_FIELD(relation); COPY_NODE_FIELD(targetList); COPY_NODE_FIELD(whereClause); @@ -2374,6 +2417,47 @@ _copySelectStmt(SelectStmt *from) return newnode; } +static MergeStmt * +_copyMergeStmt(MergeStmt *from) +{ + MergeStmt *newnode = makeNode(MergeStmt); + + COPY_NODE_FIELD(relation); + COPY_NODE_FIELD(source); + COPY_NODE_FIELD(matchCondition); + COPY_NODE_FIELD(actions); + + return newnode; +} + +static MergeConditionAction * +_copyMergeConditionAction(MergeConditionAction *from) +{ + MergeConditionAction *newnode = makeNode(MergeConditionAction); + + COPY_SCALAR_FIELD(match); + COPY_NODE_FIELD(condition); + COPY_NODE_FIELD(action); + + return newnode; +} + +static MergeDoNothing * +_copyMergeDoNothing(MergeDoNothing *from) +{ + MergeDoNothing *newnode = makeNode(MergeDoNothing); + + return newnode; +} + +static MergeError* +_copyMergeError(MergeError *from) +{ + MergeError *newnode = makeNode(MergeError); + + return newnode; +} + static SetOperationStmt * _copySetOperationStmt(SetOperationStmt *from) { @@ -3650,6 +3734,12 @@ copyObject(void *from) case T_ModifyTable: retval = _copyModifyTable(from); break; + case T_MergeAction: + retval = _copyMergeAction(from); + break; + case T_MergeActionSet: + retval = _copyMergeActionSet(from); + break; case T_Append: retval = _copyAppend(from); break; @@ -3953,6 +4043,18 @@ copyObject(void *from) case T_SelectStmt: retval = _copySelectStmt(from); break; + case T_MergeStmt: + retval = _copyMergeStmt(from); + break; + case T_MergeConditionAction: + retval = _copyMergeConditionAction(from); + break; + case T_MergeDoNothing: + retval = _copyMergeDoNothing(from); + break; + case T_MergeError: + retval = _copyMergeError(from); + break; case T_SetOperationStmt: retval = _copySetOperationStmt(from); break; diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 19262aa..a95057b 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -879,6 +879,10 @@ _equalQuery(Query *a, Query *b) COMPARE_NODE_FIELD(rowMarks); COMPARE_NODE_FIELD(setOperations); COMPARE_NODE_FIELD(constraintDeps); + COMPARE_SCALAR_FIELD(isMergeAction); + COMPARE_SCALAR_FIELD(matched); + COMPARE_SCALAR_FIELD(sourceAttrNo); + COMPARE_NODE_FIELD(mergeActQry); return true; } @@ -886,6 +890,7 @@ _equalQuery(Query *a, Query *b) static bool _equalInsertStmt(InsertStmt *a, InsertStmt *b) { + COMPARE_SCALAR_FIELD(isMergeAction); COMPARE_NODE_FIELD(relation); COMPARE_NODE_FIELD(cols); COMPARE_NODE_FIELD(selectStmt); @@ -898,6 +903,7 @@ _equalInsertStmt(InsertStmt *a, InsertStmt *b) static bool _equalDeleteStmt(DeleteStmt *a, DeleteStmt *b) { + COMPARE_SCALAR_FIELD(isMergeAction); COMPARE_NODE_FIELD(relation); COMPARE_NODE_FIELD(usingClause); COMPARE_NODE_FIELD(whereClause); @@ -910,6 +916,7 @@ _equalDeleteStmt(DeleteStmt *a, DeleteStmt *b) static bool _equalUpdateStmt(UpdateStmt *a, UpdateStmt *b) { + COMPARE_SCALAR_FIELD(isMergeAction); COMPARE_NODE_FIELD(relation); COMPARE_NODE_FIELD(targetList); COMPARE_NODE_FIELD(whereClause); diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index ee2aeb0..3106040 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -332,6 +332,7 @@ _outModifyTable(StringInfo str, ModifyTable *node) WRITE_NODE_FIELD(returningLists); WRITE_NODE_FIELD(rowMarks); WRITE_INT_FIELD(epqParam); + WRITE_NODE_FIELD(mergeActPlan); } static void @@ -2062,6 +2063,53 @@ _outQuery(StringInfo str, Query *node) WRITE_NODE_FIELD(rowMarks); WRITE_NODE_FIELD(setOperations); WRITE_NODE_FIELD(constraintDeps); + WRITE_BOOL_FIELD(isMergeAction); + WRITE_BOOL_FIELD(matched); + WRITE_INT_FIELD(sourceAttrNo); + WRITE_NODE_FIELD(mergeActQry); +} + +static void +_outMergeConditionAction(StringInfo str, MergeConditionAction *node) +{ + WRITE_NODE_TYPE("MERGECONDITIONACTION"); + + WRITE_BOOL_FIELD(match); + WRITE_NODE_FIELD(condition); + WRITE_NODE_FIELD(action); +} + +static void +_outMergeStmt(StringInfo str, MergeStmt *node) +{ + WRITE_NODE_TYPE("MERGESTMT"); + + WRITE_NODE_FIELD(relation); + WRITE_NODE_FIELD(source); + WRITE_NODE_FIELD(matchCondition); + WRITE_NODE_FIELD(actions); +} + +static void +_outMergeAction(StringInfo str, MergeAction *node) +{ + WRITE_NODE_TYPE("MERGEACTION"); + + _outPlanInfo(str, (Plan *)node); + + WRITE_ENUM_FIELD(operation, CmdType); + WRITE_BOOL_FIELD(matched); + WRITE_NODE_FIELD(flattenedqual); + WRITE_NODE_FIELD(flattenedtlist); +} + +static void +_outMergeActionSet(StringInfo str, MergeActionSet *node) +{ + WRITE_NODE_TYPE("MERGEACTIONSET"); + + WRITE_INT_FIELD(result_relation); + WRITE_NODE_FIELD(actions); } static void @@ -2953,6 +3001,18 @@ _outNode(StringInfo str, void *obj) case T_XmlSerialize: _outXmlSerialize(str, obj); break; + case T_MergeAction: + _outMergeAction(str, obj); + break; + case T_MergeActionSet: + _outMergeActionSet(str, obj); + break; + case T_MergeStmt: + _outMergeStmt(str, obj); + break; + case T_MergeConditionAction: + _outMergeConditionAction(str,obj); + break; default: diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 9a4e03e..e06c5b8 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -219,6 +219,10 @@ _readQuery(void) READ_NODE_FIELD(rowMarks); READ_NODE_FIELD(setOperations); READ_NODE_FIELD(constraintDeps); + READ_BOOL_FIELD(isMergeAction); + READ_BOOL_FIELD(matched); + READ_INT_FIELD(sourceAttrNo); + READ_NODE_FIELD(mergeActQry); READ_DONE(); } diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 52dd27b..ec6dd22 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -4090,7 +4090,7 @@ make_result(PlannerInfo *root, ModifyTable * make_modifytable(CmdType operation, List *resultRelations, List *subplans, List *returningLists, - List *rowMarks, int epqParam) + List *rowMarks, List *mergeActPlans, int epqParam) { ModifyTable *node = makeNode(ModifyTable); Plan *plan = &node->plan; @@ -4143,6 +4143,8 @@ make_modifytable(CmdType operation, List *resultRelations, node->returningLists = returningLists; node->rowMarks = rowMarks; node->epqParam = epqParam; + if (operation == CMD_MERGE) + node->mergeActPlan = mergeActPlans; return node; } diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 93daedc..55ca01f 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -103,7 +103,8 @@ static void get_column_info_for_window(PlannerInfo *root, WindowClause *wc, int *ordNumCols, AttrNumber **ordColIdx, Oid **ordOperators); - +static ModifyTable *merge_action_planner(PlannerInfo *root, Plan *mainPlan); +static MergeActionSet *merge_action_set_planner(PlannerInfo *root, Plan *mainPlan); /***************************************************************************** * @@ -453,6 +454,27 @@ subquery_planner(PlannerGlobal *glob, Query *parse, } /* + * for MERGE command, we need to preprocess + * the expressions for merge actions too. + */ + if(parse->commandType == CMD_MERGE) + { + ListCell *e; + + foreach(e, parse->mergeActQry) + { + Query *actqry = (Query *)lfirst(e); + + actqry->targetList = (List *) preprocess_expression(root, + (Node *) actqry->targetList, + EXPRKIND_TARGET); + actqry->jointree->quals = preprocess_expression(root, + (Node *) actqry->jointree->quals, + EXPRKIND_QUAL); + } + } + + /* * In some cases we may want to transfer a HAVING clause into WHERE. We * cannot do so if the HAVING clause contains aggregates (obviously) or * volatile functions (since a HAVING clause is supposed to be executed @@ -529,6 +551,7 @@ subquery_planner(PlannerGlobal *glob, Query *parse, { List *returningLists; List *rowMarks; + List *mergeAction; /* * Deal with the RETURNING clause if any. It's convenient to pass @@ -560,12 +583,19 @@ subquery_planner(PlannerGlobal *glob, Query *parse, else rowMarks = root->rowMarks; + /*if here is a MERGE command, we need to plan the actions too*/ + if(parse->commandType == CMD_MERGE) + mergeAction = list_make1(merge_action_set_planner(root, plan)); + else + mergeAction = NIL; + plan = (Plan *) make_modifytable(parse->commandType, copyObject(root->resultRelations), list_make1(plan), returningLists, rowMarks, - SS_assign_special_param(root)); + mergeAction, + SS_assign_special_param(root)); } } @@ -586,6 +616,130 @@ subquery_planner(PlannerGlobal *glob, Query *parse, } /* +to generate the merge action set for the main plan. +Call this function after the grouping_planner() + +Only works for MERGE command +*/ +static MergeActionSet * +merge_action_set_planner(PlannerInfo *root, Plan *mainPlan) +{ + MergeActionSet *result; + Query *mainQry = root->parse; + ListCell *l; + PlannerInfo subroot; + + /*for non-merge command, no need to plan the merge actions*/ + if(mainQry->commandType != CMD_MERGE || + mainQry->mergeActQry == NIL) + return NULL; + + /*do a copy of the root info*/ + memcpy(&subroot, root, sizeof(PlannerInfo)); + + /*create the result node*/ + result = makeNode(MergeActionSet); + result->result_relation = mainQry->resultRelation; + result->actions = NIL; + + /*plan the actions one by one*/ + foreach(l, mainQry->mergeActQry) + { + ModifyTable *actplan; + + /*put the action query into the subroot*/ + subroot.parse = (Query *) lfirst(l); + actplan = merge_action_planner(&subroot, mainPlan); + result->actions = lappend(result->actions, actplan); + } + return result; +} + +/* create plan for a single merge action */ +static ModifyTable * +merge_action_planner(PlannerInfo *root, Plan *mainPlan) +{ + Query *parse = root->parse; + MergeAction *actplan; + ModifyTable *result; + + List *returningLists; + List *rowMarks; + + /* + * no having clause in a merge action + */ + Assert(parse->havingQual == NULL); + + /* + * Create the action plan node + */ + actplan = makeNode(MergeAction); + actplan->operation = parse->commandType; + actplan->matched = parse->matched; + + /* copy the cost from the top_plan */ + actplan->plan.startup_cost = mainPlan->startup_cost; + actplan->plan.total_cost = mainPlan->total_cost; + actplan->plan.plan_rows = mainPlan->plan_rows; + actplan->plan.plan_width = mainPlan->plan_width; + + /* + * Here, the quals expressions are flattened, which is accepted + * by deparse functions in EXPLAIN. + * But, these expressions will be processed by push_up_vars + * latterly and become not flat again. + * So, we need to keep a copy of current quals for explaining. + */ + actplan->flattenedqual = (List *) copyObject(parse->jointree->quals); + actplan->plan.qual = (List *)parse->jointree->quals; + + /* prepare the target list */ + if (parse->targetList) + { + actplan->plan.targetlist = preprocess_targetlist(root, + parse->targetList); + /*the target list should also be copied for EXPLAIN*/ + actplan->flattenedtlist = (List *) copyObject(actplan->plan.targetlist); + } + + /* + *In general situation, all the vars in target list and quals are flattened. + *But, we want them to point to the attributes of the top join plan, not to + *the subplans. So push them up again here. + */ + push_up_merge_action_vars(actplan, parse); + + if (parse->returningList) + { + List *rlist; + + Assert(parse->resultRelation); + rlist = set_returning_clause_references(root->glob, + parse->returningList, + &actplan->plan, + parse->resultRelation); + returningLists = list_make1(rlist); + } + else + returningLists = NIL; + + if (parse->rowMarks) + rowMarks = NIL; + else + rowMarks = root->rowMarks; + + result = make_modifytable(parse->commandType, + list_make1_int(parse->resultRelation), + list_make1(actplan), + returningLists, + rowMarks, + NIL, + SS_assign_special_param(root)); + return result; +} + +/* * preprocess_expression * Do subquery_planner's preprocessing work for an expression, * which can be a targetlist, a WHERE clause (including JOIN/ON @@ -730,6 +884,7 @@ inheritance_planner(PlannerInfo *root) List *returningLists = NIL; List *rtable = NIL; List *rowMarks; + List *mergeActSets = NIL; List *tlist; PlannerInfo subroot; ListCell *l; @@ -791,6 +946,66 @@ inheritance_planner(PlannerInfo *root) appinfo->child_relid); returningLists = lappend(returningLists, rlist); } + + /* + *For a merge command, we need to generate a set of action plans corresponding + *to current result relations. + *The adjust_appendrel_attrs() will not process the merge action list of + *the main query. And, We need to do this here for a MERGE command. + *In fact, no need to do a full query tree mutator on the action queries. + *only adjust the target list and quals. + */ + if (parse->commandType == CMD_MERGE) + { + ListCell *e; + MergeActionSet *maset; + + /* + *the parse in subroot now is a copy of the main query of current result relation + *Here we need to generate a copy of the action queries and shift their target table + *to current result relation + */ + subroot.parse->mergeActQry = NIL; + + foreach(e, parse->mergeActQry) + { + Query *actqry = (Query *)lfirst(e); + Query *newactqry = makeNode(Query); + + /*copy most of the common fields from original query*/ + *newactqry = *actqry; + + /*reset the result relation to current child table*/ + newactqry->resultRelation = subroot.parse->resultRelation; + + /*make the range table to be consistent with current main query*/ + newactqry->rtable = subroot.parse->rtable; + + /*adjust the target list*/ + newactqry->targetList = (List *) adjust_appendrel_attrs( + (Node *) actqry->targetList, + appinfo); + newactqry->targetList = adjust_inherited_tlist(newactqry->targetList, + appinfo); + /*and qual*/ + newactqry->jointree = makeNode(FromExpr); + newactqry->jointree->fromlist = subroot.parse->jointree->fromlist; + newactqry->jointree->quals = adjust_appendrel_attrs( + actqry->jointree->quals, + appinfo); + + /*put this new action query in to the action list of current main query*/ + subroot.parse->mergeActQry = lappend(subroot.parse->mergeActQry, newactqry); + } + + /* + * now we have a complete query (main query + action queries) that has been + *shifted to current result relation. Plan these actions here. + */ + maset = merge_action_set_planner(&subroot, subplan); + Assert(maset != NULL); + mergeActSets = lappend(mergeActSets, maset); + } } root->resultRelations = resultRelations; @@ -843,6 +1058,7 @@ inheritance_planner(PlannerInfo *root) subplans, returningLists, rowMarks, + mergeActSets, SS_assign_special_param(root)); } diff --git a/src/backend/optimizer/prep/preptlist.c b/src/backend/optimizer/prep/preptlist.c index 2c97c71..73e832d 100644 --- a/src/backend/optimizer/prep/preptlist.c +++ b/src/backend/optimizer/prep/preptlist.c @@ -78,6 +78,76 @@ preprocess_targetlist(PlannerInfo *root, List *tlist) result_relation, range_table); /* + for MERGE command, we also need to expend the target list of the main query. + Note that, the target list of main query is the combination of attrs + from Source table and target table. We only want to expend the part + of target table. + + We do this in an aggressive way: + 1. Truncate the old target list, keep only the entries for source table + 2. expend the target list of result relation from an NIL list. + 3. Append this new list at the end of old target list. + */ + if (command_type == CMD_MERGE) + { + ListCell *l; + List *TLforResultRelatoin = NIL; + int new_resno; + + Assert(parse->sourceAttrNo > 0); + + tlist = list_truncate(tlist, parse->sourceAttrNo); + + TLforResultRelatoin = expand_targetlist(TLforResultRelatoin, command_type, + result_relation, range_table); + new_resno = parse->sourceAttrNo + 1; + foreach(l, TLforResultRelatoin) + { + TargetEntry *te = (TargetEntry *)lfirst(l); + te->resno = new_resno++; + } + + tlist = list_concat(tlist, TLforResultRelatoin); + } + + /* + * for "update" , "delete" and "merge" queries, add ctid of the result relation into + * the target list so that the ctid will propagate through execution and + * ExecutePlan() will be able to identify the right tuple to replace or + * delete. This extra field is marked "junk" so that it is not stored + * back into the tuple. + * + * BUT, if the query node is a merge action, + * we don't need to expand the ctid attribute in tlist. + * The tlist of the merge top level plan already contains + * a "ctid" junk attr of the target relation. + */ + if (!parse->isMergeAction && + (command_type == CMD_UPDATE || + command_type == CMD_DELETE || + command_type == CMD_MERGE)) + { + TargetEntry *tle; + Var *var; + + var = makeVar(result_relation, SelfItemPointerAttributeNumber, + TIDOID, -1, 0); + tle = makeTargetEntry((Expr *) var, + list_length(tlist) + 1, + pstrdup("ctid"), + true); + /* + * For an UPDATE, expand_targetlist already created a fresh tlist. For + * DELETE, better do a listCopy so that we don't destructively modify + * the original tlist (is this really necessary?). + */ + if (command_type == CMD_DELETE) + tlist = list_copy(tlist); + + tlist = lappend(tlist, tle); + } + + /* * Add necessary junk columns for rowmarked rels. These values are needed * for locking of rels selected FOR UPDATE/SHARE, and to do EvalPlanQual * rechecking. While we are at it, store these junk attnos in the @@ -305,6 +375,7 @@ expand_targetlist(List *tlist, int command_type, } break; case CMD_UPDATE: + case CMD_MERGE: if (!att_tup->attisdropped) { new_expr = (Node *) makeVar(result_relation, diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c index 0d3a739..2fda504 100644 --- a/src/backend/optimizer/prep/prepunion.c +++ b/src/backend/optimizer/prep/prepunion.c @@ -101,9 +101,6 @@ static Bitmapset *translate_col_privs(const Bitmapset *parent_privs, static Node *adjust_appendrel_attrs_mutator(Node *node, AppendRelInfo *context); static Relids adjust_relid_set(Relids relids, Index oldrelid, Index newrelid); -static List *adjust_inherited_tlist(List *tlist, - AppendRelInfo *context); - /* * plan_set_operations @@ -1741,8 +1738,9 @@ adjust_relid_set(Relids relids, Index oldrelid, Index newrelid) * scribble on. * * Note that this is not needed for INSERT because INSERT isn't inheritable. + * But the INSERT actions in MERGE need this function */ -static List * +List * adjust_inherited_tlist(List *tlist, AppendRelInfo *context) { bool changed_it = false; diff --git a/src/backend/optimizer/util/var.c b/src/backend/optimizer/util/var.c index 399f3a9..862dc5c 100644 --- a/src/backend/optimizer/util/var.c +++ b/src/backend/optimizer/util/var.c @@ -67,6 +67,16 @@ typedef struct bool inserted_sublink; /* have we inserted a SubLink? */ } flatten_join_alias_vars_context; +typedef struct +{ + int varno_source; + int varno_target; + int varno_join; + + int offset_source; + int offset_target; +} push_up_merge_action_vars_context; + static bool pull_varnos_walker(Node *node, pull_varnos_context *context); static bool pull_varattnos_walker(Node *node, Bitmapset **varattnos); @@ -83,6 +93,8 @@ static bool pull_var_clause_walker(Node *node, static Node *flatten_join_alias_vars_mutator(Node *node, flatten_join_alias_vars_context *context); static Relids alias_relid_set(PlannerInfo *root, Relids relids); +static bool push_up_merge_action_vars_walker(Node *node, + push_up_merge_action_vars_context *context); /* @@ -677,6 +689,79 @@ pull_var_clause_walker(Node *node, pull_var_clause_context *context) (void *) context); } +/* + * When prepare for the MERGE command, we have made a + * left join between the Source table and target table as the + * main plan. + * + * In this case, the range table contains ONLY THREE range table entries: + * 1. the source table, which may be a subquery or a plain table + * 2. the entry of the target table, which is a plain table + * 3. join expression with the source table and target table as its parameters. + * + * Each merge action of the command has its own query and + * plan nodes as well. And, the vars in its target list and qual + * expressions may refers to the attribute in any one of the above 3 + * range table entries. + * + * However, since the result tuple slots of merge actions are + * projected from the returned tuple of the join, we need to + * mapping the vars of source table and target table to their + * corresponding attributes in the third range table entry. + * + * This function does the opposite of the flatten_join_alias_vars() + * function. It walks through the target list and qual of a + * MergeAction plan, changes the vars' varno and varattno to the + * corresponding position in the upper level join RTE. + */ +void +push_up_merge_action_vars(MergeAction *actplan, Query *actqry) +{ + push_up_merge_action_vars_context context; + + context.varno_source = Merge_SourceTableRTindex; + context.varno_target = actqry->resultRelation; + context.varno_join = Merge_TopJoinTableRTindex; + context.offset_source = 0; + context.offset_target = actqry->sourceAttrNo; + + push_up_merge_action_vars_walker((Node *)actplan->plan.targetlist, + &context); + push_up_merge_action_vars_walker((Node *)actplan->plan.qual, + &context); +} + +static bool +push_up_merge_action_vars_walker(Node *node, + push_up_merge_action_vars_context *context) +{ + if (node == NULL) + return false; + if (IsA(node, Var)) + { + Var *var = (Var *)node; + + if(var->varno == context->varno_source) + { + var->varno = context->varno_join; + var->varattno += context->offset_source; + return false; + } + else if(var->varno == context->varno_target) + { + var->varno = context->varno_join; + var->varattno += context->offset_target; + return false; + } + else if(var->varno == context->varno_join) + return false; + else + elog(ERROR, "the vars in merge action tlist of qual should only belongs to the source table or target table"); + } + + return expression_tree_walker(node, push_up_merge_action_vars_walker, + (void *) context); +} /* * flatten_join_alias_vars diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c index bb2bf04..73a4708 100644 --- a/src/backend/parser/analyze.c +++ b/src/backend/parser/analyze.c @@ -48,6 +48,7 @@ static Query *transformInsertStmt(ParseState *pstate, InsertStmt *stmt); static List *transformInsertRow(ParseState *pstate, List *exprlist, List *stmtcols, List *icolumns, List *attrnos); static int count_rowexpr_columns(ParseState *pstate, Node *expr); +static Query *transformMergeStmt(ParseState *pstate, MergeStmt *stmt); static Query *transformSelectStmt(ParseState *pstate, SelectStmt *stmt); static Query *transformValuesClause(ParseState *pstate, SelectStmt *stmt); static Query *transformSetOperationStmt(ParseState *pstate, SelectStmt *stmt); @@ -176,6 +177,10 @@ transformStmt(ParseState *pstate, Node *parseTree) result = transformUpdateStmt(pstate, (UpdateStmt *) parseTree); break; + case T_MergeStmt: + result = transformMergeStmt(pstate, (MergeStmt *)parseTree); + break; + case T_SelectStmt: { SelectStmt *n = (SelectStmt *) parseTree; @@ -246,6 +251,7 @@ analyze_requires_snapshot(Node *parseTree) case T_DeleteStmt: case T_UpdateStmt: case T_SelectStmt: + case T_MergeStmt: result = true; break; @@ -299,12 +305,24 @@ transformDeleteStmt(ParseState *pstate, DeleteStmt *stmt) qry->distinctClause = NIL; /* - * The USING clause is non-standard SQL syntax, and is equivalent in - * functionality to the FROM list that can be specified for UPDATE. The - * USING keyword is used rather than FROM because FROM is already a - * keyword in the DELETE syntax. + * The input stmt could be a MergeDelete node. + * In this case, we don't need the process on range table. */ - transformFromClause(pstate, stmt->usingClause); + if (!stmt->isMergeAction) + { + /* set up range table with just the result rel */ + qry->resultRelation = setTargetTable(pstate, stmt->relation, + interpretInhOption(stmt->relation->inhOpt), + true, + ACL_DELETE); + /* + * The USING clause is non-standard SQL syntax, and is equivalent in + * functionality to the FROM list that can be specified for UPDATE. The + * USING keyword is used rather than FROM because FROM is already a + * keyword in the DELETE syntax. + */ + transformFromClause(pstate, stmt->usingClause); + } qual = transformWhereClause(pstate, stmt->whereClause, "WHERE"); @@ -369,6 +387,8 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt) * VALUES clause. If we have any of those, treat it as a general SELECT; * so it will work, but you can't use DEFAULT items together with those. */ + + /* a MergeInsert statement is always a VALUES clause*/ isGeneralSelect = (selectStmt && (selectStmt->valuesLists == NIL || selectStmt->sortClause != NIL || selectStmt->limitOffset != NULL || @@ -407,7 +427,8 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt) * mentioned in the SELECT part. Note that the target table is not added * to the joinlist or namespace. */ - qry->resultRelation = setTargetTable(pstate, stmt->relation, + if (!stmt->isMergeAction) /* for MergeInsert, no need to do this */ + qry->resultRelation = setTargetTable(pstate, stmt->relation, false, false, ACL_INSERT); /* Validate stmt->cols list, or build default list if no list given */ @@ -1802,16 +1823,18 @@ transformUpdateStmt(ParseState *pstate, UpdateStmt *stmt) qry->cteList = transformWithClause(pstate, stmt->withClause); } - qry->resultRelation = setTargetTable(pstate, stmt->relation, - interpretInhOption(stmt->relation->inhOpt), - true, - ACL_UPDATE); - - /* - * the FROM clause is non-standard SQL syntax. We used to be able to do - * this with REPLACE in POSTQUEL so we keep the feature. - */ - transformFromClause(pstate, stmt->fromClause); + if (!stmt->isMergeAction) /* for MergeUpdate, no need to do this */ + { + qry->resultRelation = setTargetTable(pstate, stmt->relation, + interpretInhOption(stmt->relation->inhOpt), + true, + ACL_UPDATE); + /* + * the FROM clause is non-standard SQL syntax. We used to be able to do + * this with REPLACE in POSTQUEL so we keep the feature. + */ + transformFromClause(pstate, stmt->fromClause); + } qry->targetList = transformTargetList(pstate, stmt->targetList); @@ -2313,3 +2336,329 @@ applyLockingClause(Query *qry, Index rtindex, rc->pushedDown = pushedDown; qry->rowMarks = lappend(qry->rowMarks, rc); } + +/* + * transform an action of merge command into a query. + * No change of the pstate range table is allowed in this function. + */ +static Query * +transformMergeActions(ParseState *pstate, MergeStmt *stmt, + MergeConditionAction *condact) +{ + Query *actqry; + + /* + * First, we need to make sure that DELETE and UPDATE + * actions are only taken in MATCHED condition, + * and INSERTs are only taken when not MATCHED + */ + switch(condact->action->type) + { + case T_DeleteStmt:/*a delete action*/ + { + DeleteStmt *deleteact = (DeleteStmt *) condact->action; + Assert(deleteact->isMergeAction); + + if (!condact->match) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("The DELETE action in MERGE command is not allowed when NOT MATCHED"))); + + /*put new right code to the result relation. + This line chages the RTE in range table directly*/ + pstate->p_target_rangetblentry->requiredPerms |= ACL_DELETE; + + deleteact->relation = stmt->relation; + deleteact->usingClause = list_make1(stmt->source); + deleteact->whereClause = condact->condition; + + /*parse the action query*/ + actqry = transformStmt(pstate, (Node *) deleteact); + + if (!IsA(actqry, Query) || + actqry->commandType != CMD_DELETE || + actqry->utilityStmt != NULL) + elog(ERROR, "improper DELETE action in merge stmt"); + + actqry->isMergeAction = true; + actqry->matched = condact->match; + + return actqry; + } + break; + case T_UpdateStmt:/*an update action*/ + { + UpdateStmt *updateact = (UpdateStmt *) condact->action; + Assert(updateact->isMergeAction); + + if (!condact->match) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("The UPDATE action in MERGE command is not allowed when NOT MATCHED"))); + + pstate->p_target_rangetblentry->requiredPerms |= ACL_UPDATE; + + /*the "targetlist" of the updateact is filled in the parser */ + updateact->relation = stmt->relation; + updateact->fromClause = list_make1(stmt->source); + updateact->whereClause = condact->condition; + + /*parse the action query*/ + actqry = transformStmt(pstate, (Node *)updateact); + + if (!IsA(actqry, Query) || + actqry->commandType != CMD_UPDATE|| + actqry->utilityStmt != NULL) + elog(ERROR, "improper UPDATE action in merge stmt"); + + actqry->isMergeAction = true; + actqry->matched = condact->match; + + return actqry; + } + break; + case T_InsertStmt:/*an insert action*/ + { + InsertStmt *insertact = (InsertStmt *) condact->action; + Assert(insertact->isMergeAction); + + if(condact->match) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("The INSERT action in MERGE command is not allowed when MATCHED"))); + + pstate->p_target_rangetblentry->requiredPerms |= ACL_INSERT; + + /*the "cols" and "selectStmt" of the insertact is filled in the parser */ + insertact->relation = stmt->relation; + + /* + the merge insert action has a strange feature. + In an ordinary INSERT, the VALUES list can only + contains constants and DEFAULT. (am I right??) + But in the INSERT action of MERGE command, + the VALUES list can have expressions with + variables(attributes of the target and source tables). + Besides, in the ordinary INSERT, a VALUES list can + never be followed by a WHERE clause. + But in MERGE INSERT action, there are matching conditions. + + Thus, the output qry of this function is an INSERT + query in the style of "INSERT...VALUES...", + except that we have other range tables and a WHERE clause. + Note that it is also different from the "INSERT ... SELECT..." + query, in which the whole SELECT is a subquery. + (We don't have subquery here). + + We construct this novel query structure in order + to keep consistency with other merge action types + (DELETE, UPDATE). In this way, all the merge action + queries are in fact share the very same Range Table, + They only differs in their target lists and join trees + */ + + /*parse the action query, this will call + transformInsertStmt() which analyzes the VALUES list.*/ + actqry = transformStmt(pstate, (Node *)insertact); + + /*do the WHERE clause here, Since the + transformInsertStmt() function only analyzes + the VALUES list but not the WHERE clause*/ + actqry->jointree = makeFromExpr(pstate->p_joinlist, + transformWhereClause(pstate, + condact->condition, + "WHERE")); + if(!IsA(actqry, Query) || + actqry->commandType != CMD_INSERT|| + actqry->utilityStmt != NULL) + elog(ERROR, "improper INSERT action in merge stmt"); + + actqry->isMergeAction = true; + actqry->matched = condact->match; + + return actqry; + } + break; + case T_MergeDoNothing: + { + MergeDoNothing *nothingact; + + nothingact = (MergeDoNothing *)(condact->action); + + Assert(IsA(nothingact,MergeDoNothing)); + + actqry = makeNode(Query); + + actqry->jointree = makeFromExpr(pstate->p_joinlist, + transformWhereClause(pstate, + condact->condition, + "WHERE")); + actqry->rtable = pstate->p_rtable; + + actqry->commandType = CMD_DONOTHING; + actqry->isMergeAction = true; + actqry->matched = condact->match; + + return actqry; + } + break; + default: + elog(ERROR, "unknown MERGE action type %d", condact->action->type); + break; + } + + /*never comes here*/ + return NULL; +} + +static Query * +transformMergeStmt(ParseState *pstate, MergeStmt *stmt) +{ + Query *qry; + + ColumnRef *starRef; + ResTarget *starResTarget; + ListCell *act; + ListCell *l; + JoinExpr *joinexp; + RangeTblEntry *resultRTE = NULL; + RangeTblEntry *topjoinRTE = NULL; + RangeTblEntry *sourceRTE = NULL; + + /*firstly, create the output node structure*/ + qry = makeNode(Query); + qry->commandType = CMD_MERGE; + + /* + * What we are doing here is to create a query like + * "SELECT * FROM <source_rel> LEFT JOIN <target_rel> ON <match_condition>;" + * + * Note: + * 1. we set the "match condition" as the join qualification. + * The left join will scan both the matched and non-matched tuples. + * + * 2. a normal SELECT query has no "target relation". + * But here we need to set the target relation in query, + * like the UPDATE/DELETE/INSERT queries. + * So this is a left join SELECT with a "target table" in its range table. + * + * 3. We don't have a specific ACL level for Merge, here we just use + * ACL_SELECT. + * But we will add other ACL levels when handle each merge actions. + */ + + /* + * Before transforming the FROM clause, acquire write lock on the + * target relation. We don't want to add it to the range table yet, + * so we use setTargetTableLock() instead of setTargetTable(). + */ + setTargetTableLock(pstate, stmt->relation); + + /* + * Make the join expression on source table and target table, + * as the only element in FROM list + */ + joinexp = makeNode(JoinExpr); + joinexp->jointype = JOIN_LEFT; + joinexp->isNatural = FALSE; + joinexp->larg = stmt->source; + joinexp->rarg = (Node *)stmt->relation; + joinexp->quals = stmt->matchCondition; + + /* + * transform the FROM clause. The target relation and + * source relation will be added to the range table here. + */ + transformFromClause(pstate, list_make1(joinexp)); + + /* the targetList of the main query is "*" */ + starRef = makeNode(ColumnRef); + starRef->fields = list_make1(makeNode(A_Star)); + starRef->location = 1; + + starResTarget = makeNode(ResTarget); + starResTarget->name = NULL; + starResTarget->indirection = NIL; + starResTarget->val = (Node *)starRef; + starResTarget->location = 1; + + qry->targetList = transformTargetList(pstate, list_make1(starResTarget)); + + /* we don't need a WHERE clause here. Set it null. */ + qry->jointree = makeFromExpr(pstate->p_joinlist, NULL); + + /* + *The range table of a MERGE command query has only 3 RTE. + *RTE 1 is the source table, RTE 2 is the target table, RTE 3 is the left join + *between source and target table. + */ + qry->rtable = pstate->p_rtable; + + /*Set RTE 2 as the result relation of query*/ + qry->resultRelation = 2; + resultRTE = rt_fetch(qry->resultRelation, pstate->p_rtable); + if(resultRTE->relid != pstate->p_target_relation->rd_id) + elog(ERROR, "The target relation entry should be the second one in range table"); + + resultRTE->requiredPerms = ACL_SELECT; + resultRTE->inh = interpretInhOption(stmt->relation->inhOpt); + pstate->p_target_rangetblentry = resultRTE; + + /* + *we also need to find out how many attributes are there in the source table. + *There are many ways to do this. Here, I choose to scan the join var list of the + *top join table entry. And count how many vars belong to source table. + */ + topjoinRTE = rt_fetch(Merge_TopJoinTableRTindex, pstate->p_rtable); + Assert(topjoinRTE->jointype == JOIN_LEFT); + + qry->sourceAttrNo = 0; + foreach(l, topjoinRTE->joinaliasvars) + { + Var *jv = (Var *)lfirst(l); + + if(jv->varno == Merge_SourceTableRTindex) + qry->sourceAttrNo++; + } + /*lets do a simple check here*/ + sourceRTE = rt_fetch(Merge_SourceTableRTindex, pstate->p_rtable); + Assert(qry->sourceAttrNo == list_length(sourceRTE->eref->colnames)); + + /* + * For each action, transform it to a separate query. + * The action queries share the range table with the main query. + * + * In other words, in the extra conditions of the sub actions, + * we don't allow involvement of new tables + */ + qry->mergeActQry = NIL; + + foreach(act,stmt->actions) + { + MergeConditionAction *mca = (MergeConditionAction *) lfirst(act); + Query *actqry; + + /* transform the act (and its condition) as a single query. */ + actqry = transformMergeActions(pstate, stmt, mca); + + /* + * since we don't invoke setTargetTable() in transformMergeActions(), + * we need to set actqry->resultRelation here + */ + actqry->resultRelation = qry->resultRelation; + + /*record the source attr number in each action query, for latter use*/ + actqry->sourceAttrNo = qry->sourceAttrNo; + + /* put it into the list */ + qry->mergeActQry = lappend(qry->mergeActQry, actqry); + } + + /* + * set the sublink mark at the last. + * Thus, the sublink in actions will be counted in. + */ + qry->hasSubLinks = pstate->p_hasSubLinks; + + return qry; +} diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 609c472..f045abd 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -212,6 +212,11 @@ static RangeVar *makeRangeVarFromAnyName(List *names, int position, core_yyscan_ DeallocateStmt PrepareStmt ExecuteStmt DropOwnedStmt ReassignOwnedStmt AlterTSConfigurationStmt AlterTSDictionaryStmt + MergeStmt + +%type <node> opt_and_condition merge_condition_action merge_action +%type <boolean> opt_not +%type <list> merge_condition_action_list %type <node> select_no_parens select_with_parens select_clause simple_select values_clause @@ -483,7 +488,7 @@ static RangeVar *makeRangeVarFromAnyName(List *names, int position, core_yyscan_ DEFERRABLE DEFERRED DEFINER DELETE_P DELIMITER DELIMITERS DESC DICTIONARY DISABLE_P DISCARD DISTINCT DO DOCUMENT_P DOMAIN_P DOUBLE_P DROP - EACH ELSE ENABLE_P ENCODING ENCRYPTED END_P ENUM_P ESCAPE EXCEPT + EACH ELSE ENABLE_P ENCODING ENCRYPTED END_P ENUM_P ERROR_P ESCAPE EXCEPT EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN EXTERNAL EXTRACT FALSE_P FAMILY FETCH FIRST_P FLOAT_P FOLLOWING FOR FORCE FOREIGN FORWARD @@ -506,7 +511,7 @@ static RangeVar *makeRangeVarFromAnyName(List *names, int position, core_yyscan_ LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOGIN_P - MAPPING MATCH MAXVALUE MINUTE_P MINVALUE MODE MONTH_P MOVE + MAPPING MATCH MATCHED MAXVALUE MERGE MINUTE_P MINVALUE MODE MONTH_P MOVE NAME_P NAMES NATIONAL NATURAL NCHAR NEXT NO NOCREATEDB NOCREATEROLE NOCREATEUSER NOINHERIT NOLOGIN_P NONE NOSUPERUSER @@ -521,7 +526,7 @@ static RangeVar *makeRangeVarFromAnyName(List *names, int position, core_yyscan_ QUOTE - RANGE READ REAL REASSIGN RECHECK RECURSIVE REF REFERENCES REINDEX + RAISE RANGE READ REAL REASSIGN RECHECK RECURSIVE REF REFERENCES REINDEX RELATIVE_P RELEASE RENAME REPEATABLE REPLACE REPLICA RESET RESTART RESTRICT RETURNING RETURNS REVOKE RIGHT ROLE ROLLBACK ROW ROWS RULE @@ -730,6 +735,7 @@ stmt : | ListenStmt | LoadStmt | LockStmt + | MergeStmt | NotifyStmt | PrepareStmt | ReassignOwnedStmt @@ -7128,6 +7134,7 @@ ExplainableStmt: | InsertStmt | UpdateStmt | DeleteStmt + | MergeStmt | DeclareCursorStmt | CreateAsStmt | ExecuteStmt /* by default all are $$=$1 */ @@ -7476,6 +7483,112 @@ set_target_list: /***************************************************************************** * * QUERY: + * MERGE STATEMENT + * + *****************************************************************************/ + + +MergeStmt: + MERGE INTO relation_expr_opt_alias + USING table_ref + ON a_expr + merge_condition_action_list + { + MergeStmt *m = makeNode(MergeStmt); + + m->relation = $3; + m->source = $5; + m->matchCondition = $7; + m->actions = $8; + + $$ = (Node *)m; + } + ; + +merge_condition_action_list: + merge_condition_action + { $$ = list_make1($1); } + | merge_condition_action_list merge_condition_action + { $$ = lappend($1,$2); } + ; + +merge_condition_action: + WHEN opt_not MATCHED opt_and_condition THEN merge_action + { + MergeConditionAction *m = makeNode(MergeConditionAction); + + m->match = $2; + m->condition = $4; + m->action = $6; + + $$ = (Node *)m; + } + ; + +opt_and_condition: + AND a_expr { $$ = $2; } + | /*EMPTY*/ { $$ = NULL; } + ; + +opt_not: + NOT { $$ = false; } + | /*EMPTY*/ { $$ = true; } + ; + +merge_action: + DELETE_P + { + DeleteStmt *n = makeNode(DeleteStmt); + n->isMergeAction = true; + $$ = (Node *) n; + } + | UPDATE SET set_clause_list + { + UpdateStmt *n = makeNode(UpdateStmt); + n->targetList = $3; + n->isMergeAction = true; + + $$ = (Node *) n; + } + | INSERT values_clause + { + InsertStmt *n = makeNode(InsertStmt); + n->cols = NIL; + n->selectStmt = $2; + n->isMergeAction = true; + + $$ = (Node *) n; + } + + | INSERT '(' insert_column_list ')' values_clause + { + InsertStmt *n = makeNode(InsertStmt); + n->cols = $3; + n->selectStmt = $5; + n->isMergeAction = true; + + $$ = (Node *) n; + } + | INSERT DEFAULT VALUES + { + InsertStmt *n = makeNode(InsertStmt); + n->cols = NIL; + n->selectStmt = NULL; + n->isMergeAction = true; + + $$ = (Node *) n; + } + | DO NOTHING + { + $$ = (Node *) makeNode(MergeDoNothing); + } + ; + + + +/***************************************************************************** + * + * QUERY: * CURSOR STATEMENTS * *****************************************************************************/ @@ -11108,6 +11221,7 @@ unreserved_keyword: | ENCODING | ENCRYPTED | ENUM_P + | ERROR_P | ESCAPE | EXCLUDE | EXCLUDING @@ -11162,7 +11276,9 @@ unreserved_keyword: | LOGIN_P | MAPPING | MATCH + | MATCHED | MAXVALUE + | MERGE | MINUTE_P | MINVALUE | MODE @@ -11205,6 +11321,7 @@ unreserved_keyword: | PROCEDURAL | PROCEDURE | QUOTE + | RAISE | RANGE | READ | REASSIGN diff --git a/src/backend/parser/parse_clause.c b/src/backend/parser/parse_clause.c index 5e13fa4..d149f5e 100644 --- a/src/backend/parser/parse_clause.c +++ b/src/backend/parser/parse_clause.c @@ -214,6 +214,25 @@ setTargetTable(ParseState *pstate, RangeVar *relation, return rtindex; } +void +setTargetTableLock(ParseState *pstate, RangeVar *relation) +{ + + /* Close old target; this could only happen for multi-action rules */ + if (pstate->p_target_relation != NULL) + heap_close(pstate->p_target_relation, NoLock); + + /* + * Open target rel and grab suitable lock (which we will hold till end of + * transaction). + * + * free_parsestate() will eventually do the corresponding heap_close(), + * but *not* release the lock. + */ + pstate->p_target_relation = parserOpenTable(pstate, relation, + RowExclusiveLock); +} + /* * Simplify InhOption (yes/no/default) into boolean yes/no. * diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c index 31b2595..c6a34cc 100644 --- a/src/backend/rewrite/rewriteHandler.c +++ b/src/backend/rewrite/rewriteHandler.c @@ -2000,6 +2000,41 @@ RewriteQuery(Query *parsetree, List *rewrite_events) return rewritten; } +/*if the merge action type has already been processed by rewriter*/ +#define insert_rewrite (1<<0) +#define delete_rewrite (1<<1) +#define update_rewrite (1<<2) + +/*if the merge action type is fully replace by rules.*/ +#define insert_instead (1<<3) +#define delete_instead (1<<4) +#define update_instead (1<<5) + +#define merge_action_already_rewrite(acttype, flag) \ + ((acttype == CMD_INSERT && (flag & insert_rewrite)) || \ + (acttype == CMD_UPDATE && (flag & update_rewrite)) || \ + (acttype == CMD_DELETE && (flag & delete_rewrite))) + +#define set_action_rewrite(acttype, flag) \ + if(acttype == CMD_INSERT) \ + {flag |= insert_rewrite;}\ + else if(acttype == CMD_UPDATE) \ + {flag |= update_rewrite;}\ + else if(acttype == CMD_DELETE) \ + {flag |= delete_rewrite;} + +#define merge_action_instead(acttype, flag) \ + ((acttype == CMD_INSERT && (flag & insert_instead)) || \ + (acttype == CMD_UPDATE && (flag & update_instead)) || \ + (acttype == CMD_DELETE && (flag & delete_instead))) + +#define set_action_instead(acttype, flag)\ + if(acttype == CMD_INSERT) \ + {flag |= insert_instead;}\ + else if(acttype == CMD_UPDATE) \ + {flag |= update_instead;}\ + else if(acttype == CMD_DELETE) \ + {flag |= delete_instead;} /* * QueryRewrite - @@ -2025,7 +2060,148 @@ QueryRewrite(Query *parsetree) * * Apply all non-SELECT rules possibly getting 0 or many queries */ - querylist = RewriteQuery(parsetree, NIL); + if (parsetree->commandType == CMD_MERGE) + { + /* + * for MERGE, we have a set of action queries (not subquery). + * each of these action queries should rewritten with RewriteQuery(). + */ + ListCell *l; + int flag = 0; + List *pre_qry = NIL; + List *post_qry = NIL; + + querylist = NIL; + + /*rewrite the merge action queries one by one.*/ + foreach(l, parsetree->mergeActQry) + { + List *queryList4action = NIL; + Query *actionqry; + Query *q; + + actionqry = lfirst(l); + + /* no rewriting for DO NOTHING action*/ + if (actionqry->commandType == CMD_DONOTHING) + continue; + + /* + * if this kind of actions are fully replaced by rules, + * we change it into a DO NOTHING action + */ + if (merge_action_instead(actionqry->commandType, flag)) + { + /* + * Still need to call RewriteQuery(), + * since we need the process on target list and so on. + * BUT, the returned list is discarded + */ + RewriteQuery(actionqry, NIL); + actionqry->commandType = CMD_DONOTHING; + actionqry->targetList = NIL; + continue; + } + + /* if this kind of actions are already processed by rewriter, skip it.*/ + if (merge_action_already_rewrite(actionqry->commandType, flag)) + { + RewriteQuery(actionqry, NIL); + continue; + } + + /* ok this action has not been processed before, let's do it now. */ + queryList4action = RewriteQuery(actionqry, NIL); + + /* this kind of actions has been processed, set the flag */ + set_action_rewrite(actionqry->commandType, flag); + + /* + * if the returning list is empty, this merge action + * is replaced by a do-nothing rule + */ + if (queryList4action == NIL) + { + /* set the flag for other merge actions of the same type */ + set_action_instead(actionqry->commandType, flag); + actionqry->commandType = CMD_DONOTHING; + actionqry->targetList = NIL; + continue; + } + + /* + * if the rewriter return a non-NIL list, the merge action query + * could be one element in it. + * if so, it must be the head (for INSERT action) + * or tail (for UPDATE/DELETE action). + */ + + /* test the list head */ + q = (Query *) linitial(queryList4action); + if (q->querySource == QSRC_ORIGINAL) + { + /* + * the merge action is the head, the remaining part + * of the list are the queries generated by rules + * we put them in the post_qry list. + */ + if (querylist == NIL) + querylist = list_make1(parsetree); + + queryList4action = list_delete_first(queryList4action); + post_qry = list_concat(post_qry,queryList4action); + + continue; + } + + /*test the list tail*/ + q = (Query *) llast(queryList4action); + if (q->querySource == QSRC_ORIGINAL) + { + /* + * the merge action is the tail. + * Put the rule queries in pre_qry list + */ + if (querylist == NIL) + querylist = list_make1(parsetree); + + queryList4action = list_truncate(queryList4action, + list_length(queryList4action) - 1); + + pre_qry = list_concat(pre_qry,queryList4action); + continue; + } + + /* + * here, the merge action query is not in the rewritten query list, + * which means the action is replaced by INSTEAD rule(s). + * We need to change it into do noting action. + * + * For a INSERT action, we put the rule queries in the post list + * otherwise, in the pre list + */ + if(actionqry->commandType == CMD_INSERT) + post_qry = list_concat(post_qry,queryList4action); + else + pre_qry = list_concat(pre_qry,queryList4action); + + set_action_instead(actionqry->commandType, flag); + actionqry->commandType = CMD_DONOTHING; + actionqry->targetList = NIL; + } + + /* + * finally, put the 3 lists into one. + * If all the merge actions are replaced by rules, + * the original merge query + * will not be involved in the querylist. + */ + querylist = list_concat(pre_qry,querylist); + querylist = list_concat(querylist, post_qry); + + } + else + querylist = RewriteQuery(parsetree, NIL); /* * Step 2 diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index cba90a9..375923e 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -979,7 +979,7 @@ exec_simple_query(const char *query_string) NULL, 0); plantree_list = pg_plan_queries(querytree_list, 0, NULL); - +//printf("the plann is \n%s\n", nodeToString(plantree_list)); /* Done with the snapshot used for parsing/planning */ if (snapshot_set) PopActiveSnapshot(); diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index 8eb02da..52ed285 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -225,6 +225,10 @@ ProcessQuery(PlannedStmt *plan, snprintf(completionTag, COMPLETION_TAG_BUFSIZE, "DELETE %u", queryDesc->estate->es_processed); break; + case CMD_MERGE: + snprintf(completionTag, COMPLETION_TAG_BUFSIZE, + "MERGE %u", queryDesc->estate->es_processed); + break; default: strcpy(completionTag, "???"); break; diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 75cb354..e583348 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -126,6 +126,7 @@ CommandIsReadOnly(Node *parsetree) case CMD_UPDATE: case CMD_INSERT: case CMD_DELETE: + case CMD_MERGE: return false; default: elog(WARNING, "unrecognized commandType: %d", @@ -1412,6 +1413,10 @@ CreateCommandTag(Node *parsetree) tag = "SELECT"; break; + case T_MergeStmt: + tag = "MERGE"; + break; + /* utility statements --- same whether raw or cooked */ case T_TransactionStmt: { @@ -2257,6 +2262,7 @@ GetCommandLogLevel(Node *parsetree) case T_InsertStmt: case T_DeleteStmt: case T_UpdateStmt: + case T_MergeStmt: lev = LOGSTMT_MOD; break; diff --git a/src/include/commands/trigger.h b/src/include/commands/trigger.h index 30dc314..a41a169 100644 --- a/src/include/commands/trigger.h +++ b/src/include/commands/trigger.h @@ -178,6 +178,8 @@ extern void ExecBSTruncateTriggers(EState *estate, ResultRelInfo *relinfo); extern void ExecASTruncateTriggers(EState *estate, ResultRelInfo *relinfo); +extern void ExecBSMergeTriggers(ModifyTableState *mt_state); +extern void ExecASMergeTriggers(ModifyTableState *mt_state); extern void AfterTriggerBeginXact(void); extern void AfterTriggerBeginQuery(void); diff --git a/src/include/executor/nodeModifyTable.h b/src/include/executor/nodeModifyTable.h index a8a28a4..96b46f0 100644 --- a/src/include/executor/nodeModifyTable.h +++ b/src/include/executor/nodeModifyTable.h @@ -16,6 +16,7 @@ #include "nodes/execnodes.h" extern ModifyTableState *ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags); +extern MergeActionState *ExecInitMergeAction(MergeAction *node, EState *estate, int eflags); extern TupleTableSlot *ExecModifyTable(ModifyTableState *node); extern void ExecEndModifyTable(ModifyTableState *node); extern void ExecReScanModifyTable(ModifyTableState *node); diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 78188a3..19c0d43 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1032,9 +1032,23 @@ typedef struct ModifyTableState int mt_whichplan; /* which one is being executed (0..n-1) */ EPQState mt_epqstate; /* for evaluating EvalPlanQual rechecks */ bool fireBSTriggers; /* do we need to fire stmt triggers? */ + MergeActionSet **mt_mergeActPstates; /*the list of the planstate of merge command actions. + NULL if this is not a merge command. + The elements if it are still MergeActionSet nodes. + But the action list in these nodes are ModifyTableState */ } ModifyTableState; /* ---------------- + * MergeActionState information + * ---------------- + */ +typedef struct MergeActionState +{ + PlanState ps; /* its first field is NodeTag */ + CmdType operation; +} MergeActionState; + +/* ---------------- * AppendState information * * nplans how many plans are in the array diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 15dabe3..293800c 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -44,6 +44,8 @@ typedef enum NodeTag T_Plan = 100, T_Result, T_ModifyTable, + T_MergeAction, + T_MergeActionSet, T_Append, T_MergeAppend, T_RecursiveUnion, @@ -87,6 +89,7 @@ typedef enum NodeTag T_PlanState = 200, T_ResultState, T_ModifyTableState, + T_MergeActionState, T_AppendState, T_MergeAppendState, T_RecursiveUnionState, @@ -350,6 +353,10 @@ typedef enum NodeTag T_AlterUserMappingStmt, T_DropUserMappingStmt, T_AlterTableSpaceOptionsStmt, + T_MergeStmt, + T_MergeConditionAction, + T_MergeDoNothing, + T_MergeError, T_SecLabelStmt, /* @@ -515,6 +522,8 @@ typedef enum CmdType CMD_UPDATE, /* update stmt */ CMD_INSERT, /* insert stmt */ CMD_DELETE, + CMD_MERGE, /* merge stmt */ + CMD_DONOTHING, /*DO NOTHING action in MERGE command*/ CMD_UTILITY, /* cmds like create, destroy, copy, vacuum, * etc. */ CMD_NOTHING /* dummy command for instead nothing rules diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index e0bdebd..3293b54 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -149,8 +149,24 @@ typedef struct Query List *constraintDeps; /* a list of pg_constraint OIDs that the query * depends on to be semantically valid */ + + /* fields for MERGE command */ + bool isMergeAction; /* if this query is a merge action. */ + bool matched; /* this is a MATCHED action or NOT */ + int sourceAttrNo; /*the number of attributes in source table*/ + List *mergeActQry; /* the list of all the merge actions. + * used only for merge query statement */ } Query; +/* +* In MERGE command, the initial MERGE query has only three range table entry +*The first one is the source table; The second one is the target table; and The +*third one is the left join entry of them. +*During the whole process of a MEREGE command, the rt_index for the source table +*entry and the join table entry will never change. We set them as constant here. +*/ +#define Merge_SourceTableRTindex 1 +#define Merge_TopJoinTableRTindex 3 /**************************************************************************** * Supporting data structures for Parse Trees @@ -892,6 +908,7 @@ typedef struct CommonTableExpr typedef struct InsertStmt { NodeTag type; + bool isMergeAction; /*if this is a merge insert action*/ RangeVar *relation; /* relation to insert into */ List *cols; /* optional: names of the target columns */ Node *selectStmt; /* the source SELECT/VALUES, or NULL */ @@ -906,6 +923,7 @@ typedef struct InsertStmt typedef struct DeleteStmt { NodeTag type; + bool isMergeAction; /*if this is a merge delete action*/ RangeVar *relation; /* relation to delete from */ List *usingClause; /* optional using clause for more tables */ Node *whereClause; /* qualifications */ @@ -920,6 +938,7 @@ typedef struct DeleteStmt typedef struct UpdateStmt { NodeTag type; + bool isMergeAction; /*if this is a merge delete action*/ RangeVar *relation; /* relation to update */ List *targetList; /* the target list (of ResTarget) */ Node *whereClause; /* qualifications */ @@ -996,6 +1015,54 @@ typedef struct SelectStmt /* Eventually add fields for CORRESPONDING spec here */ } SelectStmt; +/* ---------------------- + * Merge Statement + * ---------------------- + */ +typedef struct MergeStmt +{ + NodeTag type; + RangeVar *relation; /* target relation for merge */ + + /* + * source relations for the merge. + * Currently, we only allow single-source merge, + * so the length of this list should always be 1. + */ + Node *source; + Node *matchCondition; /* qualifications of the merge */ + + /* list of MergeConditionAction structure. + * It stores all the matched / not-matched + * conditions and the corresponding actions + * The elements of this list are MergeConditionAction + * nodes. + */ + List *actions; +} MergeStmt; + +/* + * the structure for the actions of MERGE command. + * Holds info of the clauses like + * "WHEN MATCHED AND ... THEN UPDATE/DELETE/INSERT" + */ +typedef struct MergeConditionAction +{ + NodeTag type; + bool match; /* WHEN MATCHED or WHEN NOT MATCHED? */ + Node *condition; /* the AND condition for this action */ + Node *action; /* the actions: delete, insert or update */ +} MergeConditionAction; + +typedef struct MergeDoNothing +{ + NodeTag type; +} MergeDoNothing; + +typedef struct MergeError +{ + NodeTag type; +} MergeError; /* ---------------------- * Set Operation node for post-analysis query trees diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 81038f6..6791b34 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -169,9 +169,38 @@ typedef struct ModifyTable List *returningLists; /* per-target-table RETURNING tlists */ List *rowMarks; /* PlanRowMarks (non-locking only) */ int epqParam; /* ID of Param for EvalPlanQual re-eval */ + List *mergeActPlan; /* the plans for merge actions, + * which are MergeActionSet nodes. + * one set for one target relation */ } ModifyTable; /* ---------------- + * MergeActionSet node - + * The node contains all actions of MERGE command for one specific target relation + * ---------------- + */ +typedef struct MergeActionSet +{ + NodeTag type; + int result_relation; + List *actions; +}MergeActionSet; + +/* ---------------- + * MergeAction node - + * The plan node for the actions of MERGE command + * ---------------- + */ +typedef struct MergeAction +{ + Plan plan; + CmdType operation; /* INSERT, UPDATE, DELETE, DO_NOTHING or RAISE_ERR */ + bool matched; /* is this a MATCHED or NOT MATCHED rule? */ + List *flattenedqual; /* the flattened qual expression of action */ + List *flattenedtlist;/*the fattened target list*/ +} MergeAction; + +/* ---------------- * Append node - * Generate the concatenation of the results of sub-plans. * ---------------- diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h index 5bb0e09..3e8bd0f 100644 --- a/src/include/optimizer/planmain.h +++ b/src/include/optimizer/planmain.h @@ -80,7 +80,7 @@ extern Result *make_result(PlannerInfo *root, List *tlist, Node *resconstantqual, Plan *subplan); extern ModifyTable *make_modifytable(CmdType operation, List *resultRelations, List *subplans, List *returningLists, - List *rowMarks, int epqParam); + List *rowMarks, List *mergeActPlans, int epqParam); extern bool is_projection_capable_plan(Plan *plan); /* diff --git a/src/include/optimizer/prep.h b/src/include/optimizer/prep.h index f8dd542..9ed2a60 100644 --- a/src/include/optimizer/prep.h +++ b/src/include/optimizer/prep.h @@ -53,4 +53,6 @@ extern void expand_inherited_tables(PlannerInfo *root); extern Node *adjust_appendrel_attrs(Node *node, AppendRelInfo *appinfo); +extern List *adjust_inherited_tlist(List * tlist, AppendRelInfo * context); + #endif /* PREP_H */ diff --git a/src/include/optimizer/var.h b/src/include/optimizer/var.h index 8fd9977..e9f4e17 100644 --- a/src/include/optimizer/var.h +++ b/src/include/optimizer/var.h @@ -15,6 +15,7 @@ #define VAR_H #include "nodes/relation.h" +#include "nodes/plannodes.h" typedef enum { @@ -32,5 +33,6 @@ extern int locate_var_of_relation(Node *node, int relid, int levelsup); extern int find_minimum_var_level(Node *node); extern List *pull_var_clause(Node *node, PVCPlaceHolderBehavior behavior); extern Node *flatten_join_alias_vars(PlannerInfo *root, Node *node); +extern void push_up_merge_action_vars(MergeAction *actplan, Query *actqry); #endif /* VAR_H */ diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index d3ea04b..0863234 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -142,6 +142,7 @@ PG_KEYWORD("encoding", ENCODING, UNRESERVED_KEYWORD) PG_KEYWORD("encrypted", ENCRYPTED, UNRESERVED_KEYWORD) PG_KEYWORD("end", END_P, RESERVED_KEYWORD) PG_KEYWORD("enum", ENUM_P, UNRESERVED_KEYWORD) +PG_KEYWORD("error", ERROR_P, UNRESERVED_KEYWORD) PG_KEYWORD("escape", ESCAPE, UNRESERVED_KEYWORD) PG_KEYWORD("except", EXCEPT, RESERVED_KEYWORD) PG_KEYWORD("exclude", EXCLUDE, UNRESERVED_KEYWORD) @@ -231,7 +232,9 @@ PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD) PG_KEYWORD("login", LOGIN_P, UNRESERVED_KEYWORD) PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD) PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD) +PG_KEYWORD("matched", MATCHED, UNRESERVED_KEYWORD) PG_KEYWORD("maxvalue", MAXVALUE, UNRESERVED_KEYWORD) +PG_KEYWORD("merge", MERGE, UNRESERVED_KEYWORD) PG_KEYWORD("minute", MINUTE_P, UNRESERVED_KEYWORD) PG_KEYWORD("minvalue", MINVALUE, UNRESERVED_KEYWORD) PG_KEYWORD("mode", MODE, UNRESERVED_KEYWORD) @@ -298,6 +301,7 @@ PG_KEYWORD("privileges", PRIVILEGES, UNRESERVED_KEYWORD) PG_KEYWORD("procedural", PROCEDURAL, UNRESERVED_KEYWORD) PG_KEYWORD("procedure", PROCEDURE, UNRESERVED_KEYWORD) PG_KEYWORD("quote", QUOTE, UNRESERVED_KEYWORD) +PG_KEYWORD("raise", RAISE, UNRESERVED_KEYWORD) PG_KEYWORD("range", RANGE, UNRESERVED_KEYWORD) PG_KEYWORD("read", READ, UNRESERVED_KEYWORD) PG_KEYWORD("real", REAL, COL_NAME_KEYWORD) diff --git a/src/include/parser/parse_clause.h b/src/include/parser/parse_clause.h index 0e8a80e..831421b 100644 --- a/src/include/parser/parse_clause.h +++ b/src/include/parser/parse_clause.h @@ -19,6 +19,7 @@ extern void transformFromClause(ParseState *pstate, List *frmList); extern int setTargetTable(ParseState *pstate, RangeVar *relation, bool inh, bool alsoSource, AclMode requiredPerms); +extern void setTargetTableLock(ParseState *pstate, RangeVar *relation); extern bool interpretInhOption(InhOption inhOpt); extern bool interpretOidsOption(List *defList); diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out new file mode 100644 index 0000000..7ee155a --- /dev/null +++ b/src/test/regress/expected/merge.out @@ -0,0 +1,202 @@ +-- MERGE -- +-- +-- create basic tables for test, and insert some source rows to work from +-- +--The Stock table, which records the amount of each item on hand. +CREATE TABLE Stock(item_id int UNIQUE, balance int); +NOTICE: CREATE TABLE / UNIQUE will create implicit index "stock_item_id_key" for table "stock" +INSERT INTO Stock VALUES (10, 2200); +INSERT INTO Stock VALUES (20, 1900); +SELECT * FROM Stock; + item_id | balance +---------+--------- + 10 | 2200 + 20 | 1900 +(2 rows) + +--The Buy table, which records the amount we bought today for each item. +CREATE TABLE Buy(item_id int, volume int); +INSERT INTO Buy values(10, 1000); +INSERT INTO Buy values(30, 300); +SELECT * FROM Buy; + item_id | volume +---------+-------- + 10 | 1000 + 30 | 300 +(2 rows) + +--The Sale table, which records the amount we sold today for each item. +CREATE TABLE Sale(item_id int, volume int); +INSERT INTO Sale VALUES (10, 2200); +INSERT INTO Sale VALUES (20, 1000); +SELECT * FROM Sale; + item_id | volume +---------+-------- + 10 | 2200 + 20 | 1000 +(2 rows) + +-- +-- initial queries +-- +-- do a simple equivalent of an UPDATE join +BEGIN; +MERGE INTO Stock USING Buy ON Stock.item_id = Buy.item_id + WHEN MATCHED THEN UPDATE SET balance = balance + Buy.volume +; +SELECT * FROM Stock ORDER BY item_id; + item_id | balance +---------+--------- + 10 | 3200 + 20 | 1900 +(2 rows) + +ROLLBACK; +-- do a simple equivalent of an INSERT SELECT +BEGIN; +MERGE INTO Stock USING Buy ON Stock.item_id = Buy.item_id + WHEN NOT MATCHED THEN INSERT VALUES (Buy.item_id, Buy.volume) +; +SELECT * FROM Stock ORDER BY item_id; + item_id | balance +---------+--------- + 10 | 2200 + 20 | 1900 + 30 | 300 +(3 rows) + +ROLLBACK; +-- now the classic UPSERT +BEGIN; +MERGE INTO Stock USING Buy ON Stock.item_id = Buy.item_id + WHEN MATCHED THEN UPDATE SET balance = balance + Buy.volume + WHEN NOT MATCHED THEN INSERT VALUES (Buy.item_id, Buy.volume) +; +SELECT * FROM Stock ORDER BY item_id; + item_id | balance +---------+--------- + 10 | 3200 + 20 | 1900 + 30 | 300 +(3 rows) + +ROLLBACK; +-- +-- Non-standard functionality +-- +-- a MERGE with DELETE action, which is not allowed in Standard. +-- Extra qualifications are allowed in each WHEN clause. +BEGIN; +MERGE INTO Stock USING Sale ON Stock.item_id = Sale.item_id + WHEN MATCHED AND balance - volume > 0 THEN UPDATE SET balance = balance - volume + WHEN MATCHED THEN DELETE; +; +SELECT * FROM Stock ORDER BY item_id; + item_id | balance +---------+--------- + 20 | 900 +(1 row) + +ROLLBACK; +-- The DO NOTHING action is another extension for MERGE. +-- rows specified by DO NOTHING are ignored +BEGIN; +MERGE INTO Stock USING Sale ON Stock.item_id = Sale.item_id + WHEN MATCHED AND balance - volume > 0 THEN DO NOTHING + WHEN MATCHED THEN DELETE +; +SELECT * FROM Stock ORDER BY item_id; + item_id | balance +---------+--------- + 20 | 1900 +(1 row) + +ROLLBACK; +-- DO NOTHING is the default action for non-matching rows that do not +-- activate any WHEN clause. They are just ignored +BEGIN; +MERGE INTO Stock USING Sale ON Stock.item_id = Sale.item_id + WHEN MATCHED AND balance - volume > 100000 THEN UPDATE SET balance = balance - volume +; +SELECT * FROM Stock ORDER BY item_id; + item_id | balance +---------+--------- + 10 | 2200 + 20 | 1900 +(2 rows) + +ROLLBACK; +-- Prepare the test data to generate multiple matching rows for a single target +INSERT INTO Buy values(10, 400); +SELECT * FROM Buy ORDER BY item_id; + item_id | volume +---------+-------- + 10 | 1000 + 10 | 400 + 30 | 300 +(3 rows) + +-- we now have a duplicate key in Buy, so when we join to +-- Stock we will generate 2 matching rows, not one. +-- According to standard this command should fail. +-- But it succeeds in PostgreSQL implementation by simply ignoring the second +BEGIN; +MERGE INTO Stock USING Buy ON Stock.item_id = Buy.item_id + WHEN MATCHED THEN UPDATE SET balance = balance + Buy.volume +; +SELECT * FROM Stock ORDER BY item_id; + item_id | balance +---------+--------- + 10 | 3200 + 20 | 1900 +(2 rows) + +ROLLBACK; +-- +-- More complicated query +-- +-- The source table of MERGE could be a SELECT clause +BEGIN; +MERGE INTO Stock USING + (SELECT Buy.item_id, (Buy.volume - Sale.volume) as v + FROM Buy, Sale + WHERE Buy.item_id = Sale.item_id) + AS BS + ON Stock.item_id = BS.item_id + WHEN MATCHED THEN UPDATE SET balance = balance + BS.v +; +SELECT * FROM Stock ORDER BY item_id; + item_id | balance +---------+--------- + 10 | 1000 + 20 | 1900 +(2 rows) + +ROLLBACK; +-- Subplan/sublinks can be used in MERGE actions +-- Create a table for sublink. +CREATE TABLE Extra (item_id int, volume int); +INSERT INTO Extra VALUES (10, 20); +INSERT INTO Extra VALUES (10, -7); +INSERT INTO Extra VALUES (20, 16); +INSERT INTO Extra VALUES (20, 5); +INSERT INTO Extra VALUES (30, 9); +-- The following query sum-up the volumes in Extra table and upinsert the Stock. +BEGIN; +MERGE INTO Stock USING Buy ON Stock.item_id = Buy.item_id + WHEN MATCHED THEN UPDATE SET + balance = balance + Buy.volume + + (SELECT sum(volume) FROM Extra WHERE Extra.item_id = Buy.item_id) + WHEN NOT MATCHED THEN INSERT VALUES + (Buy.item_id, Buy.volume + + (SELECT sum(volume) FROM Extra WHERE Extra.item_id = Buy.item_id)) +; +SELECT * FROM Stock ORDER BY item_id; + item_id | balance +---------+--------- + 10 | 3213 + 20 | 1900 + 30 | 309 +(3 rows) + +ROLLBACK; diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule index 3b99e86..9c2d298 100644 --- a/src/test/regress/parallel_schedule +++ b/src/test/regress/parallel_schedule @@ -91,7 +91,7 @@ test: select_views portals_p2 foreign_key cluster dependency guc bitmapops combo # NB: temp.sql does a reconnect which transiently uses 2 connections, # so keep this parallel group to at most 19 tests # ---------- -test: plancache limit plpgsql copy2 temp domain rangefuncs prepare without_oid conversion truncate alter_table sequencepolymorphism rowtypes returning largeobject with xml +test: plancache limit plpgsql copy2 temp domain rangefuncs prepare without_oid conversion truncate alter_table sequencepolymorphism rowtypes returning largeobject with xml merge # run stats by itself because its delay may be insufficient under heavy load test: stats diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule index b348f0e..fde54d3 100644 --- a/src/test/regress/serial_schedule +++ b/src/test/regress/serial_schedule @@ -124,4 +124,5 @@ test: returning test: largeobject test: with test: xml +test: merge test: stats diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql new file mode 100644 index 0000000..5cdb5db --- /dev/null +++ b/src/test/regress/sql/merge.sql @@ -0,0 +1,135 @@ +-- MERGE -- + +-- +-- create basic tables for test, and insert some source rows to work from +-- +--The Stock table, which records the amount of each item on hand. +CREATE TABLE Stock(item_id int UNIQUE, balance int); +INSERT INTO Stock VALUES (10, 2200); +INSERT INTO Stock VALUES (20, 1900); +SELECT * FROM Stock; + +--The Buy table, which records the amount we bought today for each item. +CREATE TABLE Buy(item_id int, volume int); +INSERT INTO Buy values(10, 1000); +INSERT INTO Buy values(30, 300); +SELECT * FROM Buy; + +--The Sale table, which records the amount we sold today for each item. +CREATE TABLE Sale(item_id int, volume int); +INSERT INTO Sale VALUES (10, 2200); +INSERT INTO Sale VALUES (20, 1000); +SELECT * FROM Sale; + +-- +-- initial queries +-- +-- do a simple equivalent of an UPDATE join +BEGIN; +MERGE INTO Stock USING Buy ON Stock.item_id = Buy.item_id + WHEN MATCHED THEN UPDATE SET balance = balance + Buy.volume +; +SELECT * FROM Stock ORDER BY item_id; +ROLLBACK; + +-- do a simple equivalent of an INSERT SELECT +BEGIN; +MERGE INTO Stock USING Buy ON Stock.item_id = Buy.item_id + WHEN NOT MATCHED THEN INSERT VALUES (Buy.item_id, Buy.volume) +; +SELECT * FROM Stock ORDER BY item_id; +ROLLBACK; + +-- now the classic UPSERT +BEGIN; +MERGE INTO Stock USING Buy ON Stock.item_id = Buy.item_id + WHEN MATCHED THEN UPDATE SET balance = balance + Buy.volume + WHEN NOT MATCHED THEN INSERT VALUES (Buy.item_id, Buy.volume) +; +SELECT * FROM Stock ORDER BY item_id; +ROLLBACK; + +-- +-- Non-standard functionality +-- +-- a MERGE with DELETE action, which is not allowed in Standard. +-- Extra qualifications are allowed in each WHEN clause. +BEGIN; +MERGE INTO Stock USING Sale ON Stock.item_id = Sale.item_id + WHEN MATCHED AND balance - volume > 0 THEN UPDATE SET balance = balance - volume + WHEN MATCHED THEN DELETE; +; +SELECT * FROM Stock ORDER BY item_id; +ROLLBACK; + +-- The DO NOTHING action is another extension for MERGE. +-- rows specified by DO NOTHING are ignored +BEGIN; +MERGE INTO Stock USING Sale ON Stock.item_id = Sale.item_id + WHEN MATCHED AND balance - volume > 0 THEN DO NOTHING + WHEN MATCHED THEN DELETE +; +SELECT * FROM Stock ORDER BY item_id; +ROLLBACK; + +-- DO NOTHING is the default action for non-matching rows that do not +-- activate any WHEN clause. They are just ignored +BEGIN; +MERGE INTO Stock USING Sale ON Stock.item_id = Sale.item_id + WHEN MATCHED AND balance - volume > 100000 THEN UPDATE SET balance = balance - volume +; +SELECT * FROM Stock ORDER BY item_id; +ROLLBACK; + +-- Prepare the test data to generate multiple matching rows for a single target +INSERT INTO Buy values(10, 400); +SELECT * FROM Buy ORDER BY item_id; + +-- we now have a duplicate key in Buy, so when we join to +-- Stock we will generate 2 matching rows, not one. +-- According to standard this command should fail. +-- But it succeeds in PostgreSQL implementation by simply ignoring the second +BEGIN; +MERGE INTO Stock USING Buy ON Stock.item_id = Buy.item_id + WHEN MATCHED THEN UPDATE SET balance = balance + Buy.volume +; +SELECT * FROM Stock ORDER BY item_id; +ROLLBACK; + +-- +-- More complicated query +-- +-- The source table of MERGE could be a SELECT clause +BEGIN; +MERGE INTO Stock USING + (SELECT Buy.item_id, (Buy.volume - Sale.volume) as v + FROM Buy, Sale + WHERE Buy.item_id = Sale.item_id) + AS BS + ON Stock.item_id = BS.item_id + WHEN MATCHED THEN UPDATE SET balance = balance + BS.v +; +SELECT * FROM Stock ORDER BY item_id; +ROLLBACK; + +-- Subplan/sublinks can be used in MERGE actions +-- Create a table for sublink. +CREATE TABLE Extra (item_id int, volume int); +INSERT INTO Extra VALUES (10, 20); +INSERT INTO Extra VALUES (10, -7); +INSERT INTO Extra VALUES (20, 16); +INSERT INTO Extra VALUES (20, 5); +INSERT INTO Extra VALUES (30, 9); + +-- The following query sum-up the volumes in Extra table and upinsert the Stock. +BEGIN; +MERGE INTO Stock USING Buy ON Stock.item_id = Buy.item_id + WHEN MATCHED THEN UPDATE SET + balance = balance + Buy.volume + + (SELECT sum(volume) FROM Extra WHERE Extra.item_id = Buy.item_id) + WHEN NOT MATCHED THEN INSERT VALUES + (Buy.item_id, Buy.volume + + (SELECT sum(volume) FROM Extra WHERE Extra.item_id = Buy.item_id)) +; +SELECT * FROM Stock ORDER BY item_id; +ROLLBACK;
pgsql-hackers by date: