From 4b8d8a7c043699bee95d313200949227843168fe Mon Sep 17 00:00:00 2001 From: Shlok Kyal Date: Tue, 6 Feb 2024 14:45:03 +0530 Subject: [PATCH v16 7/7] Remove -S option to force unix domain connection With this patch removed -S option and added option for username(-u), port(-p) and socket directory(-s) for standby. This helps to force standby to use unix domain connection. --- doc/src/sgml/ref/pg_createsubscriber.sgml | 49 +++-- src/bin/pg_basebackup/pg_createsubscriber.c | 184 ++++++++++-------- .../t/041_pg_createsubscriber_standby.pl | 12 +- 3 files changed, 146 insertions(+), 99 deletions(-) diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml index 2ff31628ce..3cf729627c 100644 --- a/doc/src/sgml/ref/pg_createsubscriber.sgml +++ b/doc/src/sgml/ref/pg_createsubscriber.sgml @@ -29,11 +29,6 @@ PostgreSQL documentation datadir - - - - - connstr @@ -77,16 +72,6 @@ PostgreSQL documentation - - - - - - The connection string to the subscriber. For details see . - - - - @@ -108,6 +93,17 @@ PostgreSQL documentation + + port + port + + + the subscriber's port number; + default port number is 50111. + + + + @@ -118,6 +114,17 @@ PostgreSQL documentation + + dir + dir + + + directory to use for postmaster sockets during upgrade; + default is current working directory. + + + + @@ -129,6 +136,16 @@ PostgreSQL documentation + + username + username + + + subscriber's install user name. + + + + @@ -288,7 +305,7 @@ PostgreSQL documentation To create a logical replica for databases hr and finance from a physical replica at foo: -$ pg_createsubscriber -D /usr/local/pgsql/data -S "host=localhost" -d hr -d finance +$ pg_createsubscriber -D /usr/local/pgsql/data -d hr -d finance diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c index 52b0a94fb5..a2b39d6aa1 100644 --- a/src/bin/pg_basebackup/pg_createsubscriber.c +++ b/src/bin/pg_basebackup/pg_createsubscriber.c @@ -38,7 +38,6 @@ typedef struct CreateSubscriberOptions { char *subscriber_dir; /* standby/subscriber data directory */ - char *sub_conninfo_str; /* subscriber connection string */ SimpleStringList database_names; /* list of database names */ bool retain; /* retain log file? */ int recovery_timeout; /* stop recovery after this time */ @@ -61,7 +60,6 @@ typedef struct LogicalRepInfo static void cleanup_objects_atexit(void); static void usage(); -static char *get_base_conninfo(char *conninfo, char *dbname); static bool get_exec_path(const char *path); static bool check_data_directory(const char *datadir); static char *get_primary_conninfo_from_target(const char *base_conninfo, @@ -81,7 +79,7 @@ static char *create_logical_replication_slot(PGconn *conn, LogicalRepInfo *dbinf char *slot_name); static void drop_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo, const char *slot_name); static char *setup_server_logfile(const char *datadir); -static void start_standby_server(const char *pg_ctl_path, const char *datadir, const char *logfile); +static void start_standby_server(const char *pg_ctl_path, const char *datadir, const char *logfile, unsigned short subport, char *sockdir); static void stop_standby_server(const char *pg_ctl_path, const char *datadir); static void pg_ctl_status(const char *pg_ctl_cmd, int rc, int action); static void wait_for_end_recovery(const char *conninfo, CreateSubscriberOptions opt); @@ -91,9 +89,12 @@ static void create_subscription(PGconn *conn, LogicalRepInfo *dbinfo); static void drop_subscription(PGconn *conn, LogicalRepInfo *dbinfo); static void set_replication_progress(PGconn *conn, LogicalRepInfo *dbinfo, const char *lsn); static void enable_subscription(PGconn *conn, LogicalRepInfo *dbinfo); +static char *construct_sub_conninfo(char *username, unsigned short subport, char *sockdir); +static void update_sub_info(SimpleStringList dbnames, LogicalRepInfo *dbinfo, const char *sub_base_conninfo); #define USEC_PER_SEC 1000000 #define WAIT_INTERVAL 1 /* 1 second */ +#define DEF_PGSPORT 50111 /* Options */ static const char *progname; @@ -109,6 +110,8 @@ static char *pg_resetwal_path = NULL; static LogicalRepInfo *dbinfo; static int num_dbs = 0; +static bool is_standby_restarted = false; + enum WaitPMResult { POSTMASTER_READY, @@ -185,11 +188,13 @@ usage(void) printf(_(" %s [OPTION]...\n"), progname); printf(_("\nOptions:\n")); printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n")); - printf(_(" -S, --subscriber-server=CONNSTR subscriber connection string\n")); printf(_(" -d, --database=DBNAME database to create a subscription\n")); printf(_(" -n, --dry-run stop before modifying anything\n")); printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n")); printf(_(" -r, --retain retain log file after success\n")); + printf(_(" -p, --port=PORT subscriber port number (default port is %d.)\n"), DEF_PGSPORT); + printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n")); + printf(_(" -u, --username=NAME subscriber superuser\n")); printf(_(" -v, --verbose output verbose messages\n")); printf(_(" -V, --version output version information, then exit\n")); printf(_(" -?, --help show this help, then exit\n")); @@ -197,63 +202,6 @@ usage(void) printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL); } -/* - * Validate a connection string. Returns a base connection string that is a - * connection string without a database name. - * Since we might process multiple databases, each database name will be - * appended to this base connection string to provide a final connection string. - * If the second argument (dbname) is not null, returns dbname if the provided - * connection string contains it. If option --database is not provided, uses - * dbname as the only database to setup the logical replica. - * It is the caller's responsibility to free the returned connection string and - * dbname. - */ -static char * -get_base_conninfo(char *conninfo, char *dbname) -{ - PQExpBuffer buf = createPQExpBuffer(); - PQconninfoOption *conn_opts = NULL; - PQconninfoOption *conn_opt; - char *errmsg = NULL; - char *ret; - int i; - - pg_log_info("validating connection string on subscriber"); - - conn_opts = PQconninfoParse(conninfo, &errmsg); - if (conn_opts == NULL) - { - pg_log_error("could not parse connection string: %s", errmsg); - return NULL; - } - - i = 0; - for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++) - { - if (strcmp(conn_opt->keyword, "dbname") == 0 && conn_opt->val != NULL) - { - if (dbname) - dbname = pg_strdup(conn_opt->val); - continue; - } - - if (conn_opt->val != NULL && conn_opt->val[0] != '\0') - { - if (i > 0) - appendPQExpBufferChar(buf, ' '); - appendPQExpBuffer(buf, "%s=%s", conn_opt->keyword, conn_opt->val); - i++; - } - } - - ret = pg_strdup(buf->data); - - destroyPQExpBuffer(buf); - PQconninfoFree(conn_opts); - - return ret; -} - /* * Get the absolute path from other PostgreSQL binaries (pg_ctl and * pg_resetwal) that is used by it. @@ -409,6 +357,26 @@ store_pub_sub_info(SimpleStringList dbnames, const char *pub_base_conninfo, cons return dbinfo; } +/* + * Update connection info of subscriber + */ +static void +update_sub_info(SimpleStringList dbnames, LogicalRepInfo *dbinfo, const char *sub_base_conninfo) +{ + SimpleStringListCell *cell; + int i = 0; + + for (cell = dbnames.head; cell; cell = cell->next) + { + char *conninfo; + + conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val); + dbinfo[i].subconninfo = conninfo; + + i++; + } +} + static PGconn * connect_database(const char *conninfo, bool replication_mode) { @@ -1121,14 +1089,26 @@ setup_server_logfile(const char *datadir) } static void -start_standby_server(const char *pg_ctl_path, const char *datadir, const char *logfile) +start_standby_server(const char *pg_ctl_path, const char *datadir, const char *logfile, unsigned short subport, char *sockdir) { char *pg_ctl_cmd; int rc; - pg_ctl_cmd = psprintf("\"%s\" start -D \"%s\" -s -l \"%s\"", pg_ctl_path, datadir, logfile); + pg_ctl_cmd = psprintf("\"%s\" start -D \"%s\" -s -o \"-p %d\" -l \"%s\"", pg_ctl_path, datadir, subport, logfile); + +#if !defined(WIN32) + /* prevent TCP/IP connections, restrict socket access */ + pg_ctl_cmd = psprintf("%s -o \"-c listen_addresses='' -c unix_socket_permissions=0700\"", pg_ctl_cmd); + + /* Have a sockdir? Tell the postmaster. */ + if (sockdir) + pg_ctl_cmd = psprintf("%s -o \"-c unix_socket_directories=%s\"", pg_ctl_cmd, sockdir); +#endif + rc = system(pg_ctl_cmd); pg_ctl_status(pg_ctl_cmd, rc, 1); + + is_standby_restarted = true; } static void @@ -1564,6 +1544,30 @@ enable_subscription(PGconn *conn, LogicalRepInfo *dbinfo) destroyPQExpBuffer(str); } +static char * +construct_sub_conninfo(char *username, unsigned short subport, char *sockdir) +{ + PQExpBuffer buf = createPQExpBuffer(); + char *ret; + + if (username) + appendPQExpBuffer(buf, "user=%s ", username); + +#if !defined(WIN32) + if (is_standby_restarted && sockdir) + appendPQExpBuffer(buf, "host=%s ", sockdir); +#endif + + appendPQExpBuffer(buf, "port=%d fallback_application_name=%s", + subport, progname); + + ret = pg_strdup(buf->data); + + destroyPQExpBuffer(buf); + + return ret; +} + int main(int argc, char **argv) { @@ -1572,12 +1576,14 @@ main(int argc, char **argv) {"help", no_argument, NULL, '?'}, {"version", no_argument, NULL, 'V'}, {"pgdata", required_argument, NULL, 'D'}, - {"subscriber-server", required_argument, NULL, 'S'}, {"database", required_argument, NULL, 'd'}, {"dry-run", no_argument, NULL, 'n'}, {"recovery-timeout", required_argument, NULL, 't'}, {"retain", no_argument, NULL, 'r'}, {"verbose", no_argument, NULL, 'v'}, + {"username", required_argument, NULL, 'u'}, + {"port", required_argument, NULL, 'p'}, + {"socketdir", required_argument, NULL, 's'}, {NULL, 0, NULL, 0} }; @@ -1604,6 +1610,10 @@ main(int argc, char **argv) char pidfile[MAXPGPATH]; + unsigned short subport = DEF_PGSPORT; + char *username = NULL; + char *sockdir = NULL; + pg_logging_init(argv[0]); pg_logging_set_level(PG_LOG_WARNING); progname = get_progname(argv[0]); @@ -1628,7 +1638,6 @@ main(int argc, char **argv) /* Default settings */ opt.subscriber_dir = NULL; - opt.sub_conninfo_str = NULL; opt.database_names = (SimpleStringList) { NULL, NULL @@ -1652,7 +1661,7 @@ main(int argc, char **argv) get_restricted_token(); - while ((c = getopt_long(argc, argv, "D:S:d:nrt:v", + while ((c = getopt_long(argc, argv, "D:d:nrt:s:u:p:v", long_options, &option_index)) != -1) { switch (c) @@ -1660,9 +1669,6 @@ main(int argc, char **argv) case 'D': opt.subscriber_dir = pg_strdup(optarg); break; - case 'S': - opt.sub_conninfo_str = pg_strdup(optarg); - break; case 'd': /* Ignore duplicated database names. */ if (!simple_string_list_member(&opt.database_names, optarg)) @@ -1671,6 +1677,9 @@ main(int argc, char **argv) num_dbs++; } break; + case 's': + sockdir = pg_strdup(optarg); + break; case 'n': dry_run = true; break; @@ -1683,6 +1692,14 @@ main(int argc, char **argv) case 'v': pg_logging_increase_verbosity(); break; + case 'u': + pfree(username); + username = pg_strdup(optarg); + break; + case 'p': + if ((subport = atoi(optarg)) <= 0) + pg_fatal("invalid old port number"); + break; default: /* getopt_long already emitted a complaint */ pg_log_error_hint("Try \"%s --help\" for more information.", progname); @@ -1711,13 +1728,7 @@ main(int argc, char **argv) exit(1); } - if (opt.sub_conninfo_str == NULL) - { - pg_log_error("no subscriber connection string specified"); - pg_log_error_hint("Try \"%s --help\" for more information.", progname); - exit(1); - } - sub_base_conninfo = get_base_conninfo(opt.sub_conninfo_str, dbname_conninfo); + sub_base_conninfo = construct_sub_conninfo(username, subport, sockdir); if (sub_base_conninfo == NULL) exit(1); @@ -1746,6 +1757,16 @@ main(int argc, char **argv) } } + /* Set current dir as default socket dir */ + if (sockdir == NULL) + { + char cwd[MAXPGPATH]; + + if (!getcwd(cwd, MAXPGPATH)) + pg_fatal("could not determine current directory"); + sockdir = pg_strdup(cwd); + } + /* Obtain a connection string from the target */ pub_base_conninfo = get_primary_conninfo_from_target(sub_base_conninfo, @@ -1896,7 +1917,16 @@ main(int argc, char **argv) if (!dry_run) { pg_log_info("starting the subscriber"); - start_standby_server(pg_ctl_path, opt.subscriber_dir, server_start_log); + start_standby_server(pg_ctl_path, opt.subscriber_dir, server_start_log, subport, sockdir); + } + + /* + * Update subinfo after the server is restarted + */ + if (is_standby_restarted) + { + sub_base_conninfo = construct_sub_conninfo(username, subport, sockdir); + update_sub_info(opt.database_names, dbinfo, sub_base_conninfo); } /* diff --git a/src/bin/pg_basebackup/t/041_pg_createsubscriber_standby.pl b/src/bin/pg_basebackup/t/041_pg_createsubscriber_standby.pl index a6ba58879f..c77f5f9523 100644 --- a/src/bin/pg_basebackup/t/041_pg_createsubscriber_standby.pl +++ b/src/bin/pg_basebackup/t/041_pg_createsubscriber_standby.pl @@ -56,9 +56,9 @@ command_fails( [ 'pg_createsubscriber', '--verbose', '--pgdata', $node_f->data_dir, - '--subscriber-server', $node_f->connstr('pg1'), '--database', 'pg1', - '--database', 'pg2' + '--database', 'pg2', + '--port', $node_s->port ], 'target database is not a physical standby'); @@ -67,9 +67,9 @@ command_ok( [ 'pg_createsubscriber', '--verbose', '--dry-run', '--pgdata', $node_s->data_dir, - '--subscriber-server', $node_s->connstr('pg1'), '--database', 'pg1', - '--database', 'pg2' + '--database', 'pg2', + '--port', $node_s->port ], 'run pg_createsubscriber --dry-run on node S'); @@ -82,9 +82,9 @@ command_ok( [ 'pg_createsubscriber', '--verbose', '--pgdata', $node_s->data_dir, - '--subscriber-server', $node_s->connstr('pg1'), '--database', 'pg1', - '--database', 'pg2' + '--database', 'pg2', + '--port', $node_s->port ], 'run pg_createsubscriber on node S'); -- 2.34.1