diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 4e34f00..adbad75 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -792,7 +792,7 @@ host=localhost port=5432 dbname=mydb connect_timeout=10 The general form for a connection URI is: -postgresql://[user[:password]@][netloc][:port][/dbname][?param1=value1&...] +postgresql://[user[:password]@][netloc][:port][,netloc[:port]...][/dbname][?param1=value1&...] @@ -809,6 +809,7 @@ postgresql://localhost/mydb postgresql://user@localhost postgresql://user:secret@localhost postgresql://other@localhost/otherdb?connect_timeout=10&application_name=myapp +postgresql://node1,node2:5433,node3:4432,node4/mydb?hostorder=random&target_server_type=any Components of the hierarchical part of the URI can also be given as parameters. For example: @@ -831,7 +832,9 @@ postgresql:///mydb?host=localhost&port=5433 For improved compatibility with JDBC connection URIs, instances of parameter ssl=true are translated into - sslmode=require. + sslmode=require and + loadBalanceHosts=true into + hostorder=random. @@ -841,6 +844,10 @@ postgresql:///mydb?host=localhost&port=5433 postgresql://[2001:db8::1234]/database + + There can be several host specifications, each optionally accompanied + by a port, separated by commas. + The host component is interpreted as described for the parameter PostgreSQL was built). On machines without Unix-domain sockets, the default is to connect to localhost. + + There can be more than one host parameter in + the connection string. In this case these hosts would be considered + alternate entries into the same database, and if the connection to the first one + fails, the second would be attempted, and so on. This can be used + for high availability clusters or for load balancing. See the + parameter. + + + The network host name can be accompanied by a port number, separated by a + colon. This port number is used only when connecting to + this host.
If there is no port number, the port specified in the + parameter would be used instead. + @@ -943,7 +964,42 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname + + hostorder + + + Specifies how to choose the host from the list of alternate hosts, + specified in the parameter. + + + If the value of this argument is sequential (the + default), connections to the hosts will be attempted in the order in + which they are specified. + + + If the value is random, the host to connect to + will be randomly picked from the list. It allows load balancing between + several cluster nodes. However, PostgreSQL doesn't currently support + multimaster clusters. So, without the use of third-party products, + only read-only connections can take advantage of + load-balancing. See + + + + + target_server_type + + + If this parameter is master, upon successful connection + the host is checked to determine whether it is in a recovery state. If it + is, the next host in the connection string is tried. If this parameter is + any, connections to standby nodes are considered + successful. + + + + port @@ -985,7 +1041,6 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname - connect_timeout @@ -996,7 +1051,28 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname - + + failover_timeout + + + Maximum time to cyclically retry all the hosts in the connection string + (as a decimal integer number of seconds). If not specified, then + hosts are tried just once. + + + If we have a replicating cluster and the master node fails, it might + take some time to promote one of the standby nodes to the new master. + So clients which detect failure to connect to the master might + abandon attempts to reestablish a connection before the new master + becomes available. + + + Setting this parameter to a value that takes into account the amount of + time needed for failover to complete will ensure attempts to connect + to hosts continue to be made until the new master becomes available.
+ + + client_encoding @@ -7227,6 +7303,19 @@ user=admin An example file is provided at share/pg_service.conf.sample. + + If more than one host option is present in the same + section of the service file, they are interpreted as alternate servers for + failover or load-balancing. See + option in the connection string. + + + For all other options, the first value takes precedence over later ones. + + + Options specified in the connection string, along with the service option, + have precedence over values from the service file. + diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile index b1789eb..4c14cc5 100644 --- a/src/interfaces/libpq/Makefile +++ b/src/interfaces/libpq/Makefile @@ -36,7 +36,7 @@ OBJS= fe-auth.o fe-connect.o fe-exec.o fe-misc.o fe-print.o fe-lobj.o \ libpq-events.o # libpgport C files we always use OBJS += chklocale.o inet_net_ntop.o noblock.o pgstrcasecmp.o pqsignal.o \ - thread.o + thread.o pgsleep.o # libpgport C files that are needed if identified by configure OBJS += $(filter crypt.o getaddrinfo.o getpeereid.o inet_aton.o open.o system.o snprintf.o strerror.o strlcpy.o win32error.o win32setlocale.o, $(LIBOBJS)) # src/backend/utils/mb @@ -129,6 +129,9 @@ install: all installdirs install-lib installcheck: $(MAKE) -C test $@ +check: + $(prove_check) + installdirs: installdirs-lib $(MKDIR_P) '$(DESTDIR)$(includedir)' '$(DESTDIR)$(includedir_internal)' '$(DESTDIR)$(datadir)' @@ -142,6 +145,7 @@ uninstall: uninstall-lib clean distclean: clean-lib $(MAKE) -C test $@ rm -f $(OBJS) pthread.h libpq.rc + rm -rf tmp_check # Might be left over from a Win32 client-only build rm -f pg_config_paths.h rm -f inet_net_ntop.c noblock.c pgstrcasecmp.c pqsignal.c thread.c diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index f3a9e5a..3bc58a1 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -299,7 +299,16 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
{"replication", NULL, NULL, NULL, "Replication", "D", 5, offsetof(struct pg_conn, replication)}, - + /* Parameters added by failover patch */ + {"hostorder", NULL, "sequential", NULL, + "Host order", "", 10, + offsetof(struct pg_conn, hostorder)}, + {"target_server_type", NULL, NULL, NULL, + "Target server type", "", 6, + offsetof(struct pg_conn, target_server_type)}, + {"failover_timeout", NULL, NULL, NULL, + "Failover Timeout", "", 10, + offsetof(struct pg_conn, failover_timeout)}, /* Terminating entry --- MUST BE LAST */ {NULL, NULL, NULL, NULL, NULL, NULL, 0} @@ -336,6 +345,7 @@ static PGconn *makeEmptyPGconn(void); static bool fillPGconn(PGconn *conn, PQconninfoOption *connOptions); static void freePGconn(PGconn *conn); static void closePGconn(PGconn *conn); +static void pqTerminateConn(PGconn *conn); static PQconninfoOption *conninfo_init(PQExpBuffer errorMessage); static PQconninfoOption *parse_connection_string(const char *conninfo, PQExpBuffer errorMessage, bool use_defaults); @@ -380,6 +390,7 @@ static bool getPgPassFilename(char *pgpassfile); static void dot_pg_pass_warning(PGconn *conn); static void default_threadlock(int acquire); +static int try_next_address(PGconn *conn); /* global variable because fe-auth.c needs to access it */ pgthreadlock_t pg_g_threadlock = default_threadlock; @@ -806,7 +817,9 @@ connectOptions2(PGconn *conn) { if (conn->pgpass) free(conn->pgpass); - conn->pgpass = PasswordFromFile(conn->pghost, conn->pgport, + conn->pgpass = PasswordFromFile( + conn->actualhost ? conn->actualhost : conn->pghost, + conn->actualport ? conn->actualport : conn->pgport, conn->dbName, conn->pguser); if (conn->pgpass == NULL) { @@ -890,6 +903,21 @@ connectOptions2(PGconn *conn) } /* + * Validate target_server_mode option. 
+ */ + if (conn->target_server_type) + { + if (strcmp(conn->target_server_type, "any") != 0 + && strcmp(conn->target_server_type, "master") != 0) + { + conn->status = CONNECTION_BAD; + printfPQExpBuffer(&conn->errorMessage, + libpq_gettext("invalid target_server_type value: \"%s\" should be \"any\" or \"master\"\n"), + conn->target_server_type); + return false; + } + } + /* * Only if we get this far is it appropriate to try to connect. (We need a * state flag, rather than just the boolean result of this function, in * case someone tries to PQreset() the PGconn.) @@ -1173,6 +1201,8 @@ connectFailureMessage(PGconn *conn, int errorno) if (conn->pghostaddr && conn->pghostaddr[0] != '\0') displayed_host = conn->pghostaddr; + else if (conn->actualhost && conn->actualhost[0] != '\0') + displayed_host = conn->actualhost; else if (conn->pghost && conn->pghost[0] != '\0') displayed_host = conn->pghost; else @@ -1390,11 +1420,17 @@ setKeepalivesWin32(PGconn *conn) static int connectDBStart(PGconn *conn) { + struct nodeinfo + { + char *host; + char *port; + }; int portnum; char portstr[MAXPGPATH]; struct addrinfo *addrs = NULL; struct addrinfo hint; - const char *node; + struct nodeinfo *nodes, + *node; int ret; if (!conn) @@ -1436,21 +1472,184 @@ connectDBStart(PGconn *conn) if (conn->pghostaddr != NULL && conn->pghostaddr[0] != '\0') { /* Using pghostaddr avoids a hostname lookup */ - node = conn->pghostaddr; + + nodes = calloc(sizeof(struct nodeinfo), 2); + if (nodes == NULL) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory\n")); + conn->options_valid = false; + goto connect_errReturn; + } + + nodes->host = strdup(conn->pghostaddr); + if (nodes->host == NULL) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory\n")); + conn->options_valid = false; + goto connect_errReturn; + } + hint.ai_family = AF_UNSPEC; hint.ai_flags = AI_NUMERICHOST; } else if (conn->pghost != NULL && conn->pghost[0] != '\0') { /* Using pghost, so we 
have to look-up the hostname */ - node = conn->pghost; + char *p = conn->pghost, + *q, + *r; + int nodecount = 0, + nodesallocated = 4; + + /* + * Parse comma-separated list of host-port pairs into function-local + * array of records. + */ + nodes = malloc(sizeof(struct nodeinfo) * 4); + if (nodes == NULL) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory\n")); + conn->options_valid = false; + goto connect_errReturn; + } + + while (*p) + { + q = p; + r = NULL; + + /* Scan for the comma or end of string */ + while (*q != ',' && *q != 0) + { + if (*q == ':') + r = q; + if (*q == ']') + r = NULL; /* if there is IPv6, colons before close + * bracket are part of address */ + q++; + } + if (r) + { + /* Host has explicitly specified port */ + char *nptr; + + /* Check if port is numeric */ + for (nptr=r+1;nptr<q;nptr++) + { + if (*nptr<'0' || *nptr>'9') + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Port is not numeric")); + conn->options_valid = false; + goto connect_errReturn; + } + } + + /* Allocate memory for port string */ + nodes[nodecount].port = malloc(q - r); + if (nodes[nodecount].port == NULL) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory\n")); + conn->options_valid = false; + goto connect_errReturn; + } + + strncpy(nodes[nodecount].port, r + 1, q - r); + nodes[nodecount].port[q - r - 1] = 0; + } + else + { + r = q; + nodes[nodecount].port = NULL; + } + + if ((*p) == '[' && *(r - 1) == ']') + { + /* IPv6 address found.
Strip brackets */ + p++; + r--; + } + + /* Fill node record */ + nodes[nodecount].host = malloc(r - p + 1); + if (nodes[nodecount].host == NULL) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory\n")); + conn->options_valid = false; + goto connect_errReturn; + } + + strncpy(nodes[nodecount].host, p, r - p); + nodes[nodecount].host[r - p] = 0; + + /* skip a comma */ + if (*q) + q++; + + nodecount++; + if (nodecount == nodesallocated) + nodes = realloc(nodes, sizeof(struct nodeinfo) * (nodesallocated += 4)); + if (nodes == NULL) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory\n")); + conn->options_valid = false; + goto connect_errReturn; + } + + p = q; + } + + /* Fill end-of-host list marker */ + nodes[nodecount].host = NULL; + nodes[nodecount].port = NULL; hint.ai_family = AF_UNSPEC; + if (nodecount > 1 && conn->target_server_type == NULL) + { + /* + * if there is more than one host in the connect string and + * target_server_type is not specified explicitly, set + * target_server_type to "master", because default mode of + * operation is failover, and so, we need to connect to RW + * host, and keep other nodes of the cluster in the connect + * string just in case master would fail and one of standbys + * would be promoted to master. + * + * If we want to loadbalance readonly queries, set + * target_server_type = "any" explicitly + * + * But global default is "any" because if there is only one + * host in the connect string, we want backward-compatible + * behavior.
+ */ + conn->target_server_type = strdup("master"); + if (conn->target_server_type == NULL) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory\n")); + conn->options_valid = false; + goto connect_errReturn; + } + } } else { #ifdef HAVE_UNIX_SOCKETS /* pghostaddr and pghost are NULL, so use Unix domain socket */ - node = NULL; + nodes = calloc(sizeof(struct nodeinfo), 2); + if (nodes == NULL) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory\n")); + conn->options_valid = false; + goto connect_errReturn; + } + hint.ai_family = AF_UNIX; UNIXSOCK_PATH(portstr, portnum, conn->pgunixsocket); if (strlen(portstr) >= UNIXSOCK_PATH_BUFLEN) @@ -1460,33 +1659,113 @@ connectDBStart(PGconn *conn) portstr, (int) (UNIXSOCK_PATH_BUFLEN - 1)); conn->options_valid = false; + free(nodes); + goto connect_errReturn; + } + + nodes->port = strdup(portstr); + if (nodes->port == NULL) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory\n")); + conn->options_valid = false; goto connect_errReturn; } #else /* Without Unix sockets, default to localhost instead */ - node = DefaultHost; + nodes = calloc(sizeof(struct nodeinfo), 2); + if (nodes == NULL) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory\n")); + conn->options_valid = false; + goto connect_errReturn; + } + hint.ai_family = AF_UNSPEC; + nodes->host = strdup(DefaultHost); + if (nodes->host == NULL) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory\n")); + conn->options_valid = false; + goto connect_errReturn; + } #endif /* HAVE_UNIX_SOCKETS */ } /* Use pg_getaddrinfo_all() to resolve the address */ - ret = pg_getaddrinfo_all(node, portstr, &hint, &addrs); - if (ret || !addrs) + /* loop over all the host specs in the node variable */ + for (node = nodes; node->host != NULL || node->port != NULL; node++) { - if (node) - appendPQExpBuffer(&conn->errorMessage, - libpq_gettext("could not translate host name \"%s\" to 
address: %s\n"), - node, gai_strerror(ret)); + struct addrinfo *this_node_addrs; + + /* Resolve each hostname into list of addrinfo structures */ + ret = pg_getaddrinfo_all(node->host, (node->port ? node->port : portstr), + &hint, &this_node_addrs); + if (ret || !this_node_addrs) + { + if (node->host) + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("could not translate host name \"%s\" to address: %s\n"), + node->host, gai_strerror(ret)); + else + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("could not translate Unix-domain socket path \"%s\" to address: %s\n"), + node->port, gai_strerror(ret)); + if (this_node_addrs) + pg_freeaddrinfo_all(hint.ai_family, this_node_addrs); + + /* + * We shouldn't fail here unless there is no valid addrinfos left + */ + continue; + } + + if (node->host) + { + struct addrinfo *n; + + for (n = this_node_addrs; n != NULL; n = n->ai_next) + { + n->ai_canonname = strdup(node->host); + } + } + + /* add this host addrs to addrs field of PGconn structure */ + if (!addrs) + { + addrs = this_node_addrs; + } else - appendPQExpBuffer(&conn->errorMessage, - libpq_gettext("could not translate Unix-domain socket path \"%s\" to address: %s\n"), - portstr, gai_strerror(ret)); - if (addrs) - pg_freeaddrinfo_all(hint.ai_family, addrs); + { + struct addrinfo *p; + + /* This loop finds pointer to the last element of the list */ + for (p = addrs; p->ai_next != NULL; p = p->ai_next) + { + } + p->ai_next = this_node_addrs; + } + } + + /* Free nodes array */ + for (node = nodes; node->host != NULL || node->port != NULL; node++) + { + if (node->host) + free(node->host); + if (node->port) + free(node->port); + } + + free(nodes); + + /* Check if we've found at least one usable address */ + if (!addrs) + { conn->options_valid = false; goto connect_errReturn; } - #ifdef USE_SSL /* setup values based on SSL mode */ if (conn->sslmode[0] == 'd') /* "disable" */ @@ -1499,11 +1778,26 @@ connectDBStart(PGconn *conn) * Set up to try to connect, with 
protocol 3.0 as the first attempt. */ conn->addrlist = addrs; - conn->addr_cur = addrs; + + /* + * We cannot just assign first addrs record to addr_cur, because host + * order may be random. So, use try_next_address + */ + conn->addr_cur = NULL; + try_next_address(conn); conn->addrlist_family = hint.ai_family; conn->pversion = PG_PROTOCOL(3, 0); conn->send_appname = true; conn->status = CONNECTION_NEEDED; + if (conn->failover_timeout) + { + conn->failover_finish_time = time(NULL) + atoi(conn->failover_timeout); + } + else + { + conn->failover_finish_time = (time_t) 0; /* it is in past, so its + * ok */ + } /* * The code for processing CONNECTION_NEEDED state is in PQconnectPoll(), @@ -1521,6 +1815,23 @@ connect_errReturn: return 0; } +/* + * This function is used to convert integer port number from the + * addrinfo structure back into string representation, because function + * PQport needs to return string representation of port. + */ + +static char * +get_port_from_addrinfo(struct addrinfo * ai) +{ + char port[6]; + + sprintf(port, "%d", htons(((struct sockaddr_in *) ai->ai_addr)->sin_port)); + + /* allocation failure must be checked by caller */ + return strdup(port); +} + /* * connectDBComplete @@ -1603,6 +1914,118 @@ connectDBComplete(PGconn *conn) } } +/* + * Gets address of pointer to the list of addrinfo structures. + * If order is random, rearranges the list by moving random element to + * the beginning (and putting its address into the given pointer). + * Returns address of first list element + */ +static struct addrinfo * +get_next_element(struct addrinfo ** list, char *order) +{ + struct addrinfo *choice = NULL, + *prev, + *current, + *prechoice = NULL; + int count = 0; + + if (*list == NULL) + return NULL; + if (strcmp(order, "random") == 0) + { + /* Pick random element from the list.
*/ + for (current = *list, prev = NULL; current != NULL; + prev = current, current = current->ai_next) + { + count++; + if ((rand() & 0xffff) < 0x10000 / count) + { + choice = current; + prechoice = prev; + } + } + + /* + * If prechoice is not NULL, selected element is not first in the + * list. We have to move it to the head + */ + if (prechoice != NULL) + { + prechoice->ai_next = choice->ai_next; + choice->ai_next = *list; + *list = choice; + } + } + + /* We always return first element of the list */ + return *list; +} + +/* ------------- + * try_next_address + * Attempts to set next address from the list of known ones. + * Returns 1 if address is chosen and 0 if there are no more addresses + * to try + * Takes into account hostorder parameter + * ------------ + */ + +static int +try_next_address(PGconn *conn) +{ + if (strcmp(conn->hostorder, "random") == 0) + { + /* + * Initialize random number generator in case nobody has done it + * before. Use value from rand along with time in case random number + * generator has been initialized by application. Use address of conn structure + * to load-balance different connections in the same app + */ + srand((unsigned int) ((long int) conn ^ (long int) time(NULL) ^ + (long int) rand())); + } + + if (conn->addr_cur == NULL) + { + + conn->addr_cur = get_next_element(&(conn->addrlist), + conn->hostorder); + conn->actualhost = conn->addr_cur->ai_canonname; + + return 1; + } + else + { + conn->addr_cur = get_next_element(&(conn->addr_cur->ai_next), + conn->hostorder); + } + + if (conn->addr_cur == NULL && time(NULL) < conn->failover_finish_time) + { + /* + * If failover timeout is set, retry list of hosts from the beginning + */ + pg_usleep(1000000); + conn->addr_cur = get_next_element(&(conn->addrlist), + conn->hostorder); + } + + if (conn->addr_cur != NULL) + { + /* + * Clean up error message buffer.
+ */ + resetPQExpBuffer(&conn->errorMessage); + conn->actualhost = conn->addr_cur->ai_canonname; + + return 1; + } + else + { + return 0; + } +} + /* ---------------- * PQconnectPoll * @@ -1681,6 +2104,9 @@ PQconnectPoll(PGconn *conn) case CONNECTION_NEEDED: break; + case CONNECTION_CHECK_RW: + break; + default: appendPQExpBufferStr(&conn->errorMessage, libpq_gettext( @@ -1718,11 +2144,11 @@ keep_going: /* We will come back to here until there is * ignore socket() failure if we have more addresses * to try */ - if (addr_cur->ai_next != NULL) + if (try_next_address(conn)) { - conn->addr_cur = addr_cur->ai_next; continue; } + appendPQExpBuffer(&conn->errorMessage, libpq_gettext("could not create socket: %s\n"), SOCK_STRERROR(SOCK_ERRNO, sebuf, sizeof(sebuf))); @@ -1739,7 +2165,7 @@ keep_going: /* We will come back to here until there is if (!connectNoDelay(conn)) { pqDropConnection(conn, true); - conn->addr_cur = addr_cur->ai_next; + try_next_address(conn); continue; } } @@ -1749,7 +2175,7 @@ keep_going: /* We will come back to here until there is libpq_gettext("could not set socket to nonblocking mode: %s\n"), SOCK_STRERROR(SOCK_ERRNO, sebuf, sizeof(sebuf))); pqDropConnection(conn, true); - conn->addr_cur = addr_cur->ai_next; + try_next_address(conn); continue; } @@ -1760,7 +2186,7 @@ keep_going: /* We will come back to here until there is libpq_gettext("could not set socket to close-on-exec mode: %s\n"), SOCK_STRERROR(SOCK_ERRNO, sebuf, sizeof(sebuf))); pqDropConnection(conn, true); - conn->addr_cur = addr_cur->ai_next; + try_next_address(conn); continue; } #endif /* F_SETFD */ @@ -1807,7 +2233,7 @@ keep_going: /* We will come back to here until there is if (err) { pqDropConnection(conn, true); - conn->addr_cur = addr_cur->ai_next; + try_next_address(conn); continue; } } @@ -1883,6 +2309,10 @@ keep_going: /* We will come back to here until there is * go do the next stuff. 
*/ conn->status = CONNECTION_STARTED; + + /* + * Save the name of the current host + */ goto keep_going; } @@ -1898,7 +2328,7 @@ keep_going: /* We will come back to here until there is /* * Try the next address, if any. */ - conn->addr_cur = addr_cur->ai_next; + try_next_address(conn); } /* loop over addresses */ /* @@ -1944,9 +2374,8 @@ keep_going: /* We will come back to here until there is * If more addresses remain, keep trying, just as in the * case where connect() returned failure immediately. */ - if (conn->addr_cur->ai_next != NULL) + if (try_next_address(conn)) { - conn->addr_cur = conn->addr_cur->ai_next; conn->status = CONNECTION_NEEDED; goto keep_going; } @@ -2596,13 +3025,22 @@ keep_going: /* We will come back to here until there is conn->errorMessage.data[conn->errorMessage.len - 1] != '\n') appendPQExpBufferChar(&conn->errorMessage, '\n'); PQclear(res); + + /* + * If we have more than one host in the connect string, + * fatal message from one of them is not really fatal + */ + if (try_next_address(conn)) + { + /* Must drop the old connection */ + pqDropConnection(conn, true); + conn->status = CONNECTION_NEEDED; + goto keep_going; + } + goto error_return; } - /* We can release the address list now. */ - pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist); - conn->addrlist = NULL; - conn->addr_cur = NULL; /* Fire up post-connection housekeeping if needed */ if (PG_PROTOCOL_MAJOR(conn->pversion) < 3) @@ -2614,8 +3052,8 @@ keep_going: /* We will come back to here until there is } /* Otherwise, we are open for business! */ - conn->status = CONNECTION_OK; - return PGRES_POLLING_OK; + conn->status = CONNECTION_CHECK_RO; + goto keep_going; } case CONNECTION_SETENV: @@ -2645,9 +3083,141 @@ keep_going: /* We will come back to here until there is goto error_return; } - /* We are open for business! 
*/ + /* + * check if connection is readonly if we need readwrite one + */ + conn->status = CONNECTION_CHECK_RO; + goto keep_going; + + case CONNECTION_CHECK_RO: + + /* + * Connection to readonly host is allowed if + * target_server_type is set to 'any' or is not explicitly set + */ + if (conn->pghost == NULL || !conn->target_server_type || + strcmp(conn->target_server_type, "any") == 0) + + { + /* + * We can release the address list now, but first make a copy + * of name of host we are connected to or it would be freed + * with list + */ + if (conn->actualhost) + { + conn->actualhost = strdup(conn->actualhost); + conn->actualport = get_port_from_addrinfo(conn->addr_cur); + if (!conn->actualhost || !conn->actualport) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory")); + goto error_return; + } + } + + pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist); + conn->addrlist = NULL; + conn->addr_cur = NULL; + + conn->status = CONNECTION_OK; + return PGRES_POLLING_OK; + } + + /* Otherwise request result pg_is_in_recovery() */ + /* pretend that status is OK for time of sending query */ conn->status = CONNECTION_OK; - return PGRES_POLLING_OK; + PQsendQuery(conn, "SELECT pg_catalog.pg_is_in_recovery()"); + conn->status = CONNECTION_CHECK_RW; + return PGRES_POLLING_READING; + + case CONNECTION_CHECK_RW: + { + char *value; + PGresult *res; + + conn->status = CONNECTION_OK; + if (!PQconsumeInput(conn)) + { + conn->status = CONNECTION_BAD; + return PGRES_POLLING_FAILED; + } + + if (PQisBusy(conn)) + { + /* Result is not ready yet */ + conn->status = CONNECTION_CHECK_RW; + return PGRES_POLLING_READING; + } + + res = PQgetResult(conn); + + /* + * Call PQgetResult second time to clear connection state. + * Should return NULL, so result is ignored + */ + PQgetResult(conn); + if (!res || PQresultStatus(res) != PGRES_TUPLES_OK || + PQntuples(res) != 1) + { + /* + * Something wrong happened with this host.
skip to next + * one + */ + conn->status = CONNECTION_NEEDED; + } + else + { + value = PQgetvalue(res, 0, 0); + if (value[0] == 't') + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("cannot make RW connection to hot " + "standby node %s"), conn->actualhost); + conn->status = CONNECTION_NEEDED; + } + } + + if (res) + PQclear(res); + if (conn->status != CONNECTION_OK) + { + ConnStatusType save_status = conn->status; + + conn->status = CONNECTION_OK; + pqTerminateConn(conn); + pqDropConnection(conn, true); + conn->sock = PGINVALID_SOCKET; + if (try_next_address(conn)) + { + conn->status = save_status; + goto keep_going; + } + else + { + conn->status = CONNECTION_BAD; + return PGRES_POLLING_FAILED; + } + } + + /* We can release the address list now. */ + if (conn->actualhost) + { + conn->actualhost = strdup(conn->actualhost); + conn->actualport = get_port_from_addrinfo(conn->addr_cur); + if (!conn->actualhost || !conn->actualport) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("Out of memory")); + goto error_return; + } + } + + pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist); + conn->addrlist = NULL; + conn->addr_cur = NULL; + return PGRES_POLLING_OK; + } default: appendPQExpBuffer(&conn->errorMessage, @@ -2938,19 +3508,16 @@ freePGconn(PGconn *conn) } /* - * closePGconn - * - properly close a connection to the backend + * pqTerminateConn + * + * - send terminate message to the backend, but do not free any + * transient state of PGconn object, which can be needed to reconnect * - * This should reset or release all transient state, but NOT the connection - * parameters. On exit, the PGconn should be in condition to start a fresh - * connection with the same parameters (see PQreset()). */ + static void -closePGconn(PGconn *conn) +pqTerminateConn(PGconn *conn) { - PGnotify *notify; - pgParameterStatus *pstatus; - /* * Note that the protocol doesn't allow us to send Terminate messages * during the startup phase. 
@@ -2966,6 +3533,26 @@ closePGconn(PGconn *conn) (void) pqFlush(conn); } + +} + +/* + * closePGconn + * - properly close a connection to the backend + * + * This should reset or release all transient state, but NOT the connection + * parameters. On exit, the PGconn should be in condition to start a fresh + * connection with the same parameters (see PQreset()). + */ +static void +closePGconn(PGconn *conn) +{ + PGnotify *notify; + pgParameterStatus *pstatus; + + /* Send terminate request to backend */ + pqTerminateConn(conn); + /* * Must reset the blocking status so a possible reconnect will work. * @@ -2978,11 +3565,29 @@ closePGconn(PGconn *conn) * Close the connection, reset all transient state, flush I/O buffers. */ pqDropConnection(conn, true); + conn->status = CONNECTION_BAD; /* Well, not really _bad_ - just * absent */ conn->asyncStatus = PGASYNC_IDLE; pqClearAsyncResult(conn); /* deallocate result */ resetPQExpBuffer(&conn->errorMessage); + + /* + * If addrlist is not freed, actualhost points in there. 
Otherwise it is + * allocated and should be freed + */ + if (conn->addrlist == NULL && conn->actualhost != NULL) + { + free(conn->actualhost); + conn->actualhost = NULL; + } + + if (conn->actualport) + { + free(conn->actualport); + conn->actualport = NULL; + } + pg_freeaddrinfo_all(conn->addrlist_family, conn->addrlist); conn->addrlist = NULL; conn->addr_cur = NULL; @@ -3969,6 +4574,8 @@ parseServiceFile(const char *serviceFile, { int linenr = 0, i; + bool hostflag = false; /* true if we have already seen a 'host' + * parameter in this service file */ FILE *f; char buf[MAXBUFSIZE], *line; @@ -4088,7 +4695,38 @@ parseServiceFile(const char *serviceFile, if (strcmp(options[i].keyword, key) == 0) { if (options[i].val == NULL) + { options[i].val = strdup(val); + + /* + * Set flag that we get value of host option from + * this service file, so subsequent host lines + * should be appended to it, not ignored + */ + if (strcmp(key, "host") == 0) + hostflag = true; + } + else if (strcmp(key, "host") == 0 && hostflag) + { + /* + * Old host value is from same service file, so + * append new one to it + */ + char *old = options[i].val; + int oldlen = strlen(old); + + options[i].val = malloc(oldlen + 1 + strlen(val) + 1); + + if (options[i].val) + { + strncpy(options[i].val, old, oldlen); + options[i].val[oldlen] = ','; + strcpy(options[i].val + oldlen + 1, val); + } + + free(old); + } + if (!options[i].val) { printfPQExpBuffer(errorMessage, @@ -4809,86 +5447,127 @@ conninfo_uri_parse_options(PQconninfoOption *options, const char *uri, p = start; } - /* - * "p" has been incremented past optional URI credential information at - * this point and now points at the "netloc" part of the URI. - * - * Look for IPv6 address.
- */ - if (*p == '[') + host = p; + if (*p == ':') { - host = ++p; - while (*p && *p != ']') - ++p; - if (!*p) + int portnum; + char *portstr; + + *(p++) = '\0'; + portstr = p; + portnum = 0; + while (*p >= '0' && *p <= '9') { - printfPQExpBuffer(errorMessage, - libpq_gettext("end of string reached when looking for matching \"]\" in IPv6 host address in URI: \"%s\"\n"), - uri); - goto cleanup; + portnum = portnum * 10 + (*(p++) - '0'); } - if (p == host) + if (portnum > 65535 || portnum < 1) { printfPQExpBuffer(errorMessage, - libpq_gettext("IPv6 host address may not be empty in URI: \"%s\"\n"), - uri); + libpq_gettext("invalid port number: \"%d\"\n"), + portnum); goto cleanup; } - - /* Cut off the bracket and advance */ - *(p++) = '\0'; - - /* - * The address may be followed by a port specifier or a slash or a - * query. - */ - if (*p && *p != ':' && *p != '/' && *p != '?') + prevchar = *p; + *p = '\0'; + if (*portstr && + !conninfo_storeval(options, "port", portstr, + errorMessage, false, true)) { - printfPQExpBuffer(errorMessage, - libpq_gettext("unexpected character \"%c\" at position %d in URI (expected \":\" or \"/\"): \"%s\"\n"), - *p, (int) (p - buf + 1), uri); goto cleanup; } } else { - /* not an IPv6 address: DNS-named or IPv4 netloc */ - host = p; + do + { + if (*p == ',') + p++; - /* - * Look for port specifier (colon) or end of host specifier (slash), - * or query (question mark). - */ - while (*p && *p != ':' && *p != '/' && *p != '?') - ++p; - } + /* + * "p" has been incremented past optional URI credential + * information at this point and now points at the "netloc" part + * of the URI. + * + * Look for IPv6 address. 
+ */ + if (*p == '[') + { + char *ipv6start = ++p; - /* Save the hostname terminator before we null it */ - prevchar = *p; - *p = '\0'; + while (*p && *p != ']') + ++p; + if (!*p) + { + printfPQExpBuffer(errorMessage, + libpq_gettext("end of string reached when looking for matching \"]\" in IPv6 host address in URI: \"%s\"\n"), + uri); + goto cleanup; + } + if (p == ipv6start) + { + printfPQExpBuffer(errorMessage, + libpq_gettext("IPv6 host address may not be empty in URI: \"%s\"\n"), + uri); + goto cleanup; + } - if (*host && - !conninfo_storeval(options, "host", host, - errorMessage, false, true)) - goto cleanup; + p++; + /* + * The address may be followed by a port specifier, a comma or + * a slash or a query. + */ + if (*p && *p != ',' && *p != ':' && *p != '/' && *p != '?') + { + printfPQExpBuffer(errorMessage, + libpq_gettext("unexpected character \"%c\" at position %d in URI (expected \":\" or \"/\"): \"%s\"\n"), + *p, (int) (p - buf + 1), uri); + goto cleanup; + } - if (prevchar == ':') - { - const char *port = ++p; /* advance past host terminator */ + } + else + { + /* not an IPv6 address: DNS-named or IPv4 netloc */ - while (*p && *p != '/' && *p != '?') - ++p; + /* + * Look for port specifier (colon) or end of host specifier + * (slash), or query (question mark). 
+ */ + while (*p && *p != ',' && *p != ':' && *p != '/' && *p != '?') + ++p; + } + /* Skip port specifier */ + if (*p == ':') + { + int portnum; + + p++; + portnum = 0; + while (*p >= '0' && *p <= '9') + { + portnum = portnum * 10 + (*(p++) - '0'); + } + if (portnum > 65535 || portnum < 1) + { + printfPQExpBuffer(errorMessage, + libpq_gettext("invalid port number: \"%d\"\n"), + portnum); + goto cleanup; + } + } + } while (*p == ','); + /* Save the hostname terminator before we null it */ prevchar = *p; *p = '\0'; - if (*port && - !conninfo_storeval(options, "port", port, + if (*host && + !conninfo_storeval(options, "host", host, errorMessage, false, true)) goto cleanup; - } + } if (prevchar && prevchar != '?') { const char *dbname = ++p; /* advance past host terminator */ @@ -5018,6 +5697,21 @@ conninfo_uri_parse_params(char *params, keyword = "sslmode"; value = "require"; } + if ((strcmp(keyword, "loadBalanceHosts") == 0 || + strcmp(keyword, "load_balance_hosts") == 0) && + strcmp(value, "true") == 0) + { + free(keyword); + free(value); + malloced = false; + keyword = "hostorder"; + value = "random"; + } + if (strcmp(keyword, "targetServerType") == 0) + { + free(keyword); + keyword = strdup("target_server_type"); + } /* * Store the value if the corresponding option exists; ignore @@ -5232,7 +5926,21 @@ conninfo_storeval(PQconninfoOption *connOptions, } if (option->val) + { + if (strcmp(option->keyword, "host") == 0) + { + /* Accumulate multiple hosts in the single string */ + int val_len = strlen(option->val), + new_len = strlen(value); + + free(value_copy); + value_copy = malloc(val_len + 1 + new_len + 1); + strncpy(value_copy, option->val, val_len + 1); + value_copy[val_len] = ','; + strncpy(value_copy + val_len + 1, value, new_len + 1); + } free(option->val); + } option->val = value_copy; return option; @@ -5352,7 +6060,9 @@ PQhost(const PGconn *conn) { if (!conn) return NULL; - if (conn->pghost != NULL && conn->pghost[0] != '\0') + if (conn->actualhost != NULL 
&& conn->actualhost[0] != '\0') + return conn->actualhost; + else if (conn->pghost != NULL && conn->pghost[0] != '\0') return conn->pghost; else { @@ -5372,6 +6082,8 @@ PQport(const PGconn *conn) { if (!conn) return NULL; + if (conn->actualport) + return conn->actualport; return conn->pgport; } diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index 9ca0756..23560f4 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -62,7 +62,11 @@ typedef enum * backend startup. */ CONNECTION_SETENV, /* Negotiating environment. */ CONNECTION_SSL_STARTUP, /* Negotiating SSL. */ - CONNECTION_NEEDED /* Internal state: connect() needed */ + CONNECTION_NEEDED, /* Internal state: connect() needed */ + CONNECTION_CHECK_RO, /* Internal state: need to check whether an RO + * connection is acceptable */ + CONNECTION_CHECK_RW, /* Internal state: waiting for the server to reply + * whether it is in recovery */ } ConnStatusType; typedef enum diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index be6c370..d268453 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -334,7 +334,12 @@ struct pg_conn #if defined(ENABLE_GSS) || defined(ENABLE_SSPI) char *krbsrvname; /* Kerberos service name */ #endif - + char *hostorder; /* How to handle multiple hosts */ + char *target_server_type; /* If "any" could work with readonly + * standby server.
Otherwise should be + * "master" */ + char *failover_timeout; /* If no usable server found, how long + * to wait before retry */ /* Optional file to write trace info to */ FILE *Pfdebug; @@ -382,6 +387,11 @@ struct pg_conn struct addrinfo *addrlist; /* list of possible backend addresses */ struct addrinfo *addr_cur; /* the one currently being tried */ int addrlist_family; /* needed to know how to free addrlist */ + time_t failover_finish_time; /* how long to retry host list waiting + * for new master to appear */ + char *actualhost; /* Name of host we are actually connected to + * (if there is list in the pghost) */ + char *actualport; /* Port number we are actually connected to */ PGSetenvStatusType setenv_state; /* for 2.0 protocol only */ const PQEnvironmentOption *next_eo; bool send_appname; /* okay to send application_name? */ @@ -467,6 +477,7 @@ struct pg_conn /* Buffer for receiving various parts of messages */ PQExpBufferData workBuffer; /* expansible string */ + }; /* PGcancel stores all data necessary to cancel a connection. 
A copy of this diff --git a/src/interfaces/libpq/test/expected.out b/src/interfaces/libpq/test/expected.out index d375e82..4832bdd 100644 --- a/src/interfaces/libpq/test/expected.out +++ b/src/interfaces/libpq/test/expected.out @@ -1,20 +1,20 @@ trying postgresql://uri-user:secret@host:12345/db -user='uri-user' password='secret' dbname='db' host='host' port='12345' (inet) +user='uri-user' password='secret' dbname='db' host='host:12345' (inet) trying postgresql://uri-user@host:12345/db -user='uri-user' dbname='db' host='host' port='12345' (inet) +user='uri-user' dbname='db' host='host:12345' (inet) trying postgresql://uri-user@host/db user='uri-user' dbname='db' host='host' (inet) trying postgresql://host:12345/db -dbname='db' host='host' port='12345' (inet) +dbname='db' host='host:12345' (inet) trying postgresql://host/db dbname='db' host='host' (inet) trying postgresql://uri-user@host:12345/ -user='uri-user' host='host' port='12345' (inet) +user='uri-user' host='host:12345' (inet) trying postgresql://uri-user@host/ user='uri-user' host='host' (inet) @@ -23,10 +23,10 @@ trying postgresql://uri-user@ user='uri-user' (local) trying postgresql://host:12345/ -host='host' port='12345' (inet) +host='host:12345' (inet) trying postgresql://host:12345 -host='host' port='12345' (inet) +host='host:12345' (inet) trying postgresql://host/db dbname='db' host='host' (inet) @@ -62,7 +62,7 @@ trying postgresql://host/db?u%7aer=someotheruser&port=12345 uri-regress: invalid URI query parameter: "uzer" trying postgresql://host:12345?user=uri-user -user='uri-user' host='host' port='12345' (inet) +user='uri-user' host='host:12345' (inet) trying postgresql://host?user=uri-user user='uri-user' host='host' (inet) @@ -71,19 +71,19 @@ trying postgresql://host? 
host='host' (inet) trying postgresql://[::1]:12345/db -dbname='db' host='::1' port='12345' (inet) +dbname='db' host='[::1]:12345' (inet) trying postgresql://[::1]/db -dbname='db' host='::1' (inet) +dbname='db' host='[::1]' (inet) trying postgresql://[2001:db8::1234]/ -host='2001:db8::1234' (inet) +host='[2001:db8::1234]' (inet) trying postgresql://[200z:db8::1234]/ -host='200z:db8::1234' (inet) +host='[200z:db8::1234]' (inet) trying postgresql://[::1] -host='::1' (inet) +host='[::1]' (inet) trying postgres:// (local) @@ -143,7 +143,7 @@ trying postgres://@host host='host' (inet) trying postgres://host:/ -host='host' (inet) +uri-regress: invalid port number: "0" trying postgres://:12345/ port='12345' (local) diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm index 535d6c0..5472818 100644 --- a/src/test/perl/PostgresNode.pm +++ b/src/test/perl/PostgresNode.pm @@ -398,6 +398,7 @@ sub init unless defined $params{hba_permit_replication}; $params{allows_streaming} = 0 unless defined $params{allows_streaming}; $params{has_archiving} = 0 unless defined $params{has_archiving}; + $params{use_tcp} = 0 unless defined $params{use_tcp}; mkdir $self->backup_dir; mkdir $self->archive_dir; @@ -432,7 +433,10 @@ sub init else { print $conf "unix_socket_directories = '$host'\n"; - print $conf "listen_addresses = ''\n"; + if ($params{use_tcp}) + print $conf "listen_addresses = '$test_localhost'\n"; + else + print $conf "listen_addresses = ''\n"; } close $conf;