Thread: contrib/dblink update

From: Joe Conway

Attached is an update to contrib/dblink. Please apply if there are no
objections.

Major changes:
   - removed cursor wrap around input sql to allow for remote
     execution of INSERT/UPDATE/DELETE
   - dblink now returns a resource id instead of a real pointer
   - added several utility functions
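
To illustrate the resource id change: a minimal sketch, in plain C with malloc and a hand-rolled linked list rather than the backend's List and palloc, of handing callers an integer id and resolving it back to a pointer on demand. The names res_entry, registry_add, registry_get and registry_remove are hypothetical; the patch itself uses append_res_ptr, get_res_ptr and remove_res_ptr over a global List, and may differ in detail.

```c
#include <stdlib.h>

/* Hypothetical stand-in for one registered result set. */
typedef struct res_entry
{
    int               res_id;   /* id handed back to SQL callers */
    void             *payload;  /* would point at the stored results */
    struct res_entry *next;
} res_entry;

static res_entry *registry = NULL;  /* list of live results */
static int        next_res_id = 0;  /* last id handed out */

/* Register a payload; returns its new id, or -1 if allocation fails. */
int
registry_add(void *payload)
{
    res_entry *e = malloc(sizeof(res_entry));

    if (e == NULL)
        return -1;
    e->res_id = ++next_res_id;
    e->payload = payload;
    e->next = registry;
    registry = e;
    return e->res_id;
}

/* Resolve an id; returns NULL for unknown or already removed ids. */
void *
registry_get(int res_id)
{
    res_entry *e;

    for (e = registry; e != NULL; e = e->next)
        if (e->res_id == res_id)
            return e->payload;
    return NULL;
}

/* Remove an id; returns 1 if it was present, 0 otherwise. */
int
registry_remove(int res_id)
{
    res_entry **link;

    for (link = &registry; *link != NULL; link = &(*link)->next)
    {
        if ((*link)->res_id == res_id)
        {
            res_entry *dead = *link;

            *link = dead->next;
            free(dead);
            return 1;
        }
    }
    return 0;
}
```

The point of the indirection is that a stale or made-up id resolves to NULL and can be reported as an error, whereas a stale raw pointer would simply be dereferenced.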

I'm still hoping to add explicit cursor open/fetch/close support before
7.3 is released, but I need a bit more time on that.

On a somewhat unrelated topic, I never got any feedback on the
unknownin/out patch and the mb_substring patch. Is there anything else I
need to do to get those applied?

Thanks,

Joe
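
The README changes in the patch below document dblink_strtok as returning the token at a 0-based position in a string split on a single-character delimiter. A rough standalone sketch of that behaviour follows, assuming plain C strings and malloc instead of text datums and palloc. token_at is a hypothetical name, not the patch's get_strtok, and the handling of empty tokens here is an assumption that may not match the real function.

```c
#include <stdlib.h>
#include <string.h>

/*
 * Return a malloc'd copy of the token at position posn (0 based) in
 * input, splitting on the single character delim.  Returns NULL if the
 * arguments are unusable or there is no token at that position.
 * The caller frees the result.
 */
char *
token_at(const char *input, char delim, int posn)
{
    const char *start = input;
    const char *end;
    size_t      len;
    char       *out;
    int         i;

    if (input == NULL || delim == '\0' || posn < 0)
        return NULL;

    /* step past posn delimiters to reach the start of the token */
    for (i = 0; i < posn; i++)
    {
        start = strchr(start, delim);
        if (start == NULL)
            return NULL;        /* fewer than posn + 1 tokens */
        start++;
    }

    /* the token runs to the next delimiter or the end of the string */
    end = strchr(start, delim);
    len = (end != NULL) ? (size_t) (end - start) : strlen(start);

    out = malloc(len + 1);
    if (out == NULL)
        return NULL;
    memcpy(out, start, len);
    out[len] = '\0';
    return out;
}
```

Nesting two calls mirrors the README example: token_at("f=1&g=3&h=4", '&', 1) yields "g=3", and token_at of that result with '=' and position 1 yields "3".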
diff -cNr dblink.orig/README.dblink dblink/README.dblink
*** dblink.orig/README.dblink    Thu Dec 13 02:48:39 2001
--- dblink/README.dblink    Sun Apr 14 20:02:06 2002
***************
*** 3,9 ****
   *
   * Functions returning results from a remote database
   *
!  * Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
   *
   * Permission to use, copy, modify, and distribute this software and its
   * documentation for any purpose, without fee, and without a written agreement
--- 3,10 ----
   *
   * Functions returning results from a remote database
   *
!  * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
!  * ALL RIGHTS RESERVED;
   *
   * Permission to use, copy, modify, and distribute this software and its
   * documentation for any purpose, without fee, and without a written agreement
***************
*** 25,36 ****
   */


! Version 0.3 (14 June, 2001):
!   Function to test returning data set from remote database
!   Tested under Linux (Red Hat 6.2 and 7.0) and PostgreSQL 7.1 and 7.2devel

  Release Notes:

    Version 0.3
      - fixed dblink invalid pointer causing corrupt elog message
      - fixed dblink_tok improper handling of null results
--- 26,44 ----
   */


! Version 0.4 (7 April, 2002):
!   Functions allowing remote database INSERT/UPDATE/DELETE/SELECT, and
!   various utility functions.
!   Tested under Linux (Red Hat 7.2) and PostgreSQL 7.2 and 7.3devel

  Release Notes:

+   Version 0.4
+     - removed cursor wrap around input sql to allow for remote
+       execution of INSERT/UPDATE/DELETE
+     - dblink now returns a resource id instead of a real pointer
+     - added several utility functions -- see below
+
    Version 0.3
      - fixed dblink invalid pointer causing corrupt elog message
      - fixed dblink_tok improper handling of null results
***************
*** 51,64 ****

    installs following functions into database template1:

!      dblink() - returns a pointer to results from remote query
!      dblink_tok() - extracts and returns individual field results

  Documentation
  ==================================================================
  Name

! dblink -- Returns a pointer to a data set from a remote database

  Synopsis

--- 59,94 ----

    installs following functions into database template1:

!      dblink(text,text) RETURNS setof int
!        - returns a resource id for results from remote query
!      dblink_tok(int,int) RETURNS text
!        - extracts and returns individual field results
!      dblink_strtok(text,text,int) RETURNS text
!        - extracts and returns individual token from delimited text
!      dblink_get_pkey(name) RETURNS setof text
!        - returns the field names of a relation's primary key fields
!      dblink_last_oid(int) RETURNS oid
!        - returns the last inserted oid
!      dblink_build_sql_insert(name,int2vector,int2,_text,_text) RETURNS text
!        - builds an insert statement using a local tuple, replacing the
!          selection key field values with alternate supplied values
!      dblink_build_sql_delete(name,int2vector,int2,_text) RETURNS text
!        - builds a delete statement using supplied values for selection
!          key field values
!      dblink_build_sql_update(name,int2vector,int2,_text,_text) RETURNS text
!        - builds an update statement using a local tuple, replacing the
!          selection key field values with alternate supplied values
!      dblink_current_query() RETURNS text
!        - returns the current query string
!      dblink_replace(text,text,text) RETURNS text
!        - replace all occurrences of substring-a in the input-string
!          with substring-b

  Documentation
  ==================================================================
  Name

! dblink -- Returns a resource id for a data set from a remote database

  Synopsis

***************
*** 78,84 ****

  Outputs

!   Returns setof int (pointer)

  Example usage

--- 108,114 ----

  Outputs

!   Returns setof int (res_id)

  Example usage

***************
*** 94,106 ****

  Synopsis

! dblink_tok(int pointer, int fnumber)

  Inputs

!   pointer

!     a pointer returned by a call to dblink()

    fnumber

--- 124,136 ----

  Synopsis

! dblink_tok(int res_id, int fnumber)

  Inputs

!   res_id

!     a resource id returned by a call to dblink()

    fnumber

***************
*** 131,136 ****
--- 161,415 ----
     select f1, f2 from myremotetable where f1 like 'bytea%';

  ==================================================================
+ Name
+
+ dblink_strtok -- Extracts and returns individual token from delimited text
+
+ Synopsis
+
+ dblink_strtok(text inputstring, text delimiter, int posn) RETURNS text
+
+ Inputs
+
+   inputstring
+
+     any string you want to parse a token out of;
+     e.g. 'f=1&g=3&h=4'
+
+   delimiter
+
+     a single character to use as the delimiter;
+     e.g. '&' or '='
+
+   posn
+
+     the position of the token of interest, 0 based;
+     e.g. 1
+
+ Outputs
+
+   Returns text
+
+ Example usage
+
+ test=# select dblink_strtok(dblink_strtok('f=1&g=3&h=4','&',1),'=',1);
+  dblink_strtok
+ ---------------
+  3
+ (1 row)
+
+ ==================================================================
+ Name
+
+ dblink_get_pkey -- returns the field names of a relation's primary
+                    key fields
+
+ Synopsis
+
+ dblink_get_pkey(name relname) RETURNS setof text
+
+ Inputs
+
+   relname
+
+     any relation name;
+     e.g. 'foobar'
+
+ Outputs
+
+   Returns setof text -- one row for each primary key field, in order of
+                         precedence
+
+ Example usage
+
+ test=# select dblink_get_pkey('foobar');
+  dblink_get_pkey
+ -----------------
+  f1
+  f2
+  f3
+  f4
+  f5
+ (5 rows)
+
+
+ ==================================================================
+ Name
+
+ dblink_last_oid -- Returns last inserted oid
+
+ Synopsis
+
+ dblink_last_oid(int res_id) RETURNS oid
+
+ Inputs
+
+   res_id
+
+     any resource id returned by a call to dblink()
+
+ Outputs
+
+   Returns oid of last inserted tuple
+
+ Example usage
+
+ test=# select dblink_last_oid(dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd'
+                ,'insert into mytable (f1, f2) values (1,2)'));
+
+  dblink_last_oid
+ ----------------
+  16553
+ (1 row)
+
+
+ ==================================================================
+ Name
+
+ dblink_build_sql_insert -- builds an insert statement using a local
+                            tuple, replacing the selection key field
+                            values with alternate supplied values
+ dblink_build_sql_delete -- builds a delete statement using supplied
+                            values for selection key field values
+ dblink_build_sql_update -- builds an update statement using a local
+                            tuple, replacing the selection key field
+                            values with alternate supplied values
+
+
+ Synopsis
+
+ dblink_build_sql_insert(name relname
+                          ,int2vector primary_key_attnums
+                          ,int2 num_primary_key_atts
+                          ,_text src_pk_att_vals_array
+                          ,_text tgt_pk_att_vals_array) RETURNS text
+ dblink_build_sql_delete(name relname
+                          ,int2vector primary_key_attnums
+                          ,int2 num_primary_key_atts
+                          ,_text tgt_pk_att_vals_array) RETURNS text
+ dblink_build_sql_update(name relname
+                          ,int2vector primary_key_attnums
+                          ,int2 num_primary_key_atts
+                          ,_text src_pk_att_vals_array
+                          ,_text tgt_pk_att_vals_array) RETURNS text
+
+ Inputs
+
+   relname
+
+     any relation name;
+     e.g. 'foobar'
+
+   primary_key_attnums
+
+     vector of primary key attnums (1 based, see pg_index.indkey);
+     e.g. '1 2'
+
+   num_primary_key_atts
+
+     number of primary key attnums in the vector; e.g. 2
+
+   src_pk_att_vals_array
+
+     array of primary key values, used to look up the local matching
+     tuple, the values of which are then used to construct the SQL
+     statement
+
+   tgt_pk_att_vals_array
+
+     array of primary key values, used to replace the local tuple
+     values in the SQL statement
+
+ Outputs
+
+   Returns text -- requested SQL statement
+
+ Example usage
+
+ test=# select dblink_build_sql_insert('foo','1 2',2,'{"1", "a"}','{"1", "b''a"}');
+              dblink_build_sql_insert
+ --------------------------------------------------
+  INSERT INTO foo(f1,f2,f3) VALUES('1','b''a','1')
+ (1 row)
+
+ test=# select dblink_build_sql_delete('MyFoo','1 2',2,'{"1", "b"}');
+            dblink_build_sql_delete
+ ---------------------------------------------
+  DELETE FROM "MyFoo" WHERE f1='1' AND f2='b'
+ (1 row)
+
+ test=# select dblink_build_sql_update('foo','1 2',2,'{"1", "a"}','{"1", "b"}');
+                    dblink_build_sql_update
+ -------------------------------------------------------------
+  UPDATE foo SET f1='1',f2='b',f3='1' WHERE f1='1' AND f2='b'
+ (1 row)
+
+
+ ==================================================================
+ Name
+
+ dblink_current_query -- returns the current query string
+
+ Synopsis
+
+ dblink_current_query () RETURNS text
+
+ Inputs
+
+   None
+
+ Outputs
+
+   Returns text -- a copy of the currently executing query
+
+ Example usage
+
+ test=# select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname = ''byteacat''') as f1) as t1;
+                                                                 dblink_current_query
+ -----------------------------------------------------------------------------------------------------------------------------------------------------
+  select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname = ''byteacat''') as f1) as t1;
+ (1 row)
+
+
+ ==================================================================
+ Name
+
+ dblink_replace -- replace all occurrences of substring-a in the
+                   input-string with substring-b
+
+ Synopsis
+
+ dblink_replace(text input-string, text substring-a, text substring-b) RETURNS text
+
+ Inputs
+
+   input-string
+
+     the starting string, before replacement of substring-a
+
+   substring-a
+
+     the substring to find and replace
+
+   substring-b
+
+     the substring to be substituted in place of substring-a
+
+ Outputs
+
+   Returns text -- a copy of the starting string, but with all occurrences of
+                   substring-a replaced with substring-b
+
+ Example usage
+
+ test=# select dblink_replace('12345678901234567890','56','hello');
+        dblink_replace
+ ----------------------------
+  1234hello78901234hello7890
+ (1 row)
+
+ ==================================================================
+

  -- Joe Conway

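
For readers of the README text above, dblink_replace is documented as replacing every occurrence of substring-a with substring-b. A minimal sketch of that operation in standalone C follows, assuming NUL-terminated strings, malloc in place of palloc, and non-overlapping left-to-right matching. replace_all is a hypothetical name and this is not the code from the patch.

```c
#include <stdlib.h>
#include <string.h>

/*
 * Return a malloc'd copy of input with every non-overlapping occurrence
 * of from replaced by to.  An empty from leaves the input unchanged.
 * Returns NULL on bad arguments or allocation failure.
 */
char *
replace_all(const char *input, const char *from, const char *to)
{
    size_t      from_len;
    size_t      to_len;
    size_t      count = 0;
    const char *p;
    const char *hit;
    char       *out;
    char       *dst;

    if (input == NULL || from == NULL || to == NULL)
        return NULL;

    from_len = strlen(from);
    to_len = strlen(to);

    /* first pass: count matches so the output can be sized exactly */
    if (from_len > 0)
        for (p = input; (hit = strstr(p, from)) != NULL; p = hit + from_len)
            count++;

    out = malloc(strlen(input) + count * to_len - count * from_len + 1);
    if (out == NULL)
        return NULL;

    /* second pass: copy the gaps and substitute each match */
    dst = out;
    p = input;
    while (count > 0 && (hit = strstr(p, from)) != NULL)
    {
        size_t gap = (size_t) (hit - p);

        memcpy(dst, p, gap);
        dst += gap;
        memcpy(dst, to, to_len);
        dst += to_len;
        p = hit + from_len;
        count--;
    }
    strcpy(dst, p);             /* remaining tail, including the NUL */
    return out;
}
```

With the README's inputs, replace_all("12345678901234567890", "56", "hello") produces "1234hello78901234hello7890". Counting first costs a second scan but avoids growing the buffer repeatedly.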
diff -cNr dblink.orig/dblink.c dblink/dblink.c
*** dblink.orig/dblink.c    Wed Oct 24 22:49:19 2001
--- dblink/dblink.c    Sun Apr 14 20:03:30 2002
***************
*** 3,9 ****
   *
   * Functions returning results from a remote database
   *
!  * Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
   *
   * Permission to use, copy, modify, and distribute this software and its
   * documentation for any purpose, without fee, and without a written agreement
--- 3,10 ----
   *
   * Functions returning results from a remote database
   *
!  * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
!  * ALL RIGHTS RESERVED;
   *
   * Permission to use, copy, modify, and distribute this software and its
   * documentation for any purpose, without fee, and without a written agreement
***************
*** 26,48 ****

  #include "dblink.h"

  PG_FUNCTION_INFO_V1(dblink);
  Datum
  dblink(PG_FUNCTION_ARGS)
  {
!     PGconn       *conn = NULL;
!     PGresult   *res = NULL;
!     dblink_results *results;
!     char       *optstr;
!     char       *sqlstatement;
!     char       *curstr = "DECLARE mycursor CURSOR FOR ";
!     char       *execstatement;
!     char       *msg;
!     int            ntuples = 0;
!     ReturnSetInfo *rsi;
!
!     if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
!         elog(ERROR, "dblink: NULL arguments are not permitted");

      if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
          elog(ERROR, "dblink: function called in context that does not accept a set result");
--- 27,49 ----

  #include "dblink.h"

+ /* Global */
+ List    *res_id = NIL;
+ int        res_id_index = 0;
+
  PG_FUNCTION_INFO_V1(dblink);
  Datum
  dblink(PG_FUNCTION_ARGS)
  {
!     PGconn            *conn = NULL;
!     PGresult        *res = NULL;
!     dblink_results    *results;
!     char            *optstr;
!     char            *sqlstatement;
!     char            *execstatement;
!     char            *msg;
!     int                ntuples = 0;
!     ReturnSetInfo    *rsi;

      if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
          elog(ERROR, "dblink: function called in context that does not accept a set result");
***************
*** 61,81 ****
              elog(ERROR, "dblink: connection error: %s", msg);
          }

!         res = PQexec(conn, "BEGIN");
!         if (PQresultStatus(res) != PGRES_COMMAND_OK)
!         {
!             msg = pstrdup(PQerrorMessage(conn));
!             PQclear(res);
!             PQfinish(conn);
!             elog(ERROR, "dblink: begin error: %s", msg);
!         }
!         PQclear(res);
!
!         execstatement = (char *) palloc(strlen(curstr) + strlen(sqlstatement) + 1);
          if (execstatement != NULL)
          {
!             strcpy(execstatement, curstr);
!             strcat(execstatement, sqlstatement);
              strcat(execstatement, "\0");
          }
          else
--- 62,71 ----
              elog(ERROR, "dblink: connection error: %s", msg);
          }

!         execstatement = (char *) palloc(strlen(sqlstatement) + 1);
          if (execstatement != NULL)
          {
!             strcpy(execstatement, sqlstatement);
              strcat(execstatement, "\0");
          }
          else
***************
*** 94,163 ****
              /*
               * got results, start fetching them
               */
-             PQclear(res);
-
-             res = PQexec(conn, "FETCH ALL in mycursor");
-             if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
-             {
-                 msg = pstrdup(PQerrorMessage(conn));
-                 PQclear(res);
-                 PQfinish(conn);
-                 elog(ERROR, "dblink: sql error: %s", msg);
-             }
-
              ntuples = PQntuples(res);

!             if (ntuples > 0)
!             {
!
!                 results = init_dblink_results(fcinfo->flinfo->fn_mcxt);
!                 results->tup_num = 0;
!                 results->res = res;
!                 res = NULL;
!
!                 fcinfo->flinfo->fn_extra = (void *) results;
!
!                 results = NULL;
!                 results = fcinfo->flinfo->fn_extra;
!
!                 /* close the cursor */
!                 res = PQexec(conn, "CLOSE mycursor");
!                 PQclear(res);
!
!                 /* commit the transaction */
!                 res = PQexec(conn, "COMMIT");
!                 PQclear(res);
!
!                 /* close the connection to the database and cleanup */
!                 PQfinish(conn);
!
!                 rsi = (ReturnSetInfo *) fcinfo->resultinfo;
!                 rsi->isDone = ExprMultipleResult;
!
!                 PG_RETURN_POINTER(results);
!
!             }
!             else
!             {

!                 PQclear(res);

!                 /* close the cursor */
!                 res = PQexec(conn, "CLOSE mycursor");
!                 PQclear(res);

!                 /* commit the transaction */
!                 res = PQexec(conn, "COMMIT");
!                 PQclear(res);

!                 /* close the connection to the database and cleanup */
!                 PQfinish(conn);

!                 rsi = (ReturnSetInfo *) fcinfo->resultinfo;
!                 rsi->isDone = ExprEndResult;

!                 PG_RETURN_NULL();
!             }
          }
      }
      else
--- 84,119 ----
              /*
               * got results, start fetching them
               */
              ntuples = PQntuples(res);

!             /*
!              * increment resource index
!              */
!             res_id_index++;

!             results = init_dblink_results(fcinfo->flinfo->fn_mcxt);
!             results->tup_num = 0;
!             results->res_id_index = res_id_index;
!             results->res = res;

!             /*
!              * Append node to res_id to hold pointer to results.
!              * Needed by dblink_tok to access the data
!              */
!             append_res_ptr(results);

!             /*
!              * save pointer to results for the next function manager call
!              */
!             fcinfo->flinfo->fn_extra = (void *) results;

!             /* close the connection to the database and cleanup */
!             PQfinish(conn);

!             rsi = (ReturnSetInfo *) fcinfo->resultinfo;
!             rsi->isDone = ExprMultipleResult;

!             PG_RETURN_INT32(res_id_index);
          }
      }
      else
***************
*** 165,173 ****
          /*
           * check for more results
           */
-
          results = fcinfo->flinfo->fn_extra;
          results->tup_num++;
          ntuples = PQntuples(results->res);

          if (results->tup_num < ntuples)
--- 121,130 ----
          /*
           * check for more results
           */
          results = fcinfo->flinfo->fn_extra;
+
          results->tup_num++;
+         res_id_index = results->res_id_index;
          ntuples = PQntuples(results->res);

          if (results->tup_num < ntuples)
***************
*** 179,196 ****
              rsi = (ReturnSetInfo *) fcinfo->resultinfo;
              rsi->isDone = ExprMultipleResult;

!             PG_RETURN_POINTER(results);
!
          }
          else
          {
              /*
               * or if no more, clean things up
               */
-
              results = fcinfo->flinfo->fn_extra;

              PQclear(results->res);

              rsi = (ReturnSetInfo *) fcinfo->resultinfo;
              rsi->isDone = ExprEndResult;
--- 136,154 ----
              rsi = (ReturnSetInfo *) fcinfo->resultinfo;
              rsi->isDone = ExprMultipleResult;

!             PG_RETURN_INT32(res_id_index);
          }
          else
          {
              /*
               * or if no more, clean things up
               */
              results = fcinfo->flinfo->fn_extra;

+             remove_res_ptr(results);
              PQclear(results->res);
+             pfree(results);
+             fcinfo->flinfo->fn_extra = NULL;

              rsi = (ReturnSetInfo *) fcinfo->resultinfo;
              rsi->isDone = ExprEndResult;
***************
*** 214,249 ****
  dblink_tok(PG_FUNCTION_ARGS)
  {
      dblink_results *results;
!     int            fldnum;
!     text       *result_text;
!     char       *result;
!     int            nfields = 0;
!     int            text_len = 0;
!
!     if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
!         elog(ERROR, "dblink: NULL arguments are not permitted");

!     results = (dblink_results *) PG_GETARG_POINTER(0);
      if (results == NULL)
!         elog(ERROR, "dblink: function called with invalid result pointer");

      fldnum = PG_GETARG_INT32(1);
      if (fldnum < 0)
!         elog(ERROR, "dblink: field number < 0 not permitted");

      nfields = PQnfields(results->res);
      if (fldnum > (nfields - 1))
!         elog(ERROR, "dblink: field number %d does not exist", fldnum);

      if (PQgetisnull(results->res, results->tup_num, fldnum) == 1)
-     {
-
          PG_RETURN_NULL();
-
-     }
      else
      {
-
          text_len = PQgetlength(results->res, results->tup_num, fldnum);

          result = (char *) palloc(text_len + 1);
--- 172,208 ----
  dblink_tok(PG_FUNCTION_ARGS)
  {
      dblink_results *results;
!     int                fldnum;
!     text            *result_text;
!     char            *result;
!     int                nfields = 0;
!     int                text_len = 0;

!     results = get_res_ptr(PG_GETARG_INT32(0));
      if (results == NULL)
!     {
!         if (res_id != NIL)
!         {
!             freeList(res_id);
!             res_id = NIL;
!             res_id_index = 0;
!         }
!
!         elog(ERROR, "dblink_tok: function called with invalid resource id");
!     }

      fldnum = PG_GETARG_INT32(1);
      if (fldnum < 0)
!         elog(ERROR, "dblink_tok: field number < 0 not permitted");

      nfields = PQnfields(results->res);
      if (fldnum > (nfields - 1))
!         elog(ERROR, "dblink_tok: field number %d does not exist", fldnum);

      if (PQgetisnull(results->res, results->tup_num, fldnum) == 1)
          PG_RETURN_NULL();
      else
      {
          text_len = PQgetlength(results->res, results->tup_num, fldnum);

          result = (char *) palloc(text_len + 1);
***************
*** 259,270 ****
--- 218,838 ----
          result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result)));

          PG_RETURN_TEXT_P(result_text);
+     }
+ }
+
+
+ /*
+  * dblink_strtok
+  * parse input string
+  * return ord item (0 based)
+  * based on provided field separator
+  */
+ PG_FUNCTION_INFO_V1(dblink_strtok);
+ Datum
+ dblink_strtok(PG_FUNCTION_ARGS)
+ {
+     char        *fldtext;
+     char        *fldsep;
+     int            fldnum;
+     char        *buffer;
+     text        *result_text;
+
+     fldtext = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0))));
+     fldsep = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1))));
+     fldnum = PG_GETARG_INT32(2);
+
+     if (fldtext[0] == '\0')
+     {
+         elog(ERROR, "get_strtok: blank list not permitted");
+     }
+     if (fldsep[0] == '\0')
+     {
+         elog(ERROR, "get_strtok: blank field separator not permitted");
+     }

+     buffer = get_strtok(fldtext, fldsep, fldnum);
+
+     pfree(fldtext);
+     pfree(fldsep);
+
+     if (buffer == NULL)
+     {
+         PG_RETURN_NULL();
+     }
+     else
+     {
+         result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(buffer)));
+         pfree(buffer);
+
+         PG_RETURN_TEXT_P(result_text);
      }
  }


  /*
+  * dblink_get_pkey
+  *
+  * Return the primary key field names for the
+  * supplied relation, one row per field, or
+  * no rows if the relation has no primary key.
+  */
+ PG_FUNCTION_INFO_V1(dblink_get_pkey);
+ Datum
+ dblink_get_pkey(PG_FUNCTION_ARGS)
+ {
+     char                    *relname;
+     Oid                        relid;
+     char                    **result;
+     text                    *result_text;
+     int16                    numatts;
+     ReturnSetInfo            *rsi;
+     dblink_array_results    *ret_set;
+
+     if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
+         elog(ERROR, "dblink: function called in context that does not accept a set result");
+
+     if (fcinfo->flinfo->fn_extra == NULL)
+     {
+         relname = NameStr(*PG_GETARG_NAME(0));
+
+         /*
+          * Convert relname to rel OID.
+          */
+         relid = get_relid_from_relname(relname);
+         if (!OidIsValid(relid))
+             elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist",
+                  relname);
+
+         /*
+          * get an array of primary key attnames.
+          */
+         result = get_pkey_attnames(relid, &numatts);
+
+         if ((result != NULL) && (numatts > 0))
+         {
+             ret_set = init_dblink_array_results(fcinfo->flinfo->fn_mcxt);
+
+             ret_set->elem_num = 0;
+             ret_set->num_elems = numatts;
+             ret_set->res = result;
+
+             fcinfo->flinfo->fn_extra = (void *) ret_set;
+
+             rsi = (ReturnSetInfo *) fcinfo->resultinfo;
+             rsi->isDone = ExprMultipleResult;
+
+             result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num])));
+
+             PG_RETURN_TEXT_P(result_text);
+         }
+         else
+         {
+             rsi = (ReturnSetInfo *) fcinfo->resultinfo;
+             rsi->isDone = ExprEndResult;
+
+             PG_RETURN_NULL();
+         }
+     }
+     else
+     {
+         /*
+          * check for more results
+          */
+         ret_set = fcinfo->flinfo->fn_extra;
+         ret_set->elem_num++;
+         result = ret_set->res;
+
+         if (ret_set->elem_num < ret_set->num_elems)
+         {
+             /*
+              * fetch next one
+              */
+             rsi = (ReturnSetInfo *) fcinfo->resultinfo;
+             rsi->isDone = ExprMultipleResult;
+
+             result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num])));
+             PG_RETURN_TEXT_P(result_text);
+         }
+         else
+         {
+             int        i;
+
+             /*
+              * or if no more, clean things up
+              */
+             for (i = 0; i < ret_set->num_elems; i++)
+                 pfree(result[i]);
+
+             pfree(ret_set->res);
+             pfree(ret_set);
+
+             rsi = (ReturnSetInfo *) fcinfo->resultinfo;
+             rsi->isDone = ExprEndResult;
+
+             PG_RETURN_NULL();
+         }
+     }
+     PG_RETURN_NULL();
+ }
+
+
+ /*
+  * dblink_last_oid
+  * return last inserted oid
+  */
+ PG_FUNCTION_INFO_V1(dblink_last_oid);
+ Datum
+ dblink_last_oid(PG_FUNCTION_ARGS)
+ {
+     dblink_results *results;
+
+     results = get_res_ptr(PG_GETARG_INT32(0));
+     if (results == NULL)
+     {
+         if (res_id != NIL)
+         {
+             freeList(res_id);
+             res_id = NIL;
+             res_id_index = 0;
+         }
+
+         elog(ERROR, "dblink_last_oid: function called with invalid resource id");
+     }
+
+     PG_RETURN_OID(PQoidValue(results->res));
+ }
+
+
+ /*
+  * dblink_build_sql_insert
+  *
+  * Used to generate an SQL insert statement
+  * based on an existing tuple in a local relation.
+  * This is useful for selectively replicating data
+  * to another server via dblink.
+  *
+  * API:
+  * <relname> - name of local table of interest
+  * <pkattnums> - an int2vector of attnums which will be used
+  * to identify the local tuple of interest
+  * <pknumatts> - number of attnums in pkattnums
+  * <src_pkattvals_arry> - text array of key values which will be used
+  * to identify the local tuple of interest
+  * <tgt_pkattvals_arry> - text array of key values which will be used
+  * to build the string for execution remotely. These are substituted
+  * for their counterparts in src_pkattvals_arry
+  */
+ PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
+ Datum
+ dblink_build_sql_insert(PG_FUNCTION_ARGS)
+ {
+     Oid            relid;
+     char        *relname;
+     int16        *pkattnums;
+     int16        pknumatts;
+     char        **src_pkattvals;
+     char        **tgt_pkattvals;
+     ArrayType    *src_pkattvals_arry;
+     ArrayType    *tgt_pkattvals_arry;
+     int            src_ndim;
+     int            *src_dim;
+     int            src_nitems;
+     int            tgt_ndim;
+     int            *tgt_dim;
+     int            tgt_nitems;
+     int            i;
+     char        *ptr;
+     char        *sql;
+     text        *sql_text;
+
+     relname = NameStr(*PG_GETARG_NAME(0));
+
+     /*
+      * Convert relname to rel OID.
+      */
+     relid = get_relid_from_relname(relname);
+     if (!OidIsValid(relid))
+         elog(ERROR, "dblink_build_sql_insert: relation \"%s\" does not exist",
+              relname);
+
+     pkattnums = (int16 *) PG_GETARG_POINTER(1);
+     pknumatts = PG_GETARG_INT16(2);
+     /*
+      * There should be at least one key attribute
+      */
+     if (pknumatts == 0)
+         elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0.");
+
+     src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
+     tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
+
+     /*
+      * Source array is made up of key values that will be used to
+      * locate the tuple of interest from the local system.
+      */
+     src_ndim = ARR_NDIM(src_pkattvals_arry);
+     src_dim = ARR_DIMS(src_pkattvals_arry);
+     src_nitems = ArrayGetNItems(src_ndim, src_dim);
+
+     /*
+      * There should be one source array key value for each key attnum
+      */
+     if (src_nitems != pknumatts)
+         elog(ERROR, "dblink_build_sql_insert: source key array length does not match number of key attributes.");
+
+     /*
+      * get array of pointers to c-strings from the input source array
+      */
+     src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
+     ptr = ARR_DATA_PTR(src_pkattvals_arry);
+     for (i = 0; i < src_nitems; i++)
+     {
+         src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
+         ptr += INTALIGN(*(int32 *) ptr);
+     }
+
+     /*
+      * Target array is made up of key values that will be used to
+      * build the SQL string for use on the remote system.
+      */
+     tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
+     tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
+     tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);
+
+     /*
+      * There should be one target array key value for each key attnum
+      */
+     if (tgt_nitems != pknumatts)
+         elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes.");
+
+     /*
+      * get array of pointers to c-strings from the input target array
+      */
+     tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
+     ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
+     for (i = 0; i < tgt_nitems; i++)
+     {
+         tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
+         ptr += INTALIGN(*(int32 *) ptr);
+     }
+
+     /*
+      * Prep work is finally done. Go get the SQL string.
+      */
+     sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
+
+     /*
+      * Make it into TEXT for return to the client
+      */
+     sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
+
+     /*
+      * And send it
+      */
+     PG_RETURN_TEXT_P(sql_text);
+ }
+
+
+ /*
+  * dblink_build_sql_delete
+  *
+  * Used to generate an SQL delete statement.
+  * This is useful for selectively replicating a
+  * delete to another server via dblink.
+  *
+  * API:
+  * <relname> - name of remote table of interest
+  * <pkattnums> - an int2vector of attnums which will be used
+  * to identify the remote tuple of interest
+  * <pknumatts> - number of attnums in pkattnums
+  * <tgt_pkattvals_arry> - text array of key values which will be used
+  * to build the string for execution remotely.
+  */
+ PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
+ Datum
+ dblink_build_sql_delete(PG_FUNCTION_ARGS)
+ {
+     Oid            relid;
+     char        *relname;
+     int16        *pkattnums;
+     int16        pknumatts;
+     char        **tgt_pkattvals;
+     ArrayType    *tgt_pkattvals_arry;
+     int            tgt_ndim;
+     int            *tgt_dim;
+     int            tgt_nitems;
+     int            i;
+     char        *ptr;
+     char        *sql;
+     text        *sql_text;
+
+     relname = NameStr(*PG_GETARG_NAME(0));
+
+     /*
+      * Convert relname to rel OID.
+      */
+     relid = get_relid_from_relname(relname);
+     if (!OidIsValid(relid))
+         elog(ERROR, "dblink_build_sql_delete: relation \"%s\" does not exist",
+              relname);
+
+     pkattnums = (int16 *) PG_GETARG_POINTER(1);
+     pknumatts = PG_GETARG_INT16(2);
+     /*
+      * There should be at least one key attribute
+      */
+     if (pknumatts == 0)
+         elog(ERROR, "dblink_build_sql_delete: number of key attributes must be > 0.");
+
+     tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
+
+     /*
+      * Target array is made up of key values that will be used to
+      * build the SQL string for use on the remote system.
+      */
+     tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
+     tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
+     tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);
+
+     /*
+      * There should be one target array key value for each key attnum
+      */
+     if (tgt_nitems != pknumatts)
+         elog(ERROR, "dblink_build_sql_delete: target key array length does not match number of key attributes.");
+
+     /*
+      * get array of pointers to c-strings from the input target array
+      */
+     tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
+     ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
+     for (i = 0; i < tgt_nitems; i++)
+     {
+         tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
+         ptr += INTALIGN(*(int32 *) ptr);
+     }
+
+     /*
+      * Prep work is finally done. Go get the SQL string.
+      */
+     sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals);
+
+     /*
+      * Make it into TEXT for return to the client
+      */
+     sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
+
+     /*
+      * And send it
+      */
+     PG_RETURN_TEXT_P(sql_text);
+ }
+
+
+ /*
+  * dblink_build_sql_update
+  *
+  * Used to generate an SQL update statement
+  * based on an existing tuple in a local relation.
+  * This is useful for selectively replicating data
+  * to another server via dblink.
+  *
+  * API:
+  * <relname> - name of local table of interest
+  * <pkattnums> - an int2vector of attnums which will be used
+  * to identify the local tuple of interest
+  * <pknumatts> - number of attnums in pkattnums
+  * <src_pkattvals_arry> - text array of key values which will be used
+  * to identify the local tuple of interest
+  * <tgt_pkattvals_arry> - text array of key values which will be used
+  * to build the string for execution remotely. These are substituted
+  * for their counterparts in src_pkattvals_arry
+  */
+ PG_FUNCTION_INFO_V1(dblink_build_sql_update);
+ Datum
+ dblink_build_sql_update(PG_FUNCTION_ARGS)
+ {
+     Oid            relid;
+     char        *relname;
+     int16        *pkattnums;
+     int16        pknumatts;
+     char        **src_pkattvals;
+     char        **tgt_pkattvals;
+     ArrayType    *src_pkattvals_arry;
+     ArrayType    *tgt_pkattvals_arry;
+     int            src_ndim;
+     int            *src_dim;
+     int            src_nitems;
+     int            tgt_ndim;
+     int            *tgt_dim;
+     int            tgt_nitems;
+     int            i;
+     char        *ptr;
+     char        *sql;
+     text        *sql_text;
+
+     relname = NameStr(*PG_GETARG_NAME(0));
+
+     /*
+      * Convert relname to rel OID.
+      */
+     relid = get_relid_from_relname(relname);
+     if (!OidIsValid(relid))
+         elog(ERROR, "dblink_build_sql_update: relation \"%s\" does not exist",
+              relname);
+
+     pkattnums = (int16 *) PG_GETARG_POINTER(1);
+     pknumatts = PG_GETARG_INT16(2);
+     /*
+      * There should be at least one key attribute
+      */
+     if (pknumatts == 0)
+         elog(ERROR, "dblink_build_sql_update: number of key attributes must be > 0.");
+
+     src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
+     tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
+
+     /*
+      * Source array is made up of key values that will be used to
+      * locate the tuple of interest from the local system.
+      */
+     src_ndim = ARR_NDIM(src_pkattvals_arry);
+     src_dim = ARR_DIMS(src_pkattvals_arry);
+     src_nitems = ArrayGetNItems(src_ndim, src_dim);
+
+     /*
+      * There should be one source array key value for each key attnum
+      */
+     if (src_nitems != pknumatts)
+         elog(ERROR, "dblink_build_sql_update: source key array length does not match number of key attributes.");
+
+     /*
+      * get array of pointers to c-strings from the input source array
+      */
+     src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
+     ptr = ARR_DATA_PTR(src_pkattvals_arry);
+     for (i = 0; i < src_nitems; i++)
+     {
+         src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
+         ptr += INTALIGN(*(int32 *) ptr);
+     }
+
+     /*
+      * Target array is made up of key values that will be used to
+      * build the SQL string for use on the remote system.
+      */
+     tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
+     tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
+     tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);
+
+     /*
+      * There should be one target array key value for each key attnum
+      */
+     if (tgt_nitems != pknumatts)
+         elog(ERROR, "dblink_build_sql_update: target key array length does not match number of key attributes.");
+
+     /*
+      * get array of pointers to c-strings from the input target array
+      */
+     tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
+     ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
+     for (i = 0; i < tgt_nitems; i++)
+     {
+         tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
+         ptr += INTALIGN(*(int32 *) ptr);
+     }
+
+     /*
+      * Prep work is finally done. Go get the SQL string.
+      */
+     sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
+
+     /*
+      * Make it into TEXT for return to the client
+      */
+     sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
+
+     /*
+      * And send it
+      */
+     PG_RETURN_TEXT_P(sql_text);
+ }
+
+
+ /*
+  * dblink_current_query
+  * return the current query string
+  * to allow its use in (among other things)
+  * rewrite rules
+  */
+ PG_FUNCTION_INFO_V1(dblink_current_query);
+ Datum
+ dblink_current_query(PG_FUNCTION_ARGS)
+ {
+     text        *result_text;
+
+     result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(debug_query_string)));
+     PG_RETURN_TEXT_P(result_text);
+ }
+
+
+ /*
+  * dblink_replace_text
+  * replace all occurrences of 'old_sub_str' in 'orig_str'
+  * with 'new_sub_str' to form 'new_str'
+  *
+  * returns 'orig_str' if 'old_sub_str' == '' or 'orig_str' == ''
+  * otherwise returns 'new_str'
+  */
+ PG_FUNCTION_INFO_V1(dblink_replace_text);
+ Datum
+ dblink_replace_text(PG_FUNCTION_ARGS)
+ {
+     text        *left_text;
+     text        *right_text;
+     text        *buf_text;
+     text        *ret_text;
+     char        *ret_str;
+     int            curr_posn;
+     text        *src_text = PG_GETARG_TEXT_P(0);
+     int            src_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(src_text)));
+     text        *from_sub_text = PG_GETARG_TEXT_P(1);
+     int            from_sub_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(from_sub_text)));
+     text        *to_sub_text = PG_GETARG_TEXT_P(2);
+     char        *to_sub_str = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(to_sub_text)));
+     StringInfo    str = makeStringInfo();
+
+     if (src_text_len == 0 || from_sub_text_len == 0)
+         PG_RETURN_TEXT_P(src_text);
+
+     buf_text = DatumGetTextPCopy(PointerGetDatum(src_text));
+     curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text)));
+
+     while (curr_posn > 0)
+     {
+         left_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), 1, DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) - 1));
+         right_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) + from_sub_text_len, -1));
+
+         appendStringInfo(str, "%s", DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(left_text))));
+         appendStringInfo(str, "%s", to_sub_str);
+
+         pfree(buf_text);
+         pfree(left_text);
+         buf_text = right_text;
+         curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text)));
+     }
+
+     appendStringInfo(str, "%s", DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(buf_text))));
+     pfree(buf_text);
+
+     ret_str = pstrdup(str->data);
+     ret_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(ret_str)));
+
+     PG_RETURN_TEXT_P(ret_text);
+ }
+
+
+ /*************************************************************
   * internal functions
   */

***************
*** 285,293 ****
--- 853,1408 ----
      MemSet(retval, 0, sizeof(dblink_results));

      retval->tup_num = -1;
+     retval->res_id_index = -1;
      retval->res = NULL;

      MemoryContextSwitchTo(oldcontext);

      return retval;
  }
+
+
+ /*
+  * init_dblink_array_results
+  *     - create an empty dblink_array_results data structure
+  */
+ dblink_array_results *
+ init_dblink_array_results(MemoryContext fn_mcxt)
+ {
+     MemoryContext oldcontext;
+     dblink_array_results *retval;
+
+     oldcontext = MemoryContextSwitchTo(fn_mcxt);
+
+     retval = (dblink_array_results *) palloc(sizeof(dblink_array_results));
+     MemSet(retval, 0, sizeof(dblink_array_results));
+
+     retval->elem_num = -1;
+     retval->num_elems = 0;
+     retval->res = NULL;
+
+     MemoryContextSwitchTo(oldcontext);
+
+     return retval;
+ }
+
+ /*
+  * get_pkey_attnames
+  *
+  * Get the primary key attnames for the given relation.
+  * Return NULL, and set numatts = 0, if no primary key exists.
+  */
+ char **
+ get_pkey_attnames(Oid relid, int16 *numatts)
+ {
+     Relation        indexRelation;
+     ScanKeyData        entry;
+     HeapScanDesc    scan;
+     HeapTuple        indexTuple;
+     int                i;
+     char            **result = NULL;
+     Relation        rel;
+     TupleDesc        tupdesc;
+
+     /*
+      * Open relation using relid, get tupdesc
+      */
+     rel = relation_open(relid, AccessShareLock);
+     tupdesc = rel->rd_att;
+
+     /*
+      * Initialize numatts to 0 in case no primary key
+      * exists
+      */
+     *numatts = 0;
+
+     /*
+      * Use relid to get all related indexes
+      */
+     indexRelation = heap_openr(IndexRelationName, AccessShareLock);
+     ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid,
+                            F_OIDEQ, ObjectIdGetDatum(relid));
+     scan = heap_beginscan(indexRelation, false, SnapshotNow,
+                           1, &entry);
+
+     while (HeapTupleIsValid(indexTuple = heap_getnext(scan, 0)))
+     {
+         Form_pg_index    index = (Form_pg_index) GETSTRUCT(indexTuple);
+
+         /*
+          * We're only interested if it is the primary key
+          */
+         if (index->indisprimary == TRUE)
+         {
+             i = 0;
+             while (index->indkey[i++] != 0)
+                 (*numatts)++;
+
+             if (*numatts > 0)
+             {
+                 result = (char **) palloc(*numatts * sizeof(char *));
+                 for (i = 0; i < *numatts; i++)
+                     result[i] = SPI_fname(tupdesc, index->indkey[i]);
+             }
+             break;
+         }
+     }
+     heap_endscan(scan);
+     heap_close(indexRelation, AccessShareLock);
+     relation_close(rel, AccessShareLock);
+
+     return result;
+ }
+
+
+ /*
+  * get_strtok
+  *
+  * parse input string
+  * return the fldnum'th item (0 based)
+  * based on provided field separator
+  */
+ char *
+ get_strtok(char *fldtext, char *fldsep, int fldnum)
+ {
+     int            j = 0;
+     char        *result;
+
+     if (fldnum < 0)
+     {
+         elog(ERROR, "get_strtok: field number < 0 not permitted");
+     }
+
+     if (fldsep[0] == '\0')
+     {
+         elog(ERROR, "get_strtok: blank field separator not permitted");
+     }
+
+     result = strtok(fldtext, fldsep);
+     for (j = 1; result != NULL && j < fldnum + 1; j++)
+         result = strtok(NULL, fldsep);
+
+     /* not enough fields (or empty input): nothing to return */
+     if (result == NULL)
+         return NULL;
+
+     return pstrdup(result);
+ }
+
+ char *
+ get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
+ {
+     Relation        rel;
+     char            *relname;
+     HeapTuple        tuple;
+     TupleDesc        tupdesc;
+     int                natts;
+     StringInfo        str = makeStringInfo();
+     char            *sql = NULL;
+     char            *val = NULL;
+     int16            key;
+     unsigned int    i;
+
+     /*
+      * Open relation using relid
+      */
+     rel = relation_open(relid, AccessShareLock);
+     relname =  RelationGetRelationName(rel);
+     tupdesc = rel->rd_att;
+     natts = tupdesc->natts;
+
+     tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
+
+     appendStringInfo(str, "INSERT INTO %s(", quote_ident_cstr(relname));
+     for (i = 0; i < natts; i++)
+     {
+         if (i > 0)
+             appendStringInfo(str, ",");
+
+         appendStringInfo(str, "%s", NameStr(tupdesc->attrs[i]->attname));
+     }
+
+     appendStringInfo(str, ") VALUES(");
+
+     /*
+      * remember attnums are 1 based
+      */
+     for (i = 0; i < natts; i++)
+     {
+         if (i > 0)
+             appendStringInfo(str, ",");
+
+         if (tgt_pkattvals != NULL)
+             key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
+         else
+             key = -1;
+
+         if (key > -1)
+             val = pstrdup(tgt_pkattvals[key]);
+         else
+             val = SPI_getvalue(tuple, tupdesc, i + 1);
+
+         if (val != NULL)
+         {
+             appendStringInfo(str, "%s", quote_literal_cstr(val));
+             pfree(val);
+         }
+         else
+             appendStringInfo(str, "NULL");
+     }
+     appendStringInfo(str, ")");
+
+     sql = pstrdup(str->data);
+     pfree(str->data);
+     pfree(str);
+     relation_close(rel, AccessShareLock);
+
+     return (sql);
+ }
+
+ char *
+ get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals)
+ {
+     Relation        rel;
+     char            *relname;
+     TupleDesc        tupdesc;
+     int                natts;
+     StringInfo        str = makeStringInfo();
+     char            *sql = NULL;
+     char            *val = NULL;
+     unsigned int    i;
+
+     /*
+      * Open relation using relid
+      */
+     rel = relation_open(relid, AccessShareLock);
+     relname =  RelationGetRelationName(rel);
+     tupdesc = rel->rd_att;
+     natts = tupdesc->natts;
+
+     appendStringInfo(str, "DELETE FROM %s WHERE ", quote_ident_cstr(relname));
+     for (i = 0; i < pknumatts; i++)
+     {
+         int16    pkattnum = pkattnums[i];
+
+         if (i > 0)
+             appendStringInfo(str, " AND ");
+
+         appendStringInfo(str, "%s", NameStr(tupdesc->attrs[pkattnum - 1]->attname));
+
+         if (tgt_pkattvals != NULL)
+             val = pstrdup(tgt_pkattvals[i]);
+         else
+             elog(ERROR, "Target key array must not be NULL");
+
+         if (val != NULL)
+         {
+             appendStringInfo(str, "=");
+             appendStringInfo(str, "%s", quote_literal_cstr(val));
+             pfree(val);
+         }
+         else
+             appendStringInfo(str, " IS NULL");
+     }
+
+     sql = pstrdup(str->data);
+     pfree(str->data);
+     pfree(str);
+     relation_close(rel, AccessShareLock);
+
+     return (sql);
+ }
+
+ char *
+ get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
+ {
+     Relation        rel;
+     char            *relname;
+     HeapTuple        tuple;
+     TupleDesc        tupdesc;
+     int                natts;
+     StringInfo        str = makeStringInfo();
+     char            *sql = NULL;
+     char            *val = NULL;
+     int16            key;
+     int                i;
+
+     /*
+      * Open relation using relid
+      */
+     rel = relation_open(relid, AccessShareLock);
+     relname =  RelationGetRelationName(rel);
+     tupdesc = rel->rd_att;
+     natts = tupdesc->natts;
+
+     tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
+
+     appendStringInfo(str, "UPDATE %s SET ", quote_ident_cstr(relname));
+
+     for (i = 0; i < natts; i++)
+     {
+         if (i > 0)
+             appendStringInfo(str, ",");
+
+         appendStringInfo(str, "%s", NameStr(tupdesc->attrs[i]->attname));
+         appendStringInfo(str, "=");
+
+         if (tgt_pkattvals != NULL)
+             key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
+         else
+             key = -1;
+
+         if (key > -1)
+             val = pstrdup(tgt_pkattvals[key]);
+         else
+             val = SPI_getvalue(tuple, tupdesc, i + 1);
+
+         if (val != NULL)
+         {
+             appendStringInfo(str, "%s", quote_literal_cstr(val));
+             pfree(val);
+         }
+         else
+             appendStringInfo(str, "NULL");
+     }
+
+     appendStringInfo(str, " WHERE ");
+
+     for (i = 0; i < pknumatts; i++)
+     {
+         int16    pkattnum = pkattnums[i];
+
+         if (i > 0)
+             appendStringInfo(str, " AND ");
+
+         appendStringInfo(str, "%s", NameStr(tupdesc->attrs[pkattnum - 1]->attname));
+
+         if (tgt_pkattvals != NULL)
+             val = pstrdup(tgt_pkattvals[i]);
+         else
+             val = SPI_getvalue(tuple, tupdesc, pkattnum);
+
+         if (val != NULL)
+         {
+             appendStringInfo(str, "=");
+             appendStringInfo(str, "%s", quote_literal_cstr(val));
+             pfree(val);
+         }
+         else
+             appendStringInfo(str, " IS NULL");
+     }
+
+     sql = pstrdup(str->data);
+     pfree(str->data);
+     pfree(str);
+     relation_close(rel, AccessShareLock);
+
+     return (sql);
+ }
+
+ /*
+  * Return a properly quoted literal value.
+  * Uses quote_literal in quote.c
+  */
+ static char *
+ quote_literal_cstr(char *rawstr)
+ {
+     text        *rawstr_text;
+     text        *result_text;
+     char        *result;
+
+     rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr)));
+     result_text = DatumGetTextP(DirectFunctionCall1(quote_literal, PointerGetDatum(rawstr_text)));
+     result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text)));
+
+     return result;
+ }
+
+ /*
+  * Return a properly quoted identifier.
+  * Uses quote_ident in quote.c
+  */
+ static char *
+ quote_ident_cstr(char *rawstr)
+ {
+     text        *rawstr_text;
+     text        *result_text;
+     char        *result;
+
+     rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr)));
+     result_text = DatumGetTextP(DirectFunctionCall1(quote_ident, PointerGetDatum(rawstr_text)));
+     result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text)));
+
+     return result;
+ }
+
+ int16
+ get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key)
+ {
+     int        i;
+
+     /*
+      * Not likely a long list anyway, so just scan for
+      * the value
+      */
+     for (i = 0; i < pknumatts; i++)
+         if (key == pkattnums[i])
+             return i;
+
+     return -1;
+ }
+
+ HeapTuple
+ get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals)
+ {
+     Relation        rel;
+     char            *relname;
+     TupleDesc        tupdesc;
+     StringInfo        str = makeStringInfo();
+     char            *sql = NULL;
+     int                ret;
+     HeapTuple        tuple;
+     int                i;
+     char            *val = NULL;
+
+     /*
+      * Open relation using relid
+      */
+     rel = relation_open(relid, AccessShareLock);
+     relname =  RelationGetRelationName(rel);
+     tupdesc = rel->rd_att;
+
+     /*
+      * Connect to SPI manager
+      */
+     if ((ret = SPI_connect()) < 0)
+         elog(ERROR, "get_tuple_of_interest: SPI_connect returned %d", ret);
+
+     /*
+      * Build sql statement to look up tuple of interest
+      * Use src_pkattvals as the criteria.
+      */
+     appendStringInfo(str, "SELECT * FROM %s WHERE ", quote_ident_cstr(relname));
+
+     for (i = 0; i < pknumatts; i++)
+     {
+         int16    pkattnum = pkattnums[i];
+
+         if (i > 0)
+             appendStringInfo(str, " AND ");
+
+         appendStringInfo(str, "%s", NameStr(tupdesc->attrs[pkattnum - 1]->attname));
+
+         val = pstrdup(src_pkattvals[i]);
+         if (val != NULL)
+         {
+             appendStringInfo(str, "=");
+             appendStringInfo(str, "%s", quote_literal_cstr(val));
+             pfree(val);
+         }
+         else
+             appendStringInfo(str, " IS NULL");
+     }
+
+     sql = pstrdup(str->data);
+     pfree(str->data);
+     pfree(str);
+     /*
+      * Retrieve the desired tuple
+      */
+     ret = SPI_exec(sql, 0);
+     pfree(sql);
+
+     /*
+      * Only allow one qualifying tuple
+      */
+     if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
+     {
+         elog(ERROR, "get_tuple_of_interest: Source criteria may not match more than one record.");
+     }
+     else if (ret == SPI_OK_SELECT && SPI_processed == 1)
+     {
+         SPITupleTable *tuptable = SPI_tuptable;
+         tuple = SPI_copytuple(tuptable->vals[0]);
+
+         return tuple;
+     }
+     else
+     {
+         /*
+          * no qualifying tuples
+          */
+         return NULL;
+     }
+
+     /*
+      * never reached, but keep compiler quiet
+      */
+     return NULL;
+ }
+
+ Oid
+ get_relid_from_relname(char *relname)
+ {
+ #ifdef NamespaceRelationName
+     Oid                relid;
+
+     relid = RelnameGetRelid(relname);
+ #else
+     Relation        rel;
+     Oid                relid;
+
+     rel = relation_openr(relname, AccessShareLock);
+     relid = RelationGetRelid(rel);
+     relation_close(rel, AccessShareLock);
+ #endif   /* NamespaceRelationName */
+
+     return relid;
+ }
+
+ dblink_results    *
+ get_res_ptr(int32 res_id_index)
+ {
+     List    *ptr;
+
+     /*
+      * short circuit empty list
+      */
+     if(res_id == NIL)
+         return NULL;
+
+     /*
+      * OK, should be good to go
+      */
+     foreach(ptr, res_id)
+     {
+         dblink_results    *this_res_id = (dblink_results *) lfirst(ptr);
+         if (this_res_id->res_id_index == res_id_index)
+             return this_res_id;
+     }
+     return NULL;
+ }
+
+ /*
+  * Add node to global List res_id
+  */
+ void
+ append_res_ptr(dblink_results *results)
+ {
+     res_id = lappend(res_id, results);
+ }
+
+ /*
+  * Remove node from global List res_id; once the
+  * list is empty, reset the res_id_index counter
+  */
+ void
+ remove_res_ptr(dblink_results *results)
+ {
+     res_id = lremove(results, res_id);
+
+     if (res_id == NIL)
+         res_id_index = 0;
+ }
+
+
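
For readers who want to try the replace-all logic of dblink_replace_text outside the backend, here is a minimal standalone sketch in plain C. It is not part of the patch: the name replace_all is hypothetical, it works on NUL-terminated strings with malloc instead of text datums with palloc, and it sizes the output in a first pass rather than growing a StringInfo. The behavior it aims to match is the one described in the function comment: every occurrence of the old substring is replaced, and an empty source or empty search string returns the source unchanged.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/*
 * replace_all (hypothetical name, not part of the patch)
 *
 * Return a newly malloc'd copy of src with every occurrence of "from"
 * replaced by "to". An empty src or an empty "from" yields an unchanged
 * copy of src. The caller frees the result. Returns NULL if malloc fails.
 */
char *
replace_all(const char *src, const char *from, const char *to)
{
    size_t      src_len;
    size_t      from_len;
    size_t      to_len;
    size_t      count = 0;
    const char *scan;
    const char *hit;
    char       *result;
    char       *out;

    assert(src != NULL && from != NULL && to != NULL);

    src_len = strlen(src);
    from_len = strlen(from);
    to_len = strlen(to);

    /* nothing to search in or for: hand back a plain copy */
    if (src_len == 0 || from_len == 0)
    {
        result = malloc(src_len + 1);
        if (result != NULL)
            strcpy(result, src);
        return result;
    }

    /* first pass: count non-overlapping matches to size the buffer */
    for (scan = src; (hit = strstr(scan, from)) != NULL; scan = hit + from_len)
        count++;

    result = malloc(src_len + count * to_len - count * from_len + 1);
    if (result == NULL)
        return NULL;

    /* second pass: copy the text left of each match, then the replacement */
    out = result;
    for (scan = src; (hit = strstr(scan, from)) != NULL; scan = hit + from_len)
    {
        memcpy(out, scan, (size_t) (hit - scan));
        out += hit - scan;
        memcpy(out, to, to_len);
        out += to_len;
    }

    /* whatever follows the last match, including the terminator */
    strcpy(out, scan);

    return result;
}
```

Calling replace_all("12345678", "345", "ABC") yields "12ABC678". Matches are consumed left to right and do not overlap, so replace_all("aaaa", "aa", "b") yields "bb".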
diff -cNr dblink.orig/dblink.h dblink/dblink.h
*** dblink.orig/dblink.h    Mon Nov  5 09:46:22 2001
--- dblink/dblink.h    Sun Apr 14 18:54:39 2002
***************
*** 3,9 ****
   *
   * Functions returning results from a remote database
   *
!  * Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
   *
   * Permission to use, copy, modify, and distribute this software and its
   * documentation for any purpose, without fee, and without a written agreement
--- 3,10 ----
   *
   * Functions returning results from a remote database
   *
!  * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
!  * ALL RIGHTS RESERVED;
   *
   * Permission to use, copy, modify, and distribute this software and its
   * documentation for any purpose, without fee, and without a written agreement
***************
*** 33,42 ****
--- 34,64 ----
  #include "libpq-int.h"
  #include "fmgr.h"
  #include "access/tupdesc.h"
+ #include "access/heapam.h"
+ #include "catalog/catname.h"
+ #include "catalog/pg_index.h"
+ #include "catalog/pg_type.h"
  #include "executor/executor.h"
+ #include "executor/spi.h"
+ #include "lib/stringinfo.h"
  #include "nodes/nodes.h"
  #include "nodes/execnodes.h"
+ #include "nodes/pg_list.h"
+ #include "parser/parse_type.h"
+ #include "tcop/tcopprot.h"
  #include "utils/builtins.h"
+ #include "utils/fmgroids.h"
+ #include "utils/array.h"
+ #include "utils/syscache.h"
+
+ #ifdef NamespaceRelationName
+ #include "catalog/namespace.h"
+ #endif   /* NamespaceRelationName */
+
+ /*
+  * Max SQL statement size
+  */
+ #define DBLINK_MAX_SQLSTATE_SIZE        16384

  /*
   * This struct holds the results of the remote query.
***************
*** 50,70 ****
      int            tup_num;

      /*
       * the actual query results
       */
      PGresult   *res;
-
  }    dblink_results;

  /*
   * External declarations
   */
  extern Datum dblink(PG_FUNCTION_ARGS);
  extern Datum dblink_tok(PG_FUNCTION_ARGS);

  /*
   * Internal declarations
   */
  dblink_results *init_dblink_results(MemoryContext fn_mcxt);

  #endif   /* DBLINK_H */
--- 72,145 ----
      int            tup_num;

      /*
+      * resource index number for this context
+      */
+     int            res_id_index;
+
+     /*
       * the actual query results
       */
      PGresult   *res;
  }    dblink_results;

+
+ /*
+  * This struct holds results in the form of an array.
+  * Use fn_extra to hold a pointer to it across calls
+  */
+ typedef struct
+ {
+     /*
+      * elem being accessed
+      */
+     int            elem_num;
+
+     /*
+      * number of elems
+      */
+     int            num_elems;
+
+     /*
+      * the actual array
+      */
+     void        *res;
+
+ }    dblink_array_results;
+
  /*
   * External declarations
   */
  extern Datum dblink(PG_FUNCTION_ARGS);
  extern Datum dblink_tok(PG_FUNCTION_ARGS);
+ extern Datum dblink_strtok(PG_FUNCTION_ARGS);
+ extern Datum dblink_get_pkey(PG_FUNCTION_ARGS);
+ extern Datum dblink_last_oid(PG_FUNCTION_ARGS);
+ extern Datum dblink_build_sql_insert(PG_FUNCTION_ARGS);
+ extern Datum dblink_build_sql_delete(PG_FUNCTION_ARGS);
+ extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS);
+ extern Datum dblink_current_query(PG_FUNCTION_ARGS);
+ extern Datum dblink_replace_text(PG_FUNCTION_ARGS);

  /*
   * Internal declarations
   */
  dblink_results *init_dblink_results(MemoryContext fn_mcxt);
+ dblink_array_results *init_dblink_array_results(MemoryContext fn_mcxt);
+ char **get_pkey_attnames(Oid relid, int16 *numatts);
+ char *get_strtok(char *fldtext, char *fldsep, int fldnum);
+ char *getvalue(HeapTuple tuple, TupleDesc tupdesc, int fnumber);
+ char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
+ char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals);
+ char *get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
+ static char *quote_literal_cstr(char *rawstr);
+ static char *quote_ident_cstr(char *rawstr);
+ int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key);
+ HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals);
+ Oid get_relid_from_relname(char *relname);
+ dblink_results    *get_res_ptr(int32 res_id_index);
+ void append_res_ptr(dblink_results *results);
+ void remove_res_ptr(dblink_results *results);
+
+ extern char    *debug_query_string;

  #endif   /* DBLINK_H */
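
The res_id_index field and the get_res_ptr/append_res_ptr/remove_res_ptr declarations above are what let dblink hand callers a resource id instead of a real pointer. The following is a minimal standalone sketch of that idea, not the patch's code: it uses a hand-rolled linked list instead of the backend List type, the names res_register, res_lookup and res_release are hypothetical, and the way ids are assigned (a simple counter) is an assumption, since that part of dblink.c is not shown here. The counter reset on an empty list mirrors what remove_res_ptr does.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* one registered result; payload stands in for the PGresult pointer */
typedef struct res_node
{
    int              id;
    void            *payload;
    struct res_node *next;
} res_node;

static res_node *res_list = NULL;   /* all live results */
static int       next_id = 0;       /* assumed id source: a plain counter */

/* Register payload and return its id, or -1 if allocation fails. */
int
res_register(void *payload)
{
    res_node   *node;

    /* a NULL payload could not be told apart from "not found" */
    assert(payload != NULL);

    node = malloc(sizeof(res_node));
    if (node == NULL)
        return -1;

    node->id = ++next_id;
    node->payload = payload;
    node->next = res_list;
    res_list = node;

    return node->id;
}

/* Return the payload registered under id, or NULL if there is none. */
void *
res_lookup(int id)
{
    res_node   *node;

    for (node = res_list; node != NULL; node = node->next)
        if (node->id == id)
            return node->payload;

    return NULL;
}

/* Drop the entry for id; return 1 if it existed, 0 otherwise. */
int
res_release(int id)
{
    res_node  **link;

    for (link = &res_list; *link != NULL; link = &(*link)->next)
    {
        if ((*link)->id == id)
        {
            res_node   *dead = *link;

            *link = dead->next;
            free(dead);

            /* like remove_res_ptr: start numbering over on an empty list */
            if (res_list == NULL)
                next_id = 0;
            return 1;
        }
    }

    return 0;
}
```

The point of the indirection is that a stale or made-up id simply fails the lookup and returns NULL, whereas a stale raw pointer passed back in from SQL would be dereferenced.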
diff -cNr dblink.orig/dblink.sql.in dblink/dblink.sql.in
*** dblink.orig/dblink.sql.in    Thu Jun 14 09:49:03 2001
--- dblink/dblink.sql.in    Fri Apr 12 14:36:49 2002
***************
*** 1,5 ****
! CREATE FUNCTION dblink (text,text) RETURNS setof int
!   AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c';

! CREATE FUNCTION dblink_tok (int,int) RETURNS text
!   AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c';
--- 1,38 ----
! CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof int
!   AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c'
!   WITH (isstrict);

! CREATE OR REPLACE FUNCTION dblink_tok (int,int) RETURNS text
!   AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c'
!   WITH (isstrict);
!
! CREATE OR REPLACE FUNCTION dblink_strtok (text,text,int) RETURNS text
!   AS 'MODULE_PATHNAME','dblink_strtok' LANGUAGE 'c'
!   WITH (iscachable, isstrict);
!
! CREATE OR REPLACE FUNCTION dblink_get_pkey (name) RETURNS setof text
!   AS 'MODULE_PATHNAME','dblink_get_pkey' LANGUAGE 'c'
!   WITH (isstrict);
!
! CREATE OR REPLACE FUNCTION dblink_last_oid (int) RETURNS oid
!   AS 'MODULE_PATHNAME','dblink_last_oid' LANGUAGE 'c'
!   WITH (isstrict);
!
! CREATE OR REPLACE FUNCTION dblink_build_sql_insert (name, int2vector, int2, _text, _text) RETURNS text
!   AS 'MODULE_PATHNAME','dblink_build_sql_insert' LANGUAGE 'c'
!   WITH (isstrict);
!
! CREATE OR REPLACE FUNCTION dblink_build_sql_delete (name, int2vector, int2, _text) RETURNS text
!   AS 'MODULE_PATHNAME','dblink_build_sql_delete' LANGUAGE 'c'
!   WITH (isstrict);
!
! CREATE OR REPLACE FUNCTION dblink_build_sql_update (name, int2vector, int2, _text, _text) RETURNS text
!   AS 'MODULE_PATHNAME','dblink_build_sql_update' LANGUAGE 'c'
!   WITH (isstrict);
!
! CREATE OR REPLACE FUNCTION dblink_current_query () RETURNS text
!   AS 'MODULE_PATHNAME','dblink_current_query' LANGUAGE 'c';
!
! CREATE OR REPLACE FUNCTION dblink_replace (text,text,text) RETURNS text
!   AS 'MODULE_PATHNAME','dblink_replace_text' LANGUAGE 'c'
!   WITH (iscachable, isstrict);
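
To make the output of dblink_build_sql_delete concrete, here is a minimal standalone sketch in plain C of how such a statement can be assembled from key column names and key values. It is an illustration under stated assumptions, not the patch's code: build_delete and the two buf_append helpers are hypothetical names, a fixed caller-supplied buffer replaces StringInfo, column names are passed in directly instead of being read from the tuple descriptor, and the quoting is a simplified stand-in for quote_literal that only doubles embedded single quotes. Identifiers are not quoted here at all.

```c
#include <assert.h>
#include <string.h>

/* Append src to buf; return 0 on success, -1 if it would not fit. */
static int
buf_append(char *buf, size_t bufsize, const char *src)
{
    size_t      used = strlen(buf);
    size_t      need = strlen(src);

    if (used + need + 1 > bufsize)
        return -1;
    memcpy(buf + used, src, need + 1);
    return 0;
}

/* Append val as a single-quoted literal, doubling embedded quotes. */
static int
buf_append_literal(char *buf, size_t bufsize, const char *val)
{
    char        piece[2] = {'\0', '\0'};

    if (buf_append(buf, bufsize, "'") < 0)
        return -1;
    for (; *val != '\0'; val++)
    {
        if (*val == '\'' && buf_append(buf, bufsize, "'") < 0)
            return -1;
        piece[0] = *val;
        if (buf_append(buf, bufsize, piece) < 0)
            return -1;
    }
    return buf_append(buf, bufsize, "'");
}

/*
 * build_delete (hypothetical name)
 *
 * Write "DELETE FROM relname WHERE col = 'val' AND ..." into buf.
 * A NULL key value produces "col IS NULL".
 * Returns 0 on success, -1 if buf is too small.
 */
int
build_delete(char *buf, size_t bufsize, const char *relname,
             const char **colnames, const char **keyvals, int nkeys)
{
    int         i;

    assert(buf != NULL && bufsize > 0 && nkeys > 0);
    buf[0] = '\0';

    if (buf_append(buf, bufsize, "DELETE FROM ") < 0 ||
        buf_append(buf, bufsize, relname) < 0 ||
        buf_append(buf, bufsize, " WHERE ") < 0)
        return -1;

    for (i = 0; i < nkeys; i++)
    {
        if (i > 0 && buf_append(buf, bufsize, " AND ") < 0)
            return -1;
        if (buf_append(buf, bufsize, colnames[i]) < 0)
            return -1;

        if (keyvals[i] == NULL)
        {
            if (buf_append(buf, bufsize, " IS NULL") < 0)
                return -1;
        }
        else
        {
            if (buf_append(buf, bufsize, " = ") < 0 ||
                buf_append_literal(buf, bufsize, keyvals[i]) < 0)
                return -1;
        }
    }

    return 0;
}
```

With relname "foo", columns f1 and f2, and key values 1 and b'c, the buffer ends up holding DELETE FROM foo WHERE f1 = '1' AND f2 = 'b''c'. Every value is emitted as a quoted literal, as in the patch, which leaves type resolution to the remote server.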

Re: contrib/dblink update

From
Tom Lane
Date:
Joe Conway <mail@joeconway.com> writes:
> On a somewhat unrelated topic, I never got any feedback on the
> unknownin/out patch and the mb_substring patch. Is there anything else I
> need to do to get those applied?

In my mind, the unknownin/out business was waiting on commentary from
Tatsuo or Hiroshi on whether it was a bug for the TEXT code to be
rejecting byte sequences that had got by the initial pq_getstr
conversion.  If it is, then we should be fixing TEXT not adding new code
for UNKNOWN.  But if TEXT's behavior is considered desirable then we'll
need to change UNKNOWN.

Is the mb patch the one that fixes John Gray's oversight in substring?
If so, I thought that was due to be applied.  Bruce, were you going to
do that?

            regards, tom lane

Re: contrib/dblink update

From
Joe Conway
Date:
Tom Lane wrote:
> Joe Conway <mail@joeconway.com> writes:
>
>>On a somewhat unrelated topic, I never got any feedback on the
>>unknownin/out patch and the mb_substring patch. Is there anything else I
>>need to do to get those applied?
>
>
> In my mind, the unknownin/out business was waiting on commentary from
> Tatsuo or Hiroshi on whether it was a bug for the TEXT code to be
> rejecting byte sequences that had got by the initial pq_getstr
> conversion.  If it is, then we should be fixing TEXT not adding new code
> for UNKNOWN.  But if TEXT's behavior is considered desirable then we'll
> need to change UNKNOWN.

OK -- I did think you wanted to change unknown regardless, so that it
would not alter the input C string in any way. The patch is basically
just that, textin/textout with the multibyte and recode code stripped out.

>
> Is the mb patch the one that fixes John Gray's oversight in substring?
> If so, I thought that was due to be applied.  Bruce, were you going to
> do that?

Yes -- this patch fixed the MB handling in substring and was approved by
John.

Thanks,

Joe
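
As an illustration of the "would not alter the input C string in any way" idea in the message above, here is a minimal sketch of a byte-for-byte round trip between a C string and a length-prefixed value, with no encoding conversion in between. It is not the unknownin/out patch itself: raw_value, raw_in and raw_out are hypothetical names, the struct is only loosely modeled on a length-prefixed text value, and malloc stands in for the backend allocator.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical length-prefixed value (not PostgreSQL's actual layout). */
typedef struct
{
    size_t      len;        /* number of data bytes, no terminator */
    char        data[];     /* raw bytes, copied verbatim */
} raw_value;

/* C string -> value: copy the bytes as they are, no recoding. */
static raw_value *
raw_in(const char *cstr)
{
    size_t      len;
    raw_value  *v;

    assert(cstr != NULL);

    len = strlen(cstr);
    v = malloc(sizeof(raw_value) + len);
    if (v == NULL)
        return NULL;

    v->len = len;
    memcpy(v->data, cstr, len);
    return v;
}

/* value -> C string: copy the bytes back and add the terminator. */
static char *
raw_out(const raw_value *v)
{
    char       *cstr = malloc(v->len + 1);

    if (cstr == NULL)
        return NULL;

    memcpy(cstr, v->data, v->len);
    cstr[v->len] = '\0';
    return cstr;
}
```

Because nothing inspects the bytes, a sequence that is invalid in the current encoding passes through unchanged, which is the behavior being discussed for UNKNOWN.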

Re: contrib/dblink update

From
Bruce Momjian
Date:
Your patch has been added to the PostgreSQL unapplied patches list at:

    http://candle.pha.pa.us/cgi-bin/pgpatches

I will try to apply it within the next 48 hours.

---------------------------------------------------------------------------


Joe Conway wrote:
> Attached is an update to contrib/dblink. Please apply if there are no
> objections.
>
> Major changes:
>    - removed cursor wrap around input sql to allow for remote
>      execution of INSERT/UPDATE/DELETE
>    - dblink now returns a resource id instead of a real pointer
>    - added several utility functions
>
> I'm still hoping to add explicit cursor open/fetch/close support before
> 7.3 is released, but I need a bit more time on that.
>
> On a somewhat unrelated topic, I never got any feedback on the
> unknownin/out patch and the mb_substring patch. Is there anything else I
> need to do to get those applied?
>
> Thanks,
>
> Joe

--
  Bruce Momjian                        |  http://candle.pha.pa.us
  pgman@candle.pha.pa.us               |  (610) 853-3000
  +  If your life is a hard drive,     |  830 Blythe Avenue
  +  Christ can be your backup.        |  Drexel Hill, Pennsylvania 19026

Re: contrib/dblink update

From
Bruce Momjian
Date:
Patch applied.  Thanks.

---------------------------------------------------------------------------


Joe Conway wrote:
> Attached is an update to contrib/dblink. Please apply if there are no
> objections.
>
> Major changes:
>    - removed cursor wrap around input sql to allow for remote
>      execution of INSERT/UPDATE/DELETE
>    - dblink now returns a resource id instead of a real pointer
>    - added several utility functions
>
> I'm still hoping to add explicit cursor open/fetch/close support before
> 7.3 is released, but I need a bit more time on that.
>
> On a somewhat unrelated topic, I never got any feedback on the
> unknownin/out patch and the mb_substring patch. Is there anything else I
> need to do to get those applied?
>
> Thanks,
>
> Joe

> diff -cNr dblink.orig/README.dblink dblink/README.dblink
> *** dblink.orig/README.dblink    Thu Dec 13 02:48:39 2001
> --- dblink/README.dblink    Sun Apr 14 20:02:06 2002
> ***************
> *** 3,9 ****
>    *
>    * Functions returning results from a remote database
>    *
> !  * Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
>    *
>    * Permission to use, copy, modify, and distribute this software and its
>    * documentation for any purpose, without fee, and without a written agreement
> --- 3,10 ----
>    *
>    * Functions returning results from a remote database
>    *
> !  * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
> !  * ALL RIGHTS RESERVED;
>    *
>    * Permission to use, copy, modify, and distribute this software and its
>    * documentation for any purpose, without fee, and without a written agreement
> ***************
> *** 25,36 ****
>    */
>
>
> ! Version 0.3 (14 June, 2001):
> !   Function to test returning data set from remote database
> !   Tested under Linux (Red Hat 6.2 and 7.0) and PostgreSQL 7.1 and 7.2devel
>
>   Release Notes:
>
>     Version 0.3
>       - fixed dblink invalid pointer causing corrupt elog message
>       - fixed dblink_tok improper handling of null results
> --- 26,44 ----
>    */
>
>
> ! Version 0.4 (7 April, 2002):
> !   Functions allowing remote database INSERT/UPDATE/DELETE/SELECT, and
> !   various utility functions.
> !   Tested under Linux (Red Hat 7.2) and PostgreSQL 7.2 and 7.3devel
>
>   Release Notes:
>
> +   Version 0.4
> +     - removed cursor wrap around input sql to allow for remote
> +       execution of INSERT/UPDATE/DELETE
> +     - dblink now returns a resource id instead of a real pointer
> +     - added several utility functions -- see below
> +
>     Version 0.3
>       - fixed dblink invalid pointer causing corrupt elog message
>       - fixed dblink_tok improper handling of null results
> ***************
> *** 51,64 ****
>
>     installs following functions into database template1:
>
> !      dblink() - returns a pointer to results from remote query
> !      dblink_tok() - extracts and returns individual field results
>
>   Documentation
>   ==================================================================
>   Name
>
> ! dblink -- Returns a pointer to a data set from a remote database
>
>   Synopsis
>
> --- 59,94 ----
>
>     installs following functions into database template1:
>
> !      dblink(text,text) RETURNS setof int
> !        - returns a resource id for results from remote query
> !      dblink_tok(int,int) RETURNS text
> !        - extracts and returns individual field results
> !      dblink_strtok(text,text,int) RETURNS text
> !        - extracts and returns individual token from delimited text
> !      dblink_get_pkey(name) RETURNS setof text
> !        - returns the field names of a relation's primary key fields
> !      dblink_last_oid(int) RETURNS oid
> !        - returns the last inserted oid
> !      dblink_build_sql_insert(name,int2vector,int2,_text,_text) RETURNS text
> !        - builds an insert statement using a local tuple, replacing the
> !          selection key field values with alternate supplied values
> !      dblink_build_sql_delete(name,int2vector,int2,_text) RETURNS text
> !        - builds a delete statement using supplied values for selection
> !          key field values
> !      dblink_build_sql_update(name,int2vector,int2,_text,_text) RETURNS text
> !        - builds an update statement using a local tuple, replacing the
> !          selection key field values with alternate supplied values
> !      dblink_current_query() RETURNS text
> !        - returns the current query string
> !      dblink_replace(text,text,text) RETURNS text
> !        - replace all occurrences of substring-a in the input-string
> !          with substring-b
>
>   Documentation
>   ==================================================================
>   Name
>
> ! dblink -- Returns a resource id for a data set from a remote database
>
>   Synopsis
>
> ***************
> *** 78,84 ****
>
>   Outputs
>
> !   Returns setof int (pointer)
>
>   Example usage
>
> --- 108,114 ----
>
>   Outputs
>
> !   Returns setof int (res_id)
>
>   Example usage
>
> ***************
> *** 94,106 ****
>
>   Synopsis
>
> ! dblink_tok(int pointer, int fnumber)
>
>   Inputs
>
> !   pointer
>
> !     a pointer returned by a call to dblink()
>
>     fnumber
>
> --- 124,136 ----
>
>   Synopsis
>
> ! dblink_tok(int res_id, int fnumber)
>
>   Inputs
>
> !   res_id
>
> !     a resource id returned by a call to dblink()
>
>     fnumber
>
> ***************
> *** 131,136 ****
> --- 161,415 ----
>      select f1, f2 from myremotetable where f1 like 'bytea%';
>
>   ==================================================================
> + Name
> +
> + dblink_strtok -- Extracts and returns individual token from delimited text
> +
> + Synopsis
> +
> + dblink_strtok(text inputstring, text delimiter, int posn) RETURNS text
> +
> + Inputs
> +
> +   inputstring
> +
> +     any string you want to parse a token out of;
> +     e.g. 'f=1&g=3&h=4'
> +
> +   delimiter
> +
> +     a single character to use as the delimiter;
> +     e.g. '&' or '='
> +
> +   posn
> +
> +     the position of the token of interest, 0 based;
> +     e.g. 1
> +
> + Outputs
> +
> +   Returns text
> +
> + Example usage
> +
> + test=# select dblink_strtok(dblink_strtok('f=1&g=3&h=4','&',1),'=',1);
> +  dblink_strtok
> + ---------------
> +  3
> + (1 row)
> +
> + ==================================================================
> + Name
> +
> + dblink_get_pkey -- returns the field names of a relation's primary
> +                    key fields
> +
> + Synopsis
> +
> + dblink_get_pkey(name relname) RETURNS setof text
> +
> + Inputs
> +
> +   relname
> +
> +     any relation name;
> +     e.g. 'foobar'
> +
> + Outputs
> +
> +   Returns setof text -- one row for each primary key field, in order of
> +                         precedence
> +
> + Example usage
> +
> + test=# select dblink_get_pkey('foobar');
> +  dblink_get_pkey
> + -----------------
> +  f1
> +  f2
> +  f3
> +  f4
> +  f5
> + (5 rows)
> +
> +
> + ==================================================================
> + Name
> +
> + dblink_last_oid -- Returns last inserted oid
> +
> + Synopsis
> +
> + dblink_last_oid(int res_id) RETURNS oid
> +
> + Inputs
> +
> +   res_id
> +
> +     any resource id returned by dblink function;
> +
> + Outputs
> +
> +   Returns oid of last inserted tuple
> +
> + Example usage
> +
> + test=# select dblink_last_oid(dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd'
> +                ,'insert into mytable (f1, f2) values (1,2)'));
> +
> +  dblink_last_oid
> + ----------------
> +  16553
> + (1 row)
> +
> +
> + ==================================================================
> + Name
> +
> + dblink_build_sql_insert -- builds an insert statement using a local
> +                            tuple, replacing the selection key field
> +                            values with alternate supplied values
> + dblink_build_sql_delete -- builds a delete statement using supplied
> +                            values for selection key field values
> + dblink_build_sql_update -- builds an update statement using a local
> +                            tuple, replacing the selection key field
> +                            values with alternate supplied values
> +
> +
> + Synopsis
> +
> + dblink_build_sql_insert(name relname
> +                          ,int2vector primary_key_attnums
> +                          ,int2 num_primary_key_atts
> +                          ,_text src_pk_att_vals_array
> +                          ,_text tgt_pk_att_vals_array) RETURNS text
> + dblink_build_sql_delete(name relname
> +                          ,int2vector primary_key_attnums
> +                          ,int2 num_primary_key_atts
> +                          ,_text tgt_pk_att_vals_array) RETURNS text
> + dblink_build_sql_update(name relname
> +                          ,int2vector primary_key_attnums
> +                          ,int2 num_primary_key_atts
> +                          ,_text src_pk_att_vals_array
> +                          ,_text tgt_pk_att_vals_array) RETURNS text
> +
> + Inputs
> +
> +   relname
> +
> +     any relation name;
> +     e.g. 'foobar'
> +
> +   primary_key_attnums
> +
> +     vector of primary key attnums (1 based, see pg_index.indkey);
> +     e.g. '1 2'
> +
> +   num_primary_key_atts
> +
> +     number of primary key attnums in the vector; e.g. 2
> +
> +   src_pk_att_vals_array
> +
> +     array of primary key values, used to look up the local matching
> +     tuple, the values of which are then used to construct the SQL
> +     statement
> +
> +   tgt_pk_att_vals_array
> +
> +     array of primary key values, used to replace the local tuple
> +     values in the SQL statement
> +
> + Outputs
> +
> +   Returns text -- requested SQL statement
> +
> + Example usage
> +
> + test=# select dblink_build_sql_insert('foo','1 2',2,'{"1", "a"}','{"1", "b''a"}');
> +              dblink_build_sql_insert
> + --------------------------------------------------
> +  INSERT INTO foo(f1,f2,f3) VALUES('1','b''a','1')
> + (1 row)
> +
> + test=# select dblink_build_sql_delete('MyFoo','1 2',2,'{"1", "b"}');
> +            dblink_build_sql_delete
> + ---------------------------------------------
> +  DELETE FROM "MyFoo" WHERE f1='1' AND f2='b'
> + (1 row)
> +
> + test=# select dblink_build_sql_update('foo','1 2',2,'{"1", "a"}','{"1", "b"}');
> +                    dblink_build_sql_update
> + -------------------------------------------------------------
> +  UPDATE foo SET f1='1',f2='b',f3='1' WHERE f1='1' AND f2='b'
> + (1 row)
> +
> +
> + ==================================================================
> + Name
> +
> + dblink_current_query -- returns the current query string
> +
> + Synopsis
> +
> + dblink_current_query () RETURNS text
> +
> + Inputs
> +
> +   None
> +
> + Outputs
> +
> +   Returns text -- a copy of the currently executing query
> +
> + Example usage
> +
> + test=# select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname= ''byteacat''') as f1) as t1;
> +                                                                 dblink_current_query
> + -----------------------------------------------------------------------------------------------------------------------------------------------------
> +  select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname= ''byteacat''') as f1) as t1;
> + (1 row)
> +
> +
> + ==================================================================
> + Name
> +
> + dblink_replace -- replace all occurrences of substring-a in the
> +                   input-string with substring-b
> +
> + Synopsis
> +
> + dblink_replace(text input-string, text substring-a, text substring-b) RETURNS text
> +
> + Inputs
> +
> +   input-string
> +
> +     the starting string, before replacement of substring-a
> +
> +   substring-a
> +
> +     the substring to find and replace
> +
> +   substring-b
> +
> +     the substring to be substituted in place of substring-a
> +
> + Outputs
> +
> +   Returns text -- a copy of the starting string, but with all occurrences of
> +                   substring-a replaced with substring-b
> +
> + Example usage
> +
> + test=# select dblink_replace('12345678901234567890','56','hello');
> +        dblink_replace
> + ----------------------------
> +  1234hello78901234hello7890
> + (1 row)
> +
> + ==================================================================
> +
>
>   -- Joe Conway
>
> diff -cNr dblink.orig/dblink.c dblink/dblink.c
> *** dblink.orig/dblink.c    Wed Oct 24 22:49:19 2001
> --- dblink/dblink.c    Sun Apr 14 20:03:30 2002
> ***************
> *** 3,9 ****
>    *
>    * Functions returning results from a remote database
>    *
> !  * Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
>    *
>    * Permission to use, copy, modify, and distribute this software and its
>    * documentation for any purpose, without fee, and without a written agreement
> --- 3,10 ----
>    *
>    * Functions returning results from a remote database
>    *
> !  * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
> !  * ALL RIGHTS RESERVED;
>    *
>    * Permission to use, copy, modify, and distribute this software and its
>    * documentation for any purpose, without fee, and without a written agreement
> ***************
> *** 26,48 ****
>
>   #include "dblink.h"
>
>   PG_FUNCTION_INFO_V1(dblink);
>   Datum
>   dblink(PG_FUNCTION_ARGS)
>   {
> !     PGconn       *conn = NULL;
> !     PGresult   *res = NULL;
> !     dblink_results *results;
> !     char       *optstr;
> !     char       *sqlstatement;
> !     char       *curstr = "DECLARE mycursor CURSOR FOR ";
> !     char       *execstatement;
> !     char       *msg;
> !     int            ntuples = 0;
> !     ReturnSetInfo *rsi;
> !
> !     if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
> !         elog(ERROR, "dblink: NULL arguments are not permitted");
>
>       if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
>           elog(ERROR, "dblink: function called in context that does not accept a set result");
> --- 27,49 ----
>
>   #include "dblink.h"
>
> + /* Global */
> + List    *res_id = NIL;
> + int        res_id_index = 0;
> +
>   PG_FUNCTION_INFO_V1(dblink);
>   Datum
>   dblink(PG_FUNCTION_ARGS)
>   {
> !     PGconn            *conn = NULL;
> !     PGresult        *res = NULL;
> !     dblink_results    *results;
> !     char            *optstr;
> !     char            *sqlstatement;
> !     char            *execstatement;
> !     char            *msg;
> !     int                ntuples = 0;
> !     ReturnSetInfo    *rsi;
>
>       if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
>           elog(ERROR, "dblink: function called in context that does not accept a set result");
> ***************
> *** 61,81 ****
>               elog(ERROR, "dblink: connection error: %s", msg);
>           }
>
> !         res = PQexec(conn, "BEGIN");
> !         if (PQresultStatus(res) != PGRES_COMMAND_OK)
> !         {
> !             msg = pstrdup(PQerrorMessage(conn));
> !             PQclear(res);
> !             PQfinish(conn);
> !             elog(ERROR, "dblink: begin error: %s", msg);
> !         }
> !         PQclear(res);
> !
> !         execstatement = (char *) palloc(strlen(curstr) + strlen(sqlstatement) + 1);
>           if (execstatement != NULL)
>           {
> !             strcpy(execstatement, curstr);
> !             strcat(execstatement, sqlstatement);
>               strcat(execstatement, "\0");
>           }
>           else
> --- 62,71 ----
>               elog(ERROR, "dblink: connection error: %s", msg);
>           }
>
> !         execstatement = (char *) palloc(strlen(sqlstatement) + 1);
>           if (execstatement != NULL)
>           {
> !             strcpy(execstatement, sqlstatement);
>               strcat(execstatement, "\0");
>           }
>           else
> ***************
> *** 94,163 ****
>               /*
>                * got results, start fetching them
>                */
> -             PQclear(res);
> -
> -             res = PQexec(conn, "FETCH ALL in mycursor");
> -             if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
> -             {
> -                 msg = pstrdup(PQerrorMessage(conn));
> -                 PQclear(res);
> -                 PQfinish(conn);
> -                 elog(ERROR, "dblink: sql error: %s", msg);
> -             }
> -
>               ntuples = PQntuples(res);
>
> !             if (ntuples > 0)
> !             {
> !
> !                 results = init_dblink_results(fcinfo->flinfo->fn_mcxt);
> !                 results->tup_num = 0;
> !                 results->res = res;
> !                 res = NULL;
> !
> !                 fcinfo->flinfo->fn_extra = (void *) results;
> !
> !                 results = NULL;
> !                 results = fcinfo->flinfo->fn_extra;
> !
> !                 /* close the cursor */
> !                 res = PQexec(conn, "CLOSE mycursor");
> !                 PQclear(res);
> !
> !                 /* commit the transaction */
> !                 res = PQexec(conn, "COMMIT");
> !                 PQclear(res);
> !
> !                 /* close the connection to the database and cleanup */
> !                 PQfinish(conn);
> !
> !                 rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> !                 rsi->isDone = ExprMultipleResult;
> !
> !                 PG_RETURN_POINTER(results);
> !
> !             }
> !             else
> !             {
>
> !                 PQclear(res);
>
> !                 /* close the cursor */
> !                 res = PQexec(conn, "CLOSE mycursor");
> !                 PQclear(res);
>
> !                 /* commit the transaction */
> !                 res = PQexec(conn, "COMMIT");
> !                 PQclear(res);
>
> !                 /* close the connection to the database and cleanup */
> !                 PQfinish(conn);
>
> !                 rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> !                 rsi->isDone = ExprEndResult;
>
> !                 PG_RETURN_NULL();
> !             }
>           }
>       }
>       else
> --- 84,119 ----
>               /*
>                * got results, start fetching them
>                */
>               ntuples = PQntuples(res);
>
> !             /*
> !              * increment resource index
> !              */
> !             res_id_index++;
>
> !             results = init_dblink_results(fcinfo->flinfo->fn_mcxt);
> !             results->tup_num = 0;
> !             results->res_id_index = res_id_index;
> !             results->res = res;
>
> !             /*
> !              * Append node to res_id to hold pointer to results.
> !              * Needed by dblink_tok to access the data
> !              */
> !             append_res_ptr(results);
>
> !             /*
> !              * save pointer to results for the next function manager call
> !              */
> !             fcinfo->flinfo->fn_extra = (void *) results;
>
> !             /* close the connection to the database and cleanup */
> !             PQfinish(conn);
>
> !             rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> !             rsi->isDone = ExprMultipleResult;
>
> !             PG_RETURN_INT32(res_id_index);
>           }
>       }
>       else
> ***************
> *** 165,173 ****
>           /*
>            * check for more results
>            */
> -
>           results = fcinfo->flinfo->fn_extra;
>           results->tup_num++;
>           ntuples = PQntuples(results->res);
>
>           if (results->tup_num < ntuples)
> --- 121,130 ----
>           /*
>            * check for more results
>            */
>           results = fcinfo->flinfo->fn_extra;
> +
>           results->tup_num++;
> +         res_id_index = results->res_id_index;
>           ntuples = PQntuples(results->res);
>
>           if (results->tup_num < ntuples)
> ***************
> *** 179,196 ****
>               rsi = (ReturnSetInfo *) fcinfo->resultinfo;
>               rsi->isDone = ExprMultipleResult;
>
> !             PG_RETURN_POINTER(results);
> !
>           }
>           else
>           {
>               /*
>                * or if no more, clean things up
>                */
> -
>               results = fcinfo->flinfo->fn_extra;
>
>               PQclear(results->res);
>
>               rsi = (ReturnSetInfo *) fcinfo->resultinfo;
>               rsi->isDone = ExprEndResult;
> --- 136,154 ----
>               rsi = (ReturnSetInfo *) fcinfo->resultinfo;
>               rsi->isDone = ExprMultipleResult;
>
> !             PG_RETURN_INT32(res_id_index);
>           }
>           else
>           {
>               /*
>                * or if no more, clean things up
>                */
>               results = fcinfo->flinfo->fn_extra;
>
> +             remove_res_ptr(results);
>               PQclear(results->res);
> +             pfree(results);
> +             fcinfo->flinfo->fn_extra = NULL;
>
>               rsi = (ReturnSetInfo *) fcinfo->resultinfo;
>               rsi->isDone = ExprEndResult;
> ***************
> *** 214,249 ****
>   dblink_tok(PG_FUNCTION_ARGS)
>   {
>       dblink_results *results;
> !     int            fldnum;
> !     text       *result_text;
> !     char       *result;
> !     int            nfields = 0;
> !     int            text_len = 0;
> !
> !     if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
> !         elog(ERROR, "dblink: NULL arguments are not permitted");
>
> !     results = (dblink_results *) PG_GETARG_POINTER(0);
>       if (results == NULL)
> !         elog(ERROR, "dblink: function called with invalid result pointer");
>
>       fldnum = PG_GETARG_INT32(1);
>       if (fldnum < 0)
> !         elog(ERROR, "dblink: field number < 0 not permitted");
>
>       nfields = PQnfields(results->res);
>       if (fldnum > (nfields - 1))
> !         elog(ERROR, "dblink: field number %d does not exist", fldnum);
>
>       if (PQgetisnull(results->res, results->tup_num, fldnum) == 1)
> -     {
> -
>           PG_RETURN_NULL();
> -
> -     }
>       else
>       {
> -
>           text_len = PQgetlength(results->res, results->tup_num, fldnum);
>
>           result = (char *) palloc(text_len + 1);
> --- 172,208 ----
>   dblink_tok(PG_FUNCTION_ARGS)
>   {
>       dblink_results *results;
> !     int                fldnum;
> !     text            *result_text;
> !     char            *result;
> !     int                nfields = 0;
> !     int                text_len = 0;
>
> !     results = get_res_ptr(PG_GETARG_INT32(0));
>       if (results == NULL)
> !     {
> !         if (res_id != NIL)
> !         {
> !             freeList(res_id);
> !             res_id = NIL;
> !             res_id_index = 0;
> !         }
> !
> !         elog(ERROR, "dblink_tok: function called with invalid resource id");
> !     }
>
>       fldnum = PG_GETARG_INT32(1);
>       if (fldnum < 0)
> !         elog(ERROR, "dblink_tok: field number < 0 not permitted");
>
>       nfields = PQnfields(results->res);
>       if (fldnum > (nfields - 1))
> !         elog(ERROR, "dblink_tok: field number %d does not exist", fldnum);
>
>       if (PQgetisnull(results->res, results->tup_num, fldnum) == 1)
>           PG_RETURN_NULL();
>       else
>       {
>           text_len = PQgetlength(results->res, results->tup_num, fldnum);
>
>           result = (char *) palloc(text_len + 1);
> ***************
> *** 259,270 ****
> --- 218,838 ----
>           result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result)));
>
>           PG_RETURN_TEXT_P(result_text);
> +     }
> + }
> +
> +
> + /*
> +  * dblink_strtok
> +  * parse input string
> +  * return ord item (0 based)
> +  * based on provided field separator
> +  */
> + PG_FUNCTION_INFO_V1(dblink_strtok);
> + Datum
> + dblink_strtok(PG_FUNCTION_ARGS)
> + {
> +     char        *fldtext;
> +     char        *fldsep;
> +     int            fldnum;
> +     char        *buffer;
> +     text        *result_text;
> +
> +     fldtext = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0))));
> +     fldsep = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1))));
> +     fldnum = PG_GETARG_INT32(2);
> +
> +     if (fldtext[0] == '\0')
> +     {
> +         elog(ERROR, "get_strtok: blank list not permitted");
> +     }
> +     if (fldsep[0] == '\0')
> +     {
> +         elog(ERROR, "get_strtok: blank field separator not permitted");
> +     }
>
> +     buffer = get_strtok(fldtext, fldsep, fldnum);
> +
> +     pfree(fldtext);
> +     pfree(fldsep);
> +
> +     if (buffer == NULL)
> +     {
> +         PG_RETURN_NULL();
> +     }
> +     else
> +     {
> +         result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(buffer)));
> +         pfree(buffer);
> +
> +         PG_RETURN_TEXT_P(result_text);
>       }
>   }
>
>
>   /*
> +  * dblink_get_pkey
> +  *
> +  * Return the names of the primary key fields
> +  * for the supplied relation, one per row,
> +  * or NULL if none exists.
> +  */
> + PG_FUNCTION_INFO_V1(dblink_get_pkey);
> + Datum
> + dblink_get_pkey(PG_FUNCTION_ARGS)
> + {
> +     char                    *relname;
> +     Oid                        relid;
> +     char                    **result;
> +     text                    *result_text;
> +     int16                    numatts;
> +     ReturnSetInfo            *rsi;
> +     dblink_array_results    *ret_set;
> +
> +     if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
> +         elog(ERROR, "dblink: function called in context that does not accept a set result");
> +
> +     if (fcinfo->flinfo->fn_extra == NULL)
> +     {
> +         relname = NameStr(*PG_GETARG_NAME(0));
> +
> +         /*
> +          * Convert relname to rel OID.
> +          */
> +         relid = get_relid_from_relname(relname);
> +         if (!OidIsValid(relid))
> +             elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist",
> +                  relname);
> +
> +         /*
> +          * get an array of primary key attribute names.
> +          */
> +         result = get_pkey_attnames(relid, &numatts);
> +
> +         if ((result != NULL) && (numatts > 0))
> +         {
> +             ret_set = init_dblink_array_results(fcinfo->flinfo->fn_mcxt);
> +
> +             ret_set->elem_num = 0;
> +             ret_set->num_elems = numatts;
> +             ret_set->res = result;
> +
> +             fcinfo->flinfo->fn_extra = (void *) ret_set;
> +
> +             rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> +             rsi->isDone = ExprMultipleResult;
> +
> +             result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num])));
> +
> +             PG_RETURN_TEXT_P(result_text);
> +         }
> +         else
> +         {
> +             rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> +             rsi->isDone = ExprEndResult;
> +
> +             PG_RETURN_NULL();
> +         }
> +     }
> +     else
> +     {
> +         /*
> +          * check for more results
> +          */
> +         ret_set = fcinfo->flinfo->fn_extra;
> +         ret_set->elem_num++;
> +         result = ret_set->res;
> +
> +         if (ret_set->elem_num < ret_set->num_elems)
> +         {
> +             /*
> +              * fetch next one
> +              */
> +             rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> +             rsi->isDone = ExprMultipleResult;
> +
> +             result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num])));
> +             PG_RETURN_TEXT_P(result_text);
> +         }
> +         else
> +         {
> +             int        i;
> +
> +             /*
> +              * or if no more, clean things up
> +              */
> +             for (i = 0; i < ret_set->num_elems; i++)
> +                 pfree(result[i]);
> +
> +             pfree(ret_set->res);
> +             pfree(ret_set);
> +
> +             rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> +             rsi->isDone = ExprEndResult;
> +
> +             PG_RETURN_NULL();
> +         }
> +     }
> +     PG_RETURN_NULL();
> + }
> +
> +
> + /*
> +  * dblink_last_oid
> +  * return last inserted oid
> +  */
> + PG_FUNCTION_INFO_V1(dblink_last_oid);
> + Datum
> + dblink_last_oid(PG_FUNCTION_ARGS)
> + {
> +     dblink_results *results;
> +
> +     results = get_res_ptr(PG_GETARG_INT32(0));
> +     if (results == NULL)
> +     {
> +         if (res_id != NIL)
> +         {
> +             freeList(res_id);
> +             res_id = NIL;
> +             res_id_index = 0;
> +         }
> +
> +         elog(ERROR, "dblink_last_oid: function called with invalid resource id");
> +     }
> +
> +     PG_RETURN_OID(PQoidValue(results->res));
> + }
> +
> +
> + /*
> +  * dblink_build_sql_insert
> +  *
> +  * Used to generate an SQL insert statement
> +  * based on an existing tuple in a local relation.
> +  * This is useful for selectively replicating data
> +  * to another server via dblink.
> +  *
> +  * API:
> +  * <relname> - name of local table of interest
> +  * <pkattnums> - an int2vector of attnums which will be used
> +  * to identify the local tuple of interest
> +  * <pknumatts> - number of attnums in pkattnums
> +  * <src_pkattvals_arry> - text array of key values which will be used
> +  * to identify the local tuple of interest
> +  * <tgt_pkattvals_arry> - text array of key values which will be used
> +  * to build the string for execution remotely. These are substituted
> +  * for their counterparts in src_pkattvals_arry
> +  */
> + PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
> + Datum
> + dblink_build_sql_insert(PG_FUNCTION_ARGS)
> + {
> +     Oid            relid;
> +     char        *relname;
> +     int16        *pkattnums;
> +     int16        pknumatts;
> +     char        **src_pkattvals;
> +     char        **tgt_pkattvals;
> +     ArrayType    *src_pkattvals_arry;
> +     ArrayType    *tgt_pkattvals_arry;
> +     int            src_ndim;
> +     int            *src_dim;
> +     int            src_nitems;
> +     int            tgt_ndim;
> +     int            *tgt_dim;
> +     int            tgt_nitems;
> +     int            i;
> +     char        *ptr;
> +     char        *sql;
> +     text        *sql_text;
> +
> +     relname = NameStr(*PG_GETARG_NAME(0));
> +
> +     /*
> +      * Convert relname to rel OID.
> +      */
> +     relid = get_relid_from_relname(relname);
> +     if (!OidIsValid(relid))
> +         elog(ERROR, "dblink_build_sql_insert: relation \"%s\" does not exist",
> +              relname);
> +
> +     pkattnums = (int16 *) PG_GETARG_POINTER(1);
> +     pknumatts = PG_GETARG_INT16(2);
> +     /*
> +      * There should be at least one key attribute
> +      */
> +     if (pknumatts == 0)
> +         elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0.");
> +
> +     src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
> +     tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
> +
> +     /*
> +      * Source array is made up of key values that will be used to
> +      * locate the tuple of interest from the local system.
> +      */
> +     src_ndim = ARR_NDIM(src_pkattvals_arry);
> +     src_dim = ARR_DIMS(src_pkattvals_arry);
> +     src_nitems = ArrayGetNItems(src_ndim, src_dim);
> +
> +     /*
> +      * There should be one source array key value for each key attnum
> +      */
> +     if (src_nitems != pknumatts)
> +         elog(ERROR, "dblink_build_sql_insert: source key array length does not match number of key attributes.");
> +
> +     /*
> +      * get array of pointers to c-strings from the input source array
> +      */
> +     src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
> +     ptr = ARR_DATA_PTR(src_pkattvals_arry);
> +     for (i = 0; i < src_nitems; i++)
> +     {
> +         src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
> +         ptr += INTALIGN(*(int32 *) ptr);
> +     }
> +
> +     /*
> +      * Target array is made up of key values that will be used to
> +      * build the SQL string for use on the remote system.
> +      */
> +     tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
> +     tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
> +     tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);
> +
> +     /*
> +      * There should be one target array key value for each key attnum
> +      */
> +     if (tgt_nitems != pknumatts)
> +         elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes.");
> +
> +     /*
> +      * get array of pointers to c-strings from the input target array
> +      */
> +     tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
> +     ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
> +     for (i = 0; i < tgt_nitems; i++)
> +     {
> +         tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
> +         ptr += INTALIGN(*(int32 *) ptr);
> +     }
> +
> +     /*
> +      * Prep work is finally done. Go get the SQL string.
> +      */
> +     sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
> +
> +     /*
> +      * Make it into TEXT for return to the client
> +      */
> +     sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
> +
> +     /*
> +      * And send it
> +      */
> +     PG_RETURN_TEXT_P(sql_text);
> + }
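
The two array-walking loops above depend on the text array's element layout: each element starts with a 4-byte length word that counts itself, and the next element begins at the following int-aligned offset. Here is a minimal standalone sketch of that walk, assuming exactly that layout. pack_elem, unpack_elem, HDRSZ and ALIGN4 are hypothetical names for illustration, not backend APIs.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define HDRSZ      (sizeof(int32_t))
#define ALIGN4(n)  (((size_t) (n) + 3) & ~(size_t) 3)

/*
 * Write one length-prefixed element at dst (hypothetical helper).
 * The length word counts the header plus the data bytes.
 * Returns the padded number of bytes consumed.
 */
size_t
pack_elem(unsigned char *dst, const char *s)
{
    int32_t     total;

    assert(dst != NULL && s != NULL);
    total = (int32_t) (HDRSZ + strlen(s));
    memcpy(dst, &total, HDRSZ);
    memcpy(dst + HDRSZ, s, strlen(s));
    return ALIGN4(total);
}

/*
 * Copy the element at src into out as a C string (hypothetical helper).
 * Returns how far to advance to reach the next element, which mirrors
 * "ptr += INTALIGN(*(int32 *) ptr)" in the patch.
 */
size_t
unpack_elem(const unsigned char *src, char *out, size_t outlen)
{
    int32_t     total;
    size_t      datalen;

    assert(src != NULL && out != NULL);
    memcpy(&total, src, HDRSZ);
    datalen = (size_t) total - HDRSZ;
    assert(datalen < outlen);
    memcpy(out, src + HDRSZ, datalen);
    out[datalen] = '\0';
    return ALIGN4(total);
}
```

The sketch uses memcpy for the length word so it does not depend on the buffer's alignment; the patch reads it through an int32 pointer, which is fine for properly aligned array data.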
> +
> +
> + /*
> +  * dblink_build_sql_delete
> +  *
> +  * Used to generate an SQL delete statement.
> +  * This is useful for selectively replicating a
> +  * delete to another server via dblink.
> +  *
> +  * API:
> +  * <relname> - name of remote table of interest
> +  * <pkattnums> - an int2vector of attnums which will be used
> +  * to identify the remote tuple of interest
> +  * <pknumatts> - number of attnums in pkattnums
> +  * <tgt_pkattvals_arry> - text array of key values which will be used
> +  * to build the string for execution remotely.
> +  */
> + PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
> + Datum
> + dblink_build_sql_delete(PG_FUNCTION_ARGS)
> + {
> +     Oid            relid;
> +     char        *relname;
> +     int16        *pkattnums;
> +     int16        pknumatts;
> +     char        **tgt_pkattvals;
> +     ArrayType    *tgt_pkattvals_arry;
> +     int            tgt_ndim;
> +     int            *tgt_dim;
> +     int            tgt_nitems;
> +     int            i;
> +     char        *ptr;
> +     char        *sql;
> +     text        *sql_text;
> +
> +     relname = NameStr(*PG_GETARG_NAME(0));
> +
> +     /*
> +      * Convert relname to rel OID.
> +      */
> +     relid = get_relid_from_relname(relname);
> +     if (!OidIsValid(relid))
> +         elog(ERROR, "dblink_build_sql_delete: relation \"%s\" does not exist",
> +              relname);
> +
> +     pkattnums = (int16 *) PG_GETARG_POINTER(1);
> +     pknumatts = PG_GETARG_INT16(2);
> +     /*
> +      * There should be at least one key attribute
> +      */
> +     if (pknumatts == 0)
> +         elog(ERROR, "dblink_build_sql_delete: number of key attributes must be > 0.");
> +
> +     tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
> +
> +     /*
> +      * Target array is made up of key values that will be used to
> +      * build the SQL string for use on the remote system.
> +      */
> +     tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
> +     tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
> +     tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);
> +
> +     /*
> +      * There should be one target array key value for each key attnum
> +      */
> +     if (tgt_nitems != pknumatts)
> +         elog(ERROR, "dblink_build_sql_delete: target key array length does not match number of key attributes.");
> +
> +     /*
> +      * get array of pointers to c-strings from the input target array
> +      */
> +     tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
> +     ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
> +     for (i = 0; i < tgt_nitems; i++)
> +     {
> +         tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
> +         ptr += INTALIGN(*(int32 *) ptr);
> +     }
> +
> +     /*
> +      * Prep work is finally done. Go get the SQL string.
> +      */
> +     sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals);
> +
> +     /*
> +      * Make it into TEXT for return to the client
> +      */
> +     sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
> +
> +     /*
> +      * And send it
> +      */
> +     PG_RETURN_TEXT_P(sql_text);
> + }
> +
> +
> + /*
> +  * dblink_build_sql_update
> +  *
> +  * Used to generate an SQL update statement
> +  * based on an existing tuple in a local relation.
> +  * This is useful for selectively replicating data
> +  * to another server via dblink.
> +  *
> +  * API:
> +  * <relname> - name of local table of interest
> +  * <pkattnums> - an int2vector of attnums which will be used
> +  * to identify the local tuple of interest
> +  * <pknumatts> - number of attnums in pkattnums
> +  * <src_pkattvals_arry> - text array of key values which will be used
> +  * to identify the local tuple of interest
> +  * <tgt_pkattvals_arry> - text array of key values which will be used
> +  * to build the string for execution remotely. These are substituted
> +  * for their counterparts in src_pkattvals_arry
> +  */
> + PG_FUNCTION_INFO_V1(dblink_build_sql_update);
> + Datum
> + dblink_build_sql_update(PG_FUNCTION_ARGS)
> + {
> +     Oid            relid;
> +     char        *relname;
> +     int16        *pkattnums;
> +     int16        pknumatts;
> +     char        **src_pkattvals;
> +     char        **tgt_pkattvals;
> +     ArrayType    *src_pkattvals_arry;
> +     ArrayType    *tgt_pkattvals_arry;
> +     int            src_ndim;
> +     int            *src_dim;
> +     int            src_nitems;
> +     int            tgt_ndim;
> +     int            *tgt_dim;
> +     int            tgt_nitems;
> +     int            i;
> +     char        *ptr;
> +     char        *sql;
> +     text        *sql_text;
> +
> +     relname = NameStr(*PG_GETARG_NAME(0));
> +
> +     /*
> +      * Convert relname to rel OID.
> +      */
> +     relid = get_relid_from_relname(relname);
> +     if (!OidIsValid(relid))
> +         elog(ERROR, "dblink_build_sql_update: relation \"%s\" does not exist",
> +              relname);
> +
> +     pkattnums = (int16 *) PG_GETARG_POINTER(1);
> +     pknumatts = PG_GETARG_INT16(2);
> +     /*
> +      * There should be at least one key attribute
> +      */
> +     if (pknumatts == 0)
> +         elog(ERROR, "dblink_build_sql_update: number of key attributes must be > 0.");
> +
> +     src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
> +     tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
> +
> +     /*
> +      * Source array is made up of key values that will be used to
> +      * locate the tuple of interest from the local system.
> +      */
> +     src_ndim = ARR_NDIM(src_pkattvals_arry);
> +     src_dim = ARR_DIMS(src_pkattvals_arry);
> +     src_nitems = ArrayGetNItems(src_ndim, src_dim);
> +
> +     /*
> +      * There should be one source array key value for each key attnum
> +      */
> +     if (src_nitems != pknumatts)
> +         elog(ERROR, "dblink_build_sql_update: source key array length does not match number of key attributes.");
> +
> +     /*
> +      * get array of pointers to c-strings from the input source array
> +      */
> +     src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
> +     ptr = ARR_DATA_PTR(src_pkattvals_arry);
> +     for (i = 0; i < src_nitems; i++)
> +     {
> +         src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
> +         ptr += INTALIGN(*(int32 *) ptr);
> +     }
> +
> +     /*
> +      * Target array is made up of key values that will be used to
> +      * build the SQL string for use on the remote system.
> +      */
> +     tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
> +     tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
> +     tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);
> +
> +     /*
> +      * There should be one target array key value for each key attnum
> +      */
> +     if (tgt_nitems != pknumatts)
> +         elog(ERROR, "dblink_build_sql_update: target key array length does not match number of key attributes.");
> +
> +     /*
> +      * get array of pointers to c-strings from the input target array
> +      */
> +     tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
> +     ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
> +     for (i = 0; i < tgt_nitems; i++)
> +     {
> +         tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
> +         ptr += INTALIGN(*(int32 *) ptr);
> +     }
> +
> +     /*
> +      * Prep work is finally done. Go get the SQL string.
> +      */
> +     sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
> +
> +     /*
> +      * Make it into TEXT for return to the client
> +      */
> +     sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
> +
> +     /*
> +      * And send it
> +      */
> +     PG_RETURN_TEXT_P(sql_text);
> + }
> +
> +
> + /*
> +  * dblink_current_query
> +  * return the current query string
> +  * to allow its use in (among other things)
> +  * rewrite rules
> +  */
> + PG_FUNCTION_INFO_V1(dblink_current_query);
> + Datum
> + dblink_current_query(PG_FUNCTION_ARGS)
> + {
> +     text        *result_text;
> +
> +     result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(debug_query_string)));
> +     PG_RETURN_TEXT_P(result_text);
> + }
> +
> +
> + /*
> +  * dblink_replace_text
> +  * replace all occurrences of 'old_sub_str' in 'orig_str'
> +  * with 'new_sub_str' to form 'new_str'
> +  *
> +  * returns 'orig_str' if 'old_sub_str' == '' or 'orig_str' == ''
> +  * otherwise returns 'new_str'
> +  */
> + PG_FUNCTION_INFO_V1(dblink_replace_text);
> + Datum
> + dblink_replace_text(PG_FUNCTION_ARGS)
> + {
> +     text        *left_text;
> +     text        *right_text;
> +     text        *buf_text;
> +     text        *ret_text;
> +     char        *ret_str;
> +     int            curr_posn;
> +     text        *src_text = PG_GETARG_TEXT_P(0);
> +     int            src_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(src_text)));
> +     text        *from_sub_text = PG_GETARG_TEXT_P(1);
> +     int            from_sub_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(from_sub_text)));
> +     text        *to_sub_text = PG_GETARG_TEXT_P(2);
> +     char        *to_sub_str = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(to_sub_text)));
> +     StringInfo    str = makeStringInfo();
> +
> +     if (src_text_len == 0 || from_sub_text_len == 0)
> +         PG_RETURN_TEXT_P(src_text);
> +
> +     buf_text = DatumGetTextPCopy(PointerGetDatum(src_text));
> +     curr_posn = DatumGetInt32(DirectFunctionCall2(textpos,
> +                                 PointerGetDatum(buf_text),
> +                                 PointerGetDatum(from_sub_text)));
> +
> +     while (curr_posn > 0)
> +     {
> +         left_text = DatumGetTextP(DirectFunctionCall3(text_substr,
> +                                     PointerGetDatum(buf_text),
> +                                     Int32GetDatum(1),
> +                                     Int32GetDatum(curr_posn - 1)));
> +         right_text = DatumGetTextP(DirectFunctionCall3(text_substr,
> +                                     PointerGetDatum(buf_text),
> +                                     Int32GetDatum(curr_posn + from_sub_text_len),
> +                                     Int32GetDatum(-1)));
> +
> +         appendStringInfo(str, "%s", DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(left_text))));
> +         appendStringInfo(str, "%s", to_sub_str);
> +
> +         pfree(buf_text);
> +         pfree(left_text);
> +         buf_text = right_text;
> +         curr_posn = DatumGetInt32(DirectFunctionCall2(textpos,
> +                                     PointerGetDatum(buf_text),
> +                                     PointerGetDatum(from_sub_text)));
> +     }
> +
> +     appendStringInfo(str, "%s", DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(buf_text))));
> +     pfree(buf_text);
> +
> +     ret_str = pstrdup(str->data);
> +     ret_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(ret_str)));
> +
> +     PG_RETURN_TEXT_P(ret_text);
> + }
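
For comparison, here is the same replace-all logic as dblink_replace_text written against plain C strings. This is only a sketch for reasoning about the loop, not a proposed replacement: replace_all is a hypothetical name, it uses malloc rather than palloc, and it works on bytes, so it ignores multibyte encodings.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/*
 * replace_all (hypothetical helper, not part of the patch)
 *
 * Return a newly malloc'd copy of src with every occurrence of
 * from replaced by to. If from is empty, src is copied unchanged.
 * The caller frees the result.
 */
char *
replace_all(const char *src, const char *from, const char *to)
{
    size_t      from_len;
    size_t      to_len;
    size_t      count = 0;
    const char *p;
    const char *hit;
    char       *out;
    char       *dst;

    assert(src != NULL && from != NULL && to != NULL);
    from_len = strlen(from);
    to_len = strlen(to);

    if (from_len == 0)
    {
        out = malloc(strlen(src) + 1);
        if (out != NULL)
            strcpy(out, src);
        return out;
    }

    /* first pass: count matches so the result can be sized exactly */
    for (p = src; (hit = strstr(p, from)) != NULL; p = hit + from_len)
        count++;

    out = malloc(strlen(src) + count * to_len - count * from_len + 1);
    if (out == NULL)
        return NULL;

    /* second pass: copy the text before each match, then the replacement */
    dst = out;
    for (p = src; (hit = strstr(p, from)) != NULL; p = hit + from_len)
    {
        memcpy(dst, p, (size_t) (hit - p));
        dst += hit - p;
        memcpy(dst, to, to_len);
        dst += to_len;
    }
    strcpy(dst, p);             /* remainder after the last match */

    return out;
}
```

As in the patch, matching resumes after each replaced occurrence, so overlapping matches are not rescanned.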
> +
> +
> + /*************************************************************
>    * internal functions
>    */
>
> ***************
> *** 285,293 ****
> --- 853,1408 ----
>       MemSet(retval, 0, sizeof(dblink_results));
>
>       retval->tup_num = -1;
> +     retval->res_id_index = -1;
>       retval->res = NULL;
>
>       MemoryContextSwitchTo(oldcontext);
>
>       return retval;
>   }
> +
> +
> + /*
> +  * init_dblink_array_results
> +  *     - create an empty dblink_array_results data structure
> +  */
> + dblink_array_results *
> + init_dblink_array_results(MemoryContext fn_mcxt)
> + {
> +     MemoryContext oldcontext;
> +     dblink_array_results *retval;
> +
> +     oldcontext = MemoryContextSwitchTo(fn_mcxt);
> +
> +     retval = (dblink_array_results *) palloc(sizeof(dblink_array_results));
> +     MemSet(retval, 0, sizeof(dblink_array_results));
> +
> +     retval->elem_num = -1;
> +     retval->num_elems = 0;
> +     retval->res = NULL;
> +
> +     MemoryContextSwitchTo(oldcontext);
> +
> +     return retval;
> + }
> +
> + /*
> +  * get_pkey_attnames
> +  *
> +  * Get the primary key attnames for the given relation.
> +  * Return NULL, and set numatts = 0, if no primary key exists.
> +  */
> + char **
> + get_pkey_attnames(Oid relid, int16 *numatts)
> + {
> +     Relation        indexRelation;
> +     ScanKeyData        entry;
> +     HeapScanDesc    scan;
> +     HeapTuple        indexTuple;
> +     int                i;
> +     char            **result = NULL;
> +     Relation        rel;
> +     TupleDesc        tupdesc;
> +
> +     /*
> +      * Open relation using relid, get tupdesc
> +      */
> +     rel = relation_open(relid, AccessShareLock);
> +     tupdesc = rel->rd_att;
> +
> +     /*
> +      * Initialize numatts to 0 in case no primary key
> +      * exists
> +      */
> +     *numatts = 0;
> +
> +     /*
> +      * Use relid to get all related indexes
> +      */
> +     indexRelation = heap_openr(IndexRelationName, AccessShareLock);
> +     ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid,
> +                            F_OIDEQ, ObjectIdGetDatum(relid));
> +     scan = heap_beginscan(indexRelation, false, SnapshotNow,
> +                           1, &entry);
> +
> +     while (HeapTupleIsValid(indexTuple = heap_getnext(scan, 0)))
> +     {
> +         Form_pg_index    index = (Form_pg_index) GETSTRUCT(indexTuple);
> +
> +         /*
> +          * We're only interested if it is the primary key
> +          */
> +         if (index->indisprimary == TRUE)
> +         {
> +             i = 0;
> +             while (index->indkey[i++] != 0)
> +                 (*numatts)++;
> +
> +             if (*numatts > 0)
> +             {
> +                 result = (char **) palloc(*numatts * sizeof(char *));
> +                 for (i = 0; i < *numatts; i++)
> +                     result[i] = SPI_fname(tupdesc, index->indkey[i]);
> +             }
> +             break;
> +         }
> +     }
> +     heap_endscan(scan);
> +     heap_close(indexRelation, AccessShareLock);
> +     relation_close(rel, AccessShareLock);
> +
> +     return result;
> + }
> +
> +
> + /*
> +  * get_strtok
> +  *
> +  * Parse the input string and return the fldnum'th token
> +  * (0 based), splitting on the provided field separator.
> +  * Returns NULL if there are not enough tokens.
> +  */
> + char *
> + get_strtok(char *fldtext, char *fldsep, int fldnum)
> + {
> +     int            j = 0;
> +     char        *result;
> +
> +     if (fldnum < 0)
> +     {
> +         elog(ERROR, "get_strtok: field number < 0 not permitted");
> +     }
> +
> +     if (fldsep[0] == '\0')
> +     {
> +         elog(ERROR, "get_strtok: blank field separator not permitted");
> +     }
> +
> +     result = strtok(fldtext, fldsep);
> +     for (j = 1; result != NULL && j < fldnum + 1; j++)
> +         result = strtok(NULL, fldsep);
> +
> +     if (result == NULL)
> +         return NULL;
> +
> +     return pstrdup(result);
> + }
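
Two properties of strtok are worth keeping in mind when reading get_strtok: it modifies its input, and it treats a run of adjacent separators as one, so empty fields are skipped. Below is a minimal standalone sketch of the same lookup that works on a private copy. nth_token is a hypothetical name, and malloc stands in for palloc.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/*
 * nth_token (hypothetical helper, not part of the patch)
 *
 * Return a malloc'd copy of the fldnum'th token (0 based) of text,
 * or NULL if there are fewer tokens. The input is left untouched
 * because strtok runs on a private copy.
 */
char *
nth_token(const char *text, const char *sep, int fldnum)
{
    char       *copy;
    char       *tok;
    char       *result = NULL;
    int         j;

    assert(text != NULL && sep != NULL);
    assert(sep[0] != '\0' && fldnum >= 0);

    copy = malloc(strlen(text) + 1);
    if (copy == NULL)
        return NULL;
    strcpy(copy, text);

    tok = strtok(copy, sep);
    for (j = 0; j < fldnum && tok != NULL; j++)
        tok = strtok(NULL, sep);

    if (tok != NULL)
    {
        result = malloc(strlen(tok) + 1);
        if (result != NULL)
            strcpy(result, tok);
    }

    free(copy);
    return result;
}
```

With the input "a,,c" and separator ",", field 1 is "c" rather than an empty string. Callers that need empty fields preserved would need a different tokenizer.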
> +
> + char *
> + get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
> + {
> +     Relation        rel;
> +     char            *relname;
> +     HeapTuple        tuple;
> +     TupleDesc        tupdesc;
> +     int                natts;
> +     StringInfo        str = makeStringInfo();
> +     char            *sql = NULL;
> +     char            *val = NULL;
> +     int16            key;
> +     int                i;
> +
> +     /*
> +      * Open relation using relid
> +      */
> +     rel = relation_open(relid, AccessShareLock);
> +     relname =  RelationGetRelationName(rel);
> +     tupdesc = rel->rd_att;
> +     natts = tupdesc->natts;
> +
> +     tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
> +
> +     appendStringInfo(str, "INSERT INTO %s(", quote_ident_cstr(relname));
> +     for (i = 0; i < natts; i++)
> +     {
> +         if (i > 0)
> +             appendStringInfo(str, ",");
> +
> +         appendStringInfo(str, "%s", NameStr(tupdesc->attrs[i]->attname));
> +     }
> +
> +     appendStringInfo(str, ") VALUES(");
> +
> +     /*
> +      * remember attvals are 1 based
> +      */
> +     for (i = 0; i < natts; i++)
> +     {
> +         if (i > 0)
> +             appendStringInfo(str, ",");
> +
> +         if (tgt_pkattvals != NULL)
> +             key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
> +         else
> +             key = -1;
> +
> +         if (key > -1)
> +             val = pstrdup(tgt_pkattvals[key]);
> +         else
> +             val = SPI_getvalue(tuple, tupdesc, i + 1);
> +
> +         if (val != NULL)
> +         {
> +             appendStringInfo(str, "%s", quote_literal_cstr(val));
> +             pfree(val);
> +         }
> +         else
> +             appendStringInfo(str, "NULL");
> +     }
> +     appendStringInfo(str, ")");
> +
> +     sql = pstrdup(str->data);
> +     pfree(str->data);
> +     pfree(str);
> +     relation_close(rel, AccessShareLock);
> +
> +     return (sql);
> + }
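
To make the key substitution in get_sql_insert easier to follow, here is a minimal standalone sketch of the same idea: every column takes its value from the local row, except key columns, which take the target key value instead. All names here (append, pk_pos, build_insert) are hypothetical, the buffer is fixed-size rather than a StringInfo, and values are wrapped in quotes without any escaping, which the real code must not do.

```c
#include <assert.h>
#include <string.h>

/* Append s to the NUL-terminated buf; return -1 if it would not fit. */
static int
append(char *buf, size_t buflen, const char *s)
{
    size_t      used = strlen(buf);
    size_t      need = strlen(s);

    if (used + need + 1 > buflen)
        return -1;
    memcpy(buf + used, s, need + 1);
    return 0;
}

/* 0-based position of the 1-based attnum in pkattnums, or -1 if not a key. */
static int
pk_pos(const short *pkattnums, int pknumatts, int attnum)
{
    int         i;

    for (i = 0; i < pknumatts; i++)
        if (pkattnums[i] == attnum)
            return i;
    return -1;
}

/* Build an INSERT into buf; returns 0 on success, -1 if buf is too small. */
int
build_insert(char *buf, size_t buflen, const char *relname, int natts,
             const char **attnames, const char **rowvals,
             const short *pkattnums, int pknumatts, const char **tgt_pkvals)
{
    int         i;
    int         rc = 0;

    assert(buf != NULL && buflen > 0 && natts > 0);
    buf[0] = '\0';

    rc |= append(buf, buflen, "INSERT INTO ");
    rc |= append(buf, buflen, relname);
    rc |= append(buf, buflen, "(");
    for (i = 0; i < natts; i++)
    {
        if (i > 0)
            rc |= append(buf, buflen, ",");
        rc |= append(buf, buflen, attnames[i]);
    }
    rc |= append(buf, buflen, ") VALUES(");

    for (i = 0; i < natts; i++)
    {
        /* attnums are 1 based, array positions are 0 based */
        int         key = pk_pos(pkattnums, pknumatts, i + 1);
        const char *val = (key >= 0) ? tgt_pkvals[key] : rowvals[i];

        if (i > 0)
            rc |= append(buf, buflen, ",");
        if (val != NULL)
        {
            rc |= append(buf, buflen, "'");
            rc |= append(buf, buflen, val);   /* no escaping: sketch only */
            rc |= append(buf, buflen, "'");
        }
        else
            rc |= append(buf, buflen, "NULL");
    }
    rc |= append(buf, buflen, ")");

    return rc ? -1 : 0;
}
```

For a two-column table foo(id, name) with key column 1, a local row ('1', NULL) and target key '42', this yields INSERT INTO foo(id,name) VALUES('42',NULL).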
> +
> + char *
> + get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals)
> + {
> +     Relation        rel;
> +     char            *relname;
> +     TupleDesc        tupdesc;
> +     int                natts;
> +     StringInfo        str = makeStringInfo();
> +     char            *sql = NULL;
> +     char            *val = NULL;
> +     int                i;
> +
> +     /*
> +      * Open relation using relid
> +      */
> +     rel = relation_open(relid, AccessShareLock);
> +     relname =  RelationGetRelationName(rel);
> +     tupdesc = rel->rd_att;
> +     natts = tupdesc->natts;
> +
> +     appendStringInfo(str, "DELETE FROM %s WHERE ", quote_ident_cstr(relname));
> +     for (i = 0; i < pknumatts; i++)
> +     {
> +         int16    pkattnum = pkattnums[i];
> +
> +         if (i > 0)
> +             appendStringInfo(str, " AND ");
> +
> +         appendStringInfo(str, "%s", NameStr(tupdesc->attrs[pkattnum - 1]->attname));
> +
> +         if (tgt_pkattvals != NULL)
> +             val = pstrdup(tgt_pkattvals[i]);
> +         else
> +             elog(ERROR, "Target key array must not be NULL");
> +
> +         if (val != NULL)
> +         {
> +             appendStringInfo(str, "=");
> +             appendStringInfo(str, "%s", quote_literal_cstr(val));
> +             pfree(val);
> +         }
> +         else
> +             appendStringInfo(str, " IS NULL");
> +     }
> +
> +     sql = pstrdup(str->data);
> +     pfree(str->data);
> +     pfree(str);
> +     relation_close(rel, AccessShareLock);
> +
> +     return (sql);
> + }
> +
> + char *
> + get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
> + {
> +     Relation        rel;
> +     char            *relname;
> +     HeapTuple        tuple;
> +     TupleDesc        tupdesc;
> +     int                natts;
> +     StringInfo        str = makeStringInfo();
> +     char            *sql = NULL;
> +     char            *val = NULL;
> +     int16            key;
> +     int                i;
> +
> +     /*
> +      * Open relation using relid
> +      */
> +     rel = relation_open(relid, AccessShareLock);
> +     relname =  RelationGetRelationName(rel);
> +     tupdesc = rel->rd_att;
> +     natts = tupdesc->natts;
> +
> +     tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
> +
> +     appendStringInfo(str, "UPDATE %s SET ", quote_ident_cstr(relname));
> +
> +     for (i = 0; i < natts; i++)
> +     {
> +         if (i > 0)
> +             appendStringInfo(str, ",");
> +
> +         appendStringInfo(str, "%s", NameStr(tupdesc->attrs[i]->attname));
> +         appendStringInfo(str, "=");
> +
> +         if (tgt_pkattvals != NULL)
> +             key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
> +         else
> +             key = -1;
> +
> +         if (key > -1)
> +             val = pstrdup(tgt_pkattvals[key]);
> +         else
> +             val = SPI_getvalue(tuple, tupdesc, i + 1);
> +
> +         if (val != NULL)
> +         {
> +             appendStringInfo(str, "%s", quote_literal_cstr(val));
> +             pfree(val);
> +         }
> +         else
> +             appendStringInfo(str, "NULL");
> +     }
> +
> +     appendStringInfo(str, " WHERE ");
> +
> +     for (i = 0; i < pknumatts; i++)
> +     {
> +         int16    pkattnum = pkattnums[i];
> +
> +         if (i > 0)
> +             appendStringInfo(str, " AND ");
> +
> +         appendStringInfo(str, "%s", NameStr(tupdesc->attrs[pkattnum - 1]->attname));
> +
> +         if (tgt_pkattvals != NULL)
> +             val = pstrdup(tgt_pkattvals[i]);
> +         else
> +             val = SPI_getvalue(tuple, tupdesc, pkattnum);
> +
> +         if (val != NULL)
> +         {
> +             appendStringInfo(str, "=");
> +             appendStringInfo(str, "%s", quote_literal_cstr(val));
> +             pfree(val);
> +         }
> +         else
> +             appendStringInfo(str, " IS NULL");
> +     }
> +
> +     sql = pstrdup(str->data);
> +     pfree(str->data);
> +     pfree(str);
> +     relation_close(rel, AccessShareLock);
> +
> +     return (sql);
> + }
> +
> + /*
> +  * Return a properly quoted literal value.
> +  * Uses quote_literal in quote.c
> +  */
> + static char *
> + quote_literal_cstr(char *rawstr)
> + {
> +     text        *rawstr_text;
> +     text        *result_text;
> +     char        *result;
> +
> +     rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr)));
> +     result_text = DatumGetTextP(DirectFunctionCall1(quote_literal, PointerGetDatum(rawstr_text)));
> +     result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text)));
> +
> +     return result;
> + }
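
As a reminder of why every value goes through quote_literal_cstr before it is appended, here is a simplified standalone sketch of literal quoting. It only wraps the string in single quotes and doubles embedded single quotes. The backend's quote_literal may escape additional characters, so treat this as an illustration of the idea, not of its exact output. quote_literal_sketch is a hypothetical name.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/*
 * quote_literal_sketch (hypothetical helper, not part of the patch)
 *
 * Return a malloc'd copy of raw wrapped in single quotes, with each
 * embedded single quote doubled. The caller frees the result.
 */
char *
quote_literal_sketch(const char *raw)
{
    size_t      quotes = 0;
    const char *p;
    char       *out;
    char       *dst;

    assert(raw != NULL);

    for (p = raw; *p != '\0'; p++)
        if (*p == '\'')
            quotes++;

    /* original bytes + doubled quotes + two delimiters + terminator */
    out = malloc(strlen(raw) + quotes + 3);
    if (out == NULL)
        return NULL;

    dst = out;
    *dst++ = '\'';
    for (p = raw; *p != '\0'; p++)
    {
        if (*p == '\'')
            *dst++ = '\'';
        *dst++ = *p;
    }
    *dst++ = '\'';
    *dst = '\0';

    return out;
}
```

A value such as O'Brien comes back as 'O''Brien', so it can no longer terminate the literal early in the generated statement.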
> +
> + /*
> +  * Return a properly quoted identifier.
> +  * Uses quote_ident in quote.c
> +  */
> + static char *
> + quote_ident_cstr(char *rawstr)
> + {
> +     text        *rawstr_text;
> +     text        *result_text;
> +     char        *result;
> +
> +     rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr)));
> +     result_text = DatumGetTextP(DirectFunctionCall1(quote_ident, PointerGetDatum(rawstr_text)));
> +     result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text)));
> +
> +     return result;
> + }
> +
> + int16
> + get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key)
> + {
> +     int        i;
> +
> +     /*
> +      * Not likely a long list anyway, so just scan for
> +      * the value
> +      */
> +     for (i = 0; i < pknumatts; i++)
> +         if (key == pkattnums[i])
> +             return i;
> +
> +     return -1;
> + }
> +
> + HeapTuple
> + get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals)
> + {
> +     Relation        rel;
> +     char            *relname;
> +     TupleDesc        tupdesc;
> +     StringInfo        str = makeStringInfo();
> +     char            *sql = NULL;
> +     int                ret;
> +     HeapTuple        tuple;
> +     int                i;
> +     char            *val = NULL;
> +
> +     /*
> +      * Open relation using relid
> +      */
> +     rel = relation_open(relid, AccessShareLock);
> +     relname =  RelationGetRelationName(rel);
> +     tupdesc = rel->rd_att;
> +
> +     /*
> +      * Connect to SPI manager
> +      */
> +     if ((ret = SPI_connect()) < 0)
> +         elog(ERROR, "get_tuple_of_interest: SPI_connect returned %d", ret);
> +
> +     /*
> +      * Build sql statement to look up tuple of interest
> +      * Use src_pkattvals as the criteria.
> +      */
> +     appendStringInfo(str, "SELECT * from %s WHERE ", quote_ident_cstr(relname));
> +
> +     for (i = 0; i < pknumatts; i++)
> +     {
> +         int16    pkattnum = pkattnums[i];
> +
> +         if (i > 0)
> +             appendStringInfo(str, " AND ");
> +
> +         appendStringInfo(str, "%s", NameStr(tupdesc->attrs[pkattnum - 1]->attname));
> +
> +         val = pstrdup(src_pkattvals[i]);
> +         if (val != NULL)
> +         {
> +             appendStringInfo(str, "=");
> +             appendStringInfo(str, "%s", quote_literal_cstr(val));
> +             pfree(val);
> +         }
> +         else
> +             appendStringInfo(str, " IS NULL");
> +     }
> +
> +     sql = pstrdup(str->data);
> +     pfree(str->data);
> +     pfree(str);
> +     /*
> +      * Retrieve the desired tuple
> +      */
> +     ret = SPI_exec(sql, 0);
> +     pfree(sql);
> +
> +     /*
> +      * Only allow one qualifying tuple
> +      */
> +     if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
> +     {
> +         elog(ERROR, "get_tuple_of_interest: Source criteria may not match more than one record.");
> +     }
> +     else if (ret == SPI_OK_SELECT && SPI_processed == 1)
> +     {
> +         SPITupleTable *tuptable = SPI_tuptable;
> +         tuple = SPI_copytuple(tuptable->vals[0]);
> +
> +         return tuple;
> +     }
> +     else
> +     {
> +         /*
> +          * no qualifying tuples
> +          */
> +         return NULL;
> +     }
> +
> +     /*
> +      * never reached, but keep compiler quiet
> +      */
> +     return NULL;
> + }
> +
> + Oid
> + get_relid_from_relname(char *relname)
> + {
> + #ifdef NamespaceRelationName
> +     Oid                relid;
> +
> +     relid = RelnameGetRelid(relname);
> + #else
> +     Relation        rel;
> +     Oid                relid;
> +
> +     rel = relation_openr(relname, AccessShareLock);
> +     relid = RelationGetRelid(rel);
> +     relation_close(rel, AccessShareLock);
> + #endif   /* NamespaceRelationName */
> +
> +     return relid;
> + }
> +
> + dblink_results    *
> + get_res_ptr(int32 res_id_index)
> + {
> +     List    *ptr;
> +
> +     /*
> +      * short circuit empty list
> +      */
> +     if(res_id == NIL)
> +         return NULL;
> +
> +     /*
> +      * OK, should be good to go
> +      */
> +     foreach(ptr, res_id)
> +     {
> +         dblink_results    *this_res_id = (dblink_results *) lfirst(ptr);
> +         if (this_res_id->res_id_index == res_id_index)
> +             return this_res_id;
> +     }
> +     return NULL;
> + }
> +
> + /*
> +  * Add node to global List res_id
> +  */
> + void
> + append_res_ptr(dblink_results *results)
> + {
> +     res_id = lappend(res_id, results);
> + }
> +
> + /*
> +  * Remove node from global List res_id.
> +  * Reset res_id_index once the list is empty.
> +  */
> + void
> + remove_res_ptr(dblink_results *results)
> + {
> +     res_id = lremove(results, res_id);
> +
> +     if (res_id == NIL)
> +         res_id_index = 0;
> + }
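
The get/append/remove trio above is what lets dblink hand back an integer resource id instead of a raw pointer. Here is a minimal standalone sketch of that pattern, using a hand-rolled singly linked list in place of the backend's List. The names (res_node, res_register, res_lookup, res_remove) are hypothetical. How the patch assigns ids is not visible in this hunk, so the counter handling is an assumption; only the reset-when-empty behaviour is taken from remove_res_ptr.

```c
#include <assert.h>
#include <stdlib.h>

typedef struct res_node
{
    int              id;        /* handle given to the caller */
    void            *payload;   /* the resource behind the handle */
    struct res_node *next;
} res_node;

static res_node *res_list = NULL;
static int       next_id = 0;

/* Register a payload and return its id, or -1 if out of memory. */
int
res_register(void *payload)
{
    res_node   *node;

    assert(payload != NULL);
    node = malloc(sizeof(res_node));
    if (node == NULL)
        return -1;
    node->id = next_id++;
    node->payload = payload;
    node->next = res_list;
    res_list = node;
    return node->id;
}

/* Return the payload for id, or NULL if the id is unknown. */
void *
res_lookup(int id)
{
    res_node   *n;

    for (n = res_list; n != NULL; n = n->next)
        if (n->id == id)
            return n->payload;
    return NULL;
}

/* Remove id from the list; reset the counter once the list is empty. */
int
res_remove(int id)
{
    res_node  **link;

    for (link = &res_list; *link != NULL; link = &(*link)->next)
    {
        if ((*link)->id == id)
        {
            res_node   *dead = *link;

            *link = dead->next;
            free(dead);
            if (res_list == NULL)
                next_id = 0;
            return 0;
        }
    }
    return -1;
}
```

The benefit of the indirection is that a stale or made-up id simply fails the lookup, whereas a stale pointer passed back from SQL would be dereferenced.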
> +
> +
> diff -cNr dblink.orig/dblink.h dblink/dblink.h
> *** dblink.orig/dblink.h    Mon Nov  5 09:46:22 2001
> --- dblink/dblink.h    Sun Apr 14 18:54:39 2002
> ***************
> *** 3,9 ****
>    *
>    * Functions returning results from a remote database
>    *
> !  * Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
>    *
>    * Permission to use, copy, modify, and distribute this software and its
>    * documentation for any purpose, without fee, and without a written agreement
> --- 3,10 ----
>    *
>    * Functions returning results from a remote database
>    *
> !  * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
> !  * ALL RIGHTS RESERVED;
>    *
>    * Permission to use, copy, modify, and distribute this software and its
>    * documentation for any purpose, without fee, and without a written agreement
> ***************
> *** 33,42 ****
> --- 34,64 ----
>   #include "libpq-int.h"
>   #include "fmgr.h"
>   #include "access/tupdesc.h"
> + #include "access/heapam.h"
> + #include "catalog/catname.h"
> + #include "catalog/pg_index.h"
> + #include "catalog/pg_type.h"
>   #include "executor/executor.h"
> + #include "executor/spi.h"
> + #include "lib/stringinfo.h"
>   #include "nodes/nodes.h"
>   #include "nodes/execnodes.h"
> + #include "nodes/pg_list.h"
> + #include "parser/parse_type.h"
> + #include "tcop/tcopprot.h"
>   #include "utils/builtins.h"
> + #include "utils/fmgroids.h"
> + #include "utils/array.h"
> + #include "utils/syscache.h"
> +
> + #ifdef NamespaceRelationName
> + #include "catalog/namespace.h"
> + #endif   /* NamespaceRelationName */
> +
> + /*
> +  * Max SQL statement size
> +  */
> + #define DBLINK_MAX_SQLSTATE_SIZE        16384
>
>   /*
>    * This struct holds the results of the remote query.
> ***************
> *** 50,70 ****
>       int            tup_num;
>
>       /*
>        * the actual query results
>        */
>       PGresult   *res;
> -
>   }    dblink_results;
>
>   /*
>    * External declarations
>    */
>   extern Datum dblink(PG_FUNCTION_ARGS);
>   extern Datum dblink_tok(PG_FUNCTION_ARGS);
>
>   /*
>    * Internal declarations
>    */
>   dblink_results *init_dblink_results(MemoryContext fn_mcxt);
>
>   #endif   /* DBLINK_H */
> --- 72,145 ----
>       int            tup_num;
>
>       /*
> +      * resource index number for this context
> +      */
> +     int            res_id_index;
> +
> +     /*
>        * the actual query results
>        */
>       PGresult   *res;
>   }    dblink_results;
>
> +
> + /*
> +  * This struct holds results in the form of an array.
> +  * Use fn_extra to hold a pointer to it across calls
> +  */
> + typedef struct
> + {
> +     /*
> +      * elem being accessed
> +      */
> +     int            elem_num;
> +
> +     /*
> +      * number of elems
> +      */
> +     int            num_elems;
> +
> +     /*
> +      * the actual array
> +      */
> +     void        *res;
> +
> + }    dblink_array_results;
> +
>   /*
>    * External declarations
>    */
>   extern Datum dblink(PG_FUNCTION_ARGS);
>   extern Datum dblink_tok(PG_FUNCTION_ARGS);
> + extern Datum dblink_strtok(PG_FUNCTION_ARGS);
> + extern Datum dblink_get_pkey(PG_FUNCTION_ARGS);
> + extern Datum dblink_last_oid(PG_FUNCTION_ARGS);
> + extern Datum dblink_build_sql_insert(PG_FUNCTION_ARGS);
> + extern Datum dblink_build_sql_delete(PG_FUNCTION_ARGS);
> + extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS);
> + extern Datum dblink_current_query(PG_FUNCTION_ARGS);
> + extern Datum dblink_replace_text(PG_FUNCTION_ARGS);
>
>   /*
>    * Internal declarations
>    */
>   dblink_results *init_dblink_results(MemoryContext fn_mcxt);
> + dblink_array_results *init_dblink_array_results(MemoryContext fn_mcxt);
> + char **get_pkey_attnames(Oid relid, int16 *numatts);
> + char *get_strtok(char *fldtext, char *fldsep, int fldnum);
> + char *getvalue(HeapTuple tuple, TupleDesc tupdesc, int fnumber);
> + char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
> + char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals);
> + char *get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
> + static char *quote_literal_cstr(char *rawstr);
> + static char *quote_ident_cstr(char *rawstr);
> + int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key);
> + HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals);
> + Oid get_relid_from_relname(char *relname);
> + dblink_results    *get_res_ptr(int32 res_id_index);
> + void append_res_ptr(dblink_results *results);
> + void remove_res_ptr(dblink_results *results);
> +
> + extern char    *debug_query_string;
>
>   #endif   /* DBLINK_H */
> diff -cNr dblink.orig/dblink.sql.in dblink/dblink.sql.in
> *** dblink.orig/dblink.sql.in    Thu Jun 14 09:49:03 2001
> --- dblink/dblink.sql.in    Fri Apr 12 14:36:49 2002
> ***************
> *** 1,5 ****
> ! CREATE FUNCTION dblink (text,text) RETURNS setof int
> !   AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c';
>
> ! CREATE FUNCTION dblink_tok (int,int) RETURNS text
> !   AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c';
> --- 1,38 ----
> ! CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof int
> !   AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c'
> !   WITH (isstrict);
>
> ! CREATE OR REPLACE FUNCTION dblink_tok (int,int) RETURNS text
> !   AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c'
> !   WITH (isstrict);
> !
> ! CREATE OR REPLACE FUNCTION dblink_strtok (text,text,int) RETURNS text
> !   AS 'MODULE_PATHNAME','dblink_strtok' LANGUAGE 'c'
> !   WITH (iscachable, isstrict);
> !
> ! CREATE OR REPLACE FUNCTION dblink_get_pkey (name) RETURNS setof text
> !   AS 'MODULE_PATHNAME','dblink_get_pkey' LANGUAGE 'c'
> !   WITH (isstrict);
> !
> ! CREATE OR REPLACE FUNCTION dblink_last_oid (int) RETURNS oid
> !   AS 'MODULE_PATHNAME','dblink_last_oid' LANGUAGE 'c'
> !   WITH (isstrict);
> !
> ! CREATE OR REPLACE FUNCTION dblink_build_sql_insert (name, int2vector, int2, _text, _text) RETURNS text
> !   AS 'MODULE_PATHNAME','dblink_build_sql_insert' LANGUAGE 'c'
> !   WITH (isstrict);
> !
> ! CREATE OR REPLACE FUNCTION dblink_build_sql_delete (name, int2vector, int2, _text) RETURNS text
> !   AS 'MODULE_PATHNAME','dblink_build_sql_delete' LANGUAGE 'c'
> !   WITH (isstrict);
> !
> ! CREATE OR REPLACE FUNCTION dblink_build_sql_update (name, int2vector, int2, _text, _text) RETURNS text
> !   AS 'MODULE_PATHNAME','dblink_build_sql_update' LANGUAGE 'c'
> !   WITH (isstrict);
> !
> ! CREATE OR REPLACE FUNCTION dblink_current_query () RETURNS text
> !   AS 'MODULE_PATHNAME','dblink_current_query' LANGUAGE 'c';
> !
> ! CREATE OR REPLACE FUNCTION dblink_replace (text,text,text) RETURNS text
> !   AS 'MODULE_PATHNAME','dblink_replace_text' LANGUAGE 'c'
> !   WITH (iscachable, isstrict);


--
  Bruce Momjian                        |  http://candle.pha.pa.us
  pgman@candle.pha.pa.us               |  (610) 853-3000
  +  If your life is a hard drive,     |  830 Blythe Avenue
  +  Christ can be your backup.        |  Drexel Hill, Pennsylvania 19026