contrib/dblink update - Mailing list pgsql-patches
From | Joe Conway |
---|---|
Subject | contrib/dblink update |
Date | |
Msg-id | 3CBA5374.1010400@joeconway.com |
List | pgsql-patches |
Attached is an update to contrib/dblink. Please apply if there are no
objections.

Major changes:
  - removed cursor wrap around input sql to allow for remote
    execution of INSERT/UPDATE/DELETE
  - dblink now returns a resource id instead of a real pointer
  - added several utility functions

I'm still hoping to add explicit cursor open/fetch/close support before
7.3 is released, but I need a bit more time on that.

On a somewhat unrelated topic, I never got any feedback on the
unknownin/out patch and the mb_substring patch. Is there anything else I
need to do to get those applied?

Thanks,

Joe

diff -cNr dblink.orig/README.dblink dblink/README.dblink
*** dblink.orig/README.dblink  Thu Dec 13 02:48:39 2001
--- dblink/README.dblink  Sun Apr 14 20:02:06 2002
***************
*** 3,9 ****
   *
   * Functions returning results from a remote database
   *
!  * Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
   *
   * Permission to use, copy, modify, and distribute this software and its
   * documentation for any purpose, without fee, and without a written agreement
--- 3,10 ----
   *
   * Functions returning results from a remote database
   *
!  * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
!  * ALL RIGHTS RESERVED;
   *
   * Permission to use, copy, modify, and distribute this software and its
   * documentation for any purpose, without fee, and without a written agreement
***************
*** 25,36 ****
   */

! Version 0.3 (14 June, 2001):
!   Function to test returning data set from remote database
!   Tested under Linux (Red Hat 6.2 and 7.0) and PostgreSQL 7.1 and 7.2devel

  Release Notes:

    Version 0.3
      - fixed dblink invalid pointer causing corrupt elog message
      - fixed dblink_tok improper handling of null results
--- 26,44 ----
   */

! Version 0.4 (7 April, 2002):
!   Functions allowing remote database INSERT/UPDATE/DELETE/SELECT, and
!   various utility functions.
!   Tested under Linux (Red Hat 7.2) and PostgreSQL 7.2 and 7.3devel

  Release Notes:

+   Version 0.4
+     - removed cursor wrap around input sql to allow for remote
+       execution of INSERT/UPDATE/DELETE
+     - dblink now returns a resource id instead of a real pointer
+     - added several utility functions -- see below
+
    Version 0.3
      - fixed dblink invalid pointer causing corrupt elog message
      - fixed dblink_tok improper handling of null results
***************
*** 51,64 ****
  installs following functions into database template1:

!   dblink() - returns a pointer to results from remote query
!   dblink_tok() - extracts and returns individual field results

  Documentation
  ==================================================================
  Name

! dblink -- Returns a pointer to a data set from a remote database

  Synopsis

--- 59,94 ----
  installs following functions into database template1:

!   dblink(text,text) RETURNS setof int
!     - returns a resource id for results from remote query
!   dblink_tok(int,int) RETURNS text
!     - extracts and returns individual field results
!   dblink_strtok(text,text,int) RETURNS text
!     - extracts and returns individual token from delimited text
!   dblink_get_pkey(name) RETURNS setof text
!     - returns the field names of a relation's primary key fields
!   dblink_last_oid(int) RETURNS oid
!     - returns the last inserted oid
!   dblink_build_sql_insert(name,int2vector,int2,_text,_text) RETURNS text
!     - builds an insert statement using a local tuple, replacing the
!       selection key field values with alternate supplied values
!   dblink_build_sql_delete(name,int2vector,int2,_text) RETURNS text
!     - builds a delete statement using supplied values for selection
!       key field values
!   dblink_build_sql_update(name,int2vector,int2,_text,_text) RETURNS text
!     - builds an update statement using a local tuple, replacing the
!       selection key field values with alternate supplied values
!   dblink_current_query() RETURNS text
!     - returns the current query string
!   dblink_replace(text,text,text) RETURNS text
!     - replace all occurences of substring-a in the input-string
!       with substring-b

  Documentation
  ==================================================================
  Name

! dblink -- Returns a resource id for a data set from a remote database

  Synopsis

***************
*** 78,84 ****

  Outputs

!   Returns setof int (pointer)

  Example usage

--- 108,114 ----

  Outputs

!   Returns setof int (res_id)

  Example usage

***************
*** 94,106 ****

  Synopsis

! dblink_tok(int pointer, int fnumber)

  Inputs

!   pointer

!     a pointer returned by a call to dblink()

    fnumber

--- 124,136 ----

  Synopsis

! dblink_tok(int res_id, int fnumber)

  Inputs

!   res_id

!     a resource id returned by a call to dblink()

    fnumber

***************
*** 131,136 ****
--- 161,415 ----
  select f1, f2 from myremotetable where f1 like 'bytea%';

  ==================================================================
+ Name
+
+ dblink_strtok -- Extracts and returns individual token from delimited text
+
+ Synopsis
+
+ dblink_strtok(text inputstring, text delimiter, int posn) RETURNS text
+
+ Inputs
+
+   inputstring
+
+     any string you want to parse a token out of;
+     e.g. 'f=1&g=3&h=4'
+
+   delimiter
+
+     a single character to use as the delimiter;
+     e.g. '&' or '='
+
+   posn
+
+     the position of the token of interest, 0 based;
+     e.g. 1
+
+ Outputs
+
+   Returns text
+
+ Example usage
+
+   test=# select dblink_strtok(dblink_strtok('f=1&g=3&h=4','&',1),'=',1);
+    dblink_strtok
+   ---------------
+    3
+   (1 row)
+
+ ==================================================================
+ Name
+
+ dblink_get_pkey -- returns the field names of a relation's primary
+                    key fields
+
+ Synopsis
+
+ dblink_get_pkey(name relname) RETURNS setof text
+
+ Inputs
+
+   relname
+
+     any relation name;
+     e.g. 'foobar'
+
+ Outputs
+
+   Returns setof text -- one row for each primary key field, in order of
+                         precedence
+
+ Example usage
+
+   test=# select dblink_get_pkey('foobar');
+    dblink_get_pkey
+   -----------------
+    f1
+    f2
+    f3
+    f4
+    f5
+   (5 rows)
+
+
+ ==================================================================
+ Name
+
+ dblink_last_oid -- Returns last inserted oid
+
+ Synopsis
+
+ dblink_last_oid(int res_id) RETURNS oid
+
+ Inputs
+
+   res_id
+
+     any resource id returned by dblink function;
+
+ Outputs
+
+   Returns oid of last inserted tuple
+
+ Example usage
+
+   test=# select dblink_last_oid(dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd'
+          ,'insert into mytable (f1, f2) values (1,2)'));
+
+    dblink_last_oid
+   ----------------
+    16553
+   (1 row)
+
+
+ ==================================================================
+ Name
+
+ dblink_build_sql_insert -- builds an insert statement using a local
+                            tuple, replacing the selection key field
+                            values with alternate supplied values
+ dblink_build_sql_delete -- builds a delete statement using supplied
+                            values for selection key field values
+ dblink_build_sql_update -- builds an update statement using a local
+                            tuple, replacing the selection key field
+                            values with alternate supplied values
+
+
+ Synopsis
+
+ dblink_build_sql_insert(name relname
+                         ,int2vector primary_key_attnums
+                         ,int2 num_primary_key_atts
+                         ,_text src_pk_att_vals_array
+                         ,_text tgt_pk_att_vals_array) RETURNS text
+ dblink_build_sql_delete(name relname
+                         ,int2vector primary_key_attnums
+                         ,int2 num_primary_key_atts
+                         ,_text tgt_pk_att_vals_array) RETURNS text
+ dblink_build_sql_update(name relname
+                         ,int2vector primary_key_attnums
+                         ,int2 num_primary_key_atts
+                         ,_text src_pk_att_vals_array
+                         ,_text tgt_pk_att_vals_array) RETURNS text
+
+ Inputs
+
+   relname
+
+     any relation name;
+     e.g. 'foobar'
+
+   primary_key_attnums
+
+     vector of primary key attnums (1 based, see pg_index.indkey);
+     e.g. '1 2'
+
+   num_primary_key_atts
+
+     number of primary key attnums in the vector; e.g. 2
+
+   src_pk_att_vals_array
+
+     array of primary key values, used to look up the local matching
+     tuple, the values of which are then used to construct the SQL
+     statement
+
+   tgt_pk_att_vals_array
+
+     array of primary key values, used to replace the local tuple
+     values in the SQL statement
+
+ Outputs
+
+   Returns text -- requested SQL statement
+
+ Example usage
+
+   test=# select dblink_build_sql_insert('foo','1 2',2,'{"1", "a"}','{"1", "b''a"}');
+                dblink_build_sql_insert
+   --------------------------------------------------
+    INSERT INTO foo(f1,f2,f3) VALUES('1','b''a','1')
+   (1 row)
+
+   test=# select dblink_build_sql_delete('MyFoo','1 2',2,'{"1", "b"}');
+              dblink_build_sql_delete
+   ---------------------------------------------
+    DELETE FROM "MyFoo" WHERE f1='1' AND f2='b'
+   (1 row)
+
+   test=# select dblink_build_sql_update('foo','1 2',2,'{"1", "a"}','{"1", "b"}');
+                      dblink_build_sql_update
+   -------------------------------------------------------------
+    UPDATE foo SET f1='1',f2='b',f3='1' WHERE f1='1' AND f2='b'
+   (1 row)
+
+
+ ==================================================================
+ Name
+
+ dblink_current_query -- returns the current query string
+
+ Synopsis
+
+ dblink_current_query () RETURNS text
+
+ Inputs
+
+   None
+
+ Outputs
+
+   Returns text -- a copy of the currently executing query
+
+ Example usage
+
+   test=# select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname= ''byteacat''') as f1) as t1;
+    dblink_current_query
+   -----------------------------------------------------------------------------------------------------------------------------------------------------
+    select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname =''byteacat''') as f1) as t1;
+   (1 row)
+
+
+ ==================================================================
+ Name
+
+ dblink_replace -- replace all occurences of substring-a in the
+                   input-string with substring-b
+
+ Synopsis
+
+ dblink_replace(text input-string, text substring-a, text substring-b) RETURNS text
+
+ Inputs
+
+   input-string
+
+     the starting string, before replacement of substring-a
+
+   substring-a
+
+     the substring to find and replace
+
+   substring-b
+
+     the substring to be substituted in place of substring-a
+
+ Outputs
+
+   Returns text -- a copy of the starting string, but with all occurences of
+                   substring-a replaced with substring-b
+
+ Example usage
+
+   test=# select dblink_replace('12345678901234567890','56','hello');
+          dblink_replace
+   ----------------------------
+    1234hello78901234hello7890
+   (1 row)
+
+ ==================================================================
+
  -- Joe Conway

diff -cNr dblink.orig/dblink.c dblink/dblink.c
*** dblink.orig/dblink.c  Wed Oct 24 22:49:19 2001
--- dblink/dblink.c  Sun Apr 14 20:03:30 2002
***************
*** 3,9 ****
   *
   * Functions returning results from a remote database
   *
!  * Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
   *
   * Permission to use, copy, modify, and distribute this software and its
   * documentation for any purpose, without fee, and without a written agreement
--- 3,10 ----
   *
   * Functions returning results from a remote database
   *
!  * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
!  * ALL RIGHTS RESERVED;
   *
   * Permission to use, copy, modify, and distribute this software and its
   * documentation for any purpose, without fee, and without a written agreement
***************
*** 26,48 ****

  #include "dblink.h"

  PG_FUNCTION_INFO_V1(dblink);
  Datum
  dblink(PG_FUNCTION_ARGS)
  {
!     PGconn *conn = NULL;
!     PGresult *res = NULL;
!     dblink_results *results;
!     char *optstr;
!     char *sqlstatement;
!     char *curstr = "DECLARE mycursor CURSOR FOR ";
!     char *execstatement;
!     char *msg;
!     int ntuples = 0;
!     ReturnSetInfo *rsi;
!
!     if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
!
elog(ERROR, "dblink: NULL arguments are not permitted"); if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo)) elog(ERROR, "dblink: function called in context that does not accept a set result"); --- 27,49 ---- #include "dblink.h" + /* Global */ + List *res_id = NIL; + int res_id_index = 0; + PG_FUNCTION_INFO_V1(dblink); Datum dblink(PG_FUNCTION_ARGS) { ! PGconn *conn = NULL; ! PGresult *res = NULL; ! dblink_results *results; ! char *optstr; ! char *sqlstatement; ! char *execstatement; ! char *msg; ! int ntuples = 0; ! ReturnSetInfo *rsi; if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo)) elog(ERROR, "dblink: function called in context that does not accept a set result"); *************** *** 61,81 **** elog(ERROR, "dblink: connection error: %s", msg); } ! res = PQexec(conn, "BEGIN"); ! if (PQresultStatus(res) != PGRES_COMMAND_OK) ! { ! msg = pstrdup(PQerrorMessage(conn)); ! PQclear(res); ! PQfinish(conn); ! elog(ERROR, "dblink: begin error: %s", msg); ! } ! PQclear(res); ! ! execstatement = (char *) palloc(strlen(curstr) + strlen(sqlstatement) + 1); if (execstatement != NULL) { ! strcpy(execstatement, curstr); ! strcat(execstatement, sqlstatement); strcat(execstatement, "\0"); } else --- 62,71 ---- elog(ERROR, "dblink: connection error: %s", msg); } ! execstatement = (char *) palloc(strlen(sqlstatement) + 1); if (execstatement != NULL) { ! strcpy(execstatement, sqlstatement); strcat(execstatement, "\0"); } else *************** *** 94,163 **** /* * got results, start fetching them */ - PQclear(res); - - res = PQexec(conn, "FETCH ALL in mycursor"); - if (!res || PQresultStatus(res) != PGRES_TUPLES_OK) - { - msg = pstrdup(PQerrorMessage(conn)); - PQclear(res); - PQfinish(conn); - elog(ERROR, "dblink: sql error: %s", msg); - } - ntuples = PQntuples(res); ! if (ntuples > 0) ! { ! ! results = init_dblink_results(fcinfo->flinfo->fn_mcxt); ! results->tup_num = 0; ! results->res = res; ! res = NULL; ! ! 
fcinfo->flinfo->fn_extra = (void *) results; ! ! results = NULL; ! results = fcinfo->flinfo->fn_extra; ! ! /* close the cursor */ ! res = PQexec(conn, "CLOSE mycursor"); ! PQclear(res); ! ! /* commit the transaction */ ! res = PQexec(conn, "COMMIT"); ! PQclear(res); ! ! /* close the connection to the database and cleanup */ ! PQfinish(conn); ! ! rsi = (ReturnSetInfo *) fcinfo->resultinfo; ! rsi->isDone = ExprMultipleResult; ! ! PG_RETURN_POINTER(results); ! ! } ! else ! { ! PQclear(res); ! /* close the cursor */ ! res = PQexec(conn, "CLOSE mycursor"); ! PQclear(res); ! /* commit the transaction */ ! res = PQexec(conn, "COMMIT"); ! PQclear(res); ! /* close the connection to the database and cleanup */ ! PQfinish(conn); ! rsi = (ReturnSetInfo *) fcinfo->resultinfo; ! rsi->isDone = ExprEndResult; ! PG_RETURN_NULL(); ! } } } else --- 84,119 ---- /* * got results, start fetching them */ ntuples = PQntuples(res); ! /* ! * increment resource index ! */ ! res_id_index++; ! results = init_dblink_results(fcinfo->flinfo->fn_mcxt); ! results->tup_num = 0; ! results->res_id_index = res_id_index; ! results->res = res; ! /* ! * Append node to res_id to hold pointer to results. ! * Needed by dblink_tok to access the data ! */ ! append_res_ptr(results); ! /* ! * save pointer to results for the next function manager call ! */ ! fcinfo->flinfo->fn_extra = (void *) results; ! /* close the connection to the database and cleanup */ ! PQfinish(conn); ! rsi = (ReturnSetInfo *) fcinfo->resultinfo; ! rsi->isDone = ExprMultipleResult; ! 
PG_RETURN_INT32(res_id_index); } } else *************** *** 165,173 **** /* * check for more results */ - results = fcinfo->flinfo->fn_extra; results->tup_num++; ntuples = PQntuples(results->res); if (results->tup_num < ntuples) --- 121,130 ---- /* * check for more results */ results = fcinfo->flinfo->fn_extra; + results->tup_num++; + res_id_index = results->res_id_index; ntuples = PQntuples(results->res); if (results->tup_num < ntuples) *************** *** 179,196 **** rsi = (ReturnSetInfo *) fcinfo->resultinfo; rsi->isDone = ExprMultipleResult; ! PG_RETURN_POINTER(results); ! } else { /* * or if no more, clean things up */ - results = fcinfo->flinfo->fn_extra; PQclear(results->res); rsi = (ReturnSetInfo *) fcinfo->resultinfo; rsi->isDone = ExprEndResult; --- 136,154 ---- rsi = (ReturnSetInfo *) fcinfo->resultinfo; rsi->isDone = ExprMultipleResult; ! PG_RETURN_INT32(res_id_index); } else { /* * or if no more, clean things up */ results = fcinfo->flinfo->fn_extra; + remove_res_ptr(results); PQclear(results->res); + pfree(results); + fcinfo->flinfo->fn_extra = NULL; rsi = (ReturnSetInfo *) fcinfo->resultinfo; rsi->isDone = ExprEndResult; *************** *** 214,249 **** dblink_tok(PG_FUNCTION_ARGS) { dblink_results *results; ! int fldnum; ! text *result_text; ! char *result; ! int nfields = 0; ! int text_len = 0; ! ! if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) ! elog(ERROR, "dblink: NULL arguments are not permitted"); ! results = (dblink_results *) PG_GETARG_POINTER(0); if (results == NULL) ! elog(ERROR, "dblink: function called with invalid result pointer"); fldnum = PG_GETARG_INT32(1); if (fldnum < 0) ! elog(ERROR, "dblink: field number < 0 not permitted"); nfields = PQnfields(results->res); if (fldnum > (nfields - 1)) ! 
elog(ERROR, "dblink: field number %d does not exist", fldnum); if (PQgetisnull(results->res, results->tup_num, fldnum) == 1) - { - PG_RETURN_NULL(); - - } else { - text_len = PQgetlength(results->res, results->tup_num, fldnum); result = (char *) palloc(text_len + 1); --- 172,208 ---- dblink_tok(PG_FUNCTION_ARGS) { dblink_results *results; ! int fldnum; ! text *result_text; ! char *result; ! int nfields = 0; ! int text_len = 0; ! results = get_res_ptr(PG_GETARG_INT32(0)); if (results == NULL) ! { ! if (res_id != NIL) ! { ! freeList(res_id); ! res_id = NIL; ! res_id_index = 0; ! } ! ! elog(ERROR, "dblink_tok: function called with invalid resource id"); ! } fldnum = PG_GETARG_INT32(1); if (fldnum < 0) ! elog(ERROR, "dblink_tok: field number < 0 not permitted"); nfields = PQnfields(results->res); if (fldnum > (nfields - 1)) ! elog(ERROR, "dblink_tok: field number %d does not exist", fldnum); if (PQgetisnull(results->res, results->tup_num, fldnum) == 1) PG_RETURN_NULL(); else { text_len = PQgetlength(results->res, results->tup_num, fldnum); result = (char *) palloc(text_len + 1); *************** *** 259,270 **** --- 218,838 ---- result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result))); PG_RETURN_TEXT_P(result_text); + } + } + + + /* + * dblink_strtok + * parse input string + * return ord item (0 based) + * based on provided field separator + */ + PG_FUNCTION_INFO_V1(dblink_strtok); + Datum + dblink_strtok(PG_FUNCTION_ARGS) + { + char *fldtext; + char *fldsep; + int fldnum; + char *buffer; + text *result_text; + + fldtext = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0)))); + fldsep = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1)))); + fldnum = PG_GETARG_INT32(2); + + if (fldtext[0] == '\0') + { + elog(ERROR, "get_strtok: blank list not permitted"); + } + if (fldsep[0] == '\0') + { + elog(ERROR, "get_strtok: blank field separator not permitted"); + } + buffer = 
get_strtok(fldtext, fldsep, fldnum); + + pfree(fldtext); + pfree(fldsep); + + if (buffer == NULL) + { + PG_RETURN_NULL(); + } + else + { + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(buffer))); + pfree(buffer); + + PG_RETURN_TEXT_P(result_text); } } /* + * dblink_get_pkey + * + * Return comma delimited list of primary key + * fields for the supplied relation, + * or NULL if none exists. + */ + PG_FUNCTION_INFO_V1(dblink_get_pkey); + Datum + dblink_get_pkey(PG_FUNCTION_ARGS) + { + char *relname; + Oid relid; + char **result; + text *result_text; + int16 numatts; + ReturnSetInfo *rsi; + dblink_array_results *ret_set; + + if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo)) + elog(ERROR, "dblink: function called in context that does not accept a set result"); + + if (fcinfo->flinfo->fn_extra == NULL) + { + relname = NameStr(*PG_GETARG_NAME(0)); + + /* + * Convert relname to rel OID. + */ + relid = get_relid_from_relname(relname); + if (!OidIsValid(relid)) + elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist", + relname); + + /* + * get an array of attnums. 
+ */ + result = get_pkey_attnames(relid, &numatts); + + if ((result != NULL) && (numatts > 0)) + { + ret_set = init_dblink_array_results(fcinfo->flinfo->fn_mcxt); + + ret_set->elem_num = 0; + ret_set->num_elems = numatts; + ret_set->res = result; + + fcinfo->flinfo->fn_extra = (void *) ret_set; + + rsi = (ReturnSetInfo *) fcinfo->resultinfo; + rsi->isDone = ExprMultipleResult; + + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num]))); + + PG_RETURN_TEXT_P(result_text); + } + else + { + rsi = (ReturnSetInfo *) fcinfo->resultinfo; + rsi->isDone = ExprEndResult; + + PG_RETURN_NULL(); + } + } + else + { + /* + * check for more results + */ + ret_set = fcinfo->flinfo->fn_extra; + ret_set->elem_num++; + result = ret_set->res; + + if (ret_set->elem_num < ret_set->num_elems) + { + /* + * fetch next one + */ + rsi = (ReturnSetInfo *) fcinfo->resultinfo; + rsi->isDone = ExprMultipleResult; + + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num]))); + PG_RETURN_TEXT_P(result_text); + } + else + { + int i; + + /* + * or if no more, clean things up + */ + for (i = 0; i < ret_set->num_elems; i++) + pfree(result[i]); + + pfree(ret_set->res); + pfree(ret_set); + + rsi = (ReturnSetInfo *) fcinfo->resultinfo; + rsi->isDone = ExprEndResult; + + PG_RETURN_NULL(); + } + } + PG_RETURN_NULL(); + } + + + /* + * dblink_last_oid + * return last inserted oid + */ + PG_FUNCTION_INFO_V1(dblink_last_oid); + Datum + dblink_last_oid(PG_FUNCTION_ARGS) + { + dblink_results *results; + + results = get_res_ptr(PG_GETARG_INT32(0)); + if (results == NULL) + { + if (res_id != NIL) + { + freeList(res_id); + res_id = NIL; + res_id_index = 0; + } + + elog(ERROR, "dblink_tok: function called with invalid resource id"); + } + + PG_RETURN_OID(PQoidValue(results->res)); + } + + + /* + * dblink_build_sql_insert + * + * Used to generate an SQL insert statement + * based on an existing tuple in a local relation. 
+ * This is useful for selectively replicating data + * to another server via dblink. + * + * API: + * <relname> - name of local table of interest + * <pkattnums> - an int2vector of attnums which will be used + * to identify the local tuple of interest + * <pknumatts> - number of attnums in pkattnums + * <src_pkattvals_arry> - text array of key values which will be used + * to identify the local tuple of interest + * <tgt_pkattvals_arry> - text array of key values which will be used + * to build the string for execution remotely. These are substituted + * for their counterparts in src_pkattvals_arry + */ + PG_FUNCTION_INFO_V1(dblink_build_sql_insert); + Datum + dblink_build_sql_insert(PG_FUNCTION_ARGS) + { + Oid relid; + char *relname; + int16 *pkattnums; + int16 pknumatts; + char **src_pkattvals; + char **tgt_pkattvals; + ArrayType *src_pkattvals_arry; + ArrayType *tgt_pkattvals_arry; + int src_ndim; + int *src_dim; + int src_nitems; + int tgt_ndim; + int *tgt_dim; + int tgt_nitems; + int i; + char *ptr; + char *sql; + text *sql_text; + + relname = NameStr(*PG_GETARG_NAME(0)); + + /* + * Convert relname to rel OID. + */ + relid = get_relid_from_relname(relname); + if (!OidIsValid(relid)) + elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist", + relname); + + pkattnums = (int16 *) PG_GETARG_POINTER(1); + pknumatts = PG_GETARG_INT16(2); + /* + * There should be at least one key attribute + */ + if (pknumatts == 0) + elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0."); + + src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3); + tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4); + + /* + * Source array is made up of key values that will be used to + * locate the tuple of interest from the local system. 
+ */ + src_ndim = ARR_NDIM(src_pkattvals_arry); + src_dim = ARR_DIMS(src_pkattvals_arry); + src_nitems = ArrayGetNItems(src_ndim, src_dim); + + /* + * There should be one source array key value for each key attnum + */ + if (src_nitems != pknumatts) + elog(ERROR, "dblink_build_sql_insert: source key array length does not match number of key attributes."); + + /* + * get array of pointers to c-strings from the input source array + */ + src_pkattvals = (char **) palloc(src_nitems * sizeof(char *)); + ptr = ARR_DATA_PTR(src_pkattvals_arry); + for (i = 0; i < src_nitems; i++) + { + src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); + ptr += INTALIGN(*(int32 *) ptr); + } + + /* + * Target array is made up of key values that will be used to + * build the SQL string for use on the remote system. + */ + tgt_ndim = ARR_NDIM(tgt_pkattvals_arry); + tgt_dim = ARR_DIMS(tgt_pkattvals_arry); + tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim); + + /* + * There should be one target array key value for each key attnum + */ + if (tgt_nitems != pknumatts) + elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes."); + + /* + * get array of pointers to c-strings from the input target array + */ + tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *)); + ptr = ARR_DATA_PTR(tgt_pkattvals_arry); + for (i = 0; i < tgt_nitems; i++) + { + tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); + ptr += INTALIGN(*(int32 *) ptr); + } + + /* + * Prep work is finally done. Go get the SQL string. + */ + sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals); + + /* + * Make it into TEXT for return to the client + */ + sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql))); + + /* + * And send it + */ + PG_RETURN_TEXT_P(sql_text); + } + + + /* + * dblink_build_sql_delete + * + * Used to generate an SQL delete statement. 
+ * This is useful for selectively replicating a + * delete to another server via dblink. + * + * API: + * <relname> - name of remote table of interest + * <pkattnums> - an int2vector of attnums which will be used + * to identify the remote tuple of interest + * <pknumatts> - number of attnums in pkattnums + * <tgt_pkattvals_arry> - text array of key values which will be used + * to build the string for execution remotely. + */ + PG_FUNCTION_INFO_V1(dblink_build_sql_delete); + Datum + dblink_build_sql_delete(PG_FUNCTION_ARGS) + { + Oid relid; + char *relname; + int16 *pkattnums; + int16 pknumatts; + char **tgt_pkattvals; + ArrayType *tgt_pkattvals_arry; + int tgt_ndim; + int *tgt_dim; + int tgt_nitems; + int i; + char *ptr; + char *sql; + text *sql_text; + + relname = NameStr(*PG_GETARG_NAME(0)); + + /* + * Convert relname to rel OID. + */ + relid = get_relid_from_relname(relname); + if (!OidIsValid(relid)) + elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist", + relname); + + pkattnums = (int16 *) PG_GETARG_POINTER(1); + pknumatts = PG_GETARG_INT16(2); + /* + * There should be at least one key attribute + */ + if (pknumatts == 0) + elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0."); + + tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3); + + /* + * Target array is made up of key values that will be used to + * build the SQL string for use on the remote system. 
+ */ + tgt_ndim = ARR_NDIM(tgt_pkattvals_arry); + tgt_dim = ARR_DIMS(tgt_pkattvals_arry); + tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim); + + /* + * There should be one target array key value for each key attnum + */ + if (tgt_nitems != pknumatts) + elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes."); + + /* + * get array of pointers to c-strings from the input target array + */ + tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *)); + ptr = ARR_DATA_PTR(tgt_pkattvals_arry); + for (i = 0; i < tgt_nitems; i++) + { + tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); + ptr += INTALIGN(*(int32 *) ptr); + } + + /* + * Prep work is finally done. Go get the SQL string. + */ + sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals); + + /* + * Make it into TEXT for return to the client + */ + sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql))); + + /* + * And send it + */ + PG_RETURN_TEXT_P(sql_text); + } + + + /* + * dblink_build_sql_update + * + * Used to generate an SQL update statement + * based on an existing tuple in a local relation. + * This is useful for selectively replicating data + * to another server via dblink. + * + * API: + * <relname> - name of local table of interest + * <pkattnums> - an int2vector of attnums which will be used + * to identify the local tuple of interest + * <pknumatts> - number of attnums in pkattnums + * <src_pkattvals_arry> - text array of key values which will be used + * to identify the local tuple of interest + * <tgt_pkattvals_arry> - text array of key values which will be used + * to build the string for execution remotely. 
These are substituted + * for their counterparts in src_pkattvals_arry + */ + PG_FUNCTION_INFO_V1(dblink_build_sql_update); + Datum + dblink_build_sql_update(PG_FUNCTION_ARGS) + { + Oid relid; + char *relname; + int16 *pkattnums; + int16 pknumatts; + char **src_pkattvals; + char **tgt_pkattvals; + ArrayType *src_pkattvals_arry; + ArrayType *tgt_pkattvals_arry; + int src_ndim; + int *src_dim; + int src_nitems; + int tgt_ndim; + int *tgt_dim; + int tgt_nitems; + int i; + char *ptr; + char *sql; + text *sql_text; + + relname = NameStr(*PG_GETARG_NAME(0)); + + /* + * Convert relname to rel OID. + */ + relid = get_relid_from_relname(relname); + if (!OidIsValid(relid)) + elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist", + relname); + + pkattnums = (int16 *) PG_GETARG_POINTER(1); + pknumatts = PG_GETARG_INT16(2); + /* + * There should be one source array key values for each key attnum + */ + if (pknumatts == 0) + elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0."); + + src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3); + tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4); + + /* + * Source array is made up of key values that will be used to + * locate the tuple of interest from the local system. 
+ */ + src_ndim = ARR_NDIM(src_pkattvals_arry); + src_dim = ARR_DIMS(src_pkattvals_arry); + src_nitems = ArrayGetNItems(src_ndim, src_dim); + + /* + * There should be one source array key value for each key attnum + */ + if (src_nitems != pknumatts) + elog(ERROR, "dblink_build_sql_insert: source key array length does not match number of key attributes."); + + /* + * get array of pointers to c-strings from the input source array + */ + src_pkattvals = (char **) palloc(src_nitems * sizeof(char *)); + ptr = ARR_DATA_PTR(src_pkattvals_arry); + for (i = 0; i < src_nitems; i++) + { + src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); + ptr += INTALIGN(*(int32 *) ptr); + } + + /* + * Target array is made up of key values that will be used to + * build the SQL string for use on the remote system. + */ + tgt_ndim = ARR_NDIM(tgt_pkattvals_arry); + tgt_dim = ARR_DIMS(tgt_pkattvals_arry); + tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim); + + /* + * There should be one target array key value for each key attnum + */ + if (tgt_nitems != pknumatts) + elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes."); + + /* + * get array of pointers to c-strings from the input target array + */ + tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *)); + ptr = ARR_DATA_PTR(tgt_pkattvals_arry); + for (i = 0; i < tgt_nitems; i++) + { + tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr))); + ptr += INTALIGN(*(int32 *) ptr); + } + + /* + * Prep work is finally done. Go get the SQL string. 
+ */ + sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals); + + /* + * Make it into TEXT for return to the client + */ + sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql))); + + /* + * And send it + */ + PG_RETURN_TEXT_P(sql_text); + } + + + /* + * dblink_current_query + * return the current query string + * to allow its use in (among other things) + * rewrite rules + */ + PG_FUNCTION_INFO_V1(dblink_current_query); + Datum + dblink_current_query(PG_FUNCTION_ARGS) + { + text *result_text; + + result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(debug_query_string))); + PG_RETURN_TEXT_P(result_text); + } + + + /* + * dblink_replace_text + * replace all occurences of 'old_sub_str' in 'orig_str' + * with 'new_sub_str' to form 'new_str' + * + * returns 'orig_str' if 'old_sub_str' == '' or 'orig_str' == '' + * otherwise returns 'new_str' + */ + PG_FUNCTION_INFO_V1(dblink_replace_text); + Datum + dblink_replace_text(PG_FUNCTION_ARGS) + { + text *left_text; + text *right_text; + text *buf_text; + text *ret_text; + char *ret_str; + int curr_posn; + text *src_text = PG_GETARG_TEXT_P(0); + int src_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(src_text))); + text *from_sub_text = PG_GETARG_TEXT_P(1); + int from_sub_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(from_sub_text))); + text *to_sub_text = PG_GETARG_TEXT_P(2); + char *to_sub_str = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(to_sub_text))); + StringInfo str = makeStringInfo(); + + if (src_text_len == 0 || from_sub_text_len == 0) + PG_RETURN_TEXT_P(src_text); + + buf_text = DatumGetTextPCopy(PointerGetDatum(src_text)); + curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))); + + while (curr_posn > 0) + { + left_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), 1, 
DatumGetInt32(DirectFunctionCall2(textpos,PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) - 1)); + right_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), DatumGetInt32(DirectFunctionCall2(textpos,PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) + from_sub_text_len,-1)); + + appendStringInfo(str, DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(left_text)))); + appendStringInfo(str, to_sub_str); + + pfree(buf_text); + pfree(left_text); + buf_text = right_text; + curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))); + } + + appendStringInfo(str, DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(buf_text)))); + pfree(buf_text); + + ret_str = pstrdup(str->data); + ret_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(ret_str))); + + PG_RETURN_TEXT_P(ret_text); + } + + + /************************************************************* * internal functions */ *************** *** 285,293 **** --- 853,1408 ---- MemSet(retval, 0, sizeof(dblink_results)); retval->tup_num = -1; + retval->res_id_index =-1; retval->res = NULL; MemoryContextSwitchTo(oldcontext); return retval; } + + + /* + * init_dblink_array_results + * - create an empty dblink_array_results data structure + */ + dblink_array_results * + init_dblink_array_results(MemoryContext fn_mcxt) + { + MemoryContext oldcontext; + dblink_array_results *retval; + + oldcontext = MemoryContextSwitchTo(fn_mcxt); + + retval = (dblink_array_results *) palloc(sizeof(dblink_array_results)); + MemSet(retval, 0, sizeof(dblink_array_results)); + + retval->elem_num = -1; + retval->num_elems = 0; + retval->res = NULL; + + MemoryContextSwitchTo(oldcontext); + + return retval; + } + + /* + * get_pkey_attnames + * + * Get the primary key attnames for the given relation. + * Return NULL, and set numatts = 0, if no primary key exists. 
+ */ + char ** + get_pkey_attnames(Oid relid, int16 *numatts) + { + Relation indexRelation; + ScanKeyData entry; + HeapScanDesc scan; + HeapTuple indexTuple; + int i; + char **result = NULL; + Relation rel; + TupleDesc tupdesc; + + /* + * Open relation using relid, get tupdesc + */ + rel = relation_open(relid, AccessShareLock); + tupdesc = rel->rd_att; + + /* + * Initialize numatts to 0 in case no primary key + * exists + */ + *numatts = 0; + + /* + * Use relid to get all related indexes + */ + indexRelation = heap_openr(IndexRelationName, AccessShareLock); + ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid, + F_OIDEQ, ObjectIdGetDatum(relid)); + scan = heap_beginscan(indexRelation, false, SnapshotNow, + 1, &entry); + + while (HeapTupleIsValid(indexTuple = heap_getnext(scan, 0))) + { + Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple); + + /* + * We're only interested if it is the primary key + */ + if (index->indisprimary == TRUE) + { + i = 0; + while (index->indkey[i++] != 0) + (*numatts)++; + + if (*numatts > 0) + { + result = (char **) palloc(*numatts * sizeof(char *)); + for (i = 0; i < *numatts; i++) + result[i] = SPI_fname(tupdesc, index->indkey[i]); + } + break; + } + } + heap_endscan(scan); + heap_close(indexRelation, AccessShareLock); + relation_close(rel, AccessShareLock); + + return result; + } + + + /* + * get_strtok + * + * parse input string + * return ord item (0 based) + * based on provided field separator + */ + char * + get_strtok(char *fldtext, char *fldsep, int fldnum) + { + int j = 0; + char *result; + + if (fldnum < 0) + { + elog(ERROR, "get_strtok: field number < 0 not permitted"); + } + + if (fldsep[0] == '\0') + { + elog(ERROR, "get_strtok: blank field separator not permitted"); + } + + result = strtok(fldtext, fldsep); + for (j = 1; j < fldnum + 1; j++) + { + result = strtok(NULL, fldsep); + if (result == NULL) + return NULL; + } + + return pstrdup(result); + } + + char * + get_sql_insert(Oid relid, int16 *pkattnums, 
int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals) + { + Relation rel; + char *relname; + HeapTuple tuple; + TupleDesc tupdesc; + int natts; + StringInfo str = makeStringInfo(); + char *sql = NULL; + char *val = NULL; + int16 key; + unsigned int i; + + /* + * Open relation using relid + */ + rel = relation_open(relid, AccessShareLock); + relname = RelationGetRelationName(rel); + tupdesc = rel->rd_att; + natts = tupdesc->natts; + + tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals); + + appendStringInfo(str, "INSERT INTO %s(", quote_ident_cstr(relname)); + for (i = 0; i < natts; i++) + { + if (i > 0) + appendStringInfo(str, ","); + + appendStringInfo(str, NameStr(tupdesc->attrs[i]->attname)); + } + + appendStringInfo(str, ") VALUES("); + + /* + * remember attvals are 1 based + */ + for (i = 0; i < natts; i++) + { + if (i > 0) + appendStringInfo(str, ","); + + if (tgt_pkattvals != NULL) + key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1); + else + key = -1; + + if (key > -1) + val = pstrdup(tgt_pkattvals[key]); + else + val = SPI_getvalue(tuple, tupdesc, i + 1); + + if (val != NULL) + { + appendStringInfo(str, quote_literal_cstr(val)); + pfree(val); + } + else + appendStringInfo(str, "NULL"); + } + appendStringInfo(str, ")"); + + sql = pstrdup(str->data); + pfree(str->data); + pfree(str); + relation_close(rel, AccessShareLock); + + return (sql); + } + + char * + get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals) + { + Relation rel; + char *relname; + TupleDesc tupdesc; + int natts; + StringInfo str = makeStringInfo(); + char *sql = NULL; + char *val = NULL; + unsigned int i; + + /* + * Open relation using relid + */ + rel = relation_open(relid, AccessShareLock); + relname = RelationGetRelationName(rel); + tupdesc = rel->rd_att; + natts = tupdesc->natts; + + appendStringInfo(str, "DELETE FROM %s WHERE ", quote_ident_cstr(relname)); + for (i = 0; i < pknumatts; i++) + { + int16 pkattnum = pkattnums[i]; 
+ + if (i > 0) + appendStringInfo(str, " AND "); + + appendStringInfo(str, NameStr(tupdesc->attrs[pkattnum - 1]->attname)); + + if (tgt_pkattvals != NULL) + val = pstrdup(tgt_pkattvals[i]); + else + elog(ERROR, "Target key array must not be NULL"); + + if (val != NULL) + { + appendStringInfo(str, "="); + appendStringInfo(str, quote_literal_cstr(val)); + pfree(val); + } + else + appendStringInfo(str, " IS NULL"); + } + + sql = pstrdup(str->data); + pfree(str->data); + pfree(str); + relation_close(rel, AccessShareLock); + + return (sql); + } + + char * + get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals) + { + Relation rel; + char *relname; + HeapTuple tuple; + TupleDesc tupdesc; + int natts; + StringInfo str = makeStringInfo(); + char *sql = NULL; + char *val = NULL; + int16 key; + int i; + + /* + * Open relation using relid + */ + rel = relation_open(relid, AccessShareLock); + relname = RelationGetRelationName(rel); + tupdesc = rel->rd_att; + natts = tupdesc->natts; + + tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals); + + appendStringInfo(str, "UPDATE %s SET ", quote_ident_cstr(relname)); + + for (i = 0; i < natts; i++) + { + if (i > 0) + appendStringInfo(str, ","); + + appendStringInfo(str, NameStr(tupdesc->attrs[i]->attname)); + appendStringInfo(str, "="); + + if (tgt_pkattvals != NULL) + key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1); + else + key = -1; + + if (key > -1) + val = pstrdup(tgt_pkattvals[key]); + else + val = SPI_getvalue(tuple, tupdesc, i + 1); + + if (val != NULL) + { + appendStringInfo(str, quote_literal_cstr(val)); + pfree(val); + } + else + appendStringInfo(str, "NULL"); + } + + appendStringInfo(str, " WHERE "); + + for (i = 0; i < pknumatts; i++) + { + int16 pkattnum = pkattnums[i]; + + if (i > 0) + appendStringInfo(str, " AND "); + + appendStringInfo(str, NameStr(tupdesc->attrs[pkattnum - 1]->attname)); + + if (tgt_pkattvals != NULL) + val = 
pstrdup(tgt_pkattvals[i]); + else + val = SPI_getvalue(tuple, tupdesc, pkattnum); + + if (val != NULL) + { + appendStringInfo(str, "="); + appendStringInfo(str, quote_literal_cstr(val)); + pfree(val); + } + else + appendStringInfo(str, " IS NULL"); + } + + sql = pstrdup(str->data); + pfree(str->data); + pfree(str); + relation_close(rel, AccessShareLock); + + return (sql); + } + + /* + * Return a properly quoted literal value. + * Uses quote_literal in quote.c + */ + static char * + quote_literal_cstr(char *rawstr) + { + text *rawstr_text; + text *result_text; + char *result; + + rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr))); + result_text = DatumGetTextP(DirectFunctionCall1(quote_literal, PointerGetDatum(rawstr_text))); + result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text))); + + return result; + } + + /* + * Return a properly quoted identifier. + * Uses quote_ident in quote.c + */ + static char * + quote_ident_cstr(char *rawstr) + { + text *rawstr_text; + text *result_text; + char *result; + + rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr))); + result_text = DatumGetTextP(DirectFunctionCall1(quote_ident, PointerGetDatum(rawstr_text))); + result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text))); + + return result; + } + + int16 + get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key) + { + int i; + + /* + * Not likely a long list anyway, so just scan for + * the value + */ + for (i = 0; i < pknumatts; i++) + if (key == pkattnums[i]) + return i; + + return -1; + } + + HeapTuple + get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals) + { + Relation rel; + char *relname; + TupleDesc tupdesc; + StringInfo str = makeStringInfo(); + char *sql = NULL; + int ret; + HeapTuple tuple; + int i; + char *val = NULL; + + /* + * Open relation using relid + */ + rel = relation_open(relid, AccessShareLock); + 
relname = RelationGetRelationName(rel); + tupdesc = rel->rd_att; + + /* + * Connect to SPI manager + */ + if ((ret = SPI_connect()) < 0) + elog(ERROR, "get_tuple_of_interest: SPI_connect returned %d", ret); + + /* + * Build sql statement to look up tuple of interest + * Use src_pkattvals as the criteria. + */ + appendStringInfo(str, "SELECT * from %s WHERE ", quote_ident_cstr(relname)); + + for (i = 0; i < pknumatts; i++) + { + int16 pkattnum = pkattnums[i]; + + if (i > 0) + appendStringInfo(str, " AND "); + + appendStringInfo(str, NameStr(tupdesc->attrs[pkattnum - 1]->attname)); + + val = pstrdup(src_pkattvals[i]); + if (val != NULL) + { + appendStringInfo(str, "="); + appendStringInfo(str, quote_literal_cstr(val)); + pfree(val); + } + else + appendStringInfo(str, " IS NULL"); + } + + sql = pstrdup(str->data); + pfree(str->data); + pfree(str); + /* + * Retrieve the desired tuple + */ + ret = SPI_exec(sql, 0); + pfree(sql); + + /* + * Only allow one qualifying tuple + */ + if ((ret == SPI_OK_SELECT) && (SPI_processed > 1)) + { + elog(ERROR, "get_tuple_of_interest: Source criteria may not match more than one record."); + } + else if (ret == SPI_OK_SELECT && SPI_processed == 1) + { + SPITupleTable *tuptable = SPI_tuptable; + tuple = SPI_copytuple(tuptable->vals[0]); + + return tuple; + } + else + { + /* + * no qualifying tuples + */ + return NULL; + } + + /* + * never reached, but keep compiler quiet + */ + return NULL; + } + + Oid + get_relid_from_relname(char *relname) + { + #ifdef NamespaceRelationName + Oid relid; + + relid = RelnameGetRelid(relname); + #else + Relation rel; + Oid relid; + + rel = relation_openr(relname, AccessShareLock); + relid = RelationGetRelid(rel); + relation_close(rel, AccessShareLock); + #endif /* NamespaceRelationName */ + + return relid; + } + + dblink_results * + get_res_ptr(int32 res_id_index) + { + List *ptr; + + /* + * short circuit empty list + */ + if(res_id == NIL) + return NULL; + + /* + * OK, should be good to go + */ + foreach(ptr, res_id) + { + 
dblink_results *this_res_id = (dblink_results *) lfirst(ptr); + if (this_res_id->res_id_index == res_id_index) + return this_res_id; + } + return NULL; + } + + /* + * Add node to global List res_id + */ + void + append_res_ptr(dblink_results *results) + { + res_id = lappend(res_id, results); + } + + /* + * Remove node from global List + * using res_id_index + */ + void + remove_res_ptr(dblink_results *results) + { + res_id = lremove(results, res_id); + + if (res_id == NIL) + res_id_index = 0; + } + + diff -cNr dblink.orig/dblink.h dblink/dblink.h *** dblink.orig/dblink.h Mon Nov 5 09:46:22 2001 --- dblink/dblink.h Sun Apr 14 18:54:39 2002 *************** *** 3,9 **** * * Functions returning results from a remote database * ! * Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001; * * Permission to use, copy, modify, and distribute this software and its * documentation for any purpose, without fee, and without a written agreement --- 3,10 ---- * * Functions returning results from a remote database * ! * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002, ! 
* ALL RIGHTS RESERVED; * * Permission to use, copy, modify, and distribute this software and its * documentation for any purpose, without fee, and without a written agreement *************** *** 33,42 **** --- 34,64 ---- #include "libpq-int.h" #include "fmgr.h" #include "access/tupdesc.h" + #include "access/heapam.h" + #include "catalog/catname.h" + #include "catalog/pg_index.h" + #include "catalog/pg_type.h" #include "executor/executor.h" + #include "executor/spi.h" + #include "lib/stringinfo.h" #include "nodes/nodes.h" #include "nodes/execnodes.h" + #include "nodes/pg_list.h" + #include "parser/parse_type.h" + #include "tcop/tcopprot.h" #include "utils/builtins.h" + #include "utils/fmgroids.h" + #include "utils/array.h" + #include "utils/syscache.h" + + #ifdef NamespaceRelationName + #include "catalog/namespace.h" + #endif /* NamespaceRelationName */ + + /* + * Max SQL statement size + */ + #define DBLINK_MAX_SQLSTATE_SIZE 16384 /* * This struct holds the results of the remote query. *************** *** 50,70 **** int tup_num; /* * the actual query results */ PGresult *res; - } dblink_results; /* * External declarations */ extern Datum dblink(PG_FUNCTION_ARGS); extern Datum dblink_tok(PG_FUNCTION_ARGS); /* * Internal declarations */ dblink_results *init_dblink_results(MemoryContext fn_mcxt); #endif /* DBLINK_H */ --- 72,145 ---- int tup_num; /* + * resource index number for this context + */ + int res_id_index; + + /* * the actual query results */ PGresult *res; } dblink_results; + + /* + * This struct holds results in the form of an array. 
+ * Use fn_extra to hold a pointer to it across calls + */ + typedef struct + { + /* + * elem being accessed + */ + int elem_num; + + /* + * number of elems + */ + int num_elems; + + /* + * the actual array + */ + void *res; + + } dblink_array_results; + /* * External declarations */ extern Datum dblink(PG_FUNCTION_ARGS); extern Datum dblink_tok(PG_FUNCTION_ARGS); + extern Datum dblink_strtok(PG_FUNCTION_ARGS); + extern Datum dblink_get_pkey(PG_FUNCTION_ARGS); + extern Datum dblink_last_oid(PG_FUNCTION_ARGS); + extern Datum dblink_build_sql_insert(PG_FUNCTION_ARGS); + extern Datum dblink_build_sql_delete(PG_FUNCTION_ARGS); + extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS); + extern Datum dblink_current_query(PG_FUNCTION_ARGS); + extern Datum dblink_replace_text(PG_FUNCTION_ARGS); /* * Internal declarations */ dblink_results *init_dblink_results(MemoryContext fn_mcxt); + dblink_array_results *init_dblink_array_results(MemoryContext fn_mcxt); + char **get_pkey_attnames(Oid relid, int16 *numatts); + char *get_strtok(char *fldtext, char *fldsep, int fldnum); + char *getvalue(HeapTuple tuple, TupleDesc tupdesc, int fnumber); + char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals); + char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals); + char *get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals); + static char *quote_literal_cstr(char *rawstr); + static char *quote_ident_cstr(char *rawstr); + int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key); + HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals); + Oid get_relid_from_relname(char *relname); + dblink_results *get_res_ptr(int32 res_id_index); + void append_res_ptr(dblink_results *results); + void remove_res_ptr(dblink_results *results); + + extern char *debug_query_string; #endif /* DBLINK_H */ diff -cNr 
dblink.orig/dblink.sql.in dblink/dblink.sql.in *** dblink.orig/dblink.sql.in Thu Jun 14 09:49:03 2001 --- dblink/dblink.sql.in Fri Apr 12 14:36:49 2002 *************** *** 1,5 **** ! CREATE FUNCTION dblink (text,text) RETURNS setof int ! AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c'; ! CREATE FUNCTION dblink_tok (int,int) RETURNS text ! AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c'; --- 1,38 ---- ! CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof int ! AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c' ! WITH (isstrict); ! CREATE OR REPLACE FUNCTION dblink_tok (int,int) RETURNS text ! AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c' ! WITH (isstrict); ! ! CREATE OR REPLACE FUNCTION dblink_strtok (text,text,int) RETURNS text ! AS 'MODULE_PATHNAME','dblink_strtok' LANGUAGE 'c' ! WITH (iscachable, isstrict); ! ! CREATE OR REPLACE FUNCTION dblink_get_pkey (name) RETURNS setof text ! AS 'MODULE_PATHNAME','dblink_get_pkey' LANGUAGE 'c' ! WITH (isstrict); ! ! CREATE OR REPLACE FUNCTION dblink_last_oid (int) RETURNS oid ! AS 'MODULE_PATHNAME','dblink_last_oid' LANGUAGE 'c' ! WITH (isstrict); ! ! CREATE OR REPLACE FUNCTION dblink_build_sql_insert (name, int2vector, int2, _text, _text) RETURNS text ! AS 'MODULE_PATHNAME','dblink_build_sql_insert' LANGUAGE 'c' ! WITH (isstrict); ! ! CREATE OR REPLACE FUNCTION dblink_build_sql_delete (name, int2vector, int2, _text) RETURNS text ! AS 'MODULE_PATHNAME','dblink_build_sql_delete' LANGUAGE 'c' ! WITH (isstrict); ! ! CREATE OR REPLACE FUNCTION dblink_build_sql_update (name, int2vector, int2, _text, _text) RETURNS text ! AS 'MODULE_PATHNAME','dblink_build_sql_update' LANGUAGE 'c' ! WITH (isstrict); ! ! CREATE OR REPLACE FUNCTION dblink_current_query () RETURNS text ! AS 'MODULE_PATHNAME','dblink_current_query' LANGUAGE 'c'; ! ! CREATE OR REPLACE FUNCTION dblink_replace (text,text,text) RETURNS text ! AS 'MODULE_PATHNAME','dblink_replace_text' LANGUAGE 'c' ! WITH (iscachable, isstrict);
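For anyone who wants to try the replacement semantics described in the dblink_replace_text comment without building the backend, here is a rough standalone sketch in plain C. It is not the code from the patch: the name replace_all is hypothetical, and it uses malloc and strstr in place of palloc, textpos, and text_substr. It is only meant to show the same left-to-right, non-overlapping replacement, including the rule that an empty source or an empty search string returns the source unchanged.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/*
 * replace_all (hypothetical helper, not part of the patch)
 *
 * Return a newly malloc'd copy of 'src' in which every occurrence of
 * 'from' is replaced by 'to'. Matches are found left to right and do
 * not overlap; the replacement text is never rescanned.
 *
 * If 'src' or 'from' is empty, a plain copy of 'src' is returned,
 * mirroring the rule stated in the dblink_replace_text comment.
 * The caller must free() the result. Returns NULL if malloc fails.
 */
char *
replace_all(const char *src, const char *from, const char *to)
{
	size_t		src_len;
	size_t		from_len;
	size_t		to_len;
	size_t		count = 0;
	const char *scan;
	const char *hit;
	char	   *result;
	char	   *out;

	assert(src != NULL && from != NULL && to != NULL);

	src_len = strlen(src);
	from_len = strlen(from);
	to_len = strlen(to);

	/* nothing to search in, or nothing to search for */
	if (src_len == 0 || from_len == 0)
	{
		result = malloc(src_len + 1);
		if (result != NULL)
			memcpy(result, src, src_len + 1);
		return result;
	}

	/* first pass: count non-overlapping matches to size the result */
	for (scan = src; (hit = strstr(scan, from)) != NULL; scan = hit + from_len)
		count++;

	result = malloc((src_len - count * from_len) + count * to_len + 1);
	if (result == NULL)
		return NULL;

	/* second pass: copy the text left of each match, then the replacement */
	out = result;
	for (scan = src; (hit = strstr(scan, from)) != NULL; scan = hit + from_len)
	{
		size_t		left = (size_t) (hit - scan);

		memcpy(out, scan, left);
		out += left;
		memcpy(out, to, to_len);
		out += to_len;
	}

	/* copy whatever follows the last match, including the terminator */
	strcpy(out, scan);

	return result;
}
```

With this sketch, replace_all("a.b.c", ".", "::") yields a::b::c, and replace_all("aaa", "aa", "b") yields ba because the scan resumes after each match rather than inside it. Counting matches first keeps it to a single allocation; the function in the patch instead grows a StringInfo as it goes.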