From 0c00760b73defd0523c5d695a28c6d126689c414 Mon Sep 17 00:00:00 2001 From: Ashutosh Sharma Date: Tue, 14 Apr 2026 09:39:05 +0000 Subject: [PATCH 2/2] Add FIRST N and N (...) priority syntax to synchronized_standby_slots Extend synchronized_standby_slots to support explicit priority forms aligned with synchronous_standby_names: - FIRST N (slot1, slot2, ...) - N (slot1, slot2, ...) as shorthand for FIRST N Implementation details: - Disambiguate explicit priority syntax from plain-list input so slot names prefixed with "first" are not misclassified. - Extend StandbySlotsHaveCaughtup() priority handling to: - select in list order - skip missing, logical, invalidated, and inactive lagging slots - wait for active lagging higher-priority slots Tests and docs: - Add coverage for FIRST behavior, shorthand N (...) behavior, and plain-list disambiguation with first-prefixed slot names. - Update docs for FIRST and shorthand priority syntax semantics. Author: Satya Narlapuram Author: Ashutosh Sharma Reviewed-by: Shveta Malik Reviewed-by: Ajin Cherian Reviewed-by: Hou, Zhijie Reviewed-by: Dilip Kumar Reviewed-by: Surya Poondla Reviewed-by: Japin Li --- doc/src/sgml/config.sgml | 29 ++- src/backend/replication/slot.c | 97 +++++++- .../053_synchronized_standby_slots_quorum.pl | 212 +++++++++++++++++- 3 files changed, 322 insertions(+), 16 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 3ad7d09c28b..0bf1b7ab000 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -5158,6 +5158,7 @@ ANY num_sync ( : +[FIRST] num_sync ( slot_name [, ...] ) ANY num_sync ( slot_name [, ...] ) slot_name [, ...] @@ -5170,6 +5171,25 @@ ANY num_sync ( num_sync, specifies + priority-based semantics. Logical decoding will wait for the first + num_sync available + physical slots in priority order (the order they appear in the list). + Missing, logical, or invalidated slots are skipped. Inactive slots are + skipped only while they are lagging. However, if a slot exists and is + valid and active but has not yet caught up, the system will wait for it + rather than skipping to lower-priority slots. If, after skipping + unusable slots, fewer than + num_sync usable slots + remain, logical decoding waits until enough slots become usable and + caught up, or until the configuration is changed. The keyword + FIRST is + optional in this form, so + 2 (slot1, slot2, slot3) and + FIRST 2 (slot1, slot2, slot3) are equivalent. + A plain comma-separated list without a keyword specifies that all listed physical slots must confirm WAL @@ -5200,9 +5220,12 @@ ANY num_sync ( slot_name must + be double-quoted. + The use of synchronized_standby_slots guarantees that logical replication diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 38c36850349..38826fa4a41 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -222,6 +222,7 @@ static void ReplicationSlotDropPtr(ReplicationSlot *slot); static void RestoreSlotFromDisk(const char *name); static void CreateSlotOnDisk(ReplicationSlot *slot); static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel); +static bool IsPrioritySyncStandbySlotsSyntax(const char *value); /* * Register shared memory space needed for replication slots. @@ -2996,6 +2997,52 @@ GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause) return "none"; /* to keep compiler quiet */ } +/* + * Return true if value starts with explicit priority syntax: + * + * FIRST N (...) + * N (...) + * + * This is used to distinguish explicit priority syntax from simple list + * syntax whose first slot name may start with "first". + */ +static bool +IsPrioritySyncStandbySlotsSyntax(const char *value) +{ + const char *p = value; + + /* Skip leading whitespace */ + while (*p && isspace((unsigned char) *p)) + p++; + + /* + * Accept either explicit FIRST N (...) or bare N (...) priority syntax. + * If the input starts with FIRST, it must be followed by whitespace. + */ + if (pg_strncasecmp(p, "FIRST", 5) == 0) + { + p += 5; + + if (!isspace((unsigned char) *p)) + return false; + + while (*p && isspace((unsigned char) *p)) + p++; + } + + if (!isdigit((unsigned char) *p)) + return false; + + while (*p && isdigit((unsigned char) *p)) + p++; + + /* Explicit priority syntax then requires a parenthesized member list */ + while (*p && isspace((unsigned char) *p)) + p++; + + return (*p == '('); +} + /* * GUC check_hook for synchronized_standby_slots * @@ -3004,6 +3051,8 @@ GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause) * * slot1, slot2 -- wait for ALL listed slots * ANY N (slot1, slot2, ...) -- wait for any N-of-M (quorum) + * FIRST N (slot1, slot2, ...) -- wait for first N in priority order + * N (slot1, slot2, ...) -- shorthand for FIRST N * * Note: Simple list syntax is interpreted as "wait for ALL" for this GUC, * unlike synchronous_standby_names where it means "FIRST 1". @@ -3060,14 +3109,27 @@ check_synchronized_standby_slots(char **newval, void **extra, GucSource source) } /* - * For synchronized_standby_slots, a comma-separated list means all - * listed slots are required. The parser returns num_sync=1 in this - * shape, so map it to nmembers to enforce all-mode semantics. + * When using simple list syntax (e.g., "slot1, slot2"), the parser + * returns num_sync=1 and SYNC_REP_PRIORITY. We interpret this as + * "wait for ALL slots" by setting num_sync to nmembers. + * + * This differs from synchronous_standby_names where "standby1, standby2" + * means "wait for first 1 standby". For synchronized_standby_slots, + * requiring all slots provides safer failover semantics by default. */ if (syncrep_parse_result->num_sync == 1 && syncrep_parse_result->syncrep_method == SYNC_REP_PRIORITY && syncrep_parse_result->nmembers > 1) - syncrep_parse_result->num_sync = syncrep_parse_result->nmembers; + { + /* + * Distinguish simple list syntax from explicit priority syntax + * ("FIRST N (...)" or "N (...)"). This prevents a list whose + * first slot name starts with "first" (for example, + * "firstslot, secondslot") from being misclassified. + */ + if (!IsPrioritySyncStandbySlotsSyntax(*newval)) + syncrep_parse_result->num_sync = syncrep_parse_result->nmembers; + } /* validate every member name as a slot name */ mname = syncrep_parse_result->member_names; @@ -3258,6 +3320,12 @@ ReportUnavailableSyncStandbySlots(SyncStandbySlotsStateInfo *slot_states, * Simple list (e.g., "slot1, slot2"): * ALL slots must have caught up. Returns false otherwise. * + * FIRST N (e.g., "FIRST 2 (slot1, slot2, slot3)"): + * Wait for the first N eligible slots in priority order. Skips missing, + * invalid, logical, and inactive-lagging slots to find N eligible slots. + * If an active slot is lagging, waits for it (does not skip to lower + * priority slots). + * * ANY N (e.g., "ANY 2 (slot1, slot2, slot3)"): * Wait for any N eligible slots. Skips missing, invalid, logical, and * lagging slots (inactive or active) to find N slots that have caught up. @@ -3308,9 +3376,12 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) * on the first slot that is missing/invalid/logical, or the first slot * that is lagging (inactive or active). * - * wait_for_all = false means we select N from M candidates (ANY N syntax). - * In this mode, slots already caught up are counted even if inactive, and - * lagging slots are skipped until enough slots have caught up. + * wait_for_all = false means we select N from M candidates (FIRST N or + * ANY N syntax). In this mode, slots already caught up are counted even if + * inactive. In FIRST N mode, we skip missing/invalid/logical slots and + * lagging inactive slots, but wait for an active lagging slot with higher + * priority. In ANY N mode, we skip lagging slots (inactive or active) to + * find any N that have caught up. */ required = synchronized_standby_slots_config->num_sync; wait_for_all = (required == synchronized_standby_slots_config->nslotnames); @@ -3394,6 +3465,8 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) * If it is active and lagging, report it as lagging. * * In ALL mode: must wait for it. + * In FIRST N (priority) mode: lagging active slots block, while + * inactive slots can be skipped. * In ANY N (quorum) mode: skip and use another slot. */ slot_states[num_slot_states].slot_name = name; @@ -3403,7 +3476,8 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) slot_states[num_slot_states].restart_lsn = restart_lsn; num_slot_states++; - if (wait_for_all) + if (wait_for_all || + (!inactive && synchronized_standby_slots_config->syncrep_method == SYNC_REP_PRIORITY)) break; goto next_slot; } @@ -3416,7 +3490,12 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) caught_up_slot_num++; - /* Stop processing if the required number of slots have caught up. */ + /* + * Stop processing if the required number of slots have caught up. + * In priority mode (FIRST N), this ensures we select the first N + * slots in sequential order. In quorum mode (ANY N), we still + * process in order for efficiency, stopping once we find any N. + */ if (caught_up_slot_num >= required) break; diff --git a/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl b/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl index f182a8ccce9..cefc13c32fc 100644 --- a/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl +++ b/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl @@ -4,6 +4,7 @@ # Test synchronized_standby_slots with different syntax modes: # - Plain list (ALL mode): slot1, slot2 # - ANY N (quorum mode): ANY N (slot1, slot2, ...) +# - FIRST N (priority mode): FIRST N (slot1, slot2, ...) # # Setup: a 3-node cluster with one primary, two physical standbys, and a # logical decoding client using a failover-enabled slot. @@ -25,6 +26,12 @@ # - Skips missing/invalid/logical slots and lagging slots (inactive or active) # to find N caught-up slots # +# C) FIRST N (sb1_slot, sb2_slot) (priority mode) +# - Selects first N slots in priority order (list order) +# - Skips missing/invalid/logical slots and inactive lagging slots, +# but waits for active lagging slots +# - FIRST 1 works with one slot down (unlike plain list) + use strict; use warnings FATAL => 'all'; use PostgreSQL::Test::Cluster; @@ -237,18 +244,168 @@ is($decoded_bc, '1', 'plain list: works when all standbys are up'); ################################################## -# PART D: ANY 2 waits on an active lagging slot +# PART D: Verify FIRST N priority semantics ################################################## -# Bring standby1 back so sb1_slot can be controlled by a raw replication -# connection that keeps the slot active while lagging. +# FIRST N should: +# 1. Select first N slots in priority order (list order) +# 2. Skip missing/invalid/logical slots and inactive lagging slots to find +# N caught-up slots +# 3. Wait for active lagging slots (not skip to lower priority) + +# Test FIRST 2 (sb1_slot, sb2_slot) with both up — should wait for both. +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'FIRST 2 (sb1_slot, sb2_slot)'"); +$primary->reload; + +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'first_2_both_up');" +); +$primary->wait_for_replay_catchup($standby1); +$primary->wait_for_replay_catchup($standby2); + +my $decoded_e2 = $primary->safe_psql('postgres', + q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL) + WHERE data LIKE '%first_2_both_up%';}); +is($decoded_e2, '1', + 'FIRST 2: decoding works when all required slots are up'); + +# Test FIRST 1 (sb1_slot, sb2_slot) with sb1_slot unavailable. +$standby1->stop; + +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'FIRST 1 (sb1_slot, sb2_slot)'"); +$primary->reload; + +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'first_1_skip_unavailable');" +); +$primary->wait_for_replay_catchup($standby2); + +# FIRST 1 should skip sb1_slot (unavailable) and use sb2_slot. +my $decoded_e1 = $primary->safe_psql('postgres', + q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL) + WHERE data LIKE '%first_1_skip_unavailable%';}); +is($decoded_e1, '1', + 'FIRST 1: skips unavailable first slot, uses second slot'); + +# Test shorthand priority syntax: N (...) means FIRST N (...). +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'1 (sb1_slot, sb2_slot)'"); +$primary->reload; + +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'num_1_shorthand_priority');" +); +$primary->wait_for_replay_catchup($standby2); + +my $decoded_num1 = $primary->safe_psql('postgres', + q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL) + WHERE data LIKE '%num_1_shorthand_priority%';}); +is($decoded_num1, '1', + '1 (...): shorthand priority syntax behaves like FIRST 1'); + +################################################## +# PART E: FIRST 1 and ANY 2 wait on an active lagging slot +################################################## + +# Bring standby1 back so sb1_slot is active and caught up. $standby1->start; $primary->wait_for_replay_catchup($standby1); + +# To test the active-but-lagging slot path deterministically, we open a raw +# replication connection to sb1_slot starting from a deliberately old LSN. +# psql in replication mode never sends Standby Status Update messages, so +# the walsender keeps sb1_slot's active_pid set but restart_lsn never +# advances. + +# Stop standby1 so its walsender releases sb1_slot, allowing our replication +# connection below to acquire it. $standby1->stop; +# Capture a safely old LSN to stream from — before the test WAL record. my $old_lsn = $primary->safe_psql('postgres', "SELECT pg_current_wal_lsn();"); +# FIRST 1 must wait for the highest-priority slot when it is active but lagging. +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'FIRST 1 (sb1_slot, sb2_slot)'"); +$primary->reload; + +my $first_lag_lsn = $primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'first_1_lagging_blocks');" +); +$primary->wait_for_replay_catchup($standby2); + +# Open a raw replication connection to sb1_slot starting from $old_lsn. +# This activates the slot (active_pid IS NOT NULL) while keeping restart_lsn +# frozen below $first_lag_lsn for the lifetime of the connection. +my $repl_first = $primary->background_psql( + 'postgres', + replication => 'database', + on_error_stop => 0, + timeout => $PostgreSQL::Test::Utils::timeout_default); + +$repl_first->query_until( + qr/^$/, + "START_REPLICATION SLOT sb1_slot PHYSICAL $old_lsn;\n"); + +# Wait until sb1_slot shows active_pid, confirming the walsender is live. +$primary->poll_query_until('postgres', q{ + SELECT active_pid IS NOT NULL + FROM pg_replication_slots + WHERE slot_name = 'sb1_slot' +}) or die "replication connection did not activate sb1_slot"; + +# sb1_slot is now active and its restart_lsn is behind $first_lag_lsn. +# Start logical decoding in the background — it must block. +my $bg_first = $primary->background_psql( + 'postgres', + on_error_stop => 0, + timeout => $PostgreSQL::Test::Utils::timeout_default); + +$bg_first->query_until( + qr/decode_start/, q( + \echo decode_start + SELECT pg_logical_slot_peek_changes('logical_failover', NULL, NULL); +)); + +ok( $primary->poll_query_until( + 'postgres', q{ +SELECT EXISTS ( + SELECT 1 + FROM pg_stat_activity + WHERE wait_event = 'WaitForStandbyConfirmation' + AND query LIKE '%pg_logical_slot_peek_changes(''logical_failover''%' +); +}), + 'FIRST 1: decoding waits for active lagging higher-priority slot'); + +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''"); +$primary->reload; +$bg_first->quit; +$repl_first->quit; + +# Ensure the previous replication connection has fully released sb1_slot +# before reusing it in the next subtest. +$primary->poll_query_until('postgres', q{ + SELECT active_pid IS NULL + FROM pg_replication_slots + WHERE slot_name = 'sb1_slot' +}) or die "replication connection did not release sb1_slot"; + +# Consume the change so the slot is clean for the next test. +$primary->safe_psql('postgres', + q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);}); + +# ANY 2 must also wait when only one of two required slots has caught up. +# Reuse the same technique: open a raw replication connection to sb1_slot +# from $old_lsn so it is active but its restart_lsn stays behind the target. + +# Capture another old LSN baseline before the next test WAL record. +$old_lsn = $primary->safe_psql('postgres', + "SELECT pg_current_wal_lsn();"); + $primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "'ANY 2 (sb1_slot, sb2_slot)'"); $primary->reload; @@ -311,7 +468,54 @@ $primary->wait_for_replay_catchup($standby1); ################################################## -# PART E: Verify GUC validation rejects bad values +# PART F: Plain list with first-prefixed slot name still means ALL mode +################################################## + +# Create a slot name starting with "first_" for parser disambiguation checks. +$primary->safe_psql('postgres', + "SELECT pg_create_physical_replication_slot('first_slot');"); + +# If simple-list syntax starts with a slot name like "first_slot", it must +# still be treated as ALL mode (not as explicit FIRST N syntax). +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'first_slot, sb2_slot'"); +$primary->reload; + +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'first_prefix_all_mode_blocks');" +); +$primary->wait_for_replay_catchup($standby2); + +$log_offset = -s $primary->logfile; + +$bg = $primary->background_psql( + 'postgres', + on_error_stop => 0, + timeout => $PostgreSQL::Test::Utils::timeout_default); + +$bg->query_until( + qr/decode_start/, q( + \echo decode_start + SELECT pg_logical_slot_peek_changes('logical_failover', NULL, NULL); +)); + +# Plain list must require all listed slots; first_slot is intentionally inactive. +$primary->wait_for_log( + qr/replication slot \"first_slot\" specified in parameter "synchronized_standby_slots" does not have active_pid/, + $log_offset); + +pass('plain list with first-prefixed slot name blocks in ALL mode'); + +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''"); +$primary->reload; +$bg->quit; + +# Consume the change for the next test. +$primary->safe_psql('postgres', + q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);}); + +################################################## +# PART G: Verify GUC validation rejects bad values ################################################## my ($result, $stdout, $stderr); -- 2.43.0