From 8e926809636f1be474f26cec3d8b3dce80a17a6d Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Thu, 22 Dec 2022 02:49:48 +0000 Subject: [PATCH v6 1/2] Exit walsender before confirming remote flush in logical replication Currently, at shutdown, walsender processes wait to send all pending data and ensure the all data is flushed in remote node. This mechanism was added by 985bd7 for supporting clean switch over, but such use-case cannot be supported for logical replication. This commit remove the blocking in the case. Author: Hayato Kuroda --- doc/src/sgml/logical-replication.sgml | 10 ++++++ src/backend/replication/walsender.c | 50 ++++++++++++++++++--------- 2 files changed, 44 insertions(+), 16 deletions(-) diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 6bd5f61e2b..ccddb8a35a 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -1701,6 +1701,16 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER being synchronized. Moreover, if the streaming transaction is applied in parallel, there may be additional parallel apply workers. + + + + Unlike physical replication, data synchronization by logical replication is + more likely to be suspended. It is because workers sometimes wait for + acquiring locks and they do not consume messages from the publisher. It + will be resolved automatically when workers acquire locks and start + consuming arrivals. + + diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 4ed3747e3f..25a052adfc 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1450,6 +1450,10 @@ ProcessPendingWrites(void) /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) WalSndShutdown(); + + /* If we got shut down requested, try to exit the process */ + if (got_STOPPING) + WalSndDone(XLogSendLogical); } /* reactivate latch so WalSndLoop knows to continue */ @@ -2513,18 +2517,14 @@ WalSndLoop(WalSndSendDataCallback send_data) application_name))); WalSndSetState(WALSNDSTATE_STREAMING); } - - /* - * When SIGUSR2 arrives, we send any outstanding logs up to the - * shutdown checkpoint record (i.e., the latest record), wait for - * them to be replicated to the standby, and exit. This may be a - * normal termination at shutdown, or a promotion, the walsender - * is not sure which. - */ - if (got_SIGUSR2) - WalSndDone(send_data); } + /* + * When SIGUSR2 arrives, try to exit the process. + */ + if (got_SIGUSR2) + WalSndDone(send_data); + /* Check for replication timeout. */ WalSndCheckTimeOut(); @@ -3094,13 +3094,14 @@ XLogSendLogical(void) } /* - * Shutdown if the sender is caught up. + * Shutdown if the sender is we are in a convenient time. * * NB: This should only be called when the shutdown signal has been received * from postmaster. * - * Note that if we determine that there's still more data to send, this - * function will return control to the caller. + * Note that if we determine that there's still more data to send or we are in + * physical replication mode and all WALs are not yet replicated, this function + * will return control to the caller. */ static void WalSndDone(WalSndSendDataCallback send_data) @@ -3118,15 +3119,32 @@ WalSndDone(WalSndSendDataCallback send_data) replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ? MyWalSnd->write : MyWalSnd->flush; - if (WalSndCaughtUp && sentPtr == replicatedPtr && - !pq_is_send_pending()) + /* + * Exit if we are in the convenient time. + * + * When we are logical replication mode, we don't have to wait that all + * sent data to be flushed on the subscriber because we cannot support + * clean switchover for it. + */ + if (WalSndCaughtUp && + (send_data == XLogSendLogical || + (sentPtr == replicatedPtr && !pq_is_send_pending()))) { QueryCompletion qc; /* Inform the standby that XLOG streaming is done */ SetQueryCompletion(&qc, CMDTAG_COPY, 0); EndCommand(&qc, DestRemote, false); - pq_flush(); + + /* + * Flush pending data if writable. + * + * Note that the output buffer may be full in case of logical + * replication. If pq_flush() is called at that time, the walsender + * process will be stuck. Therefore, call pq_flush_if_writable() + * instead. + */ + pq_flush_if_writable(); proc_exit(0); } -- 2.27.0