From 1e2a9a7901c803cfe0d6b9d49be05eae6395ace6 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Wed, 7 Jan 2026 12:06:35 -0800 Subject: [PATCH v3] pg_upgrade: Optimize replication slot caught-up check check. Reviewed-by: Discussion: https://postgr.es/m/CAD21AoBZ0LAcw1OHGEKdW7S5TRJaURdhEk3CLAW69_siqfqyAg@mail.gmail.com --- src/backend/replication/logical/logical.c | 32 +++++- src/backend/utils/adt/pg_upgrade_support.c | 12 +- src/bin/pg_upgrade/info.c | 127 +++++++++++++++++++-- src/bin/pg_upgrade/t/003_logical_slots.pl | 25 ++-- src/include/catalog/pg_proc.dat | 2 +- src/include/replication/logical.h | 3 +- 6 files changed, 173 insertions(+), 28 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index b0ef1a12520..53b601a4d2a 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1988,15 +1988,21 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) /* * Read up to the end of WAL starting from the decoding slot's restart_lsn. * Return true if any meaningful/decodable WAL records are encountered, - * otherwise false. + * otherwise false. If last_pending_wal_p is set by the caller, it continues + * scanning WAL even after detecting a decodable WAL record, in order to + * get the last decodable WAL record's LSN. */ bool -LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal) +LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal, + XLogRecPtr *last_pending_wal_p) { bool has_pending_wal = false; Assert(MyReplicationSlot); + if (last_pending_wal_p != NULL) + *last_pending_wal_p = InvalidXLogRecPtr; + PG_TRY(); { LogicalDecodingContext *ctx; @@ -2024,7 +2030,7 @@ LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal) InvalidateSystemCaches(); /* Loop until the end of WAL or some changes are processed */ - while (!has_pending_wal && ctx->reader->EndRecPtr < end_of_wal) + while (ctx->reader->EndRecPtr < end_of_wal) { XLogRecord *record; char *errm = NULL; @@ -2037,7 +2043,22 @@ LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal) if (record != NULL) LogicalDecodingProcessRecord(ctx, ctx->reader); - has_pending_wal = ctx->processing_required; + has_pending_wal |= ctx->processing_required; + + if (ctx->processing_required) + { + /* + * Check is done if we don't want to check the last decodable + * WAL LSN. + */ + if (last_pending_wal_p == NULL) + break; + + *last_pending_wal_p = ctx->reader->ReadRecPtr; + + /* Reset the flag and continue checking */ + ctx->processing_required = false; + } CHECK_FOR_INTERRUPTS(); } @@ -2055,6 +2076,9 @@ LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal) } PG_END_TRY(); + Assert((last_pending_wal_p == NULL) || + has_pending_wal == XLogRecPtrIsValid(*last_pending_wal_p)); + return has_pending_wal; } diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c index 8953a17753e..849bee6834c 100644 --- a/src/backend/utils/adt/pg_upgrade_support.c +++ b/src/backend/utils/adt/pg_upgrade_support.c @@ -19,6 +19,7 @@ #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/extension.h" +#include "funcapi.h" #include "miscadmin.h" #include "replication/logical.h" #include "replication/logicallauncher.h" @@ -286,9 +287,9 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS) { Name slot_name; XLogRecPtr end_of_wal; - bool found_pending_wal; + XLogRecPtr last_pending_wal; - CHECK_IS_BINARY_UPGRADE; + /* CHECK_IS_BINARY_UPGRADE; */ /* * Binary upgrades only allowed super-user connections so we must have @@ -307,12 +308,15 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS) Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE); end_of_wal = GetFlushRecPtr(NULL); - found_pending_wal = LogicalReplicationSlotHasPendingWal(end_of_wal); + LogicalReplicationSlotHasPendingWal(end_of_wal, &last_pending_wal); /* Clean up */ ReplicationSlotRelease(); - PG_RETURN_BOOL(!found_pending_wal); + if (XLogRecPtrIsValid(last_pending_wal)) + PG_RETURN_LSN(last_pending_wal); + else + PG_RETURN_NULL(); } /* diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index 2de0ee4d030..7f64ed6fded 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -29,8 +29,10 @@ static void free_rel_infos(RelInfoArr *rel_arr); static void print_db_infos(DbInfoArr *db_arr); static void print_rel_infos(RelInfoArr *rel_arr); static void print_slot_infos(LogicalSlotInfoArr *slot_arr); -static char *get_old_cluster_logical_slot_infos_query(void); +static char *get_old_cluster_logical_slot_infos_query(bool skip_caught_up_check); static void process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg); +static void process_old_cluster_logical_slot_catchup_infos(DbInfo *dbinfo, PGresult *res, + void *arg); /* @@ -307,11 +309,77 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster) if (cluster == &old_cluster && GET_MAJOR_VERSION(cluster->major_version) > 1600) { - logical_slot_infos_query = get_old_cluster_logical_slot_infos_query(); + bool use_fast_check; + + /* + * The faster method for slot caught-up check is available on PG19 or + * newer. + */ + use_fast_check = (GET_MAJOR_VERSION(cluster->major_version) >= 1900); + + logical_slot_infos_query = get_old_cluster_logical_slot_infos_query(use_fast_check); + upgrade_task_add_step(task, logical_slot_infos_query, process_old_cluster_logical_slot_infos, true, NULL); + + /* + * Check whether slots have consumed all WAL records efficiently by + * using another query, it not during a live_check. + */ + if (use_fast_check && !user_opts.live_check) + { + /* + * We optimize the slot caught-up check to avoid reading the same + * WAL stream multiple times: execute the caught-up check only for + * the slot with the minimum confirmed_flush_lsn, and apply the + * same result to all other slots in the same database. This way, + * we check at most one logical slot per database. + * + * Note that we don't distinguish slots based on their output + * plugin. If a plugin applies replication origin filters, we + * might get a false positive (i.e., erroneously considering a + * slot caught up). However, such cases are very rare, and the + * impact of a false positive is minimal. + * + * The query returns the slot names and their caught-up status in + * the same order as the results collected by + * get_old_cluster_logical_slot_infos(). If this query is changed, + * please ensure it remains consistent with that function. + */ + const char *slot_caughtup_info_query = + "WITH check_caught_up AS ( " + " SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(s.slot_name) as last_pending_wal " + " FROM ( " + " SELECT slot_name " + " FROM pg_replication_slots " + " WHERE database = current_database() AND " + " slot_type = 'logical' AND " + " temporary IS FALSE AND " + " invalidation_reason IS NULL " + " ORDER BY confirmed_flush_lsn ASC " + " LIMIT 1 " + " ) s " + ") " + "SELECT slot_name, " + " CASE " + " WHEN invalidation_reason IS NULL THEN " + " last_pending_wal IS NULL OR " + " confirmed_flush_lsn > last_pending_wal " + " ELSE FALSE " + " END as caught_up " + "FROM pg_replication_slots, check_caught_up " + "WHERE slot_type = 'logical' AND " + " database = current_database() AND " + " temporary IS FALSE " + "ORDER BY 1"; + + upgrade_task_add_step(task, + slot_caughtup_info_query, + process_old_cluster_logical_slot_catchup_infos, + true, NULL); + } } upgrade_task_run(task, cluster); @@ -684,13 +752,14 @@ process_rel_infos(DbInfo *dbinfo, PGresult *res, void *arg) * is checked in check_old_cluster_for_valid_slots(). */ static char * -get_old_cluster_logical_slot_infos_query(void) +get_old_cluster_logical_slot_infos_query(bool skip_caught_up_check) { /* * Fetch the logical replication slot information. The check whether the - * slot is considered caught up is done by an upgrade function. This - * regards the slot as caught up if we don't find any decodable changes. - * See binary_upgrade_logical_slot_has_caught_up(). + * slot is considered caught up is done by an upgrade function, unless the + * caller sets skip_caught_up_check. This regards the slot as caught up if + * we don't find any decodable changes. See + * binary_upgrade_logical_slot_has_caught_up(). * * Note that we can't ensure whether the slot is caught up during * live_check as the new WAL records could be generated. @@ -708,16 +777,16 @@ get_old_cluster_logical_slot_infos_query(void) "FROM pg_catalog.pg_replication_slots " "WHERE slot_type = 'logical' AND " "database = current_database() AND " - "temporary IS FALSE;", - user_opts.live_check ? "FALSE" : + "temporary IS FALSE " + "ORDER BY 1;", + (skip_caught_up_check || user_opts.live_check) ? "FALSE" : "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE " "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) " "END)"); } /* - * Callback function for processing results of the query returned by - * get_old_cluster_logical_slot_infos_query(), which is used for + * Callback function for processing results of the query, which is used for * get_db_rel_and_slot_infos()'s UpgradeTask. This function stores the logical * slot information for later use. */ @@ -765,6 +834,44 @@ process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg) dbinfo->slot_arr.nslots = num_slots; } +/* + * Callback function for processing results of the query, which is used for + * get_db_rel_and_slot_infos()'s UpgradeTask. This function updates the caught_up + * field for each slot information collected by + * process_old_cluster_logical_slot_infos(). + */ +static void +process_old_cluster_logical_slot_catchup_infos(DbInfo *dbinfo, PGresult *res, void *arg) +{ + int num_slots = PQntuples(res); + + AssertVariableIsOfType(&process_old_cluster_logical_slot_catchup_infos, + UpgradeTaskProcessCB); + Assert(num_slots == dbinfo->slot_arr.nslots); + + /* + * Update the caught_up field of each logical slot. The caught_up values + * are retrieved in the same order as the slots were collected in + * process_old_cluster_logical_slot_infos(), so we can update the slots + * sequentially. + */ + for (int i = 0; i < num_slots; i++) + { + LogicalSlotInfo *s = &(dbinfo->slot_arr.slots[i]); + char *slotname; + bool caught_up; + + slotname = PQgetvalue(res, i, PQfnumber(res, "slot_name")); + caught_up = (strcmp(PQgetvalue(res, i, PQfnumber(res, "caught_up")), "t") == 0); + + /* Sanity check */ + if (strcmp(slotname, s->slotname) != 0) + pg_fatal("tried to update logical slot \"%s\", expected \"%s\"", + slotname, s->slotname); + + s->caught_up = caught_up; + } +} /* * count_old_cluster_logical_slots() diff --git a/src/bin/pg_upgrade/t/003_logical_slots.pl b/src/bin/pg_upgrade/t/003_logical_slots.pl index b9abc3a2e21..d666bbc7518 100644 --- a/src/bin/pg_upgrade/t/003_logical_slots.pl +++ b/src/bin/pg_upgrade/t/003_logical_slots.pl @@ -64,6 +64,7 @@ $oldpub->safe_psql( 'postgres', qq[ SELECT pg_create_logical_replication_slot('test_slot1', 'test_decoding'); SELECT pg_create_logical_replication_slot('test_slot2', 'test_decoding'); + SELECT pg_create_logical_replication_slot('test_slot3', 'test_decoding'); ]); $oldpub->stop(); @@ -77,7 +78,7 @@ command_checks_all( [@pg_upgrade_cmd], 1, [ - qr/"max_replication_slots" \(1\) must be greater than or equal to the number of logical replication slots \(2\) on the old cluster/ + qr/"max_replication_slots" \(1\) must be greater than or equal to the number of logical replication slots \(3\) on the old cluster/ ], [qr//], 'run of pg_upgrade where the new cluster has insufficient "max_replication_slots"' @@ -85,29 +86,31 @@ command_checks_all( ok(-d $newpub->data_dir . "/pg_upgrade_output.d", "pg_upgrade_output.d/ not removed after pg_upgrade failure"); -# Set 'max_replication_slots' to match the number of slots (2) present on the +# Set 'max_replication_slots' to match the number of slots (3) present on the # old cluster. Both slots will be used for subsequent tests. -$newpub->append_conf('postgresql.conf', "max_replication_slots = 2"); +$newpub->append_conf('postgresql.conf', "max_replication_slots = 3"); # ------------------------------ # TEST: Confirm pg_upgrade fails when the slot still has unconsumed WAL records # Preparations for the subsequent test: -# 1. Generate extra WAL records. At this point neither test_slot1 nor -# test_slot2 has consumed them. +# 1. Generate extra WAL records. At this point none of slots has consumed them. # # 2. Advance the slot test_slot2 up to the current WAL location, but test_slot1 # still has unconsumed WAL records. # # 3. Emit a non-transactional message. This will cause test_slot2 to detect the # unconsumed WAL record. +# +# 4. Advance the slot test_slots3 up to the current WAL location. $oldpub->start; $oldpub->safe_psql( 'postgres', qq[ CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a; SELECT pg_replication_slot_advance('test_slot2', pg_current_wal_lsn()); - SELECT count(*) FROM pg_logical_emit_message('false', 'prefix', 'This is a non-transactional message'); + SELECT count(*) FROM pg_logical_emit_message('false', 'prefix', 'This is a non-transactional message', true); + SELECT pg_replication_slot_advance('test_slot3', pg_current_wal_lsn()); ]); $oldpub->stop; @@ -138,8 +141,9 @@ find( }, $newpub->data_dir . "/pg_upgrade_output.d"); -# Check the file content. Both slots should be reporting that they have -# unconsumed WAL records. +# Check the file content. While both test_slot1 and test_slot2 should be reporting +# that they have unconsumed WAL records, test_slot3 should not be reproted as +# it has caught up. like( slurp_file($slots_filename), qr/The slot \"test_slot1\" has not consumed the WAL yet/m, @@ -148,6 +152,10 @@ like( slurp_file($slots_filename), qr/The slot \"test_slot2\" has not consumed the WAL yet/m, 'the previous test failed due to unconsumed WALs'); +unlike( + slurp_file($slots_filename), + qr/test_slot3/m, + 'caught-up slot is not reported'); # ------------------------------ @@ -162,6 +170,7 @@ $oldpub->safe_psql( 'postgres', qq[ SELECT * FROM pg_drop_replication_slot('test_slot1'); SELECT * FROM pg_drop_replication_slot('test_slot2'); + SELECT * FROM pg_drop_replication_slot('test_slot3'); CREATE PUBLICATION regress_pub FOR ALL TABLES; ]); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 2ac69bf2df5..e92eb21cfc0 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11833,7 +11833,7 @@ prosrc => 'binary_upgrade_set_next_pg_tablespace_oid' }, { oid => '6312', descr => 'for use by pg_upgrade', proname => 'binary_upgrade_logical_slot_has_caught_up', provolatile => 'v', - proparallel => 'u', prorettype => 'bool', proargtypes => 'name', + proparallel => 'u', prorettype => 'pg_lsn', proargtypes => 'name', prosrc => 'binary_upgrade_logical_slot_has_caught_up' }, { oid => '6319', descr => 'for use by pg_upgrade (relation for pg_subscription_rel)', diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 5b43e181135..ad00c080372 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -148,7 +148,8 @@ extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId extern void ResetLogicalStreamingState(void); extern void UpdateDecodingStats(LogicalDecodingContext *ctx); -extern bool LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal); +extern bool LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal, + XLogRecPtr *last_pending_wal_p); extern XLogRecPtr LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto, bool *found_consistent_snapshot); -- 2.47.3