From e98e1ba0661e3d658de8d46ff9082f6cd4040b41 Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Sat, 16 Mar 2024 18:47:35 +0530 Subject: [PATCH v29 2/2] Keep standby server's minimum recovery point less than the consistent_lsn. Standby server should not get ahead of the replication slot's lsn. If this happens we will not be able to promote the standby server as it will not be able to reach the consistent point because the standby server's Minimum recovery ending location will be greater than the consistent_lsn. Fixed this by pausing the replay before the replication slots are created. --- src/bin/pg_basebackup/pg_createsubscriber.c | 93 ++++++++++++++++++++- 1 file changed, 92 insertions(+), 1 deletion(-) diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c index c565f8524a..18e8757403 100644 --- a/src/bin/pg_basebackup/pg_createsubscriber.c +++ b/src/bin/pg_basebackup/pg_createsubscriber.c @@ -95,6 +95,8 @@ static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo); static void create_subscription(PGconn *conn, struct LogicalRepInfo *dbinfo); static void set_replication_progress(PGconn *conn, struct LogicalRepInfo *dbinfo, const char *lsn); +static void pause_replay_stantby_server(struct LogicalRepInfo *dbinfo, + struct CreateSubscriberOptions *opt); static void enable_subscription(PGconn *conn, struct LogicalRepInfo *dbinfo); #define USEC_PER_SEC 1000000 @@ -917,7 +919,9 @@ check_subscriber(struct LogicalRepInfo *dbinfo) appendPQExpBuffer(str, "SELECT pg_catalog.pg_has_role(current_user, %u, 'MEMBER'), " "pg_catalog.has_database_privilege(current_user, '%s', 'CREATE'), " - "pg_catalog.has_function_privilege(current_user, 'pg_catalog.pg_replication_origin_advance(text, pg_lsn)', 'EXECUTE')", + "pg_catalog.has_function_privilege(current_user, 'pg_catalog.pg_replication_origin_advance(text, pg_lsn)', 'EXECUTE'), " + "pg_catalog.has_function_privilege(current_user, 'pg_catalog.pg_wal_replay_pause()', 'EXECUTE'), " + "pg_catalog.has_function_privilege(current_user, 'pg_catalog.pg_get_wal_replay_pause_state()', 'EXECUTE')", ROLE_PG_CREATE_SUBSCRIPTION, dbinfo[0].dbname); pg_log_debug("command is: %s", str->data); @@ -949,6 +953,18 @@ check_subscriber(struct LogicalRepInfo *dbinfo) "pg_catalog.pg_replication_origin_advance(text, pg_lsn)"); failed = true; } + if (strcmp(PQgetvalue(res, 0, 3), "t") != 0) + { + pg_log_error("permission denied for function \"%s\"", + "pg_catalog.pg_wal_replay_pause()"); + failed = true; + } + if (strcmp(PQgetvalue(res, 0, 4), "t") != 0) + { + pg_log_error("permission denied for function \"%s\"", + "pg_catalog.pg_get_wal_replay_pause_state()"); + failed = true; + } destroyPQExpBuffer(str); PQclear(res); @@ -1026,6 +1042,72 @@ check_subscriber(struct LogicalRepInfo *dbinfo) exit(1); } +/* + * Pause replaying at the standby server. + */ +static void +pause_replay_stantby_server(struct LogicalRepInfo *dbinfo, + struct CreateSubscriberOptions *opt) +{ + PGconn *conn; + PGresult *res; + + /* Connect to subscriber. */ + conn = connect_database(dbinfo[0].subconninfo, true); + + pg_log_info("Pausing the replay in standby server"); + pg_log_debug("command is: SELECT pg_catalog.pg_wal_replay_pause()"); + + if (!dry_run) + { + int timer = 0; + + res = PQexec(conn, "SELECT pg_catalog.pg_wal_replay_pause()"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not pause replay in standby server: %s", + PQresultErrorMessage(res)); + disconnect_database(conn, true); + } + PQclear(res); + + /* Wait till replay has paused */ + for(;;) + { + pg_log_debug("command is: SELECT pg_catalog.pg_get_wal_replay_pause_state()"); + res = PQexec(conn, "SELECT pg_catalog.pg_get_wal_replay_pause_state()"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not get pause replay state in standby server: %s", + PQresultErrorMessage(res)); + disconnect_database(conn, true); + } + + if (strcmp(PQgetvalue(res, 0, 0), "paused") == 0) + { + PQclear(res); + break; + } + + PQclear(res); + + /* Bail out after recovery_timeout seconds if this option is set */ + if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout) + { + pg_log_error("timed out waiting to pause replay"); + disconnect_database(conn, true); + } + + /* Keep waiting */ + pg_usleep(WAIT_INTERVAL * USEC_PER_SEC); + + timer += WAIT_INTERVAL; + } + } + + disconnect_database(conn, false); +} + /* * Create the subscriptions, adjust the initial location for logical * replication and enable the subscriptions. That's the last step for logical @@ -1948,6 +2030,15 @@ main(int argc, char **argv) */ check_publisher(dbinfo); + /* + * Standby server should not get ahead of the replication slot's lsn. If + * this happens we will not be able to promote the standby server as it + * will not be able to reach the consistent point because the standby + * server's Minimum recovery ending location will be greater than the + * consistent_lsn. + */ + pause_replay_stantby_server(dbinfo, &opt); + /* * Create the required objects for each database on publisher. This step * is here mainly because if we stop the standby we cannot verify if the -- 2.34.1