From cb3317e81549d6b8d1ccac36a4f4d6be22d46039 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Thu, 20 Nov 2025 12:11:01 -0800 Subject: [PATCH v29 2/2] FIXUP: remove status_change_inprogress flag. --- src/backend/access/transam/xlog.c | 4 + src/backend/replication/logical/logicalctl.c | 184 +++++------------- .../utils/activity/wait_event_names.txt | 1 - .../recovery/t/050_effective_wal_level.pl | 33 +--- 4 files changed, 62 insertions(+), 160 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 055ea4dc7b2..4e7c7061e9a 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -8695,6 +8695,10 @@ xlog_redo(XLogReaderState *record) /* Update the status on shared memory */ memcpy(&logical_decoding, XLogRecGetData(record), sizeof(bool)); + + /* The record must always change the status actually */ + Assert(IsLogicalDecodingEnabled() != logical_decoding); + UpdateLogicalDecodingStatus(logical_decoding, true); if (InRecovery && InHotStandby) diff --git a/src/backend/replication/logical/logicalctl.c b/src/backend/replication/logical/logicalctl.c index 8bc8111cfbb..aee45c790d9 100644 --- a/src/backend/replication/logical/logicalctl.c +++ b/src/backend/replication/logical/logicalctl.c @@ -23,20 +23,19 @@ * * While activation occurs synchronously right after creating the first * logical slot, deactivation happens asynchronously through the checkpointer - * process. This design choice exists because deactivation requires waiting - * for concurrent attempts to update the logical decoding status, which can be - * problematic when the process is holding interrupts. This situation arises - * when a process cleans up temporary or ephemeral slots on error or at process - * exit without releasing temporary slots explicitly. 
This lazy approach has - * a drawback: it may take longer to change the effective_wal_level and disable - * logical decoding, especially when the checkpointer is busy with other tasks. - * However, since dropping or invalidating the last slot should not happen - * frequently, we chose this approach in all deactivation cases for simpler code - * implementation, even though the lazy approach is required only in error cases - * or at process exit time in principle. In the future, we could address this - * limitation either by using a dedicated worker instead of the checkpointer, or - * by implementing synchronous waiting during slot drops if workloads are - * significantly affected by the lazy deactivation of logical decoding. + * process. This design avoids a race condition at the end of recovery; see + * the comments in UpdateLogicalDecodingStatusEndOfRecovery() for details. + * Asynchronous deactivation also avoids excessive toggling of the logical + * decoding status in workloads that repeatedly create and drop a single + * logical slot. On the other hand, this lazy approach can delay changes + * to effective_wal_level and the disabling of logical decoding, especially + * when the checkpointer is busy with other tasks. We chose this lazy approach + * in all deactivation paths to keep the implementation simple, even though + * laziness is strictly required only for end-of-recovery cases. Future work + * might address this limitation either by using a dedicated worker instead + * of the checkpointer, or by implementing synchronous waiting during slot + * drops if workloads are significantly affected by the lazy deactivation + * of logical decoding. * * Standby servers use the primary server's effective_wal_level and logical * decoding status. 
Unlike normal activation and deactivation, these @@ -96,14 +95,8 @@ typedef struct LogicalDecodingCtlData /* True if logical decoding is available in the system */ bool logical_decoding_enabled; - /* True while the logical decoding status is being changed */ - bool status_change_inprogress; - /* True if logical decoding might need to be disabled */ bool pending_disable; - - /* Condition variable signaled when a status change completes */ - ConditionVariable cv; } LogicalDecodingCtlData; static LogicalDecodingCtlData *LogicalDecodingCtl = NULL; @@ -117,7 +110,6 @@ bool XLogLogicalInfo = false; static void update_xlog_logical_info(void); static void abort_logical_decoding_activation(int code, Datum arg); -static bool start_logical_decoding_status_change(bool new_status); static void write_logical_decoding_status_update_record(bool status); Size @@ -136,10 +128,7 @@ LogicalDecodingCtlShmemInit(void) &found); if (!found) - { MemSet(LogicalDecodingCtl, 0, LogicalDecodingCtlShmemSize()); - ConditionVariableInit(&LogicalDecodingCtl->cv); - } } /* @@ -271,10 +260,15 @@ write_logical_decoding_status_update_record(bool status) static void abort_logical_decoding_activation(int code, Datum arg) { - Assert(LogicalDecodingCtl->status_change_inprogress); + Assert(MyReplicationSlot); + Assert(!LogicalDecodingCtl->logical_decoding_enabled); elog(DEBUG1, "aborting logical decoding activation process"); + /* + * Abort the change to xlog_logical_info. We don't need to check + * CheckLogicalSlotExists() as we're still holding a logical slot. + */ LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); LogicalDecodingCtl->xlog_logical_info = false; LWLockRelease(LogicalDecodingControlLock); @@ -287,67 +281,6 @@ abort_logical_decoding_activation(int code, Datum arg) * strictly required. 
*/ EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO); - - LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); - LogicalDecodingCtl->status_change_inprogress = false; - LWLockRelease(LogicalDecodingControlLock); - - /* Let waiters know the status change completed */ - ConditionVariableBroadcast(&LogicalDecodingCtl->cv); -} - -/* - * Performs preparation work required before changing the logical decoding - * status. If the status change is required, it sets - * LogicalDecodingCtl->status_change_inprogress, and returns true. Otherwise, - * if it's not required (e.g., logical slots exist) it returns false. - */ -static bool -start_logical_decoding_status_change(bool new_status) -{ -retry: - LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); - - /* If a status change is in-progress, we need to wait for completion */ - if (LogicalDecodingCtl->status_change_inprogress) - { - /* Release the lock and wait for someone to complete the transition */ - LWLockRelease(LogicalDecodingControlLock); - - elog(DEBUG1, - "waiting for logical decoding status change to complete"); - - ConditionVariableSleep(&LogicalDecodingCtl->cv, - WAIT_EVENT_LOGICAL_DECODING_STATUS_CHANGE); - - goto retry; - } - - /* Return if we don't need to change the status */ - if (LogicalDecodingCtl->logical_decoding_enabled == new_status) - { - LogicalDecodingCtl->pending_disable = false; - LWLockRelease(LogicalDecodingControlLock); - return false; - } - - /* - * When attempting to disable logical decoding, if there is at least one - * valid logical slot, we cannot disable it. 
- */ - if (!new_status && CheckLogicalSlotExists()) - { - LogicalDecodingCtl->pending_disable = false; - LWLockRelease(LogicalDecodingControlLock); - return false; - } - - /* Mark the state transition is in-progress */ - LogicalDecodingCtl->status_change_inprogress = true; - - LWLockRelease(LogicalDecodingControlLock); - - return true; } /* @@ -384,25 +317,31 @@ EnsureLogicalDecodingEnabled(void) return; } - /* Prepare and start the activation process if it's disabled */ - if (!start_logical_decoding_status_change(true)) + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + + /* Return if someone already started to enable logical decoding */ + if (LogicalDecodingCtl->xlog_logical_info) + { + LogicalDecodingCtl->pending_disable = false; + LWLockRelease(LogicalDecodingControlLock); return; + } /* - * Ensure we reset the activation process if we cancelled or errored out - * below + * Set logical info WAL logging in shmem. All process starts after this + * point will include the information required by logical decoding to WAL + * records. + */ + LogicalDecodingCtl->xlog_logical_info = true; + + LWLockRelease(LogicalDecodingControlLock); + + /* + * Ensure to abort the activation process in cases where there is an + * interruption during the wait. */ PG_ENSURE_ERROR_CLEANUP(abort_logical_decoding_activation, (Datum) 0); { - /* - * Set logical info WAL logging in shmem. All process starts after - * this point will include the information required by logical - * decoding to WAL records. - */ - LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); - LogicalDecodingCtl->xlog_logical_info = true; - LWLockRelease(LogicalDecodingControlLock); - /* * Tell all running processes to reflect the xlog_logical_info update, * and wait. 
This ensures that all running processes have enabled @@ -432,13 +371,7 @@ EnsureLogicalDecodingEnabled(void) */ LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); LogicalDecodingCtl->logical_decoding_enabled = true; - LWLockRelease(LogicalDecodingControlLock); - write_logical_decoding_status_update_record(true); - - /* Complete the transition */ - LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); - LogicalDecodingCtl->status_change_inprogress = false; LogicalDecodingCtl->pending_disable = false; LWLockRelease(LogicalDecodingControlLock); @@ -446,9 +379,6 @@ EnsureLogicalDecodingEnabled(void) ereport(LOG, errmsg("logical decoding is enabled upon creating a new logical replication slot")); - - /* Let waiters know the work finished */ - ConditionVariableBroadcast(&LogicalDecodingCtl->cv); } /* @@ -509,15 +439,14 @@ DisableLogicalDecodingIfNecessary(void) if (!pending_disable) return; - /* Prepare and start the deactivation process if it's enabled */ - if (!start_logical_decoding_status_change(false)) - return; + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); - /* - * We don't need PG_ENSURE_ERROR_CLEANUP() to abort the deactivation - * process as all subsequent operations are expected to be - * non-interruptible and not to throw an ERROR or a FATAL. - */ + if (!LogicalDecodingCtl->xlog_logical_info || CheckLogicalSlotExists()) + { + LogicalDecodingCtl->pending_disable = false; + LWLockRelease(LogicalDecodingControlLock); + return; + } START_CRIT_SECTION(); @@ -526,19 +455,20 @@ DisableLogicalDecodingIfNecessary(void) * information WAL logging in order to ensure that no logical decoding * processes WAL records with insufficient information. 
*/ - LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); LogicalDecodingCtl->logical_decoding_enabled = false; - LWLockRelease(LogicalDecodingControlLock); /* Write the WAL to disable logical decoding on standbys too */ if (XLogStandbyInfoActive()) write_logical_decoding_status_update_record(false); /* Now disable logical information WAL logging */ - LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); LogicalDecodingCtl->xlog_logical_info = false; + LogicalDecodingCtl->pending_disable = false; + LWLockRelease(LogicalDecodingControlLock); + END_CRIT_SECTION(); + /* * Tell all running processes to reflect the xlog_logical_info update. * Unlike when enabling logical decoding, we don't need to wait for all @@ -549,19 +479,8 @@ DisableLogicalDecodingIfNecessary(void) */ EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO); - /* Complete the transition */ - LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); - LogicalDecodingCtl->status_change_inprogress = false; - LogicalDecodingCtl->pending_disable = false; - LWLockRelease(LogicalDecodingControlLock); - - END_CRIT_SECTION(); - ereport(LOG, errmsg("logical decoding is disabled because there is no valid logical replication slot")); - - /* Let waiters know the work finished */ - ConditionVariableBroadcast(&LogicalDecodingCtl->cv); } /* @@ -575,7 +494,6 @@ UpdateLogicalDecodingStatusEndOfRecovery(void) bool new_status = false; Assert(RecoveryInProgress()); - Assert(!LogicalDecodingCtl->status_change_inprogress); /* * With 'minimal' WAL level, there have not been logical slots during @@ -619,9 +537,7 @@ UpdateLogicalDecodingStatusEndOfRecovery(void) * Update both the logical decoding status and logical WAL logging * status. Unlike toggling these status during non-recovery, we don't * need to worry about the operation order as WAL writes are still not - * permitted. 
Similarly, we don't need PG_ENSURE_ERROR_CLEANUP() to - * abort the status change process neither, as erroring out during - * recovery leads to a server shutdown. + * permitted. */ UpdateLogicalDecodingStatus(new_status, false); diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 2ec24f39f25..daaaab0d888 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -135,7 +135,6 @@ HASH_GROW_BUCKETS_ELECT "Waiting to elect a Parallel Hash participant to allocat HASH_GROW_BUCKETS_REALLOCATE "Waiting for an elected Parallel Hash participant to finish allocating more buckets." HASH_GROW_BUCKETS_REINSERT "Waiting for other Parallel Hash participants to finish inserting tuples into new buckets." LOGICAL_APPLY_SEND_DATA "Waiting for a logical replication leader apply process to send data to a parallel apply process." -LOGICAL_DECODING_STATUS_CHANGE "Waiting for logical decoding status change." LOGICAL_PARALLEL_APPLY_STATE_CHANGE "Waiting for a logical replication parallel apply process to change state." LOGICAL_SYNC_DATA "Waiting for a logical replication remote server to send data for initial table synchronization." LOGICAL_SYNC_STATE_CHANGE "Waiting for a logical replication remote server to change state." diff --git a/src/test/recovery/t/050_effective_wal_level.pl b/src/test/recovery/t/050_effective_wal_level.pl index 150a78b904e..16adc054ca5 100644 --- a/src/test/recovery/t/050_effective_wal_level.pl +++ b/src/test/recovery/t/050_effective_wal_level.pl @@ -382,14 +382,13 @@ if ( $ENV{enable_injection_points} eq 'yes' test_wal_level($primary, "replica|replica", "effective_wal_level got decreased to 'replica' on primary"); - # Start two psql sessions to test the case where they try to enable logical - # decoding concurrently. 
- my $psql_create_slot_1 = $primary->background_psql('postgres'); - my $psql_create_slot_2 = $primary->background_psql('postgres'); + # Start a psql session to test the case where the activation process is + # interrupted. + my $psql_create_slot = $primary->background_psql('postgres'); # Start the logical decoding activation process upon creating the logical # slot, but it will wait due to the injection point. - $psql_create_slot_1->query_until( + $psql_create_slot->query_until( qr/create_slot_canceled/, q(\echo create_slot_canceled select injection_points_set_local(); @@ -401,18 +400,8 @@ select pg_create_logical_replication_slot('slot_canceled', 'pgoutput'); $primary->wait_for_event('client backend', 'logical-decoding-activation'); note("injection_point 'logical-decoding-activation' is reached"); - # Start another activation process but it needs to wait for the first - # activation process to complete. - $psql_create_slot_2->query_until( - qr/create_slot_success/, - q(\echo create_slot_success -select pg_create_logical_replication_slot('test_slot', 'pgoutput'); -\q -)); - $primary->wait_for_event('client backend', 'LogicalDecodingStatusChange'); - - # Cancel the backend initiated by $psql_create_slot_1, aborting its activation - # process, letting the second activation process proceed. + # Cancel the backend initiated by $psql_create_slot, aborting its activation + # process. $primary->safe_psql( 'postgres', qq[ @@ -421,14 +410,8 @@ select pg_cancel_backend(pid) from pg_stat_activity where query ~ 'slot_canceled # Check if the backend aborted the activation process. $primary->wait_for_log("aborting logical decoding activation process"); - - # Wait for the logical slot 'test_slot' to be created. 
- $primary->poll_query_until('postgres', - qq[select exists (select 1 from pg_replication_slots where slot_name = 'test_slot')] - ); - - test_wal_level($primary, "replica|logical", - "effective_wal_level increased to 'logical'"); + test_wal_level($primary, "replica|replica", + "the activation process aborted"); } $primary->stop; -- 2.47.3