From 09aa9d1f06da848846852f90e0bd717fa2a0c6c0 Mon Sep 17 00:00:00 2001 From: Shveta Malik Date: Mon, 5 Feb 2024 14:31:15 +0530 Subject: [PATCH v78 2/4] Add a new slotsync worker By enabling slot synchronization, all the failover logical replication slots on the primary (assuming configurations are appropriate) are automatically created on the physical standbys and are synced periodically. The slot-sync worker on the standby server pings the primary server at regular intervals to get the necessary failover logical slots information and create/update the slots locally. The slots that no longer require synchronization are automatically dropped by the worker. The nap time of the worker is tuned according to the activity on the primary. The worker waits for a period of time before the next synchronization, with the duration varying based on whether any slots were updated during the last cycle. A new parameter sync_replication_slots enables or disables this new process. --- doc/src/sgml/config.sgml | 18 + doc/src/sgml/logicaldecoding.sgml | 8 +- src/backend/access/transam/xlogrecovery.c | 15 + src/backend/postmaster/postmaster.c | 80 ++- .../libpqwalreceiver/libpqwalreceiver.c | 3 + src/backend/replication/logical/slotsync.c | 674 +++++++++++++++++- src/backend/replication/slot.c | 16 +- src/backend/replication/walsender.c | 2 +- src/backend/storage/ipc/ipci.c | 2 + src/backend/storage/lmgr/proc.c | 13 +- src/backend/utils/activity/pgstat_io.c | 1 + .../utils/activity/wait_event_names.txt | 2 + src/backend/utils/init/miscinit.c | 9 +- src/backend/utils/init/postinit.c | 8 +- src/backend/utils/misc/guc_tables.c | 10 + src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/miscadmin.h | 1 + src/include/replication/logicalworker.h | 2 +- src/include/replication/worker_internal.h | 12 + .../t/040_standby_failover_slots_sync.pl | 114 ++- src/tools/pgindent/typedefs.list | 2 + 21 files changed, 938 insertions(+), 55 deletions(-) diff --git a/doc/src/sgml/config.sgml 
b/doc/src/sgml/config.sgml index a1e2e6e69f..4cf75c3101 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4943,6 +4943,24 @@ ANY num_sync ( + sync_replication_slots (boolean) + + sync_replication_slots configuration parameter + + + + + It enables a physical standby to synchronize logical failover slots + from the primary server so that logical subscribers are not blocked + after failover. + + + It is disabled by default. This parameter can only be set in the + postgresql.conf file or on the server command line. + + + diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 5e8e4f036b..cd91464b81 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -367,9 +367,11 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU A logical replication slot on the primary can be synchronized to the hot standby by enabling the failover option during slot creation and calling pg_sync_replication_slots - on the standby. For the synchronization - to work, it is mandatory to have a physical replication slot between the - primary and the standby, and + on the standby. By setting + sync_replication_slots + on the standby, the failover slots can be synchronized periodically in the + slotsync worker. For the synchronization to work, it is mandatory to have + a physical replication slot between the primary and the standby, and hot_standby_feedback must be enabled on the standby. 
It is also necessary to specify a valid dbname in the diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 0bb472da27..87b49d524a 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -50,6 +50,7 @@ #include "postmaster/startup.h" #include "replication/slot.h" #include "replication/walreceiver.h" +#include "replication/worker_internal.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/latch.h" @@ -1467,6 +1468,20 @@ FinishWalRecovery(void) */ XLogShutdownWalRcv(); + /* + * Shutdown the slot sync workers to prevent potential conflicts between + * user processes and slotsync workers after a promotion. + * + * We do not update the 'synced' column from true to false here, as any + * failed update could leave 'synced' column false for some slots. This + * could cause issues during slot sync after restarting the server as a + * standby. While updating after switching to the new timeline is an + * option, it does not simplify the handling for 'synced' column. + * Therefore, we retain the 'synced' column as true after promotion as it + * may provide useful information about the slot origin. + */ + ShutDownSlotSync(); + /* * We are now done reading the xlog from stream. Turn off streaming * recovery to force fetching the files (which would be required at end of diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index feb471dd1d..ab63f7952b 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -116,6 +116,7 @@ #include "postmaster/walsummarizer.h" #include "replication/logicallauncher.h" #include "replication/walsender.h" +#include "replication/worker_internal.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/pg_shmem.h" @@ -167,11 +168,11 @@ * they will never become live backends. dead_end children are not assigned a * PMChildSlot. 
dead_end children have bkend_type NORMAL. * - * "Special" children such as the startup, bgwriter and autovacuum launcher - * tasks are not in this list. They are tracked via StartupPID and other - * pid_t variables below. (Thus, there can't be more than one of any given - * "special" child process type. We use BackendList entries for any child - * process there can be more than one of.) + * "Special" children such as the startup, bgwriter, autovacuum launcher and + * slot sync worker tasks are not in this list. They are tracked via StartupPID + * and other pid_t variables below. (Thus, there can't be more than one of any + * given "special" child process type. We use BackendList entries for any + * child process there can be more than one of.) */ typedef struct bkend { @@ -254,7 +255,8 @@ static pid_t StartupPID = 0, WalSummarizerPID = 0, AutoVacPID = 0, PgArchPID = 0, - SysLoggerPID = 0; + SysLoggerPID = 0, + SlotSyncWorkerPID = 0; /* Startup process's status */ typedef enum @@ -458,6 +460,10 @@ static void InitPostmasterDeathWatchHandle(void); (pmState == PM_RECOVERY || pmState == PM_HOT_STANDBY))) && \ PgArchCanRestart()) +#define SlotSyncWorkerAllowed() \ + (sync_replication_slots && pmState == PM_HOT_STANDBY && \ + SlotSyncWorkerCanRestart()) + #ifdef EXEC_BACKEND #ifdef WIN32 @@ -1830,6 +1836,10 @@ ServerLoop(void) if (PgArchPID == 0 && PgArchStartupAllowed()) PgArchPID = StartArchiver(); + /* If we need to start a slot sync worker, try to do that now */ + if (SlotSyncWorkerPID == 0 && SlotSyncWorkerAllowed()) + SlotSyncWorkerPID = StartSlotSyncWorker(); + /* If we need to signal the autovacuum launcher, do so now */ if (avlauncher_needs_signal) { @@ -2677,6 +2687,8 @@ process_pm_reload_request(void) signal_child(PgArchPID, SIGHUP); if (SysLoggerPID != 0) signal_child(SysLoggerPID, SIGHUP); + if (SlotSyncWorkerPID != 0) + signal_child(SlotSyncWorkerPID, SIGHUP); /* Reload authentication config files too */ if (!load_hba()) @@ -3034,6 +3046,8 @@ 
process_pm_child_exit(void) AutoVacPID = StartAutoVacLauncher(); if (PgArchStartupAllowed() && PgArchPID == 0) PgArchPID = StartArchiver(); + if (SlotSyncWorkerAllowed() && SlotSyncWorkerPID == 0) + SlotSyncWorkerPID = StartSlotSyncWorker(); /* workers may be scheduled to start now */ maybe_start_bgworkers(); @@ -3204,6 +3218,22 @@ process_pm_child_exit(void) continue; } + /* + * Was it the slot sync worker? Normal exit or FATAL exit can be + * ignored (FATAL can be caused by libpqwalreceiver on receiving + * shutdown request by the startup process during promotion); we'll + * start a new one at the next iteration of the postmaster's main + * loop, if necessary. Any other exit condition is treated as a crash. + */ + if (pid == SlotSyncWorkerPID) + { + SlotSyncWorkerPID = 0; + if (!EXIT_STATUS_0(exitstatus) && !EXIT_STATUS_1(exitstatus)) + HandleChildCrash(pid, exitstatus, + _("slot sync worker process")); + continue; + } + /* Was it one of our background workers? */ if (CleanupBackgroundWorker(pid, exitstatus)) { @@ -3408,7 +3438,7 @@ CleanupBackend(int pid, /* * HandleChildCrash -- cleanup after failed backend, bgwriter, checkpointer, - * walwriter, autovacuum, archiver or background worker. + * walwriter, autovacuum, archiver, slot sync worker or background worker. * * The objectives here are to clean up our local state about the child * process, and to signal all other remaining children to quickdie. 
@@ -3570,6 +3600,12 @@ HandleChildCrash(int pid, int exitstatus, const char *procname) else if (PgArchPID != 0 && take_action) sigquit_child(PgArchPID); + /* Take care of the slot sync worker too */ + if (pid == SlotSyncWorkerPID) + SlotSyncWorkerPID = 0; + else if (SlotSyncWorkerPID != 0 && take_action) + sigquit_child(SlotSyncWorkerPID); + /* We do NOT restart the syslogger */ if (Shutdown != ImmediateShutdown) @@ -3710,6 +3746,8 @@ PostmasterStateMachine(void) signal_child(WalReceiverPID, SIGTERM); if (WalSummarizerPID != 0) signal_child(WalSummarizerPID, SIGTERM); + if (SlotSyncWorkerPID != 0) + signal_child(SlotSyncWorkerPID, SIGTERM); /* checkpointer, archiver, stats, and syslogger may continue for now */ /* Now transition to PM_WAIT_BACKENDS state to wait for them to die */ @@ -3725,13 +3763,13 @@ PostmasterStateMachine(void) /* * PM_WAIT_BACKENDS state ends when we have no regular backends * (including autovac workers), no bgworkers (including unconnected - * ones), and no walwriter, autovac launcher or bgwriter. If we are - * doing crash recovery or an immediate shutdown then we expect the - * checkpointer to exit as well, otherwise not. The stats and - * syslogger processes are disregarded since they are not connected to - * shared memory; we also disregard dead_end children here. Walsenders - * and archiver are also disregarded, they will be terminated later - * after writing the checkpoint record. + * ones), and no walwriter, autovac launcher, bgwriter or slot sync + * worker. If we are doing crash recovery or an immediate shutdown + * then we expect the checkpointer to exit as well, otherwise not. The + * stats and syslogger processes are disregarded since they are not + * connected to shared memory; we also disregard dead_end children + * here. Walsenders and archiver are also disregarded, they will be + * terminated later after writing the checkpoint record. 
*/ if (CountChildren(BACKEND_TYPE_ALL - BACKEND_TYPE_WALSND) == 0 && StartupPID == 0 && @@ -3741,7 +3779,8 @@ PostmasterStateMachine(void) (CheckpointerPID == 0 || (!FatalError && Shutdown < ImmediateShutdown)) && WalWriterPID == 0 && - AutoVacPID == 0) + AutoVacPID == 0 && + SlotSyncWorkerPID == 0) { if (Shutdown >= ImmediateShutdown || FatalError) { @@ -3839,6 +3878,7 @@ PostmasterStateMachine(void) Assert(CheckpointerPID == 0); Assert(WalWriterPID == 0); Assert(AutoVacPID == 0); + Assert(SlotSyncWorkerPID == 0); /* syslogger is not considered here */ pmState = PM_NO_CHILDREN; } @@ -4062,6 +4102,8 @@ TerminateChildren(int signal) signal_child(AutoVacPID, signal); if (PgArchPID != 0) signal_child(PgArchPID, signal); + if (SlotSyncWorkerPID != 0) + signal_child(SlotSyncWorkerPID, signal); } /* @@ -4874,6 +4916,7 @@ SubPostmasterMain(int argc, char *argv[]) */ if (strcmp(argv[1], "--forkbackend") == 0 || strcmp(argv[1], "--forkavlauncher") == 0 || + strcmp(argv[1], "--forkssworker") == 0 || strcmp(argv[1], "--forkavworker") == 0 || strcmp(argv[1], "--forkaux") == 0 || strcmp(argv[1], "--forkbgworker") == 0) @@ -4977,6 +5020,13 @@ SubPostmasterMain(int argc, char *argv[]) AutoVacWorkerMain(argc - 2, argv + 2); /* does not return */ } + if (strcmp(argv[1], "--forkssworker") == 0) + { + /* Restore basic shared memory pointers */ + InitShmemAccess(UsedShmemSegAddr); + + ReplSlotSyncWorkerMain(argc - 2, argv + 2); /* does not return */ + } if (strcmp(argv[1], "--forkbgworker") == 0) { /* do this as early as possible; in particular, before InitProcess() */ diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 9270d7b855..e40cdbe4d9 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -6,6 +6,9 @@ * loaded as a dynamic module to avoid linking the main server binary with * libpq. 
* + * Apart from walreceiver, the libpq-specific routines here are now being used + * by logical replication workers and slot sync worker as well. + * * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group * * diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 4716696817..20940374e4 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -22,6 +22,13 @@ * * The slots that were synchronized will be dropped if they are currently not * needed to be synchronized. + * + * It waits for a period of time before the next synchronization, with the + * duration varying based on whether any slots were updated during the last + * cycle. Refer to the comments above wait_for_slot_activity() for more details. + * + * The slot sync worker currently does not support synchronization on the + * cascading standby. *--------------------------------------------------------------------------- */ @@ -58,9 +65,6 @@ #include "utils/timeout.h" #include "utils/varlena.h" -/* Flag to tell if we are syncing replication slots */ -static bool syncing_slots = false; - /* * Structure to hold information fetched from the primary server about a logical * replication slot. @@ -78,7 +82,62 @@ typedef struct RemoteSlot /* RS_INVAL_NONE if valid, or the reason of invalidation */ ReplicationSlotInvalidationCause invalidated; -} RemoteSlot; +} RemoteSlot; + +/* + * Struct for sharing information between startup process and slot + * sync worker. + * + * Slot sync worker's pid is needed by the startup process in order to + * shut it down during promotion. Startup process shuts down the slot + * sync worker and also sets stopSignaled=true to handle the race condition + * when postmaster has not noticed the promotion yet and thus may end up + * restarting slot sync worker. If stopSignaled is set, the worker will + * exit in such a case. 
+ * + * The last_start_time is needed by postmaster to start the slot sync + * worker once per SLOTSYNC_RESTART_INTERVAL_SEC. In cases where an + * immediate restart is expected (e.g., slot sync GUCs change), slot + * sync worker will reset last_start_time before exiting, so that postmaster + * can start the worker without waiting for SLOTSYNC_RESTART_INTERVAL_SEC. + */ +typedef struct SlotSyncWorkerCtxStruct +{ + pid_t pid; + bool stopSignaled; + time_t last_start_time; + slock_t mutex; +} SlotSyncWorkerCtxStruct; + +SlotSyncWorkerCtxStruct *SlotSyncWorker = NULL; + +/* GUC variable */ +bool sync_replication_slots = false; + +/* + * The sleep time (ms) between slot-sync cycles varies dynamically + * (within a MIN/MAX range) according to slot activity. See + * wait_for_slot_activity() for details. + */ +#define MIN_WORKER_NAPTIME_MS 200 +#define MAX_WORKER_NAPTIME_MS 30000 /* 30s */ +static long sleep_ms = MIN_WORKER_NAPTIME_MS; + +/* The restart interval for slot sync worker used by postmaster */ +#define SLOTSYNC_RESTART_INTERVAL_SEC 10 + +/* Flag to tell if we are in a slot sync worker process */ +static bool am_slotsync_worker = false; + +/* Flag to tell if we are syncing replication slots through the SQL function */ +static bool syncing_slots = false; + +static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn, bool restart); + +#ifdef EXEC_BACKEND +static pid_t slotsyncworker_forkexec(void); +#endif +NON_EXEC_STATIC void ReplSlotSyncWorkerMain(int argc, char *argv[]) pg_attribute_noreturn(); /* * If necessary, update local slot metadata based on the data from the remote @@ -88,7 +147,7 @@ typedef struct RemoteSlot * local slot) return false, otherwise true. */ static bool -local_slot_update(RemoteSlot * remote_slot, Oid remote_dbid) +local_slot_update(RemoteSlot *remote_slot, Oid remote_dbid) { ReplicationSlot *slot = MyReplicationSlot; NameData plugin_name; @@ -328,7 +387,7 @@ reserve_wal_for_slot(XLogRecPtr restart_lsn) * false. 
*/ static bool -update_and_persist_slot(RemoteSlot * remote_slot, Oid remote_dbid) +update_and_persist_slot(RemoteSlot *remote_slot, Oid remote_dbid) { ReplicationSlot *slot = MyReplicationSlot; @@ -385,7 +444,7 @@ update_and_persist_slot(RemoteSlot * remote_slot, Oid remote_dbid) * Returns TRUE if the local slot is updated. */ static bool -synchronize_one_slot(RemoteSlot * remote_slot, Oid remote_dbid) +synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) { ReplicationSlot *slot; bool slot_updated = false; @@ -402,7 +461,7 @@ synchronize_one_slot(RemoteSlot * remote_slot, Oid remote_dbid) * Can get here only if GUC 'standby_slot_names' on the primary server * was not configured correctly. */ - ereport(ERROR, + ereport(syncing_slots ? ERROR : LOG, errmsg("skipping slot synchronization as the received slot sync" " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X", LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), @@ -717,19 +776,27 @@ synchronize_slots(WalReceiverConn *wrconn) } /* - * Using the specified primary server connection, validates primary_slot_name. + * Checks the primary server info. + * + * Using the specified primary server connection, check whether we are a + * cascading standby. It also validates primary_slot_name for non-cascading + * standbys. */ -static void -validate_primary_slot(WalReceiverConn *wrconn, int slot_invalid_elevel) +static bool +check_primary_info(WalReceiverConn *wrconn) { -#define PRIMARY_INFO_OUTPUT_COL_COUNT 1 +#define PRIMARY_INFO_OUTPUT_COL_COUNT 2 WalRcvExecResult *res; - Oid slotRow[PRIMARY_INFO_OUTPUT_COL_COUNT] = {BOOLOID}; + Oid slotRow[PRIMARY_INFO_OUTPUT_COL_COUNT] = {BOOLOID, BOOLOID}; StringInfoData cmd; bool isnull; TupleTableSlot *tupslot; bool valid; + bool remote_in_recovery; bool started_tx = false; + bool primary_info_valid = true; + int slot_invalid_elevel = syncing_slots ? ERROR : LOG; + bool check_cascading = syncing_slots ? 
false : true; /* The syscache access in walrcv_exec() needs a transaction env. */ if (!IsTransactionState()) @@ -740,7 +807,7 @@ validate_primary_slot(WalReceiverConn *wrconn, int slot_invalid_elevel) initStringInfo(&cmd); appendStringInfo(&cmd, - "SELECT count(*) = 1" + "SELECT pg_is_in_recovery(), count(*) = 1" " FROM pg_catalog.pg_replication_slots" " WHERE slot_type='physical' AND slot_name=%s", quote_literal_cstr(PrimarySlotName)); @@ -759,22 +826,64 @@ validate_primary_slot(WalReceiverConn *wrconn, int slot_invalid_elevel) elog(ERROR, "failed to fetch tuple for the primary server slot specified by \"primary_slot_name\""); - valid = DatumGetBool(slot_getattr(tupslot, 1, &isnull)); + remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull)); Assert(!isnull); - if (!valid) - ereport(slot_invalid_elevel, - errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("bad configuration for slot synchronization"), - /* translator: second %s is a GUC variable name */ - errdetail("The primary server slot \"%s\" specified by \"%s\" is not valid.", - PrimarySlotName, "primary_slot_name")); + if (check_cascading && remote_in_recovery) + { + /* + * No need to check further, just set primary_info_valid to false as + * we are a cascading standby. + */ + primary_info_valid = false; + } + else + { + /* + * We are not cascading standby, thus good to proceed with + * primary_slot_name validity check now. 
+ */ + valid = DatumGetBool(slot_getattr(tupslot, 2, &isnull)); + Assert(!isnull); + + if (!valid) + { + primary_info_valid = false; + + ereport(slot_invalid_elevel, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("bad configuration for slot synchronization"), + /* translator: second %s is a GUC variable name */ + errdetail("The primary server slot \"%s\" specified by \"%s\" is not valid.", + PrimarySlotName, "primary_slot_name")); + } + } ExecClearTuple(tupslot); walrcv_clear_result(res); if (started_tx) CommitTransactionCommand(); + + return primary_info_valid; +} + +/* + * Get database name from the conninfo. + * + * If dbname is extracted already from the conninfo, just return it. + */ +static char * +get_dbname_from_conninfo(const char *conninfo) +{ + static char *dbname; + + if (dbname) + return dbname; + else + dbname = walrcv_get_dbname_from_conninfo(conninfo); + + return dbname; } /* @@ -848,7 +957,7 @@ validate_slotsync_params(int elevel) * The slot synchronization needs a database connection for walrcv_exec to * work. */ - dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo); + dbname = get_dbname_from_conninfo(PrimaryConnInfo); if (dbname == NULL) { ereport(elevel, @@ -866,13 +975,526 @@ validate_slotsync_params(int elevel) return true; } + +/* + * Check that all necessary GUCs for slot synchronization are set + * appropriately. If not, sleep for MAX_WORKER_NAPTIME_MS and check again. + * The idea is to become no-op until we get valid GUC values. + * + * Returns only after all checks pass. Nothing is returned; the caller + * fetches the dbname itself via get_dbname_from_conninfo(). + */ +static void +ensure_valid_slotsync_params(void) +{ + int rc; + + /* Sanity check. 
*/ + Assert(sync_replication_slots); + + for (;;) + { + if (validate_slotsync_params(LOG)) + break; + + ereport(LOG, errmsg("skipping slot synchronization")); + + ProcessSlotSyncInterrupts(NULL, false); + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + MAX_WORKER_NAPTIME_MS, + WAIT_EVENT_REPL_SLOTSYNC_MAIN); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + } +} + +/* + * Re-read the config file. + * + * If any of the slot sync GUCs have changed, exit the worker and + * let it get restarted by the postmaster. The worker exits for + * restart purposes only if the caller passed restart as true. + */ +static void +slotsync_reread_config(bool restart) +{ + char *old_primary_conninfo = pstrdup(PrimaryConnInfo); + char *old_primary_slotname = pstrdup(PrimarySlotName); + bool old_sync_replication_slots = sync_replication_slots; + bool old_hot_standby_feedback = hot_standby_feedback; + bool conninfo_changed; + bool primary_slotname_changed; + + Assert(sync_replication_slots); + + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + + conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0; + primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0; + pfree(old_primary_conninfo); + pfree(old_primary_slotname); + + if (old_sync_replication_slots != sync_replication_slots) + { + ereport(LOG, + /* translator: %s is a GUC variable name */ + errmsg("slot sync worker will shutdown because %s is disabled", "sync_replication_slots")); + proc_exit(0); + } + + /* The caller instructed to skip restart */ + if (!restart) + return; + + if (conninfo_changed || + primary_slotname_changed || + (old_hot_standby_feedback != hot_standby_feedback)) + { + ereport(LOG, + errmsg("slot sync worker will restart because of a parameter change")); + + /* + * Reset the last-start time for this worker so that the postmaster + * can restart it without waiting for SLOTSYNC_RESTART_INTERVAL_SEC. 
+ */ + SlotSyncWorker->last_start_time = 0; + + proc_exit(0); + } + +} + +/* + * Interrupt handler for main loop of slot sync worker. + */ +static void +ProcessSlotSyncInterrupts(WalReceiverConn *wrconn, bool restart) +{ + CHECK_FOR_INTERRUPTS(); + + if (ShutdownRequestPending) + { + if (wrconn) + walrcv_disconnect(wrconn); + ereport(LOG, + errmsg("replication slot sync worker is shutting down on receiving SIGINT")); + proc_exit(0); + } + + if (ConfigReloadPending) + slotsync_reread_config(restart); +} + +/* + * Cleanup function for slotsync worker. + * + * Called on slotsync worker exit. + */ +static void +slotsync_worker_onexit(int code, Datum arg) +{ + SpinLockAcquire(&SlotSyncWorker->mutex); + SlotSyncWorker->pid = InvalidPid; + SpinLockRelease(&SlotSyncWorker->mutex); +} + +/* + * Sleep for long enough that we believe it's likely that the slots on primary + * get updated. + * + * If there is no slot activity the wait time between sync-cycles will double + * (to a maximum of 30s). If there is some slot activity the wait time between + * sync-cycles is reset to the minimum (200ms). + */ +static void +wait_for_slot_activity(bool some_slot_updated, bool recheck_primary_info) +{ + int rc; + + if (recheck_primary_info) + { + /* + * If we are on the cascading standby or primary_slot_name configured + * is not valid, then we will skip the sync and take a longer nap + * before we can do check_primary_info() again. + */ + sleep_ms = MAX_WORKER_NAPTIME_MS; + } + else if (!some_slot_updated) + { + /* + * No slots were updated, so double the sleep time, but not beyond the + * maximum allowable value. + */ + sleep_ms = Min(sleep_ms * 2, MAX_WORKER_NAPTIME_MS); + } + else + { + /* + * Some slots were updated since the last sleep, so reset the sleep + * time. 
+ */ + sleep_ms = MIN_WORKER_NAPTIME_MS; + } + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + sleep_ms, + WAIT_EVENT_REPL_SLOTSYNC_MAIN); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); +} + +/* + * The main loop of our worker process. + * + * It connects to the primary server, fetches logical failover slots + * information periodically in order to create and sync the slots. + */ +NON_EXEC_STATIC void +ReplSlotSyncWorkerMain(int argc, char *argv[]) +{ + WalReceiverConn *wrconn = NULL; + char *dbname; + char *err; + sigjmp_buf local_sigjmp_buf; + StringInfoData app_name; + bool primary_info_valid; + + am_slotsync_worker = true; + + MyBackendType = B_SLOTSYNC_WORKER; + + init_ps_display(NULL); + + SetProcessingMode(InitProcessing); + + /* + * Create a per-backend PGPROC struct in shared memory. We must do this + * before we access any shared memory. + */ + InitProcess(); + + /* + * Early initialization. + */ + BaseInit(); + + Assert(SlotSyncWorker != NULL); + + SpinLockAcquire(&SlotSyncWorker->mutex); + Assert(SlotSyncWorker->pid == InvalidPid); + + /* + * Startup process signaled the slot sync worker to stop, so if meanwhile + * postmaster ended up starting the worker again, exit. 
+ */ + if (SlotSyncWorker->stopSignaled) + { + SpinLockRelease(&SlotSyncWorker->mutex); + proc_exit(0); + } + + /* Advertise our PID so that the startup process can kill us on promotion */ + SlotSyncWorker->pid = MyProcPid; + SpinLockRelease(&SlotSyncWorker->mutex); + + ereport(LOG, errmsg("replication slot sync worker started")); + + on_shmem_exit(slotsync_worker_onexit, (Datum) 0); + + /* Setup signal handling */ + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGINT, SignalHandlerForShutdownRequest); + pqsignal(SIGTERM, die); + pqsignal(SIGFPE, FloatExceptionHandler); + pqsignal(SIGUSR1, procsignal_sigusr1_handler); + pqsignal(SIGUSR2, SIG_IGN); + pqsignal(SIGPIPE, SIG_IGN); + pqsignal(SIGCHLD, SIG_DFL); + + /* + * Establishes SIGALRM handler and initialize timeout module. It is needed + * by InitPostgres to register different timeouts. + */ + InitializeTimeouts(); + + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + + /* + * If an exception is encountered, processing resumes here. + * + * We just need to clean up, report the error, and go away. + * + * If we do not have this handling here, then since this worker process + * operates at the bottom of the exception stack, ERRORs turn into FATALs. + * Therefore, we create our own exception handler to catch ERRORs. + */ + if (sigsetjmp(local_sigjmp_buf, 1) != 0) + { + /* since not using PG_TRY, must reset error stack by hand */ + error_context_stack = NULL; + + /* Prevents interrupts while cleaning up */ + HOLD_INTERRUPTS(); + + /* Report the error to the server log */ + EmitErrorReport(); + + /* + * We can now go away. Note that because we called InitProcess, a + * callback was registered to do ProcKill, which will clean up + * necessary state. 
+ */ + proc_exit(0); + } + + /* We can now handle ereport(ERROR) */ + PG_exception_stack = &local_sigjmp_buf; + + /* + * Unblock signals (they were blocked when the postmaster forked us) + */ + sigprocmask(SIG_SETMASK, &UnBlockSig, NULL); + + ensure_valid_slotsync_params(); + + dbname = get_dbname_from_conninfo(PrimaryConnInfo); + + /* + * Connect to the database specified by user in primary_conninfo. We need + * a database connection for walrcv_exec to work. Please see comments atop + * libpqrcv_exec. + */ + InitPostgres(dbname, InvalidOid, NULL, InvalidOid, 0, NULL); + + SetProcessingMode(NormalProcessing); + + initStringInfo(&app_name); + if (cluster_name[0]) + appendStringInfo(&app_name, "%s_%s", cluster_name, "slotsyncworker"); + else + appendStringInfo(&app_name, "%s", "slotsyncworker"); + + /* + * Establish the connection to the primary server for slots + * synchronization. + */ + wrconn = walrcv_connect(PrimaryConnInfo, false, false, false, + app_name.data, + &err); + pfree(app_name.data); + + if (!wrconn) + ereport(ERROR, + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the primary server: %s", err)); + + /* + * Using the specified primary server connection, check whether we are + * cascading standby and validates primary_slot_name for + * non-cascading-standbys. + */ + primary_info_valid = check_primary_info(wrconn); + + /* Main wait loop */ + for (;;) + { + bool some_slot_updated = false; + + ProcessSlotSyncInterrupts(wrconn, true); + + if (primary_info_valid) + some_slot_updated = synchronize_slots(wrconn); + + wait_for_slot_activity(some_slot_updated, !primary_info_valid); + + /* + * If the standby was promoted then what was previously a cascading + * standby might no longer be one, so recheck each time. + */ + if (!primary_info_valid) + check_primary_info(wrconn); + } + + /* + * The slot sync worker can not get here because it will only stop when it + * receives a SIGINT from the startup process, or when there is an error. 
+ */ + Assert(false); +} + +/* + * Shut down the slot sync worker. + */ +void +ShutDownSlotSync(void) +{ + SpinLockAcquire(&SlotSyncWorker->mutex); + + SlotSyncWorker->stopSignaled = true; + + if (SlotSyncWorker->pid == InvalidPid) + { + SpinLockRelease(&SlotSyncWorker->mutex); + return; + } + SpinLockRelease(&SlotSyncWorker->mutex); + + kill(SlotSyncWorker->pid, SIGINT); + + /* Wait for it to die */ + for (;;) + { + int rc; + + /* Wait a bit, we don't expect to have to wait long */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 10L, WAIT_EVENT_REPL_SLOTSYNC_SHUTDOWN); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + SpinLockAcquire(&SlotSyncWorker->mutex); + + /* Is it gone? */ + if (SlotSyncWorker->pid == InvalidPid) + break; + + SpinLockRelease(&SlotSyncWorker->mutex); + } + + SpinLockRelease(&SlotSyncWorker->mutex); +} + +/* + * Allocate and initialize slot sync worker shared memory + */ +void +SlotSyncWorkerShmemInit(void) +{ + Size size; + bool found; + + size = sizeof(SlotSyncWorkerCtxStruct); + size = MAXALIGN(size); + + SlotSyncWorker = (SlotSyncWorkerCtxStruct *) + ShmemInitStruct("Slot Sync Worker Data", size, &found); + + if (!found) + { + memset(SlotSyncWorker, 0, size); + SlotSyncWorker->pid = InvalidPid; + SpinLockInit(&SlotSyncWorker->mutex); + } +} + +#ifdef EXEC_BACKEND +/* + * The forkexec routine for the slot sync worker process. + * + * Format up the arglist, then fork and exec. + */ +static pid_t +slotsyncworker_forkexec(void) +{ + char *av[10]; + int ac = 0; + + av[ac++] = "postgres"; + av[ac++] = "--forkssworker"; + av[ac++] = NULL; /* filled in by postmaster_forkexec */ + av[ac] = NULL; + + Assert(ac < lengthof(av)); + + return postmaster_forkexec(ac, av); +} +#endif + +/* + * SlotSyncWorkerCanRestart + * + * Returns true if the worker is allowed to restart if enough time has + * passed (SLOTSYNC_RESTART_INTERVAL_SEC) since it was launched last. 
+ * Otherwise returns false. + * + * This is a safety valve to protect against continuous respawn attempts if the + * worker is dying immediately at launch. Note that since we will retry to + * launch the worker from the postmaster main loop, we will get another + * chance later. + */ +bool +SlotSyncWorkerCanRestart(void) +{ + time_t curtime = time(NULL); + + /* Return false if too soon since last start. */ + if ((unsigned int) (curtime - SlotSyncWorker->last_start_time) < + (unsigned int) SLOTSYNC_RESTART_INTERVAL_SEC) + return false; + + SlotSyncWorker->last_start_time = curtime; + + return true; +} + +/* + * Main entry point for slot sync worker process, to be called from the + * postmaster. + */ +int +StartSlotSyncWorker(void) +{ + pid_t pid; + +#ifdef EXEC_BACKEND + switch ((pid = slotsyncworker_forkexec())) + { +#else + switch ((pid = fork_process())) + { + case 0: + /* in postmaster child ... */ + InitPostmasterChild(); + + /* Close the postmaster's sockets */ + ClosePostmasterPorts(false); + + ReplSlotSyncWorkerMain(0, NULL); + break; +#endif + case -1: + ereport(LOG, + (errmsg("could not fork slot sync worker process: %m"))); + return 0; + + default: + return (int) pid; + } + + /* shouldn't get here */ + return 0; +} + /* * Is current process syncing replication slots ? */ bool -IsSyncingReplicationSlots(void) +IsLogicalSlotSyncWorker(void) { - return syncing_slots; + return am_slotsync_worker || syncing_slots; } /* @@ -923,7 +1545,7 @@ pg_sync_replication_slots(PG_FUNCTION_ARGS) * Using the specified primary server connection, validates the slot * in primary_slot_name. 
*/ - validate_primary_slot(wrconn, ERROR); + check_primary_info(wrconn); (void) synchronize_slots(wrconn); } diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 144a178b63..7d17428667 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -273,7 +273,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, * synchronization, as it needs to maintain this value in sync with the * remote slots. */ - if (failover && RecoveryInProgress() && !IsSyncingReplicationSlots()) + if (failover && RecoveryInProgress() && !IsLogicalSlotSyncWorker()) ereport(ERROR, errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot enable failover for a replication slot" @@ -1210,6 +1210,20 @@ restart: * concurrently being dropped by a backend connected to another DB. * * That's fairly unlikely in practice, so we'll just bail out. + * + * The slot sync worker holds a shared lock on the database before + * operating on synced logical slots to avoid conflict with the drop + * happening here. The persistent synced slots are thus safe but there + * is a possibility that the slot sync worker has created a temporary + * slot (which stays active even on release) and we are trying to drop + * the same here. In practice, the chances of hitting this scenario are + * very low as during slot synchronization, the temporary slot is + * immediately converted to persistent and thus is safe due to the + * shared lock taken on the database. So for the time being, we'll + * just bail out in such a scenario. + * + * XXX: If needed, we can consider shutting down slot sync worker + * before trying to drop synced temporary slots here.
*/ if (active_pid) ereport(ERROR, diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 460b3c0be4..5c2051ce78 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -3394,7 +3394,7 @@ GetStandbyFlushRecPtr(TimeLineID *tli) TimeLineID receiveTLI; XLogRecPtr result; - Assert(am_cascading_walsender || IsSyncingReplicationSlots()); + Assert(am_cascading_walsender || IsLogicalSlotSyncWorker()); /* * We can safely send what's already been replayed. Also, if walreceiver diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 7084e18861..925ac6c942 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -38,6 +38,7 @@ #include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/walsender.h" +#include "replication/worker_internal.h" #include "storage/bufmgr.h" #include "storage/dsm.h" #include "storage/dsm_registry.h" @@ -347,6 +348,7 @@ CreateOrAttachShmemStructs(void) WalSummarizerShmemInit(); PgArchShmemInit(); ApplyLauncherShmemInit(); + SlotSyncWorkerShmemInit(); /* * Set up other modules that need some shared memory space diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index e5977548fe..3ff0f449e0 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -39,6 +39,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" +#include "replication/logicalworker.h" #include "replication/slot.h" #include "replication/syncrep.h" #include "replication/walsender.h" @@ -364,8 +365,12 @@ InitProcess(void) * child; this is so that the postmaster can detect it if we exit without * cleaning up. (XXX autovac launcher currently doesn't participate in * this; it probably should.) + * + * Slot sync worker also does not participate in it, see comments atop + * 'struct bkend' in postmaster.c. 
*/ - if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess()) + if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess() && + !IsLogicalSlotSyncWorker()) MarkPostmasterChildActive(); /* @@ -934,8 +939,12 @@ ProcKill(int code, Datum arg) * This process is no longer present in shared memory in any meaningful * way, so tell the postmaster we've cleaned up acceptably well. (XXX * autovac launcher should be included here someday) + * + * Slot sync worker is also not a postmaster child, so skip this shared + * memory related processing here. */ - if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess()) + if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess() && + !IsLogicalSlotSyncWorker()) MarkPostmasterChildInactive(); /* wake autovac launcher if needed -- see comments in FreeWorkerInfo */ diff --git a/src/backend/utils/activity/pgstat_io.c b/src/backend/utils/activity/pgstat_io.c index 43c393d6fe..9d6e067382 100644 --- a/src/backend/utils/activity/pgstat_io.c +++ b/src/backend/utils/activity/pgstat_io.c @@ -338,6 +338,7 @@ pgstat_tracks_io_bktype(BackendType bktype) case B_BG_WORKER: case B_BG_WRITER: case B_CHECKPOINTER: + case B_SLOTSYNC_WORKER: case B_STANDALONE_BACKEND: case B_STARTUP: case B_WAL_SENDER: diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 6464386b77..b52afd4eac 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -53,6 +53,8 @@ LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process." LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process." LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process." RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery." +REPL_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker." 
+REPL_SLOTSYNC_SHUTDOWN "Waiting for slot sync worker to shut down." SYSLOGGER_MAIN "Waiting in main loop of syslogger process." WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process." WAL_SENDER_MAIN "Waiting in main loop of WAL sender process." diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c index 23f77a59e5..309aa33a62 100644 --- a/src/backend/utils/init/miscinit.c +++ b/src/backend/utils/init/miscinit.c @@ -40,6 +40,7 @@ #include "postmaster/interrupt.h" #include "postmaster/pgarch.h" #include "postmaster/postmaster.h" +#include "replication/logicalworker.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/latch.h" @@ -293,6 +294,9 @@ GetBackendTypeDesc(BackendType backendType) case B_LOGGER: backendDesc = "logger"; break; + case B_SLOTSYNC_WORKER: + backendDesc = "slotsyncworker"; + break; case B_STANDALONE_BACKEND: backendDesc = "standalone backend"; break; @@ -835,9 +839,10 @@ InitializeSessionUserIdStandalone(void) { /* * This function should only be called in single-user mode, in autovacuum - * workers, and in background workers. + * workers, in slot sync worker and in background workers. 
*/ - Assert(!IsUnderPostmaster || IsAutoVacuumWorkerProcess() || IsBackgroundWorker); + Assert(!IsUnderPostmaster || IsAutoVacuumWorkerProcess() || + IsLogicalSlotSyncWorker() || IsBackgroundWorker); /* call only once */ Assert(!OidIsValid(AuthenticatedUserId)); diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index 1ad3367159..392b2d938c 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -42,6 +42,7 @@ #include "pgstat.h" #include "postmaster/autovacuum.h" #include "postmaster/postmaster.h" +#include "replication/logicalworker.h" #include "replication/slot.h" #include "replication/walsender.h" #include "storage/bufmgr.h" @@ -874,10 +875,11 @@ InitPostgres(const char *in_dbname, Oid dboid, * Perform client authentication if necessary, then figure out our * postgres user ID, and see if we are a superuser. * - * In standalone mode and in autovacuum worker processes, we use a fixed - * ID, otherwise we figure it out from the authenticated user name. + * In standalone mode, autovacuum worker processes and slot sync worker + * process, we use a fixed ID, otherwise we figure it out from the + * authenticated user name. 
*/ - if (bootstrap || IsAutoVacuumWorkerProcess()) + if (bootstrap || IsAutoVacuumWorkerProcess() || IsLogicalSlotSyncWorker()) { InitializeSessionUserIdStandalone(); am_superuser = true; diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 7fe58518d7..9cba1cba72 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -68,6 +68,7 @@ #include "replication/logicallauncher.h" #include "replication/slot.h" #include "replication/syncrep.h" +#include "replication/worker_internal.h" #include "storage/bufmgr.h" #include "storage/large_object.h" #include "storage/pg_shmem.h" @@ -2054,6 +2055,15 @@ struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"sync_replication_slots", PGC_SIGHUP, REPLICATION_STANDBY, + gettext_noop("Enables a physical standby to synchronize logical failover slots from the primary server."), + }, + &sync_replication_slots, + false, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index da10b43dac..dfd1313c94 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -361,6 +361,7 @@ #wal_retrieve_retry_interval = 5s # time to wait before retrying to # retrieve WAL after a failed attempt #recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery +#sync_replication_slots = off # enables slot synchronization on the physical standby from the primary # - Subscribers - diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 0b01c1f093..65819cb7a7 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -332,6 +332,7 @@ typedef enum BackendType B_BG_WRITER, B_CHECKPOINTER, B_LOGGER, + B_SLOTSYNC_WORKER, B_STANDALONE_BACKEND, B_STARTUP, B_WAL_RECEIVER, diff --git 
a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index 7c00f73328..bbe04226db 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -22,7 +22,7 @@ extern void TablesyncWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); extern bool IsLogicalParallelApplyWorker(void); -extern bool IsSyncingReplicationSlots(void); +extern bool IsLogicalSlotSyncWorker(void); extern void HandleParallelApplyMessageInterrupt(void); extern void HandleParallelApplyMessages(void); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 515aefd519..e7432c23ba 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -237,6 +237,11 @@ extern PGDLLIMPORT bool in_remote_transaction; extern PGDLLIMPORT bool InitializingApplyWorker; +/* Slot sync worker objects */ +extern PGDLLIMPORT char *PrimaryConnInfo; +extern PGDLLIMPORT char *PrimarySlotName; +extern PGDLLIMPORT bool sync_replication_slots; + extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); @@ -324,6 +329,13 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); +#ifdef EXEC_BACKEND +extern void ReplSlotSyncWorkerMain(int argc, char *argv[]) pg_attribute_noreturn(); +#endif +extern int StartSlotSyncWorker(void); +extern bool SlotSyncWorkerCanRestart(void); +extern void ShutDownSlotSync(void); +extern void SlotSyncWorkerShmemInit(void); #define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl index 7036a33ff3..7b44ba7eb1 100644 --- a/src/test/recovery/t/040_standby_failover_slots_sync.pl +++ 
b/src/test/recovery/t/040_standby_failover_slots_sync.pl @@ -18,7 +18,8 @@ $publisher->init(allows_streaming => 'logical'); $publisher->start; $publisher->safe_psql('postgres', - "CREATE PUBLICATION regress_mypub FOR ALL TABLES;"); + "CREATE PUBLICATION regress_mypub FOR ALL TABLES;" +); my $publisher_connstr = $publisher->connstr . ' dbname=postgres'; @@ -160,4 +161,115 @@ is($standby1->safe_psql('postgres', "t", 'logical slot has synced as true on standby'); +################################################## +# Test to confirm that restart_lsn and confirmed_flush_lsn of the logical slot +# on the primary is synced to the standby +################################################## + +$standby1->append_conf('postgresql.conf', "sync_replication_slots = on"); +$standby1->reload; + +# Insert data on the primary +$primary->safe_psql( + 'postgres', qq[ + CREATE TABLE tab_int (a int PRIMARY KEY); + INSERT INTO tab_int SELECT generate_series(1, 10); +]); + +# Subscribe to the new table data and wait for it to arrive +$subscriber1->safe_psql( + 'postgres', qq[ + CREATE TABLE tab_int (a int PRIMARY KEY); + ALTER SUBSCRIPTION regress_mysub1 REFRESH PUBLICATION; +]); + +$subscriber1->wait_for_subscription_sync; + +# Do not allow any further advancement of the restart_lsn and +# confirmed_flush_lsn for the lsub1_slot. 
+$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 DISABLE"); + +# Wait for the replication slot to become inactive on the publisher +$primary->poll_query_until( + 'postgres', + "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active='f'", + 1); + +# Get the restart_lsn for the logical slot lsub1_slot on the primary +my $primary_restart_lsn = $primary->safe_psql('postgres', + "SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';"); + +# Get the confirmed_flush_lsn for the logical slot lsub1_slot on the primary +my $primary_flush_lsn = $primary->safe_psql('postgres', + "SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';"); + +# Confirm that restart_lsn and confirmed_flush_lsn of lsub1_slot slot are synced +# to the standby +ok( $standby1->poll_query_until( + 'postgres', + "SELECT '$primary_restart_lsn' = restart_lsn AND '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';"), + 'restart_lsn and confirmed_flush_lsn of slot lsub1_slot synced to standby'); + +################################################## +# Test that a synchronized slot can not be decoded, altered or dropped by the user +################################################## + +# Disable hot_standby_feedback temporarily to stop slot sync worker otherwise +# the concerned testing scenarios here may be interrupted by a different error: +# 'ERROR: replication slot is active for PID ..'
+$standby1->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = off;'); +$standby1->restart; + +# Attempting to perform logical decoding on a synced slot should result in an error +($result, $stdout, $stderr) = $standby1->psql('postgres', + "select * from pg_logical_slot_get_changes('lsub1_slot',NULL,NULL);"); +ok($stderr =~ /ERROR: cannot use replication slot "lsub1_slot" for logical decoding/, + "logical decoding is not allowed on synced slot"); + +# Attempting to alter a synced slot should result in an error +($result, $stdout, $stderr) = $standby1->psql( + 'postgres', + qq[ALTER_REPLICATION_SLOT lsub1_slot (failover);], + replication => 'database'); +ok($stderr =~ /ERROR: cannot alter replication slot "lsub1_slot"/, + "synced slot on standby cannot be altered"); + +# Attempting to drop a synced slot should result in an error +($result, $stdout, $stderr) = $standby1->psql('postgres', + "SELECT pg_drop_replication_slot('lsub1_slot');"); +ok($stderr =~ /ERROR: cannot drop replication slot "lsub1_slot"/, + "synced slot on standby cannot be dropped"); + +# Enable hot_standby_feedback and restart standby +$standby1->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = on;'); +$standby1->restart; + +################################################## +# Promote the standby1 to primary. 
Confirm that: +# a) the slot 'lsub1_slot' is retained on the new primary +# b) logical replication for regress_mysub1 is resumed successfully after failover +################################################## +$standby1->promote; + +# Update subscription with the new primary's connection info +$subscriber1->safe_psql('postgres', + "ALTER SUBSCRIPTION regress_mysub1 CONNECTION '$standby1_conninfo'; + ALTER SUBSCRIPTION regress_mysub1 ENABLE; "); + +# Confirm the synced slot 'lsub1_slot' is retained on the new primary +is($standby1->safe_psql('postgres', + q{SELECT slot_name FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}), + 'lsub1_slot', + 'synced slot retained on the new primary'); + +# Insert data on the new primary +$standby1->safe_psql('postgres', + "INSERT INTO tab_int SELECT generate_series(11, 20);"); +$standby1->wait_for_catchup('regress_mysub1'); + +# Confirm that data in tab_int replicated on the subscriber +is( $subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}), + "20", + 'data replicated from the new primary'); + done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 91433d439b..a35e399f0c 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2325,6 +2325,7 @@ RelocationBufferInfo RelptrFreePageBtree RelptrFreePageManager RelptrFreePageSpanLeader +RemoteSlot RenameStmt ReopenPtrType ReorderBuffer @@ -2584,6 +2585,7 @@ SlabBlock SlabContext SlabSlot SlotNumber +SlotSyncWorkerCtxStruct SlruCtl SlruCtlData SlruErrorCause -- 2.34.1