From 48e1c968d96096cb4f20604889738050e9e3c565 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Mon, 5 Jan 2026 09:58:23 -0800 Subject: [PATCH v1] pg_upgrade: Optimize replication slot caught-up check logic. Author: Reviewed-by: Discussion: https://postgr.es/m/ --- src/bin/pg_upgrade/info.c | 159 ++++++++++++++++++++++++++------------ 1 file changed, 111 insertions(+), 48 deletions(-) diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index 2de0ee4d030..568e81e7776 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -29,8 +29,8 @@ static void free_rel_infos(RelInfoArr *rel_arr); static void print_db_infos(DbInfoArr *db_arr); static void print_rel_infos(RelInfoArr *rel_arr); static void print_slot_infos(LogicalSlotInfoArr *slot_arr); -static char *get_old_cluster_logical_slot_infos_query(void); static void process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg); +static void process_old_cluter_logical_slot_catchup_infos(DbInfo *dbinfo, PGresult *res, void *arg); /* @@ -307,11 +307,77 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster) if (cluster == &old_cluster && GET_MAJOR_VERSION(cluster->major_version) > 1600) { - logical_slot_infos_query = get_old_cluster_logical_slot_infos_query(); + const char *slot_info_query = "SELECT slot_name, plugin, two_phase, failover, " + "invalidation_reason IS NOT NULL as invalid " + "FROM pg_catalog.pg_replication_slots " + "WHERE slot_type = 'logical' AND " + "database = current_database() AND " + "temporary IS FALSE;"; + + /* + * Fetch the logical replication slot information. + * + * The temporary slots are explicitly ignored while checking because + * such slots cannot exist after the upgrade. During the upgrade, + * clusters are started and stopped several times causing any + * temporary slots to be removed. + */ upgrade_task_add_step(task, - logical_slot_infos_query, + slot_info_query, process_old_cluster_logical_slot_infos, true, NULL); + + if (!user_opts.live_check) + { + /* + * Check whether slots have consumed all WAL records. + * + * The determination of whether a slot is caught up is performed + * by an upgrade function. This considers the slot caught up if no + * decodable changes are found. See + * binary_upgrade_logical_slot_has_caught_up(). + * + * Note that we cannot guarantee that the slot is caught up during + * a live_check, as new WAL records could be generated + * concurrently. + * + * We optimize this check to avoid reading the same WAL stream + * multiple times: execute the caught-up check only for the slot + * with the minimum confirmed_flush_lsn, and apply the same result + * to all other slots in the same database. This way, we check at + * most one logical slot per database. + * + * Note that we don't distinguish slots based on their output + * plugin. If a plugin applies replication origin filters, we + * might get a false positive (i.e., erroneously considering a + * slot caught up). However, such cases are very rare, and the + * impact of a false positive is minimal. + */ + const char *slot_caughtup_info_query = + "WITH check_caught_up AS ( " + " SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name) AS caught_up " + " FROM pg_replication_slots AS s " + " WHERE database = current_database() AND " + " slot_type = 'logical' AND " + " temporary IS FALSE AND " + " invalidation_reason IS NULL " + " ORDER BY confirmed_flush_lsn ASC " + " LIMIT 1" + ") " + "SELECT slot_name, " + " CASE WHEN invalidation_reason IS NULL THEN c.caught_up " + " ELSE FALSE " + " END as caught_up " + "FROM pg_replication_slots AS s, check_caught_up AS c " + "WHERE s.database = current_database() AND " + " s.slot_type = 'logical' AND " + " s.temporary IS FALSE"; + + upgrade_task_add_step(task, + slot_caughtup_info_query, + process_old_cluter_logical_slot_catchup_infos, + true, NULL); + } } upgrade_task_run(task, cluster); @@ -676,48 +742,7 @@ process_rel_infos(DbInfo *dbinfo, PGresult *res, void *arg) } /* - * get_old_cluster_logical_slot_infos_query() - * - * Returns the query for retrieving the logical slot information for all the - * logical replication slots in the database, for use by - * get_db_rel_and_slot_infos()'s UpgradeTask. The status of each logical slot - * is checked in check_old_cluster_for_valid_slots(). - */ -static char * -get_old_cluster_logical_slot_infos_query(void) -{ - /* - * Fetch the logical replication slot information. The check whether the - * slot is considered caught up is done by an upgrade function. This - * regards the slot as caught up if we don't find any decodable changes. - * See binary_upgrade_logical_slot_has_caught_up(). - * - * Note that we can't ensure whether the slot is caught up during - * live_check as the new WAL records could be generated. - * - * We intentionally skip checking the WALs for invalidated slots as the - * corresponding WALs could have been removed for such slots. - * - * The temporary slots are explicitly ignored while checking because such - * slots cannot exist after the upgrade. During the upgrade, clusters are - * started and stopped several times causing any temporary slots to be - * removed. - */ - return psprintf("SELECT slot_name, plugin, two_phase, failover, " - "%s as caught_up, invalidation_reason IS NOT NULL as invalid " - "FROM pg_catalog.pg_replication_slots " - "WHERE slot_type = 'logical' AND " - "database = current_database() AND " - "temporary IS FALSE;", - user_opts.live_check ? "FALSE" : - "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE " - "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) " - "END)"); -} - -/* - * Callback function for processing results of the query returned by - * get_old_cluster_logical_slot_infos_query(), which is used for + * Callback function for processing results of the query, which is used for * get_db_rel_and_slot_infos()'s UpgradeTask. This function stores the logical * slot information for later use. */ @@ -736,7 +761,6 @@ process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg) int i_plugin; int i_twophase; int i_failover; - int i_caught_up; int i_invalid; slotinfos = (LogicalSlotInfo *) pg_malloc(sizeof(LogicalSlotInfo) * num_slots); @@ -745,7 +769,6 @@ process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg) i_plugin = PQfnumber(res, "plugin"); i_twophase = PQfnumber(res, "two_phase"); i_failover = PQfnumber(res, "failover"); - i_caught_up = PQfnumber(res, "caught_up"); i_invalid = PQfnumber(res, "invalid"); for (int slotnum = 0; slotnum < num_slots; slotnum++) @@ -756,8 +779,14 @@ process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg) curr->plugin = pg_strdup(PQgetvalue(res, slotnum, i_plugin)); curr->two_phase = (strcmp(PQgetvalue(res, slotnum, i_twophase), "t") == 0); curr->failover = (strcmp(PQgetvalue(res, slotnum, i_failover), "t") == 0); - curr->caught_up = (strcmp(PQgetvalue(res, slotnum, i_caught_up), "t") == 0); curr->invalid = (strcmp(PQgetvalue(res, slotnum, i_invalid), "t") == 0); + + /* + * Set false by default. This field will be updated in a separate + * task, process_old_cluter_logical_slot_catchup_infos, if we're + * not doing live_check. + */ + curr->caught_up = false; } } @@ -765,6 +794,40 @@ process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg) dbinfo->slot_arr.nslots = num_slots; } +/* + * Callback function for processing results of the query, which is used for + * get_db_rel_and_slot_infos()'s UpgradeTask. This function updates the caught_up + * field for each slot information. + */ +static void +process_old_cluter_logical_slot_catchup_infos(DbInfo *dbinfo, PGresult *res, void *arg) +{ + int num_slots = PQntuples(res); + + AssertVariableIsOfType(&process_old_cluter_logical_slot_catchup_infos, + UpgradeTaskProcessCB); + Assert(num_slots == dbinfo->slot_arr.nslots); + + for (int i = 0; i < num_slots; i++) + { + char *slotname; + bool caught_up; + + slotname = PQgetvalue(res, i, PQfnumber(res, "slot_name")); + caught_up = (strcmp(PQgetvalue(res, i, PQfnumber(res, "caught_up")), "t") == 0); + + for (int slotnum = 0; slotnum < dbinfo->slot_arr.nslots; slotnum++) + { + LogicalSlotInfo *s = &(dbinfo->slot_arr.slots[slotnum]); + + if (strcmp(s->slotname, slotname) == 0) + { + s->caught_up = caught_up; + break; + } + } + } +} /* * count_old_cluster_logical_slots() -- 2.47.3