From e3669efdf973a2c57c9eaba4b499d15fbb3116b8 Mon Sep 17 00:00:00 2001 From: Shveta Malik Date: Fri, 5 Apr 2024 10:16:03 +0530 Subject: [PATCH v3] Handle stopSignaled during sync function call. Currently, promotion related handling is missing in the slot sync SQL function pg_sync_replication_slots(). Here is the background on how it is done in slot sync worker: During promotion, the startup process in order to shut down the slot-sync worker, sets the 'stopSignaled' flag, sends the shut-down signal, and waits for slot sync worker to exit. Meanwhile if the postmaster has not noticed the promotion yet, it may end up restarting slot sync worker. In such a case, the worker exits if 'stopSignaled' is set. Since there is a chance that the user (or any of his scripts/tools) may execute SQL function pg_sync_replication_slots() in parallel to promotion, such handling is needed in this SQL function as well, The attached patch attempts to implement the same. Changes are: 1) If pg_sync_replication_slots() is already running when the promotion is triggered, ShutDownSlotSync() checks the 'SlotSyncCtx->syncing' flag as well and waits for it to become false i.e. waits till parallel running SQL function is finished. 2) If pg_sync_replication_slots() is invoked when promotion is already in progress, pg_sync_replication_slots() respects the 'stopSignaled' flag set by the startup process and becomes a no-op. --- src/backend/replication/logical/slotsync.c | 120 +++++++++++++++------ 1 file changed, 90 insertions(+), 30 deletions(-) diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index bda0de52db..aade737b73 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -807,20 +807,6 @@ synchronize_slots(WalReceiverConn *wrconn) " FROM pg_catalog.pg_replication_slots" " WHERE failover and NOT temporary"; - SpinLockAcquire(&SlotSyncCtx->mutex); - if (SlotSyncCtx->syncing) - { - SpinLockRelease(&SlotSyncCtx->mutex); - ereport(ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("cannot synchronize replication slots concurrently")); - } - - SlotSyncCtx->syncing = true; - SpinLockRelease(&SlotSyncCtx->mutex); - - syncing_slots = true; - /* The syscache access in walrcv_exec() needs a transaction env. */ if (!IsTransactionState()) { @@ -937,12 +923,6 @@ synchronize_slots(WalReceiverConn *wrconn) if (started_tx) CommitTransactionCommand(); - SpinLockAcquire(&SlotSyncCtx->mutex); - SlotSyncCtx->syncing = false; - SpinLockRelease(&SlotSyncCtx->mutex); - - syncing_slots = false; - return some_slot_updated; } @@ -1242,6 +1222,47 @@ wait_for_slot_activity(bool some_slot_updated) ResetLatch(MyLatch); } +/* + * Check syncing flag and error out if it is concurrent sync call. + * Otherwise, set syncing flag. + */ +static void +check_and_set_syncing_flag(bool acquire_lock) +{ + + /* Acquire spinLock if caller asked so */ + if (acquire_lock) + SpinLockAcquire(&SlotSyncCtx->mutex); + + if (SlotSyncCtx->syncing) + { + SpinLockRelease(&SlotSyncCtx->mutex); + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot synchronize replication slots concurrently")); + } + + SlotSyncCtx->syncing = true; + + if (acquire_lock) + SpinLockRelease(&SlotSyncCtx->mutex); + + syncing_slots = true; +} + +/* + * Reset syncing flag. + */ +static void +reset_syncing_flag() +{ + SpinLockAcquire(&SlotSyncCtx->mutex); + SlotSyncCtx->syncing = false; + SpinLockRelease(&SlotSyncCtx->mutex); + + syncing_slots = false; +}; + /* * The main loop of our worker process. * @@ -1424,8 +1445,12 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len) ProcessSlotSyncInterrupts(wrconn); + check_and_set_syncing_flag(true); + some_slot_updated = synchronize_slots(wrconn); + reset_syncing_flag(); + wait_for_slot_activity(some_slot_updated); } @@ -1471,6 +1496,9 @@ update_synced_slots_inactive_since(void) { Assert(SlotIsLogical(s)); + /* The slot must not be acquired by any process */ + Assert(s->active_pid == 0); + /* Use the same inactive_since time for all the slots. */ if (now == 0) now = GetCurrentTimestamp(); @@ -1486,6 +1514,10 @@ update_synced_slots_inactive_since(void) /* * Shut down the slot sync worker. + * + * It sends signal to shutdown slot sync worker. It also waits till + * the slot sync worker has exited and pg_sync_replication_slots() + * has finished. */ void ShutDownSlotSync(void) @@ -1494,7 +1526,11 @@ ShutDownSlotSync(void) SlotSyncCtx->stopSignaled = true; - if (SlotSyncCtx->pid == InvalidPid) + /* + * Return if neither the slot sync worker is running nor the function + * pg_sync_replication_slots() is executing. + */ + if ((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing) { SpinLockRelease(&SlotSyncCtx->mutex); update_synced_slots_inactive_since(); @@ -1502,9 +1538,10 @@ ShutDownSlotSync(void) } SpinLockRelease(&SlotSyncCtx->mutex); - kill(SlotSyncCtx->pid, SIGINT); + if (SlotSyncCtx->pid != InvalidPid) + kill(SlotSyncCtx->pid, SIGINT); - /* Wait for it to die */ + /* Wait for worker to exit and SQL function to finish */ for (;;) { int rc; @@ -1522,8 +1559,11 @@ ShutDownSlotSync(void) SpinLockAcquire(&SlotSyncCtx->mutex); - /* Is it gone? */ - if (SlotSyncCtx->pid == InvalidPid) + /* + * Confirm that both the worker and the function + * pg_sync_replication_slots() are done. + */ + if ((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing) break; SpinLockRelease(&SlotSyncCtx->mutex); @@ -1615,11 +1655,7 @@ slotsync_failure_callback(int code, Datum arg) * without resetting the flag. So, we need to clean up shared memory * and reset the flag here. */ - SpinLockAcquire(&SlotSyncCtx->mutex); - SlotSyncCtx->syncing = false; - SpinLockRelease(&SlotSyncCtx->mutex); - - syncing_slots = false; + reset_syncing_flag(); } walrcv_disconnect(wrconn); @@ -1634,9 +1670,33 @@ SyncReplicationSlots(WalReceiverConn *wrconn) { PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn)); { + SpinLockAcquire(&SlotSyncCtx->mutex); + + /* + * Startup process signaled the slot sync to stop, so if meanwhile + * user has invoked slot sync SQL function, error out. + */ + if (SlotSyncCtx->stopSignaled) + { + SpinLockRelease(&SlotSyncCtx->mutex); + ereport(ERROR, + errmsg("promotion in progress, can not synchronize replication slots")); + } + + /* + * Advertise that we are syncing, so that the startup process knows + * about this sync call during promotion. + */ + check_and_set_syncing_flag(false); + + SpinLockRelease(&SlotSyncCtx->mutex); + validate_remote_info(wrconn); synchronize_slots(wrconn); + + /* We are done with sync, so reset sync flag */ + reset_syncing_flag(); } PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn)); } -- 2.34.1