From efb4f15d7b1f3f97da370cf4b7db7562eec08cb3 Mon Sep 17 00:00:00 2001 From: Michael Paquier Date: Wed, 19 Jul 2023 11:29:02 +0900 Subject: [PATCH v5] Fix pg_recvlogical error message upon SIGINT/SIGTERM When pg_recvlogical needs to abort on a signal like SIGINT/SIGTERM, it is expected to exit cleanly. However, the code forgot to clean up the state of the connection before leaving. This would cause the tool to emit error messages like "unexpected termination of replication stream", which is meant for really unexpected termination or a crash. The code is refactored to apply the same termination abort operations for signals, end LSN and keepalive cases. Reported-by: Andres Freund Author: Bharath Rupireddy Reviewed-by: Kyotaro Horiguchi, Andres Freund, Cary Huang Discussion: https://www.postgresql.org/message-id/20221019213953.htdtzikf4f45ywil%40awork3.anarazel.de --- src/bin/pg_basebackup/pg_recvlogical.c | 52 ++++++++++++++++++++------ 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c index f3c7937a1d..3bd83deee7 100644 --- a/src/bin/pg_basebackup/pg_recvlogical.c +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -32,6 +32,13 @@ /* Time to sleep between reconnection attempts */ #define RECONNECT_SLEEP_TIME 5 +typedef enum +{ + STREAM_STOP_END_OF_WAL, + STREAM_STOP_KEEPALIVE, + STREAM_STOP_SIGNAL +} StreamStopReason; + /* Global Options */ static char *outfile = NULL; static int verbose = 0; @@ -66,7 +73,7 @@ static void usage(void); static void StreamLogicalLog(void); static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now); static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos, - bool keepalive, XLogRecPtr lsn); + StreamStopReason reason, XLogRecPtr lsn); static void usage(void) @@ -207,6 +214,8 @@ StreamLogicalLog(void) TimestampTz last_status = -1; int i; PQExpBuffer query; + XLogRecPtr stop_lsn = InvalidXLogRecPtr; + StreamStopReason stop_reason = 
STREAM_STOP_SIGNAL; output_written_lsn = InvalidXLogRecPtr; output_fsync_lsn = InvalidXLogRecPtr; @@ -487,7 +496,7 @@ StreamLogicalLog(void) if (endposReached) { - prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr); + stop_reason = STREAM_STOP_KEEPALIVE; time_to_abort = true; break; } @@ -519,6 +528,12 @@ StreamLogicalLog(void) /* Extract WAL location for this block */ cur_record_lsn = fe_recvint64(&copybuf[1]); + /* + * If this loop is aborted, like on signal, saving this information + * here gives a correct feedback. + */ + stop_lsn = cur_record_lsn; + if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos) { /* @@ -527,7 +542,7 @@ StreamLogicalLog(void) */ if (!flushAndSendFeedback(conn, &now)) goto error; - prepareToTerminate(conn, endpos, false, cur_record_lsn); + stop_reason = STREAM_STOP_END_OF_WAL; time_to_abort = true; break; } @@ -572,12 +587,16 @@ StreamLogicalLog(void) /* endpos was exactly the record we just processed, we're done */ if (!flushAndSendFeedback(conn, &now)) goto error; - prepareToTerminate(conn, endpos, false, cur_record_lsn); + stop_reason = STREAM_STOP_END_OF_WAL; time_to_abort = true; break; } } + /* Clean up connection state if stream has been aborted */ + if (time_to_abort) + prepareToTerminate(conn, endpos, stop_reason, stop_lsn); + res = PQgetResult(conn); if (PQresultStatus(res) == PGRES_COPY_OUT) { @@ -1021,18 +1040,29 @@ flushAndSendFeedback(PGconn *conn, TimestampTz *now) * retry on failure. 
*/ static void -prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn) +prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason, + XLogRecPtr lsn) { (void) PQputCopyEnd(conn, NULL); (void) PQflush(conn); if (verbose) { - if (keepalive) - pg_log_info("end position %X/%X reached by keepalive", - LSN_FORMAT_ARGS(endpos)); - else - pg_log_info("end position %X/%X reached by WAL record at %X/%X", - LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn)); + switch (reason) + { + case STREAM_STOP_SIGNAL: + pg_log_info("end position %X/%X reached on signal", + LSN_FORMAT_ARGS(lsn)); + break; + case STREAM_STOP_KEEPALIVE: + pg_log_info("end position %X/%X reached by keepalive", + LSN_FORMAT_ARGS(endpos)); + break; + case STREAM_STOP_END_OF_WAL: + Assert(!XLogRecPtrIsInvalid(lsn)); + pg_log_info("end position %X/%X reached by WAL record at %X/%X", + LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn)); + break; + } } } -- 2.40.1