Re: dblink: rollback transaction - Mailing list pgsql-general

From Joe Conway
Subject Re: dblink: rollback transaction
Date
Msg-id 40229040.2060508@joeconway.com
In response to Re: dblink: rollback transaction  ("Oleg Lebedev" <oleg.lebedev@waterford.org>)
List pgsql-general
Oleg Lebedev wrote:
> Agreed. I wonder if I should simulate local Xactions by using local
> dblink calls?
> What do you think, Joe?

It is an interesting thought. Within a single plpgsql function, open
one local and one remote persistent, named dblink connection. Start a
transaction in each. Go into your loop. Here's the problem -- I don't
know how you can programmatically detect an error. Try playing with
dblink_exec for this. If you can detect an error condition, you can then
ABORT both transactions.

> So, is it actually possible to use BEGIN; .. COMMIT; statement with
> dblink?

Sure. Use a named persistent connection. Then issue a BEGIN just like
any other remote SQL statement (might be best to use dblink_exec with
this also).
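
A minimal sketch of that sequence from psql, assuming a database named
remote with a table foo(f1 int, f2 text) like the one in the example
further down. The connection name myconn and the inserted row are purely
illustrative:

```sql
-- open a named, persistent connection; it stays open until dblink_disconnect
select dblink_connect('myconn', 'dbname=remote');

-- transaction control is sent like any other remote statement
select dblink_exec('myconn', 'BEGIN');
select dblink_exec('myconn', 'insert into foo values (4, ''d'')');

-- the insert is not visible to other sessions until the remote COMMIT
-- (send ABORT instead to discard it)
select dblink_exec('myconn', 'COMMIT');

select dblink_disconnect('myconn');
```

The transaction belongs to the named connection, so every statement that
should be part of it has to go through that same connection name.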

> Even if I start the remote Xaction before the local one starts, there is
> no way for me to catch an exception thrown by the local Xaction. I don't
> think Pl/PgSQL supports exceptions. So, if the local Xaction throws an
> exception then the whole process terminates.
>
> Ideas?

[runs off to try a few things...]

I played with this a bit, and found that with some minor changes to
dblink_exec(), I can get the behavior we want, I think.

===============================================================
Here's the SQL:
===============================================================

\c remote
drop table foo;
create table foo(f1 int primary key, f2 text);
insert into foo values (1,'a');
insert into foo values (2,'b');
insert into foo values (3,'b');

\c local
drop table foo;
create table foo(f1 int primary key, f2 text);
--note this is missing on remote side
create unique index uindx1 on foo(f2);

create or replace function test() returns text as '
declare
  res text;
  tup record;
  sql text;
begin
  -- leaving out result checking for clarity
  select into res dblink_connect(''localconn'',''dbname=local'');
  select into res dblink_connect(''remoteconn'',''dbname=remote'');
  select into res dblink_exec(''localconn'',''BEGIN'');
  select into res dblink_exec(''remoteconn'',''BEGIN'');

  for tup in select * from dblink(''remoteconn'',''select * from foo'')
   as t(f1 int, f2 text) loop
   sql := ''insert into foo values ('' || tup.f1::text || '','''''' || tup.f2 || '''''')'';
   select into res dblink_exec(''localconn'',sql);
   if res = ''ERROR'' then
    select into res dblink_exec(''localconn'',''ABORT'');
    select into res dblink_exec(''remoteconn'',''ABORT'');
    select into res dblink_disconnect(''localconn'');
    select into res dblink_disconnect(''remoteconn'');
    return ''ERROR'';
   else
    sql := ''delete from foo where f1 = '' || tup.f1::text;
    select into res dblink_exec(''remoteconn'',sql);
   end if;
  end loop;
  select into res dblink_exec(''localconn'',''COMMIT'');
  select into res dblink_exec(''remoteconn'',''COMMIT'');
  select into res dblink_disconnect(''localconn'');
  select into res dblink_disconnect(''remoteconn'');
  return ''OK'';
end;
' language plpgsql;
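
One caveat about the string concatenation in test(): a value of f2 that
contains a single quote would break the generated insert. A possible
variation, assuming quote_literal() is available on your server version,
lets the server do the quoting. It is shown here as a standalone query
rather than inside the function, and the value O'Reilly is made up for
illustration:

```sql
-- quote_literal() wraps the value in single quotes and doubles any embedded quote
select 'insert into foo values (' || 3::text || ','
       || quote_literal('O''Reilly') || ')' as generated_sql;
```

Inside the function body the same idea would be quote_literal(tup.f2) in
place of the hand-built quotes around tup.f2.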


===============================================================
Here's the test:
===============================================================
local=# select test();
NOTICE:  sql error
DETAIL:  ERROR:  duplicate key violates unique constraint "uindx1"

CONTEXT:  PL/pgSQL function "test" line 15 at select into variables
  test
-------
  ERROR
(1 row)

local=# select * from foo;
  f1 | f2
----+----
(0 rows)

local=# select * from dblink('dbname=remote','select * from foo') as
t(f1 int, f2 text);
  f1 | f2
----+----
   1 | a
   2 | b
   3 | b
(3 rows)

local=# drop index uindx1;
DROP INDEX
local=# select test();
  test
------
  OK
(1 row)

local=# select * from foo;
  f1 | f2
----+----
   1 | a
   2 | b
   3 | b
(3 rows)

local=# select * from dblink('dbname=remote','select * from foo') as
t(f1 int, f2 text);
  f1 | f2
----+----
(0 rows)

===============================================================

Patch attached. Thoughts?

Joe




Index: contrib/dblink/dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql-server/contrib/dblink/dblink.c,v
retrieving revision 1.29
diff -c -r1.29 dblink.c
*** contrib/dblink/dblink.c    28 Nov 2003 05:03:01 -0000    1.29
--- contrib/dblink/dblink.c    5 Feb 2004 19:49:00 -0000
***************
*** 135,140 ****
--- 135,150 ----
                       errmsg("%s", p2), \
                       errdetail("%s", msg))); \
      } while (0)
+ #define DBLINK_RES_ERROR_AS_NOTICE(p2) \
+     do { \
+             msg = pstrdup(PQerrorMessage(conn)); \
+             if (res) \
+                 PQclear(res); \
+             ereport(NOTICE, \
+                     (errcode(ERRCODE_SYNTAX_ERROR), \
+                      errmsg("%s", p2), \
+                      errdetail("%s", msg))); \
+     } while (0)
  #define DBLINK_CONN_NOT_AVAIL \
      do { \
          if(conname) \
***************
*** 731,739 ****
      if (!res ||
          (PQresultStatus(res) != PGRES_COMMAND_OK &&
           PQresultStatus(res) != PGRES_TUPLES_OK))
!         DBLINK_RES_ERROR("sql error");

!     if (PQresultStatus(res) == PGRES_COMMAND_OK)
      {
          /* need a tuple descriptor representing one TEXT column */
          tupdesc = CreateTemplateTupleDesc(1, false);
--- 741,762 ----
      if (!res ||
          (PQresultStatus(res) != PGRES_COMMAND_OK &&
           PQresultStatus(res) != PGRES_TUPLES_OK))
!     {
!         DBLINK_RES_ERROR_AS_NOTICE("sql error");
!
!         /* need a tuple descriptor representing one TEXT column */
!         tupdesc = CreateTemplateTupleDesc(1, false);
!         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
!                            TEXTOID, -1, 0, false);

!         /*
!          * and save a copy of the command status string to return as our
!          * result tuple
!          */
!         sql_cmd_status = GET_TEXT("ERROR");
!
!     }
!     else if (PQresultStatus(res) == PGRES_COMMAND_OK)
      {
          /* need a tuple descriptor representing one TEXT column */
          tupdesc = CreateTemplateTupleDesc(1, false);
