From efae01bbbf8f26ad09273562261d93bd57f485fe Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Wed, 10 Jan 2024 05:53:56 +0000 Subject: [PATCH v4 3/3] Address some comments proposed on -hackers This patch contains below changes. * Add --port option * Restrict --subscriber-conninfo not to specify external server * Allow to specify publication/subscription options * Remove unecessary Assert * Save logs in Standby logfile --- src/bin/pg_basebackup/pg_subscriber.c | 110 +++++++++++++++++++++++--- 1 file changed, 98 insertions(+), 12 deletions(-) diff --git a/src/bin/pg_basebackup/pg_subscriber.c b/src/bin/pg_basebackup/pg_subscriber.c index c2d17dcda3..d502230b28 100644 --- a/src/bin/pg_basebackup/pg_subscriber.c +++ b/src/bin/pg_basebackup/pg_subscriber.c @@ -27,6 +27,7 @@ #include "fe_utils/recovery_gen.h" #include "fe_utils/simple_list.h" #include "getopt_long.h" +#include "libpq/pqcomm.h" #include "utils/pidfile.h" typedef struct LogicalRepInfo @@ -52,7 +53,7 @@ static char *get_base_conninfo(char *conninfo, char *dbname, const char *noderole); static bool get_exec_path(const char *path); static bool check_data_directory(const char *datadir); -static char *concat_conninfo_dbname(const char *conninfo, const char *dbname); +static char *concat_conninfo(const char *conninfo, const char *dbname, unsigned short port); static LogicalRepInfo *store_pub_sub_info(const char *pub_base_conninfo, const char *sub_base_conninfo); static PGconn *connect_database(const char *conninfo); static void disconnect_database(PGconn *conn); @@ -74,6 +75,7 @@ static void enable_subscription(PGconn *conn, LogicalRepInfo *dbinfo); #define USEC_PER_SEC 1000000 #define WAIT_INTERVAL 1 /* 1 second */ +#define DEF_PGSPORT 50111 /* Options */ static const char *progname; @@ -95,6 +97,11 @@ static int num_dbs = 0; static char temp_replslot[NAMEDATALEN] = {0}; static bool made_transient_replslot = false; +static unsigned short subport; + +static char *pubopts; +static char *subopts; + enum WaitPMResult { POSTMASTER_READY, @@ -120,6 +127,9 @@ cleanup_objects_atexit(void) if (success) return; + if (!dbinfo) + return; + for (i = 0; i < num_dbs; i++) { if (dbinfo[i].made_subscription) @@ -149,7 +159,8 @@ cleanup_objects_atexit(void) if (made_transient_replslot) { conn = connect_database(dbinfo[0].pubconninfo); - drop_replication_slot(conn, &dbinfo[0], temp_replslot); + if (conn != NULL) + drop_replication_slot(conn, &dbinfo[0], temp_replslot); disconnect_database(conn); } } @@ -214,6 +225,26 @@ get_base_conninfo(char *conninfo, char *dbname, const char *noderole) continue; } + /* + * If the dbname is NULL (this means the conninfo is for the + * subscriber), we also check that the connection string does not + * specify the non-local server. + */ + if (!dbname && + (strcmp(conn_opt->keyword, "host") == 0 || + strcmp(conn_opt->keyword, "hostaddr") == 0) && + conn_opt->val != NULL) + { + const char *value = conn_opt->val; + + if (value && strlen(value) > 0 && + /* check for 'local' host values */ + (strcmp(value, "localhost") != 0 && strcmp(value, "127.0.0.1") != 0 && + strcmp(value, "::1") != 0 && !is_unixsock_path(value))) + pg_fatal("--subscriber-conninfo must not be non-local connection: %s", + value); + } + if (conn_opt->val != NULL && conn_opt->val[0] != '\0') { if (i > 0) @@ -332,14 +363,14 @@ check_data_directory(const char *datadir) } /* - * Append database name into a base connection string. + * Append database name and/or port number into a base connection string. * * dbname is the only parameter that changes so it is not included in the base * connection string. This function concatenates dbname to build a "real" * connection string. */ static char * -concat_conninfo_dbname(const char *conninfo, const char *dbname) +concat_conninfo(const char *conninfo, const char *dbname, unsigned short port) { PQExpBuffer buf = createPQExpBuffer(); char *ret; @@ -349,6 +380,9 @@ concat_conninfo_dbname(const char *conninfo, const char *dbname) appendPQExpBufferStr(buf, conninfo); appendPQExpBuffer(buf, " dbname=%s", dbname); + if (port) + appendPQExpBuffer(buf, " port=%d", port); + ret = pg_strdup(buf->data); destroyPQExpBuffer(buf); @@ -372,7 +406,7 @@ store_pub_sub_info(const char *pub_base_conninfo, const char *sub_base_conninfo) char *conninfo; /* Publisher. */ - conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val); + conninfo = concat_conninfo(pub_base_conninfo, cell->val, 0); dbinfo[i].pubconninfo = conninfo; dbinfo[i].dbname = cell->val; dbinfo[i].made_replslot = false; @@ -381,7 +415,7 @@ store_pub_sub_info(const char *pub_base_conninfo, const char *sub_base_conninfo) /* other struct fields will be filled later. */ /* Subscriber. */ - conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val); + conninfo = concat_conninfo(sub_base_conninfo, cell->val, subport); dbinfo[i].subconninfo = conninfo; i++; @@ -689,8 +723,6 @@ drop_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo, const char *slot_nam PQExpBuffer str = createPQExpBuffer(); PGresult *res; - Assert(conn != NULL); - pg_log_info("dropping the replication slot \"%s\" on database \"%s\"", slot_name, dbinfo->dbname); appendPQExpBuffer(str, "DROP_REPLICATION_SLOT \"%s\"", slot_name); @@ -872,6 +904,9 @@ create_publication(PGconn *conn, LogicalRepInfo *dbinfo) appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES", dbinfo->pubname); + if (pubopts) + appendPQExpBuffer(str, " WITH (%s)", pubopts); + pg_log_debug("command is: %s", str->data); if (!dry_run) @@ -948,9 +983,14 @@ create_subscription(PGconn *conn, LogicalRepInfo *dbinfo) appendPQExpBuffer(str, "CREATE SUBSCRIPTION %s CONNECTION '%s' PUBLICATION %s " - "WITH (create_slot = false, copy_data = false, enabled = false)", + "WITH (create_slot = false, copy_data = false, enabled = false", dbinfo->subname, dbinfo->pubconninfo, dbinfo->pubname); + if (subopts) + appendPQExpBuffer(str, ", %s)", subopts); + else + appendPQExpBufferStr(str, ")"); + pg_log_debug("command is: %s", str->data); if (!dry_run) @@ -1142,6 +1182,9 @@ main(int argc, char **argv) {"database", required_argument, NULL, 'd'}, {"dry-run", no_argument, NULL, 'n'}, {"verbose", no_argument, NULL, 'v'}, + {"port", required_argument, NULL, 'p'}, + {"pubopts", required_argument, NULL, 'o'}, + {"subopts", required_argument, NULL, 'O'}, {NULL, 0, NULL, 0} }; @@ -1168,11 +1211,17 @@ main(int argc, char **argv) int i; + char timebuf[128]; + struct timeval time; + time_t tt; + pg_logging_init(argv[0]); pg_logging_set_level(PG_LOG_WARNING); progname = get_progname(argv[0]); set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_subscriber")); + subport = getenv("PGSUBPORT") ? atoi(getenv("PGSUBPORT")) : DEF_PGSPORT; + if (argc > 1) { if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0) @@ -1204,7 +1253,7 @@ main(int argc, char **argv) } #endif - while ((c = getopt_long(argc, argv, "D:P:S:d:nv", + while ((c = getopt_long(argc, argv, "D:P:S:d:nvp:o:O:", long_options, &option_index)) != -1) { switch (c) @@ -1228,6 +1277,36 @@ main(int argc, char **argv) case 'v': pg_logging_increase_verbosity(); break; + case 'p': + if ((subport = atoi(optarg)) < 0) + pg_fatal("invalid port number"); + break; + case 'o': + /* append option? */ + if (!pubopts) + pubopts = pg_strdup(optarg); + else + { + char *old_pubopts = pubopts; + + pubopts = psprintf("%s %s", old_pubopts, optarg); + free(old_pubopts); + } + break; + + case 'O': + /* append option? */ + if (!subopts) + subopts = pg_strdup(optarg); + else + { + char *old_subopts = subopts; + + subopts = psprintf("%s %s", old_subopts, optarg); + free(old_subopts); + } + break; + default: /* getopt_long already emitted a complaint */ pg_log_error_hint("Try \"%s --help\" for more information.", progname); @@ -1418,9 +1497,16 @@ main(int argc, char **argv) /* * Start subscriber and wait until accepting connections. */ - pg_log_info("starting the subscriber"); + gettimeofday(&time, NULL); + tt = (time_t) time.tv_sec; + strftime(timebuf, sizeof(timebuf), "%Y%m%dT%H%M%S", localtime(&tt)); + /* append milliseconds */ + snprintf(timebuf + strlen(timebuf), sizeof(timebuf) - strlen(timebuf), + ".%03d", (int) (time.tv_usec / 1000)); - pg_ctl_cmd = psprintf("\"%s\" start -D \"%s\" -s", pg_ctl_path, subscriber_dir); + pg_log_info("starting the subscriber"); + pg_ctl_cmd = psprintf("\"%s\" start -D \"%s\" -s -o \"-p %d\" -l \"%sstandby_%s.log\"", + pg_ctl_path, subscriber_dir, subport, subscriber_dir, timebuf); rc = system(pg_ctl_cmd); pg_ctl_status(pg_ctl_cmd, rc, 1); -- 2.43.0