From 64005d51e044241e36a3b2e80a7969f837b8a570 Mon Sep 17 00:00:00 2001 From: Fujii Masao Date: Fri, 24 Apr 2026 18:08:21 +0900 Subject: [PATCH v2] Avoid blocking indefinitely while finishing walsender shutdown When walsender finishes streaming during shutdown, it sends a CommandComplete message to tell the receiver that WAL streaming is done. Previously, that path used EndCommand() followed by pq_flush(). Those functions can block indefinitely waiting for the socket to become writeable. As a result, even when wal_sender_shutdown_timeout is set, walsender could remain stuck while sending the final completion message, and the shutdown timeout would not be enforced. Fix this by introducing EndCommandExtended(), which allows CommandComplete to be queued with pq_putmessage_noblock(), and by using the walsender nonblocking flush path instead of pq_flush(), so the shutdown timeout continues to be checked while pending output is flushed. --- src/backend/replication/walsender.c | 66 ++++++++++++++++++++++++++--- src/backend/tcop/dest.c | 15 ++++++- src/include/tcop/dest.h | 2 + 3 files changed, 75 insertions(+), 8 deletions(-) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 3d4ab929f91..04aa770d981 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -209,6 +209,13 @@ static bool waiting_for_ping_response = false; /* Timestamp when walsender received the shutdown request */ static TimestampTz shutdown_request_timestamp = 0; +/* + * Set after queueing the CommandComplete message that ends WAL streaming + * during shutdown. This prevents WalSndDone() and WalSndDoneImmediate() + * from queueing the same message twice. + */ +static bool shutdown_stream_done_queued = false; + /* * While streaming WAL in Copy mode, streamingDoneSending is set to true * after we have sent CopyDone. We should not send any more CopyData messages @@ -3713,15 +3720,17 @@ WalSndDoneImmediate(void) { WalSndState state = MyWalSnd->state; - if (state == WALSNDSTATE_CATCHUP || - state == WALSNDSTATE_STREAMING || - state == WALSNDSTATE_STOPPING) + if ((state == WALSNDSTATE_CATCHUP || + state == WALSNDSTATE_STREAMING || + state == WALSNDSTATE_STOPPING) && + !shutdown_stream_done_queued) { QueryCompletion qc; /* Try to inform receiver that XLOG streaming is done */ SetQueryCompletion(&qc, CMDTAG_COPY, 0); - EndCommand(&qc, DestRemote, false); + EndCommandExtended(&qc, DestRemote, false, true); + shutdown_stream_done_queued = true; /* * Note that the output buffer may be full during the forced shutdown @@ -3778,10 +3787,55 @@ WalSndDone(WalSndSendDataCallback send_data) { QueryCompletion qc; + Assert(!shutdown_stream_done_queued); + /* Inform the standby that XLOG streaming is done */ SetQueryCompletion(&qc, CMDTAG_COPY, 0); - EndCommand(&qc, DestRemote, false); - pq_flush(); + EndCommandExtended(&qc, DestRemote, false, true); + shutdown_stream_done_queued = true; + + /* + * Reset last_reply_timestamp so subsequent WalSndComputeSleeptime() + * calls ignore wal_sender_timeout during shutdown. + */ + last_reply_timestamp = 0; + + /* + * Do not call pq_flush() here, since it can block indefinitely while + * waiting for the socket to become writable, preventing + * wal_sender_shutdown_timeout from being enforced. Instead, use the + * walsender nonblocking flush path so the shutdown timeout continues + * to be checked while the send buffer drains. + */ + for (;;) + { + long sleeptime; + + /* + * During shutdown, die if the shutdown timeout expires. Call this + * before WalSndComputeSleeptime() so the timeout is considered + * when computing sleep time. + */ + WalSndCheckShutdownTimeout(); + + if (!pq_is_send_pending()) + break; + + sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); + + /* Sleep until something happens or we time out */ + WalSndWait(WL_SOCKET_WRITEABLE, sleeptime, + WAIT_EVENT_WAL_SENDER_WRITE_DATA); + + /* Clear any already-pending wakeups */ + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + + /* Try to flush pending output to the client */ + if (pq_flush_if_writable() != 0) + WalSndShutdown(); + } proc_exit(0); } diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c index fb163930c89..bdc3dad3357 100644 --- a/src/backend/tcop/dest.c +++ b/src/backend/tcop/dest.c @@ -165,8 +165,10 @@ CreateDestReceiver(CommandDest dest) * EndCommand - clean up the destination at end of command * ---------------- */ + void -EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output) +EndCommandExtended(const QueryCompletion *qc, CommandDest dest, + bool force_undecorated_output, bool noblock) { char completionTag[COMPLETION_TAG_BUFSIZE]; Size len; @@ -179,7 +181,10 @@ EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_o len = BuildQueryCompletionString(completionTag, qc, force_undecorated_output); - pq_putmessage(PqMsg_CommandComplete, completionTag, len + 1); + if (noblock) + pq_putmessage_noblock(PqMsg_CommandComplete, completionTag, len + 1); + else + pq_putmessage(PqMsg_CommandComplete, completionTag, len + 1); break; case DestNone: @@ -196,6 +201,12 @@ EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_o } } +void +EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output) +{ + EndCommandExtended(qc, dest, force_undecorated_output, false); +} + /* ---------------- * EndReplicationCommand - stripped down version of EndCommand * diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h index 4e4f532d8cc..103f27fc3cb 100644 --- a/src/include/tcop/dest.h +++ b/src/include/tcop/dest.h @@ -136,6 +136,8 @@ extern PGDLLIMPORT DestReceiver *None_Receiver; /* permanent receiver for extern void BeginCommand(CommandTag commandTag, CommandDest dest); extern DestReceiver *CreateDestReceiver(CommandDest dest); +extern void EndCommandExtended(const QueryCompletion *qc, CommandDest dest, + bool force_undecorated_output, bool noblock); extern void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output); extern void EndReplicationCommand(const char *commandTag); -- 2.53.0