From bac0fbef8b203c530e5117b0b7cfee13cfab78b9 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Sat, 22 Jul 2023 10:17:48 +0000 Subject: [PATCH v13 1/2] Allow logical walsenders to wait for physical standbys --- doc/src/sgml/config.sgml | 42 ++++ .../replication/logical/reorderbuffer.c | 9 + src/backend/replication/slot.c | 216 +++++++++++++++++- src/backend/utils/misc/guc_tables.c | 30 +++ src/backend/utils/misc/postgresql.conf.sample | 4 + src/include/replication/slot.h | 4 + src/include/utils/guc_hooks.h | 4 + src/test/recovery/meson.build | 1 + src/test/recovery/t/050_verify_slot_order.pl | 146 ++++++++++++ 9 files changed, 455 insertions(+), 1 deletion(-) create mode 100644 src/test/recovery/t/050_verify_slot_order.pl diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 11251fa05e..83a7d2e87e 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4397,6 +4397,24 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows + + standby_slot_names (string) + + standby_slot_names configuration parameter + + + + + List of physical replication slots that logical replication waits for. + Specify * to wait for all physical replication + slots. If a logical replication connection is meant to switch to a + physical standby after the standby is promoted, the physical + replication slot for the standby should be listed here. This ensures + that logical replication is not ahead of the physical standby. + + + + @@ -4545,6 +4563,30 @@ ANY num_sync ( + synchronize_slot_names (string) + + synchronize_slot_names configuration parameter + + + + + Specifies a list of logical replication slots that a streaming + replication standby should synchronize from the primary server. This is + necessary to be able to retarget those logical replication connections + to this standby if it gets promoted. Specify * to + synchronize all logical replication slots. The default is empty. 
On the + primary, the logical walsenders associated with logical replication + slots specified in this parameter will wait for the standby servers + specified in the standby_slot_names parameter. In + other words, the primary ensures those logical replication slots will + never get ahead of the standby servers. On the standby server, the logical + replication slots specified are synchronized from the primary. Set this + parameter to the same value on both primary and standby. + + + + diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 87a4d2a24b..dc12d825e2 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -100,6 +100,7 @@ #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */ #include "storage/bufmgr.h" #include "storage/fd.h" +#include "storage/ipc.h" #include "storage/sinval.h" #include "utils/builtins.h" #include "utils/combocid.h" @@ -107,6 +108,7 @@ #include "utils/memutils.h" #include "utils/rel.h" #include "utils/relfilenumbermap.h" +#include "utils/varlena.h" /* entry for a hash table we use to map from xid to our transaction state */ @@ -2498,6 +2500,13 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, } else { + /* + * Before we send out the last set of changes to logical decoding + * output plugin, wait for specified streaming replication standby + * servers (if any) to confirm receipt of WAL up to commit_lsn. + */ + WaitForStandbyLSN(commit_lsn); + /* * Call either PREPARE (for two-phase transactions) or COMMIT (for * regular ones). diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 1dc27264f6..dc1d11a564 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -52,6 +52,8 @@ #include "storage/proc.h" #include "storage/procarray.h" #include "utils/builtins.h" +#include "utils/guc_hooks.h" +#include "utils/varlena.h" /* * Replication slot on-disk data structure. 
@@ -98,9 +100,11 @@ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; /* My backend's replication slot in the shared memory array */ ReplicationSlot *MyReplicationSlot = NULL; -/* GUC variable */ +/* GUC variables */ int max_replication_slots = 10; /* the maximum number of replication * slots */ +char *synchronize_slot_names; +char *standby_slot_names; static void ReplicationSlotShmemExit(int code, Datum arg); static void ReplicationSlotDropAcquired(void); @@ -111,6 +115,8 @@ static void RestoreSlotFromDisk(const char *name); static void CreateSlotOnDisk(ReplicationSlot *slot); static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel); +static bool validate_slot_names(char **newval); + /* * Report shared-memory space needed by ReplicationSlotsShmemInit. */ @@ -2085,3 +2091,211 @@ RestoreSlotFromDisk(const char *name) (errmsg("too many replication slots active before shutdown"), errhint("Increase max_replication_slots and try again."))); } + +/* + * A helper function to simplify check_hook implementation for + * synchronize_slot_names and standby_slot_names GUCs. + */ +static bool +validate_slot_names(char **newval) +{ + char *rawname; + List *elemlist; + + /* Need a modifiable copy of string */ + rawname = pstrdup(*newval); + + /* Parse string into list of identifiers */ + if (!SplitIdentifierString(rawname, ',', &elemlist)) + { + /* syntax error in name list */ + GUC_check_errdetail("List syntax is invalid."); + pfree(rawname); + list_free(elemlist); + return false; + } + + pfree(rawname); + list_free(elemlist); + return true; +} + +/* + * GUC check_hook for synchronize_slot_names + */ +bool +check_synchronize_slot_names(char **newval, void **extra, GucSource source) +{ + /* Special handling for "*" which means all. 
*/ + if (strcmp(*newval, "*") == 0) + return true; + + if (strcmp(*newval, "") == 0) + return true; + + return validate_slot_names(newval); +} + +/* + * GUC check_hook for standby_slot_names + */ +bool +check_standby_slot_names(char **newval, void **extra, GucSource source) +{ + /* Special handling for "*" which means all. */ + if (strcmp(*newval, "*") == 0) + return true; + + if (strcmp(*newval, "") == 0) + return true; + + return validate_slot_names(newval); +} + +/* + * Function in which logical walsender (the caller) corresponding to a logical + * slot specified in synchronize_slot_names GUC value waits for one or more + * physical standbys corresponding to specified physical slots in + * standby_slot_names GUC value. + */ +void +WaitForStandbyLSN(XLogRecPtr wait_for_lsn) +{ + char *rawname; + List *elemlist; + ListCell *l; + ReplicationSlot *slot; + + Assert(MyReplicationSlot != NULL); + Assert(SlotIsLogical(MyReplicationSlot)); + + if (strcmp(standby_slot_names, "") == 0) + return; + + /* + * Check if the slot associated with this logical walsender is asked to + * wait for physical standbys. + */ + if (strcmp(synchronize_slot_names, "") == 0) + return; + + /* "*" means all logical walsenders should wait for physical standbys. 
*/ + if (strcmp(synchronize_slot_names, "*") != 0) + { + bool shouldwait = false; + + rawname = pstrdup(synchronize_slot_names); + SplitIdentifierString(rawname, ',', &elemlist); + + foreach (l, elemlist) + { + char *name = lfirst(l); + if (strcmp(name, NameStr(MyReplicationSlot->data.name)) == 0) + { + shouldwait = true; + break; + } + } + + pfree(rawname); + rawname = NULL; + list_free(elemlist); + elemlist = NIL; + + if (!shouldwait) + return; + } + + rawname = pstrdup(standby_slot_names); + SplitIdentifierString(rawname, ',', &elemlist); + +retry: + + foreach (l, elemlist) + { + char *name = lfirst(l); + XLogRecPtr restart_lsn; + bool invalidated; + + slot = SearchNamedReplicationSlot(name, true); + + /* + * It may happen that the slot specified in standby_slot_names GUC + * value is dropped, so let's skip over it. + */ + if (!slot) + { + ereport(WARNING, + errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist, ignoring", + name, "standby_slot_names")); + elemlist = foreach_delete_current(elemlist, l); + continue; + } + + /* + * It may happen that the physical slot specified in standby_slot_names + * is dropped without removing it from the GUC value, and a logical + * slot has been created with the same name meanwhile. Let's skip over + * it. + * + * NB: One might consider removing the slot from the GUC value + * automatically when a physical replication slot is dropped, but the + * slot can sometimes be dropped in process exit paths (check + * ReplicationSlotCleanup call sites), and modifying the GUC value + * there isn't a great idea. 
+ */ + if (SlotIsLogical(slot)) + { + ereport(WARNING, + errmsg("cannot have logical replication slot \"%s\" in parameter \"%s\", ignoring", + name, "standby_slot_names")); + elemlist = foreach_delete_current(elemlist, l); + continue; + } + + /* physical slots advance restart_lsn on remote flush */ + SpinLockAcquire(&slot->mutex); + restart_lsn = slot->data.restart_lsn; + invalidated = slot->data.invalidated != RS_INVAL_NONE; + SpinLockRelease(&slot->mutex); + + /* + * Specified physical slot may have been invalidated, so no point in + * waiting for it. + */ + if (restart_lsn == InvalidXLogRecPtr || invalidated) + { + ereport(WARNING, + errmsg("physical slot \"%s\" specified in parameter \"%s\" has been invalidated, ignoring", + name, "standby_slot_names")); + elemlist = foreach_delete_current(elemlist, l); + continue; + } + + /* If the slot is past the wait_for_lsn, no need to wait anymore */ + if (restart_lsn >= wait_for_lsn) + { + elemlist = foreach_delete_current(elemlist, l); + continue; + } + } + + if (list_length(elemlist) == 0) + { + pfree(rawname); + return; /* Exit if done waiting for everyone */ + } + + /* XXX: Is waiting for 1 second before retrying enough or more or less? */ + + /* XXX: Need to have a new wait event type. */ + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 1000L, + WAIT_EVENT_WAL_SENDER_WAIT_WAL); + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + + goto retry; +} diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index f9dba43b8c..d72b6b95b6 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -4551,6 +4551,36 @@ struct config_string ConfigureNamesString[] = check_io_direct, assign_io_direct, NULL }, + /* + * XXX: synchronize_slot_names needs to be specified on both primary and + * standby, therefore, we might need a new group REPLICATION. 
+ */ + { + {"synchronize_slot_names", PGC_SIGHUP, REPLICATION_STANDBY, + gettext_noop("List of replication slot names to synchronize from " + "primary to streaming replication standby server."), + gettext_noop("Value of \"*\" means all."), + GUC_LIST_INPUT | GUC_LIST_QUOTE + }, + &synchronize_slot_names, + "", + check_synchronize_slot_names, NULL, NULL + }, + + { + {"standby_slot_names", PGC_SIGHUP, REPLICATION_PRIMARY, + gettext_noop("List of streaming replication standby server slot " + "names that logical walsenders waits for."), + gettext_noop("Decoded changes are sent out to plugins by logical " + "walsenders only after specified replication slots " + "confirm receiving WAL."), + GUC_LIST_INPUT | GUC_LIST_QUOTE + }, + &standby_slot_names, + "", + check_standby_slot_names, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index c768af9a73..63daf586f3 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -328,6 +328,8 @@ # method to choose sync standbys, number of sync standbys, # and comma-separated list of application_name # from standby(s); '*' = all +#standby_slot_names = '' # streaming replication standby server slot names that + # logical walsenders wait for # - Standby Servers - @@ -355,6 +357,8 @@ #wal_retrieve_retry_interval = 5s # time to wait before retrying to # retrieve WAL after a failed attempt #recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery +#synchronize_slot_names = '' # replication slot names to synchronize from + # primary to streaming replication standby server # - Subscribers - diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index a8a89dc784..2765f99ccf 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -203,6 +203,8 @@ extern 
PGDLLIMPORT ReplicationSlot *MyReplicationSlot; /* GUCs */ extern PGDLLIMPORT int max_replication_slots; +extern PGDLLIMPORT char *synchronize_slot_names; +extern PGDLLIMPORT char *standby_slot_names; /* shmem initialization functions */ extern Size ReplicationSlotsShmemSize(void); @@ -246,4 +248,6 @@ extern void CheckPointReplicationSlots(void); extern void CheckSlotRequirements(void); extern void CheckSlotPermissions(void); +extern void WaitForStandbyLSN(XLogRecPtr wait_for_lsn); + #endif /* SLOT_H */ diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h index 2ecb9fc086..259aefb9d7 100644 --- a/src/include/utils/guc_hooks.h +++ b/src/include/utils/guc_hooks.h @@ -159,5 +159,9 @@ extern void assign_wal_consistency_checking(const char *newval, void *extra); extern void assign_xlog_sync_method(int new_sync_method, void *extra); extern bool check_io_direct(char **newval, void **extra, GucSource source); extern void assign_io_direct(const char *newval, void *extra); +extern bool check_synchronize_slot_names(char **newval, void **extra, + GucSource source); +extern bool check_standby_slot_names(char **newval, void **extra, + GucSource source); #endif /* GUC_HOOKS_H */ diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index e7328e4894..ee590eeac7 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -43,6 +43,7 @@ tests += { 't/035_standby_logical_decoding.pl', 't/036_truncated_dropped.pl', 't/037_invalid_database.pl', + 't/050_verify_slot_order.pl', ], }, } diff --git a/src/test/recovery/t/050_verify_slot_order.pl b/src/test/recovery/t/050_verify_slot_order.pl new file mode 100644 index 0000000000..402b704e3f --- /dev/null +++ b/src/test/recovery/t/050_verify_slot_order.pl @@ -0,0 +1,146 @@ + +# Copyright (c) 2023, PostgreSQL Global Development Group + +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Test primary disallowing 
specified logical replication slots getting ahead of +# specified physical replication slots. It uses the following setup: +# +# | ----> standby1 (connected via streaming replication) +# | ----> standby2 (connected via streaming replication) +# primary ----- | +# | ----> subscriber1 (connected via logical replication) +# | ----> subscriber2 (connected via logical replication) +# +# The setup is configured in such a way that the primary never lets subscriber1 get ahead +# of standby1. + +# Create primary +my $primary = PostgreSQL::Test::Cluster->new('primary'); +$primary->init(allows_streaming => 'logical'); + +# Configure primary to disallow specified logical replication slot (lsub1_slot) +# getting ahead of specified physical replication slot (sb1_slot). +$primary->append_conf( + 'postgresql.conf', qq( +standby_slot_names = 'sb1_slot' +synchronize_slot_names = 'lsub1_slot' +)); +$primary->start; + +$primary->psql('postgres', + q{SELECT pg_create_physical_replication_slot('sb1_slot');}); +$primary->psql('postgres', + q{SELECT pg_create_physical_replication_slot('sb2_slot');}); + +$primary->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);"); + +my $backup_name = 'backup'; +$primary->backup($backup_name); + +# Create a standby +my $standby1 = PostgreSQL::Test::Cluster->new('standby1'); +$standby1->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); +$standby1->append_conf( + 'postgresql.conf', qq( +primary_slot_name = 'sb1_slot' +)); +$standby1->start; +$primary->wait_for_replay_catchup($standby1); + +# Create another standby +my $standby2 = PostgreSQL::Test::Cluster->new('standby2'); +$standby2->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); +$standby2->append_conf( + 'postgresql.conf', qq( +primary_slot_name = 'sb2_slot' +)); +$standby2->start; +$primary->wait_for_replay_catchup($standby2); + +# Create publication on primary +my $publisher = $primary; 
+$publisher->safe_psql('postgres', "CREATE PUBLICATION mypub FOR TABLE tab_int;"); +my $publisher_connstr = $publisher->connstr . ' dbname=postgres'; + +# Create a subscriber node, wait for sync to complete +my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1'); +$subscriber1->init(allows_streaming => 'logical'); +$subscriber1->start; +$subscriber1->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);"); +$subscriber1->safe_psql('postgres', + "CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' " + . "PUBLICATION mypub WITH (slot_name = lsub1_slot);"); +$subscriber1->wait_for_subscription_sync; + +# Create another subscriber node, wait for sync to complete +my $subscriber2 = PostgreSQL::Test::Cluster->new('subscriber2'); +$subscriber2->init(allows_streaming => 'logical'); +$subscriber2->start; +$subscriber2->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);"); +$subscriber2->safe_psql('postgres', + "CREATE SUBSCRIPTION mysub2 CONNECTION '$publisher_connstr' " + . "PUBLICATION mypub WITH (slot_name = lsub2_slot);"); +$subscriber2->wait_for_subscription_sync; + +# Stop the standby associated with specified physical replication slot so that +# the logical replication slot won't receive changes until the standby comes +# up. +$standby1->stop; + +# Create some data on primary +my $primary_row_count = 10; +my $primary_insert_time = time(); +$primary->safe_psql('postgres', + "INSERT INTO tab_int SELECT generate_series(1, $primary_row_count);"); + +# Wait until the standby that's up and running gets the data from primary +$primary->wait_for_replay_catchup($standby2); +my $result = $standby2->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "standby2 gets data from primary"); + +# Wait until the subscriber that's up and running and not specified in +# synchronize_slot_names GUC on primary gets the data from primary without +# waiting for any standbys. 
+$publisher->wait_for_catchup('mysub2'); +$result = $subscriber2->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "subscriber2 gets data from primary"); + +# The subscriber that's up and running and specified in synchronize_slot_names +# GUC on primary doesn't get the data from primary and keeps waiting for the +# standby specified in standby_slot_names. +$result = $subscriber1->safe_psql('postgres', + "SELECT count(*) = 0 FROM tab_int;"); +is($result, 't', "subscriber1 doesn't get data from primary until standby1 acknowledges changes"); + +# Start the standby specified in standby_slot_names and wait for it to catch +# up with the primary. +$standby1->start; +$primary->wait_for_replay_catchup($standby1); +$result = $standby1->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "standby1 gets data from primary"); + +# Now that the standby specified in standby_slot_names is up and running, +# primary must send the decoded changes to subscriber specified in +# synchronize_slot_names. While the standby was down, this subscriber didn't +# receive any data from primary i.e. the primary didn't allow it to go ahead +# of standby. +$publisher->wait_for_catchup('mysub1'); +$result = $subscriber1->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "subscriber1 gets data from primary after standby1 acknowledges changes"); + +done_testing(); -- 2.34.1