Re: Allow substitute allocators for PGresult. - Mailing list pgsql-hackers
From | Kyotaro HORIGUCHI |
---|---|
Subject | Re: Allow substitute allocators for PGresult. |
Date | |
Msg-id | 20111201.194803.07040624.horiguchi.kyotaro@oss.ntt.co.jp Whole thread Raw |
In response to | Re: Allow substitute allocators for PGresult. (Kyotaro HORIGUCHI <horiguchi.kyotaro@oss.ntt.co.jp>) |
Responses |
Re: Allow substitute allocators for PGresult.
Re: Allow substitute allocators for PGresult. |
List | pgsql-hackers |
Ouch! I'm sorry for making a reverse patch for the first modification. This is an amendment of the message below. The body text is copied into this message. http://archives.postgresql.org/message-id/20111201.192419.103527179.horiguchi.kyotaro@oss.ntt.co.jp ======= Hello, This is the next version of Allow substitute allocators for PGresult. Totally chaning the concept from the previous one, this patch allows libpq to handle alternative tuple store for received tuples. Design guidelines are shown below. - No need to modify existing client code of libpq. - Existing libpq client runs with roughly same performance, and dblink with modification runs faster to some extent and requires less memory. I have measured roughly of run time and memory requirement for three configurations on CentOS6 on Vbox with 2GB mem 4 cores running on Win7-Corei7, transferring (30 bytes * 2 cols) * 2000000 tuples (120MB net) within this virutal machine. The results are below. xfer time Peak RSS Original : 6.02s 850MB libpq patch + Original dblink : 6.11s 850MB full patch : 4.44s 643MB xfer time here is the mean of five 'real time's measured by running sql script like this after warmup run. === test.sql select dblink_connect('c', 'host=localhost port=5432 dbname=test'); select * from dblink('c', 'select a,c from foo limit 2000000') as (a text, b bytea) limit 1; select dblink_disconnect('c'); === $ for i in $(seq 1 10); do time psql test -f t.sql; done === Peak RSS is measured by picking up heap Rss in /proc/[pid]/smaps. It seems somewhat slow using patched libpq and original dblink, but it seems within error range too. If this amount of slowdown is not permissible, it might be improved by restoring the static call route before for extra redundancy of the code. On the other hand, full patch version seems obviously fast and requires less memory. Isn't it nice? This patch consists of two sub patches. The first is a patch for libpq to allow rewiring tuple storage mechanism. But default behavior is not changed. Existing libpq client should run with it. The second is modify dblink to storing received tuples into tuplestore directly using the mechanism above. regards, -- Kyotaro Horiguchi NTT Open Source Software Center diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index 1af8df6..a360d78 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -160,3 +160,7 @@ PQconnectStartParams 157PQping 158PQpingParams 159PQlibVersion 160 +PQregisterTupleAdder 161 +PQgetAsCstring 162 +PQgetAddTupleParam 163 +PQsetAddTupleErrMes 164 diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index 50f3f83..437be26 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -2692,6 +2692,7 @@ makeEmptyPGconn(void) conn->allow_ssl_try = true; conn->wait_ssl_try = false;#endif + conn->addTupleFunc = NULL; /* * We try to send at least 8K at a time, which is the usual size of pipe @@ -5064,3 +5065,10 @@ PQregisterThreadLock(pgthreadlock_t newhandler) return prev;} + +void +PQregisterTupleAdder(PGconn *conn, addTupleFunction func, void *param) +{ + conn->addTupleFunc = func; + conn->addTupleFuncParam = param; +} diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index 113aab0..c8ec9bd 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -48,7 +48,6 @@ char *const pgresStatus[] = {static int static_client_encoding = PG_SQL_ASCII;static bool static_std_strings= false; -static PGEvent *dupEvents(PGEvent *events, int count);static bool PQsendQueryStart(PGconn *conn);static int PQsendQueryGuts(PGconn*conn, @@ -66,7 +65,9 @@ static PGresult *PQexecFinish(PGconn *conn);static int PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target);static int check_field_number(const PGresult *res, int field_num); - +static void *pqDefaultAddTupleFunc(PGresult *res, AddTupFunc func, + int id, size_t len); +static void *pqAddTuple(PGresult *res, PGresAttValue *tup);/* ---------------- * Space management for PGresult. @@ -160,6 +161,9 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status) result->curBlock = NULL; result->curOffset= 0; result->spaceLeft = 0; + result->addTupleFunc = pqDefaultAddTupleFunc; + result->addTupleFuncParam = NULL; + result->addTupleFuncErrMes = NULL; if (conn) { @@ -194,6 +198,12 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status) } result->nEvents = conn->nEvents; } + + if (conn->addTupleFunc) + { + result->addTupleFunc = conn->addTupleFunc; + result->addTupleFuncParam = conn->addTupleFuncParam; + } } else { @@ -487,6 +497,33 @@ PQresultAlloc(PGresult *res, size_t nBytes) return pqResultAlloc(res, nBytes, TRUE);} +void * +pqDefaultAddTupleFunc(PGresult *res, AddTupFunc func, int id, size_t len) +{ + void *p; + + switch (func) + { + case ADDTUP_ALLOC_TEXT: + return pqResultAlloc(res, len, TRUE); + + case ADDTUP_ALLOC_BINARY: + p = pqResultAlloc(res, len, FALSE); + + if (id == -1) + res->addTupleFuncParam = p; + + return p; + + case ADDTUP_ADD_TUPLE: + return pqAddTuple(res, res->addTupleFuncParam); + + default: + /* Ignore */ + break; + } + return NULL; +}/* * pqResultAlloc - * Allocate subsidiary storage for a PGresult. @@ -830,9 +867,9 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)/* * pqAddTuple * add a row pointerto the PGresult structure, growing it if necessary - * Returns TRUE if OK, FALSE if not enough memory to add the row + * Returns tup if OK, NULL if not enough memory to add the row. */ -int +static void *pqAddTuple(PGresult *res, PGresAttValue *tup){ if (res->ntups >= res->tupArrSize) @@ -858,13 +895,13 @@ pqAddTuple(PGresult *res, PGresAttValue *tup) newTuples = (PGresAttValue **) realloc(res->tuples, newSize * sizeof(PGresAttValue *)); if (!newTuples) - return FALSE; /* malloc or realloc failed */ + return NULL; /* malloc or realloc failed */ res->tupArrSize = newSize; res->tuples = newTuples; } res->tuples[res->ntups] = tup; res->ntups++; - return TRUE; + return tup;}/* @@ -2822,6 +2859,43 @@ PQgetisnull(const PGresult *res, int tup_num, int field_num) return 0;} +/* PQgetAsCString + * returns the field as C string. + */ +char * +PQgetAsCstring(PGresAttValue *attval) +{ + return attval->len == NULL_LEN ? NULL : attval->value; +} + +/* PQgetAddTupleParam + * Get the pointer to the contextual parameter from PGresult which is + * registered to PGconn by PQregisterTupleAdder + */ +void * +PQgetAddTupleParam(const PGresult *res) +{ + if (!res) + return NULL; + return res->addTupleFuncParam; +} + +/* PQsetAddTupleErrMes + * Set the error message pass back to the caller of addTupleFunc + * mes must be a malloc'ed memory block and it is released by the + * caller of addTupleFunc if set. + * You can replace the previous message by alternative mes, or clear + * it with NULL. + */ +void +PQsetAddTupleErrMes(PGresult *res, char *mes) +{ + /* Free existing message */ + if (res->addTupleFuncErrMes) + free(res->addTupleFuncErrMes); + res->addTupleFuncErrMes = mes; +} +/* PQnparams: * returns the number of input parameters of a prepared statement. */ diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c index 77c4d5a..c7f74ae 100644 --- a/src/interfaces/libpq/fe-protocol2.c +++ b/src/interfaces/libpq/fe-protocol2.c @@ -733,9 +733,10 @@ getAnotherTuple(PGconn *conn, bool binary) if (conn->curTuple == NULL) { conn->curTuple= (PGresAttValue *) - pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE); + result->addTupleFunc(result, ADDTUP_ALLOC_BINARY, -1, + nfields * sizeof(PGresAttValue)); if (conn->curTuple == NULL) - goto outOfMemory; + goto addTupleError; MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue)); /* @@ -757,7 +758,7 @@ getAnotherTuple(PGconn *conn, bool binary) { bitmap = (char *) malloc(nbytes); if (!bitmap) - goto outOfMemory; + goto addTupleError; } if (pqGetnchar(bitmap, nbytes, conn)) @@ -787,9 +788,12 @@ getAnotherTuple(PGconn *conn, bool binary) vlen = 0; if (tup[i].value == NULL) { - tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary); + AddTupFunc func = + (binary ? ADDTUP_ALLOC_BINARY : ADDTUP_ALLOC_TEXT); + tup[i].value = + (char *) result->addTupleFunc(result, func, i, vlen + 1); if (tup[i].value == NULL) - goto outOfMemory; + goto addTupleError; } tup[i].len = vlen; /* read in the value */ @@ -812,8 +816,9 @@ getAnotherTuple(PGconn *conn, bool binary) } /* Success! Store the completed tuple in the result*/ - if (!pqAddTuple(result, tup)) - goto outOfMemory; + if (!result->addTupleFunc(result, ADDTUP_ADD_TUPLE, 0, 0)) + goto addTupleError; + /* and reset for a new message */ conn->curTuple = NULL; @@ -821,7 +826,7 @@ getAnotherTuple(PGconn *conn, bool binary) free(bitmap); return 0; -outOfMemory: +addTupleError: /* Replace partially constructed result with an error result */ /* @@ -829,8 +834,21 @@ outOfMemory: * there's not enough memory to concatenate messages... */ pqClearAsyncResult(conn); - printfPQExpBuffer(&conn->errorMessage, - libpq_gettext("out of memory for query result\n")); + resetPQExpBuffer(&conn->errorMessage); + + /* + * If error message is passed from addTupleFunc, set it into + * PGconn, assume out of memory if not. + */ + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext(result->addTupleFuncErrMes ? + result->addTupleFuncErrMes : + "out of memory for query result\n")); + if (result->addTupleFuncErrMes) + { + free(result->addTupleFuncErrMes); + result->addTupleFuncErrMes = NULL; + } /* * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 45a84d8..d14b57a 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -634,9 +634,10 @@ getAnotherTuple(PGconn *conn, int msgLength) if (conn->curTuple == NULL) { conn->curTuple= (PGresAttValue *) - pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE); + result->addTupleFunc(result, ADDTUP_ALLOC_BINARY, -1, + nfields * sizeof(PGresAttValue)); if (conn->curTuple == NULL) - goto outOfMemory; + goto addTupleError; MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue)); } tup = conn->curTuple; @@ -673,11 +674,12 @@ getAnotherTuple(PGconn *conn, int msgLength) vlen = 0; if (tup[i].value == NULL) { - bool isbinary = (result->attDescs[i].format != 0); - - tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary); + AddTupFunc func = (result->attDescs[i].format != 0 ? + ADDTUP_ALLOC_BINARY : ADDTUP_ALLOC_TEXT); + tup[i].value = + (char *) result->addTupleFunc(result, func, i, vlen + 1); if (tup[i].value == NULL) - goto outOfMemory; + goto addTupleError; } tup[i].len = vlen; /* read in the value */ @@ -689,22 +691,36 @@ getAnotherTuple(PGconn *conn, int msgLength) } /* Success! Store the completed tuple in theresult */ - if (!pqAddTuple(result, tup)) - goto outOfMemory; + if (!result->addTupleFunc(result, ADDTUP_ADD_TUPLE, 0, 0)) + goto addTupleError; + /* and reset for a new message */ conn->curTuple = NULL; return 0; -outOfMemory: +addTupleError: /* * Replace partially constructed result with an error result. First * discard the old resultto try to win back some memory. */ pqClearAsyncResult(conn); - printfPQExpBuffer(&conn->errorMessage, - libpq_gettext("out of memory for query result\n")); + resetPQExpBuffer(&conn->errorMessage); + + /* + * If error message is passed from addTupleFunc, set it into + * PGconn, assume out of memory if not. + */ + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext(result->addTupleFuncErrMes ? + result->addTupleFuncErrMes : + "out of memory for query result\n")); + if (result->addTupleFuncErrMes) + { + free(result->addTupleFuncErrMes); + result->addTupleFuncErrMes = NULL; + } pqSaveErrorResult(conn); /* Discard the failed message by pretending we read it */ diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index d13a5b9..bdce294 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -116,6 +116,16 @@ typedef enum PQPING_NO_ATTEMPT /* connection not attempted (bad params) */} PGPing; +/* AddTupFunc is one of the parameters of addTupleFunc that decides + * the function of the addTupleFunction. See addTupleFunction for + * details */ +typedef enum +{ + ADDTUP_ALLOC_TEXT, /* Returns non-aligned memory for text value */ + ADDTUP_ALLOC_BINARY, /* Returns aligned memory for binary value */ + ADDTUP_ADD_TUPLE /* Adds tuple data into tuple storage */ +} AddTupFunc; +/* PGconn encapsulates a connection to the backend. * The contents of this struct are not supposed to be known to applications.*/ @@ -225,6 +235,12 @@ typedef struct pgresAttDesc int atttypmod; /* type-specific modifier info */} PGresAttDesc; +typedef struct pgresAttValue +{ + int len; /* length in bytes of the value */ + char *value; /* actual value, plus terminating zero byte */ +} PGresAttValue; +/* ---------------- * Exported functions of libpq * ---------------- @@ -416,6 +432,52 @@ extern PGPing PQping(const char *conninfo);extern PGPing PQpingParams(const char *const * keywords, const char *const * values, int expand_dbname); +/* + * Typedef for tuple storage function. + * + * This function pointer is used for tuple storage function in + * PGresult and PGconn. + * + * addTupleFunction is called for four types of function designated by + * the enum AddTupFunc. + * + * id is the identifier for allocated memory block. The caller sets -1 + * for PGresAttValue array, and 0 to number of cols - 1 for each + * column. + * + * ADDTUP_ALLOC_TEXT requests the size bytes memory block for a text + * value which may not be alingned to the word boundary. + * + * ADDTUP_ALLOC_BINARY requests the size bytes memory block for a + * binary value which is aligned to the word boundary. + * + * ADDTUP_ADD_TUPLE requests to add tuple data into storage, and + * free the memory blocks allocated by this function if necessary. + * id and size are ignored. + * + * This function must return non-NULL value for success and must + * return NULL for failure and may set error message by + * PQsetAddTupleErrMes in malloc'ed memory. Assumed by caller as out + * of memory if the error message is NULL on failure. This function is + * assumed not to throw any exception. + */ + typedef void *(*addTupleFunction)(PGresult *res, AddTupFunc func, + int id, size_t size); + +/* + * Register alternative tuple storage function to PGconn. + * + * By registering this function, pg_result disables its own tuple + * storage and calls it to append rows one by one. + * + * func is tuple store function. See addTupleFunction. + * + * addTupFuncParam is contextual storage that can be get with + * PQgetAddTupleParam in func. + */ +extern void PQregisterTupleAdder(PGconn *conn, addTupleFunction func, + void *addTupFuncParam); +/* Force the write buffer to be written (or at least try) */extern int PQflush(PGconn *conn); @@ -454,6 +516,9 @@ extern char *PQcmdTuples(PGresult *res);extern char *PQgetvalue(const PGresult *res, int tup_num, intfield_num);extern int PQgetlength(const PGresult *res, int tup_num, int field_num);extern int PQgetisnull(constPGresult *res, int tup_num, int field_num); +extern char *PQgetAsCstring(PGresAttValue *attdesc); +extern void *PQgetAddTupleParam(const PGresult *res); +extern void PQsetAddTupleErrMes(PGresult *res, char *mes);extern int PQnparams(const PGresult *res);extern Oid PQparamtype(const PGresult *res, int param_num); diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 64dfcb2..45e4c93 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -134,12 +134,6 @@ typedef struct pgresParamDesc#define NULL_LEN (-1) /* pg_result len for NULL value */ -typedef struct pgresAttValue -{ - int len; /* length in bytes of the value */ - char *value; /* actual value, plus terminating zero byte */ -} PGresAttValue; -/* Typedef for message-field list entries */typedef struct pgMessageField{ @@ -209,6 +203,11 @@ struct pg_result PGresult_data *curBlock; /* most recently allocated block */ int curOffset; /* start offset of free space in block */ int spaceLeft; /* number of free bytesremaining in block */ + + addTupleFunction addTupleFunc; /* Tuple storage function. See + * addTupleFunction for details. */ + void *addTupleFuncParam; /* Contextual parameter for addTupleFunc */ + char *addTupleFuncErrMes; /* Error message returned from addTupFunc */};/* PGAsyncStatusType defines the stateof the query-execution state machine */ @@ -443,6 +442,13 @@ struct pg_conn /* Buffer for receiving various parts of messages */ PQExpBufferData workBuffer;/* expansible string */ + + /* Tuple store function. The two fields below is copied to newly + * created PGresult if addTupleFunc is not NULL. Use default + * function if addTupleFunc is NULL. */ + addTupleFunction addTupleFunc; /* Tuple storage function. See + * addTupleFunction for details. */ + void *addTupleFuncParam; /* Contextual parameter for addTupFunc */};/* PGcancel stores all data necessary to cancela connection. A copy of this @@ -507,7 +513,6 @@ extern voidpqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)/* This lets gcc check theformat string for consistency. */__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3))); -extern int pqAddTuple(PGresult *res, PGresAttValue *tup);extern void pqSaveMessageField(PGresult *res, char code, const char *value);extern void pqSaveParameterStatus(PGconn *conn, const char *name, diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 62c810a..fb2e10e 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -63,11 +63,23 @@ typedef struct remoteConn bool newXactForCursor; /* Opened a transaction for a cursor*/} remoteConn; +typedef struct storeInfo +{ + Tuplestorestate *tuplestore; + int nattrs; + AttInMetadata *attinmeta; + MemoryContext oldcontext; + char *attrvalbuf; + void **valbuf; + size_t *valbufsize; + bool error_occurred; + bool nummismatch; +} storeInfo; +/* * Internal declarations */static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async); -static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);static remoteConn *getConnectionByName(const char*name);static HTAB *createConnHash(void);static void createNewConnection(const char *name, remoteConn *rconn); @@ -90,6 +102,10 @@ static char *escape_param_str(const char *from);static void validate_pkattnums(Relation rel, int2vector *pkattnums_arg, int32 pknumatts_arg, int **pkattnums, int *pknumatts); +static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo); +static void finishStoreInfo(storeInfo *sinfo); +static void *addTuple(PGresult *res, AddTupFunc func, int id, size_t size); +/* Global */static remoteConn *pconn = NULL; @@ -503,6 +519,7 @@ dblink_fetch(PG_FUNCTION_ARGS) char *curname = NULL; int howmany = 0; bool fail = true; /* default to backward compatible */ + storeInfo storeinfo; DBLINK_INIT; @@ -559,15 +576,30 @@ dblink_fetch(PG_FUNCTION_ARGS) appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname); /* + * Result is stored into storeinfo.tuplestore instead of + * res->result retuned by PQexec below + */ + initStoreInfo(&storeinfo, fcinfo); + PQregisterTupleAdder(conn, addTuple, &storeinfo); + + /* * Try to execute the query. Note that since libpq uses malloc, the * PGresult will be long-lived even thoughwe are still in a short-lived * memory context. */ res = PQexec(conn, buf.data); + finishStoreInfo(&storeinfo); + if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { + /* This is only for backward compatibility */ + if (storeinfo.nummismatch) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); dblink_res_error(conname, res, "could not fetchfrom cursor", fail); return (Datum) 0; } @@ -580,7 +612,6 @@ dblink_fetch(PG_FUNCTION_ARGS) errmsg("cursor \"%s\" does not exist", curname))); } - materializeResult(fcinfo, res); return (Datum) 0;} @@ -640,6 +671,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) remoteConn *rconn = NULL; bool fail = true; /* default to backward compatible */ bool freeconn = false; + storeInfo storeinfo; /* check to see if caller supports us returning a tuplestore */ if (rsinfo == NULL || !IsA(rsinfo,ReturnSetInfo)) @@ -715,164 +747,206 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) rsinfo->setResult = NULL; rsinfo->setDesc= NULL; + + /* + * Result is stored into storeinfo.tuplestore instead of + * res->result retuned by PQexec/PQgetResult below + */ + initStoreInfo(&storeinfo, fcinfo); + PQregisterTupleAdder(conn, addTuple, &storeinfo); + /* synchronous query, or async result retrieval */ if (!is_async) res = PQexec(conn, sql); else - { res = PQgetResult(conn); - /* NULL means we're all done with the async results */ - if (!res) - return (Datum) 0; - } - /* if needed, close the connection to the database and cleanup */ - if (freeconn) - PQfinish(conn); + finishStoreInfo(&storeinfo); - if (!res || - (PQresultStatus(res) != PGRES_COMMAND_OK && - PQresultStatus(res) != PGRES_TUPLES_OK)) + /* NULL res from async get means we're all done with the results */ + if (res || !is_async) { - dblink_res_error(conname, res, "could not execute query", fail); - return (Datum) 0; + if (freeconn) + PQfinish(conn); + + if (!res || + (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK)) + { + /* This is only for backward compatibility */ + if (storeinfo.nummismatch) + { + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + } + dblink_res_error(conname, res, "could not execute query", fail); + return (Datum) 0; + } } - materializeResult(fcinfo, res); return (Datum) 0;} -/* - * Materialize the PGresult to return them as the function result. - * The res will be released in this function. - */static void -materializeResult(FunctionCallInfo fcinfo, PGresult *res) +initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo){ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; - - Assert(rsinfo->returnMode == SFRM_Materialize); - - PG_TRY(); + TupleDesc tupdesc; + int i; + + switch (get_call_result_type(fcinfo, NULL, &tupdesc)) { - TupleDesc tupdesc; - bool is_sql_cmd = false; - int ntuples; - int nfields; + case TYPEFUNC_COMPOSITE: + /* success */ + break; + case TYPEFUNC_RECORD: + /* failed to determine actual type of RECORD */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context " + "that cannot accept type record"))); + break; + default: + /* result type isn't composite */ + elog(ERROR, "return type must be a row type"); + break; + } + + sinfo->oldcontext = MemoryContextSwitchTo( + rsinfo->econtext->ecxt_per_query_memory); + + /* make sure we have a persistent copy of the tupdesc */ + tupdesc = CreateTupleDescCopy(tupdesc); + + sinfo->error_occurred = FALSE; + sinfo->nummismatch = FALSE; + sinfo->nattrs = tupdesc->natts; + sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem); + sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc); + sinfo->valbuf = (void **)malloc(sinfo->nattrs * sizeof(void *)); + sinfo->valbufsize = (size_t *)malloc(sinfo->nattrs * sizeof(size_t)); + for (i = 0 ; i < sinfo->nattrs ; i++) + { + sinfo->valbuf[i] = NULL; + sinfo->valbufsize[i] = 0; + } - if (PQresultStatus(res) == PGRES_COMMAND_OK) - { - is_sql_cmd = true; - - /* - * need a tuple descriptor representing one TEXT column to return - * the command status string as our result tuple - */ - tupdesc = CreateTemplateTupleDesc(1, false); - TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", - TEXTOID, -1, 0); - ntuples = 1; - nfields = 1; - } - else - { - Assert(PQresultStatus(res) == PGRES_TUPLES_OK); + /* Preallocate memory of same size with PGresAttDesc array for values. */ + sinfo->attrvalbuf = (char *) malloc(sinfo->nattrs * sizeof(PGresAttValue)); - is_sql_cmd = false; + rsinfo->setResult = sinfo->tuplestore; + rsinfo->setDesc = tupdesc; +} - /* get a tuple descriptor for our result type */ - switch (get_call_result_type(fcinfo, NULL, &tupdesc)) - { - case TYPEFUNC_COMPOSITE: - /* success */ - break; - case TYPEFUNC_RECORD: - /* failed to determine actual type of RECORD */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("function returning record called in context " - "that cannot accept type record"))); - break; - default: - /* result type isn't composite */ - elog(ERROR, "return type must be a row type"); - break; - } +static void +finishStoreInfo(storeInfo *sinfo) +{ + int i; - /* make sure we have a persistent copy of the tupdesc */ - tupdesc = CreateTupleDescCopy(tupdesc); - ntuples = PQntuples(res); - nfields = PQnfields(res); + for (i = 0 ; i < sinfo->nattrs ; i++) + { + if (sinfo->valbuf[i]) + { + free(sinfo->valbuf[i]); + sinfo->valbuf[i] = NULL; } + } + if (sinfo->attrvalbuf) + free(sinfo->attrvalbuf); + sinfo->attrvalbuf = NULL; + MemoryContextSwitchTo(sinfo->oldcontext); +} - /* - * check result and tuple descriptor have the same number of columns - */ - if (nfields != tupdesc->natts) - ereport(ERROR, - (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("remote query result rowtype does not match " - "the specified FROM clause rowtype"))); +static void * +addTuple(PGresult *res, AddTupFunc func, int id, size_t size) +{ + storeInfo *sinfo = (storeInfo *)PQgetAddTupleParam(res); + HeapTuple tuple; + int fields = PQnfields(res); + int i; + PGresAttValue *attval; + char **cstrs; - if (ntuples > 0) - { - AttInMetadata *attinmeta; - Tuplestorestate *tupstore; - MemoryContext oldcontext; - int row; - char **values; - - attinmeta = TupleDescGetAttInMetadata(tupdesc); - - oldcontext = MemoryContextSwitchTo( - rsinfo->econtext->ecxt_per_query_memory); - tupstore = tuplestore_begin_heap(true, false, work_mem); - rsinfo->setResult = tupstore; - rsinfo->setDesc = tupdesc; - MemoryContextSwitchTo(oldcontext); + if (sinfo->error_occurred) + return NULL; - values = (char **) palloc(nfields * sizeof(char *)); + switch (func) + { + case ADDTUP_ALLOC_TEXT: + case ADDTUP_ALLOC_BINARY: + if (id == -1) + return sinfo->attrvalbuf; + + if (id < 0 || id >= sinfo->nattrs) + return NULL; - /* put all tuples into the tuplestore */ - for (row = 0; row < ntuples; row++) + if (sinfo->valbufsize[id] < size) { - HeapTuple tuple; + if (sinfo->valbuf[id] == NULL) + sinfo->valbuf[id] = malloc(size); + else + sinfo->valbuf[id] = realloc(sinfo->valbuf[id], size); + sinfo->valbufsize[id] = size; + } + return sinfo->valbuf[id]; - if (!is_sql_cmd) - { - int i; + case ADDTUP_ADD_TUPLE: + break; /* Go through */ + default: + /* Ignore */ + break; + } - for (i = 0; i < nfields; i++) - { - if (PQgetisnull(res, row, i)) - values[i] = NULL; - else - values[i] = PQgetvalue(res, row, i); - } - } - else - { - values[0] = PQcmdStatus(res); - } + if (sinfo->nattrs != fields) + { + sinfo->error_occurred = TRUE; + sinfo->nummismatch = TRUE; + finishStoreInfo(sinfo); - /* build the tuple and put it into the tuplestore. */ - tuple = BuildTupleFromCStrings(attinmeta, values); - tuplestore_puttuple(tupstore, tuple); - } + PQsetAddTupleErrMes(res, + strdup("function returning record called in " + "context that cannot accept type record")); + return NULL; + } - /* clean up and return the tuplestore */ - tuplestore_donestoring(tupstore); - } + /* + * Rewrite PGresAttDesc[] to char(*)[] in-place. + */ + Assert(sizeof(char*) <= sizeof(PGresAttValue)); + attval = (PGresAttValue *)sinfo->attrvalbuf; + cstrs = (char **)sinfo->attrvalbuf; + for(i = 0 ; i < fields ; i++) + cstrs[i] = PQgetAsCstring(attval++); - PQclear(res); + PG_TRY(); + { + tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs); + tuplestore_puttuple(sinfo->tuplestore, tuple); } PG_CATCH(); { - /* be sure to release the libpq result */ - PQclear(res); - PG_RE_THROW(); + /* + * Return the error message in the exception to the caller and + * cancel the exception. + */ + ErrorData *edata; + + sinfo->error_occurred = TRUE; + sinfo->nummismatch = TRUE; + + finishStoreInfo(sinfo); + + edata = CopyErrorData(); + FlushErrorState(); + + PQsetAddTupleErrMes(res, strdup(edata->message)); + return NULL; } PG_END_TRY(); + + return sinfo->attrvalbuf;}/*
pgsql-hackers by date: