From e41322e2578cbf7d077f9c205837db98e0617ad5 Mon Sep 17 00:00:00 2001 From: Shveta Malik Date: Wed, 7 Feb 2024 17:15:41 +0530 Subject: [PATCH v80_2 2/4] Add a new slotsync worker By enabling slot synchronization, all the failover logical replication slots on the primary (assuming configurations are appropriate) are automatically created on the physical standbys and are synced periodically. The slot-sync worker on the standby server pings the primary server at regular intervals to get the necessary failover logical slots information and create/update the slots locally. The slots that no longer require synchronization are automatically dropped by the worker. The nap time of the worker is tuned according to the activity on the primary. The worker waits for a period of time before the next synchronization, with the duration varying based on whether any slots were updated during the last cycle. A new parameter sync_replication_slots enables or disables this new process. --- doc/src/sgml/config.sgml | 18 + doc/src/sgml/logicaldecoding.sgml | 5 +- src/backend/access/transam/xlogrecovery.c | 15 + src/backend/postmaster/postmaster.c | 80 +- .../libpqwalreceiver/libpqwalreceiver.c | 3 + src/backend/replication/logical/slotsync.c | 757 ++++++++++++++++-- src/backend/replication/slot.c | 16 +- src/backend/replication/slotfuncs.c | 2 +- src/backend/replication/walsender.c | 2 +- src/backend/storage/ipc/ipci.c | 2 +- src/backend/storage/lmgr/proc.c | 13 +- src/backend/utils/activity/pgstat_io.c | 1 + .../utils/activity/wait_event_names.txt | 2 + src/backend/utils/init/miscinit.c | 9 +- src/backend/utils/init/postinit.c | 7 +- src/backend/utils/misc/guc_tables.c | 10 + src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/miscadmin.h | 1 + src/include/replication/slotsync.h | 23 +- .../t/040_standby_failover_slots_sync.pl | 80 +- src/tools/pgindent/typedefs.list | 2 +- 21 files changed, 942 insertions(+), 107 deletions(-) diff --git a/doc/src/sgml/config.sgml 
b/doc/src/sgml/config.sgml index 037a3b8a64..02d28a0be9 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4943,6 +4943,24 @@ ANY num_sync ( + sync_replication_slots (boolean) + + sync_replication_slots configuration parameter + + + + + It enables a physical standby to synchronize logical failover slots + from the primary server so that logical subscribers are not blocked + after failover. + + + It is disabled by default. This parameter can only be set in the + postgresql.conf file or on the server command line. + + + diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 1438441d90..c0156d7cd3 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -373,7 +373,10 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU failover option of the CREATE SUBSCRIPTION command), and then calling pg_sync_replication_slots - on the standby. For the synchronization to work, it is mandatory to + on the standby. By setting + sync_replication_slots + on the standby, the failover slots can be synchronized periodically in + the slotsync worker. For the synchronization to work, it is mandatory to have a physical replication slot between the primary and the standby, and hot_standby_feedback must be enabled on the standby. 
It is also necessary to specify a valid diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 0bb472da27..68f48c052c 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -49,6 +49,7 @@ #include "postmaster/bgwriter.h" #include "postmaster/startup.h" #include "replication/slot.h" +#include "replication/slotsync.h" #include "replication/walreceiver.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -1467,6 +1468,20 @@ FinishWalRecovery(void) */ XLogShutdownWalRcv(); + /* + * Shutdown the slot sync workers to prevent potential conflicts between + * user processes and slotsync workers after a promotion. + * + * We do not update the 'synced' column from true to false here, as any + * failed update could leave 'synced' column false for some slots. This + * could cause issues during slot sync after restarting the server as a + * standby. While updating after switching to the new timeline is an + * option, it does not simplify the handling for 'synced' column. + * Therefore, we retain the 'synced' column as true after promotion as it + * may provide useful information about the slot origin. + */ + ShutDownSlotSync(); + /* * We are now done reading the xlog from stream. Turn off streaming * recovery to force fetching the files (which would be required at end of diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index feb471dd1d..73e41c89f3 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -115,6 +115,7 @@ #include "postmaster/syslogger.h" #include "postmaster/walsummarizer.h" #include "replication/logicallauncher.h" +#include "replication/slotsync.h" #include "replication/walsender.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -167,11 +168,11 @@ * they will never become live backends. dead_end children are not assigned a * PMChildSlot. dead_end children have bkend_type NORMAL. 
* - * "Special" children such as the startup, bgwriter and autovacuum launcher - * tasks are not in this list. They are tracked via StartupPID and other - * pid_t variables below. (Thus, there can't be more than one of any given - * "special" child process type. We use BackendList entries for any child - * process there can be more than one of.) + * "Special" children such as the startup, bgwriter, autovacuum launcher and + * slot sync worker tasks are not in this list. They are tracked via StartupPID + * and other pid_t variables below. (Thus, there can't be more than one of any + * given "special" child process type. We use BackendList entries for any + * child process there can be more than one of.) */ typedef struct bkend { @@ -254,7 +255,8 @@ static pid_t StartupPID = 0, WalSummarizerPID = 0, AutoVacPID = 0, PgArchPID = 0, - SysLoggerPID = 0; + SysLoggerPID = 0, + SlotSyncWorkerPID = 0; /* Startup process's status */ typedef enum @@ -458,6 +460,10 @@ static void InitPostmasterDeathWatchHandle(void); (pmState == PM_RECOVERY || pmState == PM_HOT_STANDBY))) && \ PgArchCanRestart()) +#define SlotSyncWorkerAllowed() \ + (sync_replication_slots && pmState == PM_HOT_STANDBY && \ + SlotSyncWorkerCanRestart()) + #ifdef EXEC_BACKEND #ifdef WIN32 @@ -1830,6 +1836,10 @@ ServerLoop(void) if (PgArchPID == 0 && PgArchStartupAllowed()) PgArchPID = StartArchiver(); + /* If we need to start a slot sync worker, try to do that now */ + if (SlotSyncWorkerPID == 0 && SlotSyncWorkerAllowed()) + SlotSyncWorkerPID = StartSlotSyncWorker(); + /* If we need to signal the autovacuum launcher, do so now */ if (avlauncher_needs_signal) { @@ -2677,6 +2687,8 @@ process_pm_reload_request(void) signal_child(PgArchPID, SIGHUP); if (SysLoggerPID != 0) signal_child(SysLoggerPID, SIGHUP); + if (SlotSyncWorkerPID != 0) + signal_child(SlotSyncWorkerPID, SIGHUP); /* Reload authentication config files too */ if (!load_hba()) @@ -3034,6 +3046,8 @@ process_pm_child_exit(void) AutoVacPID = 
StartAutoVacLauncher(); if (PgArchStartupAllowed() && PgArchPID == 0) PgArchPID = StartArchiver(); + if (SlotSyncWorkerAllowed() && SlotSyncWorkerPID == 0) + SlotSyncWorkerPID = StartSlotSyncWorker(); /* workers may be scheduled to start now */ maybe_start_bgworkers(); @@ -3204,6 +3218,22 @@ process_pm_child_exit(void) continue; } + /* + * Was it the slot sync worker? Normal exit or FATAL exit can be + * ignored (FATAL can be caused by libpqwalreceiver on receiving + * shutdown request by the startup process during promotion); we'll + * start a new one at the next iteration of the postmaster's main + * loop, if necessary. Any other exit condition is treated as a crash. + */ + if (pid == SlotSyncWorkerPID) + { + SlotSyncWorkerPID = 0; + if (!EXIT_STATUS_0(exitstatus) && !EXIT_STATUS_1(exitstatus)) + HandleChildCrash(pid, exitstatus, + _("slot sync worker process")); + continue; + } + /* Was it one of our background workers? */ if (CleanupBackgroundWorker(pid, exitstatus)) { @@ -3408,7 +3438,7 @@ CleanupBackend(int pid, /* * HandleChildCrash -- cleanup after failed backend, bgwriter, checkpointer, - * walwriter, autovacuum, archiver or background worker. + * walwriter, autovacuum, archiver, slot sync worker or background worker. * * The objectives here are to clean up our local state about the child * process, and to signal all other remaining children to quickdie. 
@@ -3570,6 +3600,12 @@ HandleChildCrash(int pid, int exitstatus, const char *procname) else if (PgArchPID != 0 && take_action) sigquit_child(PgArchPID); + /* Take care of the slot sync worker too */ + if (pid == SlotSyncWorkerPID) + SlotSyncWorkerPID = 0; + else if (SlotSyncWorkerPID != 0 && take_action) + sigquit_child(SlotSyncWorkerPID); + /* We do NOT restart the syslogger */ if (Shutdown != ImmediateShutdown) @@ -3710,6 +3746,8 @@ PostmasterStateMachine(void) signal_child(WalReceiverPID, SIGTERM); if (WalSummarizerPID != 0) signal_child(WalSummarizerPID, SIGTERM); + if (SlotSyncWorkerPID != 0) + signal_child(SlotSyncWorkerPID, SIGTERM); /* checkpointer, archiver, stats, and syslogger may continue for now */ /* Now transition to PM_WAIT_BACKENDS state to wait for them to die */ @@ -3725,13 +3763,13 @@ PostmasterStateMachine(void) /* * PM_WAIT_BACKENDS state ends when we have no regular backends * (including autovac workers), no bgworkers (including unconnected - * ones), and no walwriter, autovac launcher or bgwriter. If we are - * doing crash recovery or an immediate shutdown then we expect the - * checkpointer to exit as well, otherwise not. The stats and - * syslogger processes are disregarded since they are not connected to - * shared memory; we also disregard dead_end children here. Walsenders - * and archiver are also disregarded, they will be terminated later - * after writing the checkpoint record. + * ones), and no walwriter, autovac launcher, bgwriter or slot sync + * worker. If we are doing crash recovery or an immediate shutdown + * then we expect the checkpointer to exit as well, otherwise not. The + * stats and syslogger processes are disregarded since they are not + * connected to shared memory; we also disregard dead_end children + * here. Walsenders and archiver are also disregarded, they will be + * terminated later after writing the checkpoint record. 
*/ if (CountChildren(BACKEND_TYPE_ALL - BACKEND_TYPE_WALSND) == 0 && StartupPID == 0 && @@ -3741,7 +3779,8 @@ PostmasterStateMachine(void) (CheckpointerPID == 0 || (!FatalError && Shutdown < ImmediateShutdown)) && WalWriterPID == 0 && - AutoVacPID == 0) + AutoVacPID == 0 && + SlotSyncWorkerPID == 0) { if (Shutdown >= ImmediateShutdown || FatalError) { @@ -3839,6 +3878,7 @@ PostmasterStateMachine(void) Assert(CheckpointerPID == 0); Assert(WalWriterPID == 0); Assert(AutoVacPID == 0); + Assert(SlotSyncWorkerPID == 0); /* syslogger is not considered here */ pmState = PM_NO_CHILDREN; } @@ -4062,6 +4102,8 @@ TerminateChildren(int signal) signal_child(AutoVacPID, signal); if (PgArchPID != 0) signal_child(PgArchPID, signal); + if (SlotSyncWorkerPID != 0) + signal_child(SlotSyncWorkerPID, signal); } /* @@ -4874,6 +4916,7 @@ SubPostmasterMain(int argc, char *argv[]) */ if (strcmp(argv[1], "--forkbackend") == 0 || strcmp(argv[1], "--forkavlauncher") == 0 || + strcmp(argv[1], "--forkssworker") == 0 || strcmp(argv[1], "--forkavworker") == 0 || strcmp(argv[1], "--forkaux") == 0 || strcmp(argv[1], "--forkbgworker") == 0) @@ -4977,6 +5020,13 @@ SubPostmasterMain(int argc, char *argv[]) AutoVacWorkerMain(argc - 2, argv + 2); /* does not return */ } + if (strcmp(argv[1], "--forkssworker") == 0) + { + /* Restore basic shared memory pointers */ + InitShmemAccess(UsedShmemSegAddr); + + ReplSlotSyncWorkerMain(argc - 2, argv + 2); /* does not return */ + } if (strcmp(argv[1], "--forkbgworker") == 0) { /* do this as early as possible; in particular, before InitProcess() */ diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 9270d7b855..e40cdbe4d9 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -6,6 +6,9 @@ * loaded as a dynamic module to avoid linking the main server binary with * libpq. 
* + * Apart from walreceiver, the libpq-specific routines here are now being used + * by logical replication workers and slot sync worker as well. + * * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group * * diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index e0e94314eb..694cca79e2 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -10,8 +10,7 @@ * * This file contains the code for slot synchronization on a physical standby * to fetch logical failover slots information from the primary server, create - * the slots on the standby and synchronize them. This is done on every call - * to SQL function pg_sync_replication_slots. + * the slots on the standby and synchronize them periodically. * * If on the physical standby, the restart_lsn and/or local catalog_xmin is * ahead of those on the remote then we cannot create the local standby slot @@ -19,12 +18,18 @@ * slot backwards and the standby might not have WALs retained for old LSN. * In this case, the slot will be marked as RS_TEMPORARY. Once the primary * server catches up, the slot will be marked as RS_PERSISTENT (which - * means sync-ready) after which we can call pg_sync_replication_slots() - * periodically to perform syncs. + * means sync-ready) and we can perform the sync periodically. * * Any standby synchronized slots will be dropped if they no longer need * to be synchronized. See comment atop drop_local_obsolete_slots() for more * details. + * + * The worker waits for a period of time before the next synchronization, with the + * duration varying based on whether any slots were updated during the last + * cycle. Refer to the comments above wait_for_slot_activity() for more details. + * + * The slot sync worker currently does not support synchronization on the + * cascading standby. 
*--------------------------------------------------------------------------- */ @@ -50,6 +55,7 @@ #include "replication/slotsync.h" #include "storage/ipc.h" #include "storage/lmgr.h" +#include "storage/proc.h" #include "storage/procarray.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" @@ -60,48 +66,90 @@ #include "utils/timeout.h" #include "utils/varlena.h" +/* + * Structure to hold information fetched from the primary server about a logical + * replication slot. + */ +typedef struct RemoteSlot +{ + char *name; + char *plugin; + char *database; + bool two_phase; + bool failover; + XLogRecPtr restart_lsn; + XLogRecPtr confirmed_lsn; + TransactionId catalog_xmin; + + /* RS_INVAL_NONE if valid, or the reason of invalidation */ + ReplicationSlotInvalidationCause invalidated; +} RemoteSlot; + /* * Struct for sharing information to control slot synchronization. * * The 'syncing' flag should be set to true in any process that is syncing * slots to prevent concurrent slot sync, which could lead to errors and slot * info overwrite. + * + * Slot sync worker's pid is needed by the startup process in order to + * shut it down during promotion. Startup process shuts down the slot + * sync worker and also sets stopSignaled=true to handle the race condition + * when postmaster has not noticed the promotion yet and thus may end up + * restarting slot sync worker. If stopSignaled is set, the worker will + * exit in such a case. + * + * The last_start_time is needed by postmaster to start the slot sync + * worker once per SLOTSYNC_RESTART_INTERVAL_SEC. In cases where an + * immediate restart is expected (e.g., slot sync GUCs change), slot + * sync worker will reset last_start_time before exiting, so that postmaster + * can start the worker without waiting for SLOTSYNC_RESTART_INTERVAL_SEC. 
*/ -typedef struct SlotSyncCtxStruct +typedef struct SlotSyncWorkerCtxStruct { + pid_t pid; bool syncing; + bool stopSignaled; + time_t last_start_time; slock_t mutex; -} SlotSyncCtxStruct; +} SlotSyncWorkerCtxStruct; + +SlotSyncWorkerCtxStruct *SlotSyncWorker = NULL; + +/* GUC variable */ +bool sync_replication_slots = false; + +/* + * The sleep time (ms) between slot-sync cycles varies dynamically + * (within a MIN/MAX range) according to slot activity. See + * wait_for_slot_activity() for details. + */ +#define MIN_WORKER_NAPTIME_MS 200 +#define MAX_WORKER_NAPTIME_MS 30000 /* 30s */ +static long sleep_ms = MIN_WORKER_NAPTIME_MS; + +/* The restart interval for slot sync work used by postmaster */ +#define SLOTSYNC_RESTART_INTERVAL_SEC 10 -SlotSyncCtxStruct *SlotSyncCtx = NULL; +/* Flag to tell if we are in a slot sync worker process */ +static bool am_slotsync_worker = false; /* * Flag to tell if we are syncing replication slots. Unlike the 'syncing' flag - * in SlotSyncCtxStruct, this flag is true only if the current process is + * in SlotSyncWorkerCtxStruct, this flag is true only if the current process is * performing slot synchronization. This flag is also used as safe-guard - * to clean-up shared 'syncing' flag of SlotSyncCtxStruct if some problem + * to clean-up shared 'syncing' flag of SlotSyncWorkerCtxStruct, if some problem * happens while we are in the process of synchronization. */ static bool syncing_slots = false; -/* - * Structure to hold information fetched from the primary server about a logical - * replication slot. 
- */ -typedef struct RemoteSlot -{ - char *name; - char *plugin; - char *database; - bool two_phase; - bool failover; - XLogRecPtr restart_lsn; - XLogRecPtr confirmed_lsn; - TransactionId catalog_xmin; - /* RS_INVAL_NONE if valid, or the reason of invalidation */ - ReplicationSlotInvalidationCause invalidated; -} RemoteSlot; +static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn, bool restart); + +#ifdef EXEC_BACKEND +static pid_t slotsyncworker_forkexec(void); +#endif +NON_EXEC_STATIC void ReplSlotSyncWorkerMain(int argc, char *argv[]) pg_attribute_noreturn(); /* * If necessary, update local synced slot metadata based on the data from the @@ -424,13 +472,17 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) */ latestFlushPtr = GetStandbyFlushRecPtr(NULL); if (remote_slot->confirmed_lsn > latestFlushPtr) - ereport(ERROR, + { + ereport(am_slotsync_worker ? LOG : ERROR, errmsg("skipping slot synchronization as the received slot sync" " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X", LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), remote_slot->name, LSN_FORMAT_ARGS(latestFlushPtr))); + return false; + } + /* Search for the named slot */ if ((slot = SearchNamedReplicationSlot(remote_slot->name, true))) { @@ -586,17 +638,17 @@ synchronize_slots(WalReceiverConn *wrconn) XLogRecPtr latestWalEnd; bool started_tx = false; - SpinLockAcquire(&SlotSyncCtx->mutex); - if (SlotSyncCtx->syncing) + SpinLockAcquire(&SlotSyncWorker->mutex); + if (SlotSyncWorker->syncing) { - SpinLockRelease(&SlotSyncCtx->mutex); + SpinLockRelease(&SlotSyncWorker->mutex); ereport(ERROR, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot synchronize replication slots concurrently")); } - SlotSyncCtx->syncing = true; - SpinLockRelease(&SlotSyncCtx->mutex); + SlotSyncWorker->syncing = true; + SpinLockRelease(&SlotSyncWorker->mutex); syncing_slots = true; @@ -725,9 +777,9 @@ synchronize_slots(WalReceiverConn *wrconn) if (started_tx) 
CommitTransactionCommand(); - SpinLockAcquire(&SlotSyncCtx->mutex); - SlotSyncCtx->syncing = false; - SpinLockRelease(&SlotSyncCtx->mutex); + SpinLockAcquire(&SlotSyncWorker->mutex); + SlotSyncWorker->syncing = false; + SpinLockRelease(&SlotSyncWorker->mutex); syncing_slots = false; @@ -735,20 +787,25 @@ synchronize_slots(WalReceiverConn *wrconn) } /* - * Validate the 'primary_slot_name' using the specified primary server - * connection. + * Checks the primary server info. + * + * Using the specified primary server connection, check whether we are a + * cascading standby. It also validates primary_slot_name for non-cascading + * standbys. */ -static void -validate_primary_slot_name(WalReceiverConn *wrconn) +static bool +check_primary_info(WalReceiverConn *wrconn, bool check_cascading, int elevel) { -#define PRIMARY_INFO_OUTPUT_COL_COUNT 1 +#define PRIMARY_INFO_OUTPUT_COL_COUNT 2 WalRcvExecResult *res; - Oid slotRow[PRIMARY_INFO_OUTPUT_COL_COUNT] = {BOOLOID}; + Oid slotRow[PRIMARY_INFO_OUTPUT_COL_COUNT] = {BOOLOID, BOOLOID}; StringInfoData cmd; bool isnull; TupleTableSlot *tupslot; bool valid; + bool remote_in_recovery; bool started_tx = false; + bool primary_info_valid = true; /* The syscache access in walrcv_exec() needs a transaction env. 
*/ if (!IsTransactionState()) @@ -759,7 +816,7 @@ validate_primary_slot_name(WalReceiverConn *wrconn) initStringInfo(&cmd); appendStringInfo(&cmd, - "SELECT count(*) = 1" + "SELECT pg_is_in_recovery(), count(*) = 1" " FROM pg_catalog.pg_replication_slots" " WHERE slot_type='physical' AND slot_name=%s", quote_literal_cstr(PrimarySlotName)); @@ -778,30 +835,72 @@ validate_primary_slot_name(WalReceiverConn *wrconn) elog(ERROR, "failed to fetch tuple for the primary server slot specified by \"primary_slot_name\""); - valid = DatumGetBool(slot_getattr(tupslot, 1, &isnull)); + remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull)); Assert(!isnull); - if (!valid) - ereport(ERROR, - errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("bad configuration for slot synchronization"), - /* translator: second %s is a GUC variable name */ - errdetail("The primary server slot \"%s\" specified by \"%s\" is not valid.", - PrimarySlotName, "primary_slot_name")); + if (check_cascading && remote_in_recovery) + { + /* + * No need to check further, just set primary_info_valid to false as + * we are a cascading standby. + */ + primary_info_valid = false; + } + else + { + /* + * We are not cascading standby, thus good to proceed with + * primary_slot_name validity check now. + */ + valid = DatumGetBool(slot_getattr(tupslot, 2, &isnull)); + Assert(!isnull); + + if (!valid) + { + primary_info_valid = false; + + ereport(elevel, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("bad configuration for slot synchronization"), + /* translator: second %s is a GUC variable name */ + errdetail("The primary server slot \"%s\" specified by \"%s\" is not valid.", + PrimarySlotName, "primary_slot_name")); + } + } ExecClearTuple(tupslot); walrcv_clear_result(res); if (started_tx) CommitTransactionCommand(); + + return primary_info_valid; } /* - * Validates if all necessary GUCs for slot synchronization are set - * appropriately, otherwise raise ERROR. 
+ * Get database name from the conninfo. + * + * If dbname is extracted already from the conninfo, just return it. */ -void -ValidateSlotSyncParams(void) +static char * +get_dbname_from_conninfo(const char *conninfo) +{ + static char *dbname; + + if (dbname) + return dbname; + else + dbname = walrcv_get_dbname_from_conninfo(conninfo); + + return dbname; +} + +/* + * Return true if all necessary GUCs for slot synchronization are set + * appropriately, otherwise return false. + */ +bool +ValidateSlotSyncParams(int elevel) { char *dbname; @@ -812,11 +911,14 @@ ValidateSlotSyncParams(void) * be invalidated. */ if (PrimarySlotName == NULL || *PrimarySlotName == '\0') - ereport(ERROR, + { + ereport(elevel, /* translator: %s is a GUC variable name */ errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("bad configuration for slot synchronization"), errhint("\"%s\" must be defined.", "primary_slot_name")); + return false; + } /* * hot_standby_feedback must be enabled to cooperate with the physical @@ -824,40 +926,50 @@ ValidateSlotSyncParams(void) * catalog_xmin values on the standby. */ if (!hot_standby_feedback) - ereport(ERROR, + { + ereport(elevel, /* translator: %s is a GUC variable name */ errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("bad configuration for slot synchronization"), errhint("\"%s\" must be enabled.", "hot_standby_feedback")); + return false; + } /* * Logical decoding requires wal_level >= logical and we currently only * synchronize logical slots. */ if (wal_level < WAL_LEVEL_LOGICAL) - ereport(ERROR, + { + ereport(elevel, errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("bad configuration for slot synchronization"), errhint("\"wal_level\" must be >= logical.")); + return false; + } /* * The primary_conninfo is required to make connection to primary for * getting slots information. 
*/ if (PrimaryConnInfo == NULL || *PrimaryConnInfo == '\0') - ereport(ERROR, + { + ereport(elevel, /* translator: %s is a GUC variable name */ errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("bad configuration for slot synchronization"), errhint("\"%s\" must be defined.", "primary_conninfo")); + return false; + } /* * The slot synchronization needs a database connection for walrcv_exec to * work. */ - dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo); + dbname = get_dbname_from_conninfo(PrimaryConnInfo); if (dbname == NULL) - ereport(ERROR, + { + ereport(elevel, /* * translator: 'dbname' is a specific option; %s is a GUC variable @@ -866,36 +978,531 @@ ValidateSlotSyncParams(void) errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("bad configuration for slot synchronization"), errhint("'dbname' must be specified in \"%s\".", "primary_conninfo")); + return false; + } + + return true; +} + + +/* + * Check that all necessary GUCs for slot synchronization are set + * appropriately. If not, sleep for MAX_WORKER_NAPTIME_MS and check again. + * The idea is to become no-op until we get valid GUCs values. + * + * If all checks pass, extracts the dbname from the primary_conninfo GUC and + * returns it. + */ +static void +ensure_valid_slotsync_params(void) +{ + int rc; + + /* Sanity check. */ + Assert(sync_replication_slots); + + for (;;) + { + if (ValidateSlotSyncParams(LOG)) + break; + + ereport(LOG, errmsg("skipping slot synchronization")); + + ProcessSlotSyncInterrupts(NULL, false); + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + MAX_WORKER_NAPTIME_MS, + WAIT_EVENT_REPL_SLOTSYNC_MAIN); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + } +} + +/* + * Re-read the config file. + * + * If any of the slot sync GUCs have changed, exit the worker and + * let it get restarted by the postmaster. The worker to be exited for + * restart purpose only if the caller passed restart as true. 
+ */ +static void +slotsync_reread_config(bool restart) +{ + char *old_primary_conninfo = pstrdup(PrimaryConnInfo); + char *old_primary_slotname = pstrdup(PrimarySlotName); + bool old_sync_replication_slots = sync_replication_slots; + bool old_hot_standby_feedback = hot_standby_feedback; + bool conninfo_changed; + bool primary_slotname_changed; + + Assert(sync_replication_slots); + + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + + conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0; + primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0; + pfree(old_primary_conninfo); + pfree(old_primary_slotname); + + if (old_sync_replication_slots != sync_replication_slots) + { + ereport(LOG, + /* translator: %s is a GUC variable name */ + errmsg("slot sync worker will shutdown because %s is disabled", "sync_replication_slots")); + proc_exit(0); + } + + /* The caller instructed to skip restart */ + if (!restart) + return; + + if (conninfo_changed || + primary_slotname_changed || + (old_hot_standby_feedback != hot_standby_feedback)) + { + ereport(LOG, + errmsg("slot sync worker will restart because of a parameter change")); + + /* + * Reset the last-start time for this worker so that the postmaster + * can restart it without waiting for SLOTSYNC_RESTART_INTERVAL_SEC. + */ + SlotSyncWorker->last_start_time = 0; + + proc_exit(0); + } + +} + +/* + * Interrupt handler for main loop of slot sync worker. + */ +static void +ProcessSlotSyncInterrupts(WalReceiverConn *wrconn, bool restart) +{ + CHECK_FOR_INTERRUPTS(); + + if (ShutdownRequestPending) + { + if (wrconn) + walrcv_disconnect(wrconn); + ereport(LOG, + errmsg("replication slot sync worker is shutting down on receiving SIGINT")); + proc_exit(0); + } + + if (ConfigReloadPending) + slotsync_reread_config(restart); +} + +/* + * Cleanup function for slotsync worker. + * + * Called on slotsync worker exit. 
+ */ +static void +slotsync_worker_onexit(int code, Datum arg) +{ + SpinLockAcquire(&SlotSyncWorker->mutex); + SlotSyncWorker->pid = InvalidPid; + SpinLockRelease(&SlotSyncWorker->mutex); +} + +/* + * Sleep for long enough that we believe it's likely that the slots on primary + * get updated. + * + * If there is no slot activity the wait time between sync-cycles will double + * (to a maximum of 30s). If there is some slot activity the wait time between + * sync-cycles is reset to the minimum (200ms). + */ +static void +wait_for_slot_activity(bool some_slot_updated, bool recheck_primary_info) +{ + int rc; + + if (recheck_primary_info) + { + /* + * If we are on the cascading standby or primary_slot_name configured + * is not valid, then we will skip the sync and take a longer nap + * before we can do check_primary_info() again. + */ + sleep_ms = MAX_WORKER_NAPTIME_MS; + } + else if (!some_slot_updated) + { + /* + * No slots were updated, so double the sleep time, but not beyond the + * maximum allowable value. + */ + sleep_ms = Min(sleep_ms * 2, MAX_WORKER_NAPTIME_MS); + } + else + { + /* + * Some slots were updated since the last sleep, so reset the sleep + * time. + */ + sleep_ms = MIN_WORKER_NAPTIME_MS; + } + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + sleep_ms, + WAIT_EVENT_REPL_SLOTSYNC_MAIN); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); +} + +/* + * The main loop of our worker process. + * + * It connects to the primary server, fetches logical failover slots + * information periodically in order to create and sync the slots. 
+ */ +NON_EXEC_STATIC void +ReplSlotSyncWorkerMain(int argc, char *argv[]) +{ + WalReceiverConn *wrconn = NULL; + char *dbname; + char *err; + sigjmp_buf local_sigjmp_buf; + StringInfoData app_name; + bool primary_info_valid; + + am_slotsync_worker = true; + + MyBackendType = B_SLOTSYNC_WORKER; + + init_ps_display(NULL); + + SetProcessingMode(InitProcessing); + + /* + * Create a per-backend PGPROC struct in shared memory. We must do this + * before we access any shared memory. + */ + InitProcess(); + + /* + * Early initialization. + */ + BaseInit(); + + Assert(SlotSyncWorker != NULL); + + SpinLockAcquire(&SlotSyncWorker->mutex); + Assert(SlotSyncWorker->pid == InvalidPid); + + /* + * Startup process signaled the slot sync worker to stop, so if meanwhile + * postmaster ended up starting the worker again, exit. + */ + if (SlotSyncWorker->stopSignaled) + { + SpinLockRelease(&SlotSyncWorker->mutex); + proc_exit(0); + } + + /* Advertise our PID so that the startup process can kill us on promotion */ + SlotSyncWorker->pid = MyProcPid; + SpinLockRelease(&SlotSyncWorker->mutex); + + ereport(LOG, errmsg("replication slot sync worker started")); + + on_shmem_exit(slotsync_worker_onexit, (Datum) 0); + + /* Setup signal handling */ + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGINT, SignalHandlerForShutdownRequest); + pqsignal(SIGTERM, die); + pqsignal(SIGFPE, FloatExceptionHandler); + pqsignal(SIGUSR1, procsignal_sigusr1_handler); + pqsignal(SIGUSR2, SIG_IGN); + pqsignal(SIGPIPE, SIG_IGN); + pqsignal(SIGCHLD, SIG_DFL); + + /* + * Establishes SIGALRM handler and initialize timeout module. It is needed + * by InitPostgres to register different timeouts. + */ + InitializeTimeouts(); + + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + + /* + * If an exception is encountered, processing resumes here. + * + * We just need to clean up, report the error, and go away. 
+ * + * If we do not have this handling here, then since this worker process + * operates at the bottom of the exception stack, ERRORs turn into FATALs. + * Therefore, we create our own exception handler to catch ERRORs. + */ + if (sigsetjmp(local_sigjmp_buf, 1) != 0) + { + /* since not using PG_TRY, must reset error stack by hand */ + error_context_stack = NULL; + + /* Prevents interrupts while cleaning up */ + HOLD_INTERRUPTS(); + + /* Report the error to the server log */ + EmitErrorReport(); + + /* + * We can now go away. Note that because we called InitProcess, a + * callback was registered to do ProcKill, which will clean up + * necessary state. + */ + proc_exit(0); + } + + /* We can now handle ereport(ERROR) */ + PG_exception_stack = &local_sigjmp_buf; + + /* + * Unblock signals (they were blocked when the postmaster forked us) + */ + sigprocmask(SIG_SETMASK, &UnBlockSig, NULL); + + ensure_valid_slotsync_params(); + + dbname = get_dbname_from_conninfo(PrimaryConnInfo); + + /* + * Connect to the database specified by user in primary_conninfo. We need + * a database connection for walrcv_exec to work. Please see comments atop + * libpqrcv_exec. + */ + InitPostgres(dbname, InvalidOid, NULL, InvalidOid, 0, NULL); + + SetProcessingMode(NormalProcessing); + + initStringInfo(&app_name); + if (cluster_name[0]) + appendStringInfo(&app_name, "%s_%s", cluster_name, "slotsyncworker"); + else + appendStringInfo(&app_name, "%s", "slotsyncworker"); + + /* + * Establish the connection to the primary server for slots + * synchronization. + */ + wrconn = walrcv_connect(PrimaryConnInfo, false, false, false, + app_name.data, + &err); + pfree(app_name.data); + + if (!wrconn) + ereport(ERROR, + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the primary server: %s", err)); + + /* + * Using the specified primary server connection, check whether we are a + * cascading standby and validate primary_slot_name for + * non-cascading standbys. 
+ */ + primary_info_valid = check_primary_info(wrconn, true, LOG); + + /* Main wait loop */ + for (;;) + { + bool some_slot_updated = false; + + ProcessSlotSyncInterrupts(wrconn, true); + + if (primary_info_valid) + some_slot_updated = synchronize_slots(wrconn); + + wait_for_slot_activity(some_slot_updated, !primary_info_valid); + + /* + * If the standby was promoted then what was previously a cascading + * standby might no longer be one, so recheck each time. + */ + if (!primary_info_valid) + check_primary_info(wrconn, true, LOG); + } + + /* + * The slot sync worker can not get here because it will only stop when it + * receives a SIGINT from the startup process, or when there is an error. + */ + Assert(false); +} + +/* + * Shut down the slot sync worker. + */ +void +ShutDownSlotSync(void) +{ + SpinLockAcquire(&SlotSyncWorker->mutex); + + SlotSyncWorker->stopSignaled = true; + + if (SlotSyncWorker->pid == InvalidPid) + { + SpinLockRelease(&SlotSyncWorker->mutex); + return; + } + SpinLockRelease(&SlotSyncWorker->mutex); + + kill(SlotSyncWorker->pid, SIGINT); + + /* Wait for it to die */ + for (;;) + { + int rc; + + /* Wait a bit, we don't expect to have to wait long */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 10L, WAIT_EVENT_REPL_SLOTSYNC_SHUTDOWN); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + SpinLockAcquire(&SlotSyncWorker->mutex); + + /* Is it gone? */ + if (SlotSyncWorker->pid == InvalidPid) + break; + + SpinLockRelease(&SlotSyncWorker->mutex); + } + + SpinLockRelease(&SlotSyncWorker->mutex); +} + +#ifdef EXEC_BACKEND +/* + * The forkexec routine for the slot sync worker process. + * + * Format up the arglist, then fork and exec. 
+ */ +static pid_t +slotsyncworker_forkexec(void) +{ + char *av[10]; + int ac = 0; + + av[ac++] = "postgres"; + av[ac++] = "--forkssworker"; + av[ac++] = NULL; /* filled in by postmaster_forkexec */ + av[ac] = NULL; + + Assert(ac < lengthof(av)); + + return postmaster_forkexec(ac, av); +} +#endif + +/* + * SlotSyncWorkerCanRestart + * + * Returns true if the worker is allowed to restart if enough time has + * passed (SLOTSYNC_RESTART_INTERVAL_SEC) since it was launched last. + * Otherwise returns false. + * + * This is a safety valve to protect against continuous respawn attempts if the + * worker is dying immediately at launch. Note that since we will retry to + * launch the worker from the postmaster main loop, we will get another + * chance later. + */ +bool +SlotSyncWorkerCanRestart(void) +{ + time_t curtime = time(NULL); + + /* Return false if too soon since last start. */ + if ((unsigned int) (curtime - SlotSyncWorker->last_start_time) < + (unsigned int) SLOTSYNC_RESTART_INTERVAL_SEC) + return false; + + SlotSyncWorker->last_start_time = curtime; + + return true; +} + +/* + * Main entry point for slot sync worker process, to be called from the + * postmaster. + */ +int +StartSlotSyncWorker(void) +{ + pid_t pid; + +#ifdef EXEC_BACKEND + switch ((pid = slotsyncworker_forkexec())) + { +#else + switch ((pid = fork_process())) + { + case 0: + /* in postmaster child ... */ + InitPostmasterChild(); + + /* Close the postmaster's sockets */ + ClosePostmasterPorts(false); + + ReplSlotSyncWorkerMain(0, NULL); + break; +#endif + case -1: + ereport(LOG, + (errmsg("could not fork slot sync worker process: %m"))); + return 0; + + default: + return (int) pid; + } + + /* shouldn't get here */ + return 0; } /* * Is current process syncing replication slots ? */ bool -IsSyncingReplicationSlots(void) +IsLogicalSlotSyncWorker(void) { - return syncing_slots; + return am_slotsync_worker || syncing_slots; } /* * Allocate and initialize the shared memory of slot synchronization. 
*/ void -SlotSyncShmemInit(void) +SlotSyncWorkerShmemInit(void) { Size size; bool found; - size = sizeof(SlotSyncCtxStruct); + size = sizeof(SlotSyncWorkerCtxStruct); size = MAXALIGN(size); - SlotSyncCtx = (SlotSyncCtxStruct *) - ShmemInitStruct("Slot Sync Data", size, &found); + SlotSyncWorker = (SlotSyncWorkerCtxStruct *) + ShmemInitStruct("Slot Sync Worker Data", size, &found); if (!found) { - memset(SlotSyncCtx, 0, size); - SpinLockInit(&SlotSyncCtx->mutex); + memset(SlotSyncWorker, 0, size); + SlotSyncWorker->pid = InvalidPid; + SpinLockInit(&SlotSyncWorker->mutex); } } @@ -912,9 +1519,9 @@ slot_sync_shmem_exit(int code, Datum arg) * without resetting the flag. So, we need to clean up shared memory * here before exiting. */ - SpinLockAcquire(&SlotSyncCtx->mutex); - SlotSyncCtx->syncing = false; - SpinLockRelease(&SlotSyncCtx->mutex); + SpinLockAcquire(&SlotSyncWorker->mutex); + SlotSyncWorker->syncing = false; + SpinLockRelease(&SlotSyncWorker->mutex); } } @@ -937,7 +1544,7 @@ SyncReplicationSlots(WalReceiverConn *wrconn) { PG_TRY(); { - validate_primary_slot_name(wrconn); + check_primary_info(wrconn, false, ERROR); (void) synchronize_slots(wrconn); } @@ -945,9 +1552,9 @@ SyncReplicationSlots(WalReceiverConn *wrconn) { if (syncing_slots) { - SpinLockAcquire(&SlotSyncCtx->mutex); - SlotSyncCtx->syncing = false; - SpinLockRelease(&SlotSyncCtx->mutex); + SpinLockAcquire(&SlotSyncWorker->mutex); + SlotSyncWorker->syncing = false; + SpinLockRelease(&SlotSyncWorker->mutex); syncing_slots = false; } diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index c443e949b2..802361f0da 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -273,7 +273,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, * synchronization because we need to retain the same values as the remote * slot. 
*/ - if (failover && RecoveryInProgress() && !IsSyncingReplicationSlots()) + if (failover && RecoveryInProgress() && !IsLogicalSlotSyncWorker()) ereport(ERROR, errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot enable failover for a replication slot" @@ -1214,6 +1214,20 @@ restart: * concurrently being dropped by a backend connected to another DB. * * That's fairly unlikely in practice, so we'll just bail out. + * + * The slot sync worker holds a shared lock on the database before + * operating on synced logical slots to avoid conflict with the drop + * happening here. The persistent synced slots are thus safe but there + * is a possibility that the slot sync worker has created a temporary + * slot (which stays active even on release) and we are trying to drop + * the same here. In practice, the chances of hitting this scenario are + * very low because, during slot synchronization, the temporary slot is + * immediately converted to persistent and thus is safe due to the + * shared lock taken on the database. So for the time being, we'll + * just bail out in such a scenario. + * + * XXX: If needed, we can consider shutting down slot sync worker + * before trying to drop synced temporary slots here. 
*/ if (active_pid) ereport(ERROR, diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index cf528db17a..4446817e55 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -973,7 +973,7 @@ pg_sync_replication_slots(PG_FUNCTION_ARGS) /* Load the libpq-specific functions */ load_file("libpqwalreceiver", false); - ValidateSlotSyncParams(); + ValidateSlotSyncParams(ERROR); initStringInfo(&app_name); if (cluster_name[0]) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 2d94379f1a..6f2467b3b1 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -3394,7 +3394,7 @@ GetStandbyFlushRecPtr(TimeLineID *tli) TimeLineID receiveTLI; XLogRecPtr result; - Assert(am_cascading_walsender || IsSyncingReplicationSlots()); + Assert(am_cascading_walsender || IsLogicalSlotSyncWorker()); /* * We can safely send what's already been replayed. Also, if walreceiver diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index d729166312..127c69cb40 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -348,7 +348,7 @@ CreateOrAttachShmemStructs(void) WalSummarizerShmemInit(); PgArchShmemInit(); ApplyLauncherShmemInit(); - SlotSyncShmemInit(); + SlotSyncWorkerShmemInit(); /* * Set up other modules that need some shared memory space diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index e5977548fe..937e626040 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -40,6 +40,7 @@ #include "pgstat.h" #include "postmaster/autovacuum.h" #include "replication/slot.h" +#include "replication/slotsync.h" #include "replication/syncrep.h" #include "replication/walsender.h" #include "storage/condition_variable.h" @@ -364,8 +365,12 @@ InitProcess(void) * child; this is so that the postmaster can detect it if we exit without * cleaning up. 
(XXX autovac launcher currently doesn't participate in * this; it probably should.) + * + * Slot sync worker also does not participate in it, see comments atop + * 'struct bkend' in postmaster.c. */ - if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess()) + if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess() && + !IsLogicalSlotSyncWorker()) MarkPostmasterChildActive(); /* @@ -934,8 +939,12 @@ ProcKill(int code, Datum arg) * This process is no longer present in shared memory in any meaningful * way, so tell the postmaster we've cleaned up acceptably well. (XXX * autovac launcher should be included here someday) + * + * Slot sync worker is also not a postmaster child, so skip this shared + * memory related processing here. */ - if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess()) + if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess() && + !IsLogicalSlotSyncWorker()) MarkPostmasterChildInactive(); /* wake autovac launcher if needed -- see comments in FreeWorkerInfo */ diff --git a/src/backend/utils/activity/pgstat_io.c b/src/backend/utils/activity/pgstat_io.c index 43c393d6fe..9d6e067382 100644 --- a/src/backend/utils/activity/pgstat_io.c +++ b/src/backend/utils/activity/pgstat_io.c @@ -338,6 +338,7 @@ pgstat_tracks_io_bktype(BackendType bktype) case B_BG_WORKER: case B_BG_WRITER: case B_CHECKPOINTER: + case B_SLOTSYNC_WORKER: case B_STANDALONE_BACKEND: case B_STARTUP: case B_WAL_SENDER: diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 6464386b77..b52afd4eac 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -53,6 +53,8 @@ LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process." LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process." LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process." 
RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery." +REPL_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker." +REPL_SLOTSYNC_SHUTDOWN "Waiting for slot sync worker to shut down." SYSLOGGER_MAIN "Waiting in main loop of syslogger process." WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process." WAL_SENDER_MAIN "Waiting in main loop of WAL sender process." diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c index 23f77a59e5..26aaf292c5 100644 --- a/src/backend/utils/init/miscinit.c +++ b/src/backend/utils/init/miscinit.c @@ -40,6 +40,7 @@ #include "postmaster/interrupt.h" #include "postmaster/pgarch.h" #include "postmaster/postmaster.h" +#include "replication/slotsync.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/latch.h" @@ -293,6 +294,9 @@ GetBackendTypeDesc(BackendType backendType) case B_LOGGER: backendDesc = "logger"; break; + case B_SLOTSYNC_WORKER: + backendDesc = "slotsyncworker"; + break; case B_STANDALONE_BACKEND: backendDesc = "standalone backend"; break; @@ -835,9 +839,10 @@ InitializeSessionUserIdStandalone(void) { /* * This function should only be called in single-user mode, in autovacuum - * workers, and in background workers. + * workers, in slot sync worker and in background workers. */ - Assert(!IsUnderPostmaster || IsAutoVacuumWorkerProcess() || IsBackgroundWorker); + Assert(!IsUnderPostmaster || IsAutoVacuumWorkerProcess() || + IsLogicalSlotSyncWorker() || IsBackgroundWorker); /* call only once */ Assert(!OidIsValid(AuthenticatedUserId)); diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index 753009b459..1319f9570a 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -881,10 +881,11 @@ InitPostgres(const char *in_dbname, Oid dboid, * Perform client authentication if necessary, then figure out our * postgres user ID, and see if we are a superuser. 
* - * In standalone mode and in autovacuum worker processes, we use a fixed - * ID, otherwise we figure it out from the authenticated user name. + * In standalone mode, autovacuum worker processes and slot sync worker + * process, we use a fixed ID, otherwise we figure it out from the + * authenticated user name. */ - if (bootstrap || IsAutoVacuumWorkerProcess()) + if (bootstrap || IsAutoVacuumWorkerProcess() || IsLogicalSlotSyncWorker()) { InitializeSessionUserIdStandalone(); am_superuser = true; diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 7fe58518d7..83136e35a6 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -67,6 +67,7 @@ #include "postmaster/walwriter.h" #include "replication/logicallauncher.h" #include "replication/slot.h" +#include "replication/slotsync.h" #include "replication/syncrep.h" #include "storage/bufmgr.h" #include "storage/large_object.h" @@ -2054,6 +2055,15 @@ struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"sync_replication_slots", PGC_SIGHUP, REPLICATION_STANDBY, + gettext_noop("Enables a physical standby to synchronize logical failover slots from the primary server."), + }, + &sync_replication_slots, + false, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index da10b43dac..dfd1313c94 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -361,6 +361,7 @@ #wal_retrieve_retry_interval = 5s # time to wait before retrying to # retrieve WAL after a failed attempt #recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery +#sync_replication_slots = off # enables slot synchronization on the physical standby from the primary # - Subscribers - diff --git a/src/include/miscadmin.h 
b/src/include/miscadmin.h index 0b01c1f093..65819cb7a7 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -332,6 +332,7 @@ typedef enum BackendType B_BG_WRITER, B_CHECKPOINTER, B_LOGGER, + B_SLOTSYNC_WORKER, B_STANDALONE_BACKEND, B_STARTUP, B_WAL_RECEIVER, diff --git a/src/include/replication/slotsync.h b/src/include/replication/slotsync.h index 94c948d187..b723a46f64 100644 --- a/src/include/replication/slotsync.h +++ b/src/include/replication/slotsync.h @@ -14,10 +14,27 @@ #include "replication/walreceiver.h" -extern bool IsSyncingReplicationSlots(void); +extern PGDLLIMPORT bool sync_replication_slots; + +/* + * GUCs needed by slot sync worker to connect to the primary + * server and carry on with slots synchronization. + */ +extern PGDLLIMPORT char *PrimaryConnInfo; +extern PGDLLIMPORT char *PrimarySlotName; + +extern bool IsLogicalSlotSyncWorker(void); extern void SyncReplicationSlots(WalReceiverConn *wrconn); -extern void ValidateSlotSyncParams(void); -extern void SlotSyncShmemInit(void); +extern bool ValidateSlotSyncParams(int elevel); +extern void SlotSyncWorkerShmemInit(void); extern void SlotSyncInitialize(void); +#ifdef EXEC_BACKEND +extern void ReplSlotSyncWorkerMain(int argc, char *argv[]) pg_attribute_noreturn(); +#endif +extern int StartSlotSyncWorker(void); +extern bool SlotSyncWorkerCanRestart(void); +extern void ShutDownSlotSync(void); +extern void SlotSyncWorkerShmemInit(void); + #endif /* SLOTSYNC_H */ diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl index c14fb62fa6..049f14cf21 100644 --- a/src/test/recovery/t/040_standby_failover_slots_sync.pl +++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl @@ -18,7 +18,8 @@ $publisher->init(allows_streaming => 'logical'); $publisher->start; $publisher->safe_psql('postgres', - "CREATE PUBLICATION regress_mypub FOR ALL TABLES;"); + "CREATE PUBLICATION regress_mypub FOR ALL TABLES;" +); my 
$publisher_connstr = $publisher->connstr . ' dbname=postgres'; @@ -208,4 +209,81 @@ ok($stderr =~ /ERROR: cannot alter replication slot "lsub1_slot"/, ok($stderr =~ /ERROR: cannot drop replication slot "lsub1_slot"/, "synced slot on standby cannot be dropped"); +################################################## +# Test to confirm that restart_lsn and confirmed_flush_lsn of the logical slot +# on the primary is synced to the standby +################################################## + +$standby1->append_conf('postgresql.conf', "sync_replication_slots = on"); +$standby1->reload; + +# Insert data on the primary +$primary->safe_psql( + 'postgres', qq[ + CREATE TABLE tab_int (a int PRIMARY KEY); + INSERT INTO tab_int SELECT generate_series(1, 10); +]); + +# Subscribe to the new table data and wait for it to arrive +$subscriber1->safe_psql( + 'postgres', qq[ + CREATE TABLE tab_int (a int PRIMARY KEY); + ALTER SUBSCRIPTION regress_mysub1 REFRESH PUBLICATION; +]); + +$subscriber1->wait_for_subscription_sync; + +# Do not allow any further advancement of the restart_lsn and +# confirmed_flush_lsn for the lsub1_slot. 
+$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 DISABLE"); + +# Wait for the replication slot to become inactive on the publisher +$primary->poll_query_until( + 'postgres', + "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active='f'", + 1); + +# Get the restart_lsn for the logical slot lsub1_slot on the primary +my $primary_restart_lsn = $primary->safe_psql('postgres', + "SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';"); + +# Get the confirmed_flush_lsn for the logical slot lsub1_slot on the primary +my $primary_flush_lsn = $primary->safe_psql('postgres', + "SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';"); + +# Confirm that restart_lsn and confirmed_flush_lsn of the lsub1_slot slot are synced +# to the standby +ok( $standby1->poll_query_until( + 'postgres', + "SELECT '$primary_restart_lsn' = restart_lsn AND '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';"), + 'restart_lsn and confirmed_flush_lsn of slot lsub1_slot synced to standby'); + +################################################## +# Promote the standby1 to primary. 
Confirm that: +# a) the slot 'lsub1_slot' is retained on the new primary +# b) logical replication for regress_mysub1 is resumed successfully after failover +################################################## +$standby1->promote; + +# Update subscription with the new primary's connection info +$subscriber1->safe_psql('postgres', + "ALTER SUBSCRIPTION regress_mysub1 CONNECTION '$standby1_conninfo'; + ALTER SUBSCRIPTION regress_mysub1 ENABLE; "); + +# Confirm the synced slot 'lsub1_slot' is retained on the new primary +is($standby1->safe_psql('postgres', + q{SELECT slot_name FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}), + 'lsub1_slot', + 'synced slot retained on the new primary'); + +# Insert data on the new primary +$standby1->safe_psql('postgres', + "INSERT INTO tab_int SELECT generate_series(11, 20);"); +$standby1->wait_for_catchup('regress_mysub1'); + +# Confirm that data in tab_int replicated on the subscriber +is( $subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}), + "20", + 'data replicated from the new primary'); + done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index d808aad8b0..a35e399f0c 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2585,7 +2585,7 @@ SlabBlock SlabContext SlabSlot SlotNumber -SlotSyncCtxStruct +SlotSyncWorkerCtxStruct SlruCtl SlruCtlData SlruErrorCause -- 2.34.1