Re: Speed dblink using alternate libpq tuple storage - Mailing list pgsql-hackers

From Kyotaro HORIGUCHI
Subject Re: Speed dblink using alternate libpq tuple storage
Date
Msg-id 20120327.112042.74201667.horiguchi.kyotaro@oss.ntt.co.jp
Whole thread Raw
In response to Re: Speed dblink using alternate libpq tuple storage  (Kyotaro HORIGUCHI <horiguchi.kyotaro@oss.ntt.co.jp>)
Responses Re: Speed dblink using alternate libpq tuple storage  (Tom Lane <tgl@sss.pgh.pa.us>)
List pgsql-hackers
I'm sorry to have coded a silly bug.

The previous patch had a bug in the realloc size calculation.
Also, the separation of the 'conname patch' was incomplete in the regression tests.
Both are fixed in this patch.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 36a8e3e..4de28ef 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -63,11 +63,23 @@ typedef struct remoteConn    bool        newXactForCursor;        /* Opened a transaction for a
cursor*/} remoteConn;
 
+typedef struct storeInfo
+{
+    Tuplestorestate *tuplestore;
+    int nattrs;
+    MemoryContext oldcontext;
+    AttInMetadata *attinmeta;
+    char* valbuf;
+    int valbuflen;
+    char **cstrs;
+    bool error_occurred;
+    bool nummismatch;
+} storeInfo;
+/* * Internal declarations */static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
-static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);static remoteConn *getConnectionByName(const
char*name);static HTAB *createConnHash(void);static void createNewConnection(const char *name, remoteConn *rconn);
 
@@ -90,6 +102,10 @@ static char *escape_param_str(const char *from);static void validate_pkattnums(Relation rel,
          int2vector *pkattnums_arg, int32 pknumatts_arg,                   int **pkattnums, int *pknumatts);
 
+static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+static void finishStoreInfo(storeInfo *sinfo);
+static int storeHandler(PGresult *res, PGrowValue *columns, void *param);
+/* Global */static remoteConn *pconn = NULL;
@@ -111,6 +127,9 @@ typedef struct remoteConnHashEnt/* initial number of connection hashes */#define NUMCONN 16
+/* Initial block size for value buffer in storeHandler */
+#define INITBUFLEN 64
+/* general utility */#define xpfree(var_) \    do { \
@@ -503,6 +522,7 @@ dblink_fetch(PG_FUNCTION_ARGS)    char       *curname = NULL;    int            howmany = 0;
bool       fail = true;    /* default to backward compatible */
 
+    storeInfo   storeinfo;    DBLINK_INIT;
@@ -559,15 +579,51 @@ dblink_fetch(PG_FUNCTION_ARGS)    appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
/*
 
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result returned by PQexec below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQsetRowProcessor(conn, storeHandler, &storeinfo);
+
+    /*     * Try to execute the query.  Note that since libpq uses malloc, the     * PGresult will be long-lived even
though we are still in a short-lived     * memory context.     */
 
-    res = PQexec(conn, buf.data);
+    PG_TRY();
+    {
+        res = PQexec(conn, buf.data);
+    }
+    PG_CATCH();
+    {
+        ErrorData *edata;
+
+        finishStoreInfo(&storeinfo);
+        edata = CopyErrorData();
+        FlushErrorState();
+
+        /* Skip remaining results when storeHandler raises exception. */
+        PQskipResult(conn, TRUE);
+        ReThrowError(edata);
+    }
+    PG_END_TRY();
+
+    finishStoreInfo(&storeinfo);
+    if (!res ||        (PQresultStatus(res) != PGRES_COMMAND_OK &&         PQresultStatus(res) != PGRES_TUPLES_OK))
{
+        /* finishStoreInfo saves the fields referred to below. */
+        if (storeinfo.nummismatch)
+        {
+            /* This is only for backward compatibility */
+            ereport(ERROR,
+                    (errcode(ERRCODE_DATATYPE_MISMATCH),
+                     errmsg("remote query result rowtype does not match "
+                            "the specified FROM clause rowtype")));
+        }
+        dblink_res_error(conname, res, "could not fetch from cursor", fail);        return (Datum) 0;    }
@@ -579,8 +635,8 @@ dblink_fetch(PG_FUNCTION_ARGS)                (errcode(ERRCODE_INVALID_CURSOR_NAME),
errmsg("cursor \"%s\" does not exist", curname)));    }
 
+    PQclear(res);
-    materializeResult(fcinfo, res);    return (Datum) 0;}
@@ -640,6 +696,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)    remoteConn *rconn = NULL;    bool
      fail = true;    /* default to backward compatible */    bool        freeconn = false;
 
+    storeInfo   storeinfo;    /* check to see if caller supports us returning a tuplestore */    if (rsinfo == NULL ||
!IsA(rsinfo,ReturnSetInfo))
 
@@ -660,6 +717,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)        {            /*
text,text,bool*/            DBLINK_GET_CONN;
 
+            conname = text_to_cstring(PG_GETARG_TEXT_PP(0));            sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
        fail = PG_GETARG_BOOL(2);        }
 
@@ -715,164 +773,234 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)    rsinfo->setResult = NULL;
rsinfo->setDesc= NULL;
 
-    /* synchronous query, or async result retrieval */
-    if (!is_async)
-        res = PQexec(conn, sql);
-    else
+
+    /*
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result returned by PQexec/PQgetResult below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQsetRowProcessor(conn, storeHandler, &storeinfo);
+
+    PG_TRY();    {
-        res = PQgetResult(conn);
-        /* NULL means we're all done with the async results */
-        if (!res)
-            return (Datum) 0;
+        /* synchronous query, or async result retrieval */
+        if (!is_async)
+            res = PQexec(conn, sql);
+        else
+            res = PQgetResult(conn);    }
+    PG_CATCH();
+    {
+        ErrorData *edata;
-    /* if needed, close the connection to the database and cleanup */
-    if (freeconn)
-        PQfinish(conn);
+        finishStoreInfo(&storeinfo);
+        edata = CopyErrorData();
+        FlushErrorState();
-    if (!res ||
-        (PQresultStatus(res) != PGRES_COMMAND_OK &&
-         PQresultStatus(res) != PGRES_TUPLES_OK))
+        /* Skip remaining results when storeHandler raises exception. */
+        PQskipResult(conn, TRUE);
+        ReThrowError(edata);
+    }
+    PG_END_TRY();
+
+    finishStoreInfo(&storeinfo);
+
+    /* NULL res from async get means we're all done with the results */
+    if (res || !is_async)    {
-        dblink_res_error(conname, res, "could not execute query", fail);
-        return (Datum) 0;
+        if (freeconn)
+            PQfinish(conn);
+
+        /*
+         * Exclude a mismatch in the number of columns here so as to
+         * behave as before.
+         */
+        if (!res ||
+            (PQresultStatus(res) != PGRES_COMMAND_OK &&
+             PQresultStatus(res) != PGRES_TUPLES_OK &&
+             !storeinfo.nummismatch))
+        {
+            dblink_res_error(conname, res, "could not execute query", fail);
+            return (Datum) 0;
+        }
+
+        /* Set command return status when the query was a command. */
+        if (PQresultStatus(res) == PGRES_COMMAND_OK)
+        {
+            char *values[1];
+            HeapTuple tuple;
+            AttInMetadata *attinmeta;
+            ReturnSetInfo *rcinfo = (ReturnSetInfo*)fcinfo->resultinfo;
+            
+            values[0] = PQcmdStatus(res);
+            attinmeta = TupleDescGetAttInMetadata(rcinfo->setDesc);
+            tuple = BuildTupleFromCStrings(attinmeta, values);
+            tuplestore_puttuple(rcinfo->setResult, tuple);
+        }
+        else if (get_call_result_type(fcinfo, NULL, NULL) == TYPEFUNC_RECORD)
+        {
+            /* failed to determine actual type of RECORD */
+            ereport(ERROR,
+                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                     errmsg("function returning record called in context "
+                            "that cannot accept type record")));
+        }
+
+        /* finishStoreInfo saves the fields referred to below. */
+        if (storeinfo.nummismatch)
+        {
+            /* This is only for backward compatibility */
+            ereport(ERROR,
+                    (errcode(ERRCODE_DATATYPE_MISMATCH),
+                     errmsg("remote query result rowtype does not match "
+                            "the specified FROM clause rowtype")));
+        }    }
-    materializeResult(fcinfo, res);
+    if (res)
+        PQclear(res);
+    return (Datum) 0;}
-/*
- * Materialize the PGresult to return them as the function result.
- * The res will be released in this function.
- */static void
-materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo){    ReturnSetInfo *rsinfo = (ReturnSetInfo *)
fcinfo->resultinfo;
+    TupleDesc    tupdesc;
-    Assert(rsinfo->returnMode == SFRM_Materialize);
-
-    PG_TRY();
+    sinfo->oldcontext = MemoryContextSwitchTo(
+        rsinfo->econtext->ecxt_per_query_memory);
+        
+    switch (get_call_result_type(fcinfo, NULL, &tupdesc))    {
-        TupleDesc    tupdesc;
-        bool        is_sql_cmd = false;
-        int            ntuples;
-        int            nfields;
-
-        if (PQresultStatus(res) == PGRES_COMMAND_OK)
-        {
-            is_sql_cmd = true;
-
-            /*
-             * need a tuple descriptor representing one TEXT column to return
-             * the command status string as our result tuple
-             */
+        case TYPEFUNC_COMPOSITE:
+            tupdesc = CreateTupleDescCopy(tupdesc);
+            sinfo->nattrs = tupdesc->natts;
+            break;
+        case TYPEFUNC_RECORD:            tupdesc = CreateTemplateTupleDesc(1, false);
TupleDescInitEntry(tupdesc,(AttrNumber) 1, "status",                               TEXTOID, -1, 0);
 
-            ntuples = 1;
-            nfields = 1;
-        }
-        else
-        {
-            Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+            sinfo->nattrs = 1;
+            break;
+        default:
+            /* result type isn't composite */
+            elog(ERROR, "return type must be a row type");
+            break;
+    }
-            is_sql_cmd = false;
+    /* make sure we have a persistent copy of the tupdesc */
-            /* get a tuple descriptor for our result type */
-            switch (get_call_result_type(fcinfo, NULL, &tupdesc))
-            {
-                case TYPEFUNC_COMPOSITE:
-                    /* success */
-                    break;
-                case TYPEFUNC_RECORD:
-                    /* failed to determine actual type of RECORD */
-                    ereport(ERROR,
-                            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                        errmsg("function returning record called in context "
-                               "that cannot accept type record")));
-                    break;
-                default:
-                    /* result type isn't composite */
-                    elog(ERROR, "return type must be a row type");
-                    break;
-            }
+    sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+    sinfo->error_occurred = FALSE;
+    sinfo->nummismatch = FALSE;
+    sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+    sinfo->valbuflen = INITBUFLEN;
+    sinfo->valbuf = (char *)palloc(sinfo->valbuflen);
+    sinfo->cstrs = (char **)palloc(sinfo->nattrs * sizeof(char *));
-            /* make sure we have a persistent copy of the tupdesc */
-            tupdesc = CreateTupleDescCopy(tupdesc);
-            ntuples = PQntuples(res);
-            nfields = PQnfields(res);
-        }
+    rsinfo->setResult = sinfo->tuplestore;
+    rsinfo->setDesc = tupdesc;
+}
-        /*
-         * check result and tuple descriptor have the same number of columns
-         */
-        if (nfields != tupdesc->natts)
-            ereport(ERROR,
-                    (errcode(ERRCODE_DATATYPE_MISMATCH),
-                     errmsg("remote query result rowtype does not match "
-                            "the specified FROM clause rowtype")));
+static void
+finishStoreInfo(storeInfo *sinfo)
+{
+    if (sinfo->valbuf)
+    {
+        pfree(sinfo->valbuf);
+        sinfo->valbuf = NULL;
+    }
-        if (ntuples > 0)
-        {
-            AttInMetadata *attinmeta;
-            Tuplestorestate *tupstore;
-            MemoryContext oldcontext;
-            int            row;
-            char      **values;
-
-            attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
-            oldcontext = MemoryContextSwitchTo(
-                                    rsinfo->econtext->ecxt_per_query_memory);
-            tupstore = tuplestore_begin_heap(true, false, work_mem);
-            rsinfo->setResult = tupstore;
-            rsinfo->setDesc = tupdesc;
-            MemoryContextSwitchTo(oldcontext);
+    if (sinfo->cstrs)
+    {
+        pfree(sinfo->cstrs);
+        sinfo->cstrs = NULL;
+    }
-            values = (char **) palloc(nfields * sizeof(char *));
+    MemoryContextSwitchTo(sinfo->oldcontext);
+}
-            /* put all tuples into the tuplestore */
-            for (row = 0; row < ntuples; row++)
-            {
-                HeapTuple    tuple;
+/* Prototype of this function is PQrowProcessor */
+static int
+storeHandler(PGresult *res, PGrowValue *columns, void *param)
+{
+    storeInfo *sinfo = (storeInfo *)param;
+    HeapTuple  tuple;
+    int        newbuflen;
+    int        fields = PQnfields(res);
+    int        i;
+    char       **cstrs = sinfo->cstrs;
+    char       *pbuf;
+
+    if (sinfo->error_occurred)
+        return -1;
+
+    if (sinfo->nattrs != fields)
+    {
+        sinfo->error_occurred = TRUE;
+        sinfo->nummismatch = TRUE;
+        finishStoreInfo(sinfo);
-                if (!is_sql_cmd)
-                {
-                    int            i;
+        /* This error will be processed in dblink_record_internal() */
+        return -1;
+    }
-                    for (i = 0; i < nfields; i++)
-                    {
-                        if (PQgetisnull(res, row, i))
-                            values[i] = NULL;
-                        else
-                            values[i] = PQgetvalue(res, row, i);
-                    }
-                }
-                else
-                {
-                    values[0] = PQcmdStatus(res);
-                }
+    /*
+     * Value input functions assume that the input string is
+     * zero-terminated, so we must terminate each value accordingly.
+     */
-                /* build the tuple and put it into the tuplestore. */
-                tuple = BuildTupleFromCStrings(attinmeta, values);
-                tuplestore_puttuple(tupstore, tuple);
-            }
+    /*
+     * The length of the buffer for each field is value length + 1 for
+     * zero-termination
+     */
+    newbuflen = fields;
+    for(i = 0 ; i < fields ; i++)
+        newbuflen += columns[i].len;
+
+    if (newbuflen > sinfo->valbuflen)
+    {
+        int tmplen = sinfo->valbuflen * 2;
+        /*
+         * Try to (re)allocate in bigger steps to avoid flood of allocations
+         * on weird data.
+         */
+        while (newbuflen > tmplen && tmplen >= 0)
+            tmplen *= 2;
-            /* clean up and return the tuplestore */
-            tuplestore_donestoring(tupstore);
-        }
+        /* Check if the integer was wrap-rounded. */
+        if (tmplen < 0)
+            elog(ERROR, "Buffer size for one row exceeds integer limit");
-        PQclear(res);
+        sinfo->valbuf = (char *)repalloc(sinfo->valbuf, tmplen);
+        sinfo->valbuflen = tmplen;    }
-    PG_CATCH();
+
+    pbuf = sinfo->valbuf;
+    for(i = 0 ; i < fields ; i++)    {
-        /* be sure to release the libpq result */
-        PQclear(res);
-        PG_RE_THROW();
+        int len = columns[i].len;
+        if (len < 0)
+            cstrs[i] = NULL;
+        else
+        {
+            cstrs[i] = pbuf;
+            memcpy(pbuf, columns[i].value, len);
+            pbuf += len;
+            *pbuf++ = '\0';
+        }    }
-    PG_END_TRY();
+
+    /*
+     * These functions may throw an exception, which will be caught in
+     * dblink_record_internal()
+     */
+    tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+    tuplestore_puttuple(sinfo->tuplestore, tuple);
+
+    return 1;}/*
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 4de28ef..05d7e98 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -733,6 +733,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)            else            {
      DBLINK_GET_CONN;
 
+                conname = text_to_cstring(PG_GETARG_TEXT_PP(0));                sql =
text_to_cstring(PG_GETARG_TEXT_PP(1));           }        }
 
@@ -763,6 +764,8 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)        else            /* shouldn't
happen*/            elog(ERROR, "wrong number of arguments");
 
+
+        conname = text_to_cstring(PG_GETARG_TEXT_PP(0));    }    if (!conn)
diff --git a/contrib/dblink/expected/dblink.out b/contrib/dblink/expected/dblink.out
index 511dd5e..2dcba15 100644
--- a/contrib/dblink/expected/dblink.out
+++ b/contrib/dblink/expected/dblink.out
@@ -371,7 +371,7 @@ SELECT *FROM dblink('myconn','SELECT * FROM foobar',false) AS t(a int, b text, c text[])WHERE t.a >
7;NOTICE: relation "foobar" does not exist
 
-CONTEXT:  Error occurred on dblink connection named "unnamed": could not execute query.
+CONTEXT:  Error occurred on dblink connection named "myconn": could not execute query. a | b | c ---+---+---(0 rows)

pgsql-hackers by date:

Previous
From: Greg Stark
Date:
Subject: Re: Cross-backend signals and administration (Was: Re: pg_terminate_backend for same-role)
Next
From: limaozeng
Date:
Subject: Can param evaluation use same estate as executor?