Thread: Re: [HACKERS] Patching dblink.c to avoid warning about

Re: [HACKERS] Patching dblink.c to avoid warning about

From
Joe Conway
Date:
Bruce Momjian wrote:
> There was also a problem in that if two cursors were opened, the first
> close would close the transaction.  I have fixed that code by changing
> the xact variable in to a counter that keeps track of the number of
> opened cursors and commits only when they are all closed.
>
> Both the dblink.c patch and the regression patch are at:
>
>     ftp://candle.pha.pa.us/pub/postgresql/mypatches
>

OK, I'll take a look, but I won't have time for a couple of days (I'm
not at home -- visiting my dad for his 80th birthday -- and have no
broadband access).

Joe

Re: [HACKERS] Patching dblink.c to avoid warning about open

From
Bruce Momjian
Date:
Joe Conway wrote:
> Bruce Momjian wrote:
> > There was also a problem in that if two cursors were opened, the first
> > close would close the transaction.  I have fixed that code by changing
> > the xact variable in to a counter that keeps track of the number of
> > opened cursors and commits only when they are all closed.
> >
> > Both the dblink.c patch and the regression patch are at:
> >
> >     ftp://candle.pha.pa.us/pub/postgresql/mypatches
> >
>
> OK, I'll take a look, but I won't have time for a couple of days (I'm
> not at home -- visiting my dad for his 80th birthday -- and have no
> broadband access).

No problem -- thanks.  I have slimmed down the patch by applying the
cosmetic parts to CVS.  Use the URL above to get the newest versions of
the dblink.c and regression changes.

--
  Bruce Momjian                        |  http://candle.pha.pa.us
  pgman@candle.pha.pa.us               |  (610) 359-1001
  +  If your life is a hard drive,     |  13 Roberts Road
  +  Christ can be your backup.        |  Newtown Square, Pennsylvania 19073

Re: [HACKERS] Patching dblink.c to avoid warning about

From
Joe Conway
Date:
Bruce Momjian wrote:
> No problem -- thanks.  I have slimmed down the patch by applying the
> cosmetic parts to CVS.  Use the URL above to get the newest versions of
> the dblink.c and regression changes.
>

Here is my counter-proposal to Bruce's dblink patch. Any comments?

Is it too late to apply this for 8.1? I tend to agree with calling this
a bugfix.

Thanks,

Joe
Index: dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.c,v
retrieving revision 1.47
diff -c -r1.47 dblink.c
*** dblink.c    15 Oct 2005 02:49:04 -0000    1.47
--- dblink.c    16 Oct 2005 02:04:13 -0000
***************
*** 60,68 ****

  typedef struct remoteConn
  {
!     PGconn       *conn;            /* Hold the remote connection */
!     int            autoXactCursors;/* Indicates the number of open cursors,
!                                  * non-zero means we opened the xact ourselves */
  }    remoteConn;

  /*
--- 60,68 ----

  typedef struct remoteConn
  {
!     PGconn       *conn;                /* Hold the remote connection */
!     int            openCursorCount;    /* The number of open cursors */
!     bool        newXactForCursor;    /* Opened a transaction for a cursor */
  }    remoteConn;

  /*
***************
*** 84,93 ****
  static char *generate_relation_name(Oid relid);

  /* Global */
! List       *res_id = NIL;
! int            res_id_index = 0;
! PGconn       *persistent_conn = NULL;
! static HTAB *remoteConnHash = NULL;

  /*
   *    Following is list that holds multiple remote connections.
--- 84,91 ----
  static char *generate_relation_name(Oid relid);

  /* Global */
! static remoteConn       *pconn = NULL;
! static HTAB               *remoteConnHash = NULL;

  /*
   *    Following is list that holds multiple remote connections.
***************
*** 184,189 ****
--- 182,197 ----
              } \
      } while (0)

+ #define DBLINK_INIT \
+     do { \
+             if (!pconn) \
+             { \
+                 pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
+                 pconn->conn = NULL; \
+                 pconn->openCursorCount = 0; \
+                 pconn->newXactForCursor = FALSE; \
+             } \
+     } while (0)

  /*
   * Create a persistent connection to another database
***************
*** 199,204 ****
--- 207,214 ----
      PGconn       *conn = NULL;
      remoteConn *rconn = NULL;

+     DBLINK_INIT;
+
      if (PG_NARGS() == 2)
      {
          connstr = GET_STR(PG_GETARG_TEXT_P(1));
***************
*** 234,240 ****
          createNewConnection(connname, rconn);
      }
      else
!         persistent_conn = conn;

      PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
--- 244,250 ----
          createNewConnection(connname, rconn);
      }
      else
!         pconn->conn = conn;

      PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
***************
*** 250,255 ****
--- 260,267 ----
      remoteConn *rconn = NULL;
      PGconn       *conn = NULL;

+     DBLINK_INIT;
+
      if (PG_NARGS() == 1)
      {
          conname = GET_STR(PG_GETARG_TEXT_P(0));
***************
*** 258,264 ****
              conn = rconn->conn;
      }
      else
!         conn = persistent_conn;

      if (!conn)
          DBLINK_CONN_NOT_AVAIL;
--- 270,276 ----
              conn = rconn->conn;
      }
      else
!         conn = pconn->conn;

      if (!conn)
          DBLINK_CONN_NOT_AVAIL;
***************
*** 270,276 ****
          pfree(rconn);
      }
      else
!         persistent_conn = NULL;

      PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
--- 282,288 ----
          pfree(rconn);
      }
      else
!         pconn->conn = NULL;

      PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
***************
*** 285,290 ****
--- 297,304 ----
      char       *msg;
      PGresult   *res = NULL;
      PGconn       *conn = NULL;
+     int           *openCursorCount = NULL;
+     bool       *newXactForCursor = NULL;
      char       *curname = NULL;
      char       *sql = NULL;
      char       *conname = NULL;
***************
*** 292,303 ****
      remoteConn *rconn = NULL;
      bool        fail = true;    /* default to backward compatible behavior */

      if (PG_NARGS() == 2)
      {
          /* text,text */
          curname = GET_STR(PG_GETARG_TEXT_P(0));
          sql = GET_STR(PG_GETARG_TEXT_P(1));
!         conn = persistent_conn;
      }
      else if (PG_NARGS() == 3)
      {
--- 306,321 ----
      remoteConn *rconn = NULL;
      bool        fail = true;    /* default to backward compatible behavior */

+     DBLINK_INIT;
+
      if (PG_NARGS() == 2)
      {
          /* text,text */
          curname = GET_STR(PG_GETARG_TEXT_P(0));
          sql = GET_STR(PG_GETARG_TEXT_P(1));
!         conn = pconn->conn;
!         openCursorCount = &pconn->openCursorCount;
!         newXactForCursor = &pconn->newXactForCursor;
      }
      else if (PG_NARGS() == 3)
      {
***************
*** 307,313 ****
              curname = GET_STR(PG_GETARG_TEXT_P(0));
              sql = GET_STR(PG_GETARG_TEXT_P(1));
              fail = PG_GETARG_BOOL(2);
!             conn = persistent_conn;
          }
          else
          {
--- 325,333 ----
              curname = GET_STR(PG_GETARG_TEXT_P(0));
              sql = GET_STR(PG_GETARG_TEXT_P(1));
              fail = PG_GETARG_BOOL(2);
!             conn = pconn->conn;
!             openCursorCount = &pconn->openCursorCount;
!             newXactForCursor = &pconn->newXactForCursor;
          }
          else
          {
***************
*** 316,322 ****
--- 336,346 ----
              sql = GET_STR(PG_GETARG_TEXT_P(2));
              rconn = getConnectionByName(conname);
              if (rconn)
+             {
                  conn = rconn->conn;
+                 openCursorCount = &rconn->openCursorCount;
+                 newXactForCursor = &rconn->newXactForCursor;
+             }
          }
      }
      else if (PG_NARGS() == 4)
***************
*** 328,344 ****
          fail = PG_GETARG_BOOL(3);
          rconn = getConnectionByName(conname);
          if (rconn)
              conn = rconn->conn;
      }

      if (!conn)
          DBLINK_CONN_NOT_AVAIL;

!     res = PQexec(conn, "BEGIN");
!     if (PQresultStatus(res) != PGRES_COMMAND_OK)
!         DBLINK_RES_INTERNALERROR("begin error");

!     PQclear(res);

      appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
      res = PQexec(conn, str->data);
--- 352,380 ----
          fail = PG_GETARG_BOOL(3);
          rconn = getConnectionByName(conname);
          if (rconn)
+         {
              conn = rconn->conn;
+             openCursorCount = &rconn->openCursorCount;
+             newXactForCursor = &rconn->newXactForCursor;
+         }
      }

      if (!conn)
          DBLINK_CONN_NOT_AVAIL;

!     /*    If we are not in a transaction, start one */
!     if (PQtransactionStatus(conn) == PQTRANS_IDLE)
!     {
!         res = PQexec(conn, "BEGIN");
!         if (PQresultStatus(res) != PGRES_COMMAND_OK)
!             DBLINK_RES_INTERNALERROR("begin error");
!         PQclear(res);
!         *newXactForCursor = TRUE;
!     }

!     /* if we started a transaction, increment cursor count */
!     if (*newXactForCursor)
!         (*openCursorCount)++;

      appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
      res = PQexec(conn, str->data);
***************
*** 365,370 ****
--- 401,408 ----
  dblink_close(PG_FUNCTION_ARGS)
  {
      PGconn       *conn = NULL;
+     int           *openCursorCount = NULL;
+     bool       *newXactForCursor = NULL;
      PGresult   *res = NULL;
      char       *curname = NULL;
      char       *conname = NULL;
***************
*** 373,383 ****
      remoteConn *rconn = NULL;
      bool        fail = true;    /* default to backward compatible behavior */

      if (PG_NARGS() == 1)
      {
          /* text */
          curname = GET_STR(PG_GETARG_TEXT_P(0));
!         conn = persistent_conn;
      }
      else if (PG_NARGS() == 2)
      {
--- 411,425 ----
      remoteConn *rconn = NULL;
      bool        fail = true;    /* default to backward compatible behavior */

+     DBLINK_INIT;
+
      if (PG_NARGS() == 1)
      {
          /* text */
          curname = GET_STR(PG_GETARG_TEXT_P(0));
!         conn = pconn->conn;
!         openCursorCount = &pconn->openCursorCount;
!         newXactForCursor = &pconn->newXactForCursor;
      }
      else if (PG_NARGS() == 2)
      {
***************
*** 386,392 ****
          {
              curname = GET_STR(PG_GETARG_TEXT_P(0));
              fail = PG_GETARG_BOOL(1);
!             conn = persistent_conn;
          }
          else
          {
--- 428,436 ----
          {
              curname = GET_STR(PG_GETARG_TEXT_P(0));
              fail = PG_GETARG_BOOL(1);
!             conn = pconn->conn;
!             openCursorCount = &pconn->openCursorCount;
!             newXactForCursor = &pconn->newXactForCursor;
          }
          else
          {
***************
*** 394,400 ****
--- 438,448 ----
              curname = GET_STR(PG_GETARG_TEXT_P(1));
              rconn = getConnectionByName(conname);
              if (rconn)
+             {
                  conn = rconn->conn;
+                 openCursorCount = &rconn->openCursorCount;
+                 newXactForCursor = &rconn->newXactForCursor;
+             }
          }
      }
      if (PG_NARGS() == 3)
***************
*** 405,411 ****
--- 453,463 ----
          fail = PG_GETARG_BOOL(2);
          rconn = getConnectionByName(conname);
          if (rconn)
+         {
              conn = rconn->conn;
+             openCursorCount = &rconn->openCursorCount;
+             newXactForCursor = &rconn->newXactForCursor;
+         }
      }

      if (!conn)
***************
*** 428,439 ****

      PQclear(res);

!     /* commit the transaction */
!     res = PQexec(conn, "COMMIT");
!     if (PQresultStatus(res) != PGRES_COMMAND_OK)
!         DBLINK_RES_INTERNALERROR("commit error");

!     PQclear(res);

      PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
--- 480,501 ----

      PQclear(res);

!     /* if we started a transaction, decrement cursor count */
!     if (*newXactForCursor)
!     {
!         (*openCursorCount)--;

!         /* if count is zero, commit the transaction */
!         if (*openCursorCount == 0)
!         {
!             *newXactForCursor = FALSE;
!
!             res = PQexec(conn, "COMMIT");
!             if (PQresultStatus(res) != PGRES_COMMAND_OK)
!                 DBLINK_RES_INTERNALERROR("commit error");
!             PQclear(res);
!         }
!     }

      PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
***************
*** 456,461 ****
--- 518,525 ----
      char       *conname = NULL;
      remoteConn *rconn = NULL;

+     DBLINK_INIT;
+
      /* stuff done only on the first call of the function */
      if (SRF_IS_FIRSTCALL())
      {
***************
*** 485,491 ****
                  curname = GET_STR(PG_GETARG_TEXT_P(0));
                  howmany = PG_GETARG_INT32(1);
                  fail = PG_GETARG_BOOL(2);
!                 conn = persistent_conn;
              }
              else
              {
--- 549,555 ----
                  curname = GET_STR(PG_GETARG_TEXT_P(0));
                  howmany = PG_GETARG_INT32(1);
                  fail = PG_GETARG_BOOL(2);
!                 conn = pconn->conn;
              }
              else
              {
***************
*** 503,509 ****
              /* text,int */
              curname = GET_STR(PG_GETARG_TEXT_P(0));
              howmany = PG_GETARG_INT32(1);
!             conn = persistent_conn;
          }

          if (!conn)
--- 567,573 ----
              /* text,int */
              curname = GET_STR(PG_GETARG_TEXT_P(0));
              howmany = PG_GETARG_INT32(1);
!             conn = pconn->conn;
          }

          if (!conn)
***************
*** 648,653 ****
--- 712,719 ----
      MemoryContext oldcontext;
      bool        freeconn = false;

+     DBLINK_INIT;
+
      /* stuff done only on the first call of the function */
      if (SRF_IS_FIRSTCALL())
      {
***************
*** 678,684 ****
              /* text,text or text,bool */
              if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
              {
!                 conn = persistent_conn;
                  sql = GET_STR(PG_GETARG_TEXT_P(0));
                  fail = PG_GETARG_BOOL(1);
              }
--- 744,750 ----
              /* text,text or text,bool */
              if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
              {
!                 conn = pconn->conn;
                  sql = GET_STR(PG_GETARG_TEXT_P(0));
                  fail = PG_GETARG_BOOL(1);
              }
***************
*** 691,697 ****
          else if (PG_NARGS() == 1)
          {
              /* text */
!             conn = persistent_conn;
              sql = GET_STR(PG_GETARG_TEXT_P(0));
          }
          else
--- 757,763 ----
          else if (PG_NARGS() == 1)
          {
              /* text */
!             conn = pconn->conn;
              sql = GET_STR(PG_GETARG_TEXT_P(0));
          }
          else
***************
*** 857,862 ****
--- 923,930 ----
      bool        freeconn = false;
      bool        fail = true;    /* default to backward compatible behavior */

+     DBLINK_INIT;
+
      if (PG_NARGS() == 3)
      {
          /* must be text,text,bool */
***************
*** 869,875 ****
          /* might be text,text or text,bool */
          if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
          {
!             conn = persistent_conn;
              sql = GET_STR(PG_GETARG_TEXT_P(0));
              fail = PG_GETARG_BOOL(1);
          }
--- 937,943 ----
          /* might be text,text or text,bool */
          if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
          {
!             conn = pconn->conn;
              sql = GET_STR(PG_GETARG_TEXT_P(0));
              fail = PG_GETARG_BOOL(1);
          }
***************
*** 882,888 ****
      else if (PG_NARGS() == 1)
      {
          /* must be single text argument */
!         conn = persistent_conn;
          sql = GET_STR(PG_GETARG_TEXT_P(0));
      }
      else
--- 950,956 ----
      else if (PG_NARGS() == 1)
      {
          /* must be single text argument */
!         conn = pconn->conn;
          sql = GET_STR(PG_GETARG_TEXT_P(0));
      }
      else
Index: expected/dblink.out
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/expected/dblink.out,v
retrieving revision 1.15
diff -c -r1.15 dblink.out
*** expected/dblink.out    8 Oct 2005 16:10:38 -0000    1.15
--- expected/dblink.out    16 Oct 2005 02:05:28 -0000
***************
*** 436,441 ****
--- 436,523 ----
   ROLLBACK
  (1 row)

+ -- test opening cursor in a transaction
+ SELECT dblink_exec('myconn','BEGIN');
+  dblink_exec
+ -------------
+  BEGIN
+ (1 row)
+
+ -- an open transaction will prevent dblink_open() from opening its own
+ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+  dblink_open
+ -------------
+  OK
+ (1 row)
+
+ -- this should not commit the transaction because the client opened it
+ SELECT dblink_close('myconn','rmt_foo_cursor');
+  dblink_close
+ --------------
+  OK
+ (1 row)
+
+ -- this should succeed because we have an open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+   dblink_exec
+ ----------------
+  DECLARE CURSOR
+ (1 row)
+
+ -- commit remote transaction
+ SELECT dblink_exec('myconn','COMMIT');
+  dblink_exec
+ -------------
+  COMMIT
+ (1 row)
+
+ -- test automatic transactions for multiple cursor opens
+ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+  dblink_open
+ -------------
+  OK
+ (1 row)
+
+ -- the second cursor
+ SELECT dblink_open('myconn','rmt_foo_cursor2','SELECT * FROM foo');
+  dblink_open
+ -------------
+  OK
+ (1 row)
+
+ -- this should not commit the transaction
+ SELECT dblink_close('myconn','rmt_foo_cursor2');
+  dblink_close
+ --------------
+  OK
+ (1 row)
+
+ -- this should succeed because we have an open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+   dblink_exec
+ ----------------
+  DECLARE CURSOR
+ (1 row)
+
+ -- this should commit the transaction
+ SELECT dblink_close('myconn','rmt_foo_cursor');
+  dblink_close
+ --------------
+  OK
+ (1 row)
+
+ -- this should fail because there is no open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+ ERROR:  sql error
+ DETAIL:  ERROR:  cursor "xact_test" already exists
+
+ -- reset remote transaction state
+ SELECT dblink_exec('myconn','ABORT');
+  dblink_exec
+ -------------
+  ROLLBACK
+ (1 row)
+
  -- open a cursor
  SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
   dblink_open
Index: sql/dblink.sql
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/sql/dblink.sql,v
retrieving revision 1.14
diff -c -r1.14 dblink.sql
*** sql/dblink.sql    8 Oct 2005 16:10:38 -0000    1.14
--- sql/dblink.sql    16 Oct 2005 01:59:11 -0000
***************
*** 217,222 ****
--- 217,258 ----
  -- reset remote transaction state
  SELECT dblink_exec('myconn','ABORT');

+ -- test opening cursor in a transaction
+ SELECT dblink_exec('myconn','BEGIN');
+
+ -- an open transaction will prevent dblink_open() from opening its own
+ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+
+ -- this should not commit the transaction because the client opened it
+ SELECT dblink_close('myconn','rmt_foo_cursor');
+
+ -- this should succeed because we have an open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+
+ -- commit remote transaction
+ SELECT dblink_exec('myconn','COMMIT');
+
+ -- test automatic transactions for multiple cursor opens
+ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+
+ -- the second cursor
+ SELECT dblink_open('myconn','rmt_foo_cursor2','SELECT * FROM foo');
+
+ -- this should not commit the transaction
+ SELECT dblink_close('myconn','rmt_foo_cursor2');
+
+ -- this should succeed because we have an open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+
+ -- this should commit the transaction
+ SELECT dblink_close('myconn','rmt_foo_cursor');
+
+ -- this should fail because there is no open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+
+ -- reset remote transaction state
+ SELECT dblink_exec('myconn','ABORT');
+
  -- open a cursor
  SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');


Re: [HACKERS] Patching dblink.c to avoid warning about open transaction

From
Tom Lane
Date:
Joe Conway <mail@joeconway.com> writes:
> Here is my counter-proposal to Bruce's dblink patch. Any comments?

Minor coding suggestion: to me it seems messy to do

> +     int       *openCursorCount = NULL;
> +     bool       *newXactForCursor = NULL;

> !         openCursorCount = &pconn->openCursorCount;
> !         newXactForCursor = &pconn->newXactForCursor;

> !         *newXactForCursor = TRUE;

This looks a bit cluttered already, and would get more so if you need to
add more fields to a remoteConn.  Plus it confuses the reader (at least
this reader) who is left wondering if you intend that those variables
might sometimes point to something other than two fields of the same
remoteConn.  I think it would be shorter and clearer to write

    remoteConn  *remconn = NULL;
    ...
    remconn = rconn;
    ...
    remconn->newXactForCursor = TRUE;

Also, you might be able to combine this variable with the existing
rconn local variable and thus simplify the code even more.

> Is it too late to apply this for 8.1? I tend to agree with calling this
> a bugfix.

I think it's reasonable to fix now, yes.

            regards, tom lane

Re: [HACKERS] Patching dblink.c to avoid warning about

From
Joe Conway
Date:
Tom Lane wrote:
> I think it would be shorter and clearer to write
>
>     remoteConn  *remconn = NULL;
>     ...
>     remconn = rconn;
>     ...
>     remconn->newXactForCursor = TRUE;
>
> Also, you might be able to combine this variable with the existing
> rconn local variable and thus simplify the code even more.

Thanks for the review Tom -- as usual, great suggestions. The attached
(simpler) patch makes use of your advice. If there are no objections,
I'll apply this tomorrow evening.

Joe
Index: dblink.c
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/dblink.c,v
retrieving revision 1.47
diff -c -r1.47 dblink.c
*** dblink.c    15 Oct 2005 02:49:04 -0000    1.47
--- dblink.c    17 Oct 2005 02:11:59 -0000
***************
*** 60,68 ****

  typedef struct remoteConn
  {
!     PGconn       *conn;            /* Hold the remote connection */
!     int            autoXactCursors;/* Indicates the number of open cursors,
!                                  * non-zero means we opened the xact ourselves */
  }    remoteConn;

  /*
--- 60,68 ----

  typedef struct remoteConn
  {
!     PGconn       *conn;                /* Hold the remote connection */
!     int            openCursorCount;    /* The number of open cursors */
!     bool        newXactForCursor;    /* Opened a transaction for a cursor */
  }    remoteConn;

  /*
***************
*** 84,93 ****
  static char *generate_relation_name(Oid relid);

  /* Global */
! List       *res_id = NIL;
! int            res_id_index = 0;
! PGconn       *persistent_conn = NULL;
! static HTAB *remoteConnHash = NULL;

  /*
   *    Following is list that holds multiple remote connections.
--- 84,91 ----
  static char *generate_relation_name(Oid relid);

  /* Global */
! static remoteConn       *pconn = NULL;
! static HTAB               *remoteConnHash = NULL;

  /*
   *    Following is list that holds multiple remote connections.
***************
*** 184,189 ****
--- 182,197 ----
              } \
      } while (0)

+ #define DBLINK_INIT \
+     do { \
+             if (!pconn) \
+             { \
+                 pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \
+                 pconn->conn = NULL; \
+                 pconn->openCursorCount = 0; \
+                 pconn->newXactForCursor = FALSE; \
+             } \
+     } while (0)

  /*
   * Create a persistent connection to another database
***************
*** 199,204 ****
--- 207,214 ----
      PGconn       *conn = NULL;
      remoteConn *rconn = NULL;

+     DBLINK_INIT;
+
      if (PG_NARGS() == 2)
      {
          connstr = GET_STR(PG_GETARG_TEXT_P(1));
***************
*** 234,240 ****
          createNewConnection(connname, rconn);
      }
      else
!         persistent_conn = conn;

      PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
--- 244,250 ----
          createNewConnection(connname, rconn);
      }
      else
!         pconn->conn = conn;

      PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
***************
*** 250,255 ****
--- 260,267 ----
      remoteConn *rconn = NULL;
      PGconn       *conn = NULL;

+     DBLINK_INIT;
+
      if (PG_NARGS() == 1)
      {
          conname = GET_STR(PG_GETARG_TEXT_P(0));
***************
*** 258,264 ****
              conn = rconn->conn;
      }
      else
!         conn = persistent_conn;

      if (!conn)
          DBLINK_CONN_NOT_AVAIL;
--- 270,276 ----
              conn = rconn->conn;
      }
      else
!         conn = pconn->conn;

      if (!conn)
          DBLINK_CONN_NOT_AVAIL;
***************
*** 270,276 ****
          pfree(rconn);
      }
      else
!         persistent_conn = NULL;

      PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
--- 282,288 ----
          pfree(rconn);
      }
      else
!         pconn->conn = NULL;

      PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
***************
*** 292,303 ****
      remoteConn *rconn = NULL;
      bool        fail = true;    /* default to backward compatible behavior */

      if (PG_NARGS() == 2)
      {
          /* text,text */
          curname = GET_STR(PG_GETARG_TEXT_P(0));
          sql = GET_STR(PG_GETARG_TEXT_P(1));
!         conn = persistent_conn;
      }
      else if (PG_NARGS() == 3)
      {
--- 304,317 ----
      remoteConn *rconn = NULL;
      bool        fail = true;    /* default to backward compatible behavior */

+     DBLINK_INIT;
+
      if (PG_NARGS() == 2)
      {
          /* text,text */
          curname = GET_STR(PG_GETARG_TEXT_P(0));
          sql = GET_STR(PG_GETARG_TEXT_P(1));
!         rconn = pconn;
      }
      else if (PG_NARGS() == 3)
      {
***************
*** 307,313 ****
              curname = GET_STR(PG_GETARG_TEXT_P(0));
              sql = GET_STR(PG_GETARG_TEXT_P(1));
              fail = PG_GETARG_BOOL(2);
!             conn = persistent_conn;
          }
          else
          {
--- 321,327 ----
              curname = GET_STR(PG_GETARG_TEXT_P(0));
              sql = GET_STR(PG_GETARG_TEXT_P(1));
              fail = PG_GETARG_BOOL(2);
!             rconn = pconn;
          }
          else
          {
***************
*** 315,322 ****
              curname = GET_STR(PG_GETARG_TEXT_P(1));
              sql = GET_STR(PG_GETARG_TEXT_P(2));
              rconn = getConnectionByName(conname);
-             if (rconn)
-                 conn = rconn->conn;
          }
      }
      else if (PG_NARGS() == 4)
--- 329,334 ----
***************
*** 327,344 ****
          sql = GET_STR(PG_GETARG_TEXT_P(2));
          fail = PG_GETARG_BOOL(3);
          rconn = getConnectionByName(conname);
-         if (rconn)
-             conn = rconn->conn;
      }

!     if (!conn)
          DBLINK_CONN_NOT_AVAIL;

!     res = PQexec(conn, "BEGIN");
!     if (PQresultStatus(res) != PGRES_COMMAND_OK)
!         DBLINK_RES_INTERNALERROR("begin error");

!     PQclear(res);

      appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
      res = PQexec(conn, str->data);
--- 339,364 ----
          sql = GET_STR(PG_GETARG_TEXT_P(2));
          fail = PG_GETARG_BOOL(3);
          rconn = getConnectionByName(conname);
      }

!     if (!rconn || !rconn->conn)
          DBLINK_CONN_NOT_AVAIL;
+     else
+         conn = rconn->conn;

!     /*    If we are not in a transaction, start one */
!     if (PQtransactionStatus(conn) == PQTRANS_IDLE)
!     {
!         res = PQexec(conn, "BEGIN");
!         if (PQresultStatus(res) != PGRES_COMMAND_OK)
!             DBLINK_RES_INTERNALERROR("begin error");
!         PQclear(res);
!         rconn->newXactForCursor = TRUE;
!     }

!     /* if we started a transaction, increment cursor count */
!     if (rconn->newXactForCursor)
!         (rconn->openCursorCount)++;

      appendStringInfo(str, "DECLARE %s CURSOR FOR %s", curname, sql);
      res = PQexec(conn, str->data);
***************
*** 373,383 ****
      remoteConn *rconn = NULL;
      bool        fail = true;    /* default to backward compatible behavior */

      if (PG_NARGS() == 1)
      {
          /* text */
          curname = GET_STR(PG_GETARG_TEXT_P(0));
!         conn = persistent_conn;
      }
      else if (PG_NARGS() == 2)
      {
--- 393,405 ----
      remoteConn *rconn = NULL;
      bool        fail = true;    /* default to backward compatible behavior */

+     DBLINK_INIT;
+
      if (PG_NARGS() == 1)
      {
          /* text */
          curname = GET_STR(PG_GETARG_TEXT_P(0));
!         rconn = pconn;
      }
      else if (PG_NARGS() == 2)
      {
***************
*** 386,400 ****
          {
              curname = GET_STR(PG_GETARG_TEXT_P(0));
              fail = PG_GETARG_BOOL(1);
!             conn = persistent_conn;
          }
          else
          {
              conname = GET_STR(PG_GETARG_TEXT_P(0));
              curname = GET_STR(PG_GETARG_TEXT_P(1));
              rconn = getConnectionByName(conname);
-             if (rconn)
-                 conn = rconn->conn;
          }
      }
      if (PG_NARGS() == 3)
--- 408,420 ----
          {
              curname = GET_STR(PG_GETARG_TEXT_P(0));
              fail = PG_GETARG_BOOL(1);
!             rconn = pconn;
          }
          else
          {
              conname = GET_STR(PG_GETARG_TEXT_P(0));
              curname = GET_STR(PG_GETARG_TEXT_P(1));
              rconn = getConnectionByName(conname);
          }
      }
      if (PG_NARGS() == 3)
***************
*** 404,415 ****
          curname = GET_STR(PG_GETARG_TEXT_P(1));
          fail = PG_GETARG_BOOL(2);
          rconn = getConnectionByName(conname);
-         if (rconn)
-             conn = rconn->conn;
      }

!     if (!conn)
          DBLINK_CONN_NOT_AVAIL;

      appendStringInfo(str, "CLOSE %s", curname);

--- 424,435 ----
          curname = GET_STR(PG_GETARG_TEXT_P(1));
          fail = PG_GETARG_BOOL(2);
          rconn = getConnectionByName(conname);
      }

!     if (!rconn || !rconn->conn)
          DBLINK_CONN_NOT_AVAIL;
+     else
+         conn = rconn->conn;

      appendStringInfo(str, "CLOSE %s", curname);

***************
*** 428,439 ****

      PQclear(res);

!     /* commit the transaction */
!     res = PQexec(conn, "COMMIT");
!     if (PQresultStatus(res) != PGRES_COMMAND_OK)
!         DBLINK_RES_INTERNALERROR("commit error");

!     PQclear(res);

      PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
--- 448,469 ----

      PQclear(res);

!     /* if we started a transaction, decrement cursor count */
!     if (rconn->newXactForCursor)
!     {
!         (rconn->openCursorCount)--;

!         /* if count is zero, commit the transaction */
!         if (rconn->openCursorCount == 0)
!         {
!             rconn->newXactForCursor = FALSE;
!
!             res = PQexec(conn, "COMMIT");
!             if (PQresultStatus(res) != PGRES_COMMAND_OK)
!                 DBLINK_RES_INTERNALERROR("commit error");
!             PQclear(res);
!         }
!     }

      PG_RETURN_TEXT_P(GET_TEXT("OK"));
  }
***************
*** 456,461 ****
--- 486,493 ----
      char       *conname = NULL;
      remoteConn *rconn = NULL;

+     DBLINK_INIT;
+
      /* stuff done only on the first call of the function */
      if (SRF_IS_FIRSTCALL())
      {
***************
*** 485,491 ****
                  curname = GET_STR(PG_GETARG_TEXT_P(0));
                  howmany = PG_GETARG_INT32(1);
                  fail = PG_GETARG_BOOL(2);
!                 conn = persistent_conn;
              }
              else
              {
--- 517,523 ----
                  curname = GET_STR(PG_GETARG_TEXT_P(0));
                  howmany = PG_GETARG_INT32(1);
                  fail = PG_GETARG_BOOL(2);
!                 conn = pconn->conn;
              }
              else
              {
***************
*** 503,509 ****
              /* text,int */
              curname = GET_STR(PG_GETARG_TEXT_P(0));
              howmany = PG_GETARG_INT32(1);
!             conn = persistent_conn;
          }

          if (!conn)
--- 535,541 ----
              /* text,int */
              curname = GET_STR(PG_GETARG_TEXT_P(0));
              howmany = PG_GETARG_INT32(1);
!             conn = pconn->conn;
          }

          if (!conn)
***************
*** 648,653 ****
--- 680,687 ----
      MemoryContext oldcontext;
      bool        freeconn = false;

+     DBLINK_INIT;
+
      /* stuff done only on the first call of the function */
      if (SRF_IS_FIRSTCALL())
      {
***************
*** 678,684 ****
              /* text,text or text,bool */
              if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
              {
!                 conn = persistent_conn;
                  sql = GET_STR(PG_GETARG_TEXT_P(0));
                  fail = PG_GETARG_BOOL(1);
              }
--- 712,718 ----
              /* text,text or text,bool */
              if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
              {
!                 conn = pconn->conn;
                  sql = GET_STR(PG_GETARG_TEXT_P(0));
                  fail = PG_GETARG_BOOL(1);
              }
***************
*** 691,697 ****
          else if (PG_NARGS() == 1)
          {
              /* text */
!             conn = persistent_conn;
              sql = GET_STR(PG_GETARG_TEXT_P(0));
          }
          else
--- 725,731 ----
          else if (PG_NARGS() == 1)
          {
              /* text */
!             conn = pconn->conn;
              sql = GET_STR(PG_GETARG_TEXT_P(0));
          }
          else
***************
*** 857,862 ****
--- 891,898 ----
      bool        freeconn = false;
      bool        fail = true;    /* default to backward compatible behavior */

+     DBLINK_INIT;
+
      if (PG_NARGS() == 3)
      {
          /* must be text,text,bool */
***************
*** 869,875 ****
          /* might be text,text or text,bool */
          if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
          {
!             conn = persistent_conn;
              sql = GET_STR(PG_GETARG_TEXT_P(0));
              fail = PG_GETARG_BOOL(1);
          }
--- 905,911 ----
          /* might be text,text or text,bool */
          if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
          {
!             conn = pconn->conn;
              sql = GET_STR(PG_GETARG_TEXT_P(0));
              fail = PG_GETARG_BOOL(1);
          }
***************
*** 882,888 ****
      else if (PG_NARGS() == 1)
      {
          /* must be single text argument */
!         conn = persistent_conn;
          sql = GET_STR(PG_GETARG_TEXT_P(0));
      }
      else
--- 918,924 ----
      else if (PG_NARGS() == 1)
      {
          /* must be single text argument */
!         conn = pconn->conn;
          sql = GET_STR(PG_GETARG_TEXT_P(0));
      }
      else
Index: expected/dblink.out
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/expected/dblink.out,v
retrieving revision 1.15
diff -c -r1.15 dblink.out
*** expected/dblink.out    8 Oct 2005 16:10:38 -0000    1.15
--- expected/dblink.out    17 Oct 2005 02:04:09 -0000
***************
*** 436,441 ****
--- 436,523 ----
   ROLLBACK
  (1 row)

+ -- test opening cursor in a transaction
+ SELECT dblink_exec('myconn','BEGIN');
+  dblink_exec
+ -------------
+  BEGIN
+ (1 row)
+
+ -- an open transaction will prevent dblink_open() from opening its own
+ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+  dblink_open
+ -------------
+  OK
+ (1 row)
+
+ -- this should not commit the transaction because the client opened it
+ SELECT dblink_close('myconn','rmt_foo_cursor');
+  dblink_close
+ --------------
+  OK
+ (1 row)
+
+ -- this should succeed because we have an open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+   dblink_exec
+ ----------------
+  DECLARE CURSOR
+ (1 row)
+
+ -- commit remote transaction
+ SELECT dblink_exec('myconn','COMMIT');
+  dblink_exec
+ -------------
+  COMMIT
+ (1 row)
+
+ -- test automatic transactions for multiple cursor opens
+ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+  dblink_open
+ -------------
+  OK
+ (1 row)
+
+ -- the second cursor
+ SELECT dblink_open('myconn','rmt_foo_cursor2','SELECT * FROM foo');
+  dblink_open
+ -------------
+  OK
+ (1 row)
+
+ -- this should not commit the transaction
+ SELECT dblink_close('myconn','rmt_foo_cursor2');
+  dblink_close
+ --------------
+  OK
+ (1 row)
+
+ -- this should succeed because we have an open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+   dblink_exec
+ ----------------
+  DECLARE CURSOR
+ (1 row)
+
+ -- this should commit the transaction
+ SELECT dblink_close('myconn','rmt_foo_cursor');
+  dblink_close
+ --------------
+  OK
+ (1 row)
+
+ -- this should fail because there is no open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+ ERROR:  sql error
+ DETAIL:  ERROR:  cursor "xact_test" already exists
+
+ -- reset remote transaction state
+ SELECT dblink_exec('myconn','ABORT');
+  dblink_exec
+ -------------
+  ROLLBACK
+ (1 row)
+
  -- open a cursor
  SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
   dblink_open
Index: sql/dblink.sql
===================================================================
RCS file: /opt/src/cvs/pgsql/contrib/dblink/sql/dblink.sql,v
retrieving revision 1.14
diff -c -r1.14 dblink.sql
*** sql/dblink.sql    8 Oct 2005 16:10:38 -0000    1.14
--- sql/dblink.sql    17 Oct 2005 02:04:09 -0000
***************
*** 217,222 ****
--- 217,258 ----
  -- reset remote transaction state
  SELECT dblink_exec('myconn','ABORT');

+ -- test opening cursor in a transaction
+ SELECT dblink_exec('myconn','BEGIN');
+
+ -- an open transaction will prevent dblink_open() from opening its own
+ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+
+ -- this should not commit the transaction because the client opened it
+ SELECT dblink_close('myconn','rmt_foo_cursor');
+
+ -- this should succeed because we have an open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+
+ -- commit remote transaction
+ SELECT dblink_exec('myconn','COMMIT');
+
+ -- test automatic transactions for multiple cursor opens
+ SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');
+
+ -- the second cursor
+ SELECT dblink_open('myconn','rmt_foo_cursor2','SELECT * FROM foo');
+
+ -- this should not commit the transaction
+ SELECT dblink_close('myconn','rmt_foo_cursor2');
+
+ -- this should succeed because we have an open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+
+ -- this should commit the transaction
+ SELECT dblink_close('myconn','rmt_foo_cursor');
+
+ -- this should fail because there is no open transaction
+ SELECT dblink_exec('myconn','DECLARE xact_test CURSOR FOR SELECT * FROM foo');
+
+ -- reset remote transaction state
+ SELECT dblink_exec('myconn','ABORT');
+
  -- open a cursor
  SELECT dblink_open('myconn','rmt_foo_cursor','SELECT * FROM foo');


Re: [HACKERS] Patching dblink.c to avoid warning about

From
Bruce Momjian
Date:
Joe Conway wrote:
> Tom Lane wrote:
> > I think it would be shorter and clearer to write
> >
> >     remoteConn  *remconn = NULL;
> >     ...
> >     remconn = rconn;
> >     ...
> >     remconn->newXactForCursor = TRUE;
> >
> > Also, you might be able to combine this variable with the existing
> > rconn local variable and thus simplify the code even more.
>
> Thanks for the review Tom -- as usual, great suggestions. The attached
> (simpler) patch makes use of your advice. If there are no objections,
> I'll apply this tomorrow evening.

Looks good.  Thanks.

--
  Bruce Momjian                        |  http://candle.pha.pa.us
  pgman@candle.pha.pa.us               |  (610) 359-1001
  +  If your life is a hard drive,     |  13 Roberts Road
  +  Christ can be your backup.        |  Newtown Square, Pennsylvania 19073

Re: [HACKERS] Patching dblink.c to avoid warning about

From
Joe Conway
Date:
Bruce Momjian wrote:
> Joe Conway wrote:
>>Thanks for the review Tom -- as usual, great suggestions. The attached
>>(simpler) patch makes use of your advice. If there are no objections,
>>I'll apply this tomorrow evening.
>
> Looks good.  Thanks.
>

Committed.

Joe