From 11c1303416d65499a17e2060280687d4020b4ba8 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Fri, 2 Feb 2024 08:27:13 +0000 Subject: [PATCH v15 2/6] Use replication connection when we connect to the primary --- src/bin/pg_basebackup/pg_createsubscriber.c | 46 +++++++++++++++------ 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c index 28a82902b3..1fee8727ad 100644 --- a/src/bin/pg_basebackup/pg_createsubscriber.c +++ b/src/bin/pg_basebackup/pg_createsubscriber.c @@ -68,7 +68,7 @@ static bool get_exec_path(const char *path); static bool check_data_directory(const char *datadir); static char *concat_conninfo_dbname(const char *conninfo, const char *dbname); static LogicalRepInfo *store_pub_sub_info(SimpleStringList dbnames, const char *pub_base_conninfo, const char *sub_base_conninfo); -static PGconn *connect_database(const char *conninfo); +static PGconn *connect_database(const char *conninfo, bool replication_mode); static void disconnect_database(PGconn *conn); static uint64 get_primary_sysid(const char *conninfo); static uint64 get_standby_sysid(const char *datadir); @@ -118,6 +118,19 @@ enum WaitPMResult }; +static inline PGconn * +connect_primary(const char *conninfo) +{ + return connect_database(conninfo, true); +} + +static inline PGconn * +connect_standby(const char *conninfo) +{ + return connect_database(conninfo, false); +} + + /* * Cleanup objects that were created by pg_createsubscriber if there is an error. * @@ -138,7 +151,7 @@ cleanup_objects_atexit(void) { if (dbinfo[i].made_subscription) { - conn = connect_database(dbinfo[i].subconninfo); + conn = connect_standby(dbinfo[i].subconninfo); if (conn != NULL) { drop_subscription(conn, &dbinfo[i]); @@ -150,7 +163,7 @@ cleanup_objects_atexit(void) if (dbinfo[i].made_publication || dbinfo[i].made_replslot) { - conn = connect_database(dbinfo[i].pubconninfo); + conn = connect_primary(dbinfo[i].pubconninfo); if (conn != NULL) { if (dbinfo[i].made_publication) @@ -398,12 +411,17 @@ store_pub_sub_info(SimpleStringList dbnames, const char *pub_base_conninfo, cons } static PGconn * -connect_database(const char *conninfo) +connect_database(const char *conninfo, bool replication_mode) { PGconn *conn; PGresult *res; + char *rconninfo = NULL; - conn = PQconnectdb(conninfo); + /* logical replication mode */ + if (replication_mode) + rconninfo = psprintf("%s replication=database", conninfo); + + conn = PQconnectdb(rconninfo ? rconninfo : conninfo); if (PQstatus(conn) != CONNECTION_OK) { pg_log_error("connection to database failed: %s", PQerrorMessage(conn)); @@ -417,6 +435,8 @@ connect_database(const char *conninfo) pg_log_error("could not clear search_path: %s", PQresultErrorMessage(res)); return NULL; } + + pg_free(rconninfo); PQclear(res); return conn; @@ -443,7 +463,7 @@ get_primary_sysid(const char *conninfo) pg_log_info("getting system identifier from publisher"); - conn = connect_database(conninfo); + conn = connect_primary(conninfo); if (conn == NULL) exit(1); @@ -568,7 +588,7 @@ setup_publisher(LogicalRepInfo *dbinfo) char pubname[NAMEDATALEN]; char replslotname[NAMEDATALEN]; - conn = connect_database(dbinfo[i].pubconninfo); + conn = connect_primary(dbinfo[i].pubconninfo); if (conn == NULL) exit(1); @@ -659,7 +679,7 @@ check_publisher(LogicalRepInfo *dbinfo) * wal_level = logical max_replication_slots >= current + number of dbs to * be converted max_wal_senders >= current + number of dbs to be converted */ - conn = connect_database(dbinfo[0].pubconninfo); + conn = connect_primary(dbinfo[0].pubconninfo); if (conn == NULL) exit(1); @@ -768,7 +788,7 @@ check_subscriber(LogicalRepInfo *dbinfo) pg_log_info("checking settings on subscriber"); - conn = connect_database(dbinfo[0].subconninfo); + conn = connect_standby(dbinfo[0].subconninfo); if (conn == NULL) exit(1); @@ -879,7 +899,7 @@ setup_subscriber(LogicalRepInfo *dbinfo, const char *consistent_lsn) for (int i = 0; i < num_dbs; i++) { /* Connect to subscriber. */ - conn = connect_database(dbinfo[i].subconninfo); + conn = connect_standby(dbinfo[i].subconninfo); if (conn == NULL) exit(1); @@ -1110,7 +1130,7 @@ wait_for_end_recovery(const char *conninfo, CreateSubscriberOptions opt) pg_log_info("waiting the postmaster to reach the consistent state"); - conn = connect_database(conninfo); + conn = connect_standby(conninfo); if (conn == NULL) exit(1); @@ -1772,7 +1792,7 @@ main(int argc, char **argv) * consistent LSN but it should be changed after adding pg_basebackup * support. */ - conn = connect_database(dbinfo[0].pubconninfo); + conn = connect_primary(dbinfo[0].pubconninfo); if (conn == NULL) exit(1); consistent_lsn = create_logical_replication_slot(conn, &dbinfo[0], @@ -1837,7 +1857,7 @@ main(int argc, char **argv) */ if (primary_slot_name != NULL) { - conn = connect_database(dbinfo[0].pubconninfo); + conn = connect_primary(dbinfo[0].pubconninfo); if (conn != NULL) { drop_replication_slot(conn, &dbinfo[0], primary_slot_name); -- 2.43.0