Re: Speed dblink using alternate libpq tuple storage - Mailing list pgsql-hackers
From | Kyotaro HORIGUCHI |
---|---|
Subject | Re: Speed dblink using alternate libpq tuple storage |
Date | |
Msg-id | 20120117.175333.162644966.horiguchi.kyotaro@oss.ntt.co.jp Whole thread Raw |
In response to | Speed dblink using alternate libpq tuple storage (Greg Smith <greg@2ndQuadrant.com>) |
Responses |
Re: Speed dblink using alternate libpq tuple storage
|
List | pgsql-hackers |
Hello, This is revised and rebased version of the patch. a. Old term `Add Tuple Function' is changed to 'Store Handler'. The reason why not `storage' is simply length of the symbols. b. I couldn't find the place to settle PGgetAsCString() in. It is removed and storeHandler()@dblink.c touches PGresAttValue directly in this new patch. Definition of PGresAttValue stays in lipq-fe.h and provided with comment. c. Refine error handling of dblink.c. I think it preserves the previous behavior for column number mismatch and type conversionexception. d. Document is revised. > It jumped from 332K tuples/sec to 450K, a 35% gain, and had a > lower memory footprint too. Test methodology and those results > are at > http://archives.postgresql.org/pgsql-hackers/2011-12/msg00008.php It is a disappointment that I found that the gain had become lower than that according to the re-measuring. For CentOS6.2 and other conditions are the same to the previous testing, the overall performance became hihger and the loss of libpq patch was 1.8% and the gain of full patch had been fallen to 5.6%. But the reduction of the memory usage was not changed. Original : 3.96s 100.0% w/libpq patch : 4.03s 101.8% w/libpq+dblink patch : 3.74s 94.4% The attachments are listed below. libpq_altstore_20120117.patch - Allow alternative storage for libpql. dblink_perf_20120117.patch - Modify dblink to use alternative storage mechanism. libpq_altstore_doc_20120117.patch - Document for libpq_altstore. Shows in "31.19. Alternatie result storage" regards, -- Kyotaro Horiguchi NTT Open Source Software Center diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index 1af8df6..83525e1 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -160,3 +160,6 @@ PQconnectStartParams 157PQping 158PQpingParams 159PQlibVersion 160 +PQregisterStoreHandler 161 +PQgetStoreHandlerParam 163 +PQsetStoreHandlerErrMes 164 diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index d454538..5559f0b 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -2692,6 +2692,7 @@ makeEmptyPGconn(void) conn->allow_ssl_try = true; conn->wait_ssl_try = false;#endif + conn->storeHandler = NULL; /* * We try to send at least 8K at a time, which is the usual size of pipe @@ -5076,3 +5077,10 @@ PQregisterThreadLock(pgthreadlock_t newhandler) return prev;} + +void +PQregisterStoreHandler(PGconn *conn, StoreHandler func, void *param) +{ + conn->storeHandler = func; + conn->storeHandlerParam = param; +} diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index b743566..96e5974 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -67,6 +67,10 @@ static int PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target);staticint check_field_number(const PGresult *res, int field_num); +static void *pqDefaultStoreHandler(PGresult *res, PQStoreFunc func, + int id, size_t len); +static void *pqAddTuple(PGresult *res, PGresAttValue *tup); +/* ---------------- * Space management for PGresult. @@ -160,6 +164,9 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status) result->curBlock = NULL; result->curOffset= 0; result->spaceLeft = 0; + result->storeHandler = pqDefaultStoreHandler; + result->storeHandlerParam = NULL; + result->storeHandlerErrMes = NULL; if (conn) { @@ -194,6 +201,12 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status) } result->nEvents = conn->nEvents; } + + if (conn->storeHandler) + { + result->storeHandler = conn->storeHandler; + result->storeHandlerParam = conn->storeHandlerParam; + } } else { @@ -487,6 +500,33 @@ PQresultAlloc(PGresult *res, size_t nBytes) return pqResultAlloc(res, nBytes, TRUE);} +void * +pqDefaultStoreHandler(PGresult *res, PQStoreFunc func, int id, size_t len) +{ + void *p; + + switch (func) + { + case PQSF_ALLOC_TEXT: + return pqResultAlloc(res, len, TRUE); + + case PQSF_ALLOC_BINARY: + p = pqResultAlloc(res, len, FALSE); + + if (id == -1) + res->storeHandlerParam = p; + + return p; + + case PQSF_ADD_TUPLE: + return pqAddTuple(res, res->storeHandlerParam); + + default: + /* Ignore */ + break; + } + return NULL; +}/* * pqResultAlloc - * Allocate subsidiary storage for a PGresult. @@ -830,9 +870,9 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)/* * pqAddTuple * add a row pointerto the PGresult structure, growing it if necessary - * Returns TRUE if OK, FALSE if not enough memory to add the row + * Returns tup if OK, NULL if not enough memory to add the row. */ -int +static void *pqAddTuple(PGresult *res, PGresAttValue *tup){ if (res->ntups >= res->tupArrSize) @@ -858,13 +898,13 @@ pqAddTuple(PGresult *res, PGresAttValue *tup) newTuples = (PGresAttValue **) realloc(res->tuples, newSize * sizeof(PGresAttValue *)); if (!newTuples) - return FALSE; /* malloc or realloc failed */ + return NULL; /* malloc or realloc failed */ res->tupArrSize = newSize; res->tuples = newTuples; } res->tuples[res->ntups] = tup; res->ntups++; - return TRUE; + return tup;}/* @@ -2822,6 +2862,35 @@ PQgetisnull(const PGresult *res, int tup_num, int field_num) return 0;} +/* PQgetAddStoreHandlerParam + * Get the pointer to the contextual parameter from PGresult which is + * registered to PGconn by PQregisterStoreHandler + */ +void * +PQgetStoreHandlerParam(const PGresult *res) +{ + if (!res) + return NULL; + return res->storeHandlerParam; +} + +/* PQsetStorHandlerErrMes + * Set the error message pass back to the caller of StoreHandler. + * + * mes must be a malloc'ed memory block and it will be released by + * the caller of StoreHandler. You can replace the previous message + * by alternative mes, or clear it with NULL. The previous one will + * be freed internally. + */ +void +PQsetStoreHandlerErrMes(PGresult *res, char *mes) +{ + /* Free existing message */ + if (res->storeHandlerErrMes) + free(res->storeHandlerErrMes); + res->storeHandlerErrMes = mes; +} +/* PQnparams: * returns the number of input parameters of a prepared statement. */ diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c index a7c3899..205502b 100644 --- a/src/interfaces/libpq/fe-protocol2.c +++ b/src/interfaces/libpq/fe-protocol2.c @@ -733,9 +733,10 @@ getAnotherTuple(PGconn *conn, bool binary) if (conn->curTuple == NULL) { conn->curTuple= (PGresAttValue *) - pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE); + result->storeHandler(result, PQSF_ALLOC_BINARY, -1, + nfields * sizeof(PGresAttValue)); if (conn->curTuple == NULL) - goto outOfMemory; + goto addTupleError; MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue)); /* @@ -757,7 +758,7 @@ getAnotherTuple(PGconn *conn, bool binary) { bitmap = (char *) malloc(nbytes); if (!bitmap) - goto outOfMemory; + goto addTupleError; } if (pqGetnchar(bitmap, nbytes, conn)) @@ -787,9 +788,12 @@ getAnotherTuple(PGconn *conn, bool binary) vlen = 0; if (tup[i].value == NULL) { - tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary); + PQStoreFunc func = + (binary ? PQSF_ALLOC_BINARY : PQSF_ALLOC_TEXT); + tup[i].value = + (char *) result->storeHandler(result, func, i, vlen + 1); if (tup[i].value == NULL) - goto outOfMemory; + goto addTupleError; } tup[i].len = vlen; /* read in the value */ @@ -812,8 +816,9 @@ getAnotherTuple(PGconn *conn, bool binary) } /* Success! Store the completed tuple in the result*/ - if (!pqAddTuple(result, tup)) - goto outOfMemory; + if (!result->storeHandler(result, PQSF_ADD_TUPLE, 0, 0)) + goto addTupleError; + /* and reset for a new message */ conn->curTuple = NULL; @@ -821,7 +826,7 @@ getAnotherTuple(PGconn *conn, bool binary) free(bitmap); return 0; -outOfMemory: +addTupleError: /* Replace partially constructed result with an error result */ /* @@ -829,8 +834,21 @@ outOfMemory: * there's not enough memory to concatenate messages... */ pqClearAsyncResult(conn); - printfPQExpBuffer(&conn->errorMessage, - libpq_gettext("out of memory for query result\n")); + resetPQExpBuffer(&conn->errorMessage); + + /* + * If error message is passed from addTupleFunc, set it into + * PGconn, assume out of memory if not. + */ + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext(result->storeHandlerErrMes ? + result->storeHandlerErrMes : + "out of memory for query result\n")); + if (result->storeHandlerErrMes) + { + free(result->storeHandlerErrMes); + result->storeHandlerErrMes = NULL; + } /* * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 892dcbc..117c38a 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -634,9 +634,10 @@ getAnotherTuple(PGconn *conn, int msgLength) if (conn->curTuple == NULL) { conn->curTuple= (PGresAttValue *) - pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE); + result->storeHandler(result, PQSF_ALLOC_BINARY, -1, + nfields * sizeof(PGresAttValue)); if (conn->curTuple == NULL) - goto outOfMemory; + goto addTupleError; MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue)); } tup = conn->curTuple; @@ -673,11 +674,12 @@ getAnotherTuple(PGconn *conn, int msgLength) vlen = 0; if (tup[i].value == NULL) { - bool isbinary = (result->attDescs[i].format != 0); - - tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary); + PQStoreFunc func = (result->attDescs[i].format != 0 ? + PQSF_ALLOC_BINARY : PQSF_ALLOC_TEXT); + tup[i].value = + (char *) result->storeHandler(result, func, i, vlen + 1); if (tup[i].value == NULL) - goto outOfMemory; + goto addTupleError; } tup[i].len = vlen; /* read in the value */ @@ -689,22 +691,36 @@ getAnotherTuple(PGconn *conn, int msgLength) } /* Success! Store the completed tuple in theresult */ - if (!pqAddTuple(result, tup)) - goto outOfMemory; + if (!result->storeHandler(result, PQSF_ADD_TUPLE, 0, 0)) + goto addTupleError; + /* and reset for a new message */ conn->curTuple = NULL; return 0; -outOfMemory: +addTupleError: /* * Replace partially constructed result with an error result. First * discard the old resultto try to win back some memory. */ pqClearAsyncResult(conn); - printfPQExpBuffer(&conn->errorMessage, - libpq_gettext("out of memory for query result\n")); + resetPQExpBuffer(&conn->errorMessage); + + /* + * If error message is passed from addTupleFunc, set it into + * PGconn, assume out of memory if not. + */ + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext(result->storeHandlerErrMes ? + result->storeHandlerErrMes : + "out of memory for query result\n")); + if (result->storeHandlerErrMes) + { + free(result->storeHandlerErrMes); + result->storeHandlerErrMes = NULL; + } pqSaveErrorResult(conn); /* Discard the failed message by pretending we read it */ diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index ef26ab9..6d86fa0 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -116,6 +116,16 @@ typedef enum PQPING_NO_ATTEMPT /* connection not attempted (bad params) */} PGPing; +/* PQStoreFunc is the enum for one of the parameters of storeHandler + * that decides what to do. See the typedef StoreHandler for + * details */ +typedef enum +{ + PQSF_ALLOC_TEXT, /* Requested non-aligned memory for text value */ + PQSF_ALLOC_BINARY, /* Requested aligned memory for binary value */ + PQSF_ADD_TUPLE /* Requested to add tuple data into store */ +} PQStoreFunc; +/* PGconn encapsulates a connection to the backend. * The contents of this struct are not supposed to be known to applications.*/ @@ -149,6 +159,15 @@ typedef struct pgNotify struct pgNotify *next; /* list link */} PGnotify; +/* PGresAttValue represents a value of one tuple field in string form. + NULL is represented as len < 0. Otherwise value points to a null + terminated C string with the length of len. */ +typedef struct pgresAttValue +{ + int len; /* length in bytes of the value */ + char *value; /* actual value, plus terminating zero byte */ +} PGresAttValue; +/* Function types for notice-handling callbacks */typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);typedefvoid (*PQnoticeProcessor) (void *arg, const char *message); @@ -416,6 +435,52 @@ extern PGPing PQping(const char *conninfo);extern PGPing PQpingParams(const char *const * keywords, const char *const * values, int expand_dbname); +/* + * Typedef for alternative result store handler. + * + * This function pointer is used for alternative result store handler + * callback in PGresult and PGconn. + * + * StoreHandler is called for three functions designated by the enum + * PQStoreFunc. + * + * id is the identifier for allocated memory block. The caller sets -1 + * for PGresAttValue array, and 0 to number of cols - 1 for each + * column. + * + * PQSF_ALLOC_TEXT requests the size bytes memory block for a text + * value which may not be alingned to the word boundary. + * + * PQSF_ALLOC_BINARY requests the size bytes memory block for a binary + * value which is aligned to the word boundary. + * + * PQSF_ADD_TUPLE requests to add tuple data into the result store, + * and free the memory blocks allocated by this function if necessary. + * id and size are to be ignored for this function. + * + * This function must return non-NULL value for success and must + * return NULL for failure and may set error message by + * PQsetStoreHandlerErrMes. It is assumed by caller as out of memory + * when the error message is NULL on failure. This function is assumed + * not to throw any exception. + */ +typedef void *(*StoreHandler)(PGresult *res, PQStoreFunc func, + int id, size_t size); + +/* + * Register alternative result store function to PGconn. + * + * By registering this function, pg_result disables its own result + * store and calls it to append rows one by one. + * + * func is tuple store function. See the typedef StoreHandler. + * + * storeHandlerParam is the contextual variable that can be get with + * PQgetStoreHandlerParam in StoreHandler. + */ +extern void PQregisterStoreHandler(PGconn *conn, StoreHandler func, + void *storeHandlerParam); +/* Force the write buffer to be written (or at least try) */extern int PQflush(PGconn *conn); @@ -454,6 +519,8 @@ extern char *PQcmdTuples(PGresult *res);extern char *PQgetvalue(const PGresult *res, int tup_num, intfield_num);extern int PQgetlength(const PGresult *res, int tup_num, int field_num);extern int PQgetisnull(constPGresult *res, int tup_num, int field_num); +extern void *PQgetStoreHandlerParam(const PGresult *res); +extern void PQsetStoreHandlerErrMes(PGresult *res, char *mes);extern int PQnparams(const PGresult *res);extern Oid PQparamtype(const PGresult *res, int param_num); diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index d967d60..e28e712 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -134,12 +134,6 @@ typedef struct pgresParamDesc#define NULL_LEN (-1) /* pg_result len for NULL value */ -typedef struct pgresAttValue -{ - int len; /* length in bytes of the value */ - char *value; /* actual value, plus terminating zero byte */ -} PGresAttValue; -/* Typedef for message-field list entries */typedef struct pgMessageField{ @@ -209,6 +203,11 @@ struct pg_result PGresult_data *curBlock; /* most recently allocated block */ int curOffset; /* start offset of free space in block */ int spaceLeft; /* number of free bytesremaining in block */ + + StoreHandler storeHandler; /* Result store handler. See + * StoreHandler for details. */ + void *storeHandlerParam; /* Contextual parameter for storeHandler */ + char *storeHandlerErrMes; /* Error message from storeHandler */};/* PGAsyncStatusType defines the state of the query-executionstate machine */ @@ -443,6 +442,13 @@ struct pg_conn /* Buffer for receiving various parts of messages */ PQExpBufferData workBuffer;/* expansible string */ + + /* Tuple store handler. The two fields below is copied to newly + * created PGresult if tupStoreHandler is not NULL. Use default + * function if NULL. */ + StoreHandler storeHandler; /* Result store handler. See + * StoreHandler for details. */ + void *storeHandlerParam; /* Contextual parameter for storeHandler */};/* PGcancel stores all data necessary to cancela connection. A copy of this @@ -507,7 +513,6 @@ extern voidpqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)/* This lets gcc check theformat string for consistency. */__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3))); -extern int pqAddTuple(PGresult *res, PGresAttValue *tup);extern void pqSaveMessageField(PGresult *res, char code, const char *value);extern void pqSaveParameterStatus(PGconn *conn, const char *name, diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 36a8e3e..a8685a9 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -63,11 +63,24 @@ typedef struct remoteConn bool newXactForCursor; /* Opened a transaction for a cursor*/} remoteConn; +typedef struct storeInfo +{ + Tuplestorestate *tuplestore; + int nattrs; + AttInMetadata *attinmeta; + MemoryContext oldcontext; + char *attrvalbuf; + void **valbuf; + size_t *valbufsize; + bool error_occurred; + bool nummismatch; + ErrorData *edata; +} storeInfo; +/* * Internal declarations */static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async); -static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);static remoteConn *getConnectionByName(const char*name);static HTAB *createConnHash(void);static void createNewConnection(const char *name, remoteConn *rconn); @@ -90,6 +103,10 @@ static char *escape_param_str(const char *from);static void validate_pkattnums(Relation rel, int2vector *pkattnums_arg, int32 pknumatts_arg, int **pkattnums, int *pknumatts); +static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo); +static void finishStoreInfo(storeInfo *sinfo); +static void *storeHandler(PGresult *res, PQStoreFunc func, int id, size_t size); +/* Global */static remoteConn *pconn = NULL; @@ -503,6 +520,7 @@ dblink_fetch(PG_FUNCTION_ARGS) char *curname = NULL; int howmany = 0; bool fail = true; /* default to backward compatible */ + storeInfo storeinfo; DBLINK_INIT; @@ -559,15 +577,36 @@ dblink_fetch(PG_FUNCTION_ARGS) appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname); /* + * Result is stored into storeinfo.tuplestore instead of + * res->result retuned by PQexec below + */ + initStoreInfo(&storeinfo, fcinfo); + PQregisterStoreHandler(conn, storeHandler, &storeinfo); + + /* * Try to execute the query. Note that since libpq uses malloc, the * PGresult will be long-lived even thoughwe are still in a short-lived * memory context. */ res = PQexec(conn, buf.data); + finishStoreInfo(&storeinfo); + if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { + /* finishStoreInfo saves the fields referred to below. */ + if (storeinfo.nummismatch) + { + /* This is only for backward compatibility */ + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + } + else if (storeinfo.edata) + ReThrowError(storeinfo.edata); + dblink_res_error(conname, res, "could not fetch from cursor", fail); return (Datum) 0; } @@ -579,8 +618,8 @@ dblink_fetch(PG_FUNCTION_ARGS) (errcode(ERRCODE_INVALID_CURSOR_NAME), errmsg("cursor \"%s\" does not exist", curname))); } + PQclear(res); - materializeResult(fcinfo, res); return (Datum) 0;} @@ -640,6 +679,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) remoteConn *rconn = NULL; bool fail = true; /* default to backward compatible */ bool freeconn = false; + storeInfo storeinfo; /* check to see if caller supports us returning a tuplestore */ if (rsinfo == NULL || !IsA(rsinfo,ReturnSetInfo)) @@ -715,164 +755,213 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) rsinfo->setResult = NULL; rsinfo->setDesc= NULL; + + /* + * Result is stored into storeinfo.tuplestore instead of + * res->result retuned by PQexec/PQgetResult below + */ + initStoreInfo(&storeinfo, fcinfo); + PQregisterStoreHandler(conn, storeHandler, &storeinfo); + /* synchronous query, or async result retrieval */ if (!is_async) res = PQexec(conn, sql); else - { res = PQgetResult(conn); - /* NULL means we're all done with the async results */ - if (!res) - return (Datum) 0; - } - /* if needed, close the connection to the database and cleanup */ - if (freeconn) - PQfinish(conn); + finishStoreInfo(&storeinfo); - if (!res || - (PQresultStatus(res) != PGRES_COMMAND_OK && - PQresultStatus(res) != PGRES_TUPLES_OK)) + /* NULL res from async get means we're all done with the results */ + if (res || !is_async) { - dblink_res_error(conname, res, "could not execute query", fail); - return (Datum) 0; + if (freeconn) + PQfinish(conn); + + if (!res || + (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK)) + { + /* finishStoreInfo saves the fields referred to below. */ + if (storeinfo.nummismatch) + { + /* This is only for backward compatibility */ + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + } + else if (storeinfo.edata) + ReThrowError(storeinfo.edata); + + dblink_res_error(conname, res, "could not execute query", fail); + return (Datum) 0; + } } + PQclear(res); - materializeResult(fcinfo, res); return (Datum) 0;} -/* - * Materialize the PGresult to return them as the function result. - * The res will be released in this function. - */static void -materializeResult(FunctionCallInfo fcinfo, PGresult *res) +initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo){ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; - - Assert(rsinfo->returnMode == SFRM_Materialize); - - PG_TRY(); + TupleDesc tupdesc; + int i; + + switch (get_call_result_type(fcinfo, NULL, &tupdesc)) { - TupleDesc tupdesc; - bool is_sql_cmd = false; - int ntuples; - int nfields; + case TYPEFUNC_COMPOSITE: + /* success */ + break; + case TYPEFUNC_RECORD: + /* failed to determine actual type of RECORD */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context " + "that cannot accept type record"))); + break; + default: + /* result type isn't composite */ + elog(ERROR, "return type must be a row type"); + break; + } + + sinfo->oldcontext = MemoryContextSwitchTo( + rsinfo->econtext->ecxt_per_query_memory); + + /* make sure we have a persistent copy of the tupdesc */ + tupdesc = CreateTupleDescCopy(tupdesc); + + sinfo->error_occurred = FALSE; + sinfo->nummismatch = FALSE; + sinfo->edata = NULL; + sinfo->nattrs = tupdesc->natts; + sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem); + sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc); + sinfo->valbuf = (void **)malloc(sinfo->nattrs * sizeof(void *)); + sinfo->valbufsize = (size_t *)malloc(sinfo->nattrs * sizeof(size_t)); + for (i = 0 ; i < sinfo->nattrs ; i++) + { + sinfo->valbuf[i] = NULL; + sinfo->valbufsize[i] = 0; + } - if (PQresultStatus(res) == PGRES_COMMAND_OK) - { - is_sql_cmd = true; - - /* - * need a tuple descriptor representing one TEXT column to return - * the command status string as our result tuple - */ - tupdesc = CreateTemplateTupleDesc(1, false); - TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", - TEXTOID, -1, 0); - ntuples = 1; - nfields = 1; - } - else - { - Assert(PQresultStatus(res) == PGRES_TUPLES_OK); + /* Preallocate memory of same size with PGresAttDesc array for values. */ + sinfo->attrvalbuf = (char *) malloc(sinfo->nattrs * sizeof(PGresAttValue)); - is_sql_cmd = false; + rsinfo->setResult = sinfo->tuplestore; + rsinfo->setDesc = tupdesc; +} - /* get a tuple descriptor for our result type */ - switch (get_call_result_type(fcinfo, NULL, &tupdesc)) - { - case TYPEFUNC_COMPOSITE: - /* success */ - break; - case TYPEFUNC_RECORD: - /* failed to determine actual type of RECORD */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("function returning record called in context " - "that cannot accept type record"))); - break; - default: - /* result type isn't composite */ - elog(ERROR, "return type must be a row type"); - break; - } +static void +finishStoreInfo(storeInfo *sinfo) +{ + int i; - /* make sure we have a persistent copy of the tupdesc */ - tupdesc = CreateTupleDescCopy(tupdesc); - ntuples = PQntuples(res); - nfields = PQnfields(res); + for (i = 0 ; i < sinfo->nattrs ; i++) + { + if (sinfo->valbuf[i]) + { + free(sinfo->valbuf[i]); + sinfo->valbuf[i] = NULL; } + } + if (sinfo->attrvalbuf) + free(sinfo->attrvalbuf); + sinfo->attrvalbuf = NULL; + MemoryContextSwitchTo(sinfo->oldcontext); +} - /* - * check result and tuple descriptor have the same number of columns - */ - if (nfields != tupdesc->natts) - ereport(ERROR, - (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("remote query result rowtype does not match " - "the specified FROM clause rowtype"))); +static void * +storeHandler(PGresult *res, PQStoreFunc func, int id, size_t size) +{ + storeInfo *sinfo = (storeInfo *)PQgetStoreHandlerParam(res); + HeapTuple tuple; + int fields = PQnfields(res); + int i; + PGresAttValue *attval; + char **cstrs; - if (ntuples > 0) - { - AttInMetadata *attinmeta; - Tuplestorestate *tupstore; - MemoryContext oldcontext; - int row; - char **values; - - attinmeta = TupleDescGetAttInMetadata(tupdesc); - - oldcontext = MemoryContextSwitchTo( - rsinfo->econtext->ecxt_per_query_memory); - tupstore = tuplestore_begin_heap(true, false, work_mem); - rsinfo->setResult = tupstore; - rsinfo->setDesc = tupdesc; - MemoryContextSwitchTo(oldcontext); + if (sinfo->error_occurred) + return NULL; + + switch (func) + { + case PQSF_ALLOC_TEXT: + case PQSF_ALLOC_BINARY: + if (id == -1) + return sinfo->attrvalbuf; - values = (char **) palloc(nfields * sizeof(char *)); + if (id < 0 || id >= sinfo->nattrs) + return NULL; - /* put all tuples into the tuplestore */ - for (row = 0; row < ntuples; row++) + if (sinfo->valbufsize[id] < size) { - HeapTuple tuple; + if (sinfo->valbuf[id] == NULL) + sinfo->valbuf[id] = malloc(size); + else + sinfo->valbuf[id] = realloc(sinfo->valbuf[id], size); + sinfo->valbufsize[id] = size; + } + return sinfo->valbuf[id]; - if (!is_sql_cmd) - { - int i; + case PQSF_ADD_TUPLE: + break; /* Go through */ + default: + /* Ignore */ + break; + } - for (i = 0; i < nfields; i++) - { - if (PQgetisnull(res, row, i)) - values[i] = NULL; - else - values[i] = PQgetvalue(res, row, i); - } - } - else - { - values[0] = PQcmdStatus(res); - } + if (sinfo->nattrs != fields) + { + sinfo->error_occurred = TRUE; + sinfo->nummismatch = TRUE; + finishStoreInfo(sinfo); - /* build the tuple and put it into the tuplestore. */ - tuple = BuildTupleFromCStrings(attinmeta, values); - tuplestore_puttuple(tupstore, tuple); - } + /* This error will be processed in + * dblink_record_internal(). So do not set error message + * here. */ + return NULL; + } - /* clean up and return the tuplestore */ - tuplestore_donestoring(tupstore); - } + /* + * Rewrite PGresAttValue[] to char(*)[] in-place. + */ + Assert(sizeof(char*) <= sizeof(PGresAttValue)); - PQclear(res); + attval = (PGresAttValue *)sinfo->attrvalbuf; + cstrs = (char **)sinfo->attrvalbuf; + for(i = 0 ; i < fields ; i++) + { + if (attval->len < 0) + cstrs[i] = NULL; + else + cstrs[i] = attval->value; + } + + PG_TRY(); + { + tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs); + tuplestore_puttuple(sinfo->tuplestore, tuple); } PG_CATCH(); { - /* be sure to release the libpq result */ - PQclear(res); - PG_RE_THROW(); + MemoryContext context; + /* + * Store exception for later ReThrow and cancel the exception. + */ + sinfo->error_occurred = TRUE; + context = MemoryContextSwitchTo(sinfo->oldcontext); + sinfo->edata = CopyErrorData(); + MemoryContextSwitchTo(context); + FlushErrorState(); + + return NULL; } PG_END_TRY(); + + return sinfo->attrvalbuf;}/* diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 72c9384..8803999 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -7233,6 +7233,293 @@ int PQisthreadsafe(); </sect1> + <sect1 id="libpq-alterstorage"> + <title>Alternative result storage</title> + + <indexterm zone="libpq-alterstorage"> + <primary>PGresult</primary> + <secondary>PGconn</secondary> + </indexterm> + + <para> + As the standard usage, users can get the result of command + execution from <structname>PGresult</structname> aquired + with <function>PGgetResult</function> + from <structname>PGConn</structname>. While the memory areas for + the PGresult are allocated with malloc() internally within calls of + command execution functions such as <function>PQexec</function> + and <function>PQgetResult</function>. If you have difficulties to + handle the result records in the form of PGresult, you can instruct + PGconn to store them into your own storage instead of PGresult. + </para> + + <variablelist> + <varlistentry id="libpq-registerstorehandler"> + <term> + <function>PQregisterStoreHandler</function> + <indexterm> + <primary>PQregisterStoreHandler</primary> + </indexterm> + </term> + + <listitem> + <para> + Sets a callback function to allocate memory for each tuple and + column values, and add the complete tuple into the alternative + result storage. +<synopsis> +void PQregisterStoreHandler(PGconn *conn, + StoreHandler func, + void *param); +</synopsis> + </para> + + <para> + <variablelist> + <varlistentry> + <term><parameter>conn</parameter></term> + <listitem> + <para> + The connection object to set the storage handler + function. PGresult created from this connection calls this + function to store the result instead of storing into its + internal storage. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>func</parameter></term> + <listitem> + <para> + Storage handler function to set. NULL means to use the + default storage. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>param</parameter></term> + <listitem> + <para> + A pointer to contextual parameter passed + to <parameter>func</parameter>. You can get this poiner + in <type>StoreHandler</type> + by <function>PQgetStoreHandlerParam</function>. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry id="libpq-storehandler"> + <term> + <type>Storehandler</type> + <indexterm> + <primary>StoreHandler</primary> + </indexterm> + </term> + + <listitem> + <para> + The type for the storage handler callback function. +<synopsis> +typedef enum +{ + PQSF_ALLOC_TEXT, + PQSF_ALLOC_BINARY, + PQSF_ADD_TUPLE +} PQStoreFunc; + +void *(*StoreHandler)(PGresult *res, + PQStoreFunc func, + int id, + size_t size); +</synopsis> + </para> + + <para> + Generally this function must return NULL for failure and should + set the error message + with <function>PGsetStoreHandlerErrMes</function> if the cause + is other than out of memory. This funcion must not throw any + exception. This function is called in the sequence following. + + <itemizedlist spacing="compact"> + <listitem> + <simpara>Call with <parameter>func</parameter> + = <firstterm>PQSF_ALLOC_BINARY</firstterm> + and <parameter>id</parameter> = -1 to request the memory + for a tuple to be used as an array + of <type>PGresAttValue</type>. </simpara> + </listitem> + <listitem> + <simpara>Call with <parameter>func</parameter> + = <firstterm>PQSF_ALLOC_TEXT</firstterm> + or <firstterm>PQSF_ALLOC_BINARY</firstterm> + and <parameter>id</parameter> is zero to the number of columns + - 1 to request the memory for each column value in current + tuple.</simpara> + </listitem> + <listitem> + <simpara>Call with <parameter>func</parameter> + = <firstterm>PQSF_ADD_TUPLE</firstterm> to request the + constructed tuple to be stored.</simpara> + </listitem> + </itemizedlist> + </para> + <para> + Calling <type>StoreHandler</type> + with <parameter>func</parameter> = + <firstterm>PQSF_ALLOC_TEXT</firstterm> is telling to return a + memory block with at least <parameter>size</parameter> bytes + which may not be aligned to the word boundary. + <parameter>id</parameter> is a zero or positive number + distinguishes the usage of requested memory block, that is the + position of the column for which the memory block is used. + </para> + <para> + When <parameter>func</parameter> + = <firstterm>PQSF_ALLOC_BINARY</firstterm>, this function is + telled to return a memory block with at + least <parameter>size</parameter> bytes which is aligned to the + word boundary. + <parameter>id</parameter> is the identifier distinguishes the + usage of requested memory block. -1 means that it is used as an + array of <type>PGresAttValue</type> to store the tuple. Zero or + positive numbers have the same meanings as for + <firstterm>PQSF_ALLOC_BINARY</firstterm>. + </para> + <para>When <parameter>func</parameter> + = <firstterm>PQSF_ADD_TUPLE</firstterm>, this function is + telled to store the <type>PGresAttValue</type> structure + constructed by the caller into your storage. The pointer to the + tuple structure is not passed so you should memorize the + pointer to the memory block passed back the caller on + <parameter>func</parameter> + = <parameter>PQSF_ALLOC_BINARY</parameter> + with <parameter>id</parameter> is -1. This function must return + any non-NULL values for success. You must properly put back the + memory blocks passed to the caller in this function if needed. + </para> + <variablelist> + <varlistentry> + <term><parameter>res</parameter></term> + <listitem> + <para> + A pointer to the <type>PGresult</type> object. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>func</parameter></term> + <listitem> + <para> + An <type>enum</type> value telling the function to perform. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>param</parameter></term> + <listitem> + <para> + A pointer to contextual parameter passed to func. + </para> + </listitem> + </varlistentry> + </variablelist> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry id="libpq-pqgetstorehandlerparam"> + <term> + <function>PQgetStoreHandlerParam</function> + <indexterm> + <primary>PQgetStoreHandlerParam</primary> + </indexterm> + </term> + <listitem> + <para> + Get the pointer passed to <function>PQregisterStoreHandler</function> + as <parameter>param</parameter>. +<synopsis> +void *PQgetStoreHandlerParam(PGresult *res) +</synopsis> + </para> + <para> + <variablelist> + <varlistentry> + <term><parameter>res</parameter></term> + <listitem> + <para> + A pointer to the <type>PGresult</type> object. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry id="libpq-pqsetstorehandlererrmes"> + <term> + <function>PQsetStoreHandlerErrMes</function> + <indexterm> + <primary>PQsetStoreHandlerErrMes</primary> + </indexterm> + </term> + <listitem> + <para> + Set the message for the error occurred + in <type>StoreHandler</type>. If this message is not set, the + caller assumes the error to be out of memory. +<synopsis> +void PQsetStoreHandlerErrMes(PGresult *res, char *mes) +</synopsis> + </para> + <para> + <variablelist> + <varlistentry> + <term><parameter>res</parameter></term> + <listitem> + <para> + A pointer to the <type>PGresult</type> object + passed to <type>StoreHandler</type>. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>mes</parameter></term> + <listitem> + <para> + A pointer to the memory block containing the error + message, which is allocated + by <function>malloc()</function>. The memory block + will be freed with <function>free()</function> in the + caller of + <type>StoreHandler</type> only if it returns NULL. + </para> + <para> + If <parameter>res</parameter> already has a message previously + set, it is freed and then the given message is set. Set NULL + to cancel the the costom message. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + </sect1> + + <sect1 id="libpq-build"> <title>Building <application>libpq</application> Programs</title>
pgsql-hackers by date: