From 6f42ffd64e501aff09cb6d85ecb49fa27eddbccc Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Sun, 24 Sep 2023 23:45:33 -0400 Subject: [PATCH v20 1/2] Allow logical walsenders to wait for physical standbys --- doc/src/sgml/config.sgml | 42 ++++ src/backend/replication/slot.c | 169 +++++++++++++++- src/backend/replication/walsender.c | 188 ++++++++++++++++++ .../utils/activity/wait_event_names.txt | 1 + src/backend/utils/misc/guc_tables.c | 30 +++ src/backend/utils/misc/postgresql.conf.sample | 4 + src/include/replication/slot.h | 9 + src/include/utils/guc_hooks.h | 4 + src/test/recovery/meson.build | 1 + src/test/recovery/t/050_verify_slot_order.pl | 146 ++++++++++++++ 10 files changed, 593 insertions(+), 1 deletion(-) create mode 100644 src/test/recovery/t/050_verify_slot_order.pl diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 38684af5b1..46b0d1010e 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4344,6 +4344,24 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows + + standby_slot_names (string) + + standby_slot_names configuration parameter + + + + + List of physical replication slots that logical replication waits for. + Specify * to wait for all physical replication + slots. If a logical replication connection is meant to switch to a + physical standby after the standby is promoted, the physical + replication slot for the standby should be listed here. This ensures + that logical replication is not ahead of the physical standby. + + + + @@ -4492,6 +4510,30 @@ ANY num_sync ( + synchronize_slot_names (string) + + synchronize_slot_names configuration parameter + + + + + Specifies a list of logical replication slots that a streaming + replication standby should synchronize from the primary server. This is + necessary to be able to retarget those logical replication connections + to this standby if it gets promoted. Specify * to + synchronize all logical replication slots. The default is empty. On + primary, the logical walsenders associated with logical replication + slots specified in this parameter will wait for the standby servers + specified in parameter. In + other words, primary ensures those logical replication slots will + never get ahead of the standby servers. On standby server, the logical + replication slots specified are synchronized from the primary. Set this + parameter to same value on both primary and standby. + + + + diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 3ded3c1473..7370452d3b 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -52,6 +52,8 @@ #include "storage/proc.h" #include "storage/procarray.h" #include "utils/builtins.h" +#include "utils/guc_hooks.h" +#include "utils/varlena.h" /* * Replication slot on-disk data structure. @@ -98,9 +100,13 @@ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; /* My backend's replication slot in the shared memory array */ ReplicationSlot *MyReplicationSlot = NULL; -/* GUC variable */ +/* GUC variables */ int max_replication_slots = 10; /* the maximum number of replication * slots */ +char *synchronize_slot_names; +char *standby_slot_names; +List *standby_slot_names_list = NIL; +List *synchronize_slot_names_list = NIL; static void ReplicationSlotShmemExit(int code, Datum arg); static void ReplicationSlotDropAcquired(void); @@ -2121,3 +2127,164 @@ RestoreSlotFromDisk(const char *name) (errmsg("too many replication slots active before shutdown"), errhint("Increase max_replication_slots and try again."))); } + +/* + * A helper function to simplify check_hook implementation for + * synchronize_slot_names and standby_slot_names GUCs. + */ +static bool +validate_slot_names(char **newval, List **elemlist) +{ + char *rawname; + + /* Need a modifiable copy of string */ + rawname = pstrdup(*newval); + + /* Parse string into list of identifiers */ + if (!SplitIdentifierString(rawname, ',', elemlist)) + { + /* syntax error in name list */ + GUC_check_errdetail("List syntax is invalid."); + pfree(rawname); + list_free(*elemlist); + return false; + } + + return true; +} + +/* + * A helper function to validate 'type'(logical/physical) of slots for + * synchronize_slot_names and standby_slot_names GUCs. + * + * The caller is expected to pass last argument as per the 'type' of slots + * expected for the concerned GUC. + * + * NOTE: The flow where synchronize_slot_names will be sent from physical + * standby to walsender is yet to be implemented. Then this function will + * be used there as well to validate type of 'synchronize_slot_names' and + * thus it is made generic to handle both logical=true/false. + */ +static bool +validate_slot_type(List *elemlist, bool logical) +{ + ListCell *lc; + + /* + * Skip check if replication slots' data is not initialized yet i.e. we + * are in startup process. + * + * TODO: analyze what to do in this case when we do not have slots-data. + */ + if (!ReplicationSlotCtl) + return true; + + foreach(lc, elemlist) + { + char *name = lfirst(lc); + ReplicationSlot *slot; + + slot = SearchNamedReplicationSlot(name, true); + + if (!slot) + { + GUC_check_errdetail("replication slot \"%s\" does not exist", name); + return false; + } + + /* If caller expects logical slot while we got physical, return error */ + if (logical && SlotIsPhysical(slot)) + { + GUC_check_errdetail("cannot have physical replication slot \"%s\" " + "in this parameter", name); + return false; + } + + /* If caller expects physical slot while we got logical, return error */ + if (!logical && SlotIsLogical(slot)) + { + GUC_check_errdetail("cannot have logical replication slot \"%s\" " + "in this parameter", name); + return false; + } + } + + return true; +} + +/* + * GUC check_hook for synchronize_slot_names + * + * TODO: Ideally synchronize_slot_names should be physical standby's GUC. + * It should be conveyed somehow by physical standby to primary. + */ +bool +check_synchronize_slot_names(char **newval, void **extra, GucSource source) +{ + List *elemlist; + + /* Special handling for "*" which means all. */ + if (strcmp(*newval, "*") == 0) + return true; + + if (strcmp(*newval, "") == 0) + return true; + + if (!validate_slot_names(newval, &elemlist)) + return false; + + list_free(elemlist); + return true; +} + +/* + * GUC check_hook for standby_slot_names + */ +bool +check_standby_slot_names(char **newval, void **extra, GucSource source) +{ + List *elemlist; + + /* Special handling for "*" which means all. */ + if (strcmp(*newval, "*") == 0) + return true; + + if (strcmp(*newval, "") == 0) + return true; + + if (!validate_slot_names(newval, &elemlist)) + return false; + + if (!validate_slot_type(elemlist, false /* physical slots expected */ )) + { + list_free(elemlist); + return false; + } + + list_free(elemlist); + return true; +} + +/* + * Initialize the lists from raw synchronize_slot_names and standby_slot_names + * and cache these, in order to avoid parsing these repeatedly. Done at + * WALSender startup and after each SIGHUP. + */ +void +SlotSyncInitConfig(void) +{ + char *rawname; + + if (strcmp(standby_slot_names, "") != 0) + { + rawname = pstrdup(standby_slot_names); + SplitIdentifierString(rawname, ',', &standby_slot_names_list); + } + + if ((strcmp(synchronize_slot_names, "") != 0 && + strcmp(synchronize_slot_names, "*") != 0)) + { + rawname = pstrdup(synchronize_slot_names); + SplitIdentifierString(rawname, ',', &synchronize_slot_names_list); + } +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index e250b0567e..14c8879aeb 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1318,6 +1318,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) replication_active = true; SyncRepInitConfig(); + SlotSyncInitConfig(); /* Main loop of walsender */ WalSndLoop(XLogSendLogical); @@ -1448,6 +1449,7 @@ ProcessPendingWrites(void) ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); SyncRepInitConfig(); + SlotSyncInitConfig(); } /* Try to flush pending output to the client */ @@ -1527,6 +1529,180 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId ProcessPendingWrites(); } +/* + * Does this Wal Sender need to wait for physical standby. + * + * Check if this logical walsender needs to wait for physical standby + * corresponding to physical slots specified in standby_slot_names GUC. + */ +static bool +WalSndWaitForStandbyNeeded() +{ + ListCell *l; + + Assert(MyReplicationSlot != NULL); + Assert(SlotIsLogical(MyReplicationSlot)); + + if (strcmp(standby_slot_names, "") == 0) + return false; + + /* + * Check if the slot associated with this logical walsender is asked to + * wait for physical standbys. + */ + if (strcmp(synchronize_slot_names, "") == 0) + return false; + + /* + * "*" means all logical walsenders should wait for physical standbys, + * else check if MyReplicationSlot is specified in synchronize_slot_names. + */ + if (strcmp(synchronize_slot_names, "*") != 0) + { + bool shouldwait = false; + + foreach(l, synchronize_slot_names_list) + { + char *name = lfirst(l); + + if (strcmp(name, NameStr(MyReplicationSlot->data.name)) == 0) + { + shouldwait = true; + break; + } + } + + if (!shouldwait) + return false; + } + + return true; +} + +/* + * Wait for physical standby to confirm receiving give lsn. + * + * Here logical walsender corresponding to a logical slot specified in + * synchronize_slot_names GUC waits for physical standbys corresponding to + * physical slots specified in standby_slot_names GUC. + */ +static void +WalSndWaitForStandbyConfirmation(XLogRecPtr wait_for_lsn) +{ + List *standby_slot_cpy; + ListCell *l; + ReplicationSlot *slot; + + standby_slot_cpy = list_copy(standby_slot_names_list); + +retry: + foreach(l, standby_slot_cpy) + { + char *name = lfirst(l); + XLogRecPtr restart_lsn; + bool invalidated; + + CHECK_FOR_INTERRUPTS(); + + /* Process any requests or signals received recently */ + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + SyncRepInitConfig(); + } + + /* If postmaster asked us to stop, don't wait anymore */ + if (got_STOPPING) + break; + + slot = SearchNamedReplicationSlot(name, true); + + /* + * It may happen that the slot specified in standby_slot_names GUC + * value is dropped, so let's skip over it. + */ + if (!slot) + { + ereport(WARNING, + errmsg("replication slot \"%s\" specified in parameter" + " \"%s\" does not exist, ignoring", + name, "standby_slot_names")); + standby_slot_cpy = foreach_delete_current(standby_slot_cpy, l); + continue; + } + + /* + * If logical slot name is given in standby_slot_names, give WARNING + * and skip it. Since it is harmless, so WARNING should be enough, no + * need to error-out. + */ + if (SlotIsLogical(slot)) + { + ereport(WARNING, + errmsg("cannot have logical replication slot \"%s\" in " + "parameter \"%s\", ignoring", + name, "standby_slot_names")); + standby_slot_cpy = foreach_delete_current(standby_slot_cpy, l); + continue; + } + + /* physical slots advance restart_lsn on remote flush */ + SpinLockAcquire(&slot->mutex); + restart_lsn = slot->data.restart_lsn; + invalidated = slot->data.invalidated != RS_INVAL_NONE; + SpinLockRelease(&slot->mutex); + + /* + * Specified physical slot may have been invalidated, so no point in + * waiting for it. + */ + if (restart_lsn == InvalidXLogRecPtr || invalidated) + { + ereport(WARNING, + errmsg("physical slot \"%s\" specified in parameter \"%s\" " + "has been invalidated, ignoring", + name, "standby_slot_names")); + standby_slot_cpy = foreach_delete_current(standby_slot_cpy, l); + continue; + } + + /* If the slot is past the wait_for_lsn, no need to wait anymore */ + if (restart_lsn >= wait_for_lsn) + { + standby_slot_cpy = foreach_delete_current(standby_slot_cpy, l); + continue; + } + + } + + /* Exit if done waiting for everyone or postmaster asked us to stop */ + if ((list_length(standby_slot_cpy) == 0) || got_STOPPING) + { + return; + } + + /* Check for input from the client */ + ProcessRepliesIfAny(); + + /* Die if timeout was reached */ + WalSndCheckTimeOut(); + + /* Send keepalive if the time has come */ + WalSndKeepaliveIfNecessary(); + + /* XXX: Is waiting for 1 second before retrying enough or more or less? */ + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 1000L, + WAIT_EVENT_WAL_SENDER_WAIT_FOR_STANDBY_CONFIRMATION); + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + + goto retry; +} + /* * Wait till WAL < loc is flushed to disk so it can be safely sent to client. * @@ -1570,6 +1746,7 @@ WalSndWaitForWal(XLogRecPtr loc) ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); SyncRepInitConfig(); + SlotSyncInitConfig(); } /* Check for input from the client */ @@ -1657,6 +1834,16 @@ WalSndWaitForWal(XLogRecPtr loc) WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL); } + /* + * Wait for specified streaming replication standby servers (if any) to + * confirm receipt of WAL upto RecentFlushPtr. It is good to wait here + * upto RecentFlushPtr and then let it send the changes to logical + * subscribers one by one which are already covered in RecentFlushPtr + * without needing to wait on every change for standby confirmation. + */ + if (WalSndWaitForStandbyNeeded()) + WalSndWaitForStandbyConfirmation(RecentFlushPtr); + /* reactivate latch so WalSndLoop knows to continue */ SetLatch(MyLatch); return RecentFlushPtr; @@ -2469,6 +2656,7 @@ WalSndLoop(WalSndSendDataCallback send_data) ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); SyncRepInitConfig(); + SlotSyncInitConfig(); } /* Check for input from the client */ diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 9c5fdeb3ca..daf2d57d3d 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -76,6 +76,7 @@ LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to rem LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server." SSL_OPEN_SERVER "Waiting for SSL while attempting connection." WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process." +WAL_SENDER_WAIT_FOR_STANDBY_CONFIRMATION "Waiting for physical standby confirmation in WAL sender process." WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process." diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 16ec6c5ef0..1b3914af7d 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -4552,6 +4552,36 @@ struct config_string ConfigureNamesString[] = check_debug_io_direct, assign_debug_io_direct, NULL }, + /* + * XXX: synchronize_slot_names needs to be specified on both primary and + * standby, therefore, we might need a new group REPLICATION. + */ + { + {"synchronize_slot_names", PGC_SIGHUP, REPLICATION_STANDBY, + gettext_noop("List of replication slot names to synchronize from " + "primary to streaming replication standby server."), + gettext_noop("Value of \"*\" means all."), + GUC_LIST_INPUT | GUC_LIST_QUOTE + }, + &synchronize_slot_names, + "", + check_synchronize_slot_names, NULL, NULL + }, + + { + {"standby_slot_names", PGC_SIGHUP, REPLICATION_PRIMARY, + gettext_noop("List of streaming replication standby server slot " + "names that logical walsenders waits for."), + gettext_noop("Decoded changes are sent out to plugins by logical " + "walsenders only after specified replication slots " + "confirm receiving WAL."), + GUC_LIST_INPUT | GUC_LIST_QUOTE + }, + &standby_slot_names, + "", + check_standby_slot_names, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index d08d55c3fe..4b0a556b0a 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -326,6 +326,8 @@ # method to choose sync standbys, number of sync standbys, # and comma-separated list of application_name # from standby(s); '*' = all +#standby_slot_names = '' # streaming replication standby server slot names that + # logical walsenders waits for # - Standby Servers - @@ -353,6 +355,8 @@ #wal_retrieve_retry_interval = 5s # time to wait before retrying to # retrieve WAL after a failed attempt #recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery +#synchronize_slot_names = '' # replication slot names to synchronize from + # primary to streaming replication standby server # - Subscribers - diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 758ca79a81..38c0072043 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -210,6 +210,12 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot; /* GUCs */ extern PGDLLIMPORT int max_replication_slots; +extern PGDLLIMPORT char *synchronize_slot_names; +extern PGDLLIMPORT char *standby_slot_names; + +/* Globals */ +extern PGDLLIMPORT List *standby_slot_names_list; +extern PGDLLIMPORT List *synchronize_slot_names_list; /* shmem initialization functions */ extern Size ReplicationSlotsShmemSize(void); @@ -253,4 +259,7 @@ extern void CheckPointReplicationSlots(bool is_shutdown); extern void CheckSlotRequirements(void); extern void CheckSlotPermissions(void); +extern void WaitForStandbyLSN(XLogRecPtr wait_for_lsn); +extern void SlotSyncInitConfig(void); + #endif /* SLOT_H */ diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h index f04b99e3b9..e0295b3df6 100644 --- a/src/include/utils/guc_hooks.h +++ b/src/include/utils/guc_hooks.h @@ -160,5 +160,9 @@ extern bool check_wal_consistency_checking(char **newval, void **extra, extern void assign_wal_consistency_checking(const char *newval, void *extra); extern bool check_wal_segment_size(int *newval, void **extra, GucSource source); extern void assign_xlog_sync_method(int new_sync_method, void *extra); +extern bool check_synchronize_slot_names(char **newval, void **extra, + GucSource source); +extern bool check_standby_slot_names(char **newval, void **extra, + GucSource source); #endif /* GUC_HOOKS_H */ diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index 9d8039684a..3be3ee52fc 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -45,6 +45,7 @@ tests += { 't/037_invalid_database.pl', 't/038_save_logical_slots_shutdown.pl', 't/039_end_of_wal.pl', + 't/050_verify_slot_order.pl', ], }, } diff --git a/src/test/recovery/t/050_verify_slot_order.pl b/src/test/recovery/t/050_verify_slot_order.pl new file mode 100644 index 0000000000..402b704e3f --- /dev/null +++ b/src/test/recovery/t/050_verify_slot_order.pl @@ -0,0 +1,146 @@ + +# Copyright (c) 2023, PostgreSQL Global Development Group + +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Test primary disallowing specified logical replication slots getting ahead of +# specified physical replication slots. It uses the following set up: +# +# | ----> standby1 (connected via streaming replication) +# | ----> standby2 (connected via streaming replication) +# primary ----- | +# | ----> subscriber1 (connected via logical replication) +# | ----> subscriber2 (connected via logical replication) +# +# Set up is configured in such a way that primary never lets subscriber1 ahead +# of standby1. + +# Create primary +my $primary = PostgreSQL::Test::Cluster->new('primary'); +$primary->init(allows_streaming => 'logical'); + +# Configure primary to disallow specified logical replication slot (lsub1_slot) +# getting ahead of specified physical replication slot (sb1_slot). +$primary->append_conf( + 'postgresql.conf', qq( +standby_slot_names = 'sb1_slot' +synchronize_slot_names = 'lsub1_slot' +)); +$primary->start; + +$primary->psql('postgres', + q{SELECT pg_create_physical_replication_slot('sb1_slot');}); +$primary->psql('postgres', + q{SELECT pg_create_physical_replication_slot('sb2_slot');}); + +$primary->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);"); + +my $backup_name = 'backup'; +$primary->backup($backup_name); + +# Create a standby +my $standby1 = PostgreSQL::Test::Cluster->new('standby1'); +$standby1->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); +$standby1->append_conf( + 'postgresql.conf', qq( +primary_slot_name = 'sb1_slot' +)); +$standby1->start; +$primary->wait_for_replay_catchup($standby1); + +# Create another standby +my $standby2 = PostgreSQL::Test::Cluster->new('standby2'); +$standby2->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); +$standby2->append_conf( + 'postgresql.conf', qq( +primary_slot_name = 'sb2_slot' +)); +$standby2->start; +$primary->wait_for_replay_catchup($standby2); + +# Create publication on primary +my $publisher = $primary; +$publisher->safe_psql('postgres', "CREATE PUBLICATION mypub FOR TABLE tab_int;"); +my $publisher_connstr = $publisher->connstr . ' dbname=postgres'; + +# Create a subscriber node, wait for sync to complete +my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1'); +$subscriber1->init(allows_streaming => 'logical'); +$subscriber1->start; +$subscriber1->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);"); +$subscriber1->safe_psql('postgres', + "CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' " + . "PUBLICATION mypub WITH (slot_name = lsub1_slot);"); +$subscriber1->wait_for_subscription_sync; + +# Create another subscriber node, wait for sync to complete +my $subscriber2 = PostgreSQL::Test::Cluster->new('subscriber2'); +$subscriber2->init(allows_streaming => 'logical'); +$subscriber2->start; +$subscriber2->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);"); +$subscriber2->safe_psql('postgres', + "CREATE SUBSCRIPTION mysub2 CONNECTION '$publisher_connstr' " + . "PUBLICATION mypub WITH (slot_name = lsub2_slot);"); +$subscriber2->wait_for_subscription_sync; + +# Stop the standby associated with specified physical replication slot so that +# the logical replication slot won't receive changes until the standby comes +# up. +$standby1->stop; + +# Create some data on primary +my $primary_row_count = 10; +my $primary_insert_time = time(); +$primary->safe_psql('postgres', + "INSERT INTO tab_int SELECT generate_series(1, $primary_row_count);"); + +# Wait for the standby that's up and running gets the data from primary +$primary->wait_for_replay_catchup($standby2); +my $result = $standby2->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "standby2 gets data from primary"); + +# Wait for the subscriber that's up and running and not specified in +# synchronize_slot_names GUC on primary gets the data from primary without +# waiting for any standbys. +$publisher->wait_for_catchup('mysub2'); +$result = $subscriber2->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "subscriber2 gets data from primary"); + +# The subscriber that's up and running and specified in synchronize_slot_names +# GUC on primary doesn't get the data from primary and keeps waiting for the +# standby specified in standby_slot_names. +$result = $subscriber1->safe_psql('postgres', + "SELECT count(*) = 0 FROM tab_int;"); +is($result, 't', "subscriber1 doesn't get data from primary until standby1 acknowledges changes"); + +# Start the standby specified in standby_slot_names and wait for it to catch +# up with the primary. +$standby1->start; +$primary->wait_for_replay_catchup($standby1); +$result = $standby1->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "standby1 gets data from primary"); + +# Now that the standby specified in standby_slot_names is up and running, +# primary must send the decoded changes to subscriber specified in +# synchronize_slot_names. While the standby was down, this subscriber didn't +# receive any data from primary i.e. the primary didn't allow it to go ahead +# of standby. +$publisher->wait_for_catchup('mysub1'); +$result = $subscriber1->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "subscriber1 gets data from primary after standby1 acknowledges changes"); + +done_testing(); -- 2.34.1