From 6afd73774079f4a14406cc025a2d25db959db68d Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Wed, 31 Jan 2024 07:39:35 +0000 Subject: [PATCH v13 3/5] Avoid to use replication connections --- doc/src/sgml/ref/pg_createsubscriber.sgml | 1 - src/bin/pg_basebackup/pg_createsubscriber.c | 29 +++++++++------------ 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml index 1c78ff92e0..53b42e6161 100644 --- a/doc/src/sgml/ref/pg_createsubscriber.sgml +++ b/doc/src/sgml/ref/pg_createsubscriber.sgml @@ -61,7 +61,6 @@ PostgreSQL documentation The pg_createsubscriber should be run at the target server. The source server (known as publisher server) should accept logical replication connections from the target server (known as subscriber server). - The target server should accept local logical replication connection. diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c index 0c0f31d86d..9c16533458 100644 --- a/src/bin/pg_basebackup/pg_createsubscriber.c +++ b/src/bin/pg_basebackup/pg_createsubscriber.c @@ -397,12 +397,8 @@ connect_database(const char *conninfo) { PGconn *conn; PGresult *res; - const char *rconninfo; - /* logical replication mode */ - rconninfo = psprintf("%s replication=database", conninfo); - - conn = PQconnectdb(rconninfo); + conn = PQconnectdb(conninfo); if (PQstatus(conn) != CONNECTION_OK) { pg_log_error("connection to database failed: %s", PQerrorMessage(conn)); @@ -446,26 +442,26 @@ get_sysid_from_conn(const char *conninfo) if (conn == NULL) exit(1); - res = PQexec(conn, "IDENTIFY_SYSTEM"); + res = PQexec(conn, "SELECT * FROM pg_control_system();"); if (PQresultStatus(res) != PGRES_TUPLES_OK) { - pg_log_error("could not send replication command \"%s\": %s", + pg_log_error("could not send command \"%s\": %s", "IDENTIFY_SYSTEM", PQresultErrorMessage(res)); PQclear(res); disconnect_database(conn); exit(1); } - if (PQntuples(res) != 1 || PQnfields(res) < 3) + if (PQntuples(res) != 1 || PQnfields(res) < 4) { pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields", - PQntuples(res), PQnfields(res), 1, 3); + PQntuples(res), PQnfields(res), 1, 4); PQclear(res); disconnect_database(conn); exit(1); } - sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10); + sysid = strtou64(PQgetvalue(res, 0, 2), NULL, 10); pg_log_info("system identifier is %llu on publisher", (unsigned long long) sysid); @@ -477,7 +473,7 @@ get_sysid_from_conn(const char *conninfo) /* * Obtain the system identifier from control file. It will be used to compare * if a data directory is a clone of another one. This routine is used locally - * and avoids a replication connection. + * and avoids a connection establishment. */ static uint64 get_control_from_datadir(const char *datadir) @@ -905,10 +901,8 @@ create_logical_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo, pg_log_info("creating the replication slot \"%s\" on database \"%s\"", slot_name, dbinfo->dbname); - appendPQExpBuffer(str, "CREATE_REPLICATION_SLOT \"%s\"", slot_name); - if (transient_replslot) - appendPQExpBufferStr(str, " TEMPORARY"); - appendPQExpBufferStr(str, " LOGICAL \"pgoutput\" NOEXPORT_SNAPSHOT"); + appendPQExpBuffer(str, "SELECT * FROM pg_create_logical_replication_slot('%s', 'pgoutput', %s, false, false);", + slot_name, transient_replslot ? "true" : "false"); pg_log_debug("command is: %s", str->data); @@ -948,14 +942,15 @@ drop_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo, const char *slot_nam pg_log_info("dropping the replication slot \"%s\" on database \"%s\"", slot_name, dbinfo->dbname); - appendPQExpBuffer(str, "DROP_REPLICATION_SLOT \"%s\"", slot_name); + appendPQExpBuffer(str, "SELECT * FROM pg_drop_replication_slot('%s');", + slot_name); pg_log_debug("command is: %s", str->data); if (!dry_run) { res = PQexec(conn, str->data); - if (PQresultStatus(res) != PGRES_COMMAND_OK) + if (PQresultStatus(res) != PGRES_TUPLES_OK) pg_log_error("could not drop replication slot \"%s\" on database \"%s\": %s", slot_name, dbinfo->dbname, PQerrorMessage(conn)); -- 2.43.0