From 07a27c150897971049ed1edacdc62504c66708e4 Mon Sep 17 00:00:00 2001 From: Shveta Malik Date: Thu, 4 Jan 2024 09:37:50 +0530 Subject: [PATCH v57 2/4] Add logical slot sync capability to the physical standby This patch implements synchronization of logical replication slots from the primary server to the physical standby so that logical replication can be resumed after failover. GUC 'enable_syncslot' enables a physical standby to synchronize failover logical replication slots from the primary server. Logical replication slots on the primary can be synchronized to the hot standby by enabling the failover option during slot creation and setting 'enable_syncslot' on the standby. For the synchronization to work, it is mandatory to have a physical replication slot between the primary and the standby, and hot_standby_feedback must be enabled on the standby. All the failover logical replication slots on the primary (assuming configurations are appropriate) are automatically created on the physical standbys and are synced periodically. The slot-sync worker on the standby server pings the primary server at regular intervals to get the necessary failover logical slot information and create/update the slots locally. The nap time of the worker is tuned according to the activity on the primary. The worker starts with a nap time of 10ms, and if no activity is observed on the primary for some time, the nap time is increased to 10sec. If activity is observed again, the nap time is reduced back to 10ms. The logical slots created by the slot-sync worker on physical standbys are not allowed to be dropped or consumed. Any attempt to perform logical decoding on such slots will result in an error. If a logical slot is invalidated on the primary, the slot on the standby is also invalidated. If a logical slot on the primary is valid but is invalidated on the standby, then that slot is dropped and recreated on the standby in the next sync-cycle, provided the slot still exists on the primary server. It is okay to recreate such slots as long as these are not consumable on the standby (which is the case currently). This situation may occur due to the following reasons: - The max_slot_wal_keep_size on the standby is insufficient to retain WAL records from the restart_lsn of the slot. - primary_slot_name is temporarily reset to null and the physical slot is removed. - The primary changes wal_level to a level lower than logical. The slot synchronization status on the standby can be monitored using the 'sync_state' column of the pg_replication_slots view. The values are: 'none': for user slots, 'initiated': sync initiated for the slot but the slot is not yet ready for periodic syncs, 'ready': ready for periodic syncs.
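A minimal usage sketch follows (not part of the patch). It assumes the 'failover' parameter of pg_create_logical_replication_slot() added earlier in this patch series, the GUC names as of v57, and illustrative host/slot names.

On the primary:

    SELECT pg_create_physical_replication_slot('sb1_slot');
    SELECT pg_create_logical_replication_slot('lsub1_slot', 'test_decoding',
                                              false, false, true);

On the standby, in postgresql.conf (note that primary_conninfo must include dbname, which is used only for slot synchronization):

    primary_conninfo = 'host=primary user=repl dbname=postgres'
    primary_slot_name = 'sb1_slot'
    hot_standby_feedback = on
    enable_syncslot = on

Then, on the standby, watch the synchronized slot become sync-ready:

    SELECT slot_name, failover, sync_state FROM pg_replication_slots;

If the primary is idle, running SELECT pg_log_standby_snapshot(); on the primary can speed up the transition from 'initiated' to 'ready'.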
--- doc/src/sgml/bgworker.sgml | 65 +- doc/src/sgml/config.sgml | 27 +- doc/src/sgml/logicaldecoding.sgml | 31 + doc/src/sgml/system-views.sgml | 35 + src/backend/access/transam/xlogrecovery.c | 18 + src/backend/catalog/system_views.sql | 3 +- src/backend/postmaster/bgworker.c | 4 + src/backend/postmaster/postmaster.c | 10 + .../libpqwalreceiver/libpqwalreceiver.c | 41 + src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/logical.c | 25 + src/backend/replication/logical/meson.build | 1 + src/backend/replication/logical/slotsync.c | 1357 +++++++++++++++++ src/backend/replication/logical/worker.c | 15 +- src/backend/replication/slot.c | 35 +- src/backend/replication/slotfuncs.c | 27 +- src/backend/replication/walsender.c | 4 +- src/backend/storage/ipc/ipci.c | 2 + src/backend/tcop/postgres.c | 11 + .../utils/activity/wait_event_names.txt | 2 + src/backend/utils/misc/guc_tables.c | 10 + src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/catalog/pg_proc.dat | 6 +- src/include/postmaster/bgworker.h | 1 + src/include/replication/logicalworker.h | 1 + src/include/replication/slot.h | 29 +- src/include/replication/walreceiver.h | 18 + src/include/replication/worker_internal.h | 11 + .../t/050_standby_failover_slots_sync.pl | 185 ++- src/test/regress/expected/rules.out | 5 +- src/test/regress/expected/sysviews.out | 3 +- src/tools/pgindent/typedefs.list | 2 + 32 files changed, 1951 insertions(+), 35 deletions(-) create mode 100644 src/backend/replication/logical/slotsync.c diff --git a/doc/src/sgml/bgworker.sgml b/doc/src/sgml/bgworker.sgml index 2c393385a9..a7cfe6c58c 100644 --- a/doc/src/sgml/bgworker.sgml +++ b/doc/src/sgml/bgworker.sgml @@ -114,18 +114,59 @@ typedef struct BackgroundWorker bgw_start_time is the server state during which - postgres should start the process; it can be one of - BgWorkerStart_PostmasterStart (start as soon as - postgres itself has finished its own initialization; processes - requesting this are not eligible for database connections), - BgWorkerStart_ConsistentState (start as soon as a consistent state - has been reached in a hot standby, allowing processes to connect to - databases and run read-only queries), and - BgWorkerStart_RecoveryFinished (start as soon as the system has - entered normal read-write state). Note the last two values are equivalent - in a server that's not a hot standby. Note that this setting only indicates - when the processes are to be started; they do not stop when a different state - is reached. + postgres should start the process. Note that this setting + only indicates when the processes are to be started; they do not stop when + a different state is reached. Possible values are: + + + + BgWorkerStart_PostmasterStart + + + BgWorkerStart_PostmasterStart + Start as soon as postgres itself has finished its own initialization; + processes requesting this are not eligible for database connections. + + + + + + BgWorkerStart_ConsistentState + + + BgWorkerStart_ConsistentState + Start as soon as a consistent state has been reached in a hot standby, + allowing processes to connect to databases and run read-only queries. + + + + + + BgWorkerStart_ConsistentState_HotStandby + + + BgWorkerStart_ConsistentState_HotStandby + Same meaning as BgWorkerStart_ConsistentState, but + stricter about the server state: the worker is started only if the + server is a hot standby. + + + + + + BgWorkerStart_RecoveryFinished + + + BgWorkerStart_RecoveryFinished + Start as soon as the system has entered normal read-write state.
Note + that the BgWorkerStart_ConsistentState and + BgWorkerStart_RecoveryFinished are equivalent + in a server that's not a hot standby. + + + + + diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index f323bba018..cd9ae70c41 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4611,8 +4611,13 @@ ANY num_sync ( ) then it is also + necessary to specify dbname in the + primary_conninfo string. This will only be used for + slot synchronization. It is ignored for streaming. This parameter can only be set in the postgresql.conf @@ -4937,6 +4942,24 @@ ANY num_sync ( + enable_syncslot (boolean) + + enable_syncslot configuration parameter + + + + + Enables a physical standby to synchronize logical failover slots + from the primary server so that logical subscribers are not blocked + after failover. + + + It is disabled by default. This parameter can only be set in the + postgresql.conf file or on the server command line. + + + diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index cd152d4ced..de6cdbe2bc 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -346,6 +346,37 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU pg_log_standby_snapshot function on the primary. + + A logical replication slot on the primary can be synchronized to the hot + standby by enabling the failover option during slot creation and setting + enable_syncslot on the standby. For the synchronization + to work, it is mandatory to have a physical replication slot between the + primary and the standby, and hot_standby_feedback must + be enabled on the standby. It is also highly recommended that this + physical replication slot be listed in standby_slot_names + on the primary, to prevent the subscriber from consuming changes + faster than the hot standby. + + + + The ability to resume logical replication after failover depends upon the + pg_replication_slots.sync_state + value for the synchronized slots on the standby at the time of failover. + Only slots that have attained "ready" sync_state ('r') on the standby + before failover can be used for logical replication after failover. Slots + that have not yet reached 'r' state (they are still 'i') will be dropped; + therefore, logical replication for those slots cannot be resumed. For + example, if the synchronized slot could not become sync-ready on the + standby due to a disabled subscription, then the subscription cannot be + resumed after failover even when it is enabled. + + + If the primary is idle, then the synchronized slots on the standby may + take a noticeable time to reach the ready ('r') sync_state. This can + be sped up by calling the + pg_log_standby_snapshot function on the primary. + + Replication slots persist across crashes and know nothing about the state diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index c10880b200..5eb7d1bc8a 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2566,6 +2566,41 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx after failover. Always false for physical slots. + + + + sync_state text + + + Defines slot synchronization state. This is meaningful on the physical + standby which has configured enable_syncslot = true. + Possible values are: + + + none = for user-created slots, + + + + initiated = sync initiated for the slot but the slot + is not yet ready for periodic syncs, + + + + + ready = ready for periodic syncs.
+ + + + + + A hot standby can have slots with any of these sync_state values, but + the slots with state 'ready' and 'initiated' can neither be used for + logical decoding nor dropped by the user there. + The sync_state has no meaning on the primary server; there the + sync_state value defaults to 'none' for all slots, but may (if left over + from a promoted standby) also be 'ready'. + + diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 1b48d7171a..d67c5c1793 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -50,6 +50,7 @@ #include "postmaster/startup.h" #include "replication/slot.h" #include "replication/walreceiver.h" +#include "replication/worker_internal.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/latch.h" @@ -1441,6 +1442,23 @@ FinishWalRecovery(void) */ XLogShutdownWalRcv(); + /* + * Shut down the slot sync workers to prevent potential conflicts between + * user processes and slotsync workers after a promotion. Additionally, + * drop any slots that have initiated but not yet completed the sync + * process. + * + * We do not update the sync_state from READY to NONE here, as any failed + * update could leave some slots in the 'NONE' state, causing issues during + * slot sync after restarting the server as a standby. While updating after + * switching to the new timeline is an option, it does not simplify the + * handling for both READY and NONE state slots. Therefore, we retain the + * READY state slots after promotion as they can provide useful information + * about their origin. + */ + ShutDownSlotSync(); + slotsync_drop_initiated_slots(); + /* * We are now done reading the xlog from stream. Turn off streaming * recovery to force fetching the files (which would be required at end of diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 1e954147e5..df2d4bd01a 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1024,7 +1024,8 @@ CREATE VIEW pg_replication_slots AS L.safe_wal_size, L.two_phase, L.conflict_reason, - L.failover + L.failover, + L.sync_state FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 67f92c24db..46828b8a89 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -21,6 +21,7 @@ #include "postmaster/postmaster.h" #include "replication/logicallauncher.h" #include "replication/logicalworker.h" +#include "replication/worker_internal.h" #include "storage/dsm.h" #include "storage/ipc.h" #include "storage/latch.h" @@ -129,6 +130,9 @@ static const struct { "ApplyWorkerMain", ApplyWorkerMain }, + { + "ReplSlotSyncWorkerMain", ReplSlotSyncWorkerMain + }, { "ParallelApplyWorkerMain", ParallelApplyWorkerMain }, diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index feb471dd1d..d90d5d1576 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -116,6 +116,7 @@ #include "postmaster/walsummarizer.h" #include "replication/logicallauncher.h" #include "replication/walsender.h" +#include "replication/worker_internal.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/pg_shmem.h" @@ -1010,6 +1011,12 @@ PostmasterMain(int argc, char *argv[]) */ ApplyLauncherRegister(); + /* + * Register the slot sync worker
here to kick-start slot-sync operation + sooner on the physical standby. + */ + SlotSyncWorkerRegister(); + /* * process any libraries that should be preloaded at postmaster start */ @@ -5799,6 +5806,9 @@ bgworker_should_start_now(BgWorkerStartTime start_time) case PM_HOT_STANDBY: if (start_time == BgWorkerStart_ConsistentState) return true; + if (start_time == BgWorkerStart_ConsistentState_HotStandby && + pmState != PM_RUN) + return true; /* fall through */ case PM_RECOVERY: diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 45c94006ae..7a6ff557bc 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -34,6 +34,7 @@ #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/tuplestore.h" +#include "utils/varlena.h" PG_MODULE_MAGIC; @@ -58,6 +59,7 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port); static char *libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli); +static char *libpqrcv_get_dbname_from_conninfo(const char *conninfo); static int libpqrcv_server_version(WalReceiverConn *conn); static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, @@ -100,6 +102,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { .walrcv_send = libpqrcv_send, .walrcv_create_slot = libpqrcv_create_slot, .walrcv_alter_slot = libpqrcv_alter_slot, + .walrcv_get_dbname_from_conninfo = libpqrcv_get_dbname_from_conninfo, .walrcv_get_backend_pid = libpqrcv_get_backend_pid, .walrcv_exec = libpqrcv_exec, .walrcv_disconnect = libpqrcv_disconnect @@ -418,6 +421,44 @@ libpqrcv_server_version(WalReceiverConn *conn) return PQserverVersion(conn->streamConn); } +/* + * Get database name from the primary server's conninfo. + * + * If dbname is not found in connInfo, return NULL. + */ +static char * +libpqrcv_get_dbname_from_conninfo(const char *connInfo) +{ + PQconninfoOption *opts; + char *dbname = NULL; + char *err = NULL; + + opts = PQconninfoParse(connInfo, &err); + if (opts == NULL) + { + /* The error string is malloc'd, so we must free it explicitly */ + char *errcopy = err ? pstrdup(err) : "out of memory"; + + PQfreemem(err); + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid connection string syntax: %s", errcopy))); + } + + for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt) + { + /* + * If multiple dbnames are specified, then the last one will be + * returned. + */ + if (strcmp(opt->keyword, "dbname") == 0 && opt->val && + opt->val[0] != '\0') + dbname = pstrdup(opt->val); + } + + /* The option array is malloc'd by libpq; free it with PQconninfoFree() */ + PQconninfoFree(opts); + + return dbname; +} + /* * Start streaming WAL data from given streaming options.
* diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index 2dc25e37bb..ba03eeff1c 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -25,6 +25,7 @@ OBJS = \ proto.o \ relation.o \ reorderbuffer.o \ + slotsync.o \ snapbuild.o \ tablesync.o \ worker.o diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index ca09c683f1..ee9f2445a8 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -524,6 +524,31 @@ CreateDecodingContext(XLogRecPtr start_lsn, errmsg("replication slot \"%s\" was not created in this database", NameStr(slot->data.name)))); + if (RecoveryInProgress()) + { + /* + * Do not allow consumption of a "synchronized" slot until the standby + * gets promoted. + */ + if (slot->data.sync_state != SYNCSLOT_STATE_NONE) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot use replication slot \"%s\" for logical" + " decoding", NameStr(slot->data.name)), + errdetail("This slot is being synced from the primary server."), + errhint("Specify another replication slot.")); + } + else + { + /* + * Slots in state SYNCSLOT_STATE_INITIATED should have been dropped on + * promotion. + */ + if (slot->data.sync_state == SYNCSLOT_STATE_INITIATED) + elog(ERROR, "replication slot \"%s\" was not synced completely" + " from the primary server", NameStr(slot->data.name)); + } + /* * Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid * "cannot get changes" wording in this errmsg because that'd be diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index 1050eb2c09..3dec36a6de 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -11,6 +11,7 @@ backend_sources += files( 'proto.c', 'relation.c', 'reorderbuffer.c', + 'slotsync.c', 'snapbuild.c', 'tablesync.c', 'worker.c', diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c new file mode 100644 index 0000000000..a84ab020ee --- /dev/null +++ b/src/backend/replication/logical/slotsync.c @@ -0,0 +1,1357 @@ +/*------------------------------------------------------------------------- + * slotsync.c + * PostgreSQL worker for synchronizing slots to a standby server from the + * primary server. + * + * Copyright (c) 2023, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/slotsync.c + * + * This file contains the code for the slot sync worker on a physical standby + * to fetch logical failover slot information from the primary server, + * create the slots on the standby and synchronize them periodically. + * + * While creating the slot on the physical standby, if the local restart_lsn + * and/or local catalog_xmin is ahead of those on the remote, then the worker + * cannot create the local slot in sync with the primary server because that + * would mean moving the local slot backwards and the standby might not have + * WALs retained for the old LSN. In this case, the worker will wait for the + * primary server slot's restart_lsn and catalog_xmin to catch up with the + * local ones before attempting the actual sync. Meanwhile, it will persist + * the slot with sync_state as SYNCSLOT_STATE_INITIATED('i'). Once the primary + * server catches up, it will move the slot to the SYNCSLOT_STATE_READY('r') + * state and will perform the sync periodically.
+ * + * The worker also takes care of dropping the slots which were created by it + * but no longer need to be synchronized. + * + * It takes a nap of WORKER_DEFAULT_NAPTIME_MS before each synchronization + * cycle. If no activity is observed on the primary server for some time, the + * nap time is increased to WORKER_INACTIVITY_NAPTIME_MS, but if any activity + * is observed, the nap time reverts to the default value. + *--------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/genam.h" +#include "access/table.h" +#include "access/xlogrecovery.h" +#include "catalog/pg_database.h" +#include "commands/dbcommands.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "postmaster/interrupt.h" +#include "replication/logical.h" +#include "replication/logicallauncher.h" +#include "replication/logicalworker.h" +#include "replication/walreceiver.h" +#include "replication/worker_internal.h" +#include "storage/ipc.h" +#include "storage/procarray.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/guc_hooks.h" +#include "utils/pg_lsn.h" +#include "utils/varlena.h" + +/* + * Structure to hold information fetched from the primary server about a logical + * replication slot. + */ +typedef struct RemoteSlot +{ + char *name; + char *plugin; + char *database; + bool two_phase; + bool failover; + XLogRecPtr restart_lsn; + XLogRecPtr confirmed_lsn; + TransactionId catalog_xmin; + + /* RS_INVAL_NONE if valid, or the reason of invalidation */ + ReplicationSlotInvalidationCause invalidated; +} RemoteSlot; + +/* + * Struct for sharing information between the startup process and the slot + * sync worker. + * + * The slot sync worker's pid is needed by the startup process in order to + * shut it down during promotion. + */ +typedef struct SlotSyncWorkerCtxStruct +{ + pid_t pid; + slock_t mutex; +} SlotSyncWorkerCtxStruct; + +SlotSyncWorkerCtxStruct *SlotSyncWorker = NULL; + +/* GUC variable */ +bool enable_syncslot = false; + +/* The last sync-cycle time when the worker updated any of the slots. */ +static TimestampTz last_update_time; + +/* Worker's nap time in case of regular activity on the primary server */ +#define WORKER_DEFAULT_NAPTIME_MS 10L /* 10 ms */ + +/* Worker's nap time in case of no activity on the primary server */ +#define WORKER_INACTIVITY_NAPTIME_MS 10000L /* 10 sec */ + +/* + * Inactivity threshold in ms before increasing the nap time of the worker. + * + * If the LSN of the slot being monitored did not change for this threshold + * time, then increase the nap time of the current worker from + * WORKER_DEFAULT_NAPTIME_MS to WORKER_INACTIVITY_NAPTIME_MS. + */ +#define WORKER_INACTIVITY_THRESHOLD_MS 10000L /* 10 sec */ + +static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn); + +/* + * Wait for the remote slot to pass the locally reserved position. + * + * During slot creation, ping and wait for the primary server up to + * WAIT_PRIMARY_CATCHUP_ATTEMPTS times; if it still does not catch up, abort + * the wait. Slots for which the wait is aborted will attempt the wait and + * sync again in the next sync-cycle. + * + * If passed, *wait_attempts_exceeded will be set to true only if this + * function exits due to exhausting its wait attempts. It will be false + * in all the other cases. + * + * Returns true if remote_slot could catch up with the locally reserved + * position.
+ */ +static bool +wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot, + bool *wait_attempts_exceeded) +{ +#define WAIT_OUTPUT_COLUMN_COUNT 4 +#define WAIT_PRIMARY_CATCHUP_ATTEMPTS 5 + + StringInfoData cmd; + int wait_count = 0; + + Assert(wait_attempts_exceeded == NULL || *wait_attempts_exceeded == false); + + ereport(LOG, + errmsg("waiting for remote slot \"%s\" LSN (%X/%X) and catalog xmin" + " (%u) to pass local slot LSN (%X/%X) and catalog xmin (%u)", + remote_slot->name, + LSN_FORMAT_ARGS(remote_slot->restart_lsn), + remote_slot->catalog_xmin, + LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn), + MyReplicationSlot->data.catalog_xmin)); + + initStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT conflict_reason IS NOT NULL, restart_lsn," + " confirmed_flush_lsn, catalog_xmin" + " FROM pg_catalog.pg_replication_slots" + " WHERE slot_name = %s", + quote_literal_cstr(remote_slot->name)); + + for (;;) + { + bool new_invalidated; + XLogRecPtr new_restart_lsn; + XLogRecPtr new_confirmed_lsn; + TransactionId new_catalog_xmin; + WalRcvExecResult *res; + TupleTableSlot *tupslot; + int rc; + bool isnull; + Oid slotRow[WAIT_OUTPUT_COLUMN_COUNT] = {BOOLOID, LSNOID, LSNOID, + XIDOID}; + + /* Handle any termination request if any */ + ProcessSlotSyncInterrupts(wrconn); + + res = walrcv_exec(wrconn, cmd.data, WAIT_OUTPUT_COLUMN_COUNT, slotRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errmsg("could not fetch slot \"%s\" info from the" + " primary server: %s", + remote_slot->name, res->err)); + + tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, tupslot)) + { + ereport(WARNING, + errmsg("aborting initial sync for slot \"%s\"", + remote_slot->name), + errdetail("This slot was not found on the primary server.")); + pfree(cmd.data); + walrcv_clear_result(res); + + return false; + } + + /* + * It is possible to get null value for restart_lsn if the slot is + * invalidated on the primary server, so handle accordingly. + */ + new_invalidated = DatumGetBool(slot_getattr(tupslot, 1, &isnull)); + Assert(!isnull); + + new_restart_lsn = !slot_attisnull(tupslot, 2) ? + DatumGetLSN(slot_getattr(tupslot, 2, &isnull)) : + InvalidXLogRecPtr; + + if (new_invalidated || XLogRecPtrIsInvalid(new_restart_lsn)) + { + /* + * If the local-slot is in 'RS_EPHEMERAL' state, it will not be + * persisted in the caller and ReplicationSlotRelease() will drop + * it. But if the local slot is already persisted and has 'i' + * sync_state, then it will be marked as invalidated in the caller + * and next time onwards its sync will be skipped. + */ + ereport(WARNING, + errmsg("aborting initial sync for slot \"%s\"", + remote_slot->name), + errdetail("This slot was invalidated on the primary server.")); + pfree(cmd.data); + ExecClearTuple(tupslot); + walrcv_clear_result(res); + + return false; + } + + /* + * It is possible to get null values for confirmed_lsn and + * catalog_xmin if on the primary server the slot is just created with + * a valid restart_lsn and slot-sync worker has fetched the slot + * before the primary server could set valid confirmed_lsn and + * catalog_xmin. + */ + new_confirmed_lsn = !slot_attisnull(tupslot, 3) ? + DatumGetLSN(slot_getattr(tupslot, 3, &isnull)) : + InvalidXLogRecPtr; + + new_catalog_xmin = !slot_attisnull(tupslot, 4) ? 
+ DatumGetTransactionId(slot_getattr(tupslot, 4, &isnull)) : + InvalidTransactionId; + + ExecClearTuple(tupslot); + walrcv_clear_result(res); + + if (new_restart_lsn >= MyReplicationSlot->data.restart_lsn && + !XLogRecPtrIsInvalid(new_confirmed_lsn) && + TransactionIdFollowsOrEquals(new_catalog_xmin, + MyReplicationSlot->data.catalog_xmin)) + { + /* Update new values in remote_slot */ + remote_slot->restart_lsn = new_restart_lsn; + remote_slot->confirmed_lsn = new_confirmed_lsn; + remote_slot->catalog_xmin = new_catalog_xmin; + + ereport(LOG, + errmsg("wait over for remote slot \"%s\" as its LSN (%X/%X)" + " and catalog xmin (%u) has now passed local slot LSN" + " (%X/%X) and catalog xmin (%u)", + remote_slot->name, + LSN_FORMAT_ARGS(new_restart_lsn), + new_catalog_xmin, + LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn), + MyReplicationSlot->data.catalog_xmin)); + pfree(cmd.data); + + return true; + } + + if (++wait_count >= WAIT_PRIMARY_CATCHUP_ATTEMPTS) + { + ereport(LOG, + errmsg("aborting the wait for remote slot \"%s\"", + remote_slot->name)); + pfree(cmd.data); + + if (wait_attempts_exceeded) + *wait_attempts_exceeded = true; + + return false; + } + + /* + * XXX: Is waiting for 2 seconds before retrying enough or more or + * less? + */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 2000L, + WAIT_EVENT_REPL_SLOTSYNC_PRIMARY_CATCHUP); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + } +} + +/* + * Update local slot metadata as per remote_slot's positions + */ +static void +local_slot_update(RemoteSlot *remote_slot) +{ + Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE); + + LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn); + LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn, + remote_slot->catalog_xmin); + LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn, + remote_slot->restart_lsn); +} + +/* + * Helper function for slotsync_drop_initiated_slots() and + * drop_obsolete_slots() + * + * Drops synced slot identified by the passed in name. + */ +static void +drop_synced_slots_internal(const char *name, bool nowait) +{ + Assert(MyReplicationSlot == NULL); + + ReplicationSlotAcquire(name, nowait); + + Assert(MyReplicationSlot->data.sync_state != SYNCSLOT_STATE_NONE); + + ReplicationSlotDropAcquired(); +} + +/* + * Drop the slots for which sync is initiated but not yet completed + * i.e. they are still waiting for the primary server to catch up (refer + * to the comment atop the file for details on this wait) + */ +void +slotsync_drop_initiated_slots(void) +{ + List *local_slots = NIL; + ListCell *lc; + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (s->in_use && s->data.sync_state == SYNCSLOT_STATE_INITIATED) + local_slots = lappend(local_slots, s); + } + + LWLockRelease(ReplicationSlotControlLock); + + foreach(lc, local_slots) + { + ReplicationSlot *s = (ReplicationSlot *) lfirst(lc); + + drop_synced_slots_internal(NameStr(s->data.name), true); + + ereport(LOG, + errmsg("dropped replication slot \"%s\" of dbid %d", + NameStr(s->data.name), s->data.database), + errdetail("It was not sync-ready.")); + } + + list_free(local_slots); +} + +/* + * Get list of local logical slots which are synchronized from + * the primary server. 
+ */ +static List * +get_local_synced_slots(void) +{ + List *local_slots = NIL; + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + /* Check if it is logical synchronized slot */ + if (s->in_use && SlotIsLogical(s) && + (s->data.sync_state != SYNCSLOT_STATE_NONE)) + { + local_slots = lappend(local_slots, s); + } + } + + LWLockRelease(ReplicationSlotControlLock); + + return local_slots; +} + +/* + * Helper function to check if local_slot is present in remote_slots list. + * + * It also checks if logical slot is locally invalidated i.e. invalidated on + * the standby but valid on the primary server. If found so, it sets + * locally_invalidated to true. + */ +static bool +check_sync_slot_on_remote(ReplicationSlot *local_slot, List *remote_slots, + bool *locally_invalidated) +{ + ListCell *lc; + + foreach(lc, remote_slots) + { + RemoteSlot *remote_slot = (RemoteSlot *) lfirst(lc); + + if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0) + { + /* + * If remote slot is not invalidated but local slot is marked as + * invalidated, then set the bool. + */ + SpinLockAcquire(&local_slot->mutex); + *locally_invalidated = + (remote_slot->invalidated == RS_INVAL_NONE) && + (local_slot->data.invalidated != RS_INVAL_NONE); + SpinLockRelease(&local_slot->mutex); + + return true; + } + } + + return false; +} + +/* + * Drop obsolete slots + * + * Drop the slots that no longer need to be synced i.e. these either do not + * exist on the primary or are no longer enabled for failover. + * + * Additionally, it drops slots that are valid on the primary but got + * invalidated on the standby. This situation may occur due to the following + * reasons: + * - The max_slot_wal_keep_size on the standby is insufficient to retain WAL + * records from the restart_lsn of the slot. + * - primary_slot_name is temporarily reset to null and the physical slot is + * removed. + * - The primary changes wal_level to a level lower than logical. + * + * The assumption is that these dropped slots will get recreated in next + * sync-cycle and it is okay to drop and recreate such slots as long as these + * are not consumable on the standby (which is the case currently). + */ +static void +drop_obsolete_slots(List *remote_slot_list) +{ + List *local_slots = NIL; + ListCell *lc; + + local_slots = get_local_synced_slots(); + + foreach(lc, local_slots) + { + ReplicationSlot *local_slot = (ReplicationSlot *) lfirst(lc); + bool remote_exists = false; + bool locally_invalidated = false; + + remote_exists = check_sync_slot_on_remote(local_slot, remote_slot_list, + &locally_invalidated); + + /* + * Drop the local slot either if it is not in the remote slots list or + * is invalidated while remote slot is still valid. + */ + if (!remote_exists || locally_invalidated) + { + drop_synced_slots_internal(NameStr(local_slot->data.name), true); + + ereport(LOG, + errmsg("dropped replication slot \"%s\" of dbid %d", + NameStr(local_slot->data.name), + local_slot->data.database)); + } + } +} + +/* + * Synchronize single slot to given position. + * + * This creates a new slot if there is no existing one and updates the + * metadata of the slot as per the data received from the primary server. + * + * The 'sync_state' in slot.data is set to SYNCSLOT_STATE_INITIATED + * immediately after creation. It stays in same state until the + * initialization is complete. 
The initialization is considered to + * be completed once the remote_slot catches up with locally reserved + * position and local slot is updated. The sync_state is then changed + * to SYNCSLOT_STATE_READY. + * + * Returns TRUE if the local slot is updated. + */ +static bool +synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot *remote_slot) +{ + ReplicationSlot *slot; + bool slot_updated = false; + + /* + * Sanity check: Make sure that concerned WAL is received before syncing + * slot to target lsn received from the primary server. + * + * This check should never pass as on the primary server, we have waited + * for the standby's confirmation before updating the logical slot. + */ + SpinLockAcquire(&WalRcv->mutex); + if (remote_slot->confirmed_lsn > WalRcv->latestWalEnd) + { + SpinLockRelease(&WalRcv->mutex); + elog(ERROR, "exiting from slot synchronization as the received slot sync" + " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X", + LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), + remote_slot->name, + LSN_FORMAT_ARGS(WalRcv->latestWalEnd)); + } + SpinLockRelease(&WalRcv->mutex); + + /* Search for the named slot */ + if ((slot = SearchNamedReplicationSlot(remote_slot->name, true))) + { + char sync_state; + + SpinLockAcquire(&slot->mutex); + sync_state = slot->data.sync_state; + SpinLockRelease(&slot->mutex); + + /* User created slot with the same name exists, raise ERROR. */ + if (sync_state == SYNCSLOT_STATE_NONE) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("exiting from slot synchronization on receiving" + " the failover slot \"%s\" from the primary server", + remote_slot->name), + errdetail("A user-created slot with the same name already" + " exists on the standby.")); + + /* + * Slot created by the slot sync worker exists, sync it. + * + * It is important to acquire the slot here before checking + * invalidation. If we don't acquire the slot first, there could be a + * race condition that the local slot could be invalidated just after + * checking the 'invalidated' flag here and we could end up + * overwriting 'invalidated' flag to remote_slot's value. See + * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly + * if the slot is not acquired by other processes. + */ + ReplicationSlotAcquire(remote_slot->name, true); + + Assert(slot == MyReplicationSlot); + + /* + * Copy the invalidation cause from remote only if local slot is not + * invalidated locally, we don't want to overwrite existing one. + */ + if (slot->data.invalidated == RS_INVAL_NONE) + { + SpinLockAcquire(&slot->mutex); + slot->data.invalidated = remote_slot->invalidated; + SpinLockRelease(&slot->mutex); + + /* Make sure the invalidated state persists across server restart */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + slot_updated = true; + } + + /* Skip the sync of an invalidated slot */ + if (slot->data.invalidated != RS_INVAL_NONE) + { + ReplicationSlotRelease(); + return slot_updated; + } + + /* Slot not ready yet, let's attempt to make it sync-ready now. */ + if (sync_state == SYNCSLOT_STATE_INITIATED) + { + /* + * Wait for the primary server to catch-up. Refer to the comment + * atop the file for details on this wait. 
+ */ + if (remote_slot->restart_lsn < slot->data.restart_lsn || + TransactionIdPrecedes(remote_slot->catalog_xmin, + slot->data.catalog_xmin)) + { + if (!wait_for_primary_slot_catchup(wrconn, remote_slot, NULL)) + { + ReplicationSlotRelease(); + return false; + } + } + + /* + * Wait for primary is over, update the lsns and mark the slot as + * READY for further syncs. + */ + local_slot_update(remote_slot); + SpinLockAcquire(&slot->mutex); + slot->data.sync_state = SYNCSLOT_STATE_READY; + SpinLockRelease(&slot->mutex); + + /* Make sure the slot changes persist across server restart */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + slot_updated = true; + + ereport(LOG, + errmsg("newly locally created slot \"%s\" is sync-ready now", + remote_slot->name)); + } + /* Slot ready for sync, so sync it. */ + else if (sync_state == SYNCSLOT_STATE_READY) + { + /* + * Sanity check: With hot_standby_feedback enabled and + * invalidations handled appropriately as above, this should never + * happen. + */ + if (remote_slot->restart_lsn < slot->data.restart_lsn) + elog(ERROR, + "cannot synchronize local slot \"%s\" LSN(%X/%X)" + " to remote slot's LSN(%X/%X) as synchronization" + " would move it backwards", remote_slot->name, + LSN_FORMAT_ARGS(slot->data.restart_lsn), + LSN_FORMAT_ARGS(remote_slot->restart_lsn)); + + if (remote_slot->confirmed_lsn != slot->data.confirmed_flush || + remote_slot->restart_lsn != slot->data.restart_lsn || + remote_slot->catalog_xmin != slot->data.catalog_xmin) + { + /* Update LSN of slot to remote slot's current position */ + local_slot_update(remote_slot); + + /* Make sure the slot changes persist across server restart */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + slot_updated = true; + } + } + } + /* Otherwise create the slot first. */ + else + { + TransactionId xmin_horizon = InvalidTransactionId; + + /* Skip creating the local slot if remote_slot is invalidated already */ + if (remote_slot->invalidated != RS_INVAL_NONE) + return false; + + /* Ensure that we have transaction env needed by get_database_oid() */ + Assert(IsTransactionState()); + + ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL, + remote_slot->two_phase, + remote_slot->failover, + SYNCSLOT_STATE_INITIATED); + + /* For shorter lines. */ + slot = MyReplicationSlot; + + SpinLockAcquire(&slot->mutex); + slot->data.database = get_database_oid(remote_slot->database, false); + namestrcpy(&slot->data.plugin, remote_slot->plugin); + SpinLockRelease(&slot->mutex); + + ReplicationSlotReserveWal(); + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + xmin_horizon = GetOldestSafeDecodingTransactionId(true); + SpinLockAcquire(&slot->mutex); + slot->effective_catalog_xmin = xmin_horizon; + slot->data.catalog_xmin = xmin_horizon; + SpinLockRelease(&slot->mutex); + ReplicationSlotsComputeRequiredXmin(true); + LWLockRelease(ProcArrayLock); + + /* + * Wait for the primary server to catch-up. Refer to the comment atop + * the file for details on this wait. + * + * We also need to wait until remote_slot's confirmed_lsn becomes + * valid. It is possible to get null values for confirmed_lsn and + * catalog_xmin if on the primary server the slot is just created with + * a valid restart_lsn and slot-sync worker has fetched the slot + * before the primary server could set valid confirmed_lsn and + * catalog_xmin. 
+ */ + if (remote_slot->restart_lsn < slot->data.restart_lsn || + XLogRecPtrIsInvalid(remote_slot->confirmed_lsn) || + TransactionIdPrecedes(remote_slot->catalog_xmin, + slot->data.catalog_xmin)) + { + bool wait_attempts_exceeded = false; + + if (!wait_for_primary_slot_catchup(wrconn, remote_slot, &wait_attempts_exceeded)) + { + /* + * The remote slot didn't catch up to the locally reserved + * position. + * + * We do not drop the slot because the restart_lsn can be + * ahead of the current location when recreating the slot in + * the next cycle. It may take more time to create such a + * slot. Therefore, we persist it (provided the remote slot is + * still valid, i.e., wait_attempts_exceeded is true) and + * attempt the wait and synchronization in the next cycle. + */ + if (wait_attempts_exceeded) + { + ReplicationSlotPersist(); + slot_updated = true; + } + + ReplicationSlotRelease(); + return slot_updated; + } + } + + /* + * The wait for the primary is either not needed or is over. Update the + * LSNs and mark the slot as READY for further syncs. + */ + local_slot_update(remote_slot); + SpinLockAcquire(&slot->mutex); + slot->data.sync_state = SYNCSLOT_STATE_READY; + SpinLockRelease(&slot->mutex); + + /* Mark the slot as PERSISTENT and save the changes to disk */ + ReplicationSlotPersist(); + slot_updated = true; + + ereport(LOG, + errmsg("newly locally created slot \"%s\" is sync-ready now", + remote_slot->name)); + } + + ReplicationSlotRelease(); + + return slot_updated; +} + +/* + * Maps the pg_replication_slots.conflict_reason text value to the + * ReplicationSlotInvalidationCause enum value. + */ +static ReplicationSlotInvalidationCause +get_slot_invalidation_cause(char *conflict_reason) +{ + Assert(conflict_reason); + + if (strcmp(conflict_reason, SLOT_INVAL_WAL_REMOVED_TEXT) == 0) + return RS_INVAL_WAL_REMOVED; + + if (strcmp(conflict_reason, SLOT_INVAL_HORIZON_TEXT) == 0) + return RS_INVAL_HORIZON; + + if (strcmp(conflict_reason, SLOT_INVAL_WAL_LEVEL_TEXT) == 0) + return RS_INVAL_WAL_LEVEL; + + /* Cannot get here */ + Assert(0); + + return RS_INVAL_NONE; /* unreachable; keep the compiler quiet */ +} + +/* + * Synchronize slots. + * + * Gets the failover logical slots info from the primary server and updates + * the slots locally. Creates the slots if not present on the standby. + * + * Returns TRUE if any of the slots gets updated in this sync-cycle. + */ +static bool +synchronize_slots(WalReceiverConn *wrconn) +{ +#define SLOTSYNC_COLUMN_COUNT 9 + Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID, + LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID}; + + WalRcvExecResult *res; + TupleTableSlot *tupslot; + StringInfoData s; + List *remote_slot_list = NIL; + ListCell *lc; + bool some_slot_updated = false; + + /* WalRcv shared memory not set yet */ + if (!WalRcv) + return false; + + /* + * The primary_slot_name is not set yet, or WALs have not been received + * yet. Synchronization is not possible if the walreceiver is not started. + */ + SpinLockAcquire(&WalRcv->mutex); + if ((WalRcv->slotname[0] == '\0') || + XLogRecPtrIsInvalid(WalRcv->latestWalEnd)) + { + SpinLockRelease(&WalRcv->mutex); + return false; + } + SpinLockRelease(&WalRcv->mutex); + + /* The syscache access in walrcv_exec() needs a transaction env. */ + StartTransactionCommand(); + + initStringInfo(&s); + + /* Construct query to fetch slots with failover enabled.
*/ + appendStringInfo(&s, + "SELECT slot_name, plugin, confirmed_flush_lsn," + " restart_lsn, catalog_xmin, two_phase, failover," + " database, conflict_reason" + " FROM pg_catalog.pg_replication_slots" + " WHERE failover"); + + /* Execute the query */ + res = walrcv_exec(wrconn, s.data, SLOTSYNC_COLUMN_COUNT, slotRow); + pfree(s.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errmsg("could not fetch failover logical slots info" + " from the primary server: %s", res->err)); + + + /* Construct the remote_slot tuple and synchronize each slot locally */ + tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot)) + { + bool isnull; + RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot)); + + remote_slot->name = TextDatumGetCString(slot_getattr(tupslot, 1, &isnull)); + Assert(!isnull); + + remote_slot->plugin = TextDatumGetCString(slot_getattr(tupslot, 2, &isnull)); + Assert(!isnull); + + /* + * It is possible to get null values for LSN and Xmin if slot is + * invalidated on the primary server, so handle accordingly. + */ + remote_slot->confirmed_lsn = !slot_attisnull(tupslot, 3) ? + DatumGetLSN(slot_getattr(tupslot, 3, &isnull)) : + InvalidXLogRecPtr; + + remote_slot->restart_lsn = !slot_attisnull(tupslot, 4) ? + DatumGetLSN(slot_getattr(tupslot, 4, &isnull)) : + InvalidXLogRecPtr; + + remote_slot->catalog_xmin = !slot_attisnull(tupslot, 5) ? + DatumGetTransactionId(slot_getattr(tupslot, 5, &isnull)) : + InvalidTransactionId; + + remote_slot->two_phase = DatumGetBool(slot_getattr(tupslot, 6, &isnull)); + Assert(!isnull); + + remote_slot->failover = DatumGetBool(slot_getattr(tupslot, 7, &isnull)); + Assert(!isnull); + + remote_slot->database = TextDatumGetCString(slot_getattr(tupslot, + 8, &isnull)); + Assert(!isnull); + + remote_slot->invalidated = !slot_attisnull(tupslot, 9) ? + get_slot_invalidation_cause(TextDatumGetCString(slot_getattr(tupslot, 9, &isnull))) : + RS_INVAL_NONE; + + /* Create list of remote slots */ + remote_slot_list = lappend(remote_slot_list, remote_slot); + + ExecClearTuple(tupslot); + } + + /* Drop local slots that no longer need to be synced. */ + drop_obsolete_slots(remote_slot_list); + + /* Now sync the slots locally */ + foreach(lc, remote_slot_list) + { + RemoteSlot *remote_slot = (RemoteSlot *) lfirst(lc); + + some_slot_updated |= synchronize_one_slot(wrconn, remote_slot); + } + + /* We are done, free remote_slot_list elements */ + list_free_deep(remote_slot_list); + + walrcv_clear_result(res); + + CommitTransactionCommand(); + + return some_slot_updated; +} + +/* + * Checks the primary server info. + * + * Using the specified primary server connection, check whether we are a + * cascading standby. It also validates primary_slot_name for non-cascading + * standbys. + */ +static void +check_primary_info(WalReceiverConn *wrconn, bool *am_cascading_standby) +{ +#define PRIMARY_INFO_OUTPUT_COL_COUNT 2 + WalRcvExecResult *res; + Oid slotRow[PRIMARY_INFO_OUTPUT_COL_COUNT] = {BOOLOID, BOOLOID}; + StringInfoData cmd; + bool isnull; + TupleTableSlot *tupslot; + bool valid; + bool remote_in_recovery; + bool tuple_ok PG_USED_FOR_ASSERTS_ONLY; + + /* The syscache access in walrcv_exec() needs a transaction env. 
*/ + StartTransactionCommand(); + + Assert(am_cascading_standby != NULL); + + *am_cascading_standby = false; /* overwritten later if cascading */ + + initStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT pg_is_in_recovery(), count(*) = 1" + " FROM pg_replication_slots" + " WHERE slot_type='physical' AND slot_name=%s", + quote_literal_cstr(PrimarySlotName)); + + res = walrcv_exec(wrconn, cmd.data, PRIMARY_INFO_OUTPUT_COL_COUNT, slotRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errmsg("could not fetch primary_slot_name \"%s\" info from the" + " primary server: %s", PrimarySlotName, res->err)); + + tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + tuple_ok = tuplestore_gettupleslot(res->tuplestore, true, false, tupslot); + Assert(tuple_ok); /* It must return one tuple */ + + remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull)); + Assert(!isnull); + + if (remote_in_recovery) + { + /* No need to check further, return that we are a cascading standby */ + *am_cascading_standby = true; + } + else + { + /* We are a normal standby. */ + valid = DatumGetBool(slot_getattr(tupslot, 2, &isnull)); + Assert(!isnull); + + if (!valid) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("exiting from slot synchronization due to bad configuration"), + /* translator: second %s is a GUC variable name */ + errdetail("The primary server slot \"%s\" specified by %s is not valid.", + PrimarySlotName, "primary_slot_name")); + } + + ExecClearTuple(tupslot); + walrcv_clear_result(res); + CommitTransactionCommand(); +} + +/* + * Check that all necessary GUCs for slot synchronization are set + * appropriately. If not, raise an ERROR. + */ +static void +validate_slotsync_parameters(char **dbname) +{ + /* Sanity check. */ + Assert(enable_syncslot); + + /* + * A physical replication slot (primary_slot_name) is required on the + * primary to ensure that the rows needed by the standby are not removed + * after restarting, so that the synchronized slot on the standby will not + * be invalidated. + */ + if (PrimarySlotName == NULL || strcmp(PrimarySlotName, "") == 0) + ereport(ERROR, + /* translator: %s is a GUC variable name */ + errmsg("exiting from slot synchronization due to bad configuration"), + errhint("%s must be defined.", "primary_slot_name")); + + /* + * hot_standby_feedback must be enabled to cooperate with the physical + * replication slot, which allows informing the primary about the xmin and + * catalog_xmin values on the standby. + */ + if (!hot_standby_feedback) + ereport(ERROR, + /* translator: %s is a GUC variable name */ + errmsg("exiting from slot synchronization due to bad configuration"), + errhint("%s must be enabled.", "hot_standby_feedback")); + + /* + * Logical decoding requires wal_level >= logical and we currently only + * synchronize logical slots. + */ + if (wal_level < WAL_LEVEL_LOGICAL) + ereport(ERROR, + errmsg("exiting from slot synchronization due to bad configuration"), + errhint("wal_level must be >= logical.")); + + /* + * The primary_conninfo is required to make a connection to the primary + * for getting slot information.
+ */ + if (PrimaryConnInfo == NULL || strcmp(PrimaryConnInfo, "") == 0) + ereport(ERROR, + /* translator: %s is a GUC variable name */ + errmsg("exiting from slot synchronization due to bad configuration"), + errhint("%s must be defined.", "primary_conninfo")); + + /* + * The slot sync worker needs a database connection for walrcv_exec to + * work. + */ + *dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo); + if (*dbname == NULL) + ereport(ERROR, + + /* + * translator: 'dbname' is a specific option; %s is a GUC variable + * name + */ + errmsg("exiting from slot synchronization due to bad configuration"), + errhint("'dbname' must be specified in %s.", "primary_conninfo")); +} + +/* + * Re-read the config file. + * + * If any of the slot sync GUCs have changed, exit the worker and + * let it get restarted by the postmaster. + */ +static void +slotsync_reread_config(WalReceiverConn *wrconn) +{ + char *old_primary_conninfo = pstrdup(PrimaryConnInfo); + char *old_primary_slotname = pstrdup(PrimarySlotName); + bool old_hot_standby_feedback = hot_standby_feedback; + bool conninfo_changed; + bool primary_slotname_changed; + + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + + conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0; + primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0; + + if (conninfo_changed || + primary_slotname_changed || + (old_hot_standby_feedback != hot_standby_feedback)) + { + ereport(LOG, + errmsg("slot sync worker will restart because of" + " a parameter change")); + /* The exit code 1 will make postmaster restart this worker */ + proc_exit(1); + } + + pfree(old_primary_conninfo); + pfree(old_primary_slotname); +} + +/* + * Interrupt handler for the main loop of the slot sync worker. + */ +static void +ProcessSlotSyncInterrupts(WalReceiverConn *wrconn) +{ + CHECK_FOR_INTERRUPTS(); + + if (ShutdownRequestPending) + { + walrcv_disconnect(wrconn); + ereport(LOG, + errmsg("replication slot sync worker is shutting down" + " on receiving SIGINT")); + proc_exit(0); + } + + if (ConfigReloadPending) + slotsync_reread_config(wrconn); +} + +/* + * Cleanup function for the slot sync worker. + * + * Called on slot sync worker exit. + */ +static void +slotsync_worker_onexit(int code, Datum arg) +{ + SpinLockAcquire(&SlotSyncWorker->mutex); + SlotSyncWorker->pid = InvalidPid; + SpinLockRelease(&SlotSyncWorker->mutex); +} + +/* + * The main loop of our worker process. + * + * It connects to the primary server, fetches failover logical slot + * information periodically in order to create and sync the slots. + */ +void +ReplSlotSyncWorkerMain(Datum main_arg) +{ + WalReceiverConn *wrconn = NULL; + char *dbname; + bool am_cascading_standby; + char *err; + + ereport(LOG, errmsg("replication slot sync worker started")); + + on_shmem_exit(slotsync_worker_onexit, (Datum) 0); + + SpinLockAcquire(&SlotSyncWorker->mutex); + + Assert(SlotSyncWorker->pid == InvalidPid); + + /* Advertise our PID so that the startup process can kill us on promotion */ + SlotSyncWorker->pid = MyProcPid; + + SpinLockRelease(&SlotSyncWorker->mutex); + + /* Setup signal handling */ + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGINT, SignalHandlerForShutdownRequest); + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + + validate_slotsync_parameters(&dbname); + + /* + * Connect to the database specified by the user in primary_conninfo.
We need + * a database connection for walrcv_exec to work. Please see comments atop + * libpqrcv_exec. + */ + BackgroundWorkerInitializeConnection(dbname, NULL, 0); + + /* + * Establish the connection to the primary server for slots + * synchronization. + */ + wrconn = walrcv_connect(PrimaryConnInfo, true, false, + cluster_name[0] ? cluster_name : "slotsyncworker", + &err); + if (wrconn == NULL) + ereport(ERROR, + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the primary server: %s", err)); + + /* + * Using the specified primary server connection, check whether we are + * cascading standby and validates primary_slot_name for + * non-cascading-standbys. + */ + check_primary_info(wrconn, &am_cascading_standby); + + /* Main wait loop. */ + for (;;) + { + int rc; + long naptime = WORKER_DEFAULT_NAPTIME_MS; + TimestampTz now; + bool some_slot_updated; + + ProcessSlotSyncInterrupts(wrconn); + + if (am_cascading_standby) + { + /* + * Slot synchronization is currently not supported on cascading + * standby. So if we are on the cascading standby, skip the sync + * and take a longer nap before we check again whether we are + * still cascading standby or not. + */ + naptime = 6 * WORKER_INACTIVITY_NAPTIME_MS; /* 60 sec */ + } + else + { + some_slot_updated = synchronize_slots(wrconn); + + /* + * If any of the slots get updated in this sync-cycle, use default + * naptime and update 'last_update_time'. But if no activity is + * observed in this sync-cycle, then increase naptime provided + * inactivity time reaches threshold. + */ + now = GetCurrentTimestamp(); + if (some_slot_updated) + last_update_time = now; + else if (TimestampDifferenceExceeds(last_update_time, + now, WORKER_INACTIVITY_THRESHOLD_MS)) + naptime = WORKER_INACTIVITY_NAPTIME_MS; + } + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + naptime, + WAIT_EVENT_REPL_SLOTSYNC_MAIN); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + + /* + * If the standby was promoted then what was previously a cascading + * standby might no longer be one, so recheck each time. + */ + if (am_cascading_standby) + check_primary_info(wrconn, &am_cascading_standby); + } + + /* + * The slot sync worker can not get here because it will only stop when it + * receives a SIGINT from the logical replication launcher, or when there + * is an error. + */ + Assert(false); +} + +/* + * Is current process the slot sync worker? + */ +bool +IsLogicalSlotSyncWorker(void) +{ + return SlotSyncWorker->pid == MyProcPid; +} + +/* + * Shut down the slot sync worker. + */ +void +ShutDownSlotSync(void) +{ + SpinLockAcquire(&SlotSyncWorker->mutex); + if (SlotSyncWorker->pid == InvalidPid) + { + SpinLockRelease(&SlotSyncWorker->mutex); + return; + } + + kill(SlotSyncWorker->pid, SIGINT); + + SpinLockRelease(&SlotSyncWorker->mutex); + + /* Wait for it to die. */ + for (;;) + { + int rc; + + /* Wait a bit, we don't expect to have to wait long. */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 10L, WAIT_EVENT_BGWORKER_SHUTDOWN); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + SpinLockAcquire(&SlotSyncWorker->mutex); + + /* Is it gone? 
*/ + if (SlotSyncWorker->pid == InvalidPid) + break; + + SpinLockRelease(&SlotSyncWorker->mutex); + } + + SpinLockRelease(&SlotSyncWorker->mutex); +} + +/* + * Allocate and initialize slot sync worker shared memory + */ +void +SlotSyncWorkerShmemInit(void) +{ + Size size; + bool found; + + size = sizeof(SlotSyncWorkerCtxStruct); + size = MAXALIGN(size); + + SlotSyncWorker = (SlotSyncWorkerCtxStruct *) + ShmemInitStruct("Slot Sync Worker Data", size, &found); + + if (!found) + { + memset(SlotSyncWorker, 0, size); + SlotSyncWorker->pid = InvalidPid; + SpinLockInit(&SlotSyncWorker->mutex); + } +} + +/* + * Register the background worker for slots synchronization provided + * enable_syncslot is ON. + */ +void +SlotSyncWorkerRegister(void) +{ + BackgroundWorker bgw; + + if (!enable_syncslot) + { + ereport(LOG, + errmsg("skipping slot synchronization"), + errdetail("enable_syncslot is disabled.")); + return; + } + + memset(&bgw, 0, sizeof(bgw)); + + /* We need database connection which needs shared-memory access as well. */ + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + + /* Start as soon as a consistent state has been reached in a hot standby */ + bgw.bgw_start_time = BgWorkerStart_ConsistentState_HotStandby; + + snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ReplSlotSyncWorkerMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "replication slot sync worker"); + snprintf(bgw.bgw_type, BGW_MAXLEN, + "slot sync worker"); + + bgw.bgw_restart_time = BGW_DEFAULT_RESTART_INTERVAL; + bgw.bgw_notify_pid = 0; + bgw.bgw_main_arg = (Datum) 0; + + RegisterBackgroundWorker(&bgw); +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 3fe1f9953b..287412e1a6 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -141,7 +141,20 @@ * subscribe to the new primary without losing any data. * * However, we do not enable failover for slots created by the table sync - * worker. + * worker. This is because the table sync slot might not be fully synced on the + * standby due to the following reasons: + * + * - The standby needs to wait for the primary server to catch up because the + * local restart_lsn of the newly created slot on the standby is set using + * the latest redo position (GetXLogReplayRecPtr()), which is typically ahead + * of the primary's restart_lsn. + * - The table sync slot's restart_lsn won't be advanced until the state + * becomes SUBREL_STATE_CATCHUP. + * + * Therefore, if a failover happens before the restart_lsn advances, the table + * sync slot will not be synced to the standby. Consequently, we will not be + * able to subscribe to the promoted standby due to the absence of the + * necessary table sync slot. * * Additionally, failover is not enabled for the main slot if the table sync is * in progress. 
This is because if a failover occurs while the table sync
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 696376400e..9d27593979 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -47,6 +47,7 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "replication/slot.h"
+#include "replication/walsender.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/proc.h"
@@ -103,7 +104,6 @@ int max_replication_slots = 10; /* the maximum number of replication
 * slots */
 
 static void ReplicationSlotShmemExit(int code, Datum arg);
-static void ReplicationSlotDropAcquired(void);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 
 /* internal persistency functions */
@@ -250,16 +250,22 @@ ReplicationSlotValidateName(const char *name, int elevel)
 * user will only get commit prepared.
 * failover: If enabled, allows the slot to be synced to physical standbys so
 * that logical replication can be resumed after failover.
+ * sync_state: Defines the slot synchronization state. This function is
+ * expected to receive either SYNCSLOT_STATE_NONE for user-created slots or
+ * SYNCSLOT_STATE_INITIATED for the slots being synchronized on the physical
+ * standby.
 */
 void
 ReplicationSlotCreate(const char *name, bool db_specific,
 ReplicationSlotPersistency persistency,
- bool two_phase, bool failover)
+ bool two_phase, bool failover, char sync_state)
 {
 ReplicationSlot *slot = NULL;
 int i;
 
 Assert(MyReplicationSlot == NULL);
+ Assert(sync_state == SYNCSLOT_STATE_NONE ||
+ sync_state == SYNCSLOT_STATE_INITIATED);
 
 ReplicationSlotValidateName(name, ERROR);
 
@@ -315,6 +321,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
 slot->data.two_phase = two_phase;
 slot->data.two_phase_at = InvalidXLogRecPtr;
 slot->data.failover = failover;
+ slot->data.sync_state = sync_state;
 
 /* and then data only present in shared memory */
 slot->just_dirtied = false;
@@ -680,6 +687,17 @@ ReplicationSlotDrop(const char *name, bool nowait)
 
 ReplicationSlotAcquire(name, nowait);
 
+ /*
+ * Do not allow users to drop slots that are currently being synced
+ * from the primary to the standby.
+ */
+ if (RecoveryInProgress() &&
+ MyReplicationSlot->data.sync_state != SYNCSLOT_STATE_NONE)
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot drop replication slot \"%s\"", name),
+ errdetail("This slot is being synced from the primary server."));
+
 ReplicationSlotDropAcquired();
 }
 
@@ -699,6 +717,17 @@ ReplicationSlotAlter(const char *name, bool failover)
 errmsg("cannot use %s with a physical replication slot",
 "ALTER_REPLICATION_SLOT"));
 
+ /*
+ * Do not allow users to alter slots that are currently being synced
+ * from the primary to the standby.
+ */
+ if (RecoveryInProgress() &&
+ MyReplicationSlot->data.sync_state != SYNCSLOT_STATE_NONE)
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot alter replication slot \"%s\"", name),
+ errdetail("This slot is being synced from the primary server."));
+
 SpinLockAcquire(&MyReplicationSlot->mutex);
 MyReplicationSlot->data.failover = failover;
 SpinLockRelease(&MyReplicationSlot->mutex);
@@ -711,7 +740,7 @@ ReplicationSlotAlter(const char *name, bool failover)
 
 /*
 * Permanently drop the currently acquired replication slot.
*/ -static void +void ReplicationSlotDropAcquired(void) { ReplicationSlot *slot = MyReplicationSlot; diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index eb685089b3..b59d6e62fd 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -43,7 +43,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve, /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(name, false, temporary ? RS_TEMPORARY : RS_PERSISTENT, false, - false); + false, SYNCSLOT_STATE_NONE); if (immediately_reserve) { @@ -136,7 +136,7 @@ create_logical_replication_slot(char *name, char *plugin, */ ReplicationSlotCreate(name, true, temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase, - failover); + failover, SYNCSLOT_STATE_NONE); /* * Create logical decoding context to find start point or, if we don't @@ -237,7 +237,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 16 +#define PG_GET_REPLICATION_SLOTS_COLS 17 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -418,21 +418,36 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) break; case RS_INVAL_WAL_REMOVED: - values[i++] = CStringGetTextDatum("wal_removed"); + values[i++] = CStringGetTextDatum(SLOT_INVAL_WAL_REMOVED_TEXT); break; case RS_INVAL_HORIZON: - values[i++] = CStringGetTextDatum("rows_removed"); + values[i++] = CStringGetTextDatum(SLOT_INVAL_HORIZON_TEXT); break; case RS_INVAL_WAL_LEVEL: - values[i++] = CStringGetTextDatum("wal_level_insufficient"); + values[i++] = CStringGetTextDatum(SLOT_INVAL_WAL_LEVEL_TEXT); break; } } values[i++] = BoolGetDatum(slot_contents.data.failover); + switch (slot_contents.data.sync_state) + { + case SYNCSLOT_STATE_NONE: + values[i++] = CStringGetTextDatum("none"); + break; + + case SYNCSLOT_STATE_INITIATED: + values[i++] = CStringGetTextDatum("initiated"); + break; + + case SYNCSLOT_STATE_READY: + values[i++] = CStringGetTextDatum("ready"); + break; + } + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 58165ea614..9d8710f904 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1224,7 +1224,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) { ReplicationSlotCreate(cmd->slotname, false, cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, - false, false); + false, false, SYNCSLOT_STATE_NONE); if (reserve_wal) { @@ -1255,7 +1255,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? 
RS_TEMPORARY : RS_EPHEMERAL,
- two_phase, failover);
+ two_phase, failover, SYNCSLOT_STATE_NONE);
 
 /*
 * Do options check early so that we can bail before calling the
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index e5119ed55d..04fed1007e 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -38,6 +38,7 @@
 #include "replication/slot.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
+#include "replication/worker_internal.h"
 #include "storage/bufmgr.h"
 #include "storage/dsm.h"
 #include "storage/ipc.h"
@@ -342,6 +343,7 @@ CreateOrAttachShmemStructs(void)
 WalSummarizerShmemInit();
 PgArchShmemInit();
 ApplyLauncherShmemInit();
+ SlotSyncWorkerShmemInit();
 
 /*
 * Set up other modules that need some shared memory space
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 1eaaf3c6c5..19b08c1b5f 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -3286,6 +3286,17 @@ ProcessInterrupts(void)
 */
 proc_exit(1);
 }
+ else if (IsLogicalSlotSyncWorker())
+ {
+ elog(DEBUG1,
+ "replication slot sync worker is shutting down due to administrator command");
+
+ /*
+ * The slot sync worker can be stopped at any time. Use exit status 1
+ * so that the background worker is restarted.
+ */
+ proc_exit(1);
+ }
 else if (IsBackgroundWorker)
 ereport(FATAL,
 (errcode(ERRCODE_ADMIN_SHUTDOWN),
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 088eb977d4..aee5fe7315 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -53,6 +53,8 @@ LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process."
 LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process."
 LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process."
 RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery."
+REPL_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker."
+REPL_SLOTSYNC_PRIMARY_CATCHUP "Waiting for the primary to catch up, in slot sync worker."
 SYSLOGGER_MAIN "Waiting in main loop of syslogger process."
 WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process."
 WAL_SENDER_MAIN "Waiting in main loop of WAL sender process."
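Note for reviewers: the nap-time tuning in ReplSlotSyncWorkerMain() above can be summarized by the standalone sketch below. The 10ms default and 10s inactivity nap times follow the commit message; the value of WORKER_INACTIVITY_THRESHOLD_MS is not visible in this excerpt, so the threshold used here, like the simplified millisecond clock standing in for TimestampTz, is only an assumption.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define WORKER_DEFAULT_NAPTIME_MS 10	/* 10 ms, per commit message */
#define WORKER_INACTIVITY_NAPTIME_MS 10000	/* 10 sec, per commit message */
#define WORKER_INACTIVITY_THRESHOLD_MS 10000	/* assumed; not shown in excerpt */

/* Millisecond timestamps stand in for TimestampTz in this sketch. */
static int64_t last_update_time;

static long
compute_naptime(bool am_cascading_standby, bool some_slot_updated, int64_t now)
{
	/* A cascading standby only rechecks its status once a minute. */
	if (am_cascading_standby)
		return 6 * WORKER_INACTIVITY_NAPTIME_MS;	/* 60 sec */

	/* Any synced slot resets the inactivity clock and keeps naps short. */
	if (some_slot_updated)
	{
		last_update_time = now;
		return WORKER_DEFAULT_NAPTIME_MS;
	}

	/* Back off only once the primary has been quiet past the threshold. */
	if (now - last_update_time > WORKER_INACTIVITY_THRESHOLD_MS)
		return WORKER_INACTIVITY_NAPTIME_MS;

	return WORKER_DEFAULT_NAPTIME_MS;
}

int
main(void)
{
	/* Activity at t=0 keeps the default nap; prolonged silence widens it. */
	printf("%ld\n", compute_naptime(false, true, 0));	/* 10 */
	printf("%ld\n", compute_naptime(false, false, 5000));	/* 10 */
	printf("%ld\n", compute_naptime(false, false, 20000));	/* 10000 */
	return 0;
}

Because the computed value only feeds WaitLatch() with WL_LATCH_SET, a latch set during a long nap (for example, at promotion) still wakes the worker immediately.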
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index e53ebc6dc2..0f5ec63de1 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -68,6 +68,7 @@ #include "replication/logicallauncher.h" #include "replication/slot.h" #include "replication/syncrep.h" +#include "replication/worker_internal.h" #include "storage/bufmgr.h" #include "storage/large_object.h" #include "storage/pg_shmem.h" @@ -2044,6 +2045,15 @@ struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"enable_syncslot", PGC_POSTMASTER, REPLICATION_STANDBY, + gettext_noop("Enables a physical standby to synchronize logical failover slots from the primary server."), + }, + &enable_syncslot, + false, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index b2809c711a..136be912e6 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -361,6 +361,7 @@ #wal_retrieve_retry_interval = 5s # time to wait before retrying to # retrieve WAL after a failed attempt #recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery +#enable_syncslot = off # enables slot synchronization on the physical standby from the primary # - Subscribers - diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index f40726c4f7..d7c9fe31f8 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11115,9 +11115,9 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool,text}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover,sync_state}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', diff --git a/src/include/postmaster/bgworker.h b/src/include/postmaster/bgworker.h index 22fc49ec27..7092fc72c6 100644 --- a/src/include/postmaster/bgworker.h +++ b/src/include/postmaster/bgworker.h @@ -79,6 +79,7 @@ typedef enum BgWorkerStart_PostmasterStart, BgWorkerStart_ConsistentState, BgWorkerStart_RecoveryFinished, + BgWorkerStart_ConsistentState_HotStandby, } BgWorkerStartTime; #define BGW_DEFAULT_RESTART_INTERVAL 60 diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index a18d79d1b2..bbe04226db 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -22,6 +22,7 @@ extern void TablesyncWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); extern bool IsLogicalParallelApplyWorker(void); +extern bool IsLogicalSlotSyncWorker(void); 
extern void HandleParallelApplyMessageInterrupt(void);
 extern void HandleParallelApplyMessages(void);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 585ccbb504..b310b809c4 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -52,6 +52,22 @@ typedef enum ReplicationSlotInvalidationCause
 RS_INVAL_WAL_LEVEL,
 } ReplicationSlotInvalidationCause;
 
+/*
+ * The possible values for 'conflict_reason' returned by
+ * pg_get_replication_slots.
+ */
+#define SLOT_INVAL_WAL_REMOVED_TEXT "wal_removed"
+#define SLOT_INVAL_HORIZON_TEXT "rows_removed"
+#define SLOT_INVAL_WAL_LEVEL_TEXT "wal_level_insufficient"
+
+/* The possible values for 'sync_state' in ReplicationSlotPersistentData */
+#define SYNCSLOT_STATE_NONE 'n' /* None for user-created slots */
+#define SYNCSLOT_STATE_INITIATED 'i' /* Sync initiated for the slot but
+ * not completed yet, waiting for
+ * the primary server to catch up */
+#define SYNCSLOT_STATE_READY 'r' /* Initialization complete, ready
+ * to be synced further */
+
 /*
 * On-Disk data of a replication slot, preserved across restarts.
 */
@@ -112,6 +128,15 @@ typedef struct ReplicationSlotPersistentData
 /* plugin name */
 NameData plugin;
 
+ /*
+ * Synchronization state for a logical slot.
+ *
+ * On the standby, a slot can have any of the values 'i', 'r', or 'n'.
+ * On the primary, the value is 'n' for all slots, but it may be 'r' for
+ * slots left over from a promoted standby.
+ */
+ char sync_state;
+
 /*
 * Is this a failover slot (sync candidate for physical standbys)? Only
 * relevant for logical slots on the primary server.
@@ -224,9 +249,11 @@ extern void ReplicationSlotsShmemInit(void);
 /* management of individual slots */
 extern void ReplicationSlotCreate(const char *name, bool db_specific,
 ReplicationSlotPersistency persistency,
- bool two_phase, bool failover);
+ bool two_phase, bool failover,
+ char sync_state);
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
+extern void ReplicationSlotDropAcquired(void);
 extern void ReplicationSlotAlter(const char *name, bool failover);
 
 extern void ReplicationSlotAcquire(const char *name, bool nowait);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index f566a99ba1..9f78fd1e5a 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -279,6 +279,21 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
 typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
 TimeLineID *primary_tli);
 
+/*
+ * walrcv_get_dbinfo_for_failover_slots_fn
+ *
+ * Run LIST_DBID_FOR_FAILOVER_SLOTS on the primary server to get the
+ * list of unique DBIDs for failover logical slots
+ */
+typedef List *(*walrcv_get_dbinfo_for_failover_slots_fn) (WalReceiverConn *conn);
+
+/*
+ * walrcv_get_dbname_from_conninfo_fn
+ *
+ * Returns the dbname from the given conninfo
+ */
+typedef char *(*walrcv_get_dbname_from_conninfo_fn) (const char *conninfo);
+
 /*
 * walrcv_server_version_fn
 *
@@ -403,6 +418,7 @@ typedef struct WalReceiverFunctionsType
 walrcv_get_conninfo_fn walrcv_get_conninfo;
 walrcv_get_senderinfo_fn walrcv_get_senderinfo;
 walrcv_identify_system_fn walrcv_identify_system;
+ walrcv_get_dbname_from_conninfo_fn walrcv_get_dbname_from_conninfo;
 walrcv_server_version_fn walrcv_server_version;
 walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
 walrcv_startstreaming_fn walrcv_startstreaming;
@@ -428,6 +444,8 @@ extern PGDLLIMPORT
WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port) #define walrcv_identify_system(conn, primary_tli) \ WalReceiverFunctions->walrcv_identify_system(conn, primary_tli) +#define walrcv_get_dbname_from_conninfo(conninfo) \ + WalReceiverFunctions->walrcv_get_dbname_from_conninfo(conninfo) #define walrcv_server_version(conn) \ WalReceiverFunctions->walrcv_server_version(conn) #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 4e1f6e7df9..e08e48debb 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -237,6 +237,11 @@ extern PGDLLIMPORT bool in_remote_transaction; extern PGDLLIMPORT bool InitializingApplyWorker; +/* Slot sync worker objects */ +extern PGDLLIMPORT char *PrimaryConnInfo; +extern PGDLLIMPORT char *PrimarySlotName; +extern PGDLLIMPORT bool enable_syncslot; + extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); @@ -326,6 +331,12 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); +extern void ReplSlotSyncWorkerMain(Datum main_arg); +extern void SlotSyncWorkerRegister(void); +extern void ShutDownSlotSync(void); +extern void slotsync_drop_initiated_slots(void); +extern void SlotSyncWorkerShmemInit(void); + #define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) #define isTablesyncWorker(worker) ((worker)->in_use && \ diff --git a/src/test/recovery/t/050_standby_failover_slots_sync.pl b/src/test/recovery/t/050_standby_failover_slots_sync.pl index 796bf0a4af..c55a285ca3 100644 --- a/src/test/recovery/t/050_standby_failover_slots_sync.pl +++ b/src/test/recovery/t/050_standby_failover_slots_sync.pl @@ -97,6 +97,189 @@ ok( $publisher->poll_query_until( "SELECT failover from pg_replication_slots WHERE slot_name = 'lsub1_slot';"), 'logical slot has failover true on the publisher'); -$subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION regress_mysub1"); +################################################## +# Test logical failover slots on the standby +# Configure standby1 to replicate and synchronize logical slots configured +# for failover on the primary +# +# failover slot lsub1_slot->| ----> subscriber1 (connected via logical replication) +# primary ---> | +# physical slot sb1_slot--->| ----> standby1 (connected via streaming replication) +# | lsub1_slot(synced_slot) +################################################## + +my $primary = $publisher; +my $backup_name = 'backup'; +$primary->backup($backup_name); + +# Create a standby +my $standby1 = PostgreSQL::Test::Cluster->new('standby1'); +$standby1->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); + +my $connstr_1 = $primary->connstr; +$standby1->append_conf( + 'postgresql.conf', qq( +enable_syncslot = true +hot_standby_feedback = on +primary_slot_name = 'sb1_slot' +primary_conninfo = '$connstr_1 dbname=postgres' +)); + +$primary->psql('postgres', + q{SELECT pg_create_physical_replication_slot('sb1_slot');}); + +my $standby1_conninfo = $standby1->connstr . ' dbname=postgres'; + +# Wait for the standby to start sync +my $offset = -s $standby1->logfile; +$standby1->start; +$standby1->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? 
waiting for remote slot \"lsub1_slot\"/,
+ $offset);
+
+# Advance the LSN on the primary
+$primary->safe_psql('postgres',
+ "SELECT pg_log_standby_snapshot();
+ SELECT pg_log_standby_snapshot();
+ SELECT pg_log_standby_snapshot();");
+
+# Wait for the standby to finish sync
+$offset = -s $standby1->logfile;
+$standby1->wait_for_log(
+ qr/LOG: ( [A-Z0-9]+:)? wait over for remote slot \"lsub1_slot\"/,
+ $offset);
+
+# Confirm that the logical failover slot is created on the standby and is
+# sync-ready.
+is($standby1->safe_psql('postgres',
+ q{SELECT failover, sync_state FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}),
+ "t|ready",
+ 'logical slot has failover as true and sync_state as ready on standby');
+
+##################################################
+# Test to confirm that restart_lsn and confirmed_flush_lsn of the logical slot
+# on the primary are synced to the standby
+##################################################
+
+# Insert data on the primary
+$primary->safe_psql(
+ 'postgres', qq[
+ TRUNCATE TABLE tab_int;
+ INSERT INTO tab_int SELECT generate_series(1, 10);
+]);
+
+$primary->wait_for_catchup('regress_mysub1');
+
+# Do not allow any further advancement of the restart_lsn and
+# confirmed_flush_lsn for the lsub1_slot.
+$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 DISABLE");
+
+# Wait for the replication slot to become inactive on the publisher
+$primary->poll_query_until(
+ 'postgres',
+ "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active='f'",
+ 1);
+
+# Get the restart_lsn for the logical slot lsub1_slot on the primary
+my $primary_restart_lsn = $primary->safe_psql('postgres',
+ "SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';");
+
+# Get the confirmed_flush_lsn for the logical slot lsub1_slot on the primary
+my $primary_flush_lsn = $primary->safe_psql('postgres',
+ "SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';");
+
+# Confirm that restart_lsn and confirmed_flush_lsn of the lsub1_slot slot are
+# synced to the standby
+ok( $standby1->poll_query_until(
+ 'postgres',
+ "SELECT '$primary_restart_lsn' = restart_lsn AND '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';"),
+ 'restart_lsn and confirmed_flush_lsn of slot lsub1_slot synced to standby');
+
+##################################################
+# Test that a synchronized slot cannot be decoded, altered, or dropped by the user
+##################################################
+
+# Disable hot_standby_feedback temporarily to stop the slot sync worker;
+# otherwise, the test scenarios below may be interrupted by a different error:
+# 'ERROR: replication slot is active for PID ..'
+
+$standby1->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = off;');
+$standby1->restart;
+
+# Attempting to perform logical decoding on a synced slot should result in an error
+my ($result, $stdout, $stderr) = $standby1->psql('postgres',
+ "select * from pg_logical_slot_get_changes('lsub1_slot',NULL,NULL);");
+ok($stderr =~ /ERROR: cannot use replication slot "lsub1_slot" for logical decoding/,
+ "logical decoding is not allowed on synced slot");
+
+# Attempting to alter a synced slot should result in an error
+($result, $stdout, $stderr) = $standby1->psql(
+ 'postgres',
+ qq[ALTER_REPLICATION_SLOT lsub1_slot (failover);],
+ replication => 'database');
+ok($stderr =~ /ERROR: cannot alter replication slot "lsub1_slot"/,
+ "synced slot on standby cannot be altered");
+
+# Attempting to drop a synced slot should result in an error
+($result, $stdout, $stderr) = $standby1->psql('postgres',
+ "SELECT pg_drop_replication_slot('lsub1_slot');");
+ok($stderr =~ /ERROR: cannot drop replication slot "lsub1_slot"/,
+ "synced slot on standby cannot be dropped");
+
+# Enable hot_standby_feedback and restart the standby
+$standby1->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = on;');
+$standby1->restart;
+
+##################################################
+# Create another slot that stays in sync_state 'initiated' because it is a
+# manually created slot and its LSN has not advanced.
+##################################################
+
+# Create a logical slot with failover = true
+$primary->psql('postgres',
+ q{SELECT pg_create_logical_replication_slot('logical_slot','pgoutput', false, true, true);});
+
+# Wait for the standby to start sync
+$offset = -s $standby1->logfile;
+$standby1->wait_for_log(
+ qr/LOG: ( [A-Z0-9]+:)? waiting for remote slot \"logical_slot\"/,
+ $offset);
+
+# Confirm that the logical slot is created on the standby and is in the
+# 'initiated' sync state
+is($standby1->safe_psql('postgres',
+ q{SELECT failover, sync_state FROM pg_replication_slots WHERE slot_name = 'logical_slot';}),
+ "t|initiated",
+ 'logical slot has failover as true and sync_state as initiated on standby');
+
+##################################################
+# Promote the standby1 to primary.
Confirm that:
+# a) the 'ready' slot 'lsub1_slot' is retained on the new primary
+# b) the 'initiated' slot 'logical_slot' is dropped on promotion
+# c) logical replication for regress_mysub1 is resumed successfully after failover
+##################################################
+$standby1->promote;
+
+# Update the subscription with the new primary's connection info
+$subscriber1->safe_psql('postgres',
+ "ALTER SUBSCRIPTION regress_mysub1 CONNECTION '$standby1_conninfo';
+ ALTER SUBSCRIPTION regress_mysub1 ENABLE; ");
+
+is($standby1->safe_psql('postgres',
+ q{SELECT slot_name FROM pg_replication_slots WHERE slot_name in ('logical_slot','lsub1_slot');}),
+ 'lsub1_slot',
+ 'synced slot retained on the new primary');
+
+# Insert data on the new primary
+$standby1->safe_psql('postgres',
+ "INSERT INTO tab_int SELECT generate_series(11, 20);");
+$standby1->wait_for_catchup('regress_mysub1');
+
+# Confirm that the data in tab_int is replicated to the subscriber
+is( $subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}),
+ "20",
+ 'data replicated from the new primary');
 
 done_testing();
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index acc2339b49..f5ccbf9ccb 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1474,8 +1474,9 @@ pg_replication_slots| SELECT l.slot_name,
 l.safe_wal_size,
 l.two_phase,
 l.conflict_reason,
- l.failover
- FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover)
+ l.failover,
+ l.sync_state
+ FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover, sync_state)
 LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
 pg_roles| SELECT pg_authid.rolname,
 pg_authid.rolsuper,
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index 271313ebf8..aac83755de 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -132,8 +132,9 @@ select name, setting from pg_settings where name like 'enable%';
 enable_self_join_removal | on
 enable_seqscan | on
 enable_sort | on
+ enable_syncslot | off
 enable_tidscan | on
-(22 rows)
+(23 rows)
 
 -- There are always wait event descriptions for various types.
 select type, count(*) > 0 as ok FROM pg_wait_events
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 9b67986914..b83f2143cd 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2321,6 +2321,7 @@ RelocationBufferInfo
 RelptrFreePageBtree
 RelptrFreePageManager
 RelptrFreePageSpanLeader
+RemoteSlot
 RenameStmt
 ReopenPtrType
 ReorderBuffer
@@ -2581,6 +2582,7 @@ SlabBlock
 SlabContext
 SlabSlot
 SlotNumber
+SlotSyncWorkerCtxStruct
 SlruCtl
 SlruCtlData
 SlruErrorCause
-- 
2.34.1
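Note for testers: outside the TAP framework, the scenario exercised above reduces to roughly the following manual setup. This is a distilled sketch, not part of the patch; 'my_failover_slot' and the placeholder connection string are only examples.

# standby postgresql.conf
enable_syncslot = on
hot_standby_feedback = on
primary_slot_name = 'sb1_slot'
primary_conninfo = '<primary connection string> dbname=postgres'

-- on the primary: a physical slot for the standby, plus a logical slot
-- with failover enabled (trailing args: temporary, two_phase, failover)
SELECT pg_create_physical_replication_slot('sb1_slot');
SELECT pg_create_logical_replication_slot('my_failover_slot', 'pgoutput',
                                          false, false, true);

-- on the standby: the slot appears with sync_state 'initiated' and turns
-- 'ready' once its LSNs have advanced far enough to be synchronized
SELECT slot_name, failover, sync_state FROM pg_replication_slots;

A slot created this way stays in sync_state 'initiated' on the standby until its restart_lsn and confirmed_flush_lsn advance, which is exactly the behavior the 'logical_slot' test case above exercises.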