From 1a21430ada5c5cb8c065d8b4fa7fe51eaa24c5e0 Mon Sep 17 00:00:00 2001 From: Ashutosh Sharma Date: Thu, 12 Mar 2026 12:11:06 +0000 Subject: [PATCH] Add FIRST N and ANY N syntax support to synchronized_standby_slots Extend synchronized_standby_slots to support the same priority and quorum syntax as synchronous_standby_names: FIRST N (slot1, slot2, ...) - priority mode ANY N (slot1, slot2, ...) - quorum mode This allows logical replication availability to align with the commit durability guarantees provided by synchronous replication when using priority or quorum configurations. Implementation details The GUC parser reuses the existing syncrep_yyparse infrastructure to parse FIRST N and ANY N syntax. The behavior differs slightly from synchronous_standby_names in the following ways: 1. Plain list (slot1, slot2) In synchronized_standby_slots, this requires all listed slots to catch up, whereas in synchronous_standby_names it is equivalent to FIRST 1. This behavior is preserved for backward compatibility and because logical replication has different availability semantics. 2. FIRST N (priority mode) - Selects the first N slots according to the configured priority - Skips unavailable slots (missing, invalidated, or inactive) - Waits for valid but lagging slots 3. ANY N (quorum mode) - Allows any N slots that have caught up - Skips problematic slots (unavailable or lagging) - Maximizes availability when N-of-M durability is sufficient StandbySlotsHaveCaughtup() implements these semantics by distinguishing between two slot states: Unavailable slots: missing, logical, invalidated, or inactive Lagging slots: valid and active but not yet caught up A comprehensive test suite validates the behavior of all three modes, including fail-fast behavior, skip semantics, and GUC validation. Author: Satya Narlapuram Author: Ashutosh Sharma Reviewed-by: Shveta Malik Reviewed-by: Ajin Cherian --- doc/src/sgml/config.sgml | 75 ++- src/backend/replication/slot.c | 485 +++++++++++++----- .../052_synchronized_standby_slots_quorum.pl | 444 ++++++++++++++++ 3 files changed, 876 insertions(+), 128 deletions(-) create mode 100644 src/test/recovery/t/052_synchronized_standby_slots_quorum.pl diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 8cdd826fbd3..3e6e8d85999 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4964,17 +4964,72 @@ ANY num_sync ( : + +[FIRST] num_sync ( slot_name [, ...] ) +ANY num_sync ( slot_name [, ...] ) +slot_name [, ...] + + where num_sync is + the number of physical replication slots that must confirm WAL + receipt before logical decoding proceeds, + and slot_name + is the name of a physical replication slot. + num_sync + must be an integer value greater than zero and must not exceed the + number of listed slots. + + + The keyword FIRST, coupled with + num_sync, specifies + priority-based semantics. Logical decoding will wait for the first + num_sync available + physical slots in priority order (the order they appear in the list). + If a slot is missing or invalidated, it will be skipped. However, if + a slot exists and is valid but has not yet caught up, the system will + wait for it rather than skipping to lower-priority slots. + + + A plain comma-separated list without a keyword specifies that + all listed physical slots must confirm WAL + receipt. This differs from + where a simple list means FIRST 1. For + synchronized_standby_slots, requiring all slots + provides safer failover semantics by default. + + + The keyword ANY, coupled with + num_sync, specifies + quorum-based semantics. Logical decoding proceeds once at least + num_sync of the listed + slots have caught up. Missing, invalidated, or lagging slots are + skipped when looking for the required number of caught-up slots. + For example, a setting of ANY 1 (sb1_slot, sb2_slot) + will allow logical decoding to proceed as soon as either physical slot + has confirmed WAL receipt. This is useful in conjunction with + quorum-based synchronous replication + (synchronous_standby_names = 'ANY ...'), so that + logical decoding availability matches the commit durability guarantee. + + + FIRST and ANY are case-insensitive. + If these keywords are used as the name of a replication slot, + the slot_name must + be double-quoted. + + + This guarantees that logical replication failover slots do not consume changes until those changes are received - and flushed to corresponding physical standbys. If a + and flushed to the required physical standbys. If a logical replication connection is meant to switch to a physical standby after the standby is promoted, the physical replication slot for the standby should be listed here. Note that logical replication will not - proceed if the slots specified in the - synchronized_standby_slots do not exist or are invalidated. + proceed if the slots specified in + synchronized_standby_slots do not exist or are + invalidated. Additionally, the replication management functions pg_replication_slot_advance, @@ -4982,9 +5037,9 @@ ANY num_sync ( pg_logical_slot_peek_changes, - when used with logical failover slots, will block until all - physical slots specified in synchronized_standby_slots have - confirmed WAL receipt. + when used with logical failover slots, will block until the required + physical slots specified in synchronized_standby_slots + have confirmed WAL receipt. The standbys corresponding to the physical replication slots in diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index a9092fc2382..d3b6701e863 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -50,6 +50,7 @@ #include "replication/logicallauncher.h" #include "replication/slotsync.h" #include "replication/slot.h" +#include "replication/syncrep.h" #include "replication/walsender_private.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -90,11 +91,18 @@ typedef struct ReplicationSlotOnDisk * Note: this must be a flat representation that can be held in a single chunk * of guc_malloc'd memory, so that it can be stored as the "extra" data for the * synchronized_standby_slots GUC. + * + * The layout mirrors SyncRepConfigData so that the same quorum / priority + * semantics can be expressed. The syncrep_method field uses the + * SYNC_REP_PRIORITY and SYNC_REP_QUORUM constants from syncrep.h. */ typedef struct { - /* Number of slot names in the slot_names[] */ - int nslotnames; + int config_size; /* total size of this struct, in bytes */ + int num_sync; /* number of slots that must confirm WAL + * receipt before logical decoding proceeds */ + uint8 syncrep_method; /* SYNC_REP_PRIORITY or SYNC_REP_QUORUM */ + int nslotnames; /* number of slot names that follow */ /* * slot_names contains 'nslotnames' consecutive null-terminated C strings. @@ -102,6 +110,27 @@ typedef struct char slot_names[FLEXIBLE_ARRAY_MEMBER]; } SyncStandbySlotsConfigData; +/* + * State of a replication slot specified in synchronized_standby_slots GUC. + */ +typedef enum +{ + SS_SLOT_OK, /* slot is valid and can participate */ + SS_SLOT_NOT_FOUND, /* slot does not exist */ + SS_SLOT_LOGICAL, /* slot is logical, not physical */ + SS_SLOT_INVALIDATED, /* slot has been invalidated */ + SS_SLOT_INACTIVE /* slot is inactive (standby not connected) */ +} SyncStandbySlotsState; + +/* + * Information about a synchronized standby slot's state. + */ +typedef struct +{ + const char *slot_name; /* name of the slot */ + SyncStandbySlotsState state; /* state of the slot */ +} SyncStandbySlotsStateInfo; + /* * Lookup table for slot invalidation causes. */ @@ -181,6 +210,7 @@ static void ReplicationSlotDropPtr(ReplicationSlot *slot); static void RestoreSlotFromDisk(const char *name); static void CreateSlotOnDisk(ReplicationSlot *slot); static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel); +static bool IsExplicitFirstSyncStandbySlotsSyntax(const char *value); /* * Report shared-memory space needed by ReplicationSlotsShmemInit. @@ -2947,94 +2977,180 @@ GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause) } /* - * A helper function to validate slots specified in GUC synchronized_standby_slots. + * Return true if value starts with explicit FIRST syntax: * - * The rawname will be parsed, and the result will be saved into *elemlist. + * FIRST N (...) + * + * This is used to distinguish explicit FIRST from simple list syntax whose + * first slot name may start with "first". */ static bool -validate_sync_standby_slots(char *rawname, List **elemlist) +IsExplicitFirstSyncStandbySlotsSyntax(const char *value) { - /* Verify syntax and parse string into a list of identifiers */ - if (!SplitIdentifierString(rawname, ',', elemlist)) - { - GUC_check_errdetail("List syntax is invalid."); + const char *p = value; + + /* Skip leading whitespace */ + while (*p && isspace((unsigned char) *p)) + p++; + + /* Must start with FIRST keyword */ + if (pg_strncasecmp(p, "FIRST", 5) != 0) return false; - } - /* Iterate the list to validate each slot name */ - foreach_ptr(char, name, *elemlist) - { - int err_code; - char *err_msg = NULL; - char *err_hint = NULL; + p += 5; - if (!ReplicationSlotValidateNameInternal(name, false, &err_code, - &err_msg, &err_hint)) - { - GUC_check_errcode(err_code); - GUC_check_errdetail("%s", err_msg); - if (err_hint != NULL) - GUC_check_errhint("%s", err_hint); - return false; - } - } + /* FIRST must be followed by whitespace, then a positive integer */ + if (!isspace((unsigned char) *p)) + return false; - return true; + while (*p && isspace((unsigned char) *p)) + p++; + + if (!isdigit((unsigned char) *p)) + return false; + + while (*p && isdigit((unsigned char) *p)) + p++; + + /* Explicit FIRST syntax then requires a parenthesized member list */ + while (*p && isspace((unsigned char) *p)) + p++; + + return (*p == '('); } /* * GUC check_hook for synchronized_standby_slots + * + * This reuses the syncrep_yyparse / syncrep_scanner infrastructure that is + * also used for synchronous_standby_names, so the same syntax is accepted: + * + * slot1, slot2 -- wait for ALL listed slots + * ANY N (slot1, slot2, ...) -- wait for any N-of-M (quorum) + * FIRST N (slot1, slot2, ...) -- wait for first N in priority order + * + * Note: Simple list syntax is interpreted as "wait for ALL" for this GUC, + * unlike synchronous_standby_names where it means "FIRST 1". + * + * After parsing, we additionally validate every name as a legal replication + * slot name. */ bool check_synchronized_standby_slots(char **newval, void **extra, GucSource source) { - char *rawname; - char *ptr; - List *elemlist; - int size; - bool ok; - SyncStandbySlotsConfigData *config; - - if ((*newval)[0] == '\0') - return true; + if (*newval != NULL && (*newval)[0] != '\0') + { + yyscan_t scanner; + int parse_rc; + SyncStandbySlotsConfigData *config; + const char *mname; + + /* Result of parsing is returned in one of these two variables */ + SyncRepConfigData *syncrep_parse_result = NULL; + char *syncrep_parse_error_msg = NULL; + + /* Parse the synchronized standby slots configuration */ + syncrep_scanner_init(*newval, &scanner); + parse_rc = syncrep_yyparse(&syncrep_parse_result, + &syncrep_parse_error_msg, + scanner); + syncrep_scanner_finish(scanner); + + if (parse_rc != 0 || syncrep_parse_result == NULL) + { + GUC_check_errcode(ERRCODE_SYNTAX_ERROR); + if (syncrep_parse_error_msg) + GUC_check_errdetail("%s", syncrep_parse_error_msg); + else + GUC_check_errdetail("\"%s\" parser failed.", + "synchronized_standby_slots"); + return false; + } - /* Need a modifiable copy of the GUC string */ - rawname = pstrdup(*newval); + if (syncrep_parse_result->num_sync <= 0) + { + GUC_check_errmsg("number of synchronized standby slots (%d) must be greater than zero", + syncrep_parse_result->num_sync); + return false; + } - /* Now verify if the specified slots exist and have correct type */ - ok = validate_sync_standby_slots(rawname, &elemlist); + /* + * When using simple list syntax (e.g., "slot1, slot2"), the parser + * returns num_sync=1 and SYNC_REP_PRIORITY. We interpret this as + * "wait for ALL slots" by setting num_sync to nmembers. + * + * This differs from synchronous_standby_names where "standby1, standby2" + * means "wait for first 1 standby". For synchronized_standby_slots, + * requiring all slots provides safer failover semantics by default. + * + * To distinguish simple list from explicit "FIRST N (...)", check + * whether the value starts with the FIRST keyword (after whitespace). + */ + if (syncrep_parse_result->num_sync == 1 && + syncrep_parse_result->syncrep_method == SYNC_REP_PRIORITY && + syncrep_parse_result->nmembers > 1) + { + if (!IsExplicitFirstSyncStandbySlotsSyntax(*newval)) + syncrep_parse_result->num_sync = syncrep_parse_result->nmembers; + } - if (!ok || elemlist == NIL) - { - pfree(rawname); - list_free(elemlist); - return ok; - } + /* Reject num_sync > nmembers — it can never be satisfied */ + if (syncrep_parse_result->num_sync > syncrep_parse_result->nmembers) + { + GUC_check_errmsg("number of synchronized standby slots (%d) must not exceed the number of listed slots (%d)", + syncrep_parse_result->num_sync, + syncrep_parse_result->nmembers); + return false; + } - /* Compute the size required for the SyncStandbySlotsConfigData struct */ - size = offsetof(SyncStandbySlotsConfigData, slot_names); - foreach_ptr(char, slot_name, elemlist) - size += strlen(slot_name) + 1; + /* validate every member name as a slot name */ + mname = syncrep_parse_result->member_names; - /* GUC extra value must be guc_malloc'd, not palloc'd */ - config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size); - if (!config) - return false; + for (int i = 0; i < syncrep_parse_result->nmembers; i++) + { + int err_code; + char *err_msg = NULL; + char *err_hint = NULL; - /* Transform the data into SyncStandbySlotsConfigData */ - config->nslotnames = list_length(elemlist); + if (!ReplicationSlotValidateNameInternal(mname, false, &err_code, + &err_msg, &err_hint)) + { + GUC_check_errcode(err_code); + GUC_check_errdetail("%s", err_msg); + if (err_hint != NULL) + GUC_check_errhint("%s", err_hint); + return false; + } - ptr = config->slot_names; - foreach_ptr(char, slot_name, elemlist) - { - strcpy(ptr, slot_name); - ptr += strlen(slot_name) + 1; - } + mname += strlen(mname) + 1; + } + + /* + * Build SyncStandbySlotsConfigData from the parsed SyncRepConfigData. + * Since the structures have identical layout, we can use the same + * config_size. + */ + config = (SyncStandbySlotsConfigData *) + guc_malloc(LOG, syncrep_parse_result->config_size); + if (!config) + return false; - *extra = config; + config->config_size = syncrep_parse_result->config_size; + config->num_sync = syncrep_parse_result->num_sync; + config->syncrep_method = syncrep_parse_result->syncrep_method; + config->nslotnames = syncrep_parse_result->nmembers; + + /* Copy all slot names in one operation */ + memcpy(config->slot_names, + syncrep_parse_result->member_names, + syncrep_parse_result->config_size - + offsetof(SyncRepConfigData, member_names)); + + *extra = config; + } + else + *extra = NULL; - pfree(rawname); - list_free(elemlist); return true; } @@ -3083,18 +3199,38 @@ SlotExistsInSyncStandbySlots(const char *slot_name) } /* - * Return true if the slots specified in synchronized_standby_slots have caught up to - * the given WAL location, false otherwise. + * Return true if the required standby slots have caught up to the given WAL + * location, false otherwise. + * + * The behavior depends on the synchronized_standby_slots configuration: + * + * Simple list (e.g., "slot1, slot2"): + * ALL slots must be present, valid, and caught up. Returns false + * immediately if any slot is missing, invalid, or lagging. + * + * FIRST N (e.g., "FIRST 2 (slot1, slot2, slot3)"): + * Wait for the first N available slots in priority order. Skips over + * missing/invalid slots to find N available ones. If a valid slot is + * lagging, waits for it (does not skip to lower priority slots). + * + * ANY N (e.g., "ANY 2 (slot1, slot2, slot3)"): + * Wait for any N available slots. Skips missing/invalid/lagging slots + * to find N slots that have caught up. * - * The elevel parameter specifies the error level used for logging messages - * related to slots that do not exist, are invalidated, or are inactive. + * The elevel parameter specifies the error level used for reporting issues + * related to the slots specified in synchronized_standby_slots when the + * catch-up requirement is not met. */ bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) { const char *name; int caught_up_slot_num = 0; + int required; XLogRecPtr min_restart_lsn = InvalidXLogRecPtr; + bool wait_for_all; + SyncStandbySlotsStateInfo *slot_states; + int num_slot_states = 0; /* * Don't need to wait for the standbys to catch up if there is no value in @@ -3118,12 +3254,42 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) ss_oldest_flush_lsn >= wait_for_lsn) return true; + /* + * Determine how many slots are required and whether we're in "wait for + * ALL" mode versus "wait for N-of-M" mode. + * + * wait_for_all = true means we need ALL slots to be ready (simple + * list syntax like "slot1, slot2"). In this mode, we stop checking + * on the first slot with a non-OK state or the first lagging slot. + * + * wait_for_all = false means we select N from M candidates (FIRST N or + * ANY N syntax). In this mode, we can skip over slots with non-OK states. + * In FIRST N mode, we only skip non-OK slots and wait for the lagging slot + * with higher priority. In ANY N mode, we skip non-OK slots and lagging + * slots to find any N that have caught up. + */ + required = synchronized_standby_slots_config->num_sync; + wait_for_all = (required == synchronized_standby_slots_config->nslotnames); + + /* + * Allocate array to track slot states. Size it to the total number of + * configured slots since in the worst case all could have non-OK states. + */ + slot_states = (SyncStandbySlotsStateInfo *) + palloc(sizeof(SyncStandbySlotsStateInfo) * synchronized_standby_slots_config->nslotnames); + /* * To prevent concurrent slot dropping and creation while filtering the * slots, take the ReplicationSlotControlLock outside of the loop. */ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + /* + * Iterate through configured slots, checking their state and tracking + * how many have caught up. Non-OK states (missing, logical, invalidated, + * inactive) are recorded for deferred reporting - messages are only + * emitted if the catch-up requirement isn't met. + */ name = synchronized_standby_slots_config->slot_names; for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++) { @@ -3134,35 +3300,28 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) slot = SearchNamedReplicationSlot(name, false); - /* - * If a slot name provided in synchronized_standby_slots does not - * exist, report a message and exit the loop. - */ if (!slot) { - ereport(elevel, - errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist", - name, "synchronized_standby_slots"), - errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".", - name), - errhint("Create the replication slot \"%s\" or amend parameter \"%s\".", - name, "synchronized_standby_slots")); - break; + /*Record Slot State */ + slot_states[num_slot_states].slot_name = name; + slot_states[num_slot_states].state = SS_SLOT_NOT_FOUND; + num_slot_states++; + + if (wait_for_all) + break; + goto next_slot; } - /* Same as above: if a slot is not physical, exit the loop. */ if (SlotIsLogical(slot)) { - ereport(elevel, - errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"", - name, "synchronized_standby_slots"), - errdetail("Logical replication is waiting for correction on replication slot \"%s\".", - name), - errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".", - name, "synchronized_standby_slots")); - break; + /*Record Slot State */ + slot_states[num_slot_states].slot_name = name; + slot_states[num_slot_states].state = SS_SLOT_LOGICAL; + num_slot_states++; + + if (wait_for_all) + break; + goto next_slot; } SpinLockAcquire(&slot->mutex); @@ -3173,33 +3332,40 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) if (invalidated) { - /* Specified physical slot has been invalidated */ - ereport(elevel, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated", - name, "synchronized_standby_slots"), - errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".", - name), - errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".", - name, "synchronized_standby_slots")); - break; + /*Record Slot State */ + slot_states[num_slot_states].slot_name = name; + slot_states[num_slot_states].state = SS_SLOT_INVALIDATED; + num_slot_states++; + + if (wait_for_all) + break; + goto next_slot; + } + + if (inactive) + { + /*Record Slot State */ + slot_states[num_slot_states].slot_name = name; + slot_states[num_slot_states].state = SS_SLOT_INACTIVE; + num_slot_states++; + + if (wait_for_all) + break; + goto next_slot; } if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn) { - /* Log a message if no active_pid for this physical slot */ - if (inactive) - ereport(elevel, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid", - name, "synchronized_standby_slots"), - errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".", - name), - errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".", - name, "synchronized_standby_slots")); - - /* Continue if the current slot hasn't caught up. */ - break; + /* + * Slot exists, is valid, and has an active connection, but + * hasn't caught up yet (it's lagging). + * In ALL mode: must wait for it. + * In FIRST N (priority) mode: must wait for this higher priority slot. + * In ANY N (quorum) mode: can skip and use another slot. + */ + if (wait_for_all || synchronized_standby_slots_config->syncrep_method == SYNC_REP_PRIORITY) + break; + goto next_slot; } Assert(restart_lsn >= wait_for_lsn); @@ -3210,17 +3376,99 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) caught_up_slot_num++; + /* + * Stop processing if the required number of slots have caught up. + * In priority mode (FIRST N), this ensures we select the first N + * slots in sequential order. In quorum mode (ANY N), we still + * process in order for efficiency, stopping once we find any N. + */ + if (caught_up_slot_num >= required) + break; + +next_slot: name += strlen(name) + 1; } LWLockRelease(ReplicationSlotControlLock); /* - * Return false if not all the standbys have caught up to the specified - * WAL location. + * If the required number of slots have not caught up, report any + * recorded non-OK states and return false. + * + * We only emit messages when the requirement is not met to avoid + * misleading messages in quorum/priority mode where other slots may + * have satisfied the condition despite some slots having issues. */ - if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames) + if (caught_up_slot_num < required) + { + for (int i = 0; i < num_slot_states; i++) + { + const char *slot_name = slot_states[i].slot_name; + + switch (slot_states[i].state) + { + case SS_SLOT_NOT_FOUND: + ereport(elevel, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist", + slot_name, "synchronized_standby_slots"), + errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".", + slot_name), + errhint("Create the replication slot \"%s\" or amend parameter \"%s\".", + slot_name, "synchronized_standby_slots")); + break; + + case SS_SLOT_LOGICAL: + ereport(elevel, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"", + slot_name, "synchronized_standby_slots"), + errdetail("Logical replication is waiting for correction on replication slot \"%s\".", + slot_name), + errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".", + slot_name, "synchronized_standby_slots")); + break; + + case SS_SLOT_INVALIDATED: + ereport(elevel, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated", + slot_name, "synchronized_standby_slots"), + errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".", + slot_name), + errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".", + slot_name, "synchronized_standby_slots")); + break; + + case SS_SLOT_INACTIVE: + ereport(elevel, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid", + slot_name, "synchronized_standby_slots"), + errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".", + slot_name), + errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".", + slot_name, "synchronized_standby_slots")); + break; + + default: + /* Should not happen */ + Assert(false); + break; + } + } + + /* If no specific slot state issues but requirement not met (slots still lagging) */ + if (num_slot_states == 0) + { + ereport(elevel, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("waiting for physical standbys corresponding to synchronized standby slots, some slots have not caught up")); + } + + pfree(slot_states); return false; + } /* The ss_oldest_flush_lsn must not retreat. */ Assert(!XLogRecPtrIsValid(ss_oldest_flush_lsn) || @@ -3228,6 +3476,7 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) ss_oldest_flush_lsn = min_restart_lsn; + pfree(slot_states); return true; } diff --git a/src/test/recovery/t/052_synchronized_standby_slots_quorum.pl b/src/test/recovery/t/052_synchronized_standby_slots_quorum.pl new file mode 100644 index 00000000000..9dc728dbe1d --- /dev/null +++ b/src/test/recovery/t/052_synchronized_standby_slots_quorum.pl @@ -0,0 +1,444 @@ + +# Copyright (c) 2024-2026, PostgreSQL Global Development Group + +# Test synchronized_standby_slots with different syntax modes: +# - Plain list (ALL mode): slot1, slot2 +# - ANY N (quorum mode): ANY N (slot1, slot2, ...) +# - FIRST N (priority mode): FIRST N (slot1, slot2, ...) +# +# Setup: a 3-node cluster with one primary, two physical standbys, and a +# logical decoding client using a failover-enabled slot. +# +# | ----> standby1 (primary_slot_name = sb1_slot) +# primary ------| +# | ----> standby2 (primary_slot_name = sb2_slot) +# +# synchronous_standby_names = 'ANY 1 (standby1, standby2)' +# +# Test scenarios: +# +# A) Plain list 'sb1_slot, sb2_slot' (ALL mode) +# - Works when all slots are available +# - Blocks immediately if ANY slot is unavailable +# +# B) ANY 1 (sb1_slot, sb2_slot) (quorum mode) +# - Proceeds when at least N slots have caught up +# - Skips unavailable/invalid/lagging slots to find N available +# +# C) FIRST N (sb1_slot, sb2_slot) (priority mode) +# - Selects first N slots in priority order (list order) +# - Skips unavailable slots but waits for valid lagging slots +# - FIRST 1 works with one slot down (unlike plain list) + +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# --------------------------------------------------------------------------- +# 1. Create a primary with logical replication level, autovacuum off +# --------------------------------------------------------------------------- +my $primary = PostgreSQL::Test::Cluster->new('primary'); +$primary->init(allows_streaming => 'logical'); +$primary->append_conf( + 'postgresql.conf', qq{ +autovacuum = off +}); +$primary->start; + +# Physical replication slots for the two standbys +$primary->safe_psql('postgres', + "SELECT pg_create_physical_replication_slot('sb1_slot');"); +$primary->safe_psql('postgres', + "SELECT pg_create_physical_replication_slot('sb2_slot');"); +$primary->safe_psql('postgres', + "SELECT pg_create_physical_replication_slot('first_slot');"); + +# --------------------------------------------------------------------------- +# 2. Create standby1 and standby2 from a fresh backup +# --------------------------------------------------------------------------- +my $backup_name = 'base_backup'; +$primary->backup($backup_name); + +my $connstr = $primary->connstr; + +my $standby1 = PostgreSQL::Test::Cluster->new('standby1'); +$standby1->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); +$standby1->append_conf( + 'postgresql.conf', qq( +hot_standby_feedback = on +primary_slot_name = 'sb1_slot' +primary_conninfo = '$connstr dbname=postgres' +)); + +my $standby2 = PostgreSQL::Test::Cluster->new('standby2'); +$standby2->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); +$standby2->append_conf( + 'postgresql.conf', qq( +hot_standby_feedback = on +primary_slot_name = 'sb2_slot' +primary_conninfo = '$connstr dbname=postgres' +)); + +$standby1->start; +$standby2->start; + +$primary->wait_for_replay_catchup($standby1); +$primary->wait_for_replay_catchup($standby2); + +# --------------------------------------------------------------------------- +# 3. Create a logical failover slot on the primary +# --------------------------------------------------------------------------- +$primary->safe_psql('postgres', + "SELECT pg_create_logical_replication_slot('logical_failover', 'test_decoding', false, false, true);" +); + +# --------------------------------------------------------------------------- +# 4. Configure quorum sync rep with ALL-mode synchronized_standby_slots +# --------------------------------------------------------------------------- +$primary->append_conf( + 'postgresql.conf', qq{ +synchronous_standby_names = 'ANY 1 (standby1, standby2)' +synchronized_standby_slots = 'sb1_slot, sb2_slot' +}); +$primary->reload; + +$primary->wait_for_replay_catchup($standby1); +$primary->wait_for_replay_catchup($standby2); + +# --------------------------------------------------------------------------- +# 5. Confirm that quorum sync rep is active for both standbys +# --------------------------------------------------------------------------- +is( $primary->safe_psql( + 'postgres', + q{SELECT count(*) FROM pg_stat_replication WHERE sync_state = 'quorum';} + ), + '2', + 'both standbys are in quorum sync state'); + +################################################## +# PART A: Plain list (ALL mode) blocks when any slot is unavailable +################################################## + +$standby1->stop; + +# Commit succeeds since standby2 satisfies the quorum. +my $emit_lsn = $primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'all_mode_blocks');" +); +like($emit_lsn, qr/^[0-9A-F]+\/[0-9A-F]+$/, + 'synchronous commit succeeds with quorum (standby2 alive)'); + +$primary->wait_for_replay_catchup($standby2); + +my $log_offset = -s $primary->logfile; + +my $bg = $primary->background_psql( + 'postgres', + on_error_stop => 0, + timeout => $PostgreSQL::Test::Utils::timeout_default); + +$bg->query_until( + qr/decode_start/, q( + \echo decode_start + SELECT pg_logical_slot_peek_changes('logical_failover', NULL, NULL); +)); + +# Wait for the primary to log a warning about sb1_slot not being active. +$primary->wait_for_log( + qr/replication slot \"sb1_slot\" specified in parameter "synchronized_standby_slots" does not have active_pid/, + $log_offset); + +pass('plain list (ALL mode): logical decoding blocked by unavailable sb1_slot'); + +# Unblock by clearing synchronized_standby_slots. +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''"); +$primary->reload; +$bg->quit; + +# Consume the change so the slot is clean for the next test. +$primary->safe_psql('postgres', + q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);}); + +################################################## +# PART B: ANY mode (quorum) — logical decoding proceeds with N-of-M slots +################################################## + +# Switch synchronized_standby_slots to quorum mode: need only 1 of 2 slots. +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'ANY 1 (sb1_slot, sb2_slot)'"); +$primary->reload; + +# standby1 is still down; standby2 is up. + +# Emit another transactional message — commits via quorum. +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'quorum_mode_works');" +); +$primary->wait_for_replay_catchup($standby2); + +# In quorum mode, logical decoding should NOT block because sb2_slot has +# caught up and 1-of-2 is sufficient. +my $decoded = $primary->safe_psql('postgres', + q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL) + WHERE data LIKE '%quorum_mode_works%';}); +is($decoded, '1', + 'ANY mode: logical decoding proceeds with only sb2_slot caught up'); + +################################################## +# PART C: Verify plain list (ALL mode) requires ALL slots +################################################## + +# Bring standby1 back. +$standby1->start; +$primary->wait_for_replay_catchup($standby1); + +# Switch to plain list (ALL mode) with both slots. +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'sb1_slot, sb2_slot'"); +$primary->reload; + +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'both_caught_up');" +); +$primary->wait_for_replay_catchup($standby1); +$primary->wait_for_replay_catchup($standby2); + +my $decoded_bc = $primary->safe_psql('postgres', + q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL) + WHERE data LIKE '%both_caught_up%';}); +is($decoded_bc, '1', + 'plain list: works when all standbys are up'); + +# Now stop standby1 and verify that plain list BLOCKS. +$standby1->stop; + +$log_offset = -s $primary->logfile; + +$bg = $primary->background_psql( + 'postgres', + on_error_stop => 0, + timeout => $PostgreSQL::Test::Utils::timeout_default); + +# Emit a message while sb1_slot is unavailable. +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'all_mode_blocks_sb1');" +); +$primary->wait_for_replay_catchup($standby2); + +$bg->query_until( + qr/decode_start/, q( + \echo decode_start + SELECT pg_logical_slot_peek_changes('logical_failover', NULL, NULL); +)); + +# Plain list requires ALL slots, so it should block on sb1_slot. +$primary->wait_for_log( + qr/replication slot \"sb1_slot\" specified in parameter "synchronized_standby_slots" does not have active_pid/, + $log_offset); + +pass('plain list: blocks when any slot is unavailable (requires ALL)'); + +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''"); +$primary->reload; +$bg->quit; + +# Consume the change for the next test. +$primary->safe_psql('postgres', + q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);}); + +################################################## +# PART D: ANY mode — verify it works with either standby +################################################## + +# Bring standby1 back, switch to ANY 1. +$standby1->start; +$primary->wait_for_replay_catchup($standby1); + +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'ANY 1 (sb1_slot, sb2_slot)'"); +$primary->reload; + +# Stop standby2 this time to test quorum with only standby1. +$standby2->stop; + +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'standby2_down');" +); + +$primary->wait_for_replay_catchup($standby1); + +# Decoding proceeds via quorum with only sb1_slot. +my $decoded_d1 = $primary->safe_psql('postgres', + q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL) + WHERE data LIKE '%standby2_down%';}); +is($decoded_d1, '1', + 'ANY mode: decoding works with only standby1 up'); + +# Bring standby2 back and verify decoding still works. +$standby2->start; +$primary->wait_for_replay_catchup($standby2); + +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'both_up_again');" +); +$primary->wait_for_replay_catchup($standby1); +$primary->wait_for_replay_catchup($standby2); + +my $decoded_d2 = $primary->safe_psql('postgres', + q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL) + WHERE data LIKE '%both_up_again%';}); +is($decoded_d2, '1', + 'ANY mode: decoding works when both standbys are up'); + +################################################## +# PART E: Verify FIRST N priority semantics +################################################## + +# FIRST N should: +# 1. Select first N slots in priority order (list order) +# 2. Skip unavailable slots to find N available ones +# 3. Wait for valid but lagging slots (not skip to lower priority) + +# Test FIRST 1 (sb1_slot, sb2_slot) with sb1_slot unavailable. +$standby1->stop; + +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'FIRST 1 (sb1_slot, sb2_slot)'"); +$primary->reload; + +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'first_1_skip_unavailable');" +); +$primary->wait_for_replay_catchup($standby2); + +# FIRST 1 should skip sb1_slot (unavailable) and use sb2_slot. +my $decoded_e1 = $primary->safe_psql('postgres', + q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL) + WHERE data LIKE '%first_1_skip_unavailable%';}); +is($decoded_e1, '1', + 'FIRST 1: skips unavailable first slot, uses second slot'); + +# Test FIRST 2 (sb1_slot, sb2_slot) with both up — should wait for both. +$standby1->start; +$primary->wait_for_replay_catchup($standby1); + +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'FIRST 2 (sb1_slot, sb2_slot)'"); +$primary->reload; + +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'first_2_both_up');" +); +$primary->wait_for_replay_catchup($standby1); +$primary->wait_for_replay_catchup($standby2); + +my $decoded_e2 = $primary->safe_psql('postgres', + q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL) + WHERE data LIKE '%first_2_both_up%';}); +is($decoded_e2, '1', + 'FIRST 2: decoding works when all required slots are up'); + +# Test that FIRST 1 is different from plain list — FIRST 1 succeeds with one down. +$standby1->stop; + +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'FIRST 1 (sb1_slot, sb2_slot)'"); +$primary->reload; + +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'first_1_vs_all');" +); +$primary->wait_for_replay_catchup($standby2); + +my $decoded_e3 = $primary->safe_psql('postgres', + q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL) + WHERE data LIKE '%first_1_vs_all%';}); +is($decoded_e3, '1', + 'FIRST 1: works with one slot down (unlike plain list which requires ALL)'); + +# Bring standby1 back for subsequent tests. +$standby1->start; +$primary->wait_for_replay_catchup($standby1); + +################################################## +# PART F: Plain list with first-prefixed slot name still means ALL mode +################################################## + +# If simple-list syntax starts with a slot name like "first_slot", it must +# still be treated as ALL mode (not as explicit FIRST N syntax). +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'first_slot, sb2_slot'"); +$primary->reload; + +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'first_prefix_all_mode_blocks');" +); +$primary->wait_for_replay_catchup($standby2); + +$log_offset = -s $primary->logfile; + +$bg = $primary->background_psql( + 'postgres', + on_error_stop => 0, + timeout => $PostgreSQL::Test::Utils::timeout_default); + +$bg->query_until( + qr/decode_start/, q( + \echo decode_start + SELECT pg_logical_slot_peek_changes('logical_failover', NULL, NULL); +)); + +# Plain list must require all listed slots; first_slot is intentionally inactive. +$primary->wait_for_log( + qr/replication slot \"first_slot\" specified in parameter "synchronized_standby_slots" does not have active_pid/, + $log_offset); + +pass('plain list with first-prefixed slot name blocks in ALL mode'); + +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''"); +$primary->reload; +$bg->quit; + +# Consume the change for the next test. +$primary->safe_psql('postgres', + q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);}); + +################################################## +# PART G: Verify GUC validation rejects bad values +################################################## + +my ($result, $stdout, $stderr); + +# N exceeds number of listed slots +($result, $stdout, $stderr) = $primary->psql('postgres', + "ALTER SYSTEM SET synchronized_standby_slots = 'ANY 3 (sb1_slot, sb2_slot)';"); +like($stderr, qr/ERROR/, + 'GUC rejects ANY N when N > number of listed slots'); + +# Missing closing parenthesis +($result, $stdout, $stderr) = $primary->psql('postgres', + "ALTER SYSTEM SET synchronized_standby_slots = 'ANY 1 (sb1_slot, sb2_slot';"); +like($stderr, qr/ERROR/, + 'GUC rejects malformed ANY syntax'); + +# Invalid slot name +($result, $stdout, $stderr) = $primary->psql('postgres', + "ALTER SYSTEM SET synchronized_standby_slots = 'ANY 1 (INVALID_UPPER)';"); +like($stderr, qr/ERROR/, + 'GUC rejects invalid slot name in ANY syntax'); + +# --------------------------------------------------------------------------- +# Cleanup +# --------------------------------------------------------------------------- +$primary->safe_psql('postgres', + "SELECT pg_drop_replication_slot('logical_failover');"); + +done_testing(); -- 2.43.0