From cb861c1de3c8cd70b7cb2fe47711ef36fbd16bd2 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Thu, 22 Dec 2022 02:49:48 +0000 Subject: [PATCH v4] Exit walsender before confirming remote flush in logical replication Currently, at shutdown, walsender processes wait to send all pending data and ensure the all data is flushed in remote node. This mechanism was added by 985bd7 for supporting clean switch over, but such use-case cannot be supported for logical replication. This commit remove the blocking in the case. Author: Hayato Kuroda --- doc/src/sgml/logical-replication.sgml | 10 ++++++ src/backend/replication/walsender.c | 45 +++++++++++++++++---------- 2 files changed, 39 insertions(+), 16 deletions(-) diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 54f48be87f..403c518b51 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -1694,6 +1694,16 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER table is in progress, there will be additional workers for the tables being synchronized. + + + + Unlike physical replication, data synchronization by logical replication is + more likely to be suspended. It is because workers sometimes wait for + acquiring locks and they do not consume messages from the publisher. It + will be resolved automatically when workers acquire locks and start + consuming arrivals. + + diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 015ae2995d..dbca93dd9d 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1450,6 +1450,10 @@ ProcessPendingWrites(void) /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) WalSndShutdown(); + + /* If we got shut down requested, try to exit the process */ + if (got_STOPPING) + WalSndDone(XLogSendLogical); } /* reactivate latch so WalSndLoop knows to continue */ @@ -2513,18 +2517,14 @@ WalSndLoop(WalSndSendDataCallback send_data) application_name))); WalSndSetState(WALSNDSTATE_STREAMING); } - - /* - * When SIGUSR2 arrives, we send any outstanding logs up to the - * shutdown checkpoint record (i.e., the latest record), wait for - * them to be replicated to the standby, and exit. This may be a - * normal termination at shutdown, or a promotion, the walsender - * is not sure which. - */ - if (got_SIGUSR2) - WalSndDone(send_data); } + /* + * When SIGUSR2 arrives, try to exit the process. + */ + if (got_SIGUSR2) + WalSndDone(send_data); + /* Check for replication timeout. */ WalSndCheckTimeOut(); @@ -3094,13 +3094,14 @@ XLogSendLogical(void) } /* - * Shutdown if the sender is caught up. + * Shutdown if the sender is we are in a convenient time. * * NB: This should only be called when the shutdown signal has been received * from postmaster. * - * Note that if we determine that there's still more data to send, this - * function will return control to the caller. + * Note that if we determine that there's still more data to send or we are in + * physical replication mode and all WALs are not yet replicated, this function + * will return control to the caller. */ static void WalSndDone(WalSndSendDataCallback send_data) @@ -3118,15 +3119,27 @@ WalSndDone(WalSndSendDataCallback send_data) replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ? MyWalSnd->write : MyWalSnd->flush; - if (WalSndCaughtUp && sentPtr == replicatedPtr && - !pq_is_send_pending()) + /* + * Exit if we are in the convenient time. + * + * Note that in case of logical replication, we don't have to wait that all + * sent data to be flushed on the subscriber. It will request to send WALs + * from the last received point, and we cannot support clean switchover in + * logical replication. + */ + if (WalSndCaughtUp && + (send_data == XLogSendLogical || + (sentPtr == replicatedPtr && !pq_is_send_pending()))) { QueryCompletion qc; /* Inform the standby that XLOG streaming is done */ SetQueryCompletion(&qc, CMDTAG_COPY, 0); EndCommand(&qc, DestRemote, false); - pq_flush(); + if (send_data == XLogSendLogical) + pq_flush_if_writable(); + else + pq_flush(); proc_exit(0); } -- 2.27.0