Re: contrib/dblink update - Mailing list pgsql-patches

From Bruce Momjian
Subject Re: contrib/dblink update
Msg-id 200204240228.g3O2SUj22558@candle.pha.pa.us
In response to contrib/dblink update  (Joe Conway <mail@joeconway.com>)
List pgsql-patches
Patch applied.  Thanks.

---------------------------------------------------------------------------


Joe Conway wrote:
> Attached is an update to contrib/dblink. Please apply if there are no
> objections.
>
> Major changes:
>    - removed cursor wrap around input sql to allow for remote
>      execution of INSERT/UPDATE/DELETE
>    - dblink now returns a resource id instead of a real pointer
>    - added several utility functions
>
> I'm still hoping to add explicit cursor open/fetch/close support before
> 7.3 is released, but I need a bit more time on that.
>
> On a somewhat unrelated topic, I never got any feedback on the
> unknownin/out patch and the mb_substring patch. Is there anything else I
> need to do to get those applied?
>
> Thanks,
>
> Joe
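
The switch from a raw pointer to a resource id, listed among the major changes above, can be pictured with a small standalone C sketch. It is not the code from the patch, which keeps its entries in a PostgreSQL List and allocates with palloc; the names result_set, registry_add, registry_get and registry_remove are hypothetical and exist only for illustration. The idea is that SQL callers hold an integer handle, and a lookup that fails yields NULL instead of dereferencing a forged or stale pointer.

```c
#include <stdlib.h>

/* Hypothetical stand-in for dblink_results; only the id matters here. */
typedef struct result_set
{
    int         res_id;     /* integer handle given to callers */
    const char *payload;    /* placeholder for the real result data */
} result_set;

/* One list node per live result set. */
typedef struct node
{
    result_set  *rs;
    struct node *next;
} node;

static node *registry = NULL;   /* head of the list of live result sets */
static int   next_id = 0;       /* last id handed out */

/* Register a result set; returns its new id, or 0 if allocation fails. */
static int
registry_add(result_set *rs)
{
    node *n = malloc(sizeof(node));

    if (n == NULL)
        return 0;
    rs->res_id = ++next_id;
    n->rs = rs;
    n->next = registry;
    registry = n;
    return rs->res_id;
}

/* Look up a result set by id; NULL means the id is unknown or stale. */
static result_set *
registry_get(int id)
{
    node *n;

    for (n = registry; n != NULL; n = n->next)
        if (n->rs->res_id == id)
            return n->rs;
    return NULL;
}

/* Unregister an id; returns 1 if it was found and removed, else 0. */
static int
registry_remove(int id)
{
    node **pp;

    for (pp = &registry; *pp != NULL; pp = &(*pp)->next)
    {
        if ((*pp)->rs->res_id == id)
        {
            node *dead = *pp;

            *pp = dead->next;
            free(dead);
            return 1;
        }
    }
    return 0;
}
```

A caller would pass the id returned by registry_add to later lookups and treat a NULL from registry_get as an invalid resource id, much as dblink_tok raises an error in the patch when get_res_ptr finds nothing.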

> diff -cNr dblink.orig/README.dblink dblink/README.dblink
> *** dblink.orig/README.dblink    Thu Dec 13 02:48:39 2001
> --- dblink/README.dblink    Sun Apr 14 20:02:06 2002
> ***************
> *** 3,9 ****
>    *
>    * Functions returning results from a remote database
>    *
> !  * Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
>    *
>    * Permission to use, copy, modify, and distribute this software and its
>    * documentation for any purpose, without fee, and without a written agreement
> --- 3,10 ----
>    *
>    * Functions returning results from a remote database
>    *
> !  * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
> !  * ALL RIGHTS RESERVED;
>    *
>    * Permission to use, copy, modify, and distribute this software and its
>    * documentation for any purpose, without fee, and without a written agreement
> ***************
> *** 25,36 ****
>    */
>
>
> ! Version 0.3 (14 June, 2001):
> !   Function to test returning data set from remote database
> !   Tested under Linux (Red Hat 6.2 and 7.0) and PostgreSQL 7.1 and 7.2devel
>
>   Release Notes:
>
>     Version 0.3
>       - fixed dblink invalid pointer causing corrupt elog message
>       - fixed dblink_tok improper handling of null results
> --- 26,44 ----
>    */
>
>
> ! Version 0.4 (7 April, 2002):
> !   Functions allowing remote database INSERT/UPDATE/DELETE/SELECT, and
> !   various utility functions.
> !   Tested under Linux (Red Hat 7.2) and PostgreSQL 7.2 and 7.3devel
>
>   Release Notes:
>
> +   Version 0.4
> +     - removed cursor wrap around input sql to allow for remote
> +       execution of INSERT/UPDATE/DELETE
> +     - dblink now returns a resource id instead of a real pointer
> +     - added several utility functions -- see below
> +
>     Version 0.3
>       - fixed dblink invalid pointer causing corrupt elog message
>       - fixed dblink_tok improper handling of null results
> ***************
> *** 51,64 ****
>
>     installs following functions into database template1:
>
> !      dblink() - returns a pointer to results from remote query
> !      dblink_tok() - extracts and returns individual field results
>
>   Documentation
>   ==================================================================
>   Name
>
> ! dblink -- Returns a pointer to a data set from a remote database
>
>   Synopsis
>
> --- 59,94 ----
>
>     installs following functions into database template1:
>
> !      dblink(text,text) RETURNS setof int
> !        - returns a resource id for results from remote query
> !      dblink_tok(int,int) RETURNS text
> !        - extracts and returns individual field results
> !      dblink_strtok(text,text,int) RETURNS text
> !        - extracts and returns individual token from delimited text
> !      dblink_get_pkey(name) RETURNS setof text
> !        - returns the field names of a relation's primary key fields
> !      dblink_last_oid(int) RETURNS oid
> !        - returns the last inserted oid
> !      dblink_build_sql_insert(name,int2vector,int2,_text,_text) RETURNS text
> !        - builds an insert statement using a local tuple, replacing the
> !          selection key field values with alternate supplied values
> !      dblink_build_sql_delete(name,int2vector,int2,_text) RETURNS text
> !        - builds a delete statement using supplied values for selection
> !          key field values
> !      dblink_build_sql_update(name,int2vector,int2,_text,_text) RETURNS text
> !        - builds an update statement using a local tuple, replacing the
> !          selection key field values with alternate supplied values
> !      dblink_current_query() RETURNS text
> !        - returns the current query string
> !      dblink_replace(text,text,text) RETURNS text
> !        - replace all occurrences of substring-a in the input-string
> !          with substring-b
>
>   Documentation
>   ==================================================================
>   Name
>
> ! dblink -- Returns a resource id for a data set from a remote database
>
>   Synopsis
>
> ***************
> *** 78,84 ****
>
>   Outputs
>
> !   Returns setof int (pointer)
>
>   Example usage
>
> --- 108,114 ----
>
>   Outputs
>
> !   Returns setof int (res_id)
>
>   Example usage
>
> ***************
> *** 94,106 ****
>
>   Synopsis
>
> ! dblink_tok(int pointer, int fnumber)
>
>   Inputs
>
> !   pointer
>
> !     a pointer returned by a call to dblink()
>
>     fnumber
>
> --- 124,136 ----
>
>   Synopsis
>
> ! dblink_tok(int res_id, int fnumber)
>
>   Inputs
>
> !   res_id
>
> !     a resource id returned by a call to dblink()
>
>     fnumber
>
> ***************
> *** 131,136 ****
> --- 161,415 ----
>      select f1, f2 from myremotetable where f1 like 'bytea%';
>
>   ==================================================================
> + Name
> +
> + dblink_strtok -- Extracts and returns individual token from delimited text
> +
> + Synopsis
> +
> + dblink_strtok(text inputstring, text delimiter, int posn) RETURNS text
> +
> + Inputs
> +
> +   inputstring
> +
> +     any string you want to parse a token out of;
> +     e.g. 'f=1&g=3&h=4'
> +
> +   delimiter
> +
> +     a single character to use as the delimiter;
> +     e.g. '&' or '='
> +
> +   posn
> +
> +     the position of the token of interest, 0 based;
> +     e.g. 1
> +
> + Outputs
> +
> +   Returns text
> +
> + Example usage
> +
> + test=# select dblink_strtok(dblink_strtok('f=1&g=3&h=4','&',1),'=',1);
> +  dblink_strtok
> + ---------------
> +  3
> + (1 row)
> +
> + ==================================================================
> + Name
> +
> + dblink_get_pkey -- returns the field names of a relation's primary
> +                    key fields
> +
> + Synopsis
> +
> + dblink_get_pkey(name relname) RETURNS setof text
> +
> + Inputs
> +
> +   relname
> +
> +     any relation name;
> +     e.g. 'foobar'
> +
> + Outputs
> +
> +   Returns setof text -- one row for each primary key field, in order of
> +                         precedence
> +
> + Example usage
> +
> + test=# select dblink_get_pkey('foobar');
> +  dblink_get_pkey
> + -----------------
> +  f1
> +  f2
> +  f3
> +  f4
> +  f5
> + (5 rows)
> +
> +
> + ==================================================================
> + Name
> +
> + dblink_last_oid -- Returns last inserted oid
> +
> + Synopsis
> +
> + dblink_last_oid(int res_id) RETURNS oid
> +
> + Inputs
> +
> +   res_id
> +
> +     any resource id returned by dblink function;
> +
> + Outputs
> +
> +   Returns oid of last inserted tuple
> +
> + Example usage
> +
> + test=# select dblink_last_oid(dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd'
> +                ,'insert into mytable (f1, f2) values (1,2)'));
> +
> +  dblink_last_oid
> + ----------------
> +  16553
> + (1 row)
> +
> +
> + ==================================================================
> + Name
> +
> + dblink_build_sql_insert -- builds an insert statement using a local
> +                            tuple, replacing the selection key field
> +                            values with alternate supplied values
> + dblink_build_sql_delete -- builds a delete statement using supplied
> +                            values for selection key field values
> + dblink_build_sql_update -- builds an update statement using a local
> +                            tuple, replacing the selection key field
> +                            values with alternate supplied values
> +
> +
> + Synopsis
> +
> + dblink_build_sql_insert(name relname
> +                          ,int2vector primary_key_attnums
> +                          ,int2 num_primary_key_atts
> +                          ,_text src_pk_att_vals_array
> +                          ,_text tgt_pk_att_vals_array) RETURNS text
> + dblink_build_sql_delete(name relname
> +                          ,int2vector primary_key_attnums
> +                          ,int2 num_primary_key_atts
> +                          ,_text tgt_pk_att_vals_array) RETURNS text
> + dblink_build_sql_update(name relname
> +                          ,int2vector primary_key_attnums
> +                          ,int2 num_primary_key_atts
> +                          ,_text src_pk_att_vals_array
> +                          ,_text tgt_pk_att_vals_array) RETURNS text
> +
> + Inputs
> +
> +   relname
> +
> +     any relation name;
> +     e.g. 'foobar'
> +
> +   primary_key_attnums
> +
> +     vector of primary key attnums (1 based, see pg_index.indkey);
> +     e.g. '1 2'
> +
> +   num_primary_key_atts
> +
> +     number of primary key attnums in the vector; e.g. 2
> +
> +   src_pk_att_vals_array
> +
> +     array of primary key values, used to look up the local matching
> +     tuple, the values of which are then used to construct the SQL
> +     statement
> +
> +   tgt_pk_att_vals_array
> +
> +     array of primary key values, used to replace the local tuple
> +     values in the SQL statement
> +
> + Outputs
> +
> +   Returns text -- requested SQL statement
> +
> + Example usage
> +
> + test=# select dblink_build_sql_insert('foo','1 2',2,'{"1", "a"}','{"1", "b''a"}');
> +              dblink_build_sql_insert
> + --------------------------------------------------
> +  INSERT INTO foo(f1,f2,f3) VALUES('1','b''a','1')
> + (1 row)
> +
> + test=# select dblink_build_sql_delete('MyFoo','1 2',2,'{"1", "b"}');
> +            dblink_build_sql_delete
> + ---------------------------------------------
> +  DELETE FROM "MyFoo" WHERE f1='1' AND f2='b'
> + (1 row)
> +
> + test=# select dblink_build_sql_update('foo','1 2',2,'{"1", "a"}','{"1", "b"}');
> +                    dblink_build_sql_update
> + -------------------------------------------------------------
> +  UPDATE foo SET f1='1',f2='b',f3='1' WHERE f1='1' AND f2='b'
> + (1 row)
> +
> +
> + ==================================================================
> + Name
> +
> + dblink_current_query -- returns the current query string
> +
> + Synopsis
> +
> + dblink_current_query () RETURNS text
> +
> + Inputs
> +
> +   None
> +
> + Outputs
> +
> +   Returns text -- a copy of the currently executing query
> +
> + Example usage
> +
> + test=# select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname= ''byteacat''') as f1) as t1;
> +                                                                 dblink_current_query
> + -----------------------------------------------------------------------------------------------------------------------------------------------------
> +  select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname= ''byteacat''') as f1) as t1;
> + (1 row)
> +
> +
> + ==================================================================
> + Name
> +
> + dblink_replace -- replace all occurrences of substring-a in the
> +                   input-string with substring-b
> +
> + Synopsis
> +
> + dblink_replace(text input-string, text substring-a, text substring-b) RETURNS text
> +
> + Inputs
> +
> +   input-string
> +
> +     the starting string, before replacement of substring-a
> +
> +   substring-a
> +
> +     the substring to find and replace
> +
> +   substring-b
> +
> +     the substring to be substituted in place of substring-a
> +
> + Outputs
> +
> +   Returns text -- a copy of the starting string, but with all occurrences of
> +                   substring-a replaced with substring-b
> +
> + Example usage
> +
> + test=# select dblink_replace('12345678901234567890','56','hello');
> +        dblink_replace
> + ----------------------------
> +  1234hello78901234hello7890
> + (1 row)
> +
> + ==================================================================
> +
>
>   -- Joe Conway
>
> diff -cNr dblink.orig/dblink.c dblink/dblink.c
> *** dblink.orig/dblink.c    Wed Oct 24 22:49:19 2001
> --- dblink/dblink.c    Sun Apr 14 20:03:30 2002
> ***************
> *** 3,9 ****
>    *
>    * Functions returning results from a remote database
>    *
> !  * Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
>    *
>    * Permission to use, copy, modify, and distribute this software and its
>    * documentation for any purpose, without fee, and without a written agreement
> --- 3,10 ----
>    *
>    * Functions returning results from a remote database
>    *
> !  * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
> !  * ALL RIGHTS RESERVED;
>    *
>    * Permission to use, copy, modify, and distribute this software and its
>    * documentation for any purpose, without fee, and without a written agreement
> ***************
> *** 26,48 ****
>
>   #include "dblink.h"
>
>   PG_FUNCTION_INFO_V1(dblink);
>   Datum
>   dblink(PG_FUNCTION_ARGS)
>   {
> !     PGconn       *conn = NULL;
> !     PGresult   *res = NULL;
> !     dblink_results *results;
> !     char       *optstr;
> !     char       *sqlstatement;
> !     char       *curstr = "DECLARE mycursor CURSOR FOR ";
> !     char       *execstatement;
> !     char       *msg;
> !     int            ntuples = 0;
> !     ReturnSetInfo *rsi;
> !
> !     if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
> !         elog(ERROR, "dblink: NULL arguments are not permitted");
>
>       if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
>           elog(ERROR, "dblink: function called in context that does not accept a set result");
> --- 27,49 ----
>
>   #include "dblink.h"
>
> + /* Global */
> + List    *res_id = NIL;
> + int        res_id_index = 0;
> +
>   PG_FUNCTION_INFO_V1(dblink);
>   Datum
>   dblink(PG_FUNCTION_ARGS)
>   {
> !     PGconn            *conn = NULL;
> !     PGresult        *res = NULL;
> !     dblink_results    *results;
> !     char            *optstr;
> !     char            *sqlstatement;
> !     char            *execstatement;
> !     char            *msg;
> !     int                ntuples = 0;
> !     ReturnSetInfo    *rsi;
>
>       if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
>           elog(ERROR, "dblink: function called in context that does not accept a set result");
> ***************
> *** 61,81 ****
>               elog(ERROR, "dblink: connection error: %s", msg);
>           }
>
> !         res = PQexec(conn, "BEGIN");
> !         if (PQresultStatus(res) != PGRES_COMMAND_OK)
> !         {
> !             msg = pstrdup(PQerrorMessage(conn));
> !             PQclear(res);
> !             PQfinish(conn);
> !             elog(ERROR, "dblink: begin error: %s", msg);
> !         }
> !         PQclear(res);
> !
> !         execstatement = (char *) palloc(strlen(curstr) + strlen(sqlstatement) + 1);
>           if (execstatement != NULL)
>           {
> !             strcpy(execstatement, curstr);
> !             strcat(execstatement, sqlstatement);
>               strcat(execstatement, "\0");
>           }
>           else
> --- 62,71 ----
>               elog(ERROR, "dblink: connection error: %s", msg);
>           }
>
> !         execstatement = (char *) palloc(strlen(sqlstatement) + 1);
>           if (execstatement != NULL)
>           {
> !             strcpy(execstatement, sqlstatement);
>               strcat(execstatement, "\0");
>           }
>           else
> ***************
> *** 94,163 ****
>               /*
>                * got results, start fetching them
>                */
> -             PQclear(res);
> -
> -             res = PQexec(conn, "FETCH ALL in mycursor");
> -             if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
> -             {
> -                 msg = pstrdup(PQerrorMessage(conn));
> -                 PQclear(res);
> -                 PQfinish(conn);
> -                 elog(ERROR, "dblink: sql error: %s", msg);
> -             }
> -
>               ntuples = PQntuples(res);
>
> !             if (ntuples > 0)
> !             {
> !
> !                 results = init_dblink_results(fcinfo->flinfo->fn_mcxt);
> !                 results->tup_num = 0;
> !                 results->res = res;
> !                 res = NULL;
> !
> !                 fcinfo->flinfo->fn_extra = (void *) results;
> !
> !                 results = NULL;
> !                 results = fcinfo->flinfo->fn_extra;
> !
> !                 /* close the cursor */
> !                 res = PQexec(conn, "CLOSE mycursor");
> !                 PQclear(res);
> !
> !                 /* commit the transaction */
> !                 res = PQexec(conn, "COMMIT");
> !                 PQclear(res);
> !
> !                 /* close the connection to the database and cleanup */
> !                 PQfinish(conn);
> !
> !                 rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> !                 rsi->isDone = ExprMultipleResult;
> !
> !                 PG_RETURN_POINTER(results);
> !
> !             }
> !             else
> !             {
>
> !                 PQclear(res);
>
> !                 /* close the cursor */
> !                 res = PQexec(conn, "CLOSE mycursor");
> !                 PQclear(res);
>
> !                 /* commit the transaction */
> !                 res = PQexec(conn, "COMMIT");
> !                 PQclear(res);
>
> !                 /* close the connection to the database and cleanup */
> !                 PQfinish(conn);
>
> !                 rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> !                 rsi->isDone = ExprEndResult;
>
> !                 PG_RETURN_NULL();
> !             }
>           }
>       }
>       else
> --- 84,119 ----
>               /*
>                * got results, start fetching them
>                */
>               ntuples = PQntuples(res);
>
> !             /*
> !              * increment resource index
> !              */
> !             res_id_index++;
>
> !             results = init_dblink_results(fcinfo->flinfo->fn_mcxt);
> !             results->tup_num = 0;
> !             results->res_id_index = res_id_index;
> !             results->res = res;
>
> !             /*
> !              * Append node to res_id to hold pointer to results.
> !              * Needed by dblink_tok to access the data
> !              */
> !             append_res_ptr(results);
>
> !             /*
> !              * save pointer to results for the next function manager call
> !              */
> !             fcinfo->flinfo->fn_extra = (void *) results;
>
> !             /* close the connection to the database and cleanup */
> !             PQfinish(conn);
>
> !             rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> !             rsi->isDone = ExprMultipleResult;
>
> !             PG_RETURN_INT32(res_id_index);
>           }
>       }
>       else
> ***************
> *** 165,173 ****
>           /*
>            * check for more results
>            */
> -
>           results = fcinfo->flinfo->fn_extra;
>           results->tup_num++;
>           ntuples = PQntuples(results->res);
>
>           if (results->tup_num < ntuples)
> --- 121,130 ----
>           /*
>            * check for more results
>            */
>           results = fcinfo->flinfo->fn_extra;
> +
>           results->tup_num++;
> +         res_id_index = results->res_id_index;
>           ntuples = PQntuples(results->res);
>
>           if (results->tup_num < ntuples)
> ***************
> *** 179,196 ****
>               rsi = (ReturnSetInfo *) fcinfo->resultinfo;
>               rsi->isDone = ExprMultipleResult;
>
> !             PG_RETURN_POINTER(results);
> !
>           }
>           else
>           {
>               /*
>                * or if no more, clean things up
>                */
> -
>               results = fcinfo->flinfo->fn_extra;
>
>               PQclear(results->res);
>
>               rsi = (ReturnSetInfo *) fcinfo->resultinfo;
>               rsi->isDone = ExprEndResult;
> --- 136,154 ----
>               rsi = (ReturnSetInfo *) fcinfo->resultinfo;
>               rsi->isDone = ExprMultipleResult;
>
> !             PG_RETURN_INT32(res_id_index);
>           }
>           else
>           {
>               /*
>                * or if no more, clean things up
>                */
>               results = fcinfo->flinfo->fn_extra;
>
> +             remove_res_ptr(results);
>               PQclear(results->res);
> +             pfree(results);
> +             fcinfo->flinfo->fn_extra = NULL;
>
>               rsi = (ReturnSetInfo *) fcinfo->resultinfo;
>               rsi->isDone = ExprEndResult;
> ***************
> *** 214,249 ****
>   dblink_tok(PG_FUNCTION_ARGS)
>   {
>       dblink_results *results;
> !     int            fldnum;
> !     text       *result_text;
> !     char       *result;
> !     int            nfields = 0;
> !     int            text_len = 0;
> !
> !     if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
> !         elog(ERROR, "dblink: NULL arguments are not permitted");
>
> !     results = (dblink_results *) PG_GETARG_POINTER(0);
>       if (results == NULL)
> !         elog(ERROR, "dblink: function called with invalid result pointer");
>
>       fldnum = PG_GETARG_INT32(1);
>       if (fldnum < 0)
> !         elog(ERROR, "dblink: field number < 0 not permitted");
>
>       nfields = PQnfields(results->res);
>       if (fldnum > (nfields - 1))
> !         elog(ERROR, "dblink: field number %d does not exist", fldnum);
>
>       if (PQgetisnull(results->res, results->tup_num, fldnum) == 1)
> -     {
> -
>           PG_RETURN_NULL();
> -
> -     }
>       else
>       {
> -
>           text_len = PQgetlength(results->res, results->tup_num, fldnum);
>
>           result = (char *) palloc(text_len + 1);
> --- 172,208 ----
>   dblink_tok(PG_FUNCTION_ARGS)
>   {
>       dblink_results *results;
> !     int                fldnum;
> !     text            *result_text;
> !     char            *result;
> !     int                nfields = 0;
> !     int                text_len = 0;
>
> !     results = get_res_ptr(PG_GETARG_INT32(0));
>       if (results == NULL)
> !     {
> !         if (res_id != NIL)
> !         {
> !             freeList(res_id);
> !             res_id = NIL;
> !             res_id_index = 0;
> !         }
> !
> !         elog(ERROR, "dblink_tok: function called with invalid resource id");
> !     }
>
>       fldnum = PG_GETARG_INT32(1);
>       if (fldnum < 0)
> !         elog(ERROR, "dblink_tok: field number < 0 not permitted");
>
>       nfields = PQnfields(results->res);
>       if (fldnum > (nfields - 1))
> !         elog(ERROR, "dblink_tok: field number %d does not exist", fldnum);
>
>       if (PQgetisnull(results->res, results->tup_num, fldnum) == 1)
>           PG_RETURN_NULL();
>       else
>       {
>           text_len = PQgetlength(results->res, results->tup_num, fldnum);
>
>           result = (char *) palloc(text_len + 1);
> ***************
> *** 259,270 ****
> --- 218,838 ----
>           result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result)));
>
>           PG_RETURN_TEXT_P(result_text);
> +     }
> + }
> +
> +
> + /*
> +  * dblink_strtok
> +  * parse input string
> +  * return ord item (0 based)
> +  * based on provided field separator
> +  */
> + PG_FUNCTION_INFO_V1(dblink_strtok);
> + Datum
> + dblink_strtok(PG_FUNCTION_ARGS)
> + {
> +     char        *fldtext;
> +     char        *fldsep;
> +     int            fldnum;
> +     char        *buffer;
> +     text        *result_text;
> +
> +     fldtext = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0))));
> +     fldsep = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1))));
> +     fldnum = PG_GETARG_INT32(2);
> +
> +     if (fldtext[0] == '\0')
> +     {
> +         elog(ERROR, "get_strtok: blank list not permitted");
> +     }
> +     if (fldsep[0] == '\0')
> +     {
> +         elog(ERROR, "get_strtok: blank field separator not permitted");
> +     }
>
> +     buffer = get_strtok(fldtext, fldsep, fldnum);
> +
> +     pfree(fldtext);
> +     pfree(fldsep);
> +
> +     if (buffer == NULL)
> +     {
> +         PG_RETURN_NULL();
> +     }
> +     else
> +     {
> +         result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(buffer)));
> +         pfree(buffer);
> +
> +         PG_RETURN_TEXT_P(result_text);
>       }
>   }
>
>
>   /*
> +  * dblink_get_pkey
> +  *
> +  * Return the names of the primary key
> +  * fields for the supplied relation, one per row,
> +  * or no rows if none exists.
> +  */
> + PG_FUNCTION_INFO_V1(dblink_get_pkey);
> + Datum
> + dblink_get_pkey(PG_FUNCTION_ARGS)
> + {
> +     char                    *relname;
> +     Oid                        relid;
> +     char                    **result;
> +     text                    *result_text;
> +     int16                    numatts;
> +     ReturnSetInfo            *rsi;
> +     dblink_array_results    *ret_set;
> +
> +     if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
> +         elog(ERROR, "dblink: function called in context that does not accept a set result");
> +
> +     if (fcinfo->flinfo->fn_extra == NULL)
> +     {
> +         relname = NameStr(*PG_GETARG_NAME(0));
> +
> +         /*
> +          * Convert relname to rel OID.
> +          */
> +         relid = get_relid_from_relname(relname);
> +         if (!OidIsValid(relid))
> +             elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist",
> +                  relname);
> +
> +         /*
> +          * get an array of attnames.
> +          */
> +         result = get_pkey_attnames(relid, &numatts);
> +
> +         if ((result != NULL) && (numatts > 0))
> +         {
> +             ret_set = init_dblink_array_results(fcinfo->flinfo->fn_mcxt);
> +
> +             ret_set->elem_num = 0;
> +             ret_set->num_elems = numatts;
> +             ret_set->res = result;
> +
> +             fcinfo->flinfo->fn_extra = (void *) ret_set;
> +
> +             rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> +             rsi->isDone = ExprMultipleResult;
> +
> +             result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num])));
> +
> +             PG_RETURN_TEXT_P(result_text);
> +         }
> +         else
> +         {
> +             rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> +             rsi->isDone = ExprEndResult;
> +
> +             PG_RETURN_NULL();
> +         }
> +     }
> +     else
> +     {
> +         /*
> +          * check for more results
> +          */
> +         ret_set = fcinfo->flinfo->fn_extra;
> +         ret_set->elem_num++;
> +         result = ret_set->res;
> +
> +         if (ret_set->elem_num < ret_set->num_elems)
> +         {
> +             /*
> +              * fetch next one
> +              */
> +             rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> +             rsi->isDone = ExprMultipleResult;
> +
> +             result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num])));
> +             PG_RETURN_TEXT_P(result_text);
> +         }
> +         else
> +         {
> +             int        i;
> +
> +             /*
> +              * or if no more, clean things up
> +              */
> +             for (i = 0; i < ret_set->num_elems; i++)
> +                 pfree(result[i]);
> +
> +             pfree(ret_set->res);
> +             pfree(ret_set);
> +
> +             rsi = (ReturnSetInfo *) fcinfo->resultinfo;
> +             rsi->isDone = ExprEndResult;
> +
> +             PG_RETURN_NULL();
> +         }
> +     }
> +     PG_RETURN_NULL();
> + }
> +
> +
> + /*
> +  * dblink_last_oid
> +  * return last inserted oid
> +  */
> + PG_FUNCTION_INFO_V1(dblink_last_oid);
> + Datum
> + dblink_last_oid(PG_FUNCTION_ARGS)
> + {
> +     dblink_results *results;
> +
> +     results = get_res_ptr(PG_GETARG_INT32(0));
> +     if (results == NULL)
> +     {
> +         if (res_id != NIL)
> +         {
> +             freeList(res_id);
> +             res_id = NIL;
> +             res_id_index = 0;
> +         }
> +
> +         elog(ERROR, "dblink_last_oid: function called with invalid resource id");
> +     }
> +
> +     PG_RETURN_OID(PQoidValue(results->res));
> + }
> +
> +
> + /*
> +  * dblink_build_sql_insert
> +  *
> +  * Used to generate an SQL insert statement
> +  * based on an existing tuple in a local relation.
> +  * This is useful for selectively replicating data
> +  * to another server via dblink.
> +  *
> +  * API:
> +  * <relname> - name of local table of interest
> +  * <pkattnums> - an int2vector of attnums which will be used
> +  * to identify the local tuple of interest
> +  * <pknumatts> - number of attnums in pkattnums
> +  * <src_pkattvals_arry> - text array of key values which will be used
> +  * to identify the local tuple of interest
> +  * <tgt_pkattvals_arry> - text array of key values which will be used
> +  * to build the string for execution remotely. These are substituted
> +  * for their counterparts in src_pkattvals_arry
> +  */
> + PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
> + Datum
> + dblink_build_sql_insert(PG_FUNCTION_ARGS)
> + {
> +     Oid            relid;
> +     char        *relname;
> +     int16        *pkattnums;
> +     int16        pknumatts;
> +     char        **src_pkattvals;
> +     char        **tgt_pkattvals;
> +     ArrayType    *src_pkattvals_arry;
> +     ArrayType    *tgt_pkattvals_arry;
> +     int            src_ndim;
> +     int            *src_dim;
> +     int            src_nitems;
> +     int            tgt_ndim;
> +     int            *tgt_dim;
> +     int            tgt_nitems;
> +     int            i;
> +     char        *ptr;
> +     char        *sql;
> +     text        *sql_text;
> +
> +     relname = NameStr(*PG_GETARG_NAME(0));
> +
> +     /*
> +      * Convert relname to rel OID.
> +      */
> +     relid = get_relid_from_relname(relname);
> +     if (!OidIsValid(relid))
> +         elog(ERROR, "dblink_build_sql_insert: relation \"%s\" does not exist",
> +              relname);
> +
> +     pkattnums = (int16 *) PG_GETARG_POINTER(1);
> +     pknumatts = PG_GETARG_INT16(2);
> +     /*
> +      * There should be at least one key attribute
> +      */
> +     if (pknumatts == 0)
> +         elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0.");
> +
> +     src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
> +     tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
> +
> +     /*
> +      * Source array is made up of key values that will be used to
> +      * locate the tuple of interest from the local system.
> +      */
> +     src_ndim = ARR_NDIM(src_pkattvals_arry);
> +     src_dim = ARR_DIMS(src_pkattvals_arry);
> +     src_nitems = ArrayGetNItems(src_ndim, src_dim);
> +
> +     /*
> +      * There should be one source array key value for each key attnum
> +      */
> +     if (src_nitems != pknumatts)
> +         elog(ERROR, "dblink_build_sql_insert: source key array length does not match number of key attributes.");
> +
> +     /*
> +      * get array of pointers to c-strings from the input source array
> +      */
> +     src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
> +     ptr = ARR_DATA_PTR(src_pkattvals_arry);
> +     for (i = 0; i < src_nitems; i++)
> +     {
> +         src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
> +         ptr += INTALIGN(*(int32 *) ptr);
> +     }
> +
> +     /*
> +      * Target array is made up of key values that will be used to
> +      * build the SQL string for use on the remote system.
> +      */
> +     tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
> +     tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
> +     tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);
> +
> +     /*
> +      * There should be one target array key value for each key attnum
> +      */
> +     if (tgt_nitems != pknumatts)
> +         elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes.");
> +
> +     /*
> +      * get array of pointers to c-strings from the input target array
> +      */
> +     tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
> +     ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
> +     for (i = 0; i < tgt_nitems; i++)
> +     {
> +         tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
> +         ptr += INTALIGN(*(int32 *) ptr);
> +     }
> +
> +     /*
> +      * Prep work is finally done. Go get the SQL string.
> +      */
> +     sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
> +
> +     /*
> +      * Make it into TEXT for return to the client
> +      */
> +     sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
> +
> +     /*
> +      * And send it
> +      */
> +     PG_RETURN_TEXT_P(sql_text);
> + }
> +
> +
> + /*
> +  * dblink_build_sql_delete
> +  *
> +  * Used to generate an SQL delete statement.
> +  * This is useful for selectively replicating a
> +  * delete to another server via dblink.
> +  *
> +  * API:
> +  * <relname> - name of remote table of interest
> +  * <pkattnums> - an int2vector of attnums which will be used
> +  * to identify the remote tuple of interest
> +  * <pknumatts> - number of attnums in pkattnums
> +  * <tgt_pkattvals_arry> - text array of key values which will be used
> +  * to build the string for execution remotely.
> +  */
> + PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
> + Datum
> + dblink_build_sql_delete(PG_FUNCTION_ARGS)
> + {
> +     Oid            relid;
> +     char        *relname;
> +     int16        *pkattnums;
> +     int16        pknumatts;
> +     char        **tgt_pkattvals;
> +     ArrayType    *tgt_pkattvals_arry;
> +     int            tgt_ndim;
> +     int            *tgt_dim;
> +     int            tgt_nitems;
> +     int            i;
> +     char        *ptr;
> +     char        *sql;
> +     text        *sql_text;
> +
> +     relname = NameStr(*PG_GETARG_NAME(0));
> +
> +     /*
> +      * Convert relname to rel OID.
> +      */
> +     relid = get_relid_from_relname(relname);
> +     if (!OidIsValid(relid))
> +         elog(ERROR, "dblink_build_sql_delete: relation \"%s\" does not exist",
> +              relname);
> +
> +     pkattnums = (int16 *) PG_GETARG_POINTER(1);
> +     pknumatts = PG_GETARG_INT16(2);
> +     /*
> +      * There should be at least one key attribute
> +      */
> +     if (pknumatts == 0)
> +         elog(ERROR, "dblink_build_sql_delete: number of key attributes must be > 0.");
> +
> +     tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
> +
> +     /*
> +      * Target array is made up of key values that will be used to
> +      * build the SQL string for use on the remote system.
> +      */
> +     tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
> +     tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
> +     tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);
> +
> +     /*
> +      * There should be one target array key value for each key attnum
> +      */
> +     if (tgt_nitems != pknumatts)
> +         elog(ERROR, "dblink_build_sql_delete: target key array length does not match number of key attributes.");
> +
> +     /*
> +      * get array of pointers to c-strings from the input target array
> +      */
> +     tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
> +     ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
> +     for (i = 0; i < tgt_nitems; i++)
> +     {
> +         tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
> +         ptr += INTALIGN(*(int32 *) ptr);
> +     }
> +
> +     /*
> +      * Prep work is finally done. Go get the SQL string.
> +      */
> +     sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals);
> +
> +     /*
> +      * Make it into TEXT for return to the client
> +      */
> +     sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
> +
> +     /*
> +      * And send it
> +      */
> +     PG_RETURN_TEXT_P(sql_text);
> + }
> +
> +
> + /*
> +  * dblink_build_sql_update
> +  *
> +  * Used to generate an SQL update statement
> +  * based on an existing tuple in a local relation.
> +  * This is useful for selectively replicating data
> +  * to another server via dblink.
> +  *
> +  * API:
> +  * <relname> - name of local table of interest
> +  * <pkattnums> - an int2vector of attnums which will be used
> +  * to identify the local tuple of interest
> +  * <pknumatts> - number of attnums in pkattnums
> +  * <src_pkattvals_arry> - text array of key values which will be used
> +  * to identify the local tuple of interest
> +  * <tgt_pkattvals_arry> - text array of key values which will be used
> +  * to build the string for execution remotely. These are substituted
> +  * for their counterparts in src_pkattvals_arry
> +  */
> + PG_FUNCTION_INFO_V1(dblink_build_sql_update);
> + Datum
> + dblink_build_sql_update(PG_FUNCTION_ARGS)
> + {
> +     Oid            relid;
> +     char        *relname;
> +     int16        *pkattnums;
> +     int16        pknumatts;
> +     char        **src_pkattvals;
> +     char        **tgt_pkattvals;
> +     ArrayType    *src_pkattvals_arry;
> +     ArrayType    *tgt_pkattvals_arry;
> +     int            src_ndim;
> +     int            *src_dim;
> +     int            src_nitems;
> +     int            tgt_ndim;
> +     int            *tgt_dim;
> +     int            tgt_nitems;
> +     int            i;
> +     char        *ptr;
> +     char        *sql;
> +     text        *sql_text;
> +
> +     relname = NameStr(*PG_GETARG_NAME(0));
> +
> +     /*
> +      * Convert relname to rel OID.
> +      */
> +     relid = get_relid_from_relname(relname);
> +     if (!OidIsValid(relid))
> +         elog(ERROR, "dblink_build_sql_update: relation \"%s\" does not exist",
> +              relname);
> +
> +     pkattnums = (int16 *) PG_GETARG_POINTER(1);
> +     pknumatts = PG_GETARG_INT16(2);
> +     /*
> +      * There should be at least one key attribute
> +      */
> +     if (pknumatts == 0)
> +         elog(ERROR, "dblink_build_sql_update: number of key attributes must be > 0.");
> +
> +     src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
> +     tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
> +
> +     /*
> +      * Source array is made up of key values that will be used to
> +      * locate the tuple of interest from the local system.
> +      */
> +     src_ndim = ARR_NDIM(src_pkattvals_arry);
> +     src_dim = ARR_DIMS(src_pkattvals_arry);
> +     src_nitems = ArrayGetNItems(src_ndim, src_dim);
> +
> +     /*
> +      * There should be one source array key value for each key attnum
> +      */
> +     if (src_nitems != pknumatts)
> +         elog(ERROR, "dblink_build_sql_update: source key array length does not match number of key attributes.");
> +
> +     /*
> +      * get array of pointers to c-strings from the input source array
> +      */
> +     src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
> +     ptr = ARR_DATA_PTR(src_pkattvals_arry);
> +     for (i = 0; i < src_nitems; i++)
> +     {
> +         src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
> +         ptr += INTALIGN(*(int32 *) ptr);
> +     }
> +
> +     /*
> +      * Target array is made up of key values that will be used to
> +      * build the SQL string for use on the remote system.
> +      */
> +     tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
> +     tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
> +     tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);
> +
> +     /*
> +      * There should be one target array key value for each key attnum
> +      */
> +     if (tgt_nitems != pknumatts)
> +         elog(ERROR, "dblink_build_sql_update: target key array length does not match number of key attributes.");
> +
> +     /*
> +      * get array of pointers to c-strings from the input target array
> +      */
> +     tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
> +     ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
> +     for (i = 0; i < tgt_nitems; i++)
> +     {
> +         tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
> +         ptr += INTALIGN(*(int32 *) ptr);
> +     }
> +
> +     /*
> +      * Prep work is finally done. Go get the SQL string.
> +      */
> +     sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
> +
> +     /*
> +      * Make it into TEXT for return to the client
> +      */
> +     sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
> +
> +     /*
> +      * And send it
> +      */
> +     PG_RETURN_TEXT_P(sql_text);
> + }
> +
> +
> + /*
> +  * dblink_current_query
> +  * return the current query string
> +  * to allow its use in (among other things)
> +  * rewrite rules
> +  */
> + PG_FUNCTION_INFO_V1(dblink_current_query);
> + Datum
> + dblink_current_query(PG_FUNCTION_ARGS)
> + {
> +     text        *result_text;
> +
> +     result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(debug_query_string)));
> +     PG_RETURN_TEXT_P(result_text);
> + }
> +
> +
> + /*
> +  * dblink_replace_text
> +  * replace all occurrences of 'old_sub_str' in 'orig_str'
> +  * with 'new_sub_str' to form 'new_str'
> +  *
> +  * returns 'orig_str' if 'old_sub_str' == '' or 'orig_str' == ''
> +  * otherwise returns 'new_str'
> +  */
> + PG_FUNCTION_INFO_V1(dblink_replace_text);
> + Datum
> + dblink_replace_text(PG_FUNCTION_ARGS)
> + {
> +     text        *left_text;
> +     text        *right_text;
> +     text        *buf_text;
> +     text        *ret_text;
> +     char        *ret_str;
> +     int            curr_posn;
> +     text        *src_text = PG_GETARG_TEXT_P(0);
> +     int            src_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(src_text)));
> +     text        *from_sub_text = PG_GETARG_TEXT_P(1);
> +     int            from_sub_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(from_sub_text)));
> +     text        *to_sub_text = PG_GETARG_TEXT_P(2);
> +     char        *to_sub_str = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(to_sub_text)));
> +     StringInfo    str = makeStringInfo();
> +
> +     if (src_text_len == 0 || from_sub_text_len == 0)
> +         PG_RETURN_TEXT_P(src_text);
> +
> +     buf_text = DatumGetTextPCopy(PointerGetDatum(src_text));
> +     curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text)));
> +
> +     while (curr_posn > 0)
> +     {
> +         left_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), 1, DatumGetInt32(DirectFunctionCall2(textpos,PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) - 1));
> +         right_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), DatumGetInt32(DirectFunctionCall2(textpos,PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) + from_sub_text_len,-1));
> +
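> +         /* pass data through "%s" so a '%' in the text is not read as a format directive */
> +         appendStringInfo(str, "%s", DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(left_text))));
> +         appendStringInfo(str, "%s", to_sub_str);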
> +
> +         pfree(buf_text);
> +         pfree(left_text);
> +         buf_text = right_text;
> +         curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text)));
> +     }
> +
> +     appendStringInfo(str, "%s", DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(buf_text))));
> +     pfree(buf_text);
> +
> +     ret_str = pstrdup(str->data);
> +     ret_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(ret_str)));
> +
> +     PG_RETURN_TEXT_P(ret_text);
> + }
> +
> +
> + /*************************************************************
>    * internal functions
>    */
>
> ***************
> *** 285,293 ****
> --- 853,1408 ----
>       MemSet(retval, 0, sizeof(dblink_results));
>
>       retval->tup_num = -1;
> +     retval->res_id_index = -1;
>       retval->res = NULL;
>
>       MemoryContextSwitchTo(oldcontext);
>
>       return retval;
>   }
> +
> +
> + /*
> +  * init_dblink_array_results
> +  *     - create an empty dblink_array_results data structure
> +  */
> + dblink_array_results *
> + init_dblink_array_results(MemoryContext fn_mcxt)
> + {
> +     MemoryContext oldcontext;
> +     dblink_array_results *retval;
> +
> +     oldcontext = MemoryContextSwitchTo(fn_mcxt);
> +
> +     retval = (dblink_array_results *) palloc(sizeof(dblink_array_results));
> +     MemSet(retval, 0, sizeof(dblink_array_results));
> +
> +     retval->elem_num = -1;
> +     retval->num_elems = 0;
> +     retval->res = NULL;
> +
> +     MemoryContextSwitchTo(oldcontext);
> +
> +     return retval;
> + }
> +
> + /*
> +  * get_pkey_attnames
> +  *
> +  * Get the primary key attnames for the given relation.
> +  * Return NULL, and set numatts = 0, if no primary key exists.
> +  */
> + char **
> + get_pkey_attnames(Oid relid, int16 *numatts)
> + {
> +     Relation        indexRelation;
> +     ScanKeyData        entry;
> +     HeapScanDesc    scan;
> +     HeapTuple        indexTuple;
> +     int                i;
> +     char            **result = NULL;
> +     Relation        rel;
> +     TupleDesc        tupdesc;
> +
> +     /*
> +      * Open relation using relid, get tupdesc
> +      */
> +     rel = relation_open(relid, AccessShareLock);
> +     tupdesc = rel->rd_att;
> +
> +     /*
> +      * Initialize numatts to 0 in case no primary key
> +      * exists
> +      */
> +     *numatts = 0;
> +
> +     /*
> +      * Use relid to get all related indexes
> +      */
> +     indexRelation = heap_openr(IndexRelationName, AccessShareLock);
> +     ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid,
> +                            F_OIDEQ, ObjectIdGetDatum(relid));
> +     scan = heap_beginscan(indexRelation, false, SnapshotNow,
> +                           1, &entry);
> +
> +     while (HeapTupleIsValid(indexTuple = heap_getnext(scan, 0)))
> +     {
> +         Form_pg_index    index = (Form_pg_index) GETSTRUCT(indexTuple);
> +
> +         /*
> +          * We're only interested if it is the primary key
> +          */
> +         if (index->indisprimary == TRUE)
> +         {
> +             i = 0;
> +             while (index->indkey[i++] != 0)
> +                 (*numatts)++;
> +
> +             if (*numatts > 0)
> +             {
> +                 result = (char **) palloc(*numatts * sizeof(char *));
> +                 for (i = 0; i < *numatts; i++)
> +                     result[i] = SPI_fname(tupdesc, index->indkey[i]);
> +             }
> +             break;
> +         }
> +     }
> +     heap_endscan(scan);
> +     heap_close(indexRelation, AccessShareLock);
> +     relation_close(rel, AccessShareLock);
> +
> +     return result;
> + }
> +
> +
> + /*
> +  * get_strtok
> +  *
> +  * Split the input string on the provided field separator
> +  * and return the item at position fldnum (0 based),
> +  * or NULL if the string has too few items.
> +  */
> + char *
> + get_strtok(char *fldtext, char *fldsep, int fldnum)
> + {
> +     int            j = 0;
> +     char        *result;
> +
> +     if (fldnum < 0)
> +     {
> +         elog(ERROR, "get_strtok: field number < 0 not permitted");
> +     }
> +
> +     if (fldsep[0] == '\0')
> +     {
> +         elog(ERROR, "get_strtok: blank field separator not permitted");
> +     }
> +
> +     result = strtok(fldtext, fldsep);
> +     for (j = 1; j < fldnum + 1; j++)
> +     {
> +         result = strtok(NULL, fldsep);
> +         if (result == NULL)
> +             return NULL;
> +     }
> +
> +     return pstrdup(result);
> + }
> +
> + char *
> + get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
> + {
> +     Relation        rel;
> +     char            *relname;
> +     HeapTuple        tuple;
> +     TupleDesc        tupdesc;
> +     int                natts;
> +     StringInfo        str = makeStringInfo();
> +     char            *sql = NULL;
> +     char            *val = NULL;
> +     int16            key;
> +     unsigned int    i;
> +
> +     /*
> +      * Open relation using relid
> +      */
> +     rel = relation_open(relid, AccessShareLock);
> +     relname =  RelationGetRelationName(rel);
> +     tupdesc = rel->rd_att;
> +     natts = tupdesc->natts;
> +
> +     tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
> +
> +     appendStringInfo(str, "INSERT INTO %s(", quote_ident_cstr(relname));
> +     for (i = 0; i < natts; i++)
> +     {
> +         if (i > 0)
> +             appendStringInfo(str, ",");
> +
> +         appendStringInfo(str, "%s", NameStr(tupdesc->attrs[i]->attname));
> +     }
> +
> +     appendStringInfo(str, ") VALUES(");
> +
> +     /*
> +      * remember attvals are 1 based
> +      */
> +     for (i = 0; i < natts; i++)
> +     {
> +         if (i > 0)
> +             appendStringInfo(str, ",");
> +
> +         if (tgt_pkattvals != NULL)
> +             key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
> +         else
> +             key = -1;
> +
> +         if (key > -1)
> +             val = pstrdup(tgt_pkattvals[key]);
> +         else
> +             val = SPI_getvalue(tuple, tupdesc, i + 1);
> +
> +         if (val != NULL)
> +         {
> +             appendStringInfo(str, "%s", quote_literal_cstr(val));
> +             pfree(val);
> +         }
> +         else
> +             appendStringInfo(str, "NULL");
> +     }
> +     appendStringInfo(str, ")");
> +
> +     sql = pstrdup(str->data);
> +     pfree(str->data);
> +     pfree(str);
> +     relation_close(rel, AccessShareLock);
> +
> +     return (sql);
> + }
> +
> + char *
> + get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals)
> + {
> +     Relation        rel;
> +     char            *relname;
> +     TupleDesc        tupdesc;
> +     int                natts;
> +     StringInfo        str = makeStringInfo();
> +     char            *sql = NULL;
> +     char            *val = NULL;
> +     unsigned int    i;
> +
> +     /*
> +      * Open relation using relid
> +      */
> +     rel = relation_open(relid, AccessShareLock);
> +     relname =  RelationGetRelationName(rel);
> +     tupdesc = rel->rd_att;
> +     natts = tupdesc->natts;
> +
> +     appendStringInfo(str, "DELETE FROM %s WHERE ", quote_ident_cstr(relname));
> +     for (i = 0; i < pknumatts; i++)
> +     {
> +         int16    pkattnum = pkattnums[i];
> +
> +         if (i > 0)
> +             appendStringInfo(str, " AND ");
> +
> +         appendStringInfo(str, "%s", NameStr(tupdesc->attrs[pkattnum - 1]->attname));
> +
> +         if (tgt_pkattvals != NULL)
> +             val = pstrdup(tgt_pkattvals[i]);
> +         else
> +             elog(ERROR, "Target key array must not be NULL");
> +
> +         if (val != NULL)
> +         {
> +             appendStringInfo(str, "=");
> +             appendStringInfo(str, "%s", quote_literal_cstr(val));
> +             pfree(val);
> +         }
> +         else
> +             appendStringInfo(str, " IS NULL");
> +     }
> +
> +     sql = pstrdup(str->data);
> +     pfree(str->data);
> +     pfree(str);
> +     relation_close(rel, AccessShareLock);
> +
> +     return (sql);
> + }
> +
> + char *
> + get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
> + {
> +     Relation        rel;
> +     char            *relname;
> +     HeapTuple        tuple;
> +     TupleDesc        tupdesc;
> +     int                natts;
> +     StringInfo        str = makeStringInfo();
> +     char            *sql = NULL;
> +     char            *val = NULL;
> +     int16            key;
> +     int                i;
> +
> +     /*
> +      * Open relation using relid
> +      */
> +     rel = relation_open(relid, AccessShareLock);
> +     relname =  RelationGetRelationName(rel);
> +     tupdesc = rel->rd_att;
> +     natts = tupdesc->natts;
> +
> +     tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
> +
> +     appendStringInfo(str, "UPDATE %s SET ", quote_ident_cstr(relname));
> +
> +     for (i = 0; i < natts; i++)
> +     {
> +         if (i > 0)
> +             appendStringInfo(str, ",");
> +
> +         appendStringInfo(str, "%s", NameStr(tupdesc->attrs[i]->attname));
> +         appendStringInfo(str, "=");
> +
> +         if (tgt_pkattvals != NULL)
> +             key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
> +         else
> +             key = -1;
> +
> +         if (key > -1)
> +             val = pstrdup(tgt_pkattvals[key]);
> +         else
> +             val = SPI_getvalue(tuple, tupdesc, i + 1);
> +
> +         if (val != NULL)
> +         {
> +             appendStringInfo(str, "%s", quote_literal_cstr(val));
> +             pfree(val);
> +         }
> +         else
> +             appendStringInfo(str, "NULL");
> +     }
> +
> +     appendStringInfo(str, " WHERE ");
> +
> +     for (i = 0; i < pknumatts; i++)
> +     {
> +         int16    pkattnum = pkattnums[i];
> +
> +         if (i > 0)
> +             appendStringInfo(str, " AND ");
> +
> +         appendStringInfo(str, "%s", NameStr(tupdesc->attrs[pkattnum - 1]->attname));
> +
> +         if (tgt_pkattvals != NULL)
> +             val = pstrdup(tgt_pkattvals[i]);
> +         else
> +             val = SPI_getvalue(tuple, tupdesc, pkattnum);
> +
> +         if (val != NULL)
> +         {
> +             appendStringInfo(str, "=");
> +             appendStringInfo(str, "%s", quote_literal_cstr(val));
> +             pfree(val);
> +         }
> +         else
> +             appendStringInfo(str, " IS NULL");
> +     }
> +
> +     sql = pstrdup(str->data);
> +     pfree(str->data);
> +     pfree(str);
> +     relation_close(rel, AccessShareLock);
> +
> +     return (sql);
> + }
> +
> + /*
> +  * Return a properly quoted literal value.
> +  * Uses quote_literal in quote.c
> +  */
> + static char *
> + quote_literal_cstr(char *rawstr)
> + {
> +     text        *rawstr_text;
> +     text        *result_text;
> +     char        *result;
> +
> +     rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr)));
> +     result_text = DatumGetTextP(DirectFunctionCall1(quote_literal, PointerGetDatum(rawstr_text)));
> +     result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text)));
> +
> +     return result;
> + }
> +
> + /*
> +  * Return a properly quoted identifier.
> +  * Uses quote_ident in quote.c
> +  */
> + static char *
> + quote_ident_cstr(char *rawstr)
> + {
> +     text        *rawstr_text;
> +     text        *result_text;
> +     char        *result;
> +
> +     rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr)));
> +     result_text = DatumGetTextP(DirectFunctionCall1(quote_ident, PointerGetDatum(rawstr_text)));
> +     result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text)));
> +
> +     return result;
> + }
> +
> + int16
> + get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key)
> + {
> +     int        i;
> +
> +     /*
> +      * Not likely a long list anyway, so just scan for
> +      * the value
> +      */
> +     for (i = 0; i < pknumatts; i++)
> +         if (key == pkattnums[i])
> +             return i;
> +
> +     return -1;
> + }
> +
> + HeapTuple
> + get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals)
> + {
> +     Relation        rel;
> +     char            *relname;
> +     TupleDesc        tupdesc;
> +     StringInfo        str = makeStringInfo();
> +     char            *sql = NULL;
> +     int                ret;
> +     HeapTuple        tuple;
> +     int                i;
> +     char            *val = NULL;
> +
> +     /*
> +      * Open relation using relid
> +      */
> +     rel = relation_open(relid, AccessShareLock);
> +     relname =  RelationGetRelationName(rel);
> +     tupdesc = rel->rd_att;
> +
> +     /*
> +      * Connect to SPI manager
> +      */
> +     if ((ret = SPI_connect()) < 0)
> +         elog(ERROR, "get_tuple_of_interest: SPI_connect returned %d", ret);
> +
> +     /*
> +      * Build sql statement to look up tuple of interest
> +      * Use src_pkattvals as the criteria.
> +      */
> +     appendStringInfo(str, "SELECT * from %s WHERE ", quote_ident_cstr(relname));
> +
> +     for (i = 0; i < pknumatts; i++)
> +     {
> +         int16    pkattnum = pkattnums[i];
> +
> +         if (i > 0)
> +             appendStringInfo(str, " AND ");
> +
> +         appendStringInfo(str, "%s", NameStr(tupdesc->attrs[pkattnum - 1]->attname));
> +
> +         val = pstrdup(src_pkattvals[i]);
> +         if (val != NULL)
> +         {
> +             appendStringInfo(str, "=");
> +             appendStringInfo(str, "%s", quote_literal_cstr(val));
> +             pfree(val);
> +         }
> +         else
> +             appendStringInfo(str, " IS NULL");
> +     }
> +
> +     sql = pstrdup(str->data);
> +     pfree(str->data);
> +     pfree(str);
> +     /*
> +      * Retrieve the desired tuple
> +      */
> +     ret = SPI_exec(sql, 0);
> +     pfree(sql);
> +
> +     /*
> +      * Only allow one qualifying tuple
> +      */
> +     if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
> +     {
> +         elog(ERROR, "get_tuple_of_interest: Source criteria may not match more than one record.");
> +     }
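> +     else if (ret == SPI_OK_SELECT && SPI_processed == 1)
> +     {
> +         SPITupleTable *tuptable = SPI_tuptable;
> +         tuple = SPI_copytuple(tuptable->vals[0]);
> +
> +         /*
> +          * SPI_copytuple allocates in the upper executor context,
> +          * so the copy survives disconnecting from SPI
> +          */
> +         SPI_finish();
> +         return tuple;
> +     }
> +     else
> +     {
> +         /*
> +          * no qualifying tuples
> +          */
> +         SPI_finish();
> +         return NULL;
> +     }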
> +
> +     /*
> +      * never reached, but keep compiler quiet
> +      */
> +     return NULL;
> + }
> +
> + Oid
> + get_relid_from_relname(char *relname)
> + {
> + #ifdef NamespaceRelationName
> +     Oid                relid;
> +
> +     relid = RelnameGetRelid(relname);
> + #else
> +     Relation        rel;
> +     Oid                relid;
> +
> +     rel = relation_openr(relname, AccessShareLock);
> +     relid = RelationGetRelid(rel);
> +     relation_close(rel, AccessShareLock);
> + #endif   /* NamespaceRelationName */
> +
> +     return relid;
> + }
> +
> + dblink_results    *
> + get_res_ptr(int32 res_id_index)
> + {
> +     List    *ptr;
> +
> +     /*
> +      * short circuit empty list
> +      */
> +     if(res_id == NIL)
> +         return NULL;
> +
> +     /*
> +      * OK, should be good to go
> +      */
> +     foreach(ptr, res_id)
> +     {
> +         dblink_results    *this_res_id = (dblink_results *) lfirst(ptr);
> +         if (this_res_id->res_id_index == res_id_index)
> +             return this_res_id;
> +     }
> +     return NULL;
> + }
> +
> + /*
> +  * Add node to global List res_id
> +  */
> + void
> + append_res_ptr(dblink_results *results)
> + {
> +     res_id = lappend(res_id, results);
> + }
> +
> + /*
> +  * Remove node from global List res_id.
> +  * Reset res_id_index once the list is empty.
> +  */
> + void
> + remove_res_ptr(dblink_results *results)
> + {
> +     res_id = lremove(results, res_id);
> +
> +     if (res_id == NIL)
> +         res_id_index = 0;
> + }
> +
> +
> diff -cNr dblink.orig/dblink.h dblink/dblink.h
> *** dblink.orig/dblink.h    Mon Nov  5 09:46:22 2001
> --- dblink/dblink.h    Sun Apr 14 18:54:39 2002
> ***************
> *** 3,9 ****
>    *
>    * Functions returning results from a remote database
>    *
> !  * Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
>    *
>    * Permission to use, copy, modify, and distribute this software and its
>    * documentation for any purpose, without fee, and without a written agreement
> --- 3,10 ----
>    *
>    * Functions returning results from a remote database
>    *
> !  * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
> !  * ALL RIGHTS RESERVED;
>    *
>    * Permission to use, copy, modify, and distribute this software and its
>    * documentation for any purpose, without fee, and without a written agreement
> ***************
> *** 33,42 ****
> --- 34,64 ----
>   #include "libpq-int.h"
>   #include "fmgr.h"
>   #include "access/tupdesc.h"
> + #include "access/heapam.h"
> + #include "catalog/catname.h"
> + #include "catalog/pg_index.h"
> + #include "catalog/pg_type.h"
>   #include "executor/executor.h"
> + #include "executor/spi.h"
> + #include "lib/stringinfo.h"
>   #include "nodes/nodes.h"
>   #include "nodes/execnodes.h"
> + #include "nodes/pg_list.h"
> + #include "parser/parse_type.h"
> + #include "tcop/tcopprot.h"
>   #include "utils/builtins.h"
> + #include "utils/fmgroids.h"
> + #include "utils/array.h"
> + #include "utils/syscache.h"
> +
> + #ifdef NamespaceRelationName
> + #include "catalog/namespace.h"
> + #endif   /* NamespaceRelationName */
> +
> + /*
> +  * Max SQL statement size
> +  */
> + #define DBLINK_MAX_SQLSTATE_SIZE        16384
>
>   /*
>    * This struct holds the results of the remote query.
> ***************
> *** 50,70 ****
>       int            tup_num;
>
>       /*
>        * the actual query results
>        */
>       PGresult   *res;
> -
>   }    dblink_results;
>
>   /*
>    * External declarations
>    */
>   extern Datum dblink(PG_FUNCTION_ARGS);
>   extern Datum dblink_tok(PG_FUNCTION_ARGS);
>
>   /*
>    * Internal declarations
>    */
>   dblink_results *init_dblink_results(MemoryContext fn_mcxt);
>
>   #endif   /* DBLINK_H */
> --- 72,145 ----
>       int            tup_num;
>
>       /*
> +      * resource index number for this context
> +      */
> +     int            res_id_index;
> +
> +     /*
>        * the actual query results
>        */
>       PGresult   *res;
>   }    dblink_results;
>
> +
> + /*
> +  * This struct holds results in the form of an array.
> +  * Use fn_extra to hold a pointer to it across calls
> +  */
> + typedef struct
> + {
> +     /*
> +      * elem being accessed
> +      */
> +     int            elem_num;
> +
> +     /*
> +      * number of elems
> +      */
> +     int            num_elems;
> +
> +     /*
> +      * the actual array
> +      */
> +     void        *res;
> +
> + }    dblink_array_results;
> +
>   /*
>    * External declarations
>    */
>   extern Datum dblink(PG_FUNCTION_ARGS);
>   extern Datum dblink_tok(PG_FUNCTION_ARGS);
> + extern Datum dblink_strtok(PG_FUNCTION_ARGS);
> + extern Datum dblink_get_pkey(PG_FUNCTION_ARGS);
> + extern Datum dblink_last_oid(PG_FUNCTION_ARGS);
> + extern Datum dblink_build_sql_insert(PG_FUNCTION_ARGS);
> + extern Datum dblink_build_sql_delete(PG_FUNCTION_ARGS);
> + extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS);
> + extern Datum dblink_current_query(PG_FUNCTION_ARGS);
> + extern Datum dblink_replace_text(PG_FUNCTION_ARGS);
>
>   /*
>    * Internal declarations
>    */
>   dblink_results *init_dblink_results(MemoryContext fn_mcxt);
> + dblink_array_results *init_dblink_array_results(MemoryContext fn_mcxt);
> + char **get_pkey_attnames(Oid relid, int16 *numatts);
> + char *get_strtok(char *fldtext, char *fldsep, int fldnum);
> + char *getvalue(HeapTuple tuple, TupleDesc tupdesc, int fnumber);
> + char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
> + char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals);
> + char *get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
> + static char *quote_literal_cstr(char *rawstr);
> + static char *quote_ident_cstr(char *rawstr);
> + int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key);
> + HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals);
> + Oid get_relid_from_relname(char *relname);
> + dblink_results    *get_res_ptr(int32 res_id_index);
> + void append_res_ptr(dblink_results *results);
> + void remove_res_ptr(dblink_results *results);
> +
> + extern char    *debug_query_string;
>
>   #endif   /* DBLINK_H */
> diff -cNr dblink.orig/dblink.sql.in dblink/dblink.sql.in
> *** dblink.orig/dblink.sql.in    Thu Jun 14 09:49:03 2001
> --- dblink/dblink.sql.in    Fri Apr 12 14:36:49 2002
> ***************
> *** 1,5 ****
> ! CREATE FUNCTION dblink (text,text) RETURNS setof int
> !   AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c';
>
> ! CREATE FUNCTION dblink_tok (int,int) RETURNS text
> !   AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c';
> --- 1,38 ----
> ! CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof int
> !   AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c'
> !   WITH (isstrict);
>
> ! CREATE OR REPLACE FUNCTION dblink_tok (int,int) RETURNS text
> !   AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c'
> !   WITH (isstrict);
> !
> ! CREATE OR REPLACE FUNCTION dblink_strtok (text,text,int) RETURNS text
> !   AS 'MODULE_PATHNAME','dblink_strtok' LANGUAGE 'c'
> !   WITH (iscachable, isstrict);
> !
> ! CREATE OR REPLACE FUNCTION dblink_get_pkey (name) RETURNS setof text
> !   AS 'MODULE_PATHNAME','dblink_get_pkey' LANGUAGE 'c'
> !   WITH (isstrict);
> !
> ! CREATE OR REPLACE FUNCTION dblink_last_oid (int) RETURNS oid
> !   AS 'MODULE_PATHNAME','dblink_last_oid' LANGUAGE 'c'
> !   WITH (isstrict);
> !
> ! CREATE OR REPLACE FUNCTION dblink_build_sql_insert (name, int2vector, int2, _text, _text) RETURNS text
> !   AS 'MODULE_PATHNAME','dblink_build_sql_insert' LANGUAGE 'c'
> !   WITH (isstrict);
> !
> ! CREATE OR REPLACE FUNCTION dblink_build_sql_delete (name, int2vector, int2, _text) RETURNS text
> !   AS 'MODULE_PATHNAME','dblink_build_sql_delete' LANGUAGE 'c'
> !   WITH (isstrict);
> !
> ! CREATE OR REPLACE FUNCTION dblink_build_sql_update (name, int2vector, int2, _text, _text) RETURNS text
> !   AS 'MODULE_PATHNAME','dblink_build_sql_update' LANGUAGE 'c'
> !   WITH (isstrict);
> !
> ! CREATE OR REPLACE FUNCTION dblink_current_query () RETURNS text
> !   AS 'MODULE_PATHNAME','dblink_current_query' LANGUAGE 'c';
> !
> ! CREATE OR REPLACE FUNCTION dblink_replace (text,text,text) RETURNS text
> !   AS 'MODULE_PATHNAME','dblink_replace_text' LANGUAGE 'c'
> !   WITH (iscachable, isstrict);
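
For anyone reading the header changes above: the res_id_index field and the
get_res_ptr/append_res_ptr/remove_res_ptr declarations go with the "resource
id instead of a real pointer" change. Below is a minimal sketch of how such an
id-to-pointer table could work. It is not the code from the patch. The
sketch_ names, the cut-down struct, and the fixed capacity are all assumptions
made for illustration only.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Hypothetical stand-in for dblink_results; only the field this
 * sketch needs is kept.
 */
typedef struct
{
	int			res_id_index;	/* slot number handed back to the caller */
}	sketch_results;

#define SKETCH_MAX_RES 16		/* assumed fixed capacity */

static sketch_results *sketch_slots[SKETCH_MAX_RES];

/* Store a result set in the first free slot; return its id, or -1 if full. */
int
sketch_append_res_ptr(sketch_results *results)
{
	int			i;

	assert(results != NULL);
	for (i = 0; i < SKETCH_MAX_RES; i++)
	{
		if (sketch_slots[i] == NULL)
		{
			sketch_slots[i] = results;
			results->res_id_index = i;
			return i;
		}
	}
	return -1;
}

/* Translate an id back into a pointer; NULL for unknown or released ids. */
sketch_results *
sketch_get_res_ptr(int res_id_index)
{
	if (res_id_index < 0 || res_id_index >= SKETCH_MAX_RES)
		return NULL;
	return sketch_slots[res_id_index];
}

/* Free the slot so a later lookup of the same id fails cleanly. */
void
sketch_remove_res_ptr(sketch_results *results)
{
	int			id;

	assert(results != NULL);
	id = results->res_id_index;
	if (id >= 0 && id < SKETCH_MAX_RES && sketch_slots[id] == results)
		sketch_slots[id] = NULL;
}
```

The point of the indirection is that a stale or made-up id passed in from SQL
yields NULL, which the caller can report as an error, where a raw pointer
value would be dereferenced blindly.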


--
  Bruce Momjian                        |  http://candle.pha.pa.us
  pgman@candle.pha.pa.us               |  (610) 853-3000
  +  If your life is a hard drive,     |  830 Blythe Avenue
  +  Christ can be your backup.        |  Drexel Hill, Pennsylvania 19026
