From fdd376b2bc7667badeee2c5bb2848acd7da2663e Mon Sep 17 00:00:00 2001 From: Fujii Masao Date: Thu, 23 Apr 2026 13:34:38 +0900 Subject: [PATCH v1] Avoid blocking indefinitely while finishing walsender shutdown When walsender finishes streaming during shutdown, it sends a CommandComplete message to tell the receiver that WAL streaming is done. Previously, that path used EndCommand() followed by pq_flush(). Those functions can block indefinitely waiting for the socket to become writeable. As a result, even when wal_sender_shutdown_timeout is set, walsender could remain stuck while sending the final completion message, and the shutdown timeout would not be enforced. Fix this by introducing EndCommandExtended(), which allows CommandComplete to be queued with pq_putmessage_noblock(), and by using ProcessPendingWrites() instead of pq_flush() so the normal walsender write loop continues to process replies and check replication and shutdown timeouts while pending output is being flushed. --- src/backend/replication/walsender.c | 33 +++++++++++++++++++++++------ src/backend/tcop/dest.c | 15 +++++++++++-- src/include/tcop/dest.h | 2 ++ 3 files changed, 42 insertions(+), 8 deletions(-) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 3d4ab929f91..339747bf868 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -209,6 +209,13 @@ static bool waiting_for_ping_response = false; /* Timestamp when walsender received the shutdown request */ static TimestampTz shutdown_request_timestamp = 0; +/* + * Set after queueing the CommandComplete message that ends WAL streaming + * during shutdown. This prevents WalSndDone() and WalSndDoneImmediate() + * from queueing the same message twice. + */ +static bool shutdown_stream_done_queued = false; + /* * While streaming WAL in Copy mode, streamingDoneSending is set to true * after we have sent CopyDone. We should not send any more CopyData messages @@ -3713,15 +3720,17 @@ WalSndDoneImmediate(void) { WalSndState state = MyWalSnd->state; - if (state == WALSNDSTATE_CATCHUP || - state == WALSNDSTATE_STREAMING || - state == WALSNDSTATE_STOPPING) + if ((state == WALSNDSTATE_CATCHUP || + state == WALSNDSTATE_STREAMING || + state == WALSNDSTATE_STOPPING) && + !shutdown_stream_done_queued) { QueryCompletion qc; /* Try to inform receiver that XLOG streaming is done */ SetQueryCompletion(&qc, CMDTAG_COPY, 0); - EndCommand(&qc, DestRemote, false); + EndCommandExtended(&qc, DestRemote, false, true); + shutdown_stream_done_queued = true; /* * Note that the output buffer may be full during the forced shutdown @@ -3778,10 +3787,22 @@ WalSndDone(WalSndSendDataCallback send_data) { QueryCompletion qc; + Assert(!shutdown_stream_done_queued); + /* Inform the standby that XLOG streaming is done */ SetQueryCompletion(&qc, CMDTAG_COPY, 0); - EndCommand(&qc, DestRemote, false); - pq_flush(); + EndCommandExtended(&qc, DestRemote, false, true); + shutdown_stream_done_queued = true; + + /* + * Don't call pq_flush() here. It can block indefinitely waiting for + * the socket to become writeable, which would prevent + * wal_sender_shutdown_timeout from being enforced. Use the regular + * walsender non-blocking flush path so shutdown and replication + * timeouts continue to be checked while waiting for the send buffer + * to drain. + */ + ProcessPendingWrites(); proc_exit(0); } diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c index fb163930c89..bdc3dad3357 100644 --- a/src/backend/tcop/dest.c +++ b/src/backend/tcop/dest.c @@ -165,8 +165,10 @@ CreateDestReceiver(CommandDest dest) * EndCommand - clean up the destination at end of command * ---------------- */ + void -EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output) +EndCommandExtended(const QueryCompletion *qc, CommandDest dest, + bool force_undecorated_output, bool noblock) { char completionTag[COMPLETION_TAG_BUFSIZE]; Size len; @@ -179,7 +181,10 @@ EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_o len = BuildQueryCompletionString(completionTag, qc, force_undecorated_output); - pq_putmessage(PqMsg_CommandComplete, completionTag, len + 1); + if (noblock) + pq_putmessage_noblock(PqMsg_CommandComplete, completionTag, len + 1); + else + pq_putmessage(PqMsg_CommandComplete, completionTag, len + 1); break; case DestNone: @@ -196,6 +201,12 @@ EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_o } } +void +EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output) +{ + EndCommandExtended(qc, dest, force_undecorated_output, false); +} + /* ---------------- * EndReplicationCommand - stripped down version of EndCommand * diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h index 4e4f532d8cc..103f27fc3cb 100644 --- a/src/include/tcop/dest.h +++ b/src/include/tcop/dest.h @@ -136,6 +136,8 @@ extern PGDLLIMPORT DestReceiver *None_Receiver; /* permanent receiver for extern void BeginCommand(CommandTag commandTag, CommandDest dest); extern DestReceiver *CreateDestReceiver(CommandDest dest); +extern void EndCommandExtended(const QueryCompletion *qc, CommandDest dest, + bool force_undecorated_output, bool noblock); extern void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output); extern void EndReplicationCommand(const char *commandTag); -- 2.53.0