Re: dblink: rollback transaction - Mailing list pgsql-general
From | Joe Conway |
---|---|
Subject | Re: dblink: rollback transaction |
Date | |
Msg-id | 40229040.2060508@joeconway.com |
In response to | Re: dblink: rollback transaction ("Oleg Lebedev" <oleg.lebedev@waterford.org>) |
List | pgsql-general |
Oleg Lebedev wrote:
> Agreed. I wonder if I should simulate local Xactions by using local
> dblink calls?
> What do you think, Joe?

It is an interesting thought. Within a single plpgsql function, open one
local and one remote persistent, named dblink connection. Start a
transaction in each. Go into your loop. Here's the problem -- I don't
know how you can programmatically detect an error. Try playing with
dblink_exec for this. If you can detect an error condition, you can then
ABORT both transactions.

> So, is it actually possible to use BEGIN; .. COMMIT; statement with
> dblink?

Sure. Use a named persistent connection. Then issue a BEGIN just like
any other remote SQL statement (might be best to use dblink_exec with
this also).

> Even if I start the remote Xaction before the local one starts, there is
> no way for me to catch an exception thrown by the local Xaction. I don't
> think Pl/PgSQL supports exceptions. So, if the local Xaction throws an
> exception then the whole process terminates.
>
> Ideas?

[runs off to try a few things...]

I played with this a bit, and found that with some minor changes to
dblink_exec(), I can get the behavior we want, I think.

===============================================================
Here's the SQL:
===============================================================
\c remote
drop table foo;
create table foo(f1 int primary key, f2 text);
insert into foo values (1,'a');
insert into foo values (2,'b');
insert into foo values (3,'b');

\c local
drop table foo;
create table foo(f1 int primary key, f2 text);
--note this is missing on remote side
create unique index uindx1 on foo(f2);

create or replace function test() returns text as '
declare
  res text;
  tup record;
  sql text;
begin
  -- leaving out result checking for clarity
  select into res dblink_connect(''localconn'',''dbname=local'');
  select into res dblink_connect(''remoteconn'',''dbname=remote'');
  select into res dblink_exec(''localconn'',''BEGIN'');
  select into res dblink_exec(''remoteconn'',''BEGIN'');
  for tup in select * from dblink(''remoteconn'',''select * from foo'')
             as t(f1 int, f2 text) loop
    sql := ''insert into foo values ('' || tup.f1::text || '','''''' || tup.f2 || '''''')'';
    select into res dblink_exec(''localconn'',sql);

    if res = ''ERROR'' then
      select into res dblink_exec(''localconn'',''ABORT'');
      select into res dblink_exec(''remoteconn'',''ABORT'');
      select into res dblink_disconnect(''localconn'');
      select into res dblink_disconnect(''remoteconn'');
      return ''ERROR'';
    else
      sql := ''delete from foo where f1 = '' || tup.f1::text;
      select into res dblink_exec(''remoteconn'',sql);
    end if;
  end loop;

  select into res dblink_exec(''localconn'',''COMMIT'');
  select into res dblink_exec(''remoteconn'',''COMMIT'');
  select into res dblink_disconnect(''localconn'');
  select into res dblink_disconnect(''remoteconn'');
  return ''OK'';
end;
' language plpgsql;

===============================================================
Here's the test:
===============================================================
local=# select test();
NOTICE: sql error
DETAIL: ERROR: duplicate key violates unique constraint "uindx1"
CONTEXT: PL/pgSQL function "test" line 15 at select into variables
 test
-------
 ERROR
(1 row)

local=# select * from foo;
 f1 | f2
----+----
(0 rows)

local=# select * from dblink('dbname=remote','select * from foo') as t(f1 int, f2 text);
 f1 | f2
----+----
  1 | a
  2 | b
  3 | b
(3 rows)

local=# drop index uindx1;
DROP INDEX
local=# select test();
 test
------
 OK
(1 row)

local=# select * from foo;
 f1 | f2
----+----
  1 | a
  2 | b
  3 | b
(3 rows)

local=# select * from dblink('dbname=remote','select * from foo') as t(f1 int, f2 text);
 f1 | f2
----+----
(0 rows)

===============================================================

Patch attached. Thoughts?

Joe

Index: contrib/dblink/dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql-server/contrib/dblink/dblink.c,v
retrieving revision 1.29
diff -c -r1.29 dblink.c
*** contrib/dblink/dblink.c 28 Nov 2003 05:03:01 -0000 1.29
--- contrib/dblink/dblink.c 5 Feb 2004 19:49:00 -0000
***************
*** 135,140 ****
--- 135,150 ----
               errmsg("%s", p2), \
               errdetail("%s", msg))); \
      } while (0)
+ #define DBLINK_RES_ERROR_AS_NOTICE(p2) \
+     do { \
+             msg = pstrdup(PQerrorMessage(conn)); \
+             if (res) \
+                 PQclear(res); \
+             ereport(NOTICE, \
+                 (errcode(ERRCODE_SYNTAX_ERROR), \
+                  errmsg("%s", p2), \
+                  errdetail("%s", msg))); \
+     } while (0)
  #define DBLINK_CONN_NOT_AVAIL \
      do { \
          if(conname) \
***************
*** 731,739 ****
          if (!res ||
              (PQresultStatus(res) != PGRES_COMMAND_OK &&
               PQresultStatus(res) != PGRES_TUPLES_OK))
!             DBLINK_RES_ERROR("sql error");

!         if (PQresultStatus(res) == PGRES_COMMAND_OK)
          {
              /* need a tuple descriptor representing one TEXT column */
              tupdesc = CreateTemplateTupleDesc(1, false);
--- 741,762 ----
          if (!res ||
              (PQresultStatus(res) != PGRES_COMMAND_OK &&
               PQresultStatus(res) != PGRES_TUPLES_OK))
!         {
!             DBLINK_RES_ERROR_AS_NOTICE("sql error");
!
!             /* need a tuple descriptor representing one TEXT column */
!             tupdesc = CreateTemplateTupleDesc(1, false);
!             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
!                                TEXTOID, -1, 0, false);

!             /*
!              * and save a copy of the command status string to return as our
!              * result tuple
!              */
!             sql_cmd_status = GET_TEXT("ERROR");
!
!         }
!         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
          {
              /* need a tuple descriptor representing one TEXT column */
              tupdesc = CreateTemplateTupleDesc(1, false);
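
For readers who want the gist of the patch without the surrounding dblink source, here is a minimal standalone C sketch of the control-flow change: a failed command is reported as a notice and turned into the status text "ERROR" rather than aborting the caller. This is not dblink code. The names cmd_result, REPORT_AS_NOTICE and exec_status are hypothetical stand-ins, and fprintf takes the place of ereport(NOTICE, ...) so the sketch compiles without the PostgreSQL headers.

```c
#include <stdio.h>

/* Hypothetical stand-in for the outcome of a remote command (not libpq). */
typedef struct
{
    int         ok;          /* nonzero if the command succeeded */
    const char *cmd_status;  /* command status text on success, e.g. "INSERT 0 1" */
    const char *errmsg;      /* error text on failure */
} cmd_result;

/*
 * Multi-statement macro wrapped in do { ... } while (0), the same idiom the
 * patch uses, so that it expands to a single statement.
 * fprintf stands in for ereport(NOTICE, ...); this is an assumption made
 * only to keep the sketch self-contained.
 */
#define REPORT_AS_NOTICE(summary, detail) \
    do { \
        fprintf(stderr, "NOTICE: %s\n", (summary)); \
        fprintf(stderr, "DETAIL: %s\n", (detail)); \
    } while (0)

/*
 * Return the status text the caller would see: the command status on
 * success, or the literal "ERROR" after reporting the failure as a notice.
 */
static const char *
exec_status(const cmd_result *res)
{
    if (res == NULL || !res->ok)
    {
        const char *detail = "no result";

        if (res != NULL && res->errmsg != NULL)
            detail = res->errmsg;

        REPORT_AS_NOTICE("sql error", detail);
        return "ERROR";
    }

    return res->cmd_status;
}
```

A caller can then branch on the returned text, which is what the plpgsql function does with its `if res = ''ERROR''` check before issuing ABORT on both connections.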