From dbdc8f15142d78c355a60c165c7c7bc029cf5d19 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Tue, 12 Dec 2023 16:55:06 +0800 Subject: [PATCH v50 2/3] Add logical slot sync capability to the physical standby This patch implements synchronization of logical replication slots from the primary server to the physical standby so that logical replication can be resumed after failover. GUC 'enable_syncslot' enables a physical standby to synchronize failover logical replication slots from the primary server. The logical replication slots on the primary can be synchronized to the hot standby by enabling the failover option during slot creation and setting 'enable_syncslot' on the standby. For the synchronization to work, it is mandatory to have a physical replication slot between the primary and the standby, and hot_standby_feedback must be enabled on the standby. All the failover logical replication slots on the primary (assuming configurations are appropriate) are automatically created on the physical standbys and are synced periodically. The slot-sync worker on the standby server pings the primary server at regular intervals to get the necessary failover logical slot information and create/update the slots locally. The nap time of the worker is tuned according to the activity on the primary. The worker starts with a nap time of 10ms and if no activity is observed on the primary for some time, then the nap time is increased to 10sec. If activity is observed again, the nap time is reduced back to 10ms. The logical slots created by the slot-sync worker on physical standbys are not allowed to be dropped or consumed. Any attempt to perform logical decoding on such slots will result in an error. If a logical slot is invalidated on the primary, the slot on the standby is also invalidated. If a logical slot on the primary is valid but is invalidated on the standby, then that slot is dropped and recreated on the standby in the next sync-cycle. 
It is okay to recreate such slots as long as these are not consumable on the standby (which is the case currently). This situation may occur due to the following reasons: - The max_slot_wal_keep_size on the standby is insufficient to retain WAL records from the restart_lsn of the slot. - primary_slot_name is temporarily reset to null and the physical slot is removed. - The primary changes wal_level to a level lower than logical. The slot synchronization status on the standby can be monitored using the 'sync_state' column of the pg_replication_slots view. The values are: 'n': none for user slots, 'i': sync initiated for the slot but the slot is not ready yet for periodic syncs, 'r': ready for periodic syncs. --- doc/src/sgml/bgworker.sgml | 65 +- doc/src/sgml/config.sgml | 33 +- doc/src/sgml/logicaldecoding.sgml | 31 + doc/src/sgml/system-views.sgml | 35 + src/backend/access/transam/xlogrecovery.c | 18 + src/backend/catalog/system_views.sql | 3 +- src/backend/postmaster/bgworker.c | 4 + src/backend/postmaster/postmaster.c | 10 + .../libpqwalreceiver/libpqwalreceiver.c | 41 + src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/logical.c | 25 + src/backend/replication/logical/meson.build | 1 + src/backend/replication/logical/slotsync.c | 1315 +++++++++++++++++ src/backend/replication/logical/worker.c | 15 +- src/backend/replication/slot.c | 34 +- src/backend/replication/slotfuncs.c | 37 +- src/backend/replication/walsender.c | 6 +- src/backend/storage/ipc/ipci.c | 2 + src/backend/tcop/postgres.c | 11 + .../utils/activity/wait_event_names.txt | 2 + src/backend/utils/misc/guc_tables.c | 10 + src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/catalog/pg_proc.dat | 10 +- src/include/postmaster/bgworker.h | 1 + src/include/replication/logicalworker.h | 1 + src/include/replication/slot.h | 22 +- src/include/replication/walreceiver.h | 18 + src/include/replication/worker_internal.h | 11 + .../t/050_standby_failover_slots_sync.pl | 145 ++ 
src/test/regress/expected/rules.out | 5 +- src/test/regress/expected/sysviews.out | 3 +- src/tools/pgindent/typedefs.list | 2 + 32 files changed, 1885 insertions(+), 33 deletions(-) create mode 100644 src/backend/replication/logical/slotsync.c diff --git a/doc/src/sgml/bgworker.sgml b/doc/src/sgml/bgworker.sgml index 2c393385a9..a7cfe6c58c 100644 --- a/doc/src/sgml/bgworker.sgml +++ b/doc/src/sgml/bgworker.sgml @@ -114,18 +114,59 @@ typedef struct BackgroundWorker bgw_start_time is the server state during which - postgres should start the process; it can be one of - BgWorkerStart_PostmasterStart (start as soon as - postgres itself has finished its own initialization; processes - requesting this are not eligible for database connections), - BgWorkerStart_ConsistentState (start as soon as a consistent state - has been reached in a hot standby, allowing processes to connect to - databases and run read-only queries), and - BgWorkerStart_RecoveryFinished (start as soon as the system has - entered normal read-write state). Note the last two values are equivalent - in a server that's not a hot standby. Note that this setting only indicates - when the processes are to be started; they do not stop when a different state - is reached. + postgres should start the process. Note that this setting + only indicates when the processes are to be started; they do not stop when + a different state is reached. Possible values are: + + + + BgWorkerStart_PostmasterStart + + + BgWorkerStart_PostmasterStart + Start as soon as postgres itself has finished its own initialization; + processes requesting this are not eligible for database connections. + + + + + + BgWorkerStart_ConsistentState + + + BgWorkerStart_ConsistentState + Start as soon as a consistent state has been reached in a hot-standby, + allowing processes to connect to databases and run read-only queries. 
+ + + + + + BgWorkerStart_ConsistentState_HotStandby + + + BgWorkerStart_ConsistentState_HotStandby + Same meaning as BgWorkerStart_ConsistentState but + it is more strict in terms of the server i.e. start the worker only + if it is hot-standby. + + + + + + BgWorkerStart_RecoveryFinished + + + BgWorkerStart_RecoveryFinished + Start as soon as the system has entered normal read-write state. Note + that the BgWorkerStart_ConsistentState and + BgWorkerStart_RecoveryFinished are equivalent + in a server that's not a hot standby. + + + + + diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 7993fe3cdd..bbb755f057 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4373,6 +4373,12 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows meant to switch to a physical standby after the standby is promoted, the physical replication slot for the standby should be listed here. + + The standbys corresponding to the physical replication slots in + standby_slot_names must configure + enable_syncslot = true so they can receive + failover logical slots changes from the primary. + @@ -4568,8 +4574,13 @@ ANY num_sync ( ) then it is also + necessary to specify dbname in the + primary_conninfo string. This will only be used for + slot synchronization. It is ignored for streaming. This parameter can only be set in the postgresql.conf @@ -4894,6 +4905,24 @@ ANY num_sync ( + enable_syncslot (boolean) + + enable_syncslot configuration parameter + + + + + It enables a physical standby to synchronize logical failover slots + from the primary server so that logical subscribers are not blocked + after failover. + + + It is disabled by default. This parameter can only be set in the + postgresql.conf file or on the server command line. 
+ + + diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index cd152d4ced..de6cdbe2bc 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -346,6 +346,37 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU pg_log_standby_snapshot function on the primary. + + A logical replication slot on the primary can be synchronized to the hot + standby by enabling the failover option during slot creation and setting + on the standby. For the synchronization + to work, it is mandatory to have a physical replication slot between the + primary and the standby, and hot_standby_feedback must + be enabled on the standby. It's also highly recommended that the said + physical replication slot is named in standby_slot_names + list on the primary, to prevent the subscriber from consuming changes + faster than the hot standby. + + + + The ability to resume logical replication after failover depends upon the + pg_replication_slots.sync_state + value for the synchronized slots on the standby at the time of failover. + Only slots that have attained "ready" sync_state ('r') on the standby + before failover can be used for logical replication after failover. Slots + that have not yet reached 'r' state (they are still 'i') will be dropped, + therefore logical replication for those slots cannot be resumed. For + example, if the synchronized slot could not become sync-ready on the + standby due to a disabled subscription, then the subscription cannot be + resumed after failover even when it is enabled. + + + If the primary is idle, then the synchronized slots on the standby may + take a noticeable time to reach the ready ('r') sync_state. This can + be sped up by calling the + pg_log_standby_snapshot function on the primary. 
+ + Replication slots persist across crashes and know nothing about the state diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 1dc695fd3a..9847342601 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2543,6 +2543,41 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx after failover. Always false for physical slots. + + + + sync_state char + + + Defines slot synchronization state. This is meaningful on the physical + standby which has configured = true. + Possible values are: + + + n = none for user created slots, + + + + i = sync initiated for the slot but slot + is not ready yet for periodic syncs, + + + + + r = ready for periodic syncs. + + + + + + The hot standby can have any of these sync_state values for the slots but + on a hot standby, the slots with state 'r' and 'i' can neither be used + for logical decoding nor dropped by the user. + The sync_state has no meaning on the primary server; the primary + sync_state value is default 'n' for all slots but may (if leftover + from a promoted standby) also be 'r'. + + diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index a2c8fa3981..444f4d23cc 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -50,6 +50,7 @@ #include "postmaster/startup.h" #include "replication/slot.h" #include "replication/walreceiver.h" +#include "replication/worker_internal.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/latch.h" @@ -1435,6 +1436,23 @@ FinishWalRecovery(void) */ XLogShutdownWalRcv(); + /* + * Shutdown the slot sync workers to prevent potential conflicts between + * user processes and slotsync workers after a promotion. Additionally, + * drop any slots that have initiated but not yet completed the sync + * process. 
+ * + * We do not update the sync_state from READY to NONE here, as any failed + * update could leave some slots in the 'NONE' state, causing issues during + * slot sync after restarting the server as a standby. While updating after + * switching to the new timeline is an option, it does not simplify the + * handling for both READY and NONE state slots. Therefore, we retain the + * READY state slots after promotion as they can provide useful information + * about their origin. + */ + ShutDownSlotSync(); + slotsync_drop_initiated_slots(); + /* * We are now done reading the xlog from stream. Turn off streaming * recovery to force fetching the files (which would be required at end of diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 63038f87f7..c4b3e8a807 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1024,7 +1024,8 @@ CREATE VIEW pg_replication_slots AS L.safe_wal_size, L.two_phase, L.conflicting, - L.failover + L.failover, + L.sync_state FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 3c99cf6047..7f74f53ad1 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -21,6 +21,7 @@ #include "postmaster/postmaster.h" #include "replication/logicallauncher.h" #include "replication/logicalworker.h" +#include "replication/worker_internal.h" #include "storage/dsm.h" #include "storage/ipc.h" #include "storage/latch.h" @@ -129,6 +130,9 @@ static const struct { "ApplyWorkerMain", ApplyWorkerMain }, + { + "ReplSlotSyncWorkerMain", ReplSlotSyncWorkerMain + }, { "ParallelApplyWorkerMain", ParallelApplyWorkerMain }, diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 651b85ea74..9b74a49663 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -115,6 
+115,7 @@ #include "postmaster/syslogger.h" #include "replication/logicallauncher.h" #include "replication/walsender.h" +#include "replication/worker_internal.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/pg_shmem.h" @@ -1003,6 +1004,12 @@ PostmasterMain(int argc, char *argv[]) */ ApplyLauncherRegister(); + /* + * Register the slot sync worker here to kick start slot-sync operation + * sooner on the physical standby. + */ + SlotSyncWorkerRegister(); + /* * process any libraries that should be preloaded at postmaster start */ @@ -5740,6 +5747,9 @@ bgworker_should_start_now(BgWorkerStartTime start_time) case PM_HOT_STANDBY: if (start_time == BgWorkerStart_ConsistentState) return true; + if (start_time == BgWorkerStart_ConsistentState_HotStandby && + pmState != PM_RUN) + return true; /* fall through */ case PM_RECOVERY: diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index cf11b98da3..f96f377d29 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -34,6 +34,7 @@ #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/tuplestore.h" +#include "utils/varlena.h" PG_MODULE_MAGIC; @@ -58,6 +59,7 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port); static char *libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli); +static char *libpqrcv_get_dbname_from_conninfo(const char *conninfo); static int libpqrcv_server_version(WalReceiverConn *conn); static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, @@ -100,6 +102,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { .walrcv_send = libpqrcv_send, .walrcv_create_slot = libpqrcv_create_slot, .walrcv_alter_slot = libpqrcv_alter_slot, + .walrcv_get_dbname_from_conninfo = 
libpqrcv_get_dbname_from_conninfo, .walrcv_get_backend_pid = libpqrcv_get_backend_pid, .walrcv_exec = libpqrcv_exec, .walrcv_disconnect = libpqrcv_disconnect @@ -418,6 +421,44 @@ libpqrcv_server_version(WalReceiverConn *conn) return PQserverVersion(conn->streamConn); } +/* + * Get database name from the primary server's conninfo. + * + * If dbname is not found in connInfo, return NULL value. + */ +static char * +libpqrcv_get_dbname_from_conninfo(const char *connInfo) +{ + PQconninfoOption *opts; + char *dbname = NULL; + char *err = NULL; + + opts = PQconninfoParse(connInfo, &err); + if (opts == NULL) + { + /* The error string is malloc'd, so we must free it explicitly */ + char *errcopy = err ? pstrdup(err) : "out of memory"; + + PQfreemem(err); + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid connection string syntax: %s", errcopy))); + } + + for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt) + { + /* + * If multiple dbnames are specified, then the last one will be + * returned + */ + if (strcmp(opt->keyword, "dbname") == 0 && opt->val && + opt->val[0] != '\0') + dbname = pstrdup(opt->val); + } + + return dbname; +} + /* * Start streaming WAL data from given streaming options. 
* diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index 2dc25e37bb..ba03eeff1c 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -25,6 +25,7 @@ OBJS = \ proto.o \ relation.o \ reorderbuffer.o \ + slotsync.o \ snapbuild.o \ tablesync.o \ worker.o diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 8288da5277..c21d081346 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -524,6 +524,31 @@ CreateDecodingContext(XLogRecPtr start_lsn, errmsg("replication slot \"%s\" was not created in this database", NameStr(slot->data.name)))); + if (RecoveryInProgress()) + { + /* + * Do not allow consumption of a "synchronized" slot until the standby + * gets promoted. + */ + if (slot->data.sync_state != SYNCSLOT_STATE_NONE) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot use replication slot \"%s\" for logical " + "decoding", NameStr(slot->data.name)), + errdetail("This slot is being synced from the primary server."), + errhint("Specify another replication slot."))); + } + else + { + /* + * Slots in state SYNCSLOT_STATE_INITIATED should have been dropped on + * promotion. + */ + if (slot->data.sync_state == SYNCSLOT_STATE_INITIATED) + elog(ERROR, "replication slot \"%s\" was not synced completely " + "from the primary server", NameStr(slot->data.name)); + } + /* * Check if slot has been invalidated due to max_slot_wal_keep_size. 
Avoid * "cannot get changes" wording in this errmsg because that'd be diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index d48cd4c590..9e52ec421f 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -11,6 +11,7 @@ backend_sources += files( 'proto.c', 'relation.c', 'reorderbuffer.c', + 'slotsync.c', 'snapbuild.c', 'tablesync.c', 'worker.c', diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c new file mode 100644 index 0000000000..110ee2a746 --- /dev/null +++ b/src/backend/replication/logical/slotsync.c @@ -0,0 +1,1315 @@ +/*------------------------------------------------------------------------- + * slotsync.c + * PostgreSQL worker for synchronizing slots to a standby server from the + * primary server. + * + * Copyright (c) 2023, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/slotsync.c + * + * This file contains the code for slot sync worker on a physical standby + * to fetch logical failover slots information from the primary server, + * create the slots on the standby and synchronize them periodically. + * + * While creating the slot on physical standby, if the local restart_lsn and/or + * local catalog_xmin is ahead of those on the remote then the worker cannot + * create the local slot in sync with the primary server because that would + * mean moving the local slot backwards and the standby might not have WALs + * retained for old LSN. In this case, the worker will wait for the primary + * server slot's restart_lsn and catalog_xmin to catch up with the local one + * before attempting the actual sync. Meanwhile, it will persist the slot with + * sync_state as SYNCSLOT_STATE_INITIATED('i'). Once the primary server catches + * up, it will move the slot to SYNCSLOT_STATE_READY('r') state and will perform + * the sync periodically. 
+ * + * The worker also takes care of dropping the slots which were created by it + * and are currently not needed to be synchronized. + * + * It takes a nap of WORKER_DEFAULT_NAPTIME_MS before every next + * synchronization. If there is no activity observed on the primary server for + * some time, the nap time is increased to WORKER_INACTIVITY_NAPTIME_MS, but if + * any activity is observed, the nap time reverts to the default value. + *--------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/genam.h" +#include "access/table.h" +#include "access/xlogrecovery.h" +#include "catalog/pg_database.h" +#include "commands/dbcommands.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "postmaster/interrupt.h" +#include "replication/logical.h" +#include "replication/logicallauncher.h" +#include "replication/logicalworker.h" +#include "replication/walreceiver.h" +#include "replication/worker_internal.h" +#include "storage/ipc.h" +#include "storage/procarray.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/guc_hooks.h" +#include "utils/pg_lsn.h" +#include "utils/varlena.h" + +/* + * Structure to hold information fetched from the primary server about a logical + * replication slot. + */ +typedef struct RemoteSlot +{ + char *name; + char *plugin; + char *database; + bool two_phase; + bool failover; + XLogRecPtr restart_lsn; + XLogRecPtr confirmed_lsn; + TransactionId catalog_xmin; + + /* RS_INVAL_NONE if valid, or the reason of invalidation */ + ReplicationSlotInvalidationCause invalidated; +} RemoteSlot; + +/* + * Struct for sharing information between startup process and slot + * sync worker. + * + * Slot sync worker's pid is needed by startup process in order to + * shut it down during promotion. 
+ */ +typedef struct SlotSyncWorkerCtx +{ + pid_t pid; + slock_t mutex; +} SlotSyncWorkerCtx; + +SlotSyncWorkerCtx *SlotSyncWorker = NULL; + +/* GUC variable */ +bool enable_syncslot = false; + +/* The last sync-cycle time when the worker updated any of the slots. */ +static TimestampTz last_update_time; + +/* Worker's nap time in case of regular activity on the primary server */ +#define WORKER_DEFAULT_NAPTIME_MS 10L /* 10 ms */ + +/* Worker's nap time in case of no-activity on the primary server */ +#define WORKER_INACTIVITY_NAPTIME_MS 10000L /* 10 sec */ + +/* + * Inactivity Threshold in ms before increasing nap time of worker. + * + * If the lsn of slot being monitored did not change for this threshold time, + * then increase nap time of current worker from WORKER_DEFAULT_NAPTIME_MS to + * WORKER_INACTIVITY_NAPTIME_MS. + */ +#define WORKER_INACTIVITY_THRESHOLD_MS 10000L /* 10 sec */ + +static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn); + +/* + * Wait for remote slot to pass locally reserved position. + * + * Ping and wait for the primary server for + * WAIT_PRIMARY_CATCHUP_ATTEMPTS during a slot creation, if it still + * does not catch up, abort the wait. The ones for which wait is aborted will + * attempt the wait and sync in the next sync-cycle. + * + * If passed, *wait_attempts_exceeded will be set to true only if this + * function exits due to exhausting its wait attempts. It will be false + * in all the other cases. + * + * Returns true if remote_slot could catch up with the locally reserved + * position. 
+ */ +static bool +wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot, + bool *wait_attempts_exceeded) +{ +#define WAIT_OUTPUT_COLUMN_COUNT 4 +#define WAIT_PRIMARY_CATCHUP_ATTEMPTS 5 + + StringInfoData cmd; + int wait_count = 0; + + Assert(wait_attempts_exceeded == NULL || *wait_attempts_exceeded == false); + + ereport(LOG, + errmsg("waiting for remote slot \"%s\" LSN (%X/%X) and catalog xmin" + " (%u) to pass local slot LSN (%X/%X) and catalog xmin (%u)", + remote_slot->name, + LSN_FORMAT_ARGS(remote_slot->restart_lsn), + remote_slot->catalog_xmin, + LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn), + MyReplicationSlot->data.catalog_xmin)); + + initStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT conflicting, restart_lsn," + " confirmed_flush_lsn, catalog_xmin" + " FROM pg_catalog.pg_replication_slots" + " WHERE slot_name = %s", + quote_literal_cstr(remote_slot->name)); + + for (;;) + { + bool new_invalidated; + XLogRecPtr new_restart_lsn; + XLogRecPtr new_confirmed_lsn; + TransactionId new_catalog_xmin; + WalRcvExecResult *res; + TupleTableSlot *tupslot; + int rc; + bool isnull; + Oid slotRow[WAIT_OUTPUT_COLUMN_COUNT] = {BOOLOID, LSNOID, LSNOID, + XIDOID}; + + CHECK_FOR_INTERRUPTS(); + + /* Handle any termination request if any */ + ProcessSlotSyncInterrupts(wrconn); + + res = walrcv_exec(wrconn, cmd.data, WAIT_OUTPUT_COLUMN_COUNT, slotRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch slot \"%s\" info from the" + " primary server: %s", + remote_slot->name, res->err))); + + tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, tupslot)) + { + ereport(WARNING, + (errmsg("aborting initial sync for slot \"%s\"", + remote_slot->name), + errdetail("This slot was not found on the primary server."))); + pfree(cmd.data); + walrcv_clear_result(res); + + return false; + } + + /* + * It is possible to get null values for 
LSN and Xmin if slot is + * invalidated on the primary server, so handle accordingly. + */ + new_invalidated = DatumGetBool(slot_getattr(tupslot, 1, &isnull)); + Assert(!isnull); + + new_restart_lsn = DatumGetLSN(slot_getattr(tupslot, 2, &isnull)); + if (new_invalidated || isnull) + { + /* + * If the local-slot is in 'RS_EPHEMERAL' state, it will not be + * persisted in the caller and ReplicationSlotRelease() will drop + * it. But if the local slot is already persisted and has 'i' + * sync_state, then it will be marked as invalidated in the caller + * and next time onwards its sync will be skipped. + */ + ereport(WARNING, + (errmsg("aborting initial sync for slot \"%s\"", + remote_slot->name), + errdetail("This slot was invalidated on the primary server."))); + pfree(cmd.data); + ExecClearTuple(tupslot); + walrcv_clear_result(res); + + return false; + } + + /* + * Having got a valid restart_lsn, the confirmed_lsn and catalog_xmin + * are expected to be valid/non-null. + */ + new_confirmed_lsn = DatumGetLSN(slot_getattr(tupslot, 3, &isnull)); + Assert(!isnull); + + new_catalog_xmin = DatumGetTransactionId(slot_getattr(tupslot, + 4, &isnull)); + Assert(!isnull); + + ExecClearTuple(tupslot); + walrcv_clear_result(res); + + if (new_restart_lsn >= MyReplicationSlot->data.restart_lsn && + TransactionIdFollowsOrEquals(new_catalog_xmin, + MyReplicationSlot->data.catalog_xmin)) + { + /* Update new values in remote_slot */ + remote_slot->restart_lsn = new_restart_lsn; + remote_slot->confirmed_lsn = new_confirmed_lsn; + remote_slot->catalog_xmin = new_catalog_xmin; + + ereport(LOG, + errmsg("wait over for remote slot \"%s\" as its LSN (%X/%X)" + " and catalog xmin (%u) has now passed local slot LSN" + " (%X/%X) and catalog xmin (%u)", + remote_slot->name, + LSN_FORMAT_ARGS(new_restart_lsn), + new_catalog_xmin, + LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn), + MyReplicationSlot->data.catalog_xmin)); + pfree(cmd.data); + + return true; + } + + if (++wait_count >= 
WAIT_PRIMARY_CATCHUP_ATTEMPTS) + { + ereport(LOG, + errmsg("aborting the wait for remote slot \"%s\"", + remote_slot->name)); + pfree(cmd.data); + + if (wait_attempts_exceeded) + *wait_attempts_exceeded = true; + + return false; + } + + /* + * XXX: Is waiting for 2 seconds before retrying enough or more or + * less? + */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + 2000L, + WAIT_EVENT_REPL_SLOTSYNC_PRIMARY_CATCHUP); + + ResetLatch(MyLatch); + + /* Emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + } +} + +/* + * Update local slot metadata as per remote_slot's positions + */ +static void +local_slot_update(RemoteSlot *remote_slot) +{ + Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE); + + LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn); + LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn, + remote_slot->catalog_xmin); + LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn, + remote_slot->restart_lsn); + ReplicationSlotMarkDirty(); +} + +/* + * Drop the slots for which sync is initiated but not yet completed + * i.e. 
they are still waiting for the primary server to catch up (refer + * to the comment atop the file for details on this wait) + */ +void +slotsync_drop_initiated_slots(void) +{ + List *local_slots = NIL; + ListCell *lc; + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (s->in_use && s->data.sync_state == SYNCSLOT_STATE_INITIATED) + local_slots = lappend(local_slots, s); + } + + LWLockRelease(ReplicationSlotControlLock); + + foreach(lc, local_slots) + { + ReplicationSlot *s = (ReplicationSlot *) lfirst(lc); + + ReplicationSlotDrop(NameStr(s->data.name), true, false); + ereport(LOG, + (errmsg("dropped replication slot \"%s\" of dbid %d as it " + "was not sync-ready", NameStr(s->data.name), + s->data.database))); + } + + list_free(local_slots); +} + +/* + * Get list of local logical slots which are synchronized from + * the primary server. + */ +static List * +get_local_synced_slots(void) +{ + List *local_slots = NIL; + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + /* Check if it is logical synchronized slot */ + if (s->in_use && SlotIsLogical(s) && + (s->data.sync_state != SYNCSLOT_STATE_NONE)) + { + local_slots = lappend(local_slots, s); + } + } + + LWLockRelease(ReplicationSlotControlLock); + + return local_slots; +} + +/* + * Helper function to check if local_slot is present in remote_slots list. + * + * It also checks if logical slot is locally invalidated i.e. invalidated on + * the standby but valid on the primary server. If found so, it sets + * locally_invalidated to true. 
+ */ +static bool +check_sync_slot_on_remote(ReplicationSlot *local_slot, List *remote_slots, + bool *locally_invalidated) +{ + ListCell *lc; + + foreach(lc, remote_slots) + { + RemoteSlot *remote_slot = (RemoteSlot *) lfirst(lc); + + if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0) + { + /* + * If remote slot is not invalidated but local slot is marked as + * invalidated, then set the bool. + */ + SpinLockAcquire(&local_slot->mutex); + *locally_invalidated = + (remote_slot->invalidated == RS_INVAL_NONE) && + (local_slot->data.invalidated != RS_INVAL_NONE); + SpinLockRelease(&local_slot->mutex); + + return true; + } + } + + return false; +} + +/* + * Drop obsolete slots + * + * Drop the slots that no longer need to be synced i.e. these either do not + * exist on the primary or are no longer enabled for failover. + * + * Additionally, it drops slots that are valid on the primary but got + * invalidated on the standby. This situation may occur due to the following + * reasons: + * - The max_slot_wal_keep_size on the standby is insufficient to retain WAL + * records from the restart_lsn of the slot. + * - primary_slot_name is temporarily reset to null and the physical slot is + * removed. + * - The primary changes wal_level to a level lower than logical. + * + * The assumption is that these dropped local invalidated slots will get + * recreated in next sync-cycle and it is okay to drop and recreate such slots + * as long as these are not consumable on the standby (which is the case + * currently). 
+ */ +static void +drop_obsolete_slots(List *remote_slot_list) +{ + List *local_slots = NIL; + ListCell *lc; + + local_slots = get_local_synced_slots(); + + foreach(lc, local_slots) + { + ReplicationSlot *local_slot = (ReplicationSlot *) lfirst(lc); + bool remote_exists = false; + bool locally_invalidated = false; + + remote_exists = check_sync_slot_on_remote(local_slot, remote_slot_list, + &locally_invalidated); + + /* + * Drop the local slot either if it is not in the remote slots list or + * is invalidated while remote slot is still valid. + */ + if (!remote_exists || locally_invalidated) + { + ReplicationSlotDrop(NameStr(local_slot->data.name), true, false); + + ereport(LOG, + (errmsg("dropped replication slot \"%s\" of dbid %d", + NameStr(local_slot->data.name), + local_slot->data.database))); + } + } +} + +/* + * Synchronize single slot to given position. + * + * This creates a new slot if there is no existing one and updates the + * metadata of the slot as per the data received from the primary server. + * + * The 'sync_state' in slot.data is set to SYNCSLOT_STATE_INITIATED + * immediately after creation. It stays in same state until the + * initialization is complete. The initialization is considered to + * be completed once the remote_slot catches up with locally reserved + * position and local slot is updated. The sync_state is then changed + * to SYNCSLOT_STATE_READY. + * + * Returns TRUE if the local slot is updated. + */ +static bool +synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot *remote_slot) +{ + ReplicationSlot *slot; + bool slot_updated = false; + + /* + * Sanity check: Make sure that concerned WAL is received before syncing + * slot to target lsn received from the primary server. + * + * This check should never pass as on the primary server, we have waited + * for the standby's confirmation before updating the logical slot. 
+ */ + SpinLockAcquire(&WalRcv->mutex); + if (remote_slot->confirmed_lsn > WalRcv->latestWalEnd) + { + SpinLockRelease(&WalRcv->mutex); + elog(ERROR, "skipping sync of slot \"%s\" as the received slot sync " + "LSN %X/%X is ahead of the standby position %X/%X", + remote_slot->name, + LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), + LSN_FORMAT_ARGS(WalRcv->latestWalEnd)); + } + SpinLockRelease(&WalRcv->mutex); + + /* Search for the named slot */ + if ((slot = SearchNamedReplicationSlot(remote_slot->name, true))) + { + char sync_state; + + SpinLockAcquire(&slot->mutex); + sync_state = slot->data.sync_state; + SpinLockRelease(&slot->mutex); + + /* User created slot with the same name exists, raise ERROR. */ + if (sync_state == SYNCSLOT_STATE_NONE) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("skipping sync of slot \"%s\" as it is a user created" + " slot", remote_slot->name), + errdetail("This slot has failover enabled on the primary and" + " thus is sync candidate but user created slot with" + " the same name already exists on the standby."))); + + /* + * Slot created by the slot sync worker exists, sync it. + * + * It is important to acquire the slot here before checking + * invalidation. If we don't acquire the slot first, there could be a + * race condition that the local slot could be invalidated just after + * checking the 'invalidated' flag here and we could end up + * overwriting 'invalidated' flag to remote_slot's value. See + * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly + * if the slot is not acquired by other processes. + */ + ReplicationSlotAcquire(remote_slot->name, true); + + Assert(slot == MyReplicationSlot); + + /* + * Copy the invalidation cause from remote only if local slot is not + * invalidated locally, we don't want to overwrite existing one. 
+ */ + if (slot->data.invalidated == RS_INVAL_NONE) + { + SpinLockAcquire(&slot->mutex); + slot->data.invalidated = remote_slot->invalidated; + SpinLockRelease(&slot->mutex); + } + + /* Skip the sync of an invalidated slot */ + if (slot->data.invalidated != RS_INVAL_NONE) + { + ReplicationSlotRelease(); + return false; + } + + /* Slot not ready yet, let's attempt to make it sync-ready now. */ + if (sync_state == SYNCSLOT_STATE_INITIATED) + { + /* + * Wait for the primary server to catch-up. Refer to the comment + * atop the file for details on this wait. + */ + if (remote_slot->restart_lsn < slot->data.restart_lsn || + TransactionIdPrecedes(remote_slot->catalog_xmin, + slot->data.catalog_xmin)) + { + if (!wait_for_primary_slot_catchup(wrconn, remote_slot, NULL)) + { + ReplicationSlotRelease(); + return false; + } + } + + /* + * Wait for primary is over, update the lsns and mark the slot as + * READY for further syncs. + */ + local_slot_update(remote_slot); + SpinLockAcquire(&slot->mutex); + slot->data.sync_state = SYNCSLOT_STATE_READY; + SpinLockRelease(&slot->mutex); + + /* Save the changes */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + slot_updated = true; + + ereport(LOG, + errmsg("newly locally created slot \"%s\" is sync-ready now", + remote_slot->name)); + } + /* Slot ready for sync, so sync it. */ + else if (sync_state == SYNCSLOT_STATE_READY) + { + /* + * Sanity check: With hot_standby_feedback enabled and + * invalidations handled appropriately as above, this should never + * happen. 
+ */ + if (remote_slot->restart_lsn < slot->data.restart_lsn) + elog(ERROR, + "not synchronizing local slot \"%s\" LSN(%X/%X)" + " to remote slot's LSN(%X/%X) as synchronization " + " would move it backwards", remote_slot->name, + LSN_FORMAT_ARGS(slot->data.restart_lsn), + LSN_FORMAT_ARGS(remote_slot->restart_lsn)); + + if (remote_slot->confirmed_lsn != slot->data.confirmed_flush || + remote_slot->restart_lsn != slot->data.restart_lsn || + remote_slot->catalog_xmin != slot->data.catalog_xmin) + { + /* Update LSN of slot to remote slot's current position */ + local_slot_update(remote_slot); + ReplicationSlotSave(); + slot_updated = true; + } + } + } + /* Otherwise create the slot first. */ + else + { + TransactionId xmin_horizon = InvalidTransactionId; + + /* Skip creating the local slot if remote_slot is invalidated already */ + if (remote_slot->invalidated != RS_INVAL_NONE) + return false; + + /* Ensure that we have transaction env needed by get_database_oid() */ + Assert(IsTransactionState()); + + ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL, + remote_slot->two_phase, + remote_slot->failover, + SYNCSLOT_STATE_INITIATED); + + /* For shorter lines. */ + slot = MyReplicationSlot; + + SpinLockAcquire(&slot->mutex); + slot->data.database = get_database_oid(remote_slot->database, false); + namestrcpy(&slot->data.plugin, remote_slot->plugin); + SpinLockRelease(&slot->mutex); + + ReplicationSlotReserveWal(); + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + xmin_horizon = GetOldestSafeDecodingTransactionId(true); + SpinLockAcquire(&slot->mutex); + slot->effective_catalog_xmin = xmin_horizon; + slot->data.catalog_xmin = xmin_horizon; + SpinLockRelease(&slot->mutex); + ReplicationSlotsComputeRequiredXmin(true); + LWLockRelease(ProcArrayLock); + + /* + * Wait for the primary server to catch-up. Refer to the comment atop + * the file for details on this wait. 
+ */ + if (remote_slot->restart_lsn < slot->data.restart_lsn || + TransactionIdPrecedes(remote_slot->catalog_xmin, + slot->data.catalog_xmin)) + { + bool wait_attempts_exceeded = false; + + if (!wait_for_primary_slot_catchup(wrconn, remote_slot, &wait_attempts_exceeded)) + { + /* + * The remote slot didn't catch up to locally reserved + * position. + * + * We do not drop the slot because the restart_lsn can be + * ahead of the current location when recreating the slot in + * the next cycle. It may take more time to create such a + * slot. Therefore, we persist it (provided remote-slot is + * still valid i.e wait_attempts_exceeded is true) and attempt + * the wait and synchronization in the next cycle. + */ + if (wait_attempts_exceeded) + { + ReplicationSlotPersist(); + slot_updated = true; + } + + ReplicationSlotRelease(); + return slot_updated; + } + } + + /* + * Wait for primary is either not needed or is over. Update the lsns + * and mark the slot as READY for further syncs. + */ + local_slot_update(remote_slot); + SpinLockAcquire(&slot->mutex); + slot->data.sync_state = SYNCSLOT_STATE_READY; + SpinLockRelease(&slot->mutex); + + /* Mark the slot as PERSISTENT and save the changes to disk */ + ReplicationSlotPersist(); + slot_updated = true; + + ereport(LOG, + errmsg("newly locally created slot \"%s\" is sync-ready now", + remote_slot->name)); + } + + ReplicationSlotRelease(); + + return slot_updated; +} + +/* + * Synchronize slots. + * + * Gets the failover logical slots info from the primary server and updates + * the slots locally. Creates the slots if not present on the standby. + * + * Returns TRUE if any of the slots gets updated in this sync-cycle. 
+ */ +static bool +synchronize_slots(WalReceiverConn *wrconn) +{ +#define SLOTSYNC_COLUMN_COUNT 9 + Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID, + LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, INT2OID}; + + WalRcvExecResult *res; + TupleTableSlot *tupslot; + StringInfoData s; + List *remote_slot_list = NIL; + ListCell *lc; + bool some_slot_updated = false; + + /* + * The primary_slot_name is not set yet or WALs not received yet. + * Synchronization is not possible if the walreceiver is not started. + */ + SpinLockAcquire(&WalRcv->mutex); + if (!WalRcv || + (WalRcv->slotname[0] == '\0') || + XLogRecPtrIsInvalid(WalRcv->latestWalEnd)) + { + SpinLockRelease(&WalRcv->mutex); + return false; + } + SpinLockRelease(&WalRcv->mutex); + + /* The syscache access in walrcv_exec() needs a transaction env. */ + StartTransactionCommand(); + + initStringInfo(&s); + + /* Construct query to fetch slots with failover enabled. */ + appendStringInfo(&s, + "SELECT slot_name, plugin, confirmed_flush_lsn," + " restart_lsn, catalog_xmin, two_phase, failover," + " database, pg_get_slot_invalidation_cause(slot_name)" + " FROM pg_catalog.pg_replication_slots" + " WHERE failover"); + + elog(DEBUG2, "slot sync worker's query:%s \n", s.data); + + /* Execute the query */ + res = walrcv_exec(wrconn, s.data, SLOTSYNC_COLUMN_COUNT, slotRow); + pfree(s.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch failover logical slots info " + "from the primary server: %s", res->err))); + + + /* Construct the remote_slot tuple and synchronize each slot locally */ + tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot)) + { + bool isnull; + RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot)); + + remote_slot->name = TextDatumGetCString(slot_getattr(tupslot, 1, &isnull)); + Assert(!isnull); + + remote_slot->plugin = TextDatumGetCString(slot_getattr(tupslot, 2, 
&isnull)); + Assert(!isnull); + + /* + * It is possible to get null values for LSN and Xmin if slot is + * invalidated on the primary server, so handle accordingly. + */ + remote_slot->confirmed_lsn = DatumGetLSN(slot_getattr(tupslot, 3, &isnull)); + if (isnull) + remote_slot->confirmed_lsn = InvalidXLogRecPtr; + + remote_slot->restart_lsn = DatumGetLSN(slot_getattr(tupslot, 4, &isnull)); + if (isnull) + remote_slot->restart_lsn = InvalidXLogRecPtr; + + remote_slot->catalog_xmin = DatumGetTransactionId(slot_getattr(tupslot, 5, + &isnull)); + if (isnull) + remote_slot->catalog_xmin = InvalidTransactionId; + + remote_slot->two_phase = DatumGetBool(slot_getattr(tupslot, 6, &isnull)); + Assert(!isnull); + + remote_slot->failover = DatumGetBool(slot_getattr(tupslot, 7, &isnull)); + Assert(!isnull); + + remote_slot->database = TextDatumGetCString(slot_getattr(tupslot, + 8, &isnull)); + Assert(!isnull); + + remote_slot->invalidated = DatumGetInt16(slot_getattr(tupslot, 9, &isnull)); + Assert(!isnull); + + /* Create list of remote slots */ + remote_slot_list = lappend(remote_slot_list, remote_slot); + + ExecClearTuple(tupslot); + } + + /* + * Drop local slots that no longer need to be synced. Do it before + * synchronize_one_slot to allow dropping of slots before actual sync + * which are invalidated locally while still valid on the primary server. + */ + drop_obsolete_slots(remote_slot_list); + + /* Now sync the slots locally */ + foreach(lc, remote_slot_list) + { + RemoteSlot *remote_slot = (RemoteSlot *) lfirst(lc); + + some_slot_updated |= synchronize_one_slot(wrconn, remote_slot); + } + + /* We are done, free remote_slot_list elements */ + list_free_deep(remote_slot_list); + + walrcv_clear_result(res); + + CommitTransactionCommand(); + + return some_slot_updated; +} + +/* + * Connect to the remote (primary) server. + * + * This uses GUC primary_conninfo in order to connect to the primary. 
+ * For slot sync to work, primary_conninfo is required to specify dbname + * as well. + */ +static WalReceiverConn * +remote_connect(void) +{ + WalReceiverConn *wrconn = NULL; + char *err; + + wrconn = walrcv_connect(PrimaryConnInfo, true, false, + cluster_name[0] ? cluster_name : "slotsyncworker", + &err); + if (wrconn == NULL) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the primary server: %s", err))); + return wrconn; +} + +/* + * Checks the primary server info. + * + * Using the specified primary server connection, check whether we are a + * cascading standby. It also validates primary_slot_name for non-cascading + * standbys. + */ +static void +check_primary_info(WalReceiverConn *wrconn, bool *am_cascading_standby) +{ +#define PRIMARY_INFO_OUTPUT_COL_COUNT 2 + WalRcvExecResult *res; + Oid slotRow[PRIMARY_INFO_OUTPUT_COL_COUNT] = {BOOLOID, BOOLOID}; + StringInfoData cmd; + bool isnull; + TupleTableSlot *tupslot; + bool valid; + bool remote_in_recovery; + bool tuple_ok PG_USED_FOR_ASSERTS_ONLY; + + /* The syscache access in walrcv_exec() needs a transaction env. 
*/ + StartTransactionCommand(); + + Assert(am_cascading_standby != NULL); + + *am_cascading_standby = false; /* overwritten later if cascading */ + + initStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT pg_is_in_recovery(), count(*) = 1" + " FROM pg_replication_slots" + " WHERE slot_type='physical' AND slot_name=%s", + quote_literal_cstr(PrimarySlotName)); + + res = walrcv_exec(wrconn, cmd.data, PRIMARY_INFO_OUTPUT_COL_COUNT, slotRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch primary_slot_name \"%s\" info from the " + "primary: %s", PrimarySlotName, res->err))); + + tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + tuple_ok = tuplestore_gettupleslot(res->tuplestore, true, false, tupslot); + Assert(tuple_ok); /* It must return one tuple */ + + remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull)); + Assert(!isnull); + + /* No need to check further, return that we are cascading standby */ + if (remote_in_recovery) + { + *am_cascading_standby = true; + ExecClearTuple(tupslot); + walrcv_clear_result(res); + CommitTransactionCommand(); + return; + } + + valid = DatumGetBool(slot_getattr(tupslot, 2, &isnull)); + Assert(!isnull); + + if (!valid) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("exiting from slot synchronization due to bad configuration"), + /* translator: second %s is a GUC variable name */ + errdetail("The primary slot \"%s\" specified by %s is not valid.", + PrimarySlotName, "primary_slot_name"))); + ExecClearTuple(tupslot); + walrcv_clear_result(res); + CommitTransactionCommand(); +} + +/* + * Check that all necessary GUCs for slot synchronization are set + * appropriately. If not, raise an ERROR. + */ +static void +validate_slotsync_parameters(char **dbname) +{ + /* Sanity check. 
*/ + Assert(enable_syncslot); + + /* + * A physical replication slot(primary_slot_name) is required on the + * primary to ensure that the rows needed by the standby are not removed + * after restarting, so that the synchronized slot on the standby will not + * be invalidated. + */ + if (PrimarySlotName == NULL || strcmp(PrimarySlotName, "") == 0) + ereport(ERROR, + /* translator: %s is a GUC variable name */ + errmsg("exiting from slot synchronization due to bad configuration"), + errhint("%s must be defined.", "primary_slot_name")); + + /* + * Hot_standby_feedback must be enabled to cooperate with the physical + * replication slot, which allows informing the primary about the xmin and + * catalog_xmin values on the standby. + */ + if (!hot_standby_feedback) + ereport(ERROR, + /* translator: %s is a GUC variable name */ + errmsg("exiting from slot synchronization due to bad configuration"), + errhint("%s must be enabled.", "hot_standby_feedback")); + + /* + * Logical decoding requires wal_level >= logical and we currently only + * synchronize logical slots. + */ + if (wal_level < WAL_LEVEL_LOGICAL) + ereport(ERROR, + /* translator: %s is a GUC variable name */ + errmsg("exiting from slot synchronization due to bad configuration"), + errhint("wal_level must be >= logical.")); + + /* + * The primary_conninfo is required to make connection to primary for + * getting slots information. + */ + if (PrimaryConnInfo == NULL || strcmp(PrimaryConnInfo, "") == 0) + ereport(ERROR, + /* translator: %s is a GUC variable name */ + errmsg("exiting from slot synchronization due to bad configuration"), + errhint("%s must be defined.", "primary_conninfo")); + + /* + * The slot sync worker needs a database connection for walrcv_exec to + * work. 
+ */ + *dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo); + if (*dbname == NULL) + ereport(ERROR, + + /* + * translator: 'dbname' is a specific option; %s is a GUC variable + * name + */ + errmsg("exiting from slot synchronization due to bad configuration"), + errhint("'dbname' must be specified in %s.", "primary_conninfo")); +} + +/* + * Re-read the config file. + * + * If any of the slot sync GUCs have changed, exit the worker and + * let it get restarted by the postmaster. + */ +static void +slotsync_reread_config(WalReceiverConn *wrconn) +{ + char *old_primary_conninfo = pstrdup(PrimaryConnInfo); + char *old_primary_slotname = pstrdup(PrimarySlotName); + bool old_hot_standby_feedback = hot_standby_feedback; + bool conninfo_changed; + bool primary_slotname_changed; + + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + + conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0; + primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0; + + if (conninfo_changed || + primary_slotname_changed || + (old_hot_standby_feedback != hot_standby_feedback)) + { + ereport(LOG, + errmsg("slot sync worker will restart because of " + "a parameter change")); + /* The exit code 1 will make postmaster restart this worker */ + proc_exit(1); + } + + pfree(old_primary_conninfo); + pfree(old_primary_slotname); +} + +/* + * Interrupt handler for main loop of slot sync worker. + */ +static void +ProcessSlotSyncInterrupts(WalReceiverConn *wrconn) +{ + CHECK_FOR_INTERRUPTS(); + + if (ShutdownRequestPending) + { + walrcv_disconnect(wrconn); + ereport(LOG, + errmsg("replication slot sync worker is shutting down " + "on receiving SIGINT")); + proc_exit(0); + } + + if (ConfigReloadPending) + slotsync_reread_config(wrconn); +} + +/* + * Cleanup function for logical replication launcher. + * + * Called on logical replication launcher exit. 
+ */ +static void +slotsync_worker_onexit(int code, Datum arg) +{ + SpinLockAcquire(&SlotSyncWorker->mutex); + SlotSyncWorker->pid = InvalidPid; + SpinLockRelease(&SlotSyncWorker->mutex); +} + +/* + * The main loop of our worker process. + * + * It connects to the primary server, fetches logical failover slots + * information periodically in order to create and sync the slots. + */ +void +ReplSlotSyncWorkerMain(Datum main_arg) +{ + WalReceiverConn *wrconn = NULL; + char *dbname; + bool am_cascading_standby; + + ereport(LOG, errmsg("replication slot sync worker started")); + + before_shmem_exit(slotsync_worker_onexit, (Datum) 0); + + SpinLockAcquire(&SlotSyncWorker->mutex); + + Assert(SlotSyncWorker->pid == InvalidPid); + + /* Advertise our PID so that the startup process can kill us on promotion */ + SlotSyncWorker->pid = MyProcPid; + + SpinLockRelease(&SlotSyncWorker->mutex); + + /* Setup signal handling */ + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGINT, SignalHandlerForShutdownRequest); + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + + validate_slotsync_parameters(&dbname); + + /* + * Connect to the database specified by user in primary_conninfo. We need + * a database connection for walrcv_exec to work. Please see comments atop + * libpqrcv_exec. + */ + BackgroundWorkerInitializeConnection(dbname, NULL, 0); + + /* Connect to the primary server */ + wrconn = remote_connect(); + + /* + * Using the specified primary server connection, check whether we are + * cascading standby and validates primary_slot_name for + * non-cascading-standbys. + */ + check_primary_info(wrconn, &am_cascading_standby); + + /* Main wait loop. 
*/ + for (;;) + { + int rc; + long naptime = WORKER_DEFAULT_NAPTIME_MS; + TimestampTz now; + bool some_slot_updated; + + ProcessSlotSyncInterrupts(wrconn); + + if (am_cascading_standby) + { + /* + * Slot synchronization is currently not supported on cascading + * standby. So if we are on the cascading standby, skip the sync + * and take a longer nap before we check again whether we are + * still cascading standby or not. + */ + naptime = 6 * WORKER_INACTIVITY_NAPTIME_MS; /* 60 sec */ + } + else + { + some_slot_updated = synchronize_slots(wrconn); + + /* + * If any of the slots get updated in this sync-cycle, use default + * naptime and update 'last_update_time'. But if no activity is + * observed in this sync-cycle, then increase naptime provided + * inactivity time reaches threshold. + */ + now = GetCurrentTimestamp(); + if (some_slot_updated) + last_update_time = now; + else if (TimestampDifferenceExceeds(last_update_time, + now, WORKER_INACTIVITY_THRESHOLD_MS)) + naptime = WORKER_INACTIVITY_NAPTIME_MS; + } + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + naptime, + WAIT_EVENT_REPL_SLOTSYNC_MAIN); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + + /* + * One can promote the standby and we can no longer be a cascading + * standby. So recheck here. + */ + if (am_cascading_standby) + check_primary_info(wrconn, &am_cascading_standby); + } + + /* + * The slot sync worker can not get here because it will only stop when it + * receives a SIGINT from the logical replication launcher, or when there + * is an error. + */ + Assert(false); +} + +/* + * Is current process the slot sync worker? + */ +bool +IsLogicalSlotSyncWorker(void) +{ + return SlotSyncWorker->pid == MyProcPid; +} + +/* + * Shut down the slot sync worker. 
+ */ +void +ShutDownSlotSync(void) +{ + SpinLockAcquire(&SlotSyncWorker->mutex); + if (SlotSyncWorker->pid == InvalidPid) + { + SpinLockRelease(&SlotSyncWorker->mutex); + return; + } + + kill(SlotSyncWorker->pid, SIGINT); + + SpinLockRelease(&SlotSyncWorker->mutex); + + /* Wait for it to die. */ + for (;;) + { + int rc; + + /* Wait a bit, we don't expect to have to wait long. */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 10L, WAIT_EVENT_BGWORKER_SHUTDOWN); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + SpinLockAcquire(&SlotSyncWorker->mutex); + + /* Is it gone? */ + if (SlotSyncWorker->pid == InvalidPid) + break; + + SpinLockRelease(&SlotSyncWorker->mutex); + } + + SpinLockRelease(&SlotSyncWorker->mutex); +} + +/* + * Allocate and initialize slot sync worker shared memory + */ +void +SlotSyncWorkerShmemInit(void) +{ + Size size; + bool found; + + size = sizeof(SlotSyncWorkerCtx); + size = MAXALIGN(size); + + SlotSyncWorker = (SlotSyncWorkerCtx *) + ShmemInitStruct("Slot Sync Worker Data", size, &found); + + if (!found) + { + memset(SlotSyncWorker, 0, size); + SlotSyncWorker->pid = InvalidPid; + SpinLockInit(&SlotSyncWorker->mutex); + } +} + +/* + * Register the background worker for slots synchronization provided + * enable_syncslot is ON. + */ +void +SlotSyncWorkerRegister(void) +{ + BackgroundWorker bgw; + + if (!enable_syncslot) + { + ereport(LOG, + (errmsg("skipping slot synchronization"), + errdetail("enable_syncslot is disabled."))); + return; + } + + memset(&bgw, 0, sizeof(bgw)); + + /* We need database connection which needs shared-memory access as well. 
*/ + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + + /* Start as soon as a consistent state has been reached in a hot standby */ + bgw.bgw_start_time = BgWorkerStart_ConsistentState_HotStandby; + + snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ReplSlotSyncWorkerMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "replication slot sync worker"); + snprintf(bgw.bgw_type, BGW_MAXLEN, + "slot sync worker"); + + bgw.bgw_restart_time = BGW_DEFAULT_RESTART_INTERVAL; + bgw.bgw_notify_pid = 0; + bgw.bgw_main_arg = (Datum) 0; + + RegisterBackgroundWorker(&bgw); +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index e46a1955e8..7b3784c212 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -141,7 +141,20 @@ * subscribe to the new primary without losing any data. * * However, we do not enable failover for slots created by the table sync - * worker. + * worker. This is because the table sync slot might not be fully synced on the + * standby due to the following reasons: + * + * - The standby needs to wait for the primary server to catch up because the + * local restart_lsn of the newly created slot on the standby is set using + * the latest redo position (GetXLogReplayRecPtr()), which is typically ahead + * of the primary's restart_lsn. + * - The table sync slot's restart_lsn won't be advanced until the state + * becomes SUBREL_STATE_CATCHUP. + * + * Therefore, if a failover happens before the restart_lsn advances, the table + * sync slot will not be synced to the standby. Consequently, we will not be + * able to subscribe to the promoted standby due to the absence of the + * necessary table sync slot. * * Additionally, failover is not enabled for the main slot if the table sync is * in progress. 
This is because if a failover occurs while the table sync diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 49d2de5024..54a2b96418 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -47,6 +47,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "replication/slot.h" +#include "replication/walsender.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/proc.h" @@ -262,16 +263,22 @@ ReplicationSlotValidateName(const char *name, int elevel) * user will only get commit prepared. * failover: If enabled, allows the slot to be synced to physical standbys so * that logical replication can be resumed after failover. + * sync_state: Defines slot synchronization state. This function is expected + * to receive either SYNCSLOT_STATE_NONE for the user created slots or + * SYNCSLOT_STATE_INITIATED for the slots being synchronized on the physical + * standby. */ void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase, bool failover) + bool two_phase, bool failover, char sync_state) { ReplicationSlot *slot = NULL; int i; Assert(MyReplicationSlot == NULL); + Assert(sync_state == SYNCSLOT_STATE_NONE || + sync_state == SYNCSLOT_STATE_INITIATED); ReplicationSlotValidateName(name, ERROR); @@ -327,6 +334,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.two_phase = two_phase; slot->data.two_phase_at = InvalidXLogRecPtr; slot->data.failover = failover; + slot->data.sync_state = sync_state; /* and then data only present in shared memory */ slot->just_dirtied = false; @@ -686,12 +694,23 @@ restart: * Permanently drop replication slot identified by the passed in name. 
*/ void -ReplicationSlotDrop(const char *name, bool nowait) +ReplicationSlotDrop(const char *name, bool nowait, bool user_cmd) { Assert(MyReplicationSlot == NULL); ReplicationSlotAcquire(name, nowait); + /* + * Do not allow users to drop the slots which are currently being synced + * from the primary to the standby. + */ + if (user_cmd && RecoveryInProgress() && + MyReplicationSlot->data.sync_state != SYNCSLOT_STATE_NONE) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot drop replication slot \"%s\"", name), + errdetail("This slot is being synced from the primary server."))); + ReplicationSlotDropAcquired(); } @@ -711,6 +730,17 @@ ReplicationSlotAlter(const char *name, bool failover) errmsg("cannot use %s with a physical replication slot", "ALTER_REPLICATION_SLOT")); + /* + * Do not allow users to alter the slots which are currently being synced + * from the primary to the standby. + */ + if (RecoveryInProgress() && + MyReplicationSlot->data.sync_state != SYNCSLOT_STATE_NONE) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot alter replication slot \"%s\"", name), + errdetail("This slot is being synced from the primary server."))); + SpinLockAcquire(&MyReplicationSlot->mutex); MyReplicationSlot->data.failover = failover; SpinLockRelease(&MyReplicationSlot->mutex); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index c87f61666d..e5ba5956aa 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -44,7 +44,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve, /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(name, false, temporary ? 
RS_TEMPORARY : RS_PERSISTENT, false, - false); + false, SYNCSLOT_STATE_NONE); if (immediately_reserve) { @@ -137,7 +137,7 @@ create_logical_replication_slot(char *name, char *plugin, */ ReplicationSlotCreate(name, true, temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase, - failover); + failover, SYNCSLOT_STATE_NONE); /* * Create logical decoding context to find start point or, if we don't @@ -226,11 +226,38 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) CheckSlotRequirements(); - ReplicationSlotDrop(NameStr(*name), true); + ReplicationSlotDrop(NameStr(*name), true, true); PG_RETURN_VOID(); } +/* + * SQL function for getting invalidation cause of a slot. + * + * Returns ReplicationSlotInvalidationCause enum value for valid slot_name; + * returns NULL if slot with given name is not found. + * + * Returns RS_INVAL_NONE if the given slot is not invalidated. + */ +Datum +pg_get_slot_invalidation_cause(PG_FUNCTION_ARGS) +{ + Name name = PG_GETARG_NAME(0); + ReplicationSlot *s; + ReplicationSlotInvalidationCause cause; + + s = SearchNamedReplicationSlot(NameStr(*name), true); + + if (s == NULL) + PG_RETURN_NULL(); + + SpinLockAcquire(&s->mutex); + cause = s->data.invalidated; + SpinLockRelease(&s->mutex); + + PG_RETURN_INT16(cause); +} + /* * pg_get_replication_slots - SQL SRF showing all replication slots * that currently exist on the database cluster. 
@@ -238,7 +265,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 16 +#define PG_GET_REPLICATION_SLOTS_COLS 17 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -420,6 +447,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = BoolGetDatum(slot_contents.data.failover); + values[i++] = CharGetDatum(slot_contents.data.sync_state); + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index fddc9310c6..8d48d7c8f4 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1071,7 +1071,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) { ReplicationSlotCreate(cmd->slotname, false, cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, - false, false); + false, false, SYNCSLOT_STATE_NONE); if (reserve_wal) { @@ -1102,7 +1102,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? 
RS_TEMPORARY : RS_EPHEMERAL, - two_phase, failover); + two_phase, failover, SYNCSLOT_STATE_NONE); /* * Do options check early so that we can bail before calling the @@ -1254,7 +1254,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) static void DropReplicationSlot(DropReplicationSlotCmd *cmd) { - ReplicationSlotDrop(cmd->slotname, !cmd->wait); + ReplicationSlotDrop(cmd->slotname, !cmd->wait, true); } /* diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 0e0ac22bdd..ae2daff87b 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -37,6 +37,7 @@ #include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/walsender.h" +#include "replication/worker_internal.h" #include "storage/bufmgr.h" #include "storage/dsm.h" #include "storage/ipc.h" @@ -339,6 +340,7 @@ CreateOrAttachShmemStructs(void) WalRcvShmemInit(); PgArchShmemInit(); ApplyLauncherShmemInit(); + SlotSyncWorkerShmemInit(); /* * Set up other modules that need some shared memory space diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 7298a187d1..d87020821c 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -3286,6 +3286,17 @@ ProcessInterrupts(void) */ proc_exit(1); } + else if (IsLogicalSlotSyncWorker()) + { + elog(DEBUG1, + "replication slot sync worker is shutting down due to administrator command"); + + /* + * Slot sync worker can be stopped at any time. + * Use exit status 1 so the background worker is restarted. + */ + proc_exit(1); + } else if (IsBackgroundWorker) ereport(FATAL, (errcode(ERRCODE_ADMIN_SHUTDOWN), diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index ede94a1ede..7eb735824c 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -53,6 +53,8 @@ LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process." 
LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process." LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process." RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery." +REPL_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker." +REPL_SLOTSYNC_PRIMARY_CATCHUP "Waiting for the primary to catch-up, in slot sync worker." SYSLOGGER_MAIN "Waiting in main loop of syslogger process." WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process." WAL_SENDER_MAIN "Waiting in main loop of WAL sender process." diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 9a54e04821..7d50971fd7 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -67,6 +67,7 @@ #include "replication/logicallauncher.h" #include "replication/slot.h" #include "replication/syncrep.h" +#include "replication/worker_internal.h" #include "storage/bufmgr.h" #include "storage/large_object.h" #include "storage/pg_shmem.h" @@ -2020,6 +2021,15 @@ struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"enable_syncslot", PGC_POSTMASTER, REPLICATION_STANDBY, + gettext_noop("Enables a physical standby to synchronize logical failover slots from the primary server."), + }, + &enable_syncslot, + false, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 5d940b72cd..39224137e1 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -358,6 +358,7 @@ #wal_retrieve_retry_interval = 5s # time to wait before retrying to # retrieve WAL after a failed attempt #recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery +#enable_syncslot = off # enables slot 
synchronization on the physical standby from the primary # - Subscribers - diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index bbc03bb76b..82a11e4a31 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11095,14 +11095,18 @@ proname => 'pg_drop_replication_slot', provolatile => 'v', proparallel => 'u', prorettype => 'void', proargtypes => 'name', prosrc => 'pg_drop_replication_slot' }, +{ oid => '8484', descr => 'what caused the replication slot to become invalid', + proname => 'pg_get_slot_invalidation_cause', provolatile => 's', proisstrict => 't', + prorettype => 'int2', proargtypes => 'name', + prosrc => 'pg_get_slot_invalidation_cause' }, { oid => '3781', descr => 'information about replication slots currently in use', proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,bool}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,failover}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,bool,char}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,failover,sync_state}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', diff --git a/src/include/postmaster/bgworker.h b/src/include/postmaster/bgworker.h index e90ff376a6..8559900b70 100644 --- 
a/src/include/postmaster/bgworker.h +++ b/src/include/postmaster/bgworker.h @@ -79,6 +79,7 @@ typedef enum BgWorkerStart_PostmasterStart, BgWorkerStart_ConsistentState, BgWorkerStart_RecoveryFinished, + BgWorkerStart_ConsistentState_HotStandby, } BgWorkerStartTime; #define BGW_DEFAULT_RESTART_INTERVAL 60 diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index bbd71d0b42..945d2608f6 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -22,6 +22,7 @@ extern void TablesyncWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); extern bool IsLogicalParallelApplyWorker(void); +extern bool IsLogicalSlotSyncWorker(void); extern void HandleParallelApplyMessageInterrupt(void); extern void HandleParallelApplyMessages(void); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 5ddad69348..9e4389b991 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -52,6 +52,14 @@ typedef enum ReplicationSlotInvalidationCause RS_INVAL_WAL_LEVEL, } ReplicationSlotInvalidationCause; +/* The possible values for 'sync_state' in ReplicationSlotPersistentData */ +#define SYNCSLOT_STATE_NONE 'n' /* None for user created slots */ +#define SYNCSLOT_STATE_INITIATED 'i' /* Sync initiated for the slot but + * not completed yet, waiting for + * the primary server to catch-up */ +#define SYNCSLOT_STATE_READY 'r' /* Initialization complete, ready + * to be synced further */ + /* * On-Disk data of a replication slot, preserved across restarts. */ @@ -112,6 +120,15 @@ typedef struct ReplicationSlotPersistentData /* plugin name */ NameData plugin; + /* + * Synchronization state for a logical slot. + * + * The standby can have any value among the possible values of 'i','r' and + * 'n'. For primary, the default is 'n' for all slots but may also be 'r' + * if leftover from a promoted standby. 
+ */ + char sync_state; + /* * Is this a failover slot (sync candidate for physical standbys)? Only * relevant for logical slots on the primary server. @@ -225,9 +242,10 @@ extern void ReplicationSlotsShmemInit(void); /* management of individual slots */ extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase, bool failover); + bool two_phase, bool failover, + char sync_state); extern void ReplicationSlotPersist(void); -extern void ReplicationSlotDrop(const char *name, bool nowait); +extern void ReplicationSlotDrop(const char *name, bool nowait, bool user_cmd); extern void ReplicationSlotAlter(const char *name, bool failover); extern void ReplicationSlotAcquire(const char *name, bool nowait); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index f1135762fb..259d0f7065 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -279,6 +279,21 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn, typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn, TimeLineID *primary_tli); +/* + * walrcv_get_dbinfo_for_failover_slots_fn + * + * Run LIST_DBID_FOR_FAILOVER_SLOTS on primary server to get the + * list of unique DBIDs for failover logical slots + */ +typedef List *(*walrcv_get_dbinfo_for_failover_slots_fn) (WalReceiverConn *conn); + +/* + * walrcv_get_dbname_from_conninfo_fn + * + * Returns the dbname from the primary_conninfo + */ +typedef char *(*walrcv_get_dbname_from_conninfo_fn) (const char *conninfo); + /* * walrcv_server_version_fn * @@ -403,6 +418,7 @@ typedef struct WalReceiverFunctionsType walrcv_get_conninfo_fn walrcv_get_conninfo; walrcv_get_senderinfo_fn walrcv_get_senderinfo; walrcv_identify_system_fn walrcv_identify_system; + walrcv_get_dbname_from_conninfo_fn walrcv_get_dbname_from_conninfo; walrcv_server_version_fn walrcv_server_version; walrcv_readtimelinehistoryfile_fn
walrcv_readtimelinehistoryfile; walrcv_startstreaming_fn walrcv_startstreaming; @@ -428,6 +444,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port) #define walrcv_identify_system(conn, primary_tli) \ WalReceiverFunctions->walrcv_identify_system(conn, primary_tli) +#define walrcv_get_dbname_from_conninfo(conninfo) \ + WalReceiverFunctions->walrcv_get_dbname_from_conninfo(conninfo) #define walrcv_server_version(conn) \ WalReceiverFunctions->walrcv_server_version(conn) #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 84bb79ac0f..9406a2666f 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -237,6 +237,11 @@ extern PGDLLIMPORT bool in_remote_transaction; extern PGDLLIMPORT bool InitializingApplyWorker; +/* Slot sync worker objects */ +extern PGDLLIMPORT char *PrimaryConnInfo; +extern PGDLLIMPORT char *PrimarySlotName; +extern PGDLLIMPORT bool enable_syncslot; + extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); @@ -326,6 +331,12 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); +extern void ReplSlotSyncWorkerMain(Datum main_arg); +extern void SlotSyncWorkerRegister(void); +extern void ShutDownSlotSync(void); +extern void slotsync_drop_initiated_slots(void); +extern void SlotSyncWorkerShmemInit(void); + #define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) #define isTablesyncWorker(worker) ((worker)->in_use && \ diff --git a/src/test/recovery/t/050_standby_failover_slots_sync.pl b/src/test/recovery/t/050_standby_failover_slots_sync.pl index 5564577bbe..277fc2e079 100644 
--- a/src/test/recovery/t/050_standby_failover_slots_sync.pl +++ b/src/test/recovery/t/050_standby_failover_slots_sync.pl @@ -296,5 +296,150 @@ is( $primary->safe_psql( 'logical slot has failover true on the primary'); $subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION regress_mysub3"); +$primary->safe_psql('postgres', "TRUNCATE tab_int"); + +################################################## +# Test logical failover slots on the standby +# Configure standby1 to replicate and synchronize logical slots configured +# for failover on the primary +# +# failover slot lsub1_slot->| ----> subscriber1 (connected via logical replication) +# primary ---> | +# physical slot sb1_slot--->| ----> standby1 (connected via streaming replication) +# | lsub1_slot(synced_slot) +################################################## + +my $connstr_1 = $primary->connstr; +$standby1->stop; +$standby1->append_conf( + 'postgresql.conf', q{ +enable_syncslot = true +hot_standby_feedback = on +primary_slot_name = 'sb1_slot' +}); +$standby1->append_conf( + 'postgresql.conf', qq( +primary_conninfo = '$connstr_1 dbname=postgres' +)); + +# Add this standby into the primary's configuration +$primary->stop; +$primary->append_conf( + 'postgresql.conf', qq( +standby_slot_names = 'sb1_slot' +)); +$primary->start; + +my $standby1_conninfo = $standby1->connstr . ' dbname=postgres'; + +# Wait for the standby to start sync +$offset = -s $standby1->logfile; +$standby1->start; +$standby1->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? waiting for remote slot \"lsub1_slot\"/, + $offset); + +# Advance lsn on the primary +$primary->safe_psql('postgres', + "SELECT pg_log_standby_snapshot(); + SELECT pg_log_standby_snapshot(); + SELECT pg_log_standby_snapshot();"); + +# Wait for the standby to finish sync +$offset = -s $standby1->logfile; +$standby1->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? 
wait over for remote slot \"lsub1_slot\"/, + $offset); + +# Confirm that logical failover slot is created on the standby and is sync ready +is($standby1->safe_psql('postgres', + q{SELECT failover, sync_state FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}), + "t|r", + 'logical slot has failover as true and sync_state as ready on standby'); + +################################################## +# Test that a synchronized slot can not be decoded, altered and dropped by the user +################################################## + +# Disable hot_standby_feedback temporarily to stop slot sync worker otherwise +# the concerned testing scenarios here may be interrupted by different error: +# 'ERROR: replication slot is active for PID ..' + +$standby1->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = off;'); +$standby1->restart; + +# Dropping of synced slot should result in error +my ($result1, $stdout, $stderr) = $standby1->psql('postgres', + "SELECT pg_drop_replication_slot('lsub1_slot');"); +ok($stderr =~ /ERROR: cannot drop replication slot "lsub1_slot"/, + "synced slot on standby cannot be dropped"); + +# Logical decoding on synced slot should result in error +($result1, $stdout, $stderr) = $standby1->psql('postgres', + "select * from pg_logical_slot_get_changes('lsub1_slot',NULL,NULL);"); +ok($stderr =~ /ERROR: cannot use replication slot "lsub1_slot" for logical decoding/, + "logical decoding is not allowed on synced slot"); + +($result, $stdout, $stderr) = $standby1->psql( + 'postgres', + qq[ALTER_REPLICATION_SLOT lsub1_slot (failover);], + replication => 'database'); +ok($stderr =~ /ERROR: cannot alter replication slot "lsub1_slot"/, + "synced slot on standby cannot be altered"); + +# Enable hot_standby_feedback and restart standby +$standby1->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = on;'); +$standby1->restart; + +################################################## +# Create another slot which stays in sync_state as 
initiated ('i') +# because it's a manually created slot and its lsn is not advanced. +################################################## + +# Create a logical slot with failover = true +$primary->psql('postgres', + q{SELECT pg_create_logical_replication_slot('logical_slot','pgoutput', false, true, true);}); + +# Wait for the standby to start sync +$offset = -s $standby1->logfile; +$standby1->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? waiting for remote slot \"logical_slot\"/, + $offset); + +# Confirm that the logical slot is created on the standby and is in sync initiated state + is($standby1->safe_psql('postgres', + q{SELECT failover, sync_state FROM pg_replication_slots WHERE slot_name = 'logical_slot';}), + "t|i", + 'logical slot has failover as true and sync_state as initiated on standby'); + +################################################## +# Promote the standby1 to primary. Confirm that: +# a) the sync-ready('r') slot 'lsub1_slot' is retained on the new primary +# b) the initiated('i') slot 'logical_slot' is dropped on promotion +# c) logical replication for regress_mysub1 is resumed successfully after failover +################################################## +$standby1->promote; + +# Update subscription with the new primary's connection info +$subscriber1->safe_psql('postgres', + "ALTER SUBSCRIPTION regress_mysub1 DISABLE; + ALTER SUBSCRIPTION regress_mysub1 CONNECTION '$standby1_conninfo'; + ALTER SUBSCRIPTION regress_mysub1 ENABLE; "); + +is($standby1->safe_psql('postgres', + q{SELECT slot_name FROM pg_replication_slots WHERE slot_name in ('logical_slot','lsub1_slot');}), + 'lsub1_slot', + 'synced slot retained on the new primary'); + +# Insert data on the new primary +$primary_row_count = 10; +$standby1->safe_psql('postgres', + "INSERT INTO tab_int SELECT generate_series(1, $primary_row_count);"); +$standby1->wait_for_catchup('regress_mysub1'); + +# Confirm that data in tab_int replicated on subscriber +is( $subscriber1->safe_psql('postgres', q{SELECT 
count(*) FROM tab_int;}), + "$primary_row_count", + 'data replicated from the new primary'); done_testing(); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index cb3b04aa0c..f2e5a3849c 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1474,8 +1474,9 @@ pg_replication_slots| SELECT l.slot_name, l.safe_wal_size, l.two_phase, l.conflicting, - l.failover - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, failover) + l.failover, + l.sync_state + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, failover, sync_state) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out index 271313ebf8..aac83755de 100644 --- a/src/test/regress/expected/sysviews.out +++ b/src/test/regress/expected/sysviews.out @@ -132,8 +132,9 @@ select name, setting from pg_settings where name like 'enable%'; enable_self_join_removal | on enable_seqscan | on enable_sort | on + enable_syncslot | off enable_tidscan | on -(22 rows) +(23 rows) -- There are always wait event descriptions for various types. 
select type, count(*) > 0 as ok FROM pg_wait_events diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 199863c6b5..10c6f32a30 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2321,6 +2321,7 @@ RelocationBufferInfo RelptrFreePageBtree RelptrFreePageManager RelptrFreePageSpanLeader +RemoteSlot RenameStmt ReopenPtrType ReorderBuffer @@ -2579,6 +2580,7 @@ SlabBlock SlabContext SlabSlot SlotNumber +SlotSyncWorkerCtx SlruCtl SlruCtlData SlruErrorCause -- 2.34.1