From b73da85bcf051034444bf27e1f5dd1045a0eab00 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Thu, 13 Nov 2025 14:42:40 -0800 Subject: [PATCH v28 2/2] FIXUP: remove status_change_allowed flag. --- src/backend/access/transam/xlog.c | 12 +- src/backend/postmaster/checkpointer.c | 13 + src/backend/replication/logical/logical.c | 16 +- src/backend/replication/logical/logicalctl.c | 222 +++++++----------- src/backend/replication/slot.c | 3 + src/include/access/xlog.h | 2 + src/include/replication/logicalctl.h | 1 - .../recovery/t/050_effective_wal_level.pl | 4 - 8 files changed, 114 insertions(+), 159 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 5a16c1e520c..055ea4dc7b2 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6242,12 +6242,6 @@ StartupXLOG(void) /* * Update logical decoding status in shared memory and write an * XLOG_LOGICAL_DECODING_STATUS_CHANGE, if necessary. - * - * Note that this function starts to delay logical decoding status changes - * until the recovery state changes to DONE below, which is applied also - * for the checkpointer process in deactivation cases. Therefore, the - * startup should not do any operations that wait for the checkpointer - * because otherwise it easily ends up with a deadlock. */ UpdateLogicalDecodingStatusEndOfRecovery(); @@ -6282,6 +6276,12 @@ StartupXLOG(void) UpdateControlFile(); LWLockRelease(ControlFileLock); + /* + * Wake up the checkpointer process as there might be a request to disable + * logical decoding by concurrent slot drop. + */ + WakeupCheckpointer(); + /* * Wake up all waiters for replay LSN. They need to report an error that * recovery was ended before reaching the target LSN. 
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c index 7ebf9239d70..5bf863e1452 100644 --- a/src/backend/postmaster/checkpointer.c +++ b/src/backend/postmaster/checkpointer.c @@ -1540,3 +1540,16 @@ FirstCallSinceLastCheckpoint(void) return FirstCall; } + +/* + * Wake up the checkpointer process. + */ +void +WakeupCheckpointer(void) +{ + volatile PROC_HDR *procglobal = ProcGlobal; + ProcNumber checkpointerProc = procglobal->checkpointerProc; + + if (checkpointerProc != INVALID_PROC_NUMBER) + SetLatch(&GetPGProcByNumber(checkpointerProc)->procLatch); +} diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 7a5a86313b6..f4366f59ee7 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -125,20 +125,8 @@ CheckLogicalDecodingRequirements(void) /* CheckSlotRequirements() has already checked if wal_level >= 'replica' */ - /* - * Check if logical decoding is available on standbys. Typically, when - * running a standby, RecoveryInProgress() returning true implies that - * LogicalDecodingStatusChangeAllowed() is false. However, during - * promotion, there is a brief transitional phase where - * RecoveryInProgress() remains true even though - * LogicalDecodingStatusChangeAllowed() has already turned true. - * - * In this window, logical decoding enable/disable operations are - * permitted on standby, anticipating its transition to primary. The - * actual wait for recovery completion is handled within - * start_logical_decoding_status_change(). 
- */ - if (!IsLogicalDecodingEnabled() && !LogicalDecodingStatusChangeAllowed()) + /* Check if logical decoding is available on standby */ + if (RecoveryInProgress() && !IsLogicalDecodingEnabled()) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical decoding on standby requires \"effective_wal_level\" >= \"logical\" on the primary"), diff --git a/src/backend/replication/logical/logicalctl.c b/src/backend/replication/logical/logicalctl.c index 57ba46bd204..8bc8111cfbb 100644 --- a/src/backend/replication/logical/logicalctl.c +++ b/src/backend/replication/logical/logicalctl.c @@ -96,26 +96,6 @@ typedef struct LogicalDecodingCtlData /* True if logical decoding is available in the system */ bool logical_decoding_enabled; - /* - * This flag indicates whether logical decoding status changes are - * allowed. It is false during recovery and becomes true when recovery - * ends. Even when true, it specifically means "allowed after recovery has - * fully completed". - * - * This flag helps prevent race conditions with the startup process's - * end-of-recovery actions. After the startup process updates the logical - * decoding status at recovery end, other processes might attempt to - * toggle logical decoding before recovery fully completes (i.e., - * RecoveryInProgress() returns false) - a period when WAL writes are - * still not permitted. Therefore, when this flag is true, we must wait - * for recovery to fully complete before attempting an activation or a - * deactivation. We cannot rely on the end-of-recovery to allow toggling - * of the logical decoding status because it's possible that concurrent - * processes write non-logical WAL records after the startup process - * enables logical decoding. 
- */ - bool status_change_allowed; - /* True while the logical decoding status is being changed */ bool status_change_inprogress; @@ -138,6 +118,7 @@ bool XLogLogicalInfo = false; static void update_xlog_logical_info(void); static void abort_logical_decoding_activation(int code, Datum arg); static bool start_logical_decoding_status_change(bool new_status); +static void write_logical_decoding_status_update_record(bool status); Size LogicalDecodingCtlShmemSize(void) @@ -201,7 +182,7 @@ InitializeProcessXLogLogicalInfo(void) } /* - * This routine is called when we are ordered to update XLogLogicalInfo + * This routine is called when we are told to update XLogLogicalInfo * by a ProcSignalBarrier. */ bool @@ -246,21 +227,20 @@ IsXLogLogicalInfoEnabled(void) /* * Enable or disable both the status of logical info WAL logging and logical * decoding in shmem. - + * * Note that this function updates the global flags without the state transition * process. EnsureLogicalDecodingEnabled() and DisableLogicalDecodingIfNecessary() * should be used instead if there could be concurrent processes doing writes - * or logical decoding, particularly once the status change is allowed globally. + * or logical decoding, particularly after recovery completes. */ void UpdateLogicalDecodingStatus(bool new_status, bool need_lock) { + Assert(RecoveryInProgress()); + if (need_lock) LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); - /* Must be called before allowing the status change globally */ - Assert(!LogicalDecodingCtl->status_change_allowed); - LogicalDecodingCtl->xlog_logical_info = new_status; LogicalDecodingCtl->logical_decoding_enabled = new_status; @@ -270,6 +250,20 @@ UpdateLogicalDecodingStatus(bool new_status, bool need_lock) elog(DEBUG1, "update logical decoding status to %d", new_status); } +/* + * Writes XLOG_LOGICAL_DECODING_STATUS_CHANGE WAL record with the given status. 
+ */ +static void +write_logical_decoding_status_update_record(bool status) +{ + XLogRecPtr recptr; + + XLogBeginInsert(); + XLogRegisterData(&status, sizeof(bool)); + recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE); + XLogFlush(recptr); +} + /* * A PG_ENSURE_ERROR_CLEANUP callback for activating logical decoding, resetting * the shared flags to revert the logical decoding activation process. @@ -302,62 +296,15 @@ abort_logical_decoding_activation(int code, Datum arg) ConditionVariableBroadcast(&LogicalDecodingCtl->cv); } -/* - * Returns the status_change_allowed flag in LogicalDecodingCtl. The caller - * might need to check RecoveryInProgress() as well. Please see the comments for - * the status_change_allowed flag for details. - */ -bool -LogicalDecodingStatusChangeAllowed(void) -{ - bool status_change_allowed; - - LWLockAcquire(LogicalDecodingControlLock, LW_SHARED); - status_change_allowed = LogicalDecodingCtl->status_change_allowed; - LWLockRelease(LogicalDecodingControlLock); - - return status_change_allowed; -} - /* * Performs preparation work required before changing the logical decoding * status. If the status change is required, it sets * LogicalDecodingCtl->status_change_inprogress, and returns true. Otherwise, - * if it's not required or not allowed (e.g., logical slots exist) it returns - * false. + * if it's not required (e.g., logical slots exist) it returns false. */ static bool start_logical_decoding_status_change(bool new_status) { - if (!LogicalDecodingStatusChangeAllowed()) - return false; - - if (RecoveryInProgress()) - { - /* - * Wait for the recovery to complete. Note that even the checkpointer - * can wait for the recovery to complete here without concerning - * deadlocks unless the startup process performs any action that waits - * for it after calling UpdateLogicalDecodingStatusEndOfRecovery(). 
- */ - elog(DEBUG1, - "waiting for recovery completion to change logical decoding status"); - do - { - CHECK_FOR_INTERRUPTS(); - - pgstat_report_wait_start(WAIT_EVENT_LOGICAL_DECODING_STATUS_CHANGE_DELAY); - pg_usleep(100000L); /* wait for 100 msec */ - pgstat_report_wait_end(); - } - while (RecoveryInProgress()); - - /* - * Now that writing WAL records are officially allowed, start the - * logical decoding status change. - */ - } - retry: LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); @@ -366,6 +313,10 @@ retry: { /* Release the lock and wait for someone to complete the transition */ LWLockRelease(LogicalDecodingControlLock); + + elog(DEBUG1, + "waiting for logical decoding status change to complete"); + ConditionVariableSleep(&LogicalDecodingCtl->cv, WAIT_EVENT_LOGICAL_DECODING_STATUS_CHANGE); @@ -405,7 +356,7 @@ retry: * If this function is called during recovery, it simply returns without * action since the logical decoding status change is not allowed during * this time. The logical decoding status depends on the status on the primary. - * The caller can use CheckLogicalDecodingRequirements() before calling this + * The caller should use CheckLogicalDecodingRequirements() before calling this * function to make sure that the logical decoding status can be modified. 
* * Note that there is no interlock between logical decoding activation @@ -427,6 +378,12 @@ EnsureLogicalDecodingEnabled(void) if (wal_level >= WAL_LEVEL_LOGICAL) return; + if (RecoveryInProgress()) + { + Assert(IsLogicalDecodingEnabled()); + return; + } + /* Prepare and start the activation process if it's disabled */ if (!start_logical_decoding_status_change(true)) return; @@ -477,15 +434,7 @@ EnsureLogicalDecodingEnabled(void) LogicalDecodingCtl->logical_decoding_enabled = true; LWLockRelease(LogicalDecodingControlLock); - { - XLogRecPtr recptr; - bool logical_decoding = true; - - XLogBeginInsert(); - XLogRegisterData(&logical_decoding, sizeof(bool)); - recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE); - XLogFlush(recptr); - } + write_logical_decoding_status_update_record(true); /* Complete the transition */ LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); @@ -511,19 +460,9 @@ EnsureLogicalDecodingEnabled(void) void RequestDisableLogicalDecoding(void) { - volatile PROC_HDR *procglobal = ProcGlobal; - ProcNumber checkpointerProc = procglobal->checkpointerProc; - if (wal_level != WAL_LEVEL_REPLICA) return; - /* - * Check if the status change is allowed before initiating a disable - * request, to avoid unnecessary work. - */ - if (!LogicalDecodingStatusChangeAllowed()) - return; - /* * It's possible that we might not actually need to disable logical * decoding if someone creates a new logical slot concurrently. 
We set the @@ -534,9 +473,7 @@ RequestDisableLogicalDecoding(void) LogicalDecodingCtl->pending_disable = true; LWLockRelease(LogicalDecodingControlLock); - /* Wake up the checkpointer */ - if (checkpointerProc != INVALID_PROC_NUMBER) - SetLatch(&GetPGProcByNumber(checkpointerProc)->procLatch); + WakeupCheckpointer(); elog(DEBUG1, "requested disabling logical decoding"); } @@ -561,6 +498,9 @@ DisableLogicalDecodingIfNecessary(void) */ Assert(!MyReplicationSlot); + if (RecoveryInProgress()) + return; + LWLockAcquire(LogicalDecodingControlLock, LW_SHARED); pending_disable = LogicalDecodingCtl->pending_disable; LWLockRelease(LogicalDecodingControlLock); @@ -592,15 +532,7 @@ DisableLogicalDecodingIfNecessary(void) /* Write the WAL to disable logical decoding on standbys too */ if (XLogStandbyInfoActive()) - { - bool logical_decoding = false; - XLogRecPtr recptr; - - XLogBeginInsert(); - XLogRegisterData(&logical_decoding, sizeof(bool)); - recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE); - XLogFlush(recptr); - } + write_logical_decoding_status_update_record(false); /* Now disable logical information WAL logging */ LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); @@ -633,20 +565,23 @@ DisableLogicalDecodingIfNecessary(void) } /* - * Update the logical decoding status at end of the recovery. This function - * must be called before accepting writes. Please refer to the comment of - * LogicalDecodingCtlData.status_change_allowed flag for the details. + * Updates the logical decoding status at end of recovery, and ensures that + * all running processes have the updated XLogLogicalInfo status. This + * function must be called before accepting writes. 
+ */ void UpdateLogicalDecodingStatusEndOfRecovery(void) { bool new_status = false; - bool need_wal = false; Assert(RecoveryInProgress()); Assert(!LogicalDecodingCtl->status_change_inprogress); - /* With 'minimal' WAL level, logical decoding is always disabled */ + /* + * With 'minimal' WAL level, no logical slots can have existed during + * recovery. Logical decoding is always disabled, and there is no need to + * synchronize XLogLogicalInfo. + */ if (wal_level == WAL_LEVEL_MINIMAL) { Assert(!IsXLogLogicalInfoEnabled() && !IsLogicalDecodingEnabled()); @@ -658,39 +593,58 @@ UpdateLogicalDecodingStatusEndOfRecovery(void) if (wal_level == WAL_LEVEL_LOGICAL || CheckLogicalSlotExists()) new_status = true; - if (LogicalDecodingCtl->logical_decoding_enabled != new_status) - need_wal = true; - - /* - * Update shmem flags. We don't need to care about the order of setting - * global flag and writing the WAL record as writes are not allowed yet. - */ - UpdateLogicalDecodingStatus(new_status, false); - /* - * Mark the end-of-recovery action has been done, allowing processes to - * change the logical decoding status after the recovery finished. + * When recovery ends, we need to either enable or disable logical + * decoding based on the wal_level setting and the presence of logical + * slots. Note that concurrent slot creation and deletion could + * happen, but WAL writes are still not permitted until recovery fully + * completes. Here's how we handle concurrent toggling of logical decoding: + * + * For the 'enable' case, if there's a concurrent disable request before + * recovery fully completes, the checkpointer will handle it after + * recovery is done. This means there might be a brief period after + * recovery where logical decoding remains enabled even with no logical + * replication slots present. This temporary state is not new - it can + * already occur due to the checkpointer's asynchronous deactivation + * process. 
+ * + * For the 'disable' case, backends cannot create logical replication slots + * during recovery (see checks in CheckLogicalDecodingRequirements()), + * which prevents a race condition between disabling logical decoding and + * concurrent slot creation. */ - LogicalDecodingCtl->status_change_allowed = true; - - LWLockRelease(LogicalDecodingControlLock); - - if (need_wal) + if (new_status != LogicalDecodingCtl->logical_decoding_enabled) { - XLogRecPtr recptr; + /* + * Update both the logical decoding status and logical WAL logging + * status. Unlike toggling these statuses outside of recovery, we don't + * need to worry about the operation order as WAL writes are still not + * permitted. Similarly, we don't need PG_ENSURE_ERROR_CLEANUP() to + * abort the status change process either, as erroring out during + * recovery leads to a server shutdown. + */ + UpdateLogicalDecodingStatus(new_status, false); + + /* + * Now that we have updated the logical decoding status, clear the pending + * disable flag. It's possible that a concurrent process drops the + * last logical slot and initiates the pending disable again. The + * checkpointer process will check it. + */ + LogicalDecodingCtl->pending_disable = false; - Assert(XLogStandbyInfoActive()); + LWLockRelease(LogicalDecodingControlLock); - XLogBeginInsert(); - XLogRegisterData(&new_status, sizeof(bool)); - recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE); - XLogFlush(recptr); + write_logical_decoding_status_update_record(new_status); } + else + LWLockRelease(LogicalDecodingControlLock); /* * Ensure all running processes have the updated status. We don't need to * wait for running transactions to finish as we don't accept any writes - * yet. We need the wait even if we've not updated the status above as the + * yet. 
On the other hand, we need to wait for synchronizing + * XLogLogicalInfo even if we've not updated the status above as the * status have been turned on and off during recovery, having running * processes have different status on their local caches. */ diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index c0d02ffa624..1da843119a1 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -2153,6 +2153,9 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes, * causes in a single pass, minimizing redundant iterations. The "cause" * parameter can be a MASK representing one or more of the defined causes. * + * If it invalidates the last logical slot in the cluster, it requests to + * disable logical decoding. + * * NB - this runs as part of checkpoint, so avoid raising errors if possible. */ bool diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index a73726982d7..58655d91db7 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -269,6 +269,8 @@ extern XLogRecPtr GetLastImportantRecPtr(void); extern void SetWalWriterSleeping(bool sleeping); +extern void WakeupCheckpointer(void); + extern Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, TimeLineID tli); diff --git a/src/include/replication/logicalctl.h b/src/include/replication/logicalctl.h index fe8b7141af7..77ded671473 100644 --- a/src/include/replication/logicalctl.h +++ b/src/include/replication/logicalctl.h @@ -26,6 +26,5 @@ extern void RequestDisableLogicalDecoding(void); extern void DisableLogicalDecodingIfNecessary(void); extern void UpdateLogicalDecodingStatus(bool new_status, bool need_lock); extern void UpdateLogicalDecodingStatusEndOfRecovery(void); -extern bool LogicalDecodingStatusChangeAllowed(void); #endif diff --git a/src/test/recovery/t/050_effective_wal_level.pl b/src/test/recovery/t/050_effective_wal_level.pl index 9a04b653bdc..150a78b904e 100644 --- 
a/src/test/recovery/t/050_effective_wal_level.pl +++ b/src/test/recovery/t/050_effective_wal_level.pl @@ -360,13 +360,9 @@ if ( $ENV{enable_injection_points} eq 'yes' ); # Drop the logical slot, requesting to disable logical decoding to the checkpointer. - # It has to wait for the recovery to complete before disabling logical decoding. $standby5->safe_psql('postgres', qq[select pg_drop_replication_slot('standby5_slot');]); - $standby5->wait_for_log( - "waiting for recovery completion to change logical decoding status"); - # Resume the startup process to complete the recovery. $standby5->safe_psql('postgres', qq[select injection_points_wakeup('startup-logical-decoding-status-change-end-of-recovery')] -- 2.47.3