From 51120c0c46533a062fb6be61ca58d100dc0059a0 Mon Sep 17 00:00:00 2001 From: alterego655 <824662526@qq.com> Date: Mon, 15 Dec 2025 16:37:07 +0800 Subject: [PATCH v3] Add CONNECTING/CONNECTED states to walreceiver Previously, walreceiver reported WALRCV_STREAMING immediately after startup, before it had demonstrated end-to-end WAL flow. This could confuse monitoring in some cases. Introduce two intermediate states: - WALRCV_CONNECTING: walreceiver is starting up - WALRCV_CONNECTED: START_REPLICATION succeeded, but replay progress has not yet been observed Capture a baseline apply pointer when START_REPLICATION succeeds, and promote to WALRCV_STREAMING once applyPtr advances beyond that baseline. --- src/backend/replication/walreceiver.c | 43 ++++++++++++++++++++-- src/backend/replication/walreceiverfuncs.c | 5 ++- src/include/replication/walreceiver.h | 3 ++ 3 files changed, 46 insertions(+), 5 deletions(-) diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index ac802ae85b4..345deed9093 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -130,6 +130,7 @@ typedef enum WalRcvWakeupReason static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]; static StringInfoData reply_message; +static XLogRecPtr initialApplyPtr = InvalidXLogRecPtr; /* Prototypes for private functions */ static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last); @@ -204,6 +205,8 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) /* The usual case */ break; + case WALRCV_CONNECTING: + case WALRCV_CONNECTED: case WALRCV_WAITING: case WALRCV_STREAMING: case WALRCV_RESTARTING: @@ -214,7 +217,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) } /* Advertise our PID so that the startup process can kill us */ walrcv->pid = MyProcPid; - walrcv->walRcvState = WALRCV_STREAMING; + walrcv->walRcvState = WALRCV_CONNECTING; /* Fetch information required to start streaming */ 
walrcv->ready_to_display = false; @@ -398,6 +401,13 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL); initStringInfo(&reply_message); + /* We are connected, but have not yet applied WAL from this stream */ + initialApplyPtr = LogstreamResult.Write; + SpinLockAcquire(&walrcv->mutex); + if (walrcv->walRcvState == WALRCV_CONNECTING) + walrcv->walRcvState = WALRCV_CONNECTED; + SpinLockRelease(&walrcv->mutex); + /* Initialize nap wakeup times. */ now = GetCurrentTimestamp(); for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i) @@ -688,8 +698,9 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) */ *startpoint = walrcv->receiveStart; *startpointTLI = walrcv->receiveStartTLI; - walrcv->walRcvState = WALRCV_STREAMING; + walrcv->walRcvState = WALRCV_CONNECTING; SpinLockRelease(&walrcv->mutex); + initialApplyPtr = InvalidXLogRecPtr; break; } if (walrcv->walRcvState == WALRCV_STOPPING) @@ -790,7 +801,9 @@ WalRcvDie(int code, Datum arg) /* Mark ourselves inactive in shared memory */ SpinLockAcquire(&walrcv->mutex); - Assert(walrcv->walRcvState == WALRCV_STREAMING || + Assert(walrcv->walRcvState == WALRCV_CONNECTING || + walrcv->walRcvState == WALRCV_CONNECTED || + walrcv->walRcvState == WALRCV_STREAMING || walrcv->walRcvState == WALRCV_RESTARTING || walrcv->walRcvState == WALRCV_STARTING || walrcv->walRcvState == WALRCV_WAITING || @@ -989,6 +1002,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli) if (LogstreamResult.Flush < LogstreamResult.Write) { WalRcvData *walrcv = WalRcv; + bool force_reply = false; issue_xlog_fsync(recvFile, recvSegNo, tli); @@ -1001,6 +1015,8 @@ XLogWalRcvFlush(bool dying, TimeLineID tli) walrcv->latestChunkStart = walrcv->flushedUpto; walrcv->flushedUpto = LogstreamResult.Flush; walrcv->receivedTLI = tli; + if (walrcv->walRcvState == WALRCV_CONNECTED) + force_reply = true; } SpinLockRelease(&walrcv->mutex); @@ -1022,7 +1038,7 @@ 
XLogWalRcvFlush(bool dying, TimeLineID tli) /* Also let the primary know that we made some progress */ if (!dying) { - XLogWalRcvSendReply(false, false); + XLogWalRcvSendReply(force_reply, false); XLogWalRcvSendHSFeedback(false); } } @@ -1129,6 +1145,21 @@ XLogWalRcvSendReply(bool force, bool requestReply) flushPtr = LogstreamResult.Flush; applyPtr = GetXLogReplayRecPtr(NULL); + /* + * If we've established the replication connection but have not yet proven + * WAL is flowing end-to-end, flip to STREAMING once applyPtr advances + * beyond the baseline captured when START_REPLICATION succeeded. + */ + if (WalRcv->walRcvState == WALRCV_CONNECTED && + XLogRecPtrIsValid(initialApplyPtr) && + applyPtr != initialApplyPtr) + { + SpinLockAcquire(&WalRcv->mutex); + if (WalRcv->walRcvState == WALRCV_CONNECTED) + WalRcv->walRcvState = WALRCV_STREAMING; + SpinLockRelease(&WalRcv->mutex); + } + resetStringInfo(&reply_message); pq_sendbyte(&reply_message, PqReplMsg_StandbyStatusUpdate); pq_sendint64(&reply_message, writePtr); @@ -1373,6 +1404,10 @@ WalRcvGetStateString(WalRcvState state) return "stopped"; case WALRCV_STARTING: return "starting"; + case WALRCV_CONNECTING: + return "connecting"; + case WALRCV_CONNECTED: + return "connected"; case WALRCV_STREAMING: return "streaming"; case WALRCV_WAITING: diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 822645748a7..b0b6e8314b5 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -179,7 +179,8 @@ WalRcvStreaming(void) } if (state == WALRCV_STREAMING || state == WALRCV_STARTING || - state == WALRCV_RESTARTING) + state == WALRCV_RESTARTING || state == WALRCV_CONNECTING || + state == WALRCV_CONNECTED) return true; else return false; @@ -211,6 +212,8 @@ ShutdownWalRcv(void) stopped = true; break; + case WALRCV_CONNECTING: + case WALRCV_CONNECTED: case WALRCV_STREAMING: case WALRCV_WAITING: case WALRCV_RESTARTING: diff --git 
a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index e5557d21fa8..70b2947f2b4 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -47,6 +47,9 @@ typedef enum WALRCV_STOPPED, /* stopped and mustn't start up again */ WALRCV_STARTING, /* launched, but the process hasn't * initialized yet */ + WALRCV_CONNECTING, /* connection starting, but not established yet */ + WALRCV_CONNECTED, /* replication connection established, but no WAL + * from this stream applied yet */ WALRCV_STREAMING, /* walreceiver is streaming */ WALRCV_WAITING, /* stopped streaming, waiting for orders */ WALRCV_RESTARTING, /* asked to restart streaming */ -- 2.51.0