Re: contrib/dblink update - Mailing list pgsql-patches
From | Bruce Momjian |
---|---|
Subject | Re: contrib/dblink update |
Date | |
Msg-id | 200204240228.g3O2SUj22558@candle.pha.pa.us Whole thread Raw |
In response to | contrib/dblink update (Joe Conway <mail@joeconway.com>) |
List | pgsql-patches |
Patch applied. Thanks. --------------------------------------------------------------------------- Joe Conway wrote: > Attached is an update to contrib/dblink. Please apply if there are no > objections. > > Major changes: > - removed cursor wrap around input sql to allow for remote > execution of INSERT/UPDATE/DELETE > - dblink now returns a resource id instead of a real pointer > - added several utility functions > > I'm still hoping to add explicit cursor open/fetch/close support before > 7.3 is released, but I need a bit more time on that. > > On a somewhat unrelated topic, I never got any feedback on the > unknownin/out patch and the mb_substring patch. Is there anything else I > need to do to get those applied? > > Thanks, > > Joe > diff -cNr dblink.orig/README.dblink dblink/README.dblink > *** dblink.orig/README.dblink Thu Dec 13 02:48:39 2001 > --- dblink/README.dblink Sun Apr 14 20:02:06 2002 > *************** > *** 3,9 **** > * > * Functions returning results from a remote database > * > ! * Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001; > * > * Permission to use, copy, modify, and distribute this software and its > * documentation for any purpose, without fee, and without a written agreement > --- 3,10 ---- > * > * Functions returning results from a remote database > * > ! * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002, > ! * ALL RIGHTS RESERVED; > * > * Permission to use, copy, modify, and distribute this software and its > * documentation for any purpose, without fee, and without a written agreement > *************** > *** 25,36 **** > */ > > > ! Version 0.3 (14 June, 2001): > ! Function to test returning data set from remote database > ! Tested under Linux (Red Hat 6.2 and 7.0) and PostgreSQL 7.1 and 7.2devel > > Release Notes: > > Version 0.3 > - fixed dblink invalid pointer causing corrupt elog message > - fixed dblink_tok improper handling of null results > --- 26,44 ---- > */ > > > ! Version 0.4 (7 April, 2002): > ! Functions allowing remote database INSERT/UPDATE/DELETE/SELECT, and > ! various utility functions. > ! Tested under Linux (Red Hat 7.2) and PostgreSQL 7.2 and 7.3devel > > Release Notes: > > + Version 0.4 > + - removed cursor wrap around input sql to allow for remote > + execution of INSERT/UPDATE/DELETE > + - dblink now returns a resource id instead of a real pointer > + - added several utility functions -- see below > + > Version 0.3 > - fixed dblink invalid pointer causing corrupt elog message > - fixed dblink_tok improper handling of null results > *************** > *** 51,64 **** > > installs following functions into database template1: > > ! dblink() - returns a pointer to results from remote query > ! dblink_tok() - extracts and returns individual field results > > Documentation > ================================================================== > Name > > ! dblink -- Returns a pointer to a data set from a remote database > > Synopsis > > --- 59,94 ---- > > installs following functions into database template1: > > ! dblink(text,text) RETURNS setof int > ! - returns a resource id for results from remote query > ! dblink_tok(int,int) RETURNS text > ! - extracts and returns individual field results > ! dblink_strtok(text,text,int) RETURNS text > ! - extracts and returns individual token from delimited text > ! dblink_get_pkey(name) RETURNS setof text > ! - returns the field names of a relation's primary key fields > ! dblink_last_oid(int) RETURNS oid > ! - returns the last inserted oid > ! dblink_build_sql_insert(name,int2vector,int2,_text,_text) RETURNS text > ! - builds an insert statement using a local tuple, replacing the > ! selection key field values with alternate supplied values > ! dblink_build_sql_delete(name,int2vector,int2,_text) RETURNS text > ! - builds a delete statement using supplied values for selection > ! key field values > ! dblink_build_sql_update(name,int2vector,int2,_text,_text) RETURNS text > ! - builds an update statement using a local tuple, replacing the > ! selection key field values with alternate supplied values > ! dblink_current_query() RETURNS text > ! - returns the current query string > ! dblink_replace(text,text,text) RETURNS text > ! - replace all occurences of substring-a in the input-string > ! with substring-b > > Documentation > ================================================================== > Name > > ! dblink -- Returns a resource id for a data set from a remote database > > Synopsis > > *************** > *** 78,84 **** > > Outputs > > ! Returns setof int (pointer) > > Example usage > > --- 108,114 ---- > > Outputs > > ! Returns setof int (res_id) > > Example usage > > *************** > *** 94,106 **** > > Synopsis > > ! dblink_tok(int pointer, int fnumber) > > Inputs > > ! pointer > > ! a pointer returned by a call to dblink() > > fnumber > > --- 124,136 ---- > > Synopsis > > ! dblink_tok(int res_id, int fnumber) > > Inputs > > ! res_id > > ! a resource id returned by a call to dblink() > > fnumber > > *************** > *** 131,136 **** > --- 161,415 ---- > select f1, f2 from myremotetable where f1 like 'bytea%'; > > ================================================================== > + Name > + > + dblink_strtok -- Extracts and returns individual token from delimited text > + > + Synopsis > + > + dblink_strtok(text inputstring, text delimiter, int posn) RETURNS text > + > + Inputs > + > + inputstring > + > + any string you want to parse a token out of; > + e.g. 'f=1&g=3&h=4' > + > + delimiter > + > + a single character to use as the delimiter; > + e.g. '&' or '=' > + > + posn > + > + the position of the token of interest, 0 based; > + e.g. 1 > + > + Outputs > + > + Returns text > + > + Example usage > + > + test=# select dblink_strtok(dblink_strtok('f=1&g=3&h=4','&',1),'=',1); > + dblink_strtok > + --------------- > + 3 > + (1 row) > + > + ================================================================== > + Name > + > + dblink_get_pkey -- returns the field names of a relation's primary > + key fields > + > + Synopsis > + > + dblink_get_pkey(name relname) RETURNS setof text > + > + Inputs > + > + relname > + > + any relation name; > + e.g. 'foobar' > + > + Outputs > + > + Returns setof text -- one row for each primary key field, in order of > + precedence > + > + Example usage > + > + test=# select dblink_get_pkey('foobar'); > + dblink_get_pkey > + ----------------- > + f1 > + f2 > + f3 > + f4 > + f5 > + (5 rows) > + > + > + ================================================================== > + Name > + > + dblink_last_oid -- Returns last inserted oid > + > + Synopsis > + > + dblink_last_oid(int res_id) RETURNS oid > + > + Inputs > + > + res_id > + > + any resource id returned by dblink function; > + > + Outputs > + > + Returns oid of last inserted tuple > + > + Example usage > + > + test=# select dblink_last_oid(dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd' > + ,'insert into mytable (f1, f2) values (1,2)')); > + > + dblink_last_oid > + ---------------- > + 16553 > + (1 row) > + > + > + ================================================================== > + Name > + > + dblink_build_sql_insert -- builds an insert statement using a local > + tuple, replacing the selection key field > + values with alternate supplied values > + dblink_build_sql_delete -- builds a delete statement using supplied > + values for selection key field values > + dblink_build_sql_update -- builds an update statement using a local > + tuple, replacing the selection key field > + values with alternate supplied values > + > + > + Synopsis > + > + dblink_build_sql_insert(name relname > + ,int2vector primary_key_attnums > + ,int2 num_primary_key_atts > + ,_text src_pk_att_vals_array > + ,_text tgt_pk_att_vals_array) RETURNS text > + dblink_build_sql_delete(name relname > + ,int2vector primary_key_attnums > + ,int2 num_primary_key_atts > + ,_text tgt_pk_att_vals_array) RETURNS text > + dblink_build_sql_update(name relname > + ,int2vector primary_key_attnums > + ,int2 num_primary_key_atts > + ,_text src_pk_att_vals_array > + ,_text tgt_pk_att_vals_array) RETURNS text > + > + Inputs > + > + relname > + > + any relation name; > + e.g. 'foobar' > + > + primary_key_attnums > + > + vector of primary key attnums (1 based, see pg_index.indkey); > + e.g. '1 2' > + > + num_primary_key_atts > + > + number of primary key attnums in the vector; e.g. 2 > + > + src_pk_att_vals_array > + > + array of primary key values, used to look up the local matching > + tuple, the values of which are then used to construct the SQL > + statement > + > + tgt_pk_att_vals_array > + > + array of primary key values, used to replace the local tuple > + values in the SQL statement > + > + Outputs > + > + Returns text -- requested SQL statement > + > + Example usage > + > + test=# select dblink_build_sql_insert('foo','1 2',2,'{"1", "a"}','{"1", "b''a"}'); > + dblink_build_sql_insert > + -------------------------------------------------- > + INSERT INTO foo(f1,f2,f3) VALUES('1','b''a','1') > + (1 row) > + > + test=# select dblink_build_sql_delete('MyFoo','1 2',2,'{"1", "b"}'); > + dblink_build_sql_delete > + --------------------------------------------- > + DELETE FROM "MyFoo" WHERE f1='1' AND f2='b' > + (1 row) > + > + test=# select dblink_build_sql_update('foo','1 2',2,'{"1", "a"}','{"1", "b"}'); > + dblink_build_sql_update > + ------------------------------------------------------------- > + UPDATE foo SET f1='1',f2='b',f3='1' WHERE f1='1' AND f2='b' > + (1 row) > + > + > + ================================================================== > + Name > + > + dblink_current_query -- returns the current query string > + > + Synopsis > + > + dblink_current_query () RETURNS text > + > + Inputs > + > + None > + > + Outputs > + > + Returns text -- a copy of the currently executing query > + > + Example usage > + > + test=# select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname= ''byteacat''') as f1) as t1; > + dblink_current_query > + ----------------------------------------------------------------------------------------------------------------------------------------------------- > + select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname= ''byteacat''') as f1) as t1; > + (1 row) > + > + > + ================================================================== > + Name > + > + dblink_replace -- replace all occurences of substring-a in the > + input-string with substring-b > + > + Synopsis > + > + dblink_replace(text input-string, text substring-a, text substring-b) RETURNS text > + > + Inputs > + > + input-string > + > + the starting string, before replacement of substring-a > + > + substring-a > + > + the substring to find and replace > + > + substring-b > + > + the substring to be substituted in place of substring-a > + > + Outputs > + > + Returns text -- a copy of the starting string, but with all occurences of > + substring-a replaced with substring-b > + > + Example usage > + > + test=# select dblink_replace('12345678901234567890','56','hello'); > + dblink_replace > + ---------------------------- > + 1234hello78901234hello7890 > + (1 row) > + > + ================================================================== > + > > -- Joe Conway > > diff -cNr dblink.orig/dblink.c dblink/dblink.c > *** dblink.orig/dblink.c Wed Oct 24 22:49:19 2001 > --- dblink/dblink.c Sun Apr 14 20:03:30 2002 > *************** > *** 3,9 **** > * > * Functions returning results from a remote database > * > ! * Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001; > * > * Permission to use, copy, modify, and distribute this software and its > * documentation for any purpose, without fee, and without a written agreement > --- 3,10 ---- > * > * Functions returning results from a remote database > * > ! * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002, > ! * ALL RIGHTS RESERVED; > * > * Permission to use, copy, modify, and distribute this software and its > * documentation for any purpose, without fee, and without a written agreement > *************** > *** 26,48 **** > > #include "dblink.h" > > PG_FUNCTION_INFO_V1(dblink); > Datum > dblink(PG_FUNCTION_ARGS) > { > ! PGconn *conn = NULL; > ! PGresult *res = NULL; > ! dblink_results *results; > ! char *optstr; > ! char *sqlstatement; > ! char *curstr = "DECLARE mycursor CURSOR FOR "; > ! char *execstatement; > ! char *msg; > ! int ntuples = 0; > ! ReturnSetInfo *rsi; > ! > ! if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) > ! elog(ERROR, "dblink: NULL arguments are not permitted"); > > if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo)) > elog(ERROR, "dblink: function called in context that does not accept a set result"); > --- 27,49 ---- > > #include "dblink.h" > > + /* Global */ > + List *res_id = NIL; > + int res_id_index = 0; > + > PG_FUNCTION_INFO_V1(dblink); > Datum > dblink(PG_FUNCTION_ARGS) > { > ! PGconn *conn = NULL; > ! PGresult *res = NULL; > ! dblink_results *results; > ! char *optstr; > ! char *sqlstatement; > ! char *execstatement; > ! char *msg; > ! int ntuples = 0; > ! ReturnSetInfo *rsi; > > if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo)) > elog(ERROR, "dblink: function called in context that does not accept a set result"); > *************** > *** 61,81 **** > elog(ERROR, "dblink: connection error: %s", msg); > } > > ! res = PQexec(conn, "BEGIN"); > ! if (PQresultStatus(res) != PGRES_COMMAND_OK) > ! { > ! msg = pstrdup(PQerrorMessage(conn)); > ! PQclear(res); > ! PQfinish(conn); > ! elog(ERROR, "dblink: begin error: %s", msg); > ! } > ! PQclear(res); > ! > ! execstatement = (char *) palloc(strlen(curstr) + strlen(sqlstatement) + 1); > if (execstatement != NULL) > { > ! strcpy(execstatement, curstr); > ! strcat(execstatement, sqlstatement); > strcat(execstatement, "\0"); > } > else > --- 62,71 ---- > elog(ERROR, "dblink: connection error: %s", msg); > } > > ! execstatement = (char *) palloc(strlen(sqlstatement) + 1); > if (execstatement != NULL) > { > ! strcpy(execstatement, sqlstatement); > strcat(execstatement, "\0"); > } > else > *************** > *** 94,163 **** > /* > * got results, start fetching them > */ > - PQclear(res); > - > - res = PQexec(conn, "FETCH ALL in mycursor"); > - if (!res || PQresultStatus(res) != PGRES_TUPLES_OK) > - { > - msg = pstrdup(PQerrorMessage(conn)); > - PQclear(res); > - PQfinish(conn); > - elog(ERROR, "dblink: sql error: %s", msg); > - } > - > ntuples = PQntuples(res); > > ! if (ntuples > 0) > ! { > ! > ! results = init_dblink_results(fcinfo->flinfo->fn_mcxt); > ! results->tup_num = 0; > ! results->res = res; > ! res = NULL; > ! > ! fcinfo->flinfo->fn_extra = (void *) results; > ! > ! results = NULL; > ! results = fcinfo->flinfo->fn_extra; > ! > ! /* close the cursor */ > ! res = PQexec(conn, "CLOSE mycursor"); > ! PQclear(res); > ! > ! /* commit the transaction */ > ! res = PQexec(conn, "COMMIT"); > ! PQclear(res); > ! > ! /* close the connection to the database and cleanup */ > ! PQfinish(conn); > ! > ! rsi = (ReturnSetInfo *) fcinfo->resultinfo; > ! rsi->isDone = ExprMultipleResult; > ! > ! PG_RETURN_POINTER(results); > ! > ! } > ! else > ! { > > ! PQclear(res); > > ! /* close the cursor */ > ! res = PQexec(conn, "CLOSE mycursor"); > ! PQclear(res); > > ! /* commit the transaction */ > ! res = PQexec(conn, "COMMIT"); > ! PQclear(res); > > ! /* close the connection to the database and cleanup */ > ! PQfinish(conn); > > ! rsi = (ReturnSetInfo *) fcinfo->resultinfo; > ! rsi->isDone = ExprEndResult; > > ! PG_RETURN_NULL(); > ! } > } > } > else > --- 84,119 ---- > /* > * got results, start fetching them > */ > ntuples = PQntuples(res); > > ! /* > ! * increment resource index > ! */ > ! res_id_index++; > > ! results = init_dblink_results(fcinfo->flinfo->fn_mcxt); > ! results->tup_num = 0; > ! results->res_id_index = res_id_index; > ! results->res = res; > > ! /* > ! * Append node to res_id to hold pointer to results. > ! * Needed by dblink_tok to access the data > ! */ > ! append_res_ptr(results); > > ! /* > ! * save pointer to results for the next function manager call > ! */ > ! fcinfo->flinfo->fn_extra = (void *) results; > > ! /* close the connection to the database and cleanup */ > ! PQfinish(conn); > > ! rsi = (ReturnSetInfo *) fcinfo->resultinfo; > ! rsi->isDone = ExprMultipleResult; > > ! PG_RETURN_INT32(res_id_index); > } > } > else > *************** > *** 165,173 **** > /* > * check for more results > */ > - > results = fcinfo->flinfo->fn_extra; > results->tup_num++; > ntuples = PQntuples(results->res); > > if (results->tup_num < ntuples) > --- 121,130 ---- > /* > * check for more results > */ > results = fcinfo->flinfo->fn_extra; > + > results->tup_num++; > + res_id_index = results->res_id_index; > ntuples = PQntuples(results->res); > > if (results->tup_num < ntuples) > *************** > *** 179,196 **** > rsi = (ReturnSetInfo *) fcinfo->resultinfo; > rsi->isDone = ExprMultipleResult; > > ! PG_RETURN_POINTER(results); > ! > } > else > { > /* > * or if no more, clean things up > */ > - > results = fcinfo->flinfo->fn_extra; > > PQclear(results->res); > > rsi = (ReturnSetInfo *) fcinfo->resultinfo; > rsi->isDone = ExprEndResult; > --- 136,154 ---- > rsi = (ReturnSetInfo *) fcinfo->resultinfo; > rsi->isDone = ExprMultipleResult; > > ! PG_RETURN_INT32(res_id_index); > } > else > { > /* > * or if no more, clean things up > */ > results = fcinfo->flinfo->fn_extra; > > + remove_res_ptr(results); > PQclear(results->res); > + pfree(results); > + fcinfo->flinfo->fn_extra = NULL; > > rsi = (ReturnSetInfo *) fcinfo->resultinfo; > rsi->isDone = ExprEndResult; > *************** > *** 214,249 **** > dblink_tok(PG_FUNCTION_ARGS) > { > dblink_results *results; > ! int fldnum; > ! text *result_text; > ! char *result; > ! int nfields = 0; > ! int text_len = 0; > ! > ! if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) > ! elog(ERROR, "dblink: NULL arguments are not permitted"); > > ! results = (dblink_results *) PG_GETARG_POINTER(0); > if (results == NULL) > ! elog(ERROR, "dblink: function called with invalid result pointer"); > > fldnum = PG_GETARG_INT32(1); > if (fldnum < 0) > ! elog(ERROR, "dblink: field number < 0 not permitted"); > > nfields = PQnfields(results->res); > if (fldnum > (nfields - 1)) > ! elog(ERROR, "dblink: field number %d does not exist", fldnum); > > if (PQgetisnull(results->res, results->tup_num, fldnum) == 1) > - { > - > PG_RETURN_NULL(); > - > - } > else > { > - > text_len = PQgetlength(results->res, results->tup_num, fldnum); > > result = (char *) palloc(text_len + 1); > --- 172,208 ---- > dblink_tok(PG_FUNCTION_ARGS) > { > dblink_results *results; > ! int fldnum; > ! text *result_text; > ! char *result; > ! int nfields = 0; > ! int text_len = 0; > > ! results = get_res_ptr(PG_GETARG_INT32(0)); > if (results == NULL) > ! { > ! if (res_id != NIL) > ! { > ! freeList(res_id); > ! res_id = NIL; > ! res_id_index = 0; > ! } > ! > ! elog(ERROR, "dblink_tok: function called with invalid resource id"); > ! } > > fldnum = PG_GETARG_INT32(1); > if (fldnum < 0) > ! elog(ERROR, "dblink_tok: field number < 0 not permitted"); > > nfields = PQnfields(results->res); > if (fldnum > (nfields - 1)) > ! elog(ERROR, "dblink_tok: field number %d does not exist", fldnum); > > if (PQgetisnull(results->res, results->tup_num, fldnum) == 1) > PG_RETURN_NULL(); > else > { > text_len = PQgetlength(results->res, results->tup_num, fldnum); > > result = (char *) palloc(text_len + 1); > *************** > *** 259,270 **** > --- 218,838 ---- > result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result))); > > PG_RETURN_TEXT_P(result_text); > + } > + } > + > + > + /* > + * dblink_strtok > + * parse input string > + * return ord item (0 based) > + * based on provided field separator > + */ > + PG_FUNCTION_INFO_V1(dblink_strtok); > + Datum > + dblink_strtok(PG_FUNCTION_ARGS) > + { > + char *fldtext; > + char *fldsep; > + int fldnum; > + char *buffer; > + text *result_text; > + > + fldtext = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0)))); > + fldsep = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1)))); > + fldnum = PG_GETARG_INT32(2); > + > + if (fldtext[0] == '\0') > + { > + elog(ERROR, "get_strtok: blank list not permitted"); > + } > + if (fldsep[0] == '\0') > + { > + elog(ERROR, "get_strtok: blank field separator not permitted"); > + } > > + buffer = get_strtok(fldtext, fldsep, fldnum); > + > + pfree(fldtext); > + pfree(fldsep); > + > + if (buffer == NULL) > + { > + PG_RETURN_NULL(); > + } > + else > + { > + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(buffer))); > + pfree(buffer); > + > + PG_RETURN_TEXT_P(result_text); > } > } > > > /* > + * dblink_get_pkey > + * > + * Return comma delimited list of primary key > + * fields for the supplied relation, > + * or NULL if none exists. > + */ > + PG_FUNCTION_INFO_V1(dblink_get_pkey); > + Datum > + dblink_get_pkey(PG_FUNCTION_ARGS) > + { > + char *relname; > + Oid relid; > + char **result; > + text *result_text; > + int16 numatts; > + ReturnSetInfo *rsi; > + dblink_array_results *ret_set; > + > + if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo)) > + elog(ERROR, "dblink: function called in context that does not accept a set result"); > + > + if (fcinfo->flinfo->fn_extra == NULL) > + { > + relname = NameStr(*PG_GETARG_NAME(0)); > + > + /* > + * Convert relname to rel OID. > + */ > + relid = get_relid_from_relname(relname); > + if (!OidIsValid(relid)) > + elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist", > + relname); > + > + /* > + * get an array of attnums. > + */ > + result = get_pkey_attnames(relid, &numatts); > + > + if ((result != NULL) && (numatts > 0)) > + { > + ret_set = init_dblink_array_results(fcinfo->flinfo->fn_mcxt); > + > + ret_set->elem_num = 0; > + ret_set->num_elems = numatts; > + ret_set->res = result; > + > + fcinfo->flinfo->fn_extra = (void *) ret_set; > + > + rsi = (ReturnSetInfo *) fcinfo->resultinfo; > + rsi->isDone = ExprMultipleResult; > + > + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num]))); > + > + PG_RETURN_TEXT_P(result_text); > + } > + else > + { > + rsi = (ReturnSetInfo *) fcinfo->resultinfo; > + rsi->isDone = ExprEndResult; > + > + PG_RETURN_NULL(); > + } > + } > + else > + { > + /* > + * check for more results > + */ > + ret_set = fcinfo->flinfo->fn_extra; > + ret_set->elem_num++; > + result = ret_set->res; > + > + if (ret_set->elem_num < ret_set->num_elems) > + { > + /* > + * fetch next one > + */ > + rsi = (ReturnSetInfo *) fcinfo->resultinfo; > + rsi->isDone = ExprMultipleResult; > + > + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num]))); > + PG_RETURN_TEXT_P(result_text); > + } > + else > + { > + int i; > + > + /* > + * or if no more, clean things up > + */ > + for (i = 0; i < ret_set->num_elems; i++) > + pfree(result[i]); > + > + pfree(ret_set->res); > + pfree(ret_set); > + > + rsi = (ReturnSetInfo *) fcinfo->resultinfo; > + rsi->isDone = ExprEndResult; > + > + PG_RETURN_NULL(); > + } > + } > + PG_RETURN_NULL(); > + } > + > + > + /* > + * dblink_last_oid > + * return last inserted oid > + */ > + PG_FUNCTION_INFO_V1(dblink_last_oid); > + Datum > + dblink_last_oid(PG_FUNCTION_ARGS) > + { > + dblink_results *results; > + > + results = get_res_ptr(PG_GETARG_INT32(0)); > + if (results == NULL) > + { > + if (res_id != NIL) > + { > + freeList(res_id); > + res_id = NIL; > + res_id_index = 0; > + } > + > + elog(ERROR, "dblink_tok: function called with invalid resource id"); > + } > + > + PG_RETURN_OID(PQoidValue(results->res)); > + } > + > + > + /* > + * dblink_build_sql_insert > + * > + * Used to generate an SQL insert statement > + * based on an existing tuple in a local relation. > + * This is useful for selectively replicating data > + * to another server via dblink. > + * > + * API: > + * <relname> - name of local table of interest > + * <pkattnums> - an int2vector of attnums which will be used > + * to identify the local tuple of interest > + * <pknumatts> - number of attnums in pkattnums > + * <src_pkattvals_arry> - text array of key values which will be used > + * to identify the local tuple of interest > + * <tgt_pkattvals_arry> - text array of key values which will be used > + * to build the string for execution remotely. These are substituted > + * for their counterparts in src_pkattvals_arry > + */ > + PG_FUNCTION_INFO_V1(dblink_build_sql_insert); > + Datum > + dblink_build_sql_insert(PG_FUNCTION_ARGS) > + { > + Oid relid; > + char *relname; > + int16 *pkattnums; > + int16 pknumatts; > + char **src_pkattvals; > + char **tgt_pkattvals; > + ArrayType *src_pkattvals_arry; > + ArrayType *tgt_pkattvals_arry; > + int src_ndim; > + int *src_dim; > + int src_nitems; > + int tgt_ndim; > + int *tgt_dim; > + int tgt_nitems; > + int i; > + char *ptr; > + char *sql; > + text *sql_text; > + > + relname = NameStr(*PG_GETARG_NAME(0)); > + > + /* > + * Convert relname to rel OID. > + */ > + relid = get_relid_from_relname(relname); > + if (!OidIsValid(relid)) > + elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist", > + relname); > + > + pkattnums = (int16 *) PG_GETARG_POINTER(1); > + pknumatts = PG_GETARG_INT16(2); > + /* > + * There should be at least one key attribute > + */ > + if (pknumatts == 0) > + elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0."); > + > + src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3); > + tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4); > + > + /* > + * Source array is made up of key values that will be used to > + * locate the tuple of interest from the local system. > + */ > + src_ndim = ARR_NDIM(src_pkattvals_arry); > + src_dim = ARR_DIMS(src_pkattvals_arry); > + src_nitems = ArrayGetNItems(src_ndim, src_dim); > + > + /* > + * There should be one source array key value for each key attnum > + */ > + if (src_nitems != pknumatts) > + elog(ERROR, "dblink_build_sql_insert: source key array length does not match number of key attributes."); > + > + /* > + * get array of pointers to c-strings from the input source array > + */ > + src_pkattvals = (char **) palloc(src_nitems * sizeof(char *)); > + ptr = ARR_DATA_PTR(src_pkattvals_arry); > + for (i = 0; i < src_nitems; i++) > + { > + src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); > + ptr += INTALIGN(*(int32 *) ptr); > + } > + > + /* > + * Target array is made up of key values that will be used to > + * build the SQL string for use on the remote system. > + */ > + tgt_ndim = ARR_NDIM(tgt_pkattvals_arry); > + tgt_dim = ARR_DIMS(tgt_pkattvals_arry); > + tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim); > + > + /* > + * There should be one target array key value for each key attnum > + */ > + if (tgt_nitems != pknumatts) > + elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes."); > + > + /* > + * get array of pointers to c-strings from the input target array > + */ > + tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *)); > + ptr = ARR_DATA_PTR(tgt_pkattvals_arry); > + for (i = 0; i < tgt_nitems; i++) > + { > + tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); > + ptr += INTALIGN(*(int32 *) ptr); > + } > + > + /* > + * Prep work is finally done. Go get the SQL string. > + */ > + sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals); > + > + /* > + * Make it into TEXT for return to the client > + */ > + sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql))); > + > + /* > + * And send it > + */ > + PG_RETURN_TEXT_P(sql_text); > + } > + > + > + /* > + * dblink_build_sql_delete > + * > + * Used to generate an SQL delete statement. > + * This is useful for selectively replicating a > + * delete to another server via dblink. > + * > + * API: > + * <relname> - name of remote table of interest > + * <pkattnums> - an int2vector of attnums which will be used > + * to identify the remote tuple of interest > + * <pknumatts> - number of attnums in pkattnums > + * <tgt_pkattvals_arry> - text array of key values which will be used > + * to build the string for execution remotely. > + */ > + PG_FUNCTION_INFO_V1(dblink_build_sql_delete); > + Datum > + dblink_build_sql_delete(PG_FUNCTION_ARGS) > + { > + Oid relid; > + char *relname; > + int16 *pkattnums; > + int16 pknumatts; > + char **tgt_pkattvals; > + ArrayType *tgt_pkattvals_arry; > + int tgt_ndim; > + int *tgt_dim; > + int tgt_nitems; > + int i; > + char *ptr; > + char *sql; > + text *sql_text; > + > + relname = NameStr(*PG_GETARG_NAME(0)); > + > + /* > + * Convert relname to rel OID. > + */ > + relid = get_relid_from_relname(relname); > + if (!OidIsValid(relid)) > + elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist", > + relname); > + > + pkattnums = (int16 *) PG_GETARG_POINTER(1); > + pknumatts = PG_GETARG_INT16(2); > + /* > + * There should be at least one key attribute > + */ > + if (pknumatts == 0) > + elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0."); > + > + tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3); > + > + /* > + * Target array is made up of key values that will be used to > + * build the SQL string for use on the remote system. > + */ > + tgt_ndim = ARR_NDIM(tgt_pkattvals_arry); > + tgt_dim = ARR_DIMS(tgt_pkattvals_arry); > + tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim); > + > + /* > + * There should be one target array key value for each key attnum > + */ > + if (tgt_nitems != pknumatts) > + elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes."); > + > + /* > + * get array of pointers to c-strings from the input target array > + */ > + tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *)); > + ptr = ARR_DATA_PTR(tgt_pkattvals_arry); > + for (i = 0; i < tgt_nitems; i++) > + { > + tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); > + ptr += INTALIGN(*(int32 *) ptr); > + } > + > + /* > + * Prep work is finally done. Go get the SQL string. > + */ > + sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals); > + > + /* > + * Make it into TEXT for return to the client > + */ > + sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql))); > + > + /* > + * And send it > + */ > + PG_RETURN_TEXT_P(sql_text); > + } > + > + > + /* > + * dblink_build_sql_update > + * > + * Used to generate an SQL update statement > + * based on an existing tuple in a local relation. > + * This is useful for selectively replicating data > + * to another server via dblink. > + * > + * API: > + * <relname> - name of local table of interest > + * <pkattnums> - an int2vector of attnums which will be used > + * to identify the local tuple of interest > + * <pknumatts> - number of attnums in pkattnums > + * <src_pkattvals_arry> - text array of key values which will be used > + * to identify the local tuple of interest > + * <tgt_pkattvals_arry> - text array of key values which will be used > + * to build the string for execution remotely. These are substituted > + * for their counterparts in src_pkattvals_arry > + */ > + PG_FUNCTION_INFO_V1(dblink_build_sql_update); > + Datum > + dblink_build_sql_update(PG_FUNCTION_ARGS) > + { > + Oid relid; > + char *relname; > + int16 *pkattnums; > + int16 pknumatts; > + char **src_pkattvals; > + char **tgt_pkattvals; > + ArrayType *src_pkattvals_arry; > + ArrayType *tgt_pkattvals_arry; > + int src_ndim; > + int *src_dim; > + int src_nitems; > + int tgt_ndim; > + int *tgt_dim; > + int tgt_nitems; > + int i; > + char *ptr; > + char *sql; > + text *sql_text; > + > + relname = NameStr(*PG_GETARG_NAME(0)); > + > + /* > + * Convert relname to rel OID. > + */ > + relid = get_relid_from_relname(relname); > + if (!OidIsValid(relid)) > + elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist", > + relname); > + > + pkattnums = (int16 *) PG_GETARG_POINTER(1); > + pknumatts = PG_GETARG_INT16(2); > + /* > + * There should be one source array key values for each key attnum > + */ > + if (pknumatts == 0) > + elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0."); > + > + src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3); > + tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4); > + > + /* > + * Source array is made up of key values that will be used to > + * locate the tuple of interest from the local system. > + */ > + src_ndim = ARR_NDIM(src_pkattvals_arry); > + src_dim = ARR_DIMS(src_pkattvals_arry); > + src_nitems = ArrayGetNItems(src_ndim, src_dim); > + > + /* > + * There should be one source array key value for each key attnum > + */ > + if (src_nitems != pknumatts) > + elog(ERROR, "dblink_build_sql_insert: source key array length does not match number of key attributes."); > + > + /* > + * get array of pointers to c-strings from the input source array > + */ > + src_pkattvals = (char **) palloc(src_nitems * sizeof(char *)); > + ptr = ARR_DATA_PTR(src_pkattvals_arry); > + for (i = 0; i < src_nitems; i++) > + { > + src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); > + ptr += INTALIGN(*(int32 *) ptr); > + } > + > + /* > + * Target array is made up of key values that will be used to > + * build the SQL string for use on the remote system. > + */ > + tgt_ndim = ARR_NDIM(tgt_pkattvals_arry); > + tgt_dim = ARR_DIMS(tgt_pkattvals_arry); > + tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim); > + > + /* > + * There should be one target array key value for each key attnum > + */ > + if (tgt_nitems != pknumatts) > + elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes."); > + > + /* > + * get array of pointers to c-strings from the input target array > + */ > + tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *)); > + ptr = ARR_DATA_PTR(tgt_pkattvals_arry); > + for (i = 0; i < tgt_nitems; i++) > + { > + tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); > + ptr += INTALIGN(*(int32 *) ptr); > + } > + > + /* > + * Prep work is finally done. Go get the SQL string. > + */ > + sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals); > + > + /* > + * Make it into TEXT for return to the client > + */ > + sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql))); > + > + /* > + * And send it > + */ > + PG_RETURN_TEXT_P(sql_text); > + } > + > + > + /* > + * dblink_current_query > + * return the current query string > + * to allow its use in (among other things) > + * rewrite rules > + */ > + PG_FUNCTION_INFO_V1(dblink_current_query); > + Datum > + dblink_current_query(PG_FUNCTION_ARGS) > + { > + text *result_text; > + > + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(debug_query_string))); > + PG_RETURN_TEXT_P(result_text); > + } > + > + > + /* > + * dblink_replace_text > + * replace all occurences of 'old_sub_str' in 'orig_str' > + * with 'new_sub_str' to form 'new_str' > + * > + * returns 'orig_str' if 'old_sub_str' == '' or 'orig_str' == '' > + * otherwise returns 'new_str' > + */ > + PG_FUNCTION_INFO_V1(dblink_replace_text); > + Datum > + dblink_replace_text(PG_FUNCTION_ARGS) > + { > + text *left_text; > + text *right_text; > + text *buf_text; > + text *ret_text; > + char *ret_str; > + int curr_posn; > + text *src_text = PG_GETARG_TEXT_P(0); > + int src_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(src_text))); > + text *from_sub_text = PG_GETARG_TEXT_P(1); > + int from_sub_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(from_sub_text))); > + text *to_sub_text = PG_GETARG_TEXT_P(2); > + char *to_sub_str = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(to_sub_text))); > + StringInfo str = makeStringInfo(); > + > + if (src_text_len == 0 || from_sub_text_len == 0) > + PG_RETURN_TEXT_P(src_text); > + > + buf_text = DatumGetTextPCopy(PointerGetDatum(src_text)); > + curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))); > + > + while (curr_posn > 0) > + { > + left_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), 1, DatumGetInt32(DirectFunctionCall2(textpos,PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) - 1)); > + right_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), DatumGetInt32(DirectFunctionCall2(textpos,PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) + from_sub_text_len,-1)); > + > + appendStringInfo(str, DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(left_text)))); > + appendStringInfo(str, to_sub_str); > + > + pfree(buf_text); > + pfree(left_text); > + buf_text = right_text; > + curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))); > + } > + > + appendStringInfo(str, DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(buf_text)))); > + pfree(buf_text); > + > + ret_str = pstrdup(str->data); > + ret_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(ret_str))); > + > + PG_RETURN_TEXT_P(ret_text); > + } > + > + > + /************************************************************* > * internal functions > */ > > *************** > *** 285,293 **** > --- 853,1408 ---- > MemSet(retval, 0, sizeof(dblink_results)); > > retval->tup_num = -1; > + retval->res_id_index =-1; > retval->res = NULL; > > MemoryContextSwitchTo(oldcontext); > > return retval; > } > + > + > + /* > + * init_dblink_array_results > + * - create an empty dblink_array_results data structure > + */ > + dblink_array_results * > + init_dblink_array_results(MemoryContext fn_mcxt) > + { > + MemoryContext oldcontext; > + dblink_array_results *retval; > + > + oldcontext = MemoryContextSwitchTo(fn_mcxt); > + > + retval = (dblink_array_results *) palloc(sizeof(dblink_array_results)); > + MemSet(retval, 0, sizeof(dblink_array_results)); > + > + retval->elem_num = -1; > + retval->num_elems = 0; > + retval->res = NULL; > + > + MemoryContextSwitchTo(oldcontext); > + > + return retval; > + } > + > + /* > + * get_pkey_attnames > + * > + * Get the primary key attnames for the given relation. > + * Return NULL, and set numatts = 0, if no primary key exists. > + */ > + char ** > + get_pkey_attnames(Oid relid, int16 *numatts) > + { > + Relation indexRelation; > + ScanKeyData entry; > + HeapScanDesc scan; > + HeapTuple indexTuple; > + int i; > + char **result = NULL; > + Relation rel; > + TupleDesc tupdesc; > + > + /* > + * Open relation using relid, get tupdesc > + */ > + rel = relation_open(relid, AccessShareLock); > + tupdesc = rel->rd_att; > + > + /* > + * Initialize numatts to 0 in case no primary key > + * exists > + */ > + *numatts = 0; > + > + /* > + * Use relid to get all related indexes > + */ > + indexRelation = heap_openr(IndexRelationName, AccessShareLock); > + ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid, > + F_OIDEQ, ObjectIdGetDatum(relid)); > + scan = heap_beginscan(indexRelation, false, SnapshotNow, > + 1, &entry); > + > + while (HeapTupleIsValid(indexTuple = heap_getnext(scan, 0))) > + { > + Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple); > + > + /* > + * We're only interested if it is the primary key > + */ > + if (index->indisprimary == TRUE) > + { > + i = 0; > + while (index->indkey[i++] != 0) > + (*numatts)++; > + > + if (*numatts > 0) > + { > + result = (char **) palloc(*numatts * sizeof(char *)); > + for (i = 0; i < *numatts; i++) > + result[i] = SPI_fname(tupdesc, index->indkey[i]); > + } > + break; > + } > + } > + heap_endscan(scan); > + heap_close(indexRelation, AccessShareLock); > + relation_close(rel, AccessShareLock); > + > + return result; > + } > + > + > + /* > + * get_strtok > + * > + * parse input string > + * return ord item (0 based) > + * based on provided field separator > + */ > + char * > + get_strtok(char *fldtext, char *fldsep, int fldnum) > + { > + int j = 0; > + char *result; > + > + if (fldnum < 0) > + { > + elog(ERROR, "get_strtok: field number < 0 not permitted"); > + } > + > + if (fldsep[0] == '\0') > + { > + elog(ERROR, "get_strtok: blank field separator not permitted"); > + } > + > + result = strtok(fldtext, fldsep); > + for (j = 1; j < fldnum + 1; j++) > + { > + result = strtok(NULL, fldsep); > + if (result == NULL) > + return NULL; > + } > + > + return pstrdup(result); > + } > + > + char * > + get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals) > + { > + Relation rel; > + char *relname; > + HeapTuple tuple; > + TupleDesc tupdesc; > + int natts; > + StringInfo str = makeStringInfo(); > + char *sql = NULL; > + char *val = NULL; > + int16 key; > + unsigned int i; > + > + /* > + * Open relation using relid > + */ > + rel = relation_open(relid, AccessShareLock); > + relname = RelationGetRelationName(rel); > + tupdesc = rel->rd_att; > + natts = tupdesc->natts; > + > + tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals); > + > + appendStringInfo(str, "INSERT INTO %s(", quote_ident_cstr(relname)); > + for (i = 0; i < natts; i++) > + { > + if (i > 0) > + appendStringInfo(str, ","); > + > + appendStringInfo(str, NameStr(tupdesc->attrs[i]->attname)); > + } > + > + appendStringInfo(str, ") VALUES("); > + > + /* > + * remember attvals are 1 based > + */ > + for (i = 0; i < natts; i++) > + { > + if (i > 0) > + appendStringInfo(str, ","); > + > + if (tgt_pkattvals != NULL) > + key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1); > + else > + key = -1; > + > + if (key > -1) > + val = pstrdup(tgt_pkattvals[key]); > + else > + val = SPI_getvalue(tuple, tupdesc, i + 1); > + > + if (val != NULL) > + { > + appendStringInfo(str, quote_literal_cstr(val)); > + pfree(val); > + } > + else > + appendStringInfo(str, "NULL"); > + } > + appendStringInfo(str, ")"); > + > + sql = pstrdup(str->data); > + pfree(str->data); > + pfree(str); > + relation_close(rel, AccessShareLock); > + > + return (sql); > + } > + > + char * > + get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals) > + { > + Relation rel; > + char *relname; > + TupleDesc tupdesc; > + int natts; > + StringInfo str = makeStringInfo(); > + char *sql = NULL; > + char *val = NULL; > + unsigned int i; > + > + /* > + * Open relation using relid > + */ > + rel = relation_open(relid, AccessShareLock); > + relname = RelationGetRelationName(rel); > + tupdesc = rel->rd_att; > + natts = tupdesc->natts; > + > + appendStringInfo(str, "DELETE FROM %s WHERE ", quote_ident_cstr(relname)); > + for (i = 0; i < pknumatts; i++) > + { > + int16 pkattnum = pkattnums[i]; > + > + if (i > 0) > + appendStringInfo(str, " AND "); > + > + appendStringInfo(str, NameStr(tupdesc->attrs[pkattnum - 1]->attname)); > + > + if (tgt_pkattvals != NULL) > + val = pstrdup(tgt_pkattvals[i]); > + else > + elog(ERROR, "Target key array must not be NULL"); > + > + if (val != NULL) > + { > + appendStringInfo(str, "="); > + appendStringInfo(str, quote_literal_cstr(val)); > + pfree(val); > + } > + else > + appendStringInfo(str, "IS NULL"); > + } > + > + sql = pstrdup(str->data); > + pfree(str->data); > + pfree(str); > + relation_close(rel, AccessShareLock); > + > + return (sql); > + } > + > + char * > + get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals) > + { > + Relation rel; > + char *relname; > + HeapTuple tuple; > + TupleDesc tupdesc; > + int natts; > + StringInfo str = makeStringInfo(); > + char *sql = NULL; > + char *val = NULL; > + int16 key; > + int i; > + > + /* > + * Open relation using relid > + */ > + rel = relation_open(relid, AccessShareLock); > + relname = RelationGetRelationName(rel); > + tupdesc = rel->rd_att; > + natts = tupdesc->natts; > + > + tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals); > + > + appendStringInfo(str, "UPDATE %s SET ", quote_ident_cstr(relname)); > + > + for (i = 0; i < natts; i++) > + { > + if (i > 0) > + appendStringInfo(str, ","); > + > + appendStringInfo(str, NameStr(tupdesc->attrs[i]->attname)); > + appendStringInfo(str, "="); > + > + if (tgt_pkattvals != NULL) > + key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1); > + else > + key = -1; > + > + if (key > -1) > + val = pstrdup(tgt_pkattvals[key]); > + else > + val = SPI_getvalue(tuple, tupdesc, i + 1); > + > + if (val != NULL) > + { > + appendStringInfo(str, quote_literal_cstr(val)); > + pfree(val); > + } > + else > + appendStringInfo(str, "NULL"); > + } > + > + appendStringInfo(str, " WHERE "); > + > + for (i = 0; i < pknumatts; i++) > + { > + int16 pkattnum = pkattnums[i]; > + > + if (i > 0) > + appendStringInfo(str, " AND "); > + > + appendStringInfo(str, NameStr(tupdesc->attrs[pkattnum - 1]->attname)); > + > + if (tgt_pkattvals != NULL) > + val = pstrdup(tgt_pkattvals[i]); > + else > + val = SPI_getvalue(tuple, tupdesc, pkattnum); > + > + if (val != NULL) > + { > + appendStringInfo(str, "="); > + appendStringInfo(str, quote_literal_cstr(val)); > + pfree(val); > + } > + else > + appendStringInfo(str, "IS NULL"); > + } > + > + sql = pstrdup(str->data); > + pfree(str->data); > + pfree(str); > + relation_close(rel, AccessShareLock); > + > + return (sql); > + } > + > + /* > + * Return a properly quoted literal value. > + * Uses quote_literal in quote.c > + */ > + static char * > + quote_literal_cstr(char *rawstr) > + { > + text *rawstr_text; > + text *result_text; > + char *result; > + > + rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr))); > + result_text = DatumGetTextP(DirectFunctionCall1(quote_literal, PointerGetDatum(rawstr_text))); > + result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text))); > + > + return result; > + } > + > + /* > + * Return a properly quoted identifier. > + * Uses quote_ident in quote.c > + */ > + static char * > + quote_ident_cstr(char *rawstr) > + { > + text *rawstr_text; > + text *result_text; > + char *result; > + > + rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr))); > + result_text = DatumGetTextP(DirectFunctionCall1(quote_ident, PointerGetDatum(rawstr_text))); > + result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text))); > + > + return result; > + } > + > + int16 > + get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key) > + { > + int i; > + > + /* > + * Not likely a long list anyway, so just scan for > + * the value > + */ > + for (i = 0; i < pknumatts; i++) > + if (key == pkattnums[i]) > + return i; > + > + return -1; > + } > + > + HeapTuple > + get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals) > + { > + Relation rel; > + char *relname; > + TupleDesc tupdesc; > + StringInfo str = makeStringInfo(); > + char *sql = NULL; > + int ret; > + HeapTuple tuple; > + int i; > + char *val = NULL; > + > + /* > + * Open relation using relid > + */ > + rel = relation_open(relid, AccessShareLock); > + relname = RelationGetRelationName(rel); > + tupdesc = rel->rd_att; > + > + /* > + * Connect to SPI manager > + */ > + if ((ret = SPI_connect()) < 0) > + elog(ERROR, "get_tuple_of_interest: SPI_connect returned %d", ret); > + > + /* > + * Build sql statement to look up tuple of interest > + * Use src_pkattvals as the criteria. > + */ > + appendStringInfo(str, "SELECT * from %s WHERE ", relname); > + > + for (i = 0; i < pknumatts; i++) > + { > + int16 pkattnum = pkattnums[i]; > + > + if (i > 0) > + appendStringInfo(str, " AND "); > + > + appendStringInfo(str, NameStr(tupdesc->attrs[pkattnum - 1]->attname)); > + > + val = pstrdup(src_pkattvals[i]); > + if (val != NULL) > + { > + appendStringInfo(str, "="); > + appendStringInfo(str, quote_literal_cstr(val)); > + pfree(val); > + } > + else > + appendStringInfo(str, "IS NULL"); > + } > + > + sql = pstrdup(str->data); > + pfree(str->data); > + pfree(str); > + /* > + * Retrieve the desired tuple > + */ > + ret = SPI_exec(sql, 0); > + pfree(sql); > + > + /* > + * Only allow one qualifying tuple > + */ > + if ((ret == SPI_OK_SELECT) && (SPI_processed > 1)) > + { > + elog(ERROR, "get_tuple_of_interest: Source criteria may not match more than one record."); > + } > + else if (ret == SPI_OK_SELECT && SPI_processed == 1) > + { > + SPITupleTable *tuptable = SPI_tuptable; > + tuple = SPI_copytuple(tuptable->vals[0]); > + > + return tuple; > + } > + else > + { > + /* > + * no qualifying tuples > + */ > + return NULL; > + } > + > + /* > + * never reached, but keep compiler quiet > + */ > + return NULL; > + } > + > + Oid > + get_relid_from_relname(char *relname) > + { > + #ifdef NamespaceRelationName > + Oid relid; > + > + relid = RelnameGetRelid(relname); > + #else > + Relation rel; > + Oid relid; > + > + rel = relation_openr(relname, AccessShareLock); > + relid = RelationGetRelid(rel); > + relation_close(rel, AccessShareLock); > + #endif /* NamespaceRelationName */ > + > + return relid; > + } > + > + dblink_results * > + get_res_ptr(int32 res_id_index) > + { > + List *ptr; > + > + /* > + * short circuit empty list > + */ > + if(res_id == NIL) > + return NULL; > + > + /* > + * OK, should be good to go > + */ > + foreach(ptr, res_id) > + { > + dblink_results *this_res_id = (dblink_results *) lfirst(ptr); > + if (this_res_id->res_id_index == res_id_index) > + return this_res_id; > + } > + return NULL; > + } > + > + /* > + * Add node to global List res_id > + */ > + void > + append_res_ptr(dblink_results *results) > + { > + res_id = lappend(res_id, results); > + } > + > + /* > + * Remove node from global List > + * using res_id_index > + */ > + void > + remove_res_ptr(dblink_results *results) > + { > + res_id = lremove(results, res_id); > + > + if (res_id == NIL) > + res_id_index = 0; > + } > + > + > diff -cNr dblink.orig/dblink.h dblink/dblink.h > *** dblink.orig/dblink.h Mon Nov 5 09:46:22 2001 > --- dblink/dblink.h Sun Apr 14 18:54:39 2002 > *************** > *** 3,9 **** > * > * Functions returning results from a remote database > * > ! * Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001; > * > * Permission to use, copy, modify, and distribute this software and its > * documentation for any purpose, without fee, and without a written agreement > --- 3,10 ---- > * > * Functions returning results from a remote database > * > ! * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002, > ! * ALL RIGHTS RESERVED; > * > * Permission to use, copy, modify, and distribute this software and its > * documentation for any purpose, without fee, and without a written agreement > *************** > *** 33,42 **** > --- 34,64 ---- > #include "libpq-int.h" > #include "fmgr.h" > #include "access/tupdesc.h" > + #include "access/heapam.h" > + #include "catalog/catname.h" > + #include "catalog/pg_index.h" > + #include "catalog/pg_type.h" > #include "executor/executor.h" > + #include "executor/spi.h" > + #include "lib/stringinfo.h" > #include "nodes/nodes.h" > #include "nodes/execnodes.h" > + #include "nodes/pg_list.h" > + #include "parser/parse_type.h" > + #include "tcop/tcopprot.h" > #include "utils/builtins.h" > + #include "utils/fmgroids.h" > + #include "utils/array.h" > + #include "utils/syscache.h" > + > + #ifdef NamespaceRelationName > + #include "catalog/namespace.h" > + #endif /* NamespaceRelationName */ > + > + /* > + * Max SQL statement size > + */ > + #define DBLINK_MAX_SQLSTATE_SIZE 16384 > > /* > * This struct holds the results of the remote query. > *************** > *** 50,70 **** > int tup_num; > > /* > * the actual query results > */ > PGresult *res; > - > } dblink_results; > > /* > * External declarations > */ > extern Datum dblink(PG_FUNCTION_ARGS); > extern Datum dblink_tok(PG_FUNCTION_ARGS); > > /* > * Internal declarations > */ > dblink_results *init_dblink_results(MemoryContext fn_mcxt); > > #endif /* DBLINK_H */ > --- 72,145 ---- > int tup_num; > > /* > + * resource index number for this context > + */ > + int res_id_index; > + > + /* > * the actual query results > */ > PGresult *res; > } dblink_results; > > + > + /* > + * This struct holds results in the form of an array. > + * Use fn_extra to hold a pointer to it across calls > + */ > + typedef struct > + { > + /* > + * elem being accessed > + */ > + int elem_num; > + > + /* > + * number of elems > + */ > + int num_elems; > + > + /* > + * the actual array > + */ > + void *res; > + > + } dblink_array_results; > + > /* > * External declarations > */ > extern Datum dblink(PG_FUNCTION_ARGS); > extern Datum dblink_tok(PG_FUNCTION_ARGS); > + extern Datum dblink_strtok(PG_FUNCTION_ARGS); > + extern Datum dblink_get_pkey(PG_FUNCTION_ARGS); > + extern Datum dblink_last_oid(PG_FUNCTION_ARGS); > + extern Datum dblink_build_sql_insert(PG_FUNCTION_ARGS); > + extern Datum dblink_build_sql_delete(PG_FUNCTION_ARGS); > + extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS); > + extern Datum dblink_current_query(PG_FUNCTION_ARGS); > + extern Datum dblink_replace_text(PG_FUNCTION_ARGS); > > /* > * Internal declarations > */ > dblink_results *init_dblink_results(MemoryContext fn_mcxt); > + dblink_array_results *init_dblink_array_results(MemoryContext fn_mcxt); > + char **get_pkey_attnames(Oid relid, int16 *numatts); > + char *get_strtok(char *fldtext, char *fldsep, int fldnum); > + char *getvalue(HeapTuple tuple, TupleDesc tupdesc, int fnumber); > + char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals); > + char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals); > + char *get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals); > + static char *quote_literal_cstr(char *rawstr); > + static char *quote_ident_cstr(char *rawstr); > + int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key); > + HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals); > + Oid get_relid_from_relname(char *relname); > + dblink_results *get_res_ptr(int32 res_id_index); > + void append_res_ptr(dblink_results *results); > + void remove_res_ptr(dblink_results *results); > + > + extern char *debug_query_string; > > #endif /* DBLINK_H */ > diff -cNr dblink.orig/dblink.sql.in dblink/dblink.sql.in > *** dblink.orig/dblink.sql.in Thu Jun 14 09:49:03 2001 > --- dblink/dblink.sql.in Fri Apr 12 14:36:49 2002 > *************** > *** 1,5 **** > ! CREATE FUNCTION dblink (text,text) RETURNS setof int > ! AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c'; > > ! CREATE FUNCTION dblink_tok (int,int) RETURNS text > ! AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c'; > --- 1,38 ---- > ! CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof int > ! AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c' > ! WITH (isstrict); > > ! CREATE OR REPLACE FUNCTION dblink_tok (int,int) RETURNS text > ! AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c' > ! WITH (isstrict); > ! > ! CREATE OR REPLACE FUNCTION dblink_strtok (text,text,int) RETURNS text > ! AS 'MODULE_PATHNAME','dblink_strtok' LANGUAGE 'c' > ! WITH (iscachable, isstrict); > ! > ! CREATE OR REPLACE FUNCTION dblink_get_pkey (name) RETURNS setof text > ! AS 'MODULE_PATHNAME','dblink_get_pkey' LANGUAGE 'c' > ! WITH (isstrict); > ! > ! CREATE OR REPLACE FUNCTION dblink_last_oid (int) RETURNS oid > ! AS 'MODULE_PATHNAME','dblink_last_oid' LANGUAGE 'c' > ! WITH (isstrict); > ! > ! CREATE OR REPLACE FUNCTION dblink_build_sql_insert (name, int2vector, int2, _text, _text) RETURNS text > ! AS 'MODULE_PATHNAME','dblink_build_sql_insert' LANGUAGE 'c' > ! WITH (isstrict); > ! > ! CREATE OR REPLACE FUNCTION dblink_build_sql_delete (name, int2vector, int2, _text) RETURNS text > ! AS 'MODULE_PATHNAME','dblink_build_sql_delete' LANGUAGE 'c' > ! WITH (isstrict); > ! > ! CREATE OR REPLACE FUNCTION dblink_build_sql_update (name, int2vector, int2, _text, _text) RETURNS text > ! AS 'MODULE_PATHNAME','dblink_build_sql_update' LANGUAGE 'c' > ! WITH (isstrict); > ! > ! CREATE OR REPLACE FUNCTION dblink_current_query () RETURNS text > ! AS 'MODULE_PATHNAME','dblink_current_query' LANGUAGE 'c'; > ! > ! CREATE OR REPLACE FUNCTION dblink_replace (text,text,text) RETURNS text > ! AS 'MODULE_PATHNAME','dblink_replace_text' LANGUAGE 'c' > ! WITH (iscachable, isstrict); > > ---------------------------(end of broadcast)--------------------------- > TIP 5: Have you checked our extensive FAQ? > > http://www.postgresql.org/users-lounge/docs/faq.html -- Bruce Momjian | http://candle.pha.pa.us pgman@candle.pha.pa.us | (610) 853-3000 + If your life is a hard drive, | 830 Blythe Avenue + Christ can be your backup. | Drexel Hill, Pennsylvania 19026
pgsql-patches by date: