From 45463ca3eb9fdb0907a15e16a60064e5d976c036 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Thu, 25 Jan 2024 20:28:30 +0800 Subject: [PATCH v69 3/7] Add logical slot sync capability to the physical standby This patch implements synchronization of logical replication slots from the primary server to the physical standby so that logical replication can be resumed after failover. GUC 'enable_syncslot' enables a physical standby to synchronize failover logical replication slots from the primary server. The logical replication slots on the primary can be synchronized to the hot standby by enabling the failover option during slot creation and setting 'enable_syncslot' on the standby. For the synchronization to work, it is mandatory to have a physical replication slot between the primary and the standby, and hot_standby_feedback must be enabled on the standby. All the failover logical replication slots on the primary (assuming configurations are appropriate) are automatically created on the physical standbys and are synced periodically. The slot-sync worker on the standby server pings the primary server at regular intervals to get the necessary failover logical slots information and create/update the slots locally. The nap time of the worker is tuned according to the activity on the primary. The worker waits for a period of time before the next synchronization, with the duration varying based on whether any slots were updated during the last cycle. The logical slots created by the slot-sync worker on physical standbys are not allowed to be dropped or consumed. Any attempt to perform logical decoding on such slots will result in an error. If a logical slot is invalidated on the primary, then that slot on the standby is also invalidated. If a logical slot on the primary is valid but is invalidated on the standby, then that slot is dropped and recreated on the standby in the next sync-cycle provided the slot still exists on the primary server. 
It is okay to recreate such slots as long as these are not consumable on the standby (which is the case currently). This situation may occur due to the following reasons: - The max_slot_wal_keep_size on the standby is insufficient to retain WAL records from the restart_lsn of the slot. - primary_slot_name is temporarily reset to null and the physical slot is removed. - The primary changes wal_level to a level lower than logical. The slot synchronization status on the standby can be monitored using the 'synced' column of the pg_replication_slots view. --- doc/src/sgml/bgworker.sgml | 65 +- doc/src/sgml/config.sgml | 27 +- doc/src/sgml/logicaldecoding.sgml | 37 + doc/src/sgml/protocol.sgml | 6 +- doc/src/sgml/system-views.sgml | 22 +- src/backend/access/transam/xlog.c | 5 +- src/backend/access/transam/xlogrecovery.c | 15 + src/backend/catalog/system_views.sql | 3 +- src/backend/postmaster/bgworker.c | 4 + src/backend/postmaster/postmaster.c | 10 + .../libpqwalreceiver/libpqwalreceiver.c | 41 + src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/logical.c | 12 + src/backend/replication/logical/meson.build | 1 + src/backend/replication/logical/slotsync.c | 1222 +++++++++++++++++ src/backend/replication/slot.c | 59 +- src/backend/replication/slotfuncs.c | 26 +- src/backend/replication/walreceiverfuncs.c | 16 + src/backend/replication/walsender.c | 19 +- src/backend/storage/ipc/ipci.c | 2 + src/backend/tcop/postgres.c | 11 + .../utils/activity/wait_event_names.txt | 1 + src/backend/utils/misc/guc_tables.c | 10 + src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/catalog/pg_proc.dat | 6 +- src/include/postmaster/bgworker.h | 1 + src/include/replication/logicalworker.h | 1 + src/include/replication/slot.h | 17 +- src/include/replication/walreceiver.h | 11 + src/include/replication/walsender.h | 3 + src/include/replication/worker_internal.h | 10 + .../t/050_standby_failover_slots_sync.pl | 166 ++- src/test/regress/expected/rules.out | 5 +- 
src/test/regress/expected/sysviews.out | 3 +- src/tools/pgindent/typedefs.list | 2 + 35 files changed, 1792 insertions(+), 49 deletions(-) create mode 100644 src/backend/replication/logical/slotsync.c diff --git a/doc/src/sgml/bgworker.sgml b/doc/src/sgml/bgworker.sgml index 2c393385a9..a7cfe6c58c 100644 --- a/doc/src/sgml/bgworker.sgml +++ b/doc/src/sgml/bgworker.sgml @@ -114,18 +114,59 @@ typedef struct BackgroundWorker bgw_start_time is the server state during which - postgres should start the process; it can be one of - BgWorkerStart_PostmasterStart (start as soon as - postgres itself has finished its own initialization; processes - requesting this are not eligible for database connections), - BgWorkerStart_ConsistentState (start as soon as a consistent state - has been reached in a hot standby, allowing processes to connect to - databases and run read-only queries), and - BgWorkerStart_RecoveryFinished (start as soon as the system has - entered normal read-write state). Note the last two values are equivalent - in a server that's not a hot standby. Note that this setting only indicates - when the processes are to be started; they do not stop when a different state - is reached. + postgres should start the process. Note that this setting + only indicates when the processes are to be started; they do not stop when + a different state is reached. Possible values are: + + + + BgWorkerStart_PostmasterStart + + + BgWorkerStart_PostmasterStart + Start as soon as postgres itself has finished its own initialization; + processes requesting this are not eligible for database connections. + + + + + + BgWorkerStart_ConsistentState + + + BgWorkerStart_ConsistentState + Start as soon as a consistent state has been reached in a hot-standby, + allowing processes to connect to databases and run read-only queries. 
+ + + + + + BgWorkerStart_ConsistentState_HotStandby + + + BgWorkerStart_ConsistentState_HotStandby + Same meaning as BgWorkerStart_ConsistentState but + it is more strict in terms of the server i.e. start the worker only + if it is hot-standby. + + + + + + BgWorkerStart_RecoveryFinished + + + BgWorkerStart_RecoveryFinished + Start as soon as the system has entered normal read-write state. Note + that the BgWorkerStart_ConsistentState and + BgWorkerStart_RecoveryFinished are equivalent + in a server that's not a hot standby. + + + + + diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 61038472c5..bd2d2f871e 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4612,8 +4612,13 @@ ANY num_sync ( ) then it is also + necessary to specify dbname in the + primary_conninfo string. This will only be used for + slot synchronization. It is ignored for streaming. This parameter can only be set in the postgresql.conf @@ -4938,6 +4943,24 @@ ANY num_sync ( + enable_syncslot (boolean) + + enable_syncslot configuration parameter + + + + + It enables a physical standby to synchronize logical failover slots + from the primary server so that logical subscribers are not blocked + after failover. + + + It is disabled by default. This parameter can only be set in the + postgresql.conf file or on the server command line. + + + diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index cd152d4ced..ec14cf7325 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -358,6 +358,43 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU So if a slot is no longer required it should be dropped. + + + + + Replication Slot Synchronization + + A logical replication slot on the primary can be synchronized to the hot + standby by enabling the failover option during slot + creation and setting + enable_syncslot + on the standby. 
For the synchronization + to work, it is mandatory to have a physical replication slot between the + primary and the standby, and + hot_standby_feedback + must be enabled on the standby. + + + + The ability to resume logical replication after failover depends upon the + pg_replication_slots.synced + value for the synchronized slots on the standby at the time of failover. + Only persistent slots that have attained synced state as true on the standby + before failover can be used for logical replication after failover. + Temporary slots will be dropped, therefore logical replication for those + slots cannot be resumed. For example, if the synchronized slot could not + become persistent on the standby due to a disabled subscription, then the + subscription cannot be resumed after failover even when it is enabled. + + + + To resume logical replication after failover from the synced logical + slots, the subscription's 'conninfo' must be altered to point to the + new primary server. This is done using + ALTER SUBSCRIPTION ... CONNECTION. + It is recommended that subscriptions are first disabled before promoting + the standby and are enabled back after altering the connection string. + diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index bb4fef1f51..f8ef2ad2ab 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2065,7 +2065,8 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" FAILOVER [ boolean ] - If true, the slot is enabled to be synced to the standbys. + If true, the slot is enabled to be synced to the standbys + so that logical replication can be resumed after failover. The default is false. @@ -2165,7 +2166,8 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" FAILOVER [ boolean ] - If true, the slot is enabled to be synced to the standbys. + If true, the slot is enabled to be synced to the standbys + so that logical replication can be resumed after failover. 
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index dd468b31ea..4ea2177b34 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2561,10 +2561,28 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx failover bool - True if this is a logical slot enabled to be synced to the standbys. - Always false for physical slots. + True if this is a logical slot enabled to be synced to the standbys + so that logical replication can be resumed from the new primary + after failover. Always false for physical slots. + + + + synced bool + + + True if this is a logical slot that was synced from a primary server. + + + On a hot standby, the slots with the synced column marked as true can + neither be used for logical decoding nor dropped by the user. The value + of this column has no meaning on the primary server; the column value on + the primary is default false for all slots but may (if leftover from a + promoted standby) also be true. + + + diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 478377c4a2..2d66d0d84b 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -3596,6 +3596,9 @@ XLogGetLastRemovedSegno(void) /* * Return the oldest WAL segment on the given TLI that still exists in * XLOGDIR, or 0 if none. + * + * If the given TLI is 0, return the oldest WAL segment among all the currently + * existing WAL segments. */ XLogSegNo XLogGetOldestSegno(TimeLineID tli) @@ -3619,7 +3622,7 @@ XLogGetOldestSegno(TimeLineID tli) wal_segment_size); /* Ignore anything that's not from the TLI of interest. */ - if (tli != file_tli) + if (tli != 0 && tli != file_tli) continue; /* If it's the oldest so far, update oldest_segno. 
*/ diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 0bb472da27..87b49d524a 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -50,6 +50,7 @@ #include "postmaster/startup.h" #include "replication/slot.h" #include "replication/walreceiver.h" +#include "replication/worker_internal.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/latch.h" @@ -1467,6 +1468,20 @@ FinishWalRecovery(void) */ XLogShutdownWalRcv(); + /* + * Shutdown the slot sync workers to prevent potential conflicts between + * user processes and slotsync workers after a promotion. + * + * We do not update the 'synced' column from true to false here, as any + * failed update could leave 'synced' column false for some slots. This + * could cause issues during slot sync after restarting the server as a + * standby. While updating after switching to the new timeline is an + * option, it does not simplify the handling for 'synced' column. + * Therefore, we retain the 'synced' column as true after promotion as it + * may provide useful information about the slot origin. + */ + ShutDownSlotSync(); + /* * We are now done reading the xlog from stream. 
Turn off streaming * recovery to force fetching the files (which would be required at end of diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 6fc3916850..2e8ce9d554 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1024,7 +1024,8 @@ CREATE VIEW pg_replication_slots AS L.safe_wal_size, L.two_phase, L.conflict_reason, - L.failover + L.failover, + L.synced FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 67f92c24db..46828b8a89 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -21,6 +21,7 @@ #include "postmaster/postmaster.h" #include "replication/logicallauncher.h" #include "replication/logicalworker.h" +#include "replication/worker_internal.h" #include "storage/dsm.h" #include "storage/ipc.h" #include "storage/latch.h" @@ -129,6 +130,9 @@ static const struct { "ApplyWorkerMain", ApplyWorkerMain }, + { + "ReplSlotSyncWorkerMain", ReplSlotSyncWorkerMain + }, { "ParallelApplyWorkerMain", ParallelApplyWorkerMain }, diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index feb471dd1d..d90d5d1576 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -116,6 +116,7 @@ #include "postmaster/walsummarizer.h" #include "replication/logicallauncher.h" #include "replication/walsender.h" +#include "replication/worker_internal.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/pg_shmem.h" @@ -1010,6 +1011,12 @@ PostmasterMain(int argc, char *argv[]) */ ApplyLauncherRegister(); + /* + * Register the slot sync worker here to kick start slot-sync operation + * sooner on the physical standby. 
+ */ + SlotSyncWorkerRegister(); + /* * process any libraries that should be preloaded at postmaster start */ @@ -5799,6 +5806,9 @@ bgworker_should_start_now(BgWorkerStartTime start_time) case PM_HOT_STANDBY: if (start_time == BgWorkerStart_ConsistentState) return true; + if (start_time == BgWorkerStart_ConsistentState_HotStandby && + pmState != PM_RUN) + return true; /* fall through */ case PM_RECOVERY: diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 20d5128c0e..ae76c098b1 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -33,6 +33,7 @@ #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/tuplestore.h" +#include "utils/varlena.h" PG_MODULE_MAGIC; @@ -57,6 +58,7 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port); static char *libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli); +static char *libpqrcv_get_dbname_from_conninfo(const char *conninfo); static int libpqrcv_server_version(WalReceiverConn *conn); static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, @@ -99,6 +101,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { .walrcv_send = libpqrcv_send, .walrcv_create_slot = libpqrcv_create_slot, .walrcv_alter_slot = libpqrcv_alter_slot, + .walrcv_get_dbname_from_conninfo = libpqrcv_get_dbname_from_conninfo, .walrcv_get_backend_pid = libpqrcv_get_backend_pid, .walrcv_exec = libpqrcv_exec, .walrcv_disconnect = libpqrcv_disconnect @@ -471,6 +474,44 @@ libpqrcv_server_version(WalReceiverConn *conn) return PQserverVersion(conn->streamConn); } +/* + * Get database name from the primary server's conninfo. + * + * If dbname is not found in connInfo, return NULL value. 
+ */ +static char * +libpqrcv_get_dbname_from_conninfo(const char *connInfo) +{ + PQconninfoOption *opts; + char *dbname = NULL; + char *err = NULL; + + opts = PQconninfoParse(connInfo, &err); + if (opts == NULL) + { + /* The error string is malloc'd, so we must free it explicitly */ + char *errcopy = err ? pstrdup(err) : "out of memory"; + + PQfreemem(err); + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid connection string syntax: %s", errcopy))); + } + + for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt) + { + /* + * If multiple dbnames are specified, then the last one will be + * returned + */ + if (strcmp(opt->keyword, "dbname") == 0 && opt->val && + opt->val[0] != '\0') + dbname = pstrdup(opt->val); + } + + return dbname; +} + /* * Start streaming WAL data from given streaming options. * diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index 2dc25e37bb..ba03eeff1c 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -25,6 +25,7 @@ OBJS = \ proto.o \ relation.o \ reorderbuffer.o \ + slotsync.o \ snapbuild.o \ tablesync.o \ worker.o diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index ca09c683f1..5aefb10ecb 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -524,6 +524,18 @@ CreateDecodingContext(XLogRecPtr start_lsn, errmsg("replication slot \"%s\" was not created in this database", NameStr(slot->data.name)))); + /* + * Do not allow consumption of a "synchronized" slot until the standby + * gets promoted. 
+ */ + if (RecoveryInProgress() && slot->data.synced) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot use replication slot \"%s\" for logical" + " decoding", NameStr(slot->data.name)), + errdetail("This slot is being synced from the primary server."), + errhint("Specify another replication slot.")); + /* * Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid * "cannot get changes" wording in this errmsg because that'd be diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index 1050eb2c09..3dec36a6de 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -11,6 +11,7 @@ backend_sources += files( 'proto.c', 'relation.c', 'reorderbuffer.c', + 'slotsync.c', 'snapbuild.c', 'tablesync.c', 'worker.c', diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c new file mode 100644 index 0000000000..751d03f5b9 --- /dev/null +++ b/src/backend/replication/logical/slotsync.c @@ -0,0 +1,1222 @@ +/*------------------------------------------------------------------------- + * slotsync.c + * PostgreSQL worker for synchronizing slots to a standby server from the + * primary server. + * + * Copyright (c) 2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/slotsync.c + * + * This file contains the code for slot sync worker on a physical standby + * to fetch logical failover slots information from the primary server, + * create the slots on the standby and synchronize them periodically. + * + * While creating the slot on physical standby, if the local restart_lsn and/or + * local catalog_xmin is ahead of those on the remote then the worker cannot + * create the local slot in sync with the primary server because that would + * mean moving the local slot backwards and the standby might not have WALs + * retained for old LSN. 
In this case, the worker will mark the slot as + * RS_TEMPORARY. Once the primary server catches up, the worker will mark the + * slot as RS_PERSISTENT (which means sync-ready) and will perform the sync + * periodically. + * + * The worker also takes care of dropping the slots which were created by it + * and are currently not needed to be synchronized. + * + * It waits for a period of time before the next synchronization, with the + * duration varying based on whether any slots were updated during the last + * cycle. Refer to the comments above wait_for_slot_activity() for more details. + * + * Slot synchronization is currently not supported on the cascading standby. + *--------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/genam.h" +#include "access/table.h" +#include "access/xlog_internal.h" +#include "access/xlogrecovery.h" +#include "catalog/pg_database.h" +#include "commands/dbcommands.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "postmaster/interrupt.h" +#include "replication/logical.h" +#include "replication/logicallauncher.h" +#include "replication/logicalworker.h" +#include "replication/walreceiver.h" +#include "replication/worker_internal.h" +#include "storage/ipc.h" +#include "storage/lmgr.h" +#include "storage/procarray.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/guc_hooks.h" +#include "utils/pg_lsn.h" +#include "utils/varlena.h" + +/* + * Structure to hold information fetched from the primary server about a logical + * replication slot. 
+ */ +typedef struct RemoteSlot +{ + char *name; + char *plugin; + char *database; + bool two_phase; + bool failover; + XLogRecPtr restart_lsn; + XLogRecPtr confirmed_lsn; + TransactionId catalog_xmin; + + /* RS_INVAL_NONE if valid, or the reason of invalidation */ + ReplicationSlotInvalidationCause invalidated; +} RemoteSlot; + +/* + * Struct for sharing information between startup process and slot + * sync worker. + * + * Slot sync worker's pid is needed by the startup process in order to + * shut it down during promotion. Startup process shuts down the slot + * sync worker and also sets stopSignaled=true to handle the race condition + * when postmaster has not noticed the promotion yet and thus may end up + * restarting slot sync worker. If stopSignaled is set, the worker will + * exit in such a case. + */ +typedef struct SlotSyncWorkerCtxStruct +{ + pid_t pid; + bool stopSignaled; + slock_t mutex; +} SlotSyncWorkerCtxStruct; + +SlotSyncWorkerCtxStruct *SlotSyncWorker = NULL; + +/* GUC variable */ +bool enable_syncslot = false; + +/* + * The sleep time (ms) between slot-sync cycles varies dynamically + * (within a MIN/MAX range) according to slot activity. See + * wait_for_slot_activity() for details. + */ +#define MIN_WORKER_NAPTIME_MS 200 +#define MAX_WORKER_NAPTIME_MS 30000 /* 30s */ +static long sleep_ms = MIN_WORKER_NAPTIME_MS; + +static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn); + +/* + * If necessary, update local slot metadata based on the data from the remote + * slot. + * + * If no update was needed (the data of the remote slot is the same as the + * local slot) return false, otherwise true. 
+ */ +static bool +local_slot_update(RemoteSlot *remote_slot, Oid remote_dbid) +{ + ReplicationSlot *slot = MyReplicationSlot; + NameData plugin_name; + + Assert(slot->data.invalidated == RS_INVAL_NONE); + + if (strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0 && + remote_dbid == slot->data.database && + remote_slot->restart_lsn == slot->data.restart_lsn && + remote_slot->catalog_xmin == slot->data.catalog_xmin && + remote_slot->two_phase == slot->data.two_phase && + remote_slot->failover == slot->data.failover && + remote_slot->confirmed_lsn == slot->data.confirmed_flush) + return false; + + /* Avoid expensive operations while holding a spinlock. */ + namestrcpy(&plugin_name, remote_slot->plugin); + + SpinLockAcquire(&slot->mutex); + slot->data.plugin = plugin_name; + slot->data.database = remote_dbid; + slot->data.two_phase = remote_slot->two_phase; + slot->data.failover = remote_slot->failover; + slot->data.restart_lsn = remote_slot->restart_lsn; + slot->data.confirmed_flush = remote_slot->confirmed_lsn; + slot->data.catalog_xmin = remote_slot->catalog_xmin; + slot->effective_catalog_xmin = remote_slot->catalog_xmin; + SpinLockRelease(&slot->mutex); + + if (remote_slot->catalog_xmin != slot->data.catalog_xmin) + ReplicationSlotsComputeRequiredXmin(false); + + if (remote_slot->restart_lsn != slot->data.restart_lsn) + ReplicationSlotsComputeRequiredLSN(); + + return true; +} + +/* + * Get list of local logical slots which are synchronized from + * the primary server. 
+ */ +static List * +get_local_synced_slots(void) +{ + List *local_slots = NIL; + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + /* Check if it is a synchronized slot */ + if (s->in_use && s->data.synced) + { + Assert(SlotIsLogical(s)); + local_slots = lappend(local_slots, s); + } + } + + LWLockRelease(ReplicationSlotControlLock); + + return local_slots; +} + +/* + * Helper function to check if local_slot is present in remote_slots list. + * + * It also checks if the slot on the standby server was invalidated while the + * corresponding remote slot in the list remained valid. If found so, it sets + * the locally_invalidated flag to true. + */ +static bool +check_sync_slot_on_remote(ReplicationSlot *local_slot, List *remote_slots, + bool *locally_invalidated) +{ + foreach_ptr(RemoteSlot, remote_slot, remote_slots) + { + if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0) + { + /* + * If remote slot is not invalidated but local slot is marked as + * invalidated, then set the bool. + */ + SpinLockAcquire(&local_slot->mutex); + *locally_invalidated = + (remote_slot->invalidated == RS_INVAL_NONE) && + (local_slot->data.invalidated != RS_INVAL_NONE); + SpinLockRelease(&local_slot->mutex); + + return true; + } + } + + return false; +} + +/* + * Drop obsolete slots + * + * Drop the slots that no longer need to be synced i.e. these either do not + * exist on the primary or are no longer enabled for failover. + * + * Additionally, it drops slots that are valid on the primary but got + * invalidated on the standby. This situation may occur due to the following + * reasons: + * - The max_slot_wal_keep_size on the standby is insufficient to retain WAL + * records from the restart_lsn of the slot. + * - primary_slot_name is temporarily reset to null and the physical slot is + * removed. 
+ * - The primary changes wal_level to a level lower than logical. + * + * The assumption is that these dropped slots will get recreated in next + * sync-cycle and it is okay to drop and recreate such slots as long as these + * are not consumable on the standby (which is the case currently). + */ +static void +drop_obsolete_slots(List *remote_slot_list) +{ + List *local_slots = get_local_synced_slots(); + + foreach_ptr(ReplicationSlot, local_slot, local_slots) + { + bool remote_exists = false; + bool locally_invalidated = false; + + remote_exists = check_sync_slot_on_remote(local_slot, remote_slot_list, + &locally_invalidated); + + /* + * Drop the local slot either if it is not in the remote slots list or + * is invalidated while remote slot is still valid. + */ + if (!remote_exists || locally_invalidated) + { + /* + * Use shared lock to prevent a conflict with + * ReplicationSlotsDropDBSlots(), trying to drop the same slot + * while drop-database operation. + */ + LockSharedObject(DatabaseRelationId, local_slot->data.database, + 0, AccessShareLock); + + ReplicationSlotAcquire(NameStr(local_slot->data.name), true); + Assert(MyReplicationSlot->data.synced); + ReplicationSlotDropAcquired(); + + UnlockSharedObject(DatabaseRelationId, local_slot->data.database, + 0, AccessShareLock); + + ereport(LOG, + errmsg("dropped replication slot \"%s\" of dbid %d", + NameStr(local_slot->data.name), + local_slot->data.database)); + } + } +} + +/* + * Reserve WAL for the currently active slot using the specified WAL location + * (restart_lsn). + * + * If the given WAL location has been removed, reserve WAL using the oldest + * existing WAL segment. 
+ */ +static void +reserve_wal_for_slot(XLogRecPtr restart_lsn) +{ + XLogSegNo oldest_segno; + XLogSegNo segno; + ReplicationSlot *slot = MyReplicationSlot; + + Assert(slot != NULL); + Assert(XLogRecPtrIsInvalid(slot->data.restart_lsn)); + + while (true) + { + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = restart_lsn; + SpinLockRelease(&slot->mutex); + + /* Prevent WAL removal as fast as possible */ + ReplicationSlotsComputeRequiredLSN(); + + XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size); + + /* + * Find the oldest existing WAL segment file. + * + * Normally, we can determine it by using the last removed segment + * number. However, if no WAL segment files have been removed by a + * checkpoint since startup, we need to search for the oldest segment + * file currently existing in XLOGDIR. + */ + oldest_segno = XLogGetLastRemovedSegno() + 1; + + if (oldest_segno == 1) + oldest_segno = XLogGetOldestSegno(0); + + /* + * If all required WAL is still there, great, otherwise retry. The + * slot should prevent further removal of WAL, unless there's a + * concurrent ReplicationSlotsComputeRequiredLSN() after we've written + * the new restart_lsn above, so normally we should never need to loop + * more than twice. + */ + if (segno >= oldest_segno) + break; + + /* Retry using the location of the oldest wal segment */ + XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size, restart_lsn); + } +} + +/* + * Update the LSNs and persist the slot for further syncs if the remote + * restart_lsn and catalog_xmin have caught up with the local ones, otherwise + * do nothing. + * + * Return true if the slot is marked as RS_PERSISTENT (sync-ready), otherwise + * false. + */ +static bool +update_and_persist_slot(RemoteSlot *remote_slot, Oid remote_dbid) +{ + ReplicationSlot *slot = MyReplicationSlot; + + /* + * Check if the primary server has caught up. Refer to the comment atop + * the file for details on this check. 
+ * + * We also need to check if remote_slot's confirmed_lsn becomes valid. It + * is possible to get null values for confirmed_lsn and catalog_xmin if on + * the primary server the slot is just created with a valid restart_lsn + * and slot-sync worker has fetched the slot before the primary server + * could set valid confirmed_lsn and catalog_xmin. + */ + if (remote_slot->restart_lsn < slot->data.restart_lsn || + XLogRecPtrIsInvalid(remote_slot->confirmed_lsn) || + TransactionIdPrecedes(remote_slot->catalog_xmin, + slot->data.catalog_xmin)) + { + /* + * The remote slot didn't catch up to locally reserved position. + * + * We do not drop the slot because the restart_lsn can be ahead of the + * current location when recreating the slot in the next cycle. It may + * take more time to create such a slot. Therefore, we keep this slot + * and attempt the wait and synchronization in the next cycle. + */ + return false; + } + + /* First time slot update, the function must return true */ + if (!local_slot_update(remote_slot, remote_dbid)) + elog(ERROR, "failed to update slot"); + + ReplicationSlotPersist(); + + ereport(LOG, + errmsg("newly created slot \"%s\" is sync-ready now", + remote_slot->name)); + + return true; +} + +/* + * Synchronize single slot to given position. + * + * This creates a new slot if there is no existing one and updates the + * metadata of the slot as per the data received from the primary server. + * + * The slot is created as a temporary slot and stays in the same state until the + * the remote_slot catches up with locally reserved position and local slot is + * updated. The slot is then persisted and is considered as sync-ready for + * periodic syncs. + * + * Returns TRUE if the local slot is updated. 
+ */ +static bool +synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) +{ + ReplicationSlot *slot; + bool slot_updated = false; + XLogRecPtr latestFlushPtr; + + /* + * Make sure that concerned WAL is received and flushed before syncing + * slot to target lsn received from the primary server. + * + * This check will never pass if on the primary server, user has + * configured standby_slot_names GUC correctly, otherwise this can hit + * frequently. + */ + latestFlushPtr = GetStandbyFlushRecPtr(NULL); + if (remote_slot->confirmed_lsn > latestFlushPtr) + { + ereport(LOG, + errmsg("skipping slot synchronization as the received slot sync" + " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X", + LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), + remote_slot->name, + LSN_FORMAT_ARGS(latestFlushPtr))); + + return false; + } + + /* Search for the named slot */ + if ((slot = SearchNamedReplicationSlot(remote_slot->name, true))) + { + bool synced; + + SpinLockAcquire(&slot->mutex); + synced = slot->data.synced; + SpinLockRelease(&slot->mutex); + + /* User created slot with the same name exists, raise ERROR. */ + if (!synced) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("exiting from slot synchronization because same" + " name slot \"%s\" already exists on the standby", + remote_slot->name)); + + /* + * Slot created by the slot sync worker exists, sync it. + * + * It is important to acquire the slot here before checking + * invalidation. If we don't acquire the slot first, there could be a + * race condition that the local slot could be invalidated just after + * checking the 'invalidated' flag here and we could end up + * overwriting 'invalidated' flag to remote_slot's value. See + * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly + * if the slot is not acquired by other processes. 
+ */ + ReplicationSlotAcquire(remote_slot->name, true); + + Assert(slot == MyReplicationSlot); + + /* + * Copy the invalidation cause from remote only if local slot is not + * invalidated locally, we don't want to overwrite existing one. + */ + if (slot->data.invalidated == RS_INVAL_NONE) + { + SpinLockAcquire(&slot->mutex); + slot->data.invalidated = remote_slot->invalidated; + SpinLockRelease(&slot->mutex); + + /* Make sure the invalidated state persists across server restart */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + slot_updated = true; + } + + /* Skip the sync of an invalidated slot */ + if (slot->data.invalidated != RS_INVAL_NONE) + { + ReplicationSlotRelease(); + return slot_updated; + } + + /* Slot not ready yet, let's attempt to make it sync-ready now. */ + if (slot->data.persistency == RS_TEMPORARY) + { + slot_updated = update_and_persist_slot(remote_slot, remote_dbid); + } + + /* Slot ready for sync, so sync it. */ + else + { + /* + * Sanity check: As long as the invalidations are handled + * appropriately as above, this should never happen. + */ + if (remote_slot->restart_lsn < slot->data.restart_lsn) + elog(ERROR, + "cannot synchronize local slot \"%s\" LSN(%X/%X)" + " to remote slot's LSN(%X/%X) as synchronization" + " would move it backwards", remote_slot->name, + LSN_FORMAT_ARGS(slot->data.restart_lsn), + LSN_FORMAT_ARGS(remote_slot->restart_lsn)); + + /* Make sure the slot changes persist across server restart */ + if (local_slot_update(remote_slot, remote_dbid)) + { + slot_updated = true; + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + } + } + } + /* Otherwise create the slot first. 
*/ + else + { + NameData plugin_name; + TransactionId xmin_horizon = InvalidTransactionId; + + /* Skip creating the local slot if remote_slot is invalidated already */ + if (remote_slot->invalidated != RS_INVAL_NONE) + return false; + + ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY, + remote_slot->two_phase, + remote_slot->failover, + true /* synced */ ); + + /* For shorter lines. */ + slot = MyReplicationSlot; + + /* Avoid expensive operations while holding a spinlock. */ + namestrcpy(&plugin_name, remote_slot->plugin); + + SpinLockAcquire(&slot->mutex); + slot->data.database = remote_dbid; + slot->data.plugin = plugin_name; + SpinLockRelease(&slot->mutex); + + reserve_wal_for_slot(remote_slot->restart_lsn); + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + xmin_horizon = GetOldestSafeDecodingTransactionId(true); + SpinLockAcquire(&slot->mutex); + slot->effective_catalog_xmin = xmin_horizon; + slot->data.catalog_xmin = xmin_horizon; + SpinLockRelease(&slot->mutex); + ReplicationSlotsComputeRequiredXmin(true); + LWLockRelease(ProcArrayLock); + + (void) update_and_persist_slot(remote_slot, remote_dbid); + slot_updated = true; + } + + ReplicationSlotRelease(); + + return slot_updated; +} + +/* + * Maps the pg_replication_slots.conflict_reason text value to + * ReplicationSlotInvalidationCause enum value + */ +static ReplicationSlotInvalidationCause +get_slot_invalidation_cause(char *conflict_reason) +{ + Assert(conflict_reason); + + if (strcmp(conflict_reason, SLOT_INVAL_WAL_REMOVED_TEXT) == 0) + return RS_INVAL_WAL_REMOVED; + else if (strcmp(conflict_reason, SLOT_INVAL_HORIZON_TEXT) == 0) + return RS_INVAL_HORIZON; + else if (strcmp(conflict_reason, SLOT_INVAL_WAL_LEVEL_TEXT) == 0) + return RS_INVAL_WAL_LEVEL; + else + Assert(0); + + /* Keep compiler quiet */ + return RS_INVAL_NONE; +} + +/* + * Synchronize slots. + * + * Gets the failover logical slots info from the primary server and updates + * the slots locally. 
Creates the slots if not present on the standby. + * + * Returns TRUE if any of the slots gets updated in this sync-cycle. + */ +static bool +synchronize_slots(WalReceiverConn *wrconn) +{ +#define SLOTSYNC_COLUMN_COUNT 9 + Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID, + LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID}; + + WalRcvExecResult *res; + TupleTableSlot *tupslot; + StringInfoData s; + List *remote_slot_list = NIL; + bool some_slot_updated = false; + XLogRecPtr latestWalEnd; + + /* + * The primary_slot_name is not set yet or WALs not received yet. + * Synchronization is not possible if the walreceiver is not started. + */ + latestWalEnd = GetWalRcvLatestWalEnd(); + SpinLockAcquire(&WalRcv->mutex); + if ((WalRcv->slotname[0] == '\0') || + XLogRecPtrIsInvalid(latestWalEnd)) + { + SpinLockRelease(&WalRcv->mutex); + return false; + } + SpinLockRelease(&WalRcv->mutex); + + /* The syscache access in walrcv_exec() needs a transaction env. */ + StartTransactionCommand(); + + initStringInfo(&s); + + /* Construct query to fetch slots with failover enabled. 
*/ + appendStringInfo(&s, + "SELECT slot_name, plugin, confirmed_flush_lsn," + " restart_lsn, catalog_xmin, two_phase, failover," + " database, conflict_reason" + " FROM pg_catalog.pg_replication_slots" + " WHERE failover and NOT temporary"); + + /* Execute the query */ + res = walrcv_exec(wrconn, s.data, SLOTSYNC_COLUMN_COUNT, slotRow); + pfree(s.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errmsg("could not fetch failover logical slots info from the primary server: %s", + res->err)); + + /* Construct the remote_slot tuple and synchronize each slot locally */ + tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot)) + { + bool isnull; + RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot)); + Datum d; + int col = 0; + + remote_slot->name = TextDatumGetCString(slot_getattr(tupslot, ++col, + &isnull)); + Assert(!isnull); + + remote_slot->plugin = TextDatumGetCString(slot_getattr(tupslot, ++col, + &isnull)); + Assert(!isnull); + + /* + * It is possible to get null values for LSN and Xmin if slot is + * invalidated on the primary server, so handle accordingly. + */ + d = slot_getattr(tupslot, ++col, &isnull); + remote_slot->confirmed_lsn = isnull ? InvalidXLogRecPtr : + DatumGetLSN(d); + + d = slot_getattr(tupslot, ++col, &isnull); + remote_slot->restart_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d); + + d = slot_getattr(tupslot, ++col, &isnull); + remote_slot->catalog_xmin = isnull ? InvalidTransactionId : + DatumGetTransactionId(d); + + remote_slot->two_phase = DatumGetBool(slot_getattr(tupslot, ++col, + &isnull)); + Assert(!isnull); + + remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col, + &isnull)); + Assert(!isnull); + + remote_slot->database = TextDatumGetCString(slot_getattr(tupslot, + ++col, &isnull)); + Assert(!isnull); + + d = slot_getattr(tupslot, ++col, &isnull); + remote_slot->invalidated = isnull ? 
RS_INVAL_NONE : + get_slot_invalidation_cause(TextDatumGetCString(d)); + + /* Sanity check */ + Assert(col == SLOTSYNC_COLUMN_COUNT); + + /* Create list of remote slots */ + remote_slot_list = lappend(remote_slot_list, remote_slot); + + ExecClearTuple(tupslot); + } + + /* Drop local slots that no longer need to be synced. */ + drop_obsolete_slots(remote_slot_list); + + /* Now sync the slots locally */ + foreach_ptr(RemoteSlot, remote_slot, remote_slot_list) + { + Oid remote_dbid = get_database_oid(remote_slot->database, false); + + /* + * Use shared lock to prevent a conflict with + * ReplicationSlotsDropDBSlots(), trying to drop the same slot while + * drop-database operation. + */ + LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock); + + some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid); + + UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock); + } + + /* We are done, free remote_slot_list elements */ + list_free_deep(remote_slot_list); + + walrcv_clear_result(res); + + CommitTransactionCommand(); + + return some_slot_updated; +} + +/* + * Checks the primary server info. + * + * Using the specified primary server connection, check whether we are a + * cascading standby. It also validates primary_slot_name for non-cascading + * standbys. + */ +static void +check_primary_info(WalReceiverConn *wrconn, bool *am_cascading_standby) +{ +#define PRIMARY_INFO_OUTPUT_COL_COUNT 2 + WalRcvExecResult *res; + Oid slotRow[PRIMARY_INFO_OUTPUT_COL_COUNT] = {BOOLOID, BOOLOID}; + StringInfoData cmd; + bool isnull; + TupleTableSlot *tupslot; + bool valid; + bool remote_in_recovery; + + /* The syscache access in walrcv_exec() needs a transaction env. 
*/ + StartTransactionCommand(); + + Assert(am_cascading_standby != NULL); + + *am_cascading_standby = false; /* overwritten later if cascading */ + + initStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT pg_is_in_recovery(), count(*) = 1" + " FROM pg_replication_slots" + " WHERE slot_type='physical' AND slot_name=%s", + quote_literal_cstr(PrimarySlotName)); + + res = walrcv_exec(wrconn, cmd.data, PRIMARY_INFO_OUTPUT_COL_COUNT, slotRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errmsg("could not fetch primary_slot_name \"%s\" info from the primary server: %s", + PrimarySlotName, res->err), + errhint("Check if \"primary_slot_name\" is configured correctly.")); + + tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, tupslot)) + elog(ERROR, + "failed to fetch tuple for the primary server slot specified by \"primary_slot_name\""); + + remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull)); + Assert(!isnull); + + if (remote_in_recovery) + { + /* No need to check further, just set am_cascading_standby to true */ + *am_cascading_standby = true; + } + else + { + /* We are a normal standby */ + valid = DatumGetBool(slot_getattr(tupslot, 2, &isnull)); + Assert(!isnull); + + if (!valid) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("exiting from slot synchronization due to bad configuration"), + /* translator: second %s is a GUC variable name */ + errdetail("The primary server slot \"%s\" specified by" + " \"%s\" is not valid.", + PrimarySlotName, "primary_slot_name")); + } + + ExecClearTuple(tupslot); + walrcv_clear_result(res); + CommitTransactionCommand(); +} + +/* + * Check that all necessary GUCs for slot synchronization are set + * appropriately. If not, raise an ERROR. + * + * If all checks pass, extracts the dbname from the primary_conninfo GUC and + * returns it. 
+ */ +static char * +validate_parameters_and_get_dbname(void) +{ + char *dbname; + + /* Sanity check. */ + Assert(enable_syncslot); + + /* + * A physical replication slot(primary_slot_name) is required on the + * primary to ensure that the rows needed by the standby are not removed + * after restarting, so that the synchronized slot on the standby will not + * be invalidated. + */ + if (PrimarySlotName == NULL || strcmp(PrimarySlotName, "") == 0) + ereport(ERROR, + /* translator: %s is a GUC variable name */ + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("exiting from slot synchronization due to bad configuration"), + errhint("\"%s\" must be defined.", "primary_slot_name")); + + /* + * hot_standby_feedback must be enabled to cooperate with the physical + * replication slot, which allows informing the primary about the xmin and + * catalog_xmin values on the standby. + */ + if (!hot_standby_feedback) + ereport(ERROR, + /* translator: %s is a GUC variable name */ + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("exiting from slot synchronization due to bad configuration"), + errhint("\"%s\" must be enabled.", "hot_standby_feedback")); + + /* + * Logical decoding requires wal_level >= logical and we currently only + * synchronize logical slots. + */ + if (wal_level < WAL_LEVEL_LOGICAL) + ereport(ERROR, + /* translator: %s is a GUC variable name */ + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("exiting from slot synchronization due to bad configuration"), + errhint("\"wal_level\" must be >= logical.")); + + /* + * The primary_conninfo is required to make connection to primary for + * getting slots information. 
+ */ + if (PrimaryConnInfo == NULL || strcmp(PrimaryConnInfo, "") == 0) + ereport(ERROR, + /* translator: %s is a GUC variable name */ + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("exiting from slot synchronization due to bad configuration"), + errhint("\"%s\" must be defined.", "primary_conninfo")); + + /* + * The slot sync worker needs a database connection for walrcv_exec to + * work. + */ + dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo); + if (dbname == NULL) + ereport(ERROR, + + /* + * translator: 'dbname' is a specific option; %s is a GUC variable + * name + */ + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("exiting from slot synchronization due to bad configuration"), + errhint("'dbname' must be specified in \"%s\".", "primary_conninfo")); + + return dbname; +} + +/* + * Re-read the config file. + * + * If any of the slot sync GUCs have changed, exit the worker and + * let it get restarted by the postmaster. + */ +static void +slotsync_reread_config(void) +{ + char *old_primary_conninfo = pstrdup(PrimaryConnInfo); + char *old_primary_slotname = pstrdup(PrimarySlotName); + bool old_hot_standby_feedback = hot_standby_feedback; + bool conninfo_changed; + bool primary_slotname_changed; + + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + + conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0; + primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0; + + if (conninfo_changed || + primary_slotname_changed || + (old_hot_standby_feedback != hot_standby_feedback)) + { + ereport(LOG, + errmsg("slot sync worker will restart because of a parameter change")); + + /* The exit code 1 will make postmaster restart this worker */ + proc_exit(1); + } + + pfree(old_primary_conninfo); + pfree(old_primary_slotname); +} + +/* + * Interrupt handler for main loop of slot sync worker. 
+ */ +static void +ProcessSlotSyncInterrupts(WalReceiverConn *wrconn) +{ + CHECK_FOR_INTERRUPTS(); + + if (ShutdownRequestPending) + { + walrcv_disconnect(wrconn); + ereport(LOG, + errmsg("replication slot sync worker is shutting down on receiving SIGINT")); + proc_exit(0); + } + + if (ConfigReloadPending) + slotsync_reread_config(); +} + +/* + * Cleanup function for slotsync worker. + * + * Called on slotsync worker exit. + */ +static void +slotsync_worker_onexit(int code, Datum arg) +{ + SpinLockAcquire(&SlotSyncWorker->mutex); + SlotSyncWorker->pid = InvalidPid; + SpinLockRelease(&SlotSyncWorker->mutex); +} + +/* + * Sleep for long enough that we believe it's likely that the slots on primary + * get updated. + * + * If there is no slot activity the wait time between sync-cycles will double + * (to a maximum of 30s). If there is some slot activity the wait time between + * sync-cycles is reset to the minimum (200ms). + */ +static void +wait_for_slot_activity(bool some_slot_updated, bool am_cascading_standby) +{ + int rc; + + if (am_cascading_standby) + { + /* + * Slot synchronization is currently not supported on cascading + * standby. So if we are on the cascading standby, we will skip the + * sync and take a longer nap before we check again whether we are + * still cascading standby or not. + */ + sleep_ms = MAX_WORKER_NAPTIME_MS; + } + else if (!some_slot_updated) + { + /* + * No slots were updated, so double the sleep time, but not beyond the + * maximum allowable value. + */ + sleep_ms = Min(sleep_ms * 2, MAX_WORKER_NAPTIME_MS); + } + else + { + /* + * Some slots were updated since the last sleep, so reset the sleep + * time. + */ + sleep_ms = MIN_WORKER_NAPTIME_MS; + } + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + sleep_ms, + WAIT_EVENT_REPL_SLOTSYNC_MAIN); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); +} + +/* + * The main loop of our worker process. 
+ * + * It connects to the primary server, fetches logical failover slots + * information periodically in order to create and sync the slots. + */ +void +ReplSlotSyncWorkerMain(Datum main_arg) +{ + WalReceiverConn *wrconn = NULL; + char *dbname; + bool am_cascading_standby; + char *err; + + ereport(LOG, errmsg("replication slot sync worker started")); + + on_shmem_exit(slotsync_worker_onexit, (Datum) 0); + + SpinLockAcquire(&SlotSyncWorker->mutex); + + Assert(SlotSyncWorker->pid == InvalidPid); + + /* + * Startup process signaled the slot sync worker to stop, so if meanwhile + * postmaster ended up starting the worker again, exit. + */ + if (SlotSyncWorker->stopSignaled) + { + SpinLockRelease(&SlotSyncWorker->mutex); + proc_exit(0); + } + + /* Advertise our PID so that the startup process can kill us on promotion */ + SlotSyncWorker->pid = MyProcPid; + + SpinLockRelease(&SlotSyncWorker->mutex); + + /* Setup signal handling */ + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGINT, SignalHandlerForShutdownRequest); + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + + dbname = validate_parameters_and_get_dbname(); + + /* + * Connect to the database specified by user in primary_conninfo. We need + * a database connection for walrcv_exec to work. Please see comments atop + * libpqrcv_exec. + */ + BackgroundWorkerInitializeConnection(dbname, NULL, 0); + + /* + * Establish the connection to the primary server for slots + * synchronization. + */ + wrconn = walrcv_connect(PrimaryConnInfo, true, false, + cluster_name[0] ? cluster_name : "slotsyncworker", + &err); + if (!wrconn) + ereport(ERROR, + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the primary server: %s", err)); + + /* + * Using the specified primary server connection, check whether we are + * cascading standby and validates primary_slot_name for + * non-cascading-standbys. 
+ */ + check_primary_info(wrconn, &am_cascading_standby); + + /* Main wait loop */ + for (;;) + { + bool some_slot_updated = false; + + ProcessSlotSyncInterrupts(wrconn); + + if (!am_cascading_standby) + some_slot_updated = synchronize_slots(wrconn); + + wait_for_slot_activity(some_slot_updated, am_cascading_standby); + + /* + * If the standby was promoted then what was previously a cascading + * standby might no longer be one, so recheck each time. + */ + if (am_cascading_standby) + check_primary_info(wrconn, &am_cascading_standby); + } + + /* + * The slot sync worker can not get here because it will only stop when it + * receives a SIGINT from the startup process, or when there is an error. + */ + Assert(false); +} + +/* + * Is current process the slot sync worker? + */ +bool +IsLogicalSlotSyncWorker(void) +{ + return SlotSyncWorker->pid == MyProcPid; +} + +/* + * Shut down the slot sync worker. + */ +void +ShutDownSlotSync(void) +{ + SpinLockAcquire(&SlotSyncWorker->mutex); + + SlotSyncWorker->stopSignaled = true; + + if (SlotSyncWorker->pid == InvalidPid) + { + SpinLockRelease(&SlotSyncWorker->mutex); + return; + } + SpinLockRelease(&SlotSyncWorker->mutex); + + kill(SlotSyncWorker->pid, SIGINT); + + /* Wait for it to die */ + for (;;) + { + int rc; + + /* Wait a bit, we don't expect to have to wait long */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 10L, WAIT_EVENT_BGWORKER_SHUTDOWN); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + SpinLockAcquire(&SlotSyncWorker->mutex); + + /* Is it gone? 
*/ + if (SlotSyncWorker->pid == InvalidPid) + break; + + SpinLockRelease(&SlotSyncWorker->mutex); + } + + SpinLockRelease(&SlotSyncWorker->mutex); +} + +/* + * Allocate and initialize slot sync worker shared memory + */ +void +SlotSyncWorkerShmemInit(void) +{ + Size size; + bool found; + + size = sizeof(SlotSyncWorkerCtxStruct); + size = MAXALIGN(size); + + SlotSyncWorker = (SlotSyncWorkerCtxStruct *) + ShmemInitStruct("Slot Sync Worker Data", size, &found); + + if (!found) + { + memset(SlotSyncWorker, 0, size); + SlotSyncWorker->pid = InvalidPid; + SpinLockInit(&SlotSyncWorker->mutex); + } +} + +/* + * Register the background worker for slots synchronization provided + * enable_syncslot is ON. + */ +void +SlotSyncWorkerRegister(void) +{ + BackgroundWorker bgw; + + if (!enable_syncslot) + { + ereport(LOG, + errmsg("skipping slot synchronization"), + errdetail("\"enable_syncslot\" is disabled.")); + return; + } + + memset(&bgw, 0, sizeof(bgw)); + + /* We need database connection which needs shared-memory access as well */ + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + + /* Start as soon as a consistent state has been reached in a hot standby */ + bgw.bgw_start_time = BgWorkerStart_ConsistentState_HotStandby; + + snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ReplSlotSyncWorkerMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "replication slot sync worker"); + snprintf(bgw.bgw_type, BGW_MAXLEN, + "slot sync worker"); + + bgw.bgw_restart_time = BGW_DEFAULT_RESTART_INTERVAL; + bgw.bgw_notify_pid = 0; + bgw.bgw_main_arg = (Datum) 0; + + RegisterBackgroundWorker(&bgw); +} diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index f2781d0455..8caa6a9e09 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -46,7 +46,9 @@ #include "common/string.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/logicalworker.h" 
#include "replication/slot.h" +#include "replication/walsender.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/proc.h" @@ -103,7 +105,6 @@ int max_replication_slots = 10; /* the maximum number of replication * slots */ static void ReplicationSlotShmemExit(int code, Datum arg); -static void ReplicationSlotDropAcquired(void); static void ReplicationSlotDropPtr(ReplicationSlot *slot); /* internal persistency functions */ @@ -250,11 +251,12 @@ ReplicationSlotValidateName(const char *name, int elevel) * user will only get commit prepared. * failover: If enabled, allows the slot to be synced to standbys so * that logical replication can be resumed after failover. + * synced: True if the slot is created by a slotsync worker. */ void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase, bool failover) + bool two_phase, bool failover, bool synced) { ReplicationSlot *slot = NULL; int i; @@ -263,6 +265,19 @@ ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotValidateName(name, ERROR); + /* + * Do not allow users to create the slots with failover enabled on the + * standby as we do not support sync to the cascading standby. + * + * Slot sync worker can still create slots with failover enabled, as it + * needs to maintain this value in sync with the remote slots. + */ + if (failover && RecoveryInProgress() && !IsLogicalSlotSyncWorker()) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot enable failover for a replication slot" + " created on the standby")); + /* * If some other backend ran this code concurrently with us, we'd likely * both allocate the same slot, and that would be bad. 
We'd also be at @@ -315,6 +330,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.two_phase = two_phase; slot->data.two_phase_at = InvalidXLogRecPtr; slot->data.failover = failover; + slot->data.synced = synced; /* and then data only present in shared memory */ slot->just_dirtied = false; @@ -680,6 +696,16 @@ ReplicationSlotDrop(const char *name, bool nowait) ReplicationSlotAcquire(name, nowait); + /* + * Do not allow users to drop the slots which are currently being synced + * from the primary to the standby. + */ + if (RecoveryInProgress() && MyReplicationSlot->data.synced) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot drop replication slot \"%s\"", name), + errdetail("This slot is being synced from the primary server.")); + ReplicationSlotDropAcquired(); } @@ -699,6 +725,29 @@ ReplicationSlotAlter(const char *name, bool failover) errmsg("cannot use %s with a physical replication slot", "ALTER_REPLICATION_SLOT")); + if (RecoveryInProgress()) + { + /* + * Do not allow users to alter the slots which are currently being + * synced from the primary to the standby. + */ + if (MyReplicationSlot->data.synced) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot alter replication slot \"%s\"", name), + errdetail("This slot is being synced from the primary server.")); + + /* + * Do not allow users to alter slots to enable failover on the standby + * as we do not support sync to the cascading standby. + */ + if (failover) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot enable failover for a replication slot" + " on the standby")); + } + SpinLockAcquire(&MyReplicationSlot->mutex); MyReplicationSlot->data.failover = failover; SpinLockRelease(&MyReplicationSlot->mutex); @@ -711,7 +760,7 @@ ReplicationSlotAlter(const char *name, bool failover) /* * Permanently drop the currently acquired replication slot. 
*/ -static void +void ReplicationSlotDropAcquired(void) { ReplicationSlot *slot = MyReplicationSlot; @@ -867,8 +916,8 @@ ReplicationSlotMarkDirty(void) } /* - * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot, - * guaranteeing it will be there after an eventual crash. + * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a + * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash. */ void ReplicationSlotPersist(void) diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index eb685089b3..843ae8cd68 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -43,7 +43,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve, /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(name, false, temporary ? RS_TEMPORARY : RS_PERSISTENT, false, - false); + false, false /* synced */ ); if (immediately_reserve) { @@ -136,7 +136,7 @@ create_logical_replication_slot(char *name, char *plugin, */ ReplicationSlotCreate(name, true, temporary ? 
RS_TEMPORARY : RS_EPHEMERAL, two_phase, - failover); + failover, false /* synced */ ); /* * Create logical decoding context to find start point or, if we don't @@ -237,7 +237,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 16 +#define PG_GET_REPLICATION_SLOTS_COLS 17 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -418,21 +418,23 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) break; case RS_INVAL_WAL_REMOVED: - values[i++] = CStringGetTextDatum("wal_removed"); + values[i++] = CStringGetTextDatum(SLOT_INVAL_WAL_REMOVED_TEXT); break; case RS_INVAL_HORIZON: - values[i++] = CStringGetTextDatum("rows_removed"); + values[i++] = CStringGetTextDatum(SLOT_INVAL_HORIZON_TEXT); break; case RS_INVAL_WAL_LEVEL: - values[i++] = CStringGetTextDatum("wal_level_insufficient"); + values[i++] = CStringGetTextDatum(SLOT_INVAL_WAL_LEVEL_TEXT); break; } } values[i++] = BoolGetDatum(slot_contents.data.failover); + values[i++] = BoolGetDatum(slot_contents.data.synced); + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, @@ -700,7 +702,6 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) XLogRecPtr src_restart_lsn; bool src_islogical; bool temporary; - bool failover; char *plugin; Datum values[2]; bool nulls[2]; @@ -756,7 +757,6 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) src_islogical = SlotIsLogical(&first_slot_contents); src_restart_lsn = first_slot_contents.data.restart_lsn; temporary = (first_slot_contents.data.persistency == RS_TEMPORARY); - failover = first_slot_contents.data.failover; plugin = logical_slot ? 
NameStr(first_slot_contents.data.plugin) : NULL; /* Check type of replication slot */ @@ -791,12 +791,20 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) * We must not try to read WAL, since we haven't reserved it yet -- * hence pass find_startpoint false. confirmed_flush will be set * below, by copying from the source slot. + * + * To avoid potential issues with the slotsync worker when the + * restart_lsn of a replication slot goes backwards, we set the + * failover option to false here. This situation occurs when a slot on + * the primary server is dropped and immediately replaced with a new + * slot of the same name, created by copying from another existing + * slot. However, the slotsync worker will only observe the restart_lsn + * of the same slot going backwards. */ create_logical_replication_slot(NameStr(*dst_name), plugin, temporary, false, - failover, + false, src_restart_lsn, false); } diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 73a7d8f96c..d420a833cd 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -345,6 +345,22 @@ GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) return recptr; } +/* + * Returns the latest reported end of WAL on the sender + */ +XLogRecPtr +GetWalRcvLatestWalEnd() +{ + WalRcvData *walrcv = WalRcv; + XLogRecPtr recptr; + + SpinLockAcquire(&walrcv->mutex); + recptr = walrcv->latestWalEnd; + SpinLockRelease(&walrcv->mutex); + + return recptr; +} + /* * Returns the last+1 byte position that walreceiver has written. * This returns a recently written value without taking a lock. 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 77c8baa32a..f753aed345 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -72,6 +72,7 @@ #include "postmaster/interrupt.h" #include "replication/decode.h" #include "replication/logical.h" +#include "replication/logicalworker.h" #include "replication/slot.h" #include "replication/snapbuild.h" #include "replication/syncrep.h" @@ -243,7 +244,6 @@ static void WalSndShutdown(void) pg_attribute_noreturn(); static void XLogSendPhysical(void); static void XLogSendLogical(void); static void WalSndDone(WalSndSendDataCallback send_data); -static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli); static void IdentifySystem(void); static void UploadManifest(void); static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset, @@ -1224,7 +1224,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) { ReplicationSlotCreate(cmd->slotname, false, cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, - false, false); + false, false, false /* synced */ ); if (reserve_wal) { @@ -1255,7 +1255,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, - two_phase, failover); + two_phase, failover, false /* synced */ ); /* * Do options check early so that we can bail before calling the @@ -3375,14 +3375,17 @@ WalSndDone(WalSndSendDataCallback send_data) } /* - * Returns the latest point in WAL that has been safely flushed to disk, and - * can be sent to the standby. This should only be called when in recovery, - * ie. we're streaming to a cascaded standby. + * Returns the latest point in WAL that has been safely flushed to disk. + * This should only be called when in recovery. 
+ *
+ * This is called either by cascading walsender to find WAL position to
+ * be sent to a cascaded standby or by a slot sync worker to validate
+ * remote slot's lsn before syncing it locally.
 *
 * As a side-effect, *tli is updated to the TLI of the last
 * replayed WAL record.
 */
-static XLogRecPtr
+XLogRecPtr
 GetStandbyFlushRecPtr(TimeLineID *tli)
 {
	XLogRecPtr	replayPtr;
@@ -3391,6 +3394,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli)
	TimeLineID	receiveTLI;
	XLogRecPtr	result;

+	Assert(am_cascading_walsender || IsLogicalSlotSyncWorker());
+
	/*
	 * We can safely send what's already been replayed. Also, if walreceiver
	 * is streaming WAL from the same timeline, we can send anything that it
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 7084e18861..925ac6c942 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -38,6 +38,7 @@
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
+#include "replication/worker_internal.h"
 #include "storage/bufmgr.h"
 #include "storage/dsm.h"
 #include "storage/dsm_registry.h"
@@ -347,6 +348,7 @@ CreateOrAttachShmemStructs(void)
	WalSummarizerShmemInit();
	PgArchShmemInit();
	ApplyLauncherShmemInit();
+	SlotSyncWorkerShmemInit();

	/*
	 * Set up other modules that need some shared memory space
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 1a34bd3715..e8c530acd9 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -3286,6 +3286,17 @@ ProcessInterrupts(void)
		 */
		proc_exit(1);
	}
+	else if (IsLogicalSlotSyncWorker())
+	{
+		elog(DEBUG1,
+			 "replication slot sync worker is shutting down due to administrator command");
+
+		/*
+		 * Slot sync worker can be stopped at any time. Use exit status 1
+		 * so the background worker is restarted.
+ */ + proc_exit(1); + } else if (IsBackgroundWorker) ereport(FATAL, (errcode(ERRCODE_ADMIN_SHUTDOWN), diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index a5df835dd4..3e6203322a 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -53,6 +53,7 @@ LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process." LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process." LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process." RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery." +REPL_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker." SYSLOGGER_MAIN "Waiting in main loop of syslogger process." WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process." WAL_SENDER_MAIN "Waiting in main loop of WAL sender process." diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 7fe58518d7..fe044c16de 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -68,6 +68,7 @@ #include "replication/logicallauncher.h" #include "replication/slot.h" #include "replication/syncrep.h" +#include "replication/worker_internal.h" #include "storage/bufmgr.h" #include "storage/large_object.h" #include "storage/pg_shmem.h" @@ -2054,6 +2055,15 @@ struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"enable_syncslot", PGC_POSTMASTER, REPLICATION_STANDBY, + gettext_noop("Enables a physical standby to synchronize logical failover slots from the primary server."), + }, + &enable_syncslot, + false, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index da10b43dac..3868694d3f 
100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -361,6 +361,7 @@ #wal_retrieve_retry_interval = 5s # time to wait before retrying to # retrieve WAL after a failed attempt #recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery +#enable_syncslot = off # enables slot synchronization on the physical standby from the primary # - Subscribers - diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 29af4ce65d..994e33f59b 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11127,9 +11127,9 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool,bool}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover,synced}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', diff --git a/src/include/postmaster/bgworker.h b/src/include/postmaster/bgworker.h index 22fc49ec27..7092fc72c6 100644 --- a/src/include/postmaster/bgworker.h +++ b/src/include/postmaster/bgworker.h @@ -79,6 +79,7 @@ typedef enum BgWorkerStart_PostmasterStart, BgWorkerStart_ConsistentState, 
BgWorkerStart_RecoveryFinished, + BgWorkerStart_ConsistentState_HotStandby, } BgWorkerStartTime; #define BGW_DEFAULT_RESTART_INTERVAL 60 diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index a18d79d1b2..bbe04226db 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -22,6 +22,7 @@ extern void TablesyncWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); extern bool IsLogicalParallelApplyWorker(void); +extern bool IsLogicalSlotSyncWorker(void); extern void HandleParallelApplyMessageInterrupt(void); extern void HandleParallelApplyMessages(void); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index da4c776492..1f4446aa3a 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -52,6 +52,14 @@ typedef enum ReplicationSlotInvalidationCause RS_INVAL_WAL_LEVEL, } ReplicationSlotInvalidationCause; +/* + * The possible values for 'conflict_reason' returned in + * pg_get_replication_slots. + */ +#define SLOT_INVAL_WAL_REMOVED_TEXT "wal_removed" +#define SLOT_INVAL_HORIZON_TEXT "rows_removed" +#define SLOT_INVAL_WAL_LEVEL_TEXT "wal_level_insufficient" + /* * On-Disk data of a replication slot, preserved across restarts. */ @@ -112,6 +120,11 @@ typedef struct ReplicationSlotPersistentData /* plugin name */ NameData plugin; + /* + * Was this slot synchronized from the primary server? + */ + char synced; + /* * Is this a failover slot (sync candidate for standbys)? Only relevant * for logical slots on the primary server. 
@@ -224,9 +237,11 @@ extern void ReplicationSlotsShmemInit(void);
 /* management of individual slots */
 extern void ReplicationSlotCreate(const char *name, bool db_specific,
								  ReplicationSlotPersistency persistency,
-								  bool two_phase, bool failover);
+								  bool two_phase, bool failover,
+								  bool synced);
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
+extern void ReplicationSlotDropAcquired(void);
 extern void ReplicationSlotAlter(const char *name, bool failover);
 extern void ReplicationSlotAcquire(const char *name, bool nowait);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index f566a99ba1..48dc846e19 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -279,6 +279,13 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
 typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
											TimeLineID *primary_tli);

+/*
+ * walrcv_get_dbname_from_conninfo_fn
+ *
+ * Returns the dbname from the primary_conninfo
+ */
+typedef char *(*walrcv_get_dbname_from_conninfo_fn) (const char *conninfo);
+
 /*
 * walrcv_server_version_fn
 *
@@ -403,6 +410,7 @@ typedef struct WalReceiverFunctionsType
	walrcv_get_conninfo_fn walrcv_get_conninfo;
	walrcv_get_senderinfo_fn walrcv_get_senderinfo;
	walrcv_identify_system_fn walrcv_identify_system;
+	walrcv_get_dbname_from_conninfo_fn walrcv_get_dbname_from_conninfo;
	walrcv_server_version_fn walrcv_server_version;
	walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
	walrcv_startstreaming_fn walrcv_startstreaming;
@@ -428,6 +436,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
	WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
 #define walrcv_identify_system(conn, primary_tli) \
	WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)
+#define walrcv_get_dbname_from_conninfo(conninfo) \
+
WalReceiverFunctions->walrcv_get_dbname_from_conninfo(conninfo) #define walrcv_server_version(conn) \ WalReceiverFunctions->walrcv_server_version(conn) #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \ @@ -485,6 +495,7 @@ extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, bool create_temp_slot); extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); extern XLogRecPtr GetWalRcvWriteRecPtr(void); +extern XLogRecPtr GetWalRcvLatestWalEnd(void); extern int GetReplicationApplyDelay(void); extern int GetReplicationTransferLatency(void); diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 1b58d50b3b..276d8913aa 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -12,6 +12,8 @@ #ifndef _WALSENDER_H #define _WALSENDER_H +#include "access/xlogdefs.h" + /* * What to do with a snapshot in create replication slot command. */ @@ -45,6 +47,7 @@ extern void WalSndInitStopping(void); extern void WalSndWaitStopping(void); extern void HandleWalSndInitStopping(void); extern void WalSndRqstFileReload(void); +extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli); /* * Remember that we want to wakeup walsenders later diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 515aefd519..2167720971 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -237,6 +237,11 @@ extern PGDLLIMPORT bool in_remote_transaction; extern PGDLLIMPORT bool InitializingApplyWorker; +/* Slot sync worker objects */ +extern PGDLLIMPORT char *PrimaryConnInfo; +extern PGDLLIMPORT char *PrimarySlotName; +extern PGDLLIMPORT bool enable_syncslot; + extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); @@ -325,6 +330,11 @@ extern void pa_decr_and_wait_stream_block(void); 
extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); +extern void ReplSlotSyncWorkerMain(Datum main_arg); +extern void SlotSyncWorkerRegister(void); +extern void ShutDownSlotSync(void); +extern void SlotSyncWorkerShmemInit(void); + #define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) #define isTablesyncWorker(worker) ((worker)->in_use && \ diff --git a/src/test/recovery/t/050_standby_failover_slots_sync.pl b/src/test/recovery/t/050_standby_failover_slots_sync.pl index 646293c39e..1055573cde 100644 --- a/src/test/recovery/t/050_standby_failover_slots_sync.pl +++ b/src/test/recovery/t/050_standby_failover_slots_sync.pl @@ -84,6 +84,170 @@ is( $publisher->safe_psql( "t", 'logical slot has failover true on the publisher'); -$subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION regress_mysub1"); +$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 ENABLE"); + +################################################## +# Test logical failover slots on the standby +# Configure standby1 to replicate and synchronize logical slots configured +# for failover on the primary +# +# failover slot lsub1_slot->| ----> subscriber1 (connected via logical replication) +# primary ---> | +# physical slot sb1_slot--->| ----> standby1 (connected via streaming replication) +# | lsub1_slot(synced_slot) +################################################## + +my $primary = $publisher; +my $backup_name = 'backup'; +$primary->backup($backup_name); + +# Create a standby +my $standby1 = PostgreSQL::Test::Cluster->new('standby1'); +$standby1->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); + +my $connstr_1 = $primary->connstr; +$standby1->append_conf( + 'postgresql.conf', qq( +enable_syncslot = true +hot_standby_feedback = on +primary_slot_name = 'sb1_slot' +primary_conninfo = '$connstr_1 dbname=postgres' +)); + +$primary->psql('postgres', + q{SELECT 
pg_create_physical_replication_slot('sb1_slot');}); + +my $standby1_conninfo = $standby1->connstr . ' dbname=postgres'; +my $offset = -s $standby1->logfile; + +# Start the standby so that slot syncing can begin +$standby1->start; + +# Generate a log to trigger the walsender to send messages to the walreceiver +# which will update WalRcv->latestWalEnd to a valid number. +$primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot();"); + +# Wait for the standby to finish sync +$standby1->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? newly created slot \"lsub1_slot\" is sync-ready now/, + $offset); + +# Confirm that the logical failover slot is created on the standby and is +# flagged as 'synced' +is($standby1->safe_psql('postgres', + q{SELECT synced FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}), + "t", + 'logical slot has synced as true on standby'); + +################################################## +# Test to confirm that restart_lsn and confirmed_flush_lsn of the logical slot +# on the primary is synced to the standby +################################################## + +# Insert data on the primary +$primary->safe_psql( + 'postgres', qq[ + CREATE TABLE tab_int (a int PRIMARY KEY); + INSERT INTO tab_int SELECT generate_series(1, 10); +]); + +# Subscribe to the new table data and wait for it to arrive +$subscriber1->safe_psql( + 'postgres', qq[ + CREATE TABLE tab_int (a int PRIMARY KEY); + ALTER SUBSCRIPTION regress_mysub1 REFRESH PUBLICATION; +]); + +$subscriber1->wait_for_subscription_sync; + +# Do not allow any further advancement of the restart_lsn and +# confirmed_flush_lsn for the lsub1_slot. 
+$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 DISABLE");
+
+# Wait for the replication slot to become inactive on the publisher
+$primary->poll_query_until(
+	'postgres',
+	"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active='f'",
+	1);
+
+# Get the restart_lsn for the logical slot lsub1_slot on the primary
+my $primary_restart_lsn = $primary->safe_psql('postgres',
+	"SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';");
+
+# Get the confirmed_flush_lsn for the logical slot lsub1_slot on the primary
+my $primary_flush_lsn = $primary->safe_psql('postgres',
+	"SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';");
+
+# Confirm that restart_lsn and confirmed_flush_lsn of the lsub1_slot slot are synced
+# to the standby
+ok( $standby1->poll_query_until(
+	'postgres',
+	"SELECT '$primary_restart_lsn' = restart_lsn AND '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';"),
+	'restart_lsn and confirmed_flush_lsn of slot lsub1_slot synced to standby');
+
+##################################################
+# Test that a synchronized slot can not be decoded, altered or dropped by the user
+##################################################
+
+# Disable hot_standby_feedback temporarily to stop slot sync worker otherwise
+# the concerned testing scenarios here may be interrupted by different error:
+# 'ERROR: replication slot is active for PID ..'
+$standby1->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = off;'); +$standby1->restart; + +# Attempting to perform logical decoding on a synced slot should result in an error +my ($result, $stdout, $stderr) = $standby1->psql('postgres', + "select * from pg_logical_slot_get_changes('lsub1_slot',NULL,NULL);"); +ok($stderr =~ /ERROR: cannot use replication slot "lsub1_slot" for logical decoding/, + "logical decoding is not allowed on synced slot"); + +# Attempting to alter a synced slot should result in an error +($result, $stdout, $stderr) = $standby1->psql( + 'postgres', + qq[ALTER_REPLICATION_SLOT lsub1_slot (failover);], + replication => 'database'); +ok($stderr =~ /ERROR: cannot alter replication slot "lsub1_slot"/, + "synced slot on standby cannot be altered"); + +# Attempting to drop a synced slot should result in an error +($result, $stdout, $stderr) = $standby1->psql('postgres', + "SELECT pg_drop_replication_slot('lsub1_slot');"); +ok($stderr =~ /ERROR: cannot drop replication slot "lsub1_slot"/, + "synced slot on standby cannot be dropped"); + +# Enable hot_standby_feedback and restart standby +$standby1->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = on;'); +$standby1->restart; + +################################################## +# Promote the standby1 to primary. 
Confirm that: +# a) the slot 'lsub1_slot' is retained on the new primary +# b) logical replication for regress_mysub1 is resumed successfully after failover +################################################## +$standby1->promote; + +# Update subscription with the new primary's connection info +$subscriber1->safe_psql('postgres', + "ALTER SUBSCRIPTION regress_mysub1 CONNECTION '$standby1_conninfo'; + ALTER SUBSCRIPTION regress_mysub1 ENABLE; "); + +# Confirm the synced slot 'lsub1_slot' is retained on the new primary +is($standby1->safe_psql('postgres', + q{SELECT slot_name FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}), + 'lsub1_slot', + 'synced slot retained on the new primary'); + +# Insert data on the new primary +$standby1->safe_psql('postgres', + "INSERT INTO tab_int SELECT generate_series(11, 20);"); +$standby1->wait_for_catchup('regress_mysub1'); + +# Confirm that data in tab_int replicated on the subscriber +is( $subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}), + "20", + 'data replicated from the new primary'); done_testing(); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index abc944e8b8..b7488d760e 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1474,8 +1474,9 @@ pg_replication_slots| SELECT l.slot_name, l.safe_wal_size, l.two_phase, l.conflict_reason, - l.failover - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover) + l.failover, + l.synced + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover, synced) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, 
pg_authid.rolsuper, diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out index 9be7aca2b8..fae059d389 100644 --- a/src/test/regress/expected/sysviews.out +++ b/src/test/regress/expected/sysviews.out @@ -133,8 +133,9 @@ select name, setting from pg_settings where name like 'enable%'; enable_self_join_removal | on enable_seqscan | on enable_sort | on + enable_syncslot | off enable_tidscan | on -(23 rows) +(24 rows) -- There are always wait event descriptions for various types. select type, count(*) > 0 as ok FROM pg_wait_events diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 513c7702ff..d527bf918b 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2325,6 +2325,7 @@ RelocationBufferInfo RelptrFreePageBtree RelptrFreePageManager RelptrFreePageSpanLeader +RemoteSlot RenameStmt ReopenPtrType ReorderBuffer @@ -2585,6 +2586,7 @@ SlabBlock SlabContext SlabSlot SlotNumber +SlotSyncWorkerCtxStruct SlruCtl SlruCtlData SlruErrorCause -- 2.30.0.windows.2