diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 46c7cc5..ffdf9e3 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -56,11 +56,27 @@ PG_MODULE_MAGIC; +typedef struct storeInfo +{ + Tuplestorestate *tuplestore; + int nattrs; + MemoryContext oldcontext; + AttInMetadata *attinmeta; + TupleDesc tupdesc; + char* valbuf; + int valbuflen; + char **cstrs; + bool error_occurred; + bool nummismatch; +} storeInfo; + typedef struct remoteConn { PGconn *conn; /* Hold the remote connection */ int openCursorCount; /* The number of open cursors */ bool newXactForCursor; /* Opened a transaction for a cursor */ + bool materialize_needed; /* Materialize result if true */ + } remoteConn; /* @@ -91,11 +107,12 @@ static char *escape_param_str(const char *from); static void validate_pkattnums(Relation rel, int2vector *pkattnums_arg, int32 pknumatts_arg, int **pkattnums, int *pknumatts); +static void initStoreInfo(FunctionCallInfo fcinfo, storeInfo *sinfo); +static int storeHandler(PGresult *res, PGrowValue *columns, void *param); /* Global */ static remoteConn *pconn = NULL; static HTAB *remoteConnHash = NULL; - /* * Following is list that holds multiple remote connections. * Calling convention of each dblink function changes to accept @@ -112,6 +129,9 @@ typedef struct remoteConnHashEnt /* initial number of connection hashes */ #define NUMCONN 16 +/* Initial block size for value buffer in storeHandler */ +#define INITBUFLEN 64 + /* general utility */ #define xpfree(var_) \ do { \ @@ -201,6 +221,7 @@ typedef struct remoteConnHashEnt pconn->conn = NULL; \ pconn->openCursorCount = 0; \ pconn->newXactForCursor = FALSE; \ + pconn->materialize_needed = false; \ } \ } while (0) @@ -229,8 +250,11 @@ dblink_connect(PG_FUNCTION_ARGS) conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0)); if (connname) + { rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); + rconn->materialize_needed = false; + } /* first check for valid foreign data server */ connstr = get_connect_string(conname_or_str); @@ -504,6 +528,7 @@ dblink_fetch(PG_FUNCTION_ARGS) char *curname = NULL; int howmany = 0; bool fail = true; /* default to backward compatible */ + storeInfo storeInfo; prepTuplestoreResult(fcinfo); @@ -557,15 +582,52 @@ dblink_fetch(PG_FUNCTION_ARGS) appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname); /* + * Result is stored into storeinfo.tuplestore instead of + * res->result retuned by PQexec below + */ + initStoreInfo(fcinfo, &storeInfo); + PQsetRowProcessor(conn, storeHandler, &storeInfo); + + /* * Try to execute the query. Note that since libpq uses malloc, the * PGresult will be long-lived even though we are still in a short-lived * memory context. */ - res = PQexec(conn, buf.data); + PG_TRY(); + { + res = PQexec(conn, buf.data); + } + PG_CATCH(); + { + ErrorData *edata; + + PQsetRowProcessor(conn, NULL, NULL); + edata = CopyErrorData(); + FlushErrorState(); + + /* Skip remaining results when storeHandler raises exception. */ + PQskipResult(conn, TRUE); + ReThrowError(edata); + } + PG_END_TRY(); + PQsetRowProcessor(conn, NULL, NULL); + if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { + if (storeInfo.nummismatch) + { + if (res) + PQclear(res); + + /* This is only for backward compatibility */ + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + } + dblink_res_error(conname, res, "could not fetch from cursor", fail); return (Datum) 0; } @@ -577,8 +639,8 @@ dblink_fetch(PG_FUNCTION_ARGS) (errcode(ERRCODE_INVALID_CURSOR_NAME), errmsg("cursor \"%s\" does not exist", curname))); } + PQclear(res); - materializeResult(fcinfo, res); return (Datum) 0; } @@ -616,6 +678,8 @@ dblink_send_query(PG_FUNCTION_ARGS) if (retval != 1) elog(NOTICE, "%s", PQerrorMessage(conn)); + rconn->materialize_needed = false; + PG_RETURN_INT32(retval); } @@ -638,11 +702,12 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) remoteConn *rconn = NULL; bool fail = true; /* default to backward compatible */ bool freeconn = false; - - prepTuplestoreResult(fcinfo); + storeInfo storeInfo; DBLINK_INIT; + prepTuplestoreResult(fcinfo); + if (!is_async) { if (PG_NARGS() == 3) @@ -698,31 +763,97 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) if (!conn) DBLINK_CONN_NOT_AVAIL; - - /* synchronous query, or async result retrieval */ - if (!is_async) - res = PQexec(conn, sql); + + if (!is_async || (rconn && !rconn->materialize_needed)) + { + /* + * Result is stored into storeinfo.tuplestore instead of + * res->result retuned by PQexec/PQgetResult below + */ + initStoreInfo(fcinfo, &storeInfo); + PQsetRowProcessor(conn, storeHandler, &storeInfo); + } else + storeInfo.nummismatch = false; + + PG_TRY(); { - res = PQgetResult(conn); - /* NULL means we're all done with the async results */ - if (!res) - return (Datum) 0; + /* synchronous query, or async result retrieval */ + if (!is_async) + res = PQexec(conn, sql); + else + res = PQgetResult(conn); } + PG_CATCH(); + { + ErrorData *edata; - /* if needed, close the connection to the database and cleanup */ - if (freeconn) - PQfinish(conn); + PQsetRowProcessor(conn, NULL, NULL); + edata = CopyErrorData(); + FlushErrorState(); - if (!res || - (PQresultStatus(res) != PGRES_COMMAND_OK && - PQresultStatus(res) != PGRES_TUPLES_OK)) + /* Skip remaining results when storeHandler raises exception. */ + PQskipResult(conn, TRUE); + ReThrowError(edata); + } + PG_END_TRY(); + PQsetRowProcessor(conn, NULL, NULL); + + /* NULL res from async get means we're all done with the results */ + if (res || !is_async) { - dblink_res_error(conname, res, "could not execute query", fail); - return (Datum) 0; + if (freeconn) + PQfinish(conn); + + /* + * exclude mismatch of the numbers of the colums here so as to + * behave as before. + */ + if (!res || + (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK && + !storeInfo.nummismatch)) + { + dblink_res_error(conname, res, "could not execute query", fail); + return (Datum) 0; + } + + /* + * Materialize result if command result or materiarize is needed. + * Current libpq and dblink API design does not allow to use row + * processor for asynchronous query when dblink_is_busy is called prior + * to dblink_get_result. + */ + if (PQresultStatus(res) == PGRES_COMMAND_OK || + (rconn && rconn->materialize_needed)) + { + materializeResult(fcinfo, res); + return (Datum) 0; + } + else if (get_call_result_type(fcinfo, NULL, NULL) == TYPEFUNC_RECORD) + { + PQclear(res); + + /* failed to determine actual type of RECORD */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context " + "that cannot accept type record"))); + } + + if (storeInfo.nummismatch) + { + /* This is only for backward compatibility */ + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + } } - materializeResult(fcinfo, res); + if (res) + PQclear(res); + return (Datum) 0; } @@ -890,375 +1021,518 @@ materializeResult(FunctionCallInfo fcinfo, PGresult *res) PG_END_TRY(); } -/* - * List all open dblink connections by name. - * Returns an array of all connection names. - * Takes no params - */ -PG_FUNCTION_INFO_V1(dblink_get_connections); -Datum -dblink_get_connections(PG_FUNCTION_ARGS) -{ - HASH_SEQ_STATUS status; - remoteConnHashEnt *hentry; - ArrayBuildState *astate = NULL; - - if (remoteConnHash) - { - hash_seq_init(&status, remoteConnHash); - while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL) - { - /* stash away current value */ - astate = accumArrayResult(astate, - CStringGetTextDatum(hentry->name), - false, TEXTOID, CurrentMemoryContext); - } - } - - if (astate) - PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate, - CurrentMemoryContext)); - else - PG_RETURN_NULL(); -} - -/* - * Checks if a given remote connection is busy - * - * Returns 1 if the connection is busy, 0 otherwise - * Params: - * text connection_name - name of the connection to check - * - */ -PG_FUNCTION_INFO_V1(dblink_is_busy); -Datum -dblink_is_busy(PG_FUNCTION_ARGS) -{ - char *conname = NULL; - PGconn *conn = NULL; - remoteConn *rconn = NULL; - - DBLINK_INIT; - DBLINK_GET_NAMED_CONN; - - PQconsumeInput(conn); - PG_RETURN_INT32(PQisBusy(conn)); -} - -/* - * Cancels a running request on a connection - * - * Returns text: - * "OK" if the cancel request has been sent correctly, - * an error message otherwise - * - * Params: - * text connection_name - name of the connection to check - * - */ -PG_FUNCTION_INFO_V1(dblink_cancel_query); -Datum -dblink_cancel_query(PG_FUNCTION_ARGS) -{ - int res = 0; - char *conname = NULL; - PGconn *conn = NULL; - remoteConn *rconn = NULL; - PGcancel *cancel; - char errbuf[256]; - - DBLINK_INIT; - DBLINK_GET_NAMED_CONN; - cancel = PQgetCancel(conn); - - res = PQcancel(cancel, errbuf, 256); - PQfreeCancel(cancel); - - if (res == 1) - PG_RETURN_TEXT_P(cstring_to_text("OK")); - else - PG_RETURN_TEXT_P(cstring_to_text(errbuf)); -} - - -/* - * Get error message from a connection - * - * Returns text: - * "OK" if no error, an error message otherwise - * - * Params: - * text connection_name - name of the connection to check - * - */ -PG_FUNCTION_INFO_V1(dblink_error_message); -Datum -dblink_error_message(PG_FUNCTION_ARGS) -{ - char *msg; - char *conname = NULL; - PGconn *conn = NULL; - remoteConn *rconn = NULL; - - DBLINK_INIT; - DBLINK_GET_NAMED_CONN; - - msg = PQerrorMessage(conn); - if (msg == NULL || msg[0] == '\0') - PG_RETURN_TEXT_P(cstring_to_text("OK")); - else - PG_RETURN_TEXT_P(cstring_to_text(msg)); -} - -/* - * Execute an SQL non-SELECT command - */ -PG_FUNCTION_INFO_V1(dblink_exec); -Datum -dblink_exec(PG_FUNCTION_ARGS) +static void +initStoreInfo(FunctionCallInfo fcinfo, storeInfo *sinfo) { - text *volatile sql_cmd_status = NULL; - PGconn *volatile conn = NULL; - volatile bool freeconn = false; - - DBLINK_INIT; - - PG_TRY(); - { - char *msg; - PGresult *res = NULL; - char *connstr = NULL; - char *sql = NULL; - char *conname = NULL; - remoteConn *rconn = NULL; - bool fail = true; /* default to backward compatible behavior */ - - if (PG_NARGS() == 3) - { - /* must be text,text,bool */ - DBLINK_GET_CONN; - sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); - fail = PG_GETARG_BOOL(2); - } - else if (PG_NARGS() == 2) - { - /* might be text,text or text,bool */ - if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) - { - conn = pconn->conn; - sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); - fail = PG_GETARG_BOOL(1); - } - else - { - DBLINK_GET_CONN; - sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); - } - } - else if (PG_NARGS() == 1) - { - /* must be single text argument */ - conn = pconn->conn; - sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); - } - else - /* shouldn't happen */ - elog(ERROR, "wrong number of arguments"); - - if (!conn) - DBLINK_CONN_NOT_AVAIL; + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc = NULL; + MemoryContext oldcontext; - res = PQexec(conn, sql); - if (!res || - (PQresultStatus(res) != PGRES_COMMAND_OK && - PQresultStatus(res) != PGRES_TUPLES_OK)) - { - dblink_res_error(conname, res, "could not execute command", fail); + oldcontext = MemoryContextSwitchTo( + rsinfo->econtext->ecxt_per_query_memory); - /* - * and save a copy of the command status string to return as our - * result tuple - */ - sql_cmd_status = cstring_to_text("ERROR"); - } - else if (PQresultStatus(res) == PGRES_COMMAND_OK) - { - /* - * and save a copy of the command status string to return as our - * result tuple - */ - sql_cmd_status = cstring_to_text(PQcmdStatus(res)); - PQclear(res); - } - else - { - PQclear(res); - ereport(ERROR, - (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), - errmsg("statement returning results not allowed"))); - } - } - PG_CATCH(); + switch (get_call_result_type(fcinfo, NULL, &tupdesc)) { - /* if needed, close the connection to the database */ - if (freeconn) - PQfinish(conn); - PG_RE_THROW(); - } - PG_END_TRY(); - - /* if needed, close the connection to the database */ - if (freeconn) - PQfinish(conn); - - PG_RETURN_TEXT_P(sql_cmd_status); -} - - -/* - * dblink_get_pkey - * - * Return list of primary key fields for the supplied relation, - * or NULL if none exists. - */ -PG_FUNCTION_INFO_V1(dblink_get_pkey); -Datum -dblink_get_pkey(PG_FUNCTION_ARGS) -{ - int16 numatts; - char **results; - FuncCallContext *funcctx; - int32 call_cntr; - int32 max_calls; - AttInMetadata *attinmeta; - MemoryContext oldcontext; - - /* stuff done only on the first call of the function */ - if (SRF_IS_FIRSTCALL()) - { - Relation rel; - TupleDesc tupdesc; - - /* create a function context for cross-call persistence */ - funcctx = SRF_FIRSTCALL_INIT(); - - /* - * switch to memory context appropriate for multiple function calls - */ - oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); - - /* open target relation */ - rel = get_rel_from_relname(PG_GETARG_TEXT_P(0), AccessShareLock, ACL_SELECT); - - /* get the array of attnums */ - results = get_pkey_attnames(rel, &numatts); - - relation_close(rel, AccessShareLock); - - /* - * need a tuple descriptor representing one INT and one TEXT column - */ - tupdesc = CreateTemplateTupleDesc(2, false); - TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position", - INT4OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname", - TEXTOID, -1, 0); - - /* - * Generate attribute metadata needed later to produce tuples from raw - * C strings - */ - attinmeta = TupleDescGetAttInMetadata(tupdesc); - funcctx->attinmeta = attinmeta; - - if ((results != NULL) && (numatts > 0)) - { - funcctx->max_calls = numatts; - - /* got results, keep track of them */ - funcctx->user_fctx = results; - } - else - { - /* fast track when no results */ - MemoryContextSwitchTo(oldcontext); - SRF_RETURN_DONE(funcctx); - } - - MemoryContextSwitchTo(oldcontext); + case TYPEFUNC_COMPOSITE: + tupdesc = CreateTupleDescCopy(tupdesc); + sinfo->nattrs = tupdesc->natts; + break; + + case TYPEFUNC_RECORD: + tupdesc = CreateTemplateTupleDesc(1, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", + TEXTOID, -1, 0); + sinfo->nattrs = 1; + break; + + default: + /* result type isn't composite */ + elog(ERROR, "return type must be a row type"); + break; } - /* stuff done on every call of the function */ - funcctx = SRF_PERCALL_SETUP(); - - /* - * initialize per-call variables - */ - call_cntr = funcctx->call_cntr; - max_calls = funcctx->max_calls; - - results = (char **) funcctx->user_fctx; - attinmeta = funcctx->attinmeta; - - if (call_cntr < max_calls) /* do when there is more left to send */ - { - char **values; - HeapTuple tuple; - Datum result; + sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc); + sinfo->error_occurred = FALSE; + sinfo->nummismatch = FALSE; + sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem); + sinfo->valbuflen = INITBUFLEN; + sinfo->valbuf = (char *)palloc(sinfo->valbuflen); + sinfo->cstrs = (char **)palloc(sinfo->nattrs * sizeof(char *)); - values = (char **) palloc(2 * sizeof(char *)); - values[0] = (char *) palloc(12); /* sign, 10 digits, '\0' */ - - sprintf(values[0], "%d", call_cntr + 1); - - values[1] = results[call_cntr]; - - /* build the tuple */ - tuple = BuildTupleFromCStrings(attinmeta, values); - - /* make the tuple into a datum */ - result = HeapTupleGetDatum(tuple); + rsinfo->setResult = sinfo->tuplestore; + rsinfo->setDesc = tupdesc; - SRF_RETURN_NEXT(funcctx, result); - } - else - { - /* do when there is no more left */ - SRF_RETURN_DONE(funcctx); - } + MemoryContextSwitchTo(oldcontext); } - -/* - * dblink_build_sql_insert - * - * Used to generate an SQL insert statement - * based on an existing tuple in a local relation. - * This is useful for selectively replicating data - * to another server via dblink. - * - * API: - * - name of local table of interest - * - an int2vector of attnums which will be used - * to identify the local tuple of interest - * - number of attnums in pkattnums - * - text array of key values which will be used - * to identify the local tuple of interest - * - text array of key values which will be used - * to build the string for execution remotely. These are substituted - * for their counterparts in src_pkattvals_arry - */ -PG_FUNCTION_INFO_V1(dblink_build_sql_insert); -Datum -dblink_build_sql_insert(PG_FUNCTION_ARGS) -{ - text *relname_text = PG_GETARG_TEXT_P(0); - int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1); - int32 pknumatts_arg = PG_GETARG_INT32(2); - ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3); - ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4); - Relation rel; - int *pkattnums; + /* Prototype of this function is PQrowProcessor */ + static int + storeHandler(PGresult *res, PGrowValue *columns, void *param) + { + storeInfo *sinfo = (storeInfo *)param; + HeapTuple tuple; + int newbuflen; + int fields = PQnfields(res); + int i; + char **cstrs = sinfo->cstrs; + char *pbuf; + + if (sinfo->error_occurred) + return -1; + + if (sinfo->nattrs == 0) + { + int i; + TupleDesc tupdesc = CreateTemplateTupleDesc(fields, false); + + sinfo->nattrs = fields; + for (i = 1 ; i <= fields ; i++) + TupleDescInitEntry(tupdesc, (AttrNumber)i, "hoge", + TEXTOID, -1, 0); + sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc); + sinfo->tupdesc = tupdesc; + } + + if (sinfo->nattrs != fields) + { + sinfo->error_occurred = TRUE; + sinfo->nummismatch = TRUE; + + /* This error will be processed in dblink_record_internal() */ + return -1; + } + + /* + * value input functions assumes that the input string is + * terminated by zero. We should make the values to be so. + */ + + /* + * The length of the buffer for each field is value length + 1 for + * zero-termination + */ + newbuflen = fields; + for(i = 0 ; i < fields ; i++) + newbuflen += columns[i].len; + + if (newbuflen > sinfo->valbuflen) + { + int tmplen = sinfo->valbuflen * 2; + /* + * Try to (re)allocate in bigger steps to avoid flood of allocations + * on weird data. + */ + while (newbuflen > tmplen && tmplen >= 0) + tmplen *= 2; + + /* Check if the integer was wrap-rounded. */ + if (tmplen < 0) + elog(ERROR, "Buffer size for one row exceeds integer limit"); + sinfo->valbuf = (char *)repalloc(sinfo->valbuf, tmplen); + sinfo->valbuflen = tmplen; + } + + pbuf = sinfo->valbuf; + for(i = 0 ; i < fields ; i++) + { + int len = columns[i].len; + if (len < 0) + cstrs[i] = NULL; + else + { + cstrs[i] = pbuf; + memcpy(pbuf, columns[i].value, len); + pbuf += len; + *pbuf++ = '\0'; + } + } + + /* + * These functions may throw exception. It will be caught in + * dblink_record_internal() + */ + tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs); + tuplestore_puttuple(sinfo->tuplestore, tuple); + + return 1; + } + + /* + * List all open dblink connections by name. + * Returns an array of all connection names. + * Takes no params + */ + PG_FUNCTION_INFO_V1(dblink_get_connections); + Datum + dblink_get_connections(PG_FUNCTION_ARGS) + { + HASH_SEQ_STATUS status; + remoteConnHashEnt *hentry; + ArrayBuildState *astate = NULL; + + if (remoteConnHash) + { + hash_seq_init(&status, remoteConnHash); + while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL) + { + /* stash away current value */ + astate = accumArrayResult(astate, + CStringGetTextDatum(hentry->name), + false, TEXTOID, CurrentMemoryContext); + } + } + + if (astate) + PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate, + CurrentMemoryContext)); + else + PG_RETURN_NULL(); + } + + /* + * Checks if a given remote connection is busy + * + * Returns 1 if the connection is busy, 0 otherwise + * Params: + * text connection_name - name of the connection to check + * + */ + PG_FUNCTION_INFO_V1(dblink_is_busy); + Datum + dblink_is_busy(PG_FUNCTION_ARGS) + { + char *conname = NULL; + PGconn *conn = NULL; + remoteConn *rconn = NULL; + + DBLINK_INIT; + DBLINK_GET_NAMED_CONN; + + /* + * The result will be read by calling dblink_is_busy on current implement. + * This disables to use storeHandler afterwards. Materialize needs return + * type information of dblink_get_result which is not available here. + */ + rconn->materialize_needed = true; + + PQconsumeInput(conn); + PG_RETURN_INT32(PQisBusy(conn)); + } + + /* + * Cancels a running request on a connection + * + * Returns text: + * "OK" if the cancel request has been sent correctly, + * an error message otherwise + * + * Params: + * text connection_name - name of the connection to check + * + */ + PG_FUNCTION_INFO_V1(dblink_cancel_query); + Datum + dblink_cancel_query(PG_FUNCTION_ARGS) + { + int res = 0; + char *conname = NULL; + PGconn *conn = NULL; + remoteConn *rconn = NULL; + PGcancel *cancel; + char errbuf[256]; + + DBLINK_INIT; + DBLINK_GET_NAMED_CONN; + cancel = PQgetCancel(conn); + + res = PQcancel(cancel, errbuf, 256); + PQfreeCancel(cancel); + + if (res == 1) + PG_RETURN_TEXT_P(cstring_to_text("OK")); + else + PG_RETURN_TEXT_P(cstring_to_text(errbuf)); + } + + + /* + * Get error message from a connection + * + * Returns text: + * "OK" if no error, an error message otherwise + * + * Params: + * text connection_name - name of the connection to check + * + */ + PG_FUNCTION_INFO_V1(dblink_error_message); + Datum + dblink_error_message(PG_FUNCTION_ARGS) + { + char *msg; + char *conname = NULL; + PGconn *conn = NULL; + remoteConn *rconn = NULL; + + DBLINK_INIT; + DBLINK_GET_NAMED_CONN; + + msg = PQerrorMessage(conn); + if (msg == NULL || msg[0] == '\0') + PG_RETURN_TEXT_P(cstring_to_text("OK")); + else + PG_RETURN_TEXT_P(cstring_to_text(msg)); + } + + /* + * Execute an SQL non-SELECT command + */ + PG_FUNCTION_INFO_V1(dblink_exec); + Datum + dblink_exec(PG_FUNCTION_ARGS) + { + text *volatile sql_cmd_status = NULL; + PGconn *volatile conn = NULL; + volatile bool freeconn = false; + + DBLINK_INIT; + + PG_TRY(); + { + char *msg; + PGresult *res = NULL; + char *connstr = NULL; + char *sql = NULL; + char *conname = NULL; + remoteConn *rconn = NULL; + bool fail = true; /* default to backward compatible behavior */ + + if (PG_NARGS() == 3) + { + /* must be text,text,bool */ + DBLINK_GET_CONN; + sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); + fail = PG_GETARG_BOOL(2); + } + else if (PG_NARGS() == 2) + { + /* might be text,text or text,bool */ + if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) + { + conn = pconn->conn; + sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); + fail = PG_GETARG_BOOL(1); + } + else + { + DBLINK_GET_CONN; + sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); + } + } + else if (PG_NARGS() == 1) + { + /* must be single text argument */ + conn = pconn->conn; + sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); + } + else + /* shouldn't happen */ + elog(ERROR, "wrong number of arguments"); + + if (!conn) + DBLINK_CONN_NOT_AVAIL; + + res = PQexec(conn, sql); + if (!res || + (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK)) + { + dblink_res_error(conname, res, "could not execute command", fail); + + /* + * and save a copy of the command status string to return as our + * result tuple + */ + sql_cmd_status = cstring_to_text("ERROR"); + } + else if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + /* + * and save a copy of the command status string to return as our + * result tuple + */ + sql_cmd_status = cstring_to_text(PQcmdStatus(res)); + PQclear(res); + } + else + { + PQclear(res); + ereport(ERROR, + (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), + errmsg("statement returning results not allowed"))); + } + } + PG_CATCH(); + { + /* if needed, close the connection to the database */ + if (freeconn) + PQfinish(conn); + PG_RE_THROW(); + } + PG_END_TRY(); + + /* if needed, close the connection to the database */ + if (freeconn) + PQfinish(conn); + + PG_RETURN_TEXT_P(sql_cmd_status); + } + + + /* + * dblink_get_pkey + * + * Return list of primary key fields for the supplied relation, + * or NULL if none exists. + */ + PG_FUNCTION_INFO_V1(dblink_get_pkey); + Datum + dblink_get_pkey(PG_FUNCTION_ARGS) + { + int16 numatts; + char **results; + FuncCallContext *funcctx; + int32 call_cntr; + int32 max_calls; + AttInMetadata *attinmeta; + MemoryContext oldcontext; + + /* stuff done only on the first call of the function */ + if (SRF_IS_FIRSTCALL()) + { + Relation rel; + TupleDesc tupdesc; + + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); + + /* + * switch to memory context appropriate for multiple function calls + */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + /* open target relation */ + rel = get_rel_from_relname(PG_GETARG_TEXT_P(0), AccessShareLock, ACL_SELECT); + + /* get the array of attnums */ + results = get_pkey_attnames(rel, &numatts); + + relation_close(rel, AccessShareLock); + + /* + * need a tuple descriptor representing one INT and one TEXT column + */ + tupdesc = CreateTemplateTupleDesc(2, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position", + INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname", + TEXTOID, -1, 0); + + /* + * Generate attribute metadata needed later to produce tuples from raw + * C strings + */ + attinmeta = TupleDescGetAttInMetadata(tupdesc); + funcctx->attinmeta = attinmeta; + + if ((results != NULL) && (numatts > 0)) + { + funcctx->max_calls = numatts; + + /* got results, keep track of them */ + funcctx->user_fctx = results; + } + else + { + /* fast track when no results */ + MemoryContextSwitchTo(oldcontext); + SRF_RETURN_DONE(funcctx); + } + + MemoryContextSwitchTo(oldcontext); + } + + /* stuff done on every call of the function */ + funcctx = SRF_PERCALL_SETUP(); + + /* + * initialize per-call variables + */ + call_cntr = funcctx->call_cntr; + max_calls = funcctx->max_calls; + + results = (char **) funcctx->user_fctx; + attinmeta = funcctx->attinmeta; + + if (call_cntr < max_calls) /* do when there is more left to send */ + { + char **values; + HeapTuple tuple; + Datum result; + + values = (char **) palloc(2 * sizeof(char *)); + values[0] = (char *) palloc(12); /* sign, 10 digits, '\0' */ + + sprintf(values[0], "%d", call_cntr + 1); + + values[1] = results[call_cntr]; + + /* build the tuple */ + tuple = BuildTupleFromCStrings(attinmeta, values); + + /* make the tuple into a datum */ + result = HeapTupleGetDatum(tuple); + + SRF_RETURN_NEXT(funcctx, result); + } + else + { + /* do when there is no more left */ + SRF_RETURN_DONE(funcctx); + } + } + + + /* + * dblink_build_sql_insert + * + * Used to generate an SQL insert statement + * based on an existing tuple in a local relation. + * This is useful for selectively replicating data + * to another server via dblink. + * + * API: + * - name of local table of interest + * - an int2vector of attnums which will be used + * to identify the local tuple of interest + * - number of attnums in pkattnums + * - text array of key values which will be used + * to identify the local tuple of interest + * - text array of key values which will be used + * to build the string for execution remotely. These are substituted + * for their counterparts in src_pkattvals_arry + */ + PG_FUNCTION_INFO_V1(dblink_build_sql_insert); + Datum + dblink_build_sql_insert(PG_FUNCTION_ARGS) + { + text *relname_text = PG_GETARG_TEXT_P(0); + int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1); + int32 pknumatts_arg = PG_GETARG_INT32(2); + ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3); + ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4); + Relation rel; + int *pkattnums; int pknumatts; char **src_pkattvals; char **tgt_pkattvals;