From 327fa75f88f913b4731e73b066e1d30b9225ed44 Mon Sep 17 00:00:00 2001 From: Shlok Kyal Date: Mon, 22 Jan 2024 12:42:34 +0530 Subject: [PATCH v8 2/4] Address some comments proposed on -hackers The patch has following changes: * Some comments reported on the thread * Add a timeout option for the recovery option * Reject if the target server is not a standby * Reject when the --subscriber-conninfo specifies non-local server * Add -u and -p options * Check wal_level and max_replication_slot parameters --- doc/src/sgml/ref/pg_subscriber.sgml | 21 +- src/bin/pg_basebackup/pg_subscriber.c | 911 +++++++++++------- src/bin/pg_basebackup/t/040_pg_subscriber.pl | 9 +- .../t/041_pg_subscriber_standby.pl | 8 +- 4 files changed, 601 insertions(+), 348 deletions(-) diff --git a/doc/src/sgml/ref/pg_subscriber.sgml b/doc/src/sgml/ref/pg_subscriber.sgml index 553185c35f..eaabfc7053 100644 --- a/doc/src/sgml/ref/pg_subscriber.sgml +++ b/doc/src/sgml/ref/pg_subscriber.sgml @@ -16,12 +16,18 @@ PostgreSQL documentation pg_subscriber - create a new logical replica from a standby server + Convert a standby replica to a logical replica pg_subscriber + + datadir + + publisher-conninfo + + subscriber-conninfo option @@ -29,17 +35,18 @@ PostgreSQL documentation Description - pg_subscriber takes the publisher and subscriber - connection strings, a cluster directory from a standby server and a list of - database names and it sets up a new logical replica using the physical - recovery process. + pg_subscriber creates a new subscriber from a physical + standby server. This allows users to quickly set up logical replication + system. - The pg_subscriber should be run at the target + The pg_subscriber has to be run at the target server. The source server (known as publisher server) should accept logical replication connections from the target server (known as subscriber server). - The target server should accept local logical replication connection. + The target server should accept logical replication connection from + localhost. diff --git a/src/bin/pg_basebackup/pg_subscriber.c b/src/bin/pg_basebackup/pg_subscriber.c index e998c29f9e..3880d15ef9 100644 --- a/src/bin/pg_basebackup/pg_subscriber.c +++ b/src/bin/pg_basebackup/pg_subscriber.c @@ -1,12 +1,12 @@ /*------------------------------------------------------------------------- * * pg_subscriber.c - * Create a new logical replica from a standby server + * Convert a standby replica to a logical replica * * Copyright (C) 2024, PostgreSQL Global Development Group * * IDENTIFICATION - * src/bin/pg_subscriber/pg_subscriber.c + * src/bin/pg_basebackup/pg_subscriber.c * *------------------------------------------------------------------------- */ @@ -32,81 +32,122 @@ #define PGS_OUTPUT_DIR "pg_subscriber_output.d" -typedef struct LogicalRepInfo +typedef struct LogicalRepPerdbInfo { - Oid oid; /* database OID */ - char *dbname; /* database name */ - char *pubconninfo; /* publication connection string for logical - * replication */ - char *subconninfo; /* subscription connection string for logical - * replication */ - char *pubname; /* publication name */ - char *subname; /* subscription name (also replication slot - * name) */ - - bool made_replslot; /* replication slot was created */ - bool made_publication; /* publication was created */ - bool made_subscription; /* subscription was created */ -} LogicalRepInfo; + Oid oid; + char *dbname; + bool made_replslot; /* replication slot was created */ + bool made_publication; /* publication was created */ + bool made_subscription; /* subscription was created */ +} LogicalRepPerdbInfo; + +typedef struct +{ + LogicalRepPerdbInfo *perdb; /* array of db infos */ + int ndbs; /* number of db infos */ +} LogicalRepPerdbInfoArr; + +typedef struct PrimaryInfo +{ + char *base_conninfo; + uint64 sysid; +} PrimaryInfo; + +typedef struct StandbyInfo +{ + char *base_conninfo; + char *bindir; + char *pgdata; + char *primary_slot_name; + uint64 sysid; +} StandbyInfo; static void cleanup_objects_atexit(void); static void usage(); -static char *get_base_conninfo(char *conninfo, char *dbname, - const char *noderole); -static bool get_exec_path(const char *path); +static char *get_base_conninfo(char *conninfo, char *dbname); +static bool get_exec_base_path(const char *path); static bool check_data_directory(const char *datadir); +static void store_db_names(LogicalRepPerdbInfo **perdb, int ndbs); +static void get_sysid_for_primary(PrimaryInfo *primary, char *dbname); +static void get_control_for_standby(StandbyInfo *standby); static char *concat_conninfo_dbname(const char *conninfo, const char *dbname); -static LogicalRepInfo *store_pub_sub_info(const char *pub_base_conninfo, const char *sub_base_conninfo); -static PGconn *connect_database(const char *conninfo); +static PGconn *connect_database(const char *base_conninfo, const char*dbname); static void disconnect_database(PGconn *conn); -static uint64 get_sysid_from_conn(const char *conninfo); -static uint64 get_control_from_datadir(const char *datadir); -static void modify_sysid(const char *pg_resetwal_path, const char *datadir); -static char *use_primary_slot_name(void); -static bool create_all_logical_replication_slots(LogicalRepInfo *dbinfo); -static char *create_logical_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo, - char *slot_name); -static void drop_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo, const char *slot_name); +static char *use_primary_slot_name(PrimaryInfo *primary, StandbyInfo *standby, + LogicalRepPerdbInfo *perdb); +static bool create_all_logical_replication_slots(PrimaryInfo *primary, + LogicalRepPerdbInfoArr *dbarr); +static char *create_logical_replication_slot(PGconn *conn, bool temporary, + LogicalRepPerdbInfo *perdb); +static void modify_sysid(const char *bindir, const char *datadir); +static void drop_replication_slot(PGconn *conn, LogicalRepPerdbInfo *perdb, + const char *slot_name); static void pg_ctl_status(const char *pg_ctl_cmd, int rc, int action); -static void wait_for_end_recovery(const char *conninfo); -static void create_publication(PGconn *conn, LogicalRepInfo *dbinfo); -static void drop_publication(PGconn *conn, LogicalRepInfo *dbinfo); -static void create_subscription(PGconn *conn, LogicalRepInfo *dbinfo); -static void drop_subscription(PGconn *conn, LogicalRepInfo *dbinfo); -static void set_replication_progress(PGconn *conn, LogicalRepInfo *dbinfo, const char *lsn); -static void enable_subscription(PGconn *conn, LogicalRepInfo *dbinfo); +static void wait_for_end_recovery(const char *base_conninfo, + const char *dbname); +static void create_publication(PGconn *conn, PrimaryInfo *primary, + LogicalRepPerdbInfo *perdb); +static void drop_publication(PGconn *conn, LogicalRepPerdbInfo *perdb); +static void create_subscription(PGconn *conn, StandbyInfo *standby, + char *base_conninfo, + LogicalRepPerdbInfo *perdb); +static void drop_subscription(PGconn *conn, LogicalRepPerdbInfo *perdb); +static void set_replication_progress(PGconn *conn, LogicalRepPerdbInfo *perdb, const char *lsn); +static void enable_subscription(PGconn *conn, LogicalRepPerdbInfo *perdb); +static void start_standby_server(StandbyInfo *standby, unsigned short subport, + char *server_start_log); +static char *construct_sub_conninfo(char *username, unsigned short subport); #define USEC_PER_SEC 1000000 -#define WAIT_INTERVAL 1 /* 1 second */ +#define DEFAULT_WAIT 60 +#define WAITS_PER_SEC 10 /* should divide USEC_PER_SEC evenly */ +#define DEF_PGSPORT 50111 /* Options */ -static const char *progname; - -static char *subscriber_dir = NULL; static char *pub_conninfo_str = NULL; -static char *sub_conninfo_str = NULL; static SimpleStringList database_names = {NULL, NULL}; -static char *primary_slot_name = NULL; +static int wait_seconds = DEFAULT_WAIT; +static bool retain = false; static bool dry_run = false; static bool success = false; +static const char *progname; +static LogicalRepPerdbInfoArr dbarr; +static PrimaryInfo primary; +static StandbyInfo standby; -static char *pg_ctl_path = NULL; -static char *pg_resetwal_path = NULL; +enum PGSWaitPMResult +{ + PGS_POSTMASTER_READY, + PGS_POSTMASTER_STANDBY, + PGS_POSTMASTER_STILL_STARTING, + PGS_POSTMASTER_FAILED +}; -static LogicalRepInfo *dbinfo; -static int num_dbs = 0; -static char temp_replslot[NAMEDATALEN] = {0}; -static bool made_transient_replslot = false; +/* + * Build the replication slot and subscription name. The name must not exceed + * NAMEDATALEN - 1. This current schema uses a maximum of 36 characters + * (14 + 10 + 1 + 10 + '\0'). System identifier is included to reduce the + * probability of collision. By default, subscription name is used as + * replication slot name. + */ +static inline void +get_subscription_name(Oid oid, int pid, char *subname, Size szsub) +{ + snprintf(subname, szsub, "pg_subscriber_%u_%d", oid, pid); +} -enum WaitPMResult +/* + * Build the publication name. The name must not exceed NAMEDATALEN - + * 1. This current schema uses a maximum of 35 characters (14 + 10 + + * '\0'). + */ +static inline void +get_publication_name(Oid oid, char *pubname, Size szpub) { - POSTMASTER_READY, - POSTMASTER_STANDBY, - POSTMASTER_STILL_STARTING, - POSTMASTER_FAILED -}; + snprintf(pubname, szpub, "pg_subscriber_%u", oid); +} /* @@ -125,41 +166,39 @@ cleanup_objects_atexit(void) if (success) return; - for (i = 0; i < num_dbs; i++) + for (i = 0; i < dbarr.ndbs; i++) { - if (dbinfo[i].made_subscription) + LogicalRepPerdbInfo *perdb = &dbarr.perdb[i]; + + if (perdb->made_subscription) { - conn = connect_database(dbinfo[i].subconninfo); + conn = connect_database(standby.base_conninfo, perdb->dbname); if (conn != NULL) { - drop_subscription(conn, &dbinfo[i]); + drop_subscription(conn, perdb); disconnect_database(conn); } } - if (dbinfo[i].made_publication || dbinfo[i].made_replslot) + if (perdb->made_publication || perdb->made_replslot) { - conn = connect_database(dbinfo[i].pubconninfo); + conn = connect_database(primary.base_conninfo, perdb->dbname); if (conn != NULL) { - if (dbinfo[i].made_publication) - drop_publication(conn, &dbinfo[i]); - if (dbinfo[i].made_replslot) - drop_replication_slot(conn, &dbinfo[i], NULL); + if (perdb->made_publication) + drop_publication(conn, perdb); + if (perdb->made_replslot) + { + char replslotname[NAMEDATALEN]; + + get_subscription_name(perdb->oid, (int) getpid(), + replslotname, NAMEDATALEN); + drop_replication_slot(conn, perdb, replslotname); + } disconnect_database(conn); } } } - - if (made_transient_replslot) - { - conn = connect_database(dbinfo[0].pubconninfo); - if (conn != NULL) - { - drop_replication_slot(conn, &dbinfo[0], temp_replslot); - disconnect_database(conn); - } - } } static void @@ -184,17 +223,16 @@ usage(void) /* * Validate a connection string. Returns a base connection string that is a - * connection string without a database name plus a fallback application name. - * Since we might process multiple databases, each database name will be - * appended to this base connection string to provide a final connection string. - * If the second argument (dbname) is not null, returns dbname if the provided - * connection string contains it. If option --database is not provided, uses - * dbname as the only database to setup the logical replica. - * It is the caller's responsibility to free the returned connection string and - * dbname. + * connection string without a database name. Since we might process multiple + * databases, each database name will be appended to this base connection + * string to provide a final connection string. If the second argument (dbname) + * is not null, returns dbname if the provided connection string contains it. + * If option --database is not provided, uses dbname as the only database to + * setup the logical replica. It is the caller's responsibility to free the + * returned connection string and dbname. */ static char * -get_base_conninfo(char *conninfo, char *dbname, const char *noderole) +get_base_conninfo(char *conninfo, char *dbname) { PQExpBuffer buf = createPQExpBuffer(); PQconninfoOption *conn_opts = NULL; @@ -203,7 +241,7 @@ get_base_conninfo(char *conninfo, char *dbname, const char *noderole) char *ret; int i; - pg_log_info("validating connection string on %s", noderole); + pg_log_info("validating connection string on publisher"); conn_opts = PQconninfoParse(conninfo, &errmsg); if (conn_opts == NULL) @@ -231,10 +269,6 @@ get_base_conninfo(char *conninfo, char *dbname, const char *noderole) } } - if (i > 0) - appendPQExpBufferChar(buf, ' '); - appendPQExpBuffer(buf, "fallback_application_name=%s", progname); - ret = pg_strdup(buf->data); destroyPQExpBuffer(buf); @@ -244,15 +278,16 @@ get_base_conninfo(char *conninfo, char *dbname, const char *noderole) } /* - * Get the absolute path from other PostgreSQL binaries (pg_ctl and - * pg_resetwal) that is used by it. + * Get the absolute binary path from another PostgreSQL binary (pg_ctl) and set + * to StandbyInfo. */ static bool -get_exec_path(const char *path) +get_exec_base_path(const char *path) { int rc; + char pg_ctl_path[MAXPGPATH]; + char *p; - pg_ctl_path = pg_malloc(MAXPGPATH); rc = find_other_exec(path, "pg_ctl", "pg_ctl (PostgreSQL) " PG_VERSION "\n", pg_ctl_path); @@ -277,30 +312,12 @@ get_exec_path(const char *path) pg_log_debug("pg_ctl path is: %s", pg_ctl_path); - pg_resetwal_path = pg_malloc(MAXPGPATH); - rc = find_other_exec(path, "pg_resetwal", - "pg_resetwal (PostgreSQL) " PG_VERSION "\n", - pg_resetwal_path); - if (rc < 0) - { - char full_path[MAXPGPATH]; - - if (find_my_exec(path, full_path) < 0) - strlcpy(full_path, progname, sizeof(full_path)); - if (rc == -1) - pg_log_error("The program \"%s\" is needed by %s but was not found in the\n" - "same directory as \"%s\".\n" - "Check your installation.", - "pg_resetwal", progname, full_path); - else - pg_log_error("The program \"%s\" was found by \"%s\"\n" - "but was not the same version as %s.\n" - "Check your installation.", - "pg_resetwal", full_path, progname); - return false; - } + /* Extract the directory part from the path */ + p = strrchr(pg_ctl_path, 'p'); + Assert(p); - pg_log_debug("pg_resetwal path is: %s", pg_resetwal_path); + *p = '\0'; + standby.bindir = pg_strdup(pg_ctl_path); return true; } @@ -364,49 +381,36 @@ concat_conninfo_dbname(const char *conninfo, const char *dbname) } /* - * Store publication and subscription information. + * Initialize per-db structure and store the name of databases */ -static LogicalRepInfo * -store_pub_sub_info(const char *pub_base_conninfo, const char *sub_base_conninfo) +static void +store_db_names(LogicalRepPerdbInfo **perdb, int ndbs) { - LogicalRepInfo *dbinfo; SimpleStringListCell *cell; int i = 0; - dbinfo = (LogicalRepInfo *) pg_malloc(num_dbs * sizeof(LogicalRepInfo)); + *perdb = (LogicalRepPerdbInfo *) pg_malloc0(sizeof(LogicalRepPerdbInfo) * + ndbs); for (cell = database_names.head; cell; cell = cell->next) { - char *conninfo; - - /* Publisher. */ - conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val); - dbinfo[i].pubconninfo = conninfo; - dbinfo[i].dbname = cell->val; - dbinfo[i].made_replslot = false; - dbinfo[i].made_publication = false; - dbinfo[i].made_subscription = false; - /* other struct fields will be filled later. */ - - /* Subscriber. */ - conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val); - dbinfo[i].subconninfo = conninfo; - + (*perdb)[i].dbname = pg_strdup(cell->val); i++; } - - return dbinfo; } static PGconn * -connect_database(const char *conninfo) +connect_database(const char *base_conninfo, const char*dbname) { PGconn *conn; PGresult *res; - const char *rconninfo; + + char *rconninfo; + char *concat_conninfo = concat_conninfo_dbname(base_conninfo, + dbname); /* logical replication mode */ - rconninfo = psprintf("%s replication=database", conninfo); + rconninfo = psprintf("%s replication=database", concat_conninfo); conn = PQconnectdb(rconninfo); if (PQstatus(conn) != CONNECTION_OK) @@ -424,6 +428,9 @@ connect_database(const char *conninfo) } PQclear(res); + pfree(rconninfo); + pfree(concat_conninfo); + return conn; } @@ -436,19 +443,18 @@ disconnect_database(PGconn *conn) } /* - * Obtain the system identifier using the provided connection. It will be used - * to compare if a data directory is a clone of another one. + * Obtain the system identifier from the primary server. It will be used to + * compare if a data directory is a clone of another one. */ -static uint64 -get_sysid_from_conn(const char *conninfo) +static void +get_sysid_for_primary(PrimaryInfo *primary, char *dbname) { PGconn *conn; PGresult *res; - uint64 sysid; pg_log_info("getting system identifier from publisher"); - conn = connect_database(conninfo); + conn = connect_database(primary->base_conninfo, dbname); if (conn == NULL) exit(1); @@ -471,43 +477,39 @@ get_sysid_from_conn(const char *conninfo) exit(1); } - sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10); + primary->sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10); - pg_log_info("system identifier is %llu on publisher", (unsigned long long) sysid); + pg_log_info("system identifier is %llu on publisher", + (unsigned long long) primary->sysid); disconnect_database(conn); - - return sysid; } /* - * Obtain the system identifier from control file. It will be used to compare - * if a data directory is a clone of another one. This routine is used locally - * and avoids a replication connection. + * Obtain the system identifier from a standby server. It will be used to + * compare if a data directory is a clone of another one. This routine is used + * locally and avoids a replication connection. */ -static uint64 -get_control_from_datadir(const char *datadir) +static void +get_control_for_standby(StandbyInfo *standby) { ControlFileData *cf; bool crc_ok; - uint64 sysid; pg_log_info("getting system identifier from subscriber"); - cf = get_controlfile(datadir, &crc_ok); + cf = get_controlfile(standby->pgdata, &crc_ok); if (!crc_ok) { pg_log_error("control file appears to be corrupt"); exit(1); } - sysid = cf->system_identifier; + standby->sysid = cf->system_identifier; - pg_log_info("system identifier is %llu on subscriber", (unsigned long long) sysid); + pg_log_info("system identifier is %llu on subscriber", (unsigned long long) standby->sysid); pfree(cf); - - return sysid; } /* @@ -516,7 +518,7 @@ get_control_from_datadir(const char *datadir) * files from one of the systems might be used in the other one. */ static void -modify_sysid(const char *pg_resetwal_path, const char *datadir) +modify_sysid(const char *bindir, const char *datadir) { ControlFileData *cf; bool crc_ok; @@ -551,7 +553,7 @@ modify_sysid(const char *pg_resetwal_path, const char *datadir) pg_log_info("running pg_resetwal on the subscriber"); - cmd_str = psprintf("\"%s\" -D \"%s\"", pg_resetwal_path, datadir); + cmd_str = psprintf("\"%s/pg_resetwal\" -D \"%s\"", bindir, datadir); pg_log_debug("command is: %s", cmd_str); @@ -571,14 +573,15 @@ modify_sysid(const char *pg_resetwal_path, const char *datadir) * Return a palloc'd slot name if the replication is using one. */ static char * -use_primary_slot_name(void) +use_primary_slot_name(PrimaryInfo *primary, StandbyInfo *standby, + LogicalRepPerdbInfo *perdb) { PGconn *conn; PGresult *res; PQExpBuffer str = createPQExpBuffer(); char *slot_name; - conn = connect_database(dbinfo[0].subconninfo); + conn = connect_database(standby->base_conninfo, perdb->dbname); if (conn == NULL) exit(1); @@ -604,7 +607,7 @@ use_primary_slot_name(void) disconnect_database(conn); - conn = connect_database(dbinfo[0].pubconninfo); + conn = connect_database(primary->base_conninfo, perdb->dbname); if (conn == NULL) exit(1); @@ -634,17 +637,19 @@ use_primary_slot_name(void) } static bool -create_all_logical_replication_slots(LogicalRepInfo *dbinfo) +create_all_logical_replication_slots(PrimaryInfo *primary, + LogicalRepPerdbInfoArr *dbarr) { int i; - for (i = 0; i < num_dbs; i++) + for (i = 0; i < dbarr->ndbs; i++) { PGconn *conn; PGresult *res; char replslotname[NAMEDATALEN]; + LogicalRepPerdbInfo *perdb = &dbarr->perdb[i]; - conn = connect_database(dbinfo[i].pubconninfo); + conn = connect_database(primary->base_conninfo, perdb->dbname); if (conn == NULL) exit(1); @@ -664,27 +669,14 @@ create_all_logical_replication_slots(LogicalRepInfo *dbinfo) } /* Remember database OID. */ - dbinfo[i].oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10); + perdb->oid = strtoul(PQgetvalue(res, 0, 0), NULL, 10); PQclear(res); - /* - * Build the replication slot name. The name must not exceed - * NAMEDATALEN - 1. This current schema uses a maximum of 36 - * characters (14 + 10 + 1 + 10 + '\0'). System identifier is included - * to reduce the probability of collision. By default, subscription - * name is used as replication slot name. - */ - snprintf(replslotname, sizeof(replslotname), - "pg_subscriber_%u_%d", - dbinfo[i].oid, - (int) getpid()); - dbinfo[i].subname = pg_strdup(replslotname); + get_subscription_name(perdb->oid, (int) getpid(), replslotname, NAMEDATALEN); /* Create replication slot on publisher. */ - if (create_logical_replication_slot(conn, &dbinfo[i], replslotname) != NULL || dry_run) - pg_log_info("create replication slot \"%s\" on publisher", replslotname); - else + if (create_logical_replication_slot(conn, false, perdb) == NULL && !dry_run) return false; disconnect_database(conn); @@ -701,30 +693,36 @@ create_all_logical_replication_slots(LogicalRepInfo *dbinfo) * result set that contains the consistent LSN. */ static char * -create_logical_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo, - char *slot_name) +create_logical_replication_slot(PGconn *conn, bool temporary, + LogicalRepPerdbInfo *perdb) { PQExpBuffer str = createPQExpBuffer(); PGresult *res = NULL; char *lsn = NULL; - bool transient_replslot = false; + char slot_name[NAMEDATALEN]; Assert(conn != NULL); /* - * If no slot name is informed, it is a transient replication slot used - * only for catch up purposes. + * Construct a name of logical replication slot. The formatting is + * different depends on its persistency. + * + * For persistent slots: the name must be same as the subscription. + * For temporary slots: OID is not needed, but another string is added. */ - if (slot_name[0] == '\0') - { + if (!temporary) + get_subscription_name(perdb->oid, (int) getpid(), slot_name, NAMEDATALEN); + else snprintf(slot_name, NAMEDATALEN, "pg_subscriber_%d_startpoint", (int) getpid()); - transient_replslot = true; - } - pg_log_info("creating the replication slot \"%s\" on database \"%s\"", slot_name, dbinfo->dbname); + pg_log_info("creating the replication slot \"%s\" on database \"%s\"", slot_name, perdb->dbname); appendPQExpBuffer(str, "CREATE_REPLICATION_SLOT \"%s\"", slot_name); + + if(temporary) + appendPQExpBufferStr(str, " TEMPORARY"); + appendPQExpBufferStr(str, " LOGICAL \"pgoutput\" NOEXPORT_SNAPSHOT"); pg_log_debug("command is: %s", str->data); @@ -734,17 +732,14 @@ create_logical_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo, res = PQexec(conn, str->data); if (PQresultStatus(res) != PGRES_TUPLES_OK) { - pg_log_error("could not create replication slot \"%s\" on database \"%s\": %s", slot_name, dbinfo->dbname, - PQresultErrorMessage(res)); + pg_log_error("could not create replication slot \"%s\" on database \"%s\": %s", + slot_name, perdb->dbname, PQresultErrorMessage(res)); return lsn; } } /* for cleanup purposes */ - if (transient_replslot) - made_transient_replslot = true; - else - dbinfo->made_replslot = true; + perdb->made_replslot = true; if (!dry_run) { @@ -758,14 +753,15 @@ create_logical_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo, } static void -drop_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo, const char *slot_name) +drop_replication_slot(PGconn *conn, LogicalRepPerdbInfo *perdb, + const char *slot_name) { PQExpBuffer str = createPQExpBuffer(); PGresult *res; Assert(conn != NULL); - pg_log_info("dropping the replication slot \"%s\" on database \"%s\"", slot_name, dbinfo->dbname); + pg_log_info("dropping the replication slot \"%s\" on database \"%s\"", slot_name, perdb->dbname); appendPQExpBuffer(str, "DROP_REPLICATION_SLOT \"%s\"", slot_name); @@ -775,7 +771,7 @@ drop_replication_slot(PGconn *conn, LogicalRepInfo *dbinfo, const char *slot_nam { res = PQexec(conn, str->data); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pg_log_error("could not drop replication slot \"%s\" on database \"%s\": %s", slot_name, dbinfo->dbname, + pg_log_error("could not drop replication slot \"%s\" on database \"%s\": %s", slot_name, perdb->dbname, PQerrorMessage(conn)); PQclear(res); @@ -825,19 +821,22 @@ pg_ctl_status(const char *pg_ctl_cmd, int rc, int action) * Returns after the server finishes the recovery process. */ static void -wait_for_end_recovery(const char *conninfo) +wait_for_end_recovery(const char *base_conninfo, const char *dbname) { PGconn *conn; PGresult *res; - int status = POSTMASTER_STILL_STARTING; + int status = PGS_POSTMASTER_STILL_STARTING; + int cnt; + int rc; + char *pg_ctl_cmd; pg_log_info("waiting the postmaster to reach the consistent state"); - conn = connect_database(conninfo); + conn = connect_database(base_conninfo, dbname); if (conn == NULL) exit(1); - for (;;) + for (cnt = 0; cnt < wait_seconds * WAITS_PER_SEC; cnt++) { bool in_recovery; @@ -865,17 +864,32 @@ wait_for_end_recovery(const char *conninfo) */ if (!in_recovery || dry_run) { - status = POSTMASTER_READY; + status = PGS_POSTMASTER_READY; break; } /* Keep waiting. */ - pg_usleep(WAIT_INTERVAL * USEC_PER_SEC); + pg_usleep(USEC_PER_SEC / WAITS_PER_SEC); } disconnect_database(conn); - if (status == POSTMASTER_STILL_STARTING) + /* + * If timeout is reached exit the pg_subscriber and stop the standby node. + */ + if (cnt >= wait_seconds * WAITS_PER_SEC) + { + pg_log_error("recovery timed out"); + + pg_ctl_cmd = psprintf("\"%s/pg_ctl\" stop -D \"%s\" -s", + standby.bindir, standby.pgdata); + rc = system(pg_ctl_cmd); + pg_ctl_status(pg_ctl_cmd, rc, 0); + + exit(1); + } + + if (status == PGS_POSTMASTER_STILL_STARTING) { pg_log_error("server did not end recovery"); exit(1); @@ -888,17 +902,21 @@ wait_for_end_recovery(const char *conninfo) * Create a publication that includes all tables in the database. */ static void -create_publication(PGconn *conn, LogicalRepInfo *dbinfo) +create_publication(PGconn *conn, PrimaryInfo *primary, + LogicalRepPerdbInfo *perdb) { PQExpBuffer str = createPQExpBuffer(); PGresult *res; + char pubname[NAMEDATALEN]; Assert(conn != NULL); + get_publication_name(perdb->oid, pubname, NAMEDATALEN); + /* Check if the publication needs to be created. */ appendPQExpBuffer(str, "SELECT puballtables FROM pg_catalog.pg_publication WHERE pubname = '%s'", - dbinfo->pubname); + pubname); res = PQexec(conn, str->data); if (PQresultStatus(res) != PGRES_TUPLES_OK) { @@ -918,7 +936,7 @@ create_publication(PGconn *conn, LogicalRepInfo *dbinfo) */ if (strcmp(PQgetvalue(res, 0, 0), "t") == 0) { - pg_log_info("publication \"%s\" already exists", dbinfo->pubname); + pg_log_info("publication \"%s\" already exists", pubname); return; } else @@ -931,7 +949,7 @@ create_publication(PGconn *conn, LogicalRepInfo *dbinfo) * database oid in which puballtables is false. */ pg_log_error("publication \"%s\" does not replicate changes for all tables", - dbinfo->pubname); + pubname); pg_log_error_hint("Consider renaming this publication."); PQclear(res); PQfinish(conn); @@ -942,9 +960,9 @@ create_publication(PGconn *conn, LogicalRepInfo *dbinfo) PQclear(res); resetPQExpBuffer(str); - pg_log_info("creating publication \"%s\" on database \"%s\"", dbinfo->pubname, dbinfo->dbname); + pg_log_info("creating publication \"%s\" on database \"%s\"", pubname, perdb->dbname); - appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES", dbinfo->pubname); + appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES", pubname); pg_log_debug("command is: %s", str->data); @@ -954,14 +972,14 @@ create_publication(PGconn *conn, LogicalRepInfo *dbinfo) if (PQresultStatus(res) != PGRES_COMMAND_OK) { pg_log_error("could not create publication \"%s\" on database \"%s\": %s", - dbinfo->pubname, dbinfo->dbname, PQerrorMessage(conn)); + pubname, perdb->dbname, PQerrorMessage(conn)); PQfinish(conn); exit(1); } } /* for cleanup purposes */ - dbinfo->made_publication = true; + perdb->made_publication = true; if (!dry_run) PQclear(res); @@ -973,16 +991,19 @@ create_publication(PGconn *conn, LogicalRepInfo *dbinfo) * Remove publication if it couldn't finish all steps. */ static void -drop_publication(PGconn *conn, LogicalRepInfo *dbinfo) +drop_publication(PGconn *conn, LogicalRepPerdbInfo *perdb) { PQExpBuffer str = createPQExpBuffer(); PGresult *res; + char pubname[NAMEDATALEN]; Assert(conn != NULL); - pg_log_info("dropping publication \"%s\" on database \"%s\"", dbinfo->pubname, dbinfo->dbname); + get_publication_name(perdb->oid, pubname, NAMEDATALEN); - appendPQExpBuffer(str, "DROP PUBLICATION %s", dbinfo->pubname); + pg_log_info("dropping publication \"%s\" on database \"%s\"", pubname, perdb->dbname); + + appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname); pg_log_debug("command is: %s", str->data); @@ -990,7 +1011,7 @@ drop_publication(PGconn *conn, LogicalRepInfo *dbinfo) { res = PQexec(conn, str->data); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pg_log_error("could not drop publication \"%s\" on database \"%s\": %s", dbinfo->pubname, dbinfo->dbname, PQerrorMessage(conn)); + pg_log_error("could not drop publication \"%s\" on database \"%s\": %s", pubname, perdb->dbname, PQerrorMessage(conn)); PQclear(res); } @@ -1011,19 +1032,27 @@ drop_publication(PGconn *conn, LogicalRepInfo *dbinfo) * initial location. */ static void -create_subscription(PGconn *conn, LogicalRepInfo *dbinfo) +create_subscription(PGconn *conn, StandbyInfo *standby, char *base_conninfo, + LogicalRepPerdbInfo *perdb) { PQExpBuffer str = createPQExpBuffer(); PGresult *res; + char subname[NAMEDATALEN]; + char pubname[NAMEDATALEN]; Assert(conn != NULL); - pg_log_info("creating subscription \"%s\" on database \"%s\"", dbinfo->subname, dbinfo->dbname); + + get_subscription_name(perdb->oid, (int) getpid(), subname, NAMEDATALEN); + get_publication_name(perdb->oid, pubname, NAMEDATALEN); + + pg_log_info("creating subscription \"%s\" on database \"%s\"", subname, + perdb->dbname); appendPQExpBuffer(str, "CREATE SUBSCRIPTION %s CONNECTION '%s' PUBLICATION %s " "WITH (create_slot = false, copy_data = false, enabled = false)", - dbinfo->subname, dbinfo->pubconninfo, dbinfo->pubname); + subname, concat_conninfo_dbname(base_conninfo, perdb->dbname), pubname); pg_log_debug("command is: %s", str->data); @@ -1033,14 +1062,14 @@ create_subscription(PGconn *conn, LogicalRepInfo *dbinfo) if (PQresultStatus(res) != PGRES_COMMAND_OK) { pg_log_error("could not create subscription \"%s\" on database \"%s\": %s", - dbinfo->subname, dbinfo->dbname, PQerrorMessage(conn)); + subname, perdb->dbname, PQerrorMessage(conn)); PQfinish(conn); exit(1); } } /* for cleanup purposes */ - dbinfo->made_subscription = true; + perdb->made_subscription = true; if (!dry_run) PQclear(res); @@ -1052,16 +1081,19 @@ create_subscription(PGconn *conn, LogicalRepInfo *dbinfo) * Remove subscription if it couldn't finish all steps. */ static void -drop_subscription(PGconn *conn, LogicalRepInfo *dbinfo) +drop_subscription(PGconn *conn, LogicalRepPerdbInfo *perdb) { PQExpBuffer str = createPQExpBuffer(); PGresult *res; + char subname[NAMEDATALEN]; Assert(conn != NULL); - pg_log_info("dropping subscription \"%s\" on database \"%s\"", dbinfo->subname, dbinfo->dbname); + get_subscription_name(perdb->oid, (int) getpid(), subname, NAMEDATALEN); + + pg_log_info("dropping subscription \"%s\" on database \"%s\"", subname, perdb->dbname); - appendPQExpBuffer(str, "DROP SUBSCRIPTION %s", dbinfo->subname); + appendPQExpBuffer(str, "DROP SUBSCRIPTION %s", subname); pg_log_debug("command is: %s", str->data); @@ -1069,7 +1101,7 @@ drop_subscription(PGconn *conn, LogicalRepInfo *dbinfo) { res = PQexec(conn, str->data); if (PQresultStatus(res) != PGRES_COMMAND_OK) - pg_log_error("could not drop subscription \"%s\" on database \"%s\": %s", dbinfo->subname, dbinfo->dbname, PQerrorMessage(conn)); + pg_log_error("could not drop subscription \"%s\" on database \"%s\": %s", subname, perdb->dbname, PQerrorMessage(conn)); PQclear(res); } @@ -1088,18 +1120,21 @@ drop_subscription(PGconn *conn, LogicalRepInfo *dbinfo) * printing purposes. */ static void -set_replication_progress(PGconn *conn, LogicalRepInfo *dbinfo, const char *lsn) +set_replication_progress(PGconn *conn, LogicalRepPerdbInfo *perdb, const char *lsn) { PQExpBuffer str = createPQExpBuffer(); PGresult *res; Oid suboid; char originname[NAMEDATALEN]; char lsnstr[17 + 1]; /* MAXPG_LSNLEN = 17 */ + char subname[NAMEDATALEN]; Assert(conn != NULL); + get_subscription_name(perdb->oid, (int) getpid(), subname, NAMEDATALEN); + appendPQExpBuffer(str, - "SELECT oid FROM pg_catalog.pg_subscription WHERE subname = '%s'", dbinfo->subname); + "SELECT oid FROM pg_catalog.pg_subscription WHERE subname = '%s'", subname); res = PQexec(conn, str->data); if (PQresultStatus(res) != PGRES_TUPLES_OK) @@ -1140,7 +1175,7 @@ set_replication_progress(PGconn *conn, LogicalRepInfo *dbinfo, const char *lsn) PQclear(res); pg_log_info("setting the replication progress (node name \"%s\" ; LSN %s) on database \"%s\"", - originname, lsnstr, dbinfo->dbname); + originname, lsnstr, perdb->dbname); resetPQExpBuffer(str); appendPQExpBuffer(str, @@ -1154,7 +1189,7 @@ set_replication_progress(PGconn *conn, LogicalRepInfo *dbinfo, const char *lsn) if (PQresultStatus(res) != PGRES_TUPLES_OK) { pg_log_error("could not set replication progress for the subscription \"%s\": %s", - dbinfo->subname, PQresultErrorMessage(res)); + subname, PQresultErrorMessage(res)); PQfinish(conn); exit(1); } @@ -1173,16 +1208,20 @@ set_replication_progress(PGconn *conn, LogicalRepInfo *dbinfo, const char *lsn) * of this setup. */ static void -enable_subscription(PGconn *conn, LogicalRepInfo *dbinfo) +enable_subscription(PGconn *conn, LogicalRepPerdbInfo *perdb) { PQExpBuffer str = createPQExpBuffer(); PGresult *res; + char subname[NAMEDATALEN]; Assert(conn != NULL); - pg_log_info("enabling subscription \"%s\" on database \"%s\"", dbinfo->subname, dbinfo->dbname); + get_subscription_name(perdb->oid, (int) getpid(), subname, NAMEDATALEN); + + pg_log_info("enabling subscription \"%s\" on database \"%s\"", subname, + perdb->dbname); - appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", dbinfo->subname); + appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname); pg_log_debug("command is: %s", str->data); @@ -1191,7 +1230,7 @@ enable_subscription(PGconn *conn, LogicalRepInfo *dbinfo) res = PQexec(conn, str->data); if (PQresultStatus(res) != PGRES_COMMAND_OK) { - pg_log_error("could not enable subscription \"%s\": %s", dbinfo->subname, + pg_log_error("could not enable subscription \"%s\": %s", subname, PQerrorMessage(conn)); PQfinish(conn); exit(1); @@ -1203,6 +1242,61 @@ enable_subscription(PGconn *conn, LogicalRepInfo *dbinfo) destroyPQExpBuffer(str); } +static void +start_standby_server(StandbyInfo *standby, unsigned short subport, + char *server_start_log) +{ + char timebuf[128]; + struct timeval time; + time_t tt; + int len; + int rc; + char *pg_ctl_cmd; + + if (server_start_log[0] == '\0') + { + /* append timestamp with ISO 8601 format. */ + gettimeofday(&time, NULL); + tt = (time_t) time.tv_sec; + strftime(timebuf, sizeof(timebuf), "%Y%m%dT%H%M%S", localtime(&tt)); + snprintf(timebuf + strlen(timebuf), sizeof(timebuf) - strlen(timebuf), + ".%03d", (int) (time.tv_usec / 1000)); + + len = snprintf(server_start_log, MAXPGPATH, + "%s/%s/server_start_%s.log", standby->pgdata, + PGS_OUTPUT_DIR, timebuf); + if (len >= MAXPGPATH) + { + pg_log_error("log file path is too long"); + exit(1); + } + } + pg_ctl_cmd = psprintf("\"%s/pg_ctl\" start -D \"%s\" -s -o \"-p %d\" -l \"%s\"", + standby->bindir, + standby->pgdata, subport, server_start_log); + rc = system(pg_ctl_cmd); + pg_ctl_status(pg_ctl_cmd, rc, 1); +} + +static char * +construct_sub_conninfo(char *username, unsigned short subport) +{ + PQExpBuffer buf = createPQExpBuffer(); + char *ret; + + if (username) + appendPQExpBuffer(buf, "user=%s ", username); + + appendPQExpBuffer(buf, "port=%d fallback_application_name=%s", + subport, progname); + + ret = pg_strdup(buf->data); + + destroyPQExpBuffer(buf); + + return ret; +} + int main(int argc, char **argv) { @@ -1214,6 +1308,10 @@ main(int argc, char **argv) {"publisher-conninfo", required_argument, NULL, 'P'}, {"subscriber-conninfo", required_argument, NULL, 'S'}, {"database", required_argument, NULL, 'd'}, + {"timeout", required_argument, NULL, 't'}, + {"username", required_argument, NULL, 'u'}, + {"port", required_argument, NULL, 'p'}, + {"retain", no_argument, NULL, 'r'}, {"dry-run", no_argument, NULL, 'n'}, {"verbose", no_argument, NULL, 'v'}, {NULL, 0, NULL, 0} @@ -1225,20 +1323,15 @@ main(int argc, char **argv) char *pg_ctl_cmd; - char *base_dir; - char *server_start_log; - - char timebuf[128]; - struct timeval time; - time_t tt; + char base_dir[MAXPGPATH]; + char server_start_log[MAXPGPATH] = {0}; int len; - char *pub_base_conninfo = NULL; - char *sub_base_conninfo = NULL; char *dbname_conninfo = NULL; - uint64 pub_sysid; - uint64 sub_sysid; + unsigned short subport = DEF_PGSPORT; + char *username = NULL; + struct stat statbuf; PGconn *conn; @@ -1250,6 +1343,13 @@ main(int argc, char **argv) int i; + PGresult *res; + + char *wal_level; + int max_replication_slots; + int nslots_old; + int nslots_new; + pg_logging_init(argv[0]); pg_logging_set_level(PG_LOG_WARNING); progname = get_progname(argv[0]); @@ -1286,28 +1386,40 @@ main(int argc, char **argv) } #endif - while ((c = getopt_long(argc, argv, "D:P:S:d:nv", + while ((c = getopt_long(argc, argv, "D:P:S:d:t:u:p:rnv", long_options, &option_index)) != -1) { switch (c) { case 'D': - subscriber_dir = pg_strdup(optarg); + standby.pgdata = pg_strdup(optarg); + canonicalize_path(standby.pgdata); break; case 'P': pub_conninfo_str = pg_strdup(optarg); break; - case 'S': - sub_conninfo_str = pg_strdup(optarg); - break; case 'd': /* Ignore duplicated database names. */ if (!simple_string_list_member(&database_names, optarg)) { simple_string_list_append(&database_names, optarg); - num_dbs++; + dbarr.ndbs++; } break; + case 't': + wait_seconds = atoi(optarg); + break; + case 'u': + pfree(username); + username = pg_strdup(optarg); + break; + case 'p': + if ((subport = atoi(optarg)) <= 0) + pg_fatal("invalid old port number"); + break; + case 'r': + retain = true; + break; case 'n': dry_run = true; break; @@ -1335,7 +1447,7 @@ main(int argc, char **argv) /* * Required arguments */ - if (subscriber_dir == NULL) + if (standby.pgdata == NULL) { pg_log_error("no subscriber data directory specified"); pg_log_error_hint("Try \"%s --help\" for more information.", progname); @@ -1358,21 +1470,14 @@ main(int argc, char **argv) pg_log_error_hint("Try \"%s --help\" for more information.", progname); exit(1); } - pub_base_conninfo = get_base_conninfo(pub_conninfo_str, dbname_conninfo, - "publisher"); - if (pub_base_conninfo == NULL) - exit(1); - if (sub_conninfo_str == NULL) - { - pg_log_error("no subscriber connection string specified"); - pg_log_error_hint("Try \"%s --help\" for more information.", progname); - exit(1); - } - sub_base_conninfo = get_base_conninfo(sub_conninfo_str, NULL, "subscriber"); - if (sub_base_conninfo == NULL) + primary.base_conninfo = get_base_conninfo(pub_conninfo_str, + dbname_conninfo); + if (primary.base_conninfo == NULL) exit(1); + standby.base_conninfo = construct_sub_conninfo(username, subport); + if (database_names.head == NULL) { pg_log_info("no database was specified"); @@ -1385,7 +1490,7 @@ main(int argc, char **argv) if (dbname_conninfo) { simple_string_list_append(&database_names, dbname_conninfo); - num_dbs++; + dbarr.ndbs++; pg_log_info("database \"%s\" was extracted from the publisher connection string", dbname_conninfo); @@ -1399,25 +1504,25 @@ main(int argc, char **argv) } /* - * Get the absolute path of pg_ctl and pg_resetwal on the subscriber. + * Get the absolute path of binaries on the subscriber. */ - if (!get_exec_path(argv[0])) + if (!get_exec_base_path(argv[0])) exit(1); /* rudimentary check for a data directory. */ - if (!check_data_directory(subscriber_dir)) + if (!check_data_directory(standby.pgdata)) exit(1); - /* Store database information for publisher and subscriber. */ - dbinfo = store_pub_sub_info(pub_base_conninfo, sub_base_conninfo); + /* Store database information to dbarr */ + store_db_names(&dbarr.perdb, dbarr.ndbs); /* * Check if the subscriber data directory has the same system identifier * than the publisher data directory. */ - pub_sysid = get_sysid_from_conn(dbinfo[0].pubconninfo); - sub_sysid = get_control_from_datadir(subscriber_dir); - if (pub_sysid != sub_sysid) + get_sysid_for_primary(&primary, dbarr.perdb[0].dbname); + get_control_for_standby(&standby); + if (primary.sysid != standby.sysid) { pg_log_error("subscriber data directory is not a copy of the source database cluster"); exit(1); @@ -1426,8 +1531,8 @@ main(int argc, char **argv) /* * Create the output directory to store any data generated by this tool. */ - base_dir = (char *) pg_malloc0(MAXPGPATH); - len = snprintf(base_dir, MAXPGPATH, "%s/%s", subscriber_dir, PGS_OUTPUT_DIR); + len = snprintf(base_dir, MAXPGPATH, "%s/%s", + standby.pgdata, PGS_OUTPUT_DIR); if (len >= MAXPGPATH) { pg_log_error("directory path for subscriber is too long"); @@ -1441,7 +1546,153 @@ main(int argc, char **argv) } /* subscriber PID file. */ - snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir); + snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", + standby.pgdata); + + /* Start the standby server anyway */ + start_standby_server(&standby, subport, server_start_log); + + /* + * Check wal_level in publisher and the max_replication_slots of publisher + */ + conn = connect_database(primary.base_conninfo, dbarr.perdb[0].dbname); + if (conn == NULL) + exit(1); + + res = PQexec(conn, "SELECT count(*) from pg_replication_slots;"); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not obtain number of replication slots"); + exit(1); + } + + if (PQntuples(res) != 1) + { + pg_log_error("could not determine parameter settings on publisher"); + exit(1); + } + + nslots_old = atoi(PQgetvalue(res, 0, 0)); + PQclear(res); + + res = PQexec(conn, "SELECT setting FROM pg_settings " + "WHERE name IN ('wal_level', 'max_replication_slots') " + "ORDER BY name DESC;"); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not obtain guc parameters on publisher"); + exit(1); + } + + if (PQntuples(res) != 2) + { + pg_log_error("could not determine parameter settings on publisher"); + exit(1); + } + + wal_level = PQgetvalue(res, 0, 0); + + if (strcmp(wal_level, "logical") != 0) + { + pg_log_error("wal_level must be \"logical\", but is set to \"%s\"", wal_level); + exit(1); + } + + max_replication_slots = atoi(PQgetvalue(res, 1, 0)); + nslots_new = nslots_old + dbarr.ndbs + 1; + + if (nslots_new > max_replication_slots) + { + pg_log_error("max_replication_slots (%d) must be greater than or equal to " + "the number of replication slots required (%d)", max_replication_slots, nslots_new); + exit(1); + } + + PQclear(res); + disconnect_database(conn); + + conn = connect_database(standby.base_conninfo, dbarr.perdb[0].dbname); + if (conn == NULL) + exit(1); + + /* + * Check the max_replication_slots in subscriber + */ + res = PQexec(conn, "SELECT count(*) from pg_replication_slots;"); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not obtain number of replication slots on subscriber"); + exit(1); + } + + if (PQntuples(res) != 1) + { + pg_log_error("could not determine parameter settings on subscriber"); + exit(1); + } + + nslots_old = atoi(PQgetvalue(res, 0, 0)); + PQclear(res); + + res = PQexec(conn, "SELECT setting FROM pg_settings " + "WHERE name = 'max_replication_slots';"); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not obtain guc parameters"); + exit(1); + } + + if (PQntuples(res) != 1) + { + pg_log_error("could not determine parameter settings on publisher"); + exit(1); + } + + max_replication_slots = atoi(PQgetvalue(res, 0, 0)); + nslots_new = nslots_old + dbarr.ndbs; + + if (nslots_new > max_replication_slots) + { + pg_log_error("max_replication_slots (%d) must be greater than or equal to " + "the number of replication slots required (%d)", max_replication_slots, nslots_new); + exit(1); + } + + PQclear(res); + + /* + * Exit the pg_subscriber if the node is not a standby server. + */ + res = PQexec(conn, "SELECT pg_catalog.pg_is_in_recovery()"); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not obtain recovery progress"); + exit(1); + } + + if (PQntuples(res) != 1) + { + pg_log_error("unexpected result from pg_is_in_recovery function"); + exit(1); + } + + /* Check if the server is in recovery */ + if (strcmp(PQgetvalue(res, 0, 0), "t") != 0) + { + pg_log_error("pg_subscriber is supported only on standby server"); + exit(1); + } + + PQclear(res); + disconnect_database(conn); + + /* subscriber PID file. */ + snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", standby.pgdata); /* * Stop the subscriber if it is a standby server. Before executing the @@ -1457,14 +1708,18 @@ main(int argc, char **argv) * replication slot has no use after the transformation, hence, it * will be removed at the end of this process. */ - primary_slot_name = use_primary_slot_name(); - if (primary_slot_name != NULL) - pg_log_info("primary has replication slot \"%s\"", primary_slot_name); + standby.primary_slot_name = use_primary_slot_name(&primary, + &standby, + &dbarr.perdb[0]); + if (standby.primary_slot_name != NULL) + pg_log_info("primary has replication slot \"%s\"", + standby.primary_slot_name); pg_log_info("subscriber is up and running"); pg_log_info("stopping the server to start the transformation steps"); - pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path, subscriber_dir); + pg_ctl_cmd = psprintf("\"%s/pg_ctl\" stop -D \"%s\" -s", + standby.bindir, standby.pgdata); rc = system(pg_ctl_cmd); pg_ctl_status(pg_ctl_cmd, rc, 0); } @@ -1472,7 +1727,7 @@ main(int argc, char **argv) /* * Create a replication slot for each database on the publisher. */ - if (!create_all_logical_replication_slots(dbinfo)) + if (!create_all_logical_replication_slots(&primary, &dbarr)) exit(1); /* @@ -1492,11 +1747,11 @@ main(int argc, char **argv) * replication connection open (depending when base backup was taken, the * connection should be open for a few hours). */ - conn = connect_database(dbinfo[0].pubconninfo); + conn = connect_database(primary.base_conninfo, dbarr.perdb[0].dbname); if (conn == NULL) exit(1); - consistent_lsn = create_logical_replication_slot(conn, &dbinfo[0], - temp_replslot); + consistent_lsn = create_logical_replication_slot(conn, true, + &dbarr.perdb[0]); /* * Write recovery parameters. @@ -1522,7 +1777,7 @@ main(int argc, char **argv) { appendPQExpBuffer(recoveryconfcontents, "recovery_target_lsn = '%s'\n", consistent_lsn); - WriteRecoveryConfig(conn, subscriber_dir, recoveryconfcontents); + WriteRecoveryConfig(conn, standby.pgdata, recoveryconfcontents); } disconnect_database(conn); @@ -1532,54 +1787,29 @@ main(int argc, char **argv) * Start subscriber and wait until accepting connections. */ pg_log_info("starting the subscriber"); - - /* append timestamp with ISO 8601 format. */ - gettimeofday(&time, NULL); - tt = (time_t) time.tv_sec; - strftime(timebuf, sizeof(timebuf), "%Y%m%dT%H%M%S", localtime(&tt)); - snprintf(timebuf + strlen(timebuf), sizeof(timebuf) - strlen(timebuf), - ".%03d", (int) (time.tv_usec / 1000)); - - server_start_log = (char *) pg_malloc0(MAXPGPATH); - len = snprintf(server_start_log, MAXPGPATH, "%s/%s/server_start_%s.log", subscriber_dir, PGS_OUTPUT_DIR, timebuf); - if (len >= MAXPGPATH) - { - pg_log_error("log file path is too long"); - exit(1); - } - - pg_ctl_cmd = psprintf("\"%s\" start -D \"%s\" -s -l \"%s\"", pg_ctl_path, subscriber_dir, server_start_log); - rc = system(pg_ctl_cmd); - pg_ctl_status(pg_ctl_cmd, rc, 1); + start_standby_server(&standby, subport, server_start_log); /* * Waiting the subscriber to be promoted. */ - wait_for_end_recovery(dbinfo[0].subconninfo); + wait_for_end_recovery(standby.base_conninfo, dbarr.perdb[0].dbname); /* * Create a publication for each database. This step should be executed * after promoting the subscriber to avoid replicating unnecessary * objects. */ - for (i = 0; i < num_dbs; i++) + for (i = 0; i < dbarr.ndbs; i++) { - char pubname[NAMEDATALEN]; + LogicalRepPerdbInfo *perdb = &dbarr.perdb[i]; /* Connect to publisher. */ - conn = connect_database(dbinfo[i].pubconninfo); + conn = connect_database(primary.base_conninfo, perdb->dbname); if (conn == NULL) exit(1); - /* - * Build the publication name. The name must not exceed NAMEDATALEN - - * 1. This current schema uses a maximum of 35 characters (14 + 10 + - * '\0'). - */ - snprintf(pubname, sizeof(pubname), "pg_subscriber_%u", dbinfo[i].oid); - dbinfo[i].pubname = pg_strdup(pubname); - - create_publication(conn, &dbinfo[i]); + /* Also create a publication */ + create_publication(conn, &primary, perdb); disconnect_database(conn); } @@ -1587,20 +1817,25 @@ main(int argc, char **argv) /* * Create a subscription for each database. */ - for (i = 0; i < num_dbs; i++) + for (i = 0; i < dbarr.ndbs; i++) { + LogicalRepPerdbInfo *perdb = &dbarr.perdb[i]; + /* Connect to subscriber. */ - conn = connect_database(dbinfo[i].subconninfo); + conn = connect_database(standby.base_conninfo, perdb->dbname); + if (conn == NULL) exit(1); - create_subscription(conn, &dbinfo[i]); + create_subscription(conn, &standby, primary.base_conninfo, perdb); /* Set the replication progress to the correct LSN. */ - set_replication_progress(conn, &dbinfo[i], consistent_lsn); + set_replication_progress(conn, perdb, consistent_lsn); /* Enable subscription. */ - enable_subscription(conn, &dbinfo[i]); + enable_subscription(conn, perdb); + + drop_publication(conn, perdb); disconnect_database(conn); } @@ -1613,19 +1848,21 @@ main(int argc, char **argv) * XXX we might not fail here. Instead, we provide a warning so the user * eventually drops the replication slot later. */ - conn = connect_database(dbinfo[0].pubconninfo); + conn = connect_database(primary.base_conninfo, dbarr.perdb[0].dbname); if (conn == NULL) { - pg_log_warning("could not drop transient replication slot \"%s\" on publisher", temp_replslot); - pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files."); + char *primary_slot_name = standby.primary_slot_name; + if (primary_slot_name != NULL) pg_log_warning("could not drop replication slot \"%s\" on primary", primary_slot_name); } else { - drop_replication_slot(conn, &dbinfo[0], temp_replslot); + LogicalRepPerdbInfo *perdb = &dbarr.perdb[0]; + char *primary_slot_name = standby.primary_slot_name; + if (primary_slot_name != NULL) - drop_replication_slot(conn, &dbinfo[0], primary_slot_name); + drop_replication_slot(conn, perdb, primary_slot_name); disconnect_database(conn); } @@ -1634,20 +1871,22 @@ main(int argc, char **argv) */ pg_log_info("stopping the subscriber"); - pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path, subscriber_dir); + pg_ctl_cmd = psprintf("\"%s/pg_ctl\" stop -D \"%s\" -s", + standby.bindir, standby.pgdata); rc = system(pg_ctl_cmd); pg_ctl_status(pg_ctl_cmd, rc, 0); /* * Change system identifier. */ - modify_sysid(pg_resetwal_path, subscriber_dir); + modify_sysid(standby.bindir, standby.pgdata); /* * Remove log file generated by this tool, if it runs successfully. * Otherwise, file is kept that may provide useful debugging information. */ - unlink(server_start_log); + if (!retain) + unlink(server_start_log); success = true; diff --git a/src/bin/pg_basebackup/t/040_pg_subscriber.pl b/src/bin/pg_basebackup/t/040_pg_subscriber.pl index 4ebff76b2d..9915b8cb3c 100644 --- a/src/bin/pg_basebackup/t/040_pg_subscriber.pl +++ b/src/bin/pg_basebackup/t/040_pg_subscriber.pl @@ -37,8 +37,13 @@ command_fails( '--verbose', '--pgdata', $datadir, '--publisher-conninfo', 'dbname=postgres', - '--subscriber-conninfo', 'dbname=postgres' ], 'no database name specified'); - +command_fails( + [ + 'pg_subscriber', '--verbose', + '--pgdata', $datadir, + '--publisher-conninfo', 'dbname=postgres', + ], + 'subscriber connection string specnfied non-local server'); done_testing(); diff --git a/src/bin/pg_basebackup/t/041_pg_subscriber_standby.pl b/src/bin/pg_basebackup/t/041_pg_subscriber_standby.pl index fbcd0fc82b..4e26607611 100644 --- a/src/bin/pg_basebackup/t/041_pg_subscriber_standby.pl +++ b/src/bin/pg_basebackup/t/041_pg_subscriber_standby.pl @@ -51,25 +51,27 @@ $node_s->start; $node_p->safe_psql('pg1', "INSERT INTO tbl1 VALUES('second row')"); $node_p->wait_for_replay_catchup($node_s); +$node_f->stop; + # Run pg_subscriber on about-to-fail node F command_fails( [ 'pg_subscriber', '--verbose', '--pgdata', $node_f->data_dir, '--publisher-conninfo', $node_p->connstr('pg1'), - '--subscriber-conninfo', $node_f->connstr('pg1'), '--database', 'pg1', '--database', 'pg2' ], 'subscriber data directory is not a copy of the source database cluster'); +$node_s->stop; + # dry run mode on node S command_ok( [ 'pg_subscriber', '--verbose', '--dry-run', '--pgdata', $node_s->data_dir, '--publisher-conninfo', $node_p->connstr('pg1'), - '--subscriber-conninfo', $node_s->connstr('pg1'), '--database', 'pg1', '--database', 'pg2' ], @@ -82,6 +84,7 @@ $node_s->start; # Check if node S is still a standby is($node_s->safe_psql('postgres', 'SELECT pg_is_in_recovery()'), 't', 'standby is in recovery'); +$node_s->stop; # Run pg_subscriber on node S command_ok( @@ -89,7 +92,6 @@ command_ok( 'pg_subscriber', '--verbose', '--pgdata', $node_s->data_dir, '--publisher-conninfo', $node_p->connstr('pg1'), - '--subscriber-conninfo', $node_s->connstr('pg1'), '--database', 'pg1', '--database', 'pg2' ], -- 2.43.0