From dfcc30566b53c378f03dff907d2c2a73c992f0f8 Mon Sep 17 00:00:00 2001 From: Dave Cramer Date: Mon, 23 Jul 2018 16:34:20 -0400 Subject: [PATCH 1/4] Respect client-initiated CopyDone in walsender --- src/backend/replication/walsender.c | 36 ++++++++++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index d60026d..f624048 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -765,6 +765,14 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req sendTimeLineValidUpto = state->currTLIValidUntil; sendTimeLineNextTLI = state->nextTLI; + /* + * If the client sent CopyDone while we were waiting, + * bail out so we can wind up the decoding session. + */ + if (streamingDoneSending) + return -1; + + /* more than one block available */ /* make sure we have enough WAL available */ flushptr = WalSndWaitForWal(targetPagePtr + reqLen); @@ -1350,8 +1358,12 @@ WalSndWaitForWal(XLogRecPtr loc) * It's important to do this check after the recomputation of * RecentFlushPtr, so we can send all remaining data before shutting * down. - */ - if (got_STOPPING) + * + * We'll also exit here if the client sent CopyDone because it wants + * to return to command mode. + */ + + if (got_STOPPING || streamingDoneReceiving) break; /* @@ -2096,7 +2108,14 @@ WalSndCheckTimeOut(TimestampTz now) } } -/* Main loop of walsender process that streams the WAL over Copy messages. */ +/* + * Main loop of walsender process that streams the WAL over Copy messages. + * + * The send_data callback must enqueue complete CopyData messages to libpq + * using pq_putmessage_noblock or similar, since the walsender loop may send + * CopyDone then exit and return to command mode in response to a client + * CopyDone between calls to send_data. + */ static void WalSndLoop(WalSndSendDataCallback send_data) { @@ -2152,10 +2171,15 @@ WalSndLoop(WalSndSendDataCallback send_data) * some more. If there is some, we don't bother to call send_data * again until we've flushed it ... but we'd better assume we are not * caught up. + * + * If we're trying to finish sending and exit we shouldn't enqueue more + * data to libpq. We need to finish writing out whatever we already + * have in libpq's send buffer to maintain protocol sync so we still + * need to loop until it's flushed. */ - if (!pq_is_send_pending()) + if (!pq_is_send_pending() && !streamingDoneSending) send_data(); - else + else if (!streamingDoneSending) WalSndCaughtUp = false; /* Try to flush pending output to the client */ @@ -3387,7 +3411,7 @@ WalSndKeepaliveIfNecessary(TimestampTz now) if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0) return; - if (waiting_for_ping_response) + if (waiting_for_ping_response || streamingDoneSending) return; /* -- 2.6.4