Re: Streaming Replication on win32 - Mailing list pgsql-hackers

From Heikki Linnakangas
Subject Re: Streaming Replication on win32
Date
Msg-id 4B584C99.8090004@enterprisedb.com
Whole thread Raw
In response to Re: Streaming Replication on win32  (Heikki Linnakangas <heikki.linnakangas@enterprisedb.com>)
Responses Re: Streaming Replication on win32  (Joe Conway <mail@joeconway.com>)
List pgsql-hackers
Heikki Linnakangas wrote:
> Magnus Hagander wrote:
>> 2010/1/17 Heikki Linnakangas <heikki.linnakangas@enterprisedb.com>:
>>> We could replace the blocking PQexec() calls with PQsendQuery(), and use
>>>  the emulated version of select() to wait.
>> Hmm. That would at least theoretically work, but aren't there still
>> places we may end up blocking further down? Or are those ok?
>
> There's also PQconnect that needs similar treatment (using
> PQconnectStart/Poll()), but that's it.

So here's a patch implementing that for contrib/dblink. Walreceiver
needs the same treatment.

The implementation should be shared between the two, but I'm not sure
how. We can't just put the wrapper functions into a module in
src/backend/port/, because the wrapper functions depend on libpq. Maybe
put them in a new header file as static functions, and include that in
contrib/dblink/dblink.c and src/backend/replication/libpqwalreceiver.c.

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 2c1d7a2..fa11709 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -34,6 +34,14 @@

 #include <limits.h>

+#ifdef WIN32
+/* These are needed by the interruptible libpq function replacements */
+#include <time.h>
+#include <unistd.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#endif
+
 #include "libpq-fe.h"
 #include "fmgr.h"
 #include "funcapi.h"
@@ -101,6 +109,193 @@ static void dblink_res_error(const char *conname, PGresult *res, const char *dbl
 static char *get_connect_string(const char *servername);
 static char *escape_param_str(const char *from);

+#ifdef WIN32
+/*
+ * Replacement functions for blocking libpq functions, for Windows.
+ *
+ * On Windows, the vanilla select() function doesn't react to our emulated
+ * signals. PQexec() and PQconnectdb() use select(), so they're
+ * uninterruptible. These replacement functions use the corresponding
+ * asynchronous libpq functions and the backend version of select() to
+ * implement the same functionality, but in a way that's interrupted by signals.
+ *
+ * These work on other platforms as well, but presumably it's more efficient
+ * to let libpq block.
+ */
+
+/* Interruptible PQexec() stand-in; returns NULL if the query can't be sent. */
+static PGresult *
+dblink_PQexec(PGconn *conn, const char *command)
+{
+    PGresult   *lastResult = NULL;
+    int         sock;
+
+    /* Send query. This can block too, but we ignore that for now. */
+    if (PQsendQuery(conn, command) == 0)
+        return NULL;
+    sock = PQsocket(conn);
+
+    /*
+     * Emulate PQexec()'s behavior of returning the *last* result, if the
+     * user-supplied SQL passed to dblink() produces several. We don't try to
+     * concatenate error messages like PQexec() does; not worth the effort.
+     */
+    for (;;)
+    {
+        PGresult   *result;
+        ExecStatusType status;
+
+        /* Wait before every result, so that PQgetResult() need not block */
+        while (PQisBusy(conn))
+        {
+            fd_set      input_mask;
+
+            FD_ZERO(&input_mask);
+            FD_SET(sock, &input_mask);
+
+            /*
+             * select()'s return code is not checked; we assume that
+             * PQconsumeInput() gets the same error. When it does, stop
+             * waiting (else we'd spin) and let PQgetResult() report it.
+             */
+            select(sock + 1, &input_mask, NULL, NULL, NULL);
+            if (PQconsumeInput(conn) == 0)
+                break;
+        }
+        if ((result = PQgetResult(conn)) == NULL)
+            break;              /* all results consumed */
+
+        /* An earlier unsuccessful result wins over anything after it */
+        status = lastResult ? PQresultStatus(lastResult) : PGRES_COMMAND_OK;
+        if (status == PGRES_COMMAND_OK || status == PGRES_TUPLES_OK)
+        {
+            PQclear(lastResult);
+            lastResult = result;
+        }
+        else
+            PQclear(result);
+
+        /* Stop where PQexec() does; in COPY state PQgetResult() never ends */
+        status = PQresultStatus(lastResult);
+        if (status == PGRES_COPY_IN || status == PGRES_COPY_OUT ||
+            PQstatus(conn) == CONNECTION_BAD)
+            break;
+    }
+    return lastResult;
+}
+
+/*
+ * Interruptible PQconnectdb() stand-in built on PQconnectStart()/Poll(); as
+ * with PQconnectdb(), check PQstatus(). ERROR if connect_timeout expires.
+ */
+static PGconn *
+dblink_PQconnectdb(const char *conninfo)
+{
+    PGconn *conn;
+    PostgresPollingStatusType status;
+    PQconninfoOption *options;
+    int timeout_secs = 0;
+    time_t end_time = 0;
+
+    conn = PQconnectStart(conninfo);
+    if (conn == NULL)
+        return NULL;
+
+    if (PQstatus(conn) == CONNECTION_BAD)
+        return conn;
+
+    /* Extract timeout from the connection string */
+    options = PQconninfoParse(conninfo, NULL);
+    if (options)
+    {
+        PQconninfoOption *option;
+        for (option = options; option->keyword != NULL; option++)
+        {
+            if (strcmp(option->keyword, "connect_timeout") == 0 &&
+                option->val != NULL && option->val[0] != '\0')
+            {
+                timeout_secs = atoi(option->val);
+                break;
+            }
+        }
+        PQconninfoFree(options);
+    }
+    if (timeout_secs > 0)
+        end_time = time(NULL) + timeout_secs;
+
+    /* Wait for connection to be established */
+    for (;;)
+    {
+        fd_set    input_mask;
+        fd_set    output_mask;
+        time_t    now;
+        struct timeval timeout;
+        struct timeval *timeout_ptr;
+        int       sock;
+
+        FD_ZERO(&input_mask);
+        FD_ZERO(&output_mask);
+
+        status = PQconnectPoll(conn);
+        sock = PQsocket(conn);      /* libpq may switch sockets between polls */
+        switch(status)
+        {
+            case PGRES_POLLING_OK:
+            case PGRES_POLLING_FAILED:
+                return conn;
+
+            case PGRES_POLLING_READING:
+                FD_SET(sock, &input_mask);
+                break;
+
+            case PGRES_POLLING_WRITING:
+                FD_SET(sock, &output_mask);
+                break;
+
+            default:
+                PQfinish(conn);     /* as on timeout, don't leak it */
+                elog(ERROR, "unknown PQconnectPoll() return value: %d", status);
+        }
+
+        if (timeout_secs > 0)
+        {
+            now = time(NULL);
+            timeout.tv_sec = (now > end_time) ? 0 : (end_time - now);
+            timeout.tv_usec = 0;
+            timeout_ptr = &timeout;
+        }
+        else
+            timeout_ptr = NULL;
+
+        /*
+         * Note that we don't check an error code. We assume that
+         * PQconnectPoll() will get the same error, and return failure.
+         */
+        if (select(sock + 1, &input_mask, &output_mask, NULL, timeout_ptr) == 0)
+        {
+            /* Timeout */
+            PQfinish(conn);
+
+            /*
+             * This message differs subtly from the one on other platforms,
+             * where PQconnectdb() handles the timeout: "timeout expired" is
+             * translated using the backend .po file here, but using libpq's
+             * .po file there. I hope that makes no difference in practice.
+             */
+            ereport(ERROR,
+                    (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
+                     errmsg("could not establish connection"),
+                     errdetail("timeout expired")));
+        }
+    }
+    return NULL; /* not reached, keep compiler quiet */
+}
+
+/* Route later PQexec()/PQconnectdb() calls in this file to the wrappers above */
+#define PQexec(conn, command) dblink_PQexec(conn, command)
+#define PQconnectdb(conninfo) dblink_PQconnectdb(conninfo)
+#endif   /* interruptible libpq replacements */
+
 /* Global */
 static remoteConn *pconn = NULL;
 static HTAB *remoteConnHash = NULL;

pgsql-hackers by date:

Previous
From: Dave Page
Date:
Subject: Re: 8.5 vs. 9.0
Next
From: Fujii Masao
Date:
Subject: Streaming replication and a disk full in primary