From 4d023cfc1fed0b5852b4da1aad6a32549b03ce26 Mon Sep 17 00:00:00 2001 From: Dave Cramer Date: Fri, 30 Nov 2018 18:23:49 -0500 Subject: [PATCH 1/5] Respect client initiated CopyDone in walsender --- src/backend/replication/walsender.c | 36 ++++++++++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 46edb52..93f2648 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -770,6 +770,14 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req sendTimeLineValidUpto = state->currTLIValidUntil; sendTimeLineNextTLI = state->nextTLI; + /* + * If the client sent CopyDone while we were waiting, + * bail out so we can wind up the decoding session. + */ + if (streamingDoneSending) + return -1; + + /* more than one block available */ /* make sure we have enough WAL available */ flushptr = WalSndWaitForWal(targetPagePtr + reqLen); @@ -1341,8 +1349,12 @@ WalSndWaitForWal(XLogRecPtr loc) * It's important to do this check after the recomputation of * RecentFlushPtr, so we can send all remaining data before shutting * down. - */ - if (got_STOPPING) + * + * We'll also exit here if the client sent CopyDone because it wants + * to return to command mode. + */ + + if (got_STOPPING || streamingDoneReceiving) break; /* @@ -2095,7 +2107,14 @@ WalSndCheckTimeOut(void) } } -/* Main loop of walsender process that streams the WAL over Copy messages. */ +/* + * Main loop of walsender process that streams the WAL over Copy messages. + * + * The send_data callback must enqueue complete CopyData messages to libpq + * using pq_putmessage_noblock or similar, since the walsender loop may send + * CopyDone then exit and return to command mode in response to a client + * CopyDone between calls to send_data. + */ static void WalSndLoop(WalSndSendDataCallback send_data) { @@ -2142,10 +2161,15 @@ WalSndLoop(WalSndSendDataCallback send_data) * some more. If there is some, we don't bother to call send_data * again until we've flushed it ... but we'd better assume we are not * caught up. + * + * If we're trying to finish sending and exit we shouldn't enqueue more + * data to libpq. We need to finish writing out whatever we already + * have in libpq's send buffer to maintain protocol sync so we still + * need to loop until it's flushed. */ - if (!pq_is_send_pending()) + if (!pq_is_send_pending() && !streamingDoneSending) send_data(); - else + else if (!streamingDoneSending) WalSndCaughtUp = false; /* Try to flush pending output to the client */ @@ -3375,7 +3399,7 @@ WalSndKeepaliveIfNecessary(void) if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0) return; - if (waiting_for_ping_response) + if (waiting_for_ping_response || streamingDoneSending) return; /* -- 2.6.4