From 30af430274381220895382d2602537f7ae84ce40 Mon Sep 17 00:00:00 2001 From: Hari Babu Date: Sun, 17 Mar 2019 02:11:37 +1100 Subject: [PATCH 6/6] Primary, prefer-standby and standby options New options to check whether the server is in recovery mode or not, before considering them to connect. To confirm whether the server is running in recovery mode or not, it sends the query to server as 'SELECT pg_is_in_recovery()'. --- src/interfaces/libpq/fe-connect.c | 193 ++++++++++++++++++++++++++++-- src/interfaces/libpq/libpq-fe.h | 8 +- src/interfaces/libpq/libpq-int.h | 2 +- 3 files changed, 190 insertions(+), 13 deletions(-) diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index 2e16841703..16a1dd35b3 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -124,6 +124,7 @@ static int ldapServiceLookup(const char *purl, PQconninfoOption *options, #define DefaultOption "" #define DefaultAuthtype "" #define DefaultTargetSessionAttrs "any" +#define DefaultTargetServerType "any" #ifdef USE_SSL #define DefaultSSLMode "prefer" #else @@ -322,7 +323,7 @@ static const internalPQconninfoOption PQconninfoOptions[] = { {"target_session_attrs", "PGTARGETSESSIONATTRS", DefaultTargetSessionAttrs, NULL, - "Target-Session-Attrs", "", 12, /* sizeof("prefer-read") = 12 */ + "Target-Session-Attrs", "", 15, /* sizeof("prefer-standby") = 15 */ offsetof(struct pg_conn, target_session_attrs)}, /* Terminating entry --- MUST BE LAST */ @@ -1251,6 +1252,12 @@ connectOptions2(PGconn *conn) conn->requested_session_type = SESSION_TYPE_PREFER_READ; else if (strcmp(conn->target_session_attrs, "read-only") == 0) conn->requested_session_type = SESSION_TYPE_READ_ONLY; + else if (strcmp(conn->target_session_attrs, "primary") == 0) + conn->requested_session_type = SESSION_TYPE_PRIMARY; + else if (strcmp(conn->target_session_attrs, "prefer-standby") == 0) + conn->requested_session_type = SESSION_TYPE_PREFER_STANDBY; + else if (strcmp(conn->target_session_attrs, "standby") == 0) + conn->requested_session_type = SESSION_TYPE_STANDBY; else { conn->status = CONNECTION_BAD; @@ -2106,6 +2113,7 @@ PQconnectPoll(PGconn *conn) case CONNECTION_NEEDED: case CONNECTION_CHECK_WRITABLE: case CONNECTION_CONSUME: + case CONNECTION_CHECK_RECOVERY: break; default: @@ -3259,7 +3267,9 @@ keep_going: /* We will come back to here until there is * may just skip the test in that case. */ if (conn->sversion >= 70400 && - conn->requested_session_type != SESSION_TYPE_ANY) + (conn->requested_session_type == SESSION_TYPE_READ_WRITE || + conn->requested_session_type == SESSION_TYPE_PREFER_READ || + conn->requested_session_type == SESSION_TYPE_READ_ONLY)) { if (conn->sversion < 120000) { @@ -3346,21 +3356,52 @@ keep_going: /* We will come back to here until there is return PGRES_POLLING_OK; } } + else if ((conn->sversion >= 90000 && + (conn->requested_session_type == SESSION_TYPE_PRIMARY || + conn->requested_session_type == SESSION_TYPE_PREFER_STANDBY || + conn->requested_session_type == SESSION_TYPE_STANDBY))) + { + /* + * Save existing error messages across the PQsendQuery + * attempt. This is necessary because PQsendQuery is + * going to reset conn->errorMessage, so we would lose + * error messages related to previous hosts we have tried + * and failed to connect to. + */ + if (!saveErrorMessage(conn, &savedMessage)) + goto error_return; + + conn->status = CONNECTION_OK; + if (!PQsendQuery(conn, + "SELECT pg_is_in_recovery()")) + { + restoreErrorMessage(conn, &savedMessage); + goto error_return; + } + + conn->status = CONNECTION_CHECK_RECOVERY; + + restoreErrorMessage(conn, &savedMessage); + return PGRES_POLLING_READING; + } /* - * Requested type is prefer-read, then record this host index - * and try the other before considering it later + * Requested type is prefer-read or prefer-standby, then record + * this host index and try others before considering it later */ - if ((conn->requested_session_type == SESSION_TYPE_PREFER_READ && + if (((conn->requested_session_type == SESSION_TYPE_PREFER_READ || + conn->requested_session_type == SESSION_TYPE_PREFER_STANDBY) && conn->read_write_host_index != -2) || - conn->requested_session_type == SESSION_TYPE_READ_ONLY) + (conn->requested_session_type == SESSION_TYPE_READ_ONLY || + conn->requested_session_type == SESSION_TYPE_STANDBY)) { /* Close connection politely. */ conn->status = CONNECTION_OK; sendTerminateConn(conn); /* Record read-write host index */ - if (conn->requested_session_type == SESSION_TYPE_PREFER_READ && + if ((conn->requested_session_type == SESSION_TYPE_PREFER_READ || + conn->requested_session_type == SESSION_TYPE_PREFER_STANDBY) && conn->read_write_host_index == -1) conn->read_write_host_index = conn->whichhost; @@ -3450,11 +3491,14 @@ keep_going: /* We will come back to here until there is * is found in the first scan). */ if ((readonly_server && - conn->requested_session_type == SESSION_TYPE_READ_WRITE) || + (conn->requested_session_type == SESSION_TYPE_READ_WRITE || + conn->requested_session_type == SESSION_TYPE_PRIMARY)) || (!readonly_server && - ((conn->requested_session_type == SESSION_TYPE_PREFER_READ && + (((conn->requested_session_type == SESSION_TYPE_PREFER_READ || + conn->requested_session_type == SESSION_TYPE_PRIMARY) && conn->read_write_host_index != -2) || - conn->requested_session_type == SESSION_TYPE_READ_ONLY))) + (conn->requested_session_type == SESSION_TYPE_READ_ONLY || + conn->requested_session_type == SESSION_TYPE_STANDBY)))) { /* Not a requested type; fail this connection. */ PQclear(res); @@ -3542,6 +3586,135 @@ keep_going: /* We will come back to here until there is goto keep_going; } + case CONNECTION_CHECK_RECOVERY: + { + const char *displayed_host; + const char *displayed_port; + + if (!saveErrorMessage(conn, &savedMessage)) + goto error_return; + + conn->status = CONNECTION_OK; + if (!PQconsumeInput(conn)) + { + restoreErrorMessage(conn, &savedMessage); + goto error_return; + } + + if (PQisBusy(conn)) + { + conn->status = CONNECTION_CHECK_RECOVERY; + restoreErrorMessage(conn, &savedMessage); + return PGRES_POLLING_READING; + } + + res = PQgetResult(conn); + if (res && (PQresultStatus(res) == PGRES_TUPLES_OK) && + PQntuples(res) == 1) + { + char *val; + bool standby_server; + + val = PQgetvalue(res, 0, 0); + standby_server = (strncmp(val, "t", 1) == 0); + + /* + * Server is in recovery mode and requested mode is primary, + * ignore it. Server is not in recovery mode and requested mode is + * prefer-standby, record it for the first time and try to + * consume in the next scan (it means no standby server + * is found in the first scan). + */ + if ((standby_server && + conn->requested_session_type == SESSION_TYPE_PRIMARY) || + (!standby_server && + (((conn->requested_session_type == SESSION_TYPE_PREFER_READ) && + conn->read_write_host_index != -2) || + conn->requested_session_type == SESSION_TYPE_STANDBY))) + { + /* Not a requested type; fail this connection. */ + PQclear(res); + restoreErrorMessage(conn, &savedMessage); + + /* Append error report to conn->errorMessage. */ + if (conn->connhost[conn->whichhost].type == CHT_HOST_ADDRESS) + displayed_host = conn->connhost[conn->whichhost].hostaddr; + else + displayed_host = conn->connhost[conn->whichhost].host; + displayed_port = conn->connhost[conn->whichhost].port; + if (displayed_port == NULL || displayed_port[0] == '\0') + displayed_port = DEF_PGPORT_STR; + + if (conn->requested_session_type == SESSION_TYPE_PRIMARY) + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("server is in recovery mode " + "\"%s:%s\"\n"), + displayed_host, displayed_port); + else + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("server is not in recovery mode " + "\"%s:%s\"\n"), + displayed_host, displayed_port); + + /* Close connection politely. */ + conn->status = CONNECTION_OK; + sendTerminateConn(conn); + + /* Record read-write host index */ + if (!standby_server && + conn->requested_session_type == SESSION_TYPE_PREFER_STANDBY && + conn->read_write_host_index == -1) + conn->read_write_host_index = conn->whichhost; + + /* + * Try next host if any, but we don't want to consider + * additional addresses for this host. + */ + conn->try_next_host = true; + goto keep_going; + } + + /* Session is requested type, so we're good. */ + PQclear(res); + termPQExpBuffer(&savedMessage); + + /* + * Finish reading any remaining messages before being + * considered as ready. + */ + conn->status = CONNECTION_CONSUME; + goto keep_going; + } + + /* + * Something went wrong with "SELECT pg_is_in_recovery()". We + * should try next addresses. + */ + if (res) + PQclear(res); + restoreErrorMessage(conn, &savedMessage); + + /* Append error report to conn->errorMessage. */ + if (conn->connhost[conn->whichhost].type == CHT_HOST_ADDRESS) + displayed_host = conn->connhost[conn->whichhost].hostaddr; + else + displayed_host = conn->connhost[conn->whichhost].host; + displayed_port = conn->connhost[conn->whichhost].port; + if (displayed_port == NULL || displayed_port[0] == '\0') + displayed_port = DEF_PGPORT_STR; + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("test \"SELECT pg_is_in_recovery()\" failed " + "on server \"%s:%s\"\n"), + displayed_host, displayed_port); + + /* Close connection politely. */ + conn->status = CONNECTION_OK; + sendTerminateConn(conn); + + /* Try next address */ + conn->try_next_addr = true; + goto keep_going; + } default: appendPQExpBuffer(&conn->errorMessage, libpq_gettext("invalid connection state %d, " diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index 5d0b885dae..4c1b849019 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -67,7 +67,8 @@ typedef enum * connection. */ CONNECTION_CONSUME, /* Wait for any pending message and consume * them. */ - CONNECTION_CHECK_TARGET /* Check if we have a proper target connection */ + CONNECTION_CHECK_TARGET, /* Check if we have a proper target connection */ + CONNECTION_CHECK_RECOVERY /* Check whether server is in recovery */ } ConnStatusType; typedef enum @@ -75,7 +76,10 @@ typedef enum SESSION_TYPE_ANY = 0, /* Any session (default) */ SESSION_TYPE_READ_WRITE, /* Read-write session */ SESSION_TYPE_PREFER_READ, /* Prefer read only session */ - SESSION_TYPE_READ_ONLY /* Read only session */ + SESSION_TYPE_READ_ONLY, /* Read only session */ + SESSION_TYPE_PRIMARY, /* Primary server */ + SESSION_TYPE_PREFER_STANDBY, /* Prefer Standby server */ + SESSION_TYPE_STANDBY /* Standby server */ } TargetSessionAttrsType; typedef enum diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 62a1b7bbd4..f4e9c1f64b 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -365,7 +365,7 @@ struct pg_conn /* * Type of connection to make. Possible values: any, read-write, - * prefer-read and read-only. + * prefer-read, read-only, primary, prefer-standby and standby. */ char *target_session_attrs; TargetSessionAttrsType requested_session_type; -- 2.20.1.windows.1