From b4114789d2fb1fbbe585feedfc2b419d768057d7 Mon Sep 17 00:00:00 2001 From: alterego655 <824662526@qq.com> Date: Mon, 15 Dec 2025 15:51:46 +0800 Subject: [PATCH v3] Add WALRCV_CONNECTING and WALRCV_CONNECTED states to walreceiver Previously, walreceiver set status='streaming' early in startup, before receiving any WAL, which made the status unreliable for health monitoring. Introduce two intermediate states: - WALRCV_CONNECTING: walreceiver enters CONNECTING on startup, and again when WalRcvWaitForStartPosition() resumes streaming - WALRCV_CONNECTED: walreceiver transitions from CONNECTING to CONNECTED once START_REPLICATION has succeeded, and then awaits the first WAL record The final transition from CONNECTED to STREAMING occurs when the startup process validates the first record read from the stream, confirming end-to-end data flow. This allows monitoring tools to distinguish connection establishment from active WAL streaming. --- src/backend/access/transam/xlogrecovery.c | 17 +++++++++++++ src/backend/replication/walreceiver.c | 17 +++++++++++-- src/backend/replication/walreceiverfuncs.c | 29 +++++++++++++++++++++- src/include/replication/walreceiver.h | 4 +++ 4 files changed, 64 insertions(+), 3 deletions(-) diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index ae2398d6975..89af2909063 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -250,6 +250,9 @@ static XLogSource currentSource = XLOG_FROM_ANY; static bool lastSourceFailed = false; static bool pendingWalRcvRestart = false; +/* Guard to update walreceiver state only once per streaming session. */ +static bool walrcv_streaming_set = false; + /* * These variables track when we last obtained some WAL data to process, * and where we got it from. (XLogReceiptSource is initially the same as @@ -1827,6 +1830,17 @@ PerformWalRecovery(void) recoveryPausesHere(false); } + /* + * If we are reading from the stream, this is the first valid + * record we have successfully parsed. 
Now we can verify the + * connection is truly streaming valid WAL. + */ + if (!walrcv_streaming_set && readSource == XLOG_FROM_STREAM) + { + if (WalRcvSetStreaming()) + walrcv_streaming_set = true; + } + /* * Apply the record */ @@ -3692,6 +3706,9 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, * one can hope... */ + /* Reset our "streaming active" guard flag */ + walrcv_streaming_set = false; + /* * We should be able to move to XLOG_FROM_STREAM only in * standby mode. diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index ac802ae85b4..fc8d0cd7804 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -205,6 +205,8 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) break; case WALRCV_WAITING: + case WALRCV_CONNECTING: + case WALRCV_CONNECTED: case WALRCV_STREAMING: case WALRCV_RESTARTING: default: @@ -214,7 +216,7 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) } /* Advertise our PID so that the startup process can kill us */ walrcv->pid = MyProcPid; - walrcv->walRcvState = WALRCV_STREAMING; + walrcv->walRcvState = WALRCV_CONNECTING; /* Fetch information required to start streaming */ walrcv->ready_to_display = false; @@ -394,6 +396,11 @@ WalReceiverMain(const void *startup_data, size_t startup_data_len) LSN_FORMAT_ARGS(startpoint), startpointTLI)); first_stream = false; + SpinLockAcquire(&walrcv->mutex); + if (walrcv->walRcvState == WALRCV_CONNECTING) + walrcv->walRcvState = WALRCV_CONNECTED; + SpinLockRelease(&walrcv->mutex); + /* Initialize LogstreamResult and buffers for processing messages */ LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL); initStringInfo(&reply_message); @@ -688,7 +695,7 @@ WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI) */ *startpoint = walrcv->receiveStart; *startpointTLI = walrcv->receiveStartTLI; - walrcv->walRcvState = WALRCV_STREAMING; + 
walrcv->walRcvState = WALRCV_CONNECTING; SpinLockRelease(&walrcv->mutex); break; } @@ -791,6 +798,8 @@ WalRcvDie(int code, Datum arg) /* Mark ourselves inactive in shared memory */ SpinLockAcquire(&walrcv->mutex); Assert(walrcv->walRcvState == WALRCV_STREAMING || + walrcv->walRcvState == WALRCV_CONNECTING || + walrcv->walRcvState == WALRCV_CONNECTED || walrcv->walRcvState == WALRCV_RESTARTING || walrcv->walRcvState == WALRCV_STARTING || walrcv->walRcvState == WALRCV_WAITING || @@ -1373,6 +1382,10 @@ WalRcvGetStateString(WalRcvState state) return "stopped"; case WALRCV_STARTING: return "starting"; + case WALRCV_CONNECTING: + return "connecting"; + case WALRCV_CONNECTED: + return "connected"; case WALRCV_STREAMING: return "streaming"; case WALRCV_WAITING: diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 822645748a7..62482830ba2 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -179,12 +179,37 @@ WalRcvStreaming(void) } if (state == WALRCV_STREAMING || state == WALRCV_STARTING || - state == WALRCV_RESTARTING) + state == WALRCV_RESTARTING || state == WALRCV_CONNECTING || + state == WALRCV_CONNECTED) return true; else return false; } +/* + * Transition from CONNECTED to STREAMING state. + * + * This is called by the startup process when the first WAL record from + * the walreceiver is processed, indicating that the connection is fully + * established and data is flowing. + */ +bool +WalRcvSetStreaming(void) +{ + WalRcvData *walrcv = WalRcv; + bool set = false; + + SpinLockAcquire(&walrcv->mutex); + if (walrcv->walRcvState == WALRCV_CONNECTED) + { + walrcv->walRcvState = WALRCV_STREAMING; + set = true; + } + SpinLockRelease(&walrcv->mutex); + + return set; +} + /* * Stop walreceiver (if running) and wait for it to die. * Executed by the Startup process. 
@@ -211,6 +236,8 @@ ShutdownWalRcv(void) stopped = true; break; + case WALRCV_CONNECTING: + case WALRCV_CONNECTED: case WALRCV_STREAMING: case WALRCV_WAITING: case WALRCV_RESTARTING: diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index e5557d21fa8..656206e8d81 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -47,6 +47,9 @@ typedef enum WALRCV_STOPPED, /* stopped and mustn't start up again */ WALRCV_STARTING, /* launched, but the process hasn't * initialized yet */ + WALRCV_CONNECTING, /* connection starting, but not established yet */ + WALRCV_CONNECTED, /* replication connection established, but no WAL + * streamed yet */ WALRCV_STREAMING, /* walreceiver is streaming */ WALRCV_WAITING, /* stopped streaming, waiting for orders */ WALRCV_RESTARTING, /* asked to restart streaming */ @@ -492,6 +495,7 @@ extern void WalRcvForceReply(void); /* prototypes for functions in walreceiverfuncs.c */ extern Size WalRcvShmemSize(void); extern void WalRcvShmemInit(void); +extern bool WalRcvSetStreaming(void); extern void ShutdownWalRcv(void); extern bool WalRcvStreaming(void); extern bool WalRcvRunning(void); -- 2.51.0