From 1cc185edb4b9852d21ac7d97343c342bb4f16a2d Mon Sep 17 00:00:00 2001 From: Shveta Malik Date: Fri, 5 Apr 2024 10:16:03 +0530 Subject: [PATCH v6] Handle stopSignaled during sync function call. This patch attempts to fix two issues: 1) It implements promotion related handling needed for slot sync SQL function pg_sync_replication_slots() simiar to slot sync worker. Since there is a chance that the user (or any of his scripts/tools) may execute SQL function pg_sync_replication_slots() in parallel to promotion, both the promotion and SQL call need to handle it well. Changes are: a) If pg_sync_replication_slots() is already running when the promotion is triggered, ShutDownSlotSync() checks the 'SlotSyncCtx->syncing' flag as well and waits for it to become false i.e. waits till parallel running SQL function is finished. b) If pg_sync_replication_slots() is invoked when promotion is already in progress, pg_sync_replication_slots() respects the 'stopSignaled' flag set by the startup process and errors out. 2) This patch fixes another issue in slot sync worker where SignalHandlerForShutdownRequest() needs to be registered *before* setting SlotSyncCtx->pid, otherwise the slotsync worker could miss to handle SIGINT sent by startup process(ShutDownSlotSync). To be consistent, all signale handlers' registration is moved to a prior location before we set pid. --- doc/src/sgml/func.sgml | 4 + src/backend/replication/logical/slotsync.c | 200 +++++++++++++-------- 2 files changed, 128 insertions(+), 76 deletions(-) diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 8dfb42ad4d..65a5fa6bb2 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -29348,6 +29348,10 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset standby server. Temporary synced slots, if any, cannot be used for logical decoding and must be dropped after promotion. See for details. + Note that this function cannot be executed if + + sync_replication_slots is enabled and the slotsync + worker is already running to perform the synchronization of slots. diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index bda0de52db..c85779b247 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -92,9 +92,6 @@ * is expected (e.g., slot sync GUCs change), slot sync worker will reset * last_start_time before exiting, so that postmaster can start the worker * without waiting for SLOTSYNC_RESTART_INTERVAL_SEC. - * - * All the fields except 'syncing' are used only by slotsync worker. - * 'syncing' is used both by worker and SQL function pg_sync_replication_slots. */ typedef struct SlotSyncCtxStruct { @@ -807,20 +804,6 @@ synchronize_slots(WalReceiverConn *wrconn) " FROM pg_catalog.pg_replication_slots" " WHERE failover and NOT temporary"; - SpinLockAcquire(&SlotSyncCtx->mutex); - if (SlotSyncCtx->syncing) - { - SpinLockRelease(&SlotSyncCtx->mutex); - ereport(ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("cannot synchronize replication slots concurrently")); - } - - SlotSyncCtx->syncing = true; - SpinLockRelease(&SlotSyncCtx->mutex); - - syncing_slots = true; - /* The syscache access in walrcv_exec() needs a transaction env. */ if (!IsTransactionState()) { @@ -937,12 +920,6 @@ synchronize_slots(WalReceiverConn *wrconn) if (started_tx) CommitTransactionCommand(); - SpinLockAcquire(&SlotSyncCtx->mutex); - SlotSyncCtx->syncing = false; - SpinLockRelease(&SlotSyncCtx->mutex); - - syncing_slots = false; - return some_slot_updated; } @@ -1199,7 +1176,20 @@ static void slotsync_worker_onexit(int code, Datum arg) { SpinLockAcquire(&SlotSyncCtx->mutex); + SlotSyncCtx->pid = InvalidPid; + + /* + * If syncing_slots is true, it indicates that the process errored out + * without resetting the flag. So, we need to clean up shared memory and + * reset the flag here. + */ + if (syncing_slots) + { + SlotSyncCtx->syncing = false; + syncing_slots = false; + } + SpinLockRelease(&SlotSyncCtx->mutex); } @@ -1242,6 +1232,63 @@ wait_for_slot_activity(bool some_slot_updated) ResetLatch(MyLatch); } +/* + * Check stopSignaled and syncing flags. Emit error if promotion has + * already set stopSignaled or if it is concurrent sync call. Otherwise, + * set 'syncing' flag and pid info. + */ +static void +check_flags_and_set_sync_info(pid_t worker_pid) +{ + SpinLockAcquire(&SlotSyncCtx->mutex); + + /* The worker pid must not be already assigned in SlotSyncCtx */ + Assert(worker_pid == InvalidPid || SlotSyncCtx->pid == InvalidPid); + + /* + * Startup process signaled the slot sync machinery to stop, so if + * meanwhile postmaster ended up starting the worker again or user has + * invoked pg_sync_replication_slots(), error out. + */ + if (SlotSyncCtx->stopSignaled) + { + SpinLockRelease(&SlotSyncCtx->mutex); + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot synchronize replication slots when standby promotion is ongoing")); + } + + if (SlotSyncCtx->syncing) + { + SpinLockRelease(&SlotSyncCtx->mutex); + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot synchronize replication slots concurrently")); + } + + /* Advertise our PID so that the startup process can kill us on promotion */ + SlotSyncCtx->pid = worker_pid; + + SlotSyncCtx->syncing = true; + + SpinLockRelease(&SlotSyncCtx->mutex); + + syncing_slots = true; +} + +/* + * Reset syncing flag. + */ +static void +reset_syncing_flag() +{ + SpinLockAcquire(&SlotSyncCtx->mutex); + SlotSyncCtx->syncing = false; + SpinLockRelease(&SlotSyncCtx->mutex); + + syncing_slots = false; +}; + /* * The main loop of our worker process. * @@ -1278,47 +1325,6 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len) Assert(SlotSyncCtx != NULL); - SpinLockAcquire(&SlotSyncCtx->mutex); - Assert(SlotSyncCtx->pid == InvalidPid); - - /* - * Startup process signaled the slot sync worker to stop, so if meanwhile - * postmaster ended up starting the worker again, exit. - */ - if (SlotSyncCtx->stopSignaled) - { - SpinLockRelease(&SlotSyncCtx->mutex); - proc_exit(0); - } - - /* Advertise our PID so that the startup process can kill us on promotion */ - SlotSyncCtx->pid = MyProcPid; - SpinLockRelease(&SlotSyncCtx->mutex); - - ereport(LOG, errmsg("slot sync worker started")); - - /* Register it as soon as SlotSyncCtx->pid is initialized. */ - before_shmem_exit(slotsync_worker_onexit, (Datum) 0); - - /* Setup signal handling */ - pqsignal(SIGHUP, SignalHandlerForConfigReload); - pqsignal(SIGINT, SignalHandlerForShutdownRequest); - pqsignal(SIGTERM, die); - pqsignal(SIGFPE, FloatExceptionHandler); - pqsignal(SIGUSR1, procsignal_sigusr1_handler); - pqsignal(SIGUSR2, SIG_IGN); - pqsignal(SIGPIPE, SIG_IGN); - pqsignal(SIGCHLD, SIG_DFL); - - /* - * Establishes SIGALRM handler and initialize timeout module. It is needed - * by InitPostgres to register different timeouts. - */ - InitializeTimeouts(); - - /* Load the libpq-specific functions */ - load_file("libpqwalreceiver", false); - /* * If an exception is encountered, processing resumes here. * @@ -1350,6 +1356,32 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len) /* We can now handle ereport(ERROR) */ PG_exception_stack = &local_sigjmp_buf; + /* Setup signal handling */ + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGINT, SignalHandlerForShutdownRequest); + pqsignal(SIGTERM, die); + pqsignal(SIGFPE, FloatExceptionHandler); + pqsignal(SIGUSR1, procsignal_sigusr1_handler); + pqsignal(SIGUSR2, SIG_IGN); + pqsignal(SIGPIPE, SIG_IGN); + pqsignal(SIGCHLD, SIG_DFL); + + check_flags_and_set_sync_info(MyProcPid); + + ereport(LOG, errmsg("slot sync worker started")); + + /* Register it as soon as SlotSyncCtx->pid is initialized. */ + before_shmem_exit(slotsync_worker_onexit, (Datum) 0); + + /* + * Establishes SIGALRM handler and initialize timeout module. It is needed + * by InitPostgres to register different timeouts. + */ + InitializeTimeouts(); + + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + /* * Unblock signals (they were blocked when the postmaster forked us) */ @@ -1457,8 +1489,8 @@ update_synced_slots_inactive_since(void) if (!StandbyMode) return; - /* The slot sync worker mustn't be running by now */ - Assert(SlotSyncCtx->pid == InvalidPid); + /* The slot sync worker or SQL function mustn't be running by now */ + Assert((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing); LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); @@ -1471,6 +1503,9 @@ update_synced_slots_inactive_since(void) { Assert(SlotIsLogical(s)); + /* The slot must not be acquired by any process */ + Assert(s->active_pid == 0); + /* Use the same inactive_since time for all the slots. */ if (now == 0) now = GetCurrentTimestamp(); @@ -1486,6 +1521,10 @@ update_synced_slots_inactive_since(void) /* * Shut down the slot sync worker. + * + * It sends signal to shutdown slot sync worker. It also waits till + * the slot sync worker has exited and pg_sync_replication_slots() + * has finished. */ void ShutDownSlotSync(void) @@ -1494,7 +1533,11 @@ ShutDownSlotSync(void) SlotSyncCtx->stopSignaled = true; - if (SlotSyncCtx->pid == InvalidPid) + /* + * Return if neither the slot sync worker is running nor the function + * pg_sync_replication_slots() is executing. + */ + if (!SlotSyncCtx->syncing) { SpinLockRelease(&SlotSyncCtx->mutex); update_synced_slots_inactive_since(); @@ -1502,9 +1545,10 @@ ShutDownSlotSync(void) } SpinLockRelease(&SlotSyncCtx->mutex); - kill(SlotSyncCtx->pid, SIGINT); + if (SlotSyncCtx->pid != InvalidPid) + kill(SlotSyncCtx->pid, SIGINT); - /* Wait for it to die */ + /* Wait for slot sync to end */ for (;;) { int rc; @@ -1522,8 +1566,11 @@ ShutDownSlotSync(void) SpinLockAcquire(&SlotSyncCtx->mutex); - /* Is it gone? */ - if (SlotSyncCtx->pid == InvalidPid) + /* + * Confirm that both the worker and the function + * pg_sync_replication_slots() are done. + */ + if (!SlotSyncCtx->syncing) break; SpinLockRelease(&SlotSyncCtx->mutex); @@ -1615,11 +1662,7 @@ slotsync_failure_callback(int code, Datum arg) * without resetting the flag. So, we need to clean up shared memory * and reset the flag here. */ - SpinLockAcquire(&SlotSyncCtx->mutex); - SlotSyncCtx->syncing = false; - SpinLockRelease(&SlotSyncCtx->mutex); - - syncing_slots = false; + reset_syncing_flag(); } walrcv_disconnect(wrconn); @@ -1634,9 +1677,14 @@ SyncReplicationSlots(WalReceiverConn *wrconn) { PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn)); { + check_flags_and_set_sync_info(InvalidPid); + validate_remote_info(wrconn); synchronize_slots(wrconn); + + /* We are done with sync, so reset sync flag */ + reset_syncing_flag(); } PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn)); } -- 2.34.1