*** a/src/bin/pg_basebackup/pg_basebackup.c --- b/src/bin/pg_basebackup/pg_basebackup.c *************** *** 47,52 **** bool includewal = false; --- 47,56 ---- bool streamwal = false; bool fastcheckpoint = false; int standby_message_timeout = 10 * 1000; /* 10 sec = default */ + int standby_recv_timeout = 60*1000; /* 60 sec = default */ + char *standby_connect_timeout = NULL; + + #define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ /* Progress counters */ static uint64 totalsize; *************** *** 125,130 **** usage(void) --- 129,138 ---- printf(_(" -p, --port=PORT database server port number\n")); printf(_(" -s, --status-interval=INTERVAL\n" " time between status packets sent to server (in seconds)\n")); + printf(_(" -r, --recvtimeout=INTERVAL time that receiver waits for communication from\n" + " server (in seconds)\n")); + printf(_(" -t, --conntimeout=INTERVAL time that client wait for connection to establish\n" + " with server (in seconds)\n")); printf(_(" -U, --username=NAME connect as specified database user\n")); printf(_(" -w, --no-password never prompt for password\n")); printf(_(" -W, --password force password prompt (should happen automatically)\n")); *************** *** 237,244 **** LogStreamerMain(logstreamer_param *param) { if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, param->sysidentifier, param->xlogdir, ! reached_end_position, standby_message_timeout, ! true)) /* * Any errors will already have been reported in the function process, --- 245,252 ---- { if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, param->sysidentifier, param->xlogdir, ! reached_end_position, standby_message_timeout, ! standby_recv_timeout, true)) /* * Any errors will already have been reported in the function process, *************** *** 290,297 **** StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier) } #endif ! /* Get a second connection */ ! param->bgconn = GetConnection(); if (!param->bgconn) /* Error message already written in GetConnection() */ exit(1); --- 298,306 ---- } #endif ! /* Get a second connection. Sending connect_timeout ! * as configured, there is no need for rw_timeout.*/ ! param->bgconn = GetConnection(standby_connect_timeout); if (!param->bgconn) /* Error message already written in GetConnection() */ exit(1); *************** *** 467,472 **** ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) --- 476,482 ---- char filename[MAXPGPATH]; char *copybuf = NULL; FILE *tarfile = NULL; + int64 last_recv_timestamp; #ifdef HAVE_LIBZ gzFile ztarfile = NULL; *************** *** 584,592 **** ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) --- 594,605 ---- disconnect_and_exit(1); } + /* Set the last reply timestamp */ + last_recv_timestamp = localGetCurrentTimestamp(); while (1) { int r; + int64 now; if (copybuf != NULL) { *************** *** 594,600 **** ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) copybuf = NULL; } ! r = PQgetCopyData(conn, ©buf, 0); if (r == -1) { /* --- 607,668 ---- copybuf = NULL; } ! r = PQgetCopyData(conn, ©buf, 1); ! if (r == 0) ! { ! /* ! * In async mode, and no data available. We block on reading but ! * not more than the specified timeout, so that we can send a ! * response back to the client. ! */ ! fd_set input_mask; ! struct timeval timeout; ! ! FD_ZERO(&input_mask); ! FD_SET(PQsocket(conn), &input_mask); ! timeout.tv_sec = 0; ! timeout.tv_usec = NAPTIME_PER_CYCLE*1000; ! ! r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, &timeout); ! if (r == 0 || (r < 0 && errno == EINTR)) ! { ! /* ! * Got a timeout or signal. Before Continuing the loop, check for timeout. ! */ ! if (standby_recv_timeout > 0) ! { ! now = localGetCurrentTimestamp(); ! if (localTimestampDifferenceExceeds(last_recv_timestamp, now, standby_recv_timeout)) ! { ! fprintf(stderr, _("%s: terminating DB File receive due to timeout\n"), ! progname); ! disconnect_and_exit(1); ! } ! } ! ! continue; ! } ! else if (r < 0) ! { ! fprintf(stderr, _("%s: select() failed: %s\n"), ! progname, strerror(errno)); ! disconnect_and_exit(1); ! } ! /* Else there is actually data on the socket */ ! if (PQconsumeInput(conn) == 0) ! { ! fprintf(stderr, ! _("%s: could not receive data from WAL Sender: %s"), ! progname, PQerrorMessage(conn)); ! disconnect_and_exit(1); ! } ! ! /* Set the last reply timestamp */ ! last_recv_timestamp = localGetCurrentTimestamp(); ! ! /* Some data is received, so go back read them in buffer*/ ! continue; ! } if (r == -1) { /* *************** *** 665,670 **** ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) --- 733,741 ---- disconnect_and_exit(1); } + /* Set the last reply timestamp */ + last_recv_timestamp = localGetCurrentTimestamp(); + #ifdef HAVE_LIBZ if (ztarfile != NULL) { *************** *** 714,719 **** ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) --- 785,791 ---- int current_padding = 0; char *copybuf = NULL; FILE *file = NULL; + int64 last_recv_timestamp; if (PQgetisnull(res, rownum, 0)) strcpy(current_path, basedir); *************** *** 731,739 **** ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) --- 803,815 ---- disconnect_and_exit(1); } + /* Set the last reply timestamp */ + last_recv_timestamp = localGetCurrentTimestamp(); while (1) { int r; + int64 now; + if (copybuf != NULL) { *************** *** 741,748 **** ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) copybuf = NULL; } ! r = PQgetCopyData(conn, ©buf, 0); if (r == -1) { /* --- 817,878 ---- copybuf = NULL; } ! r = PQgetCopyData(conn, ©buf, 1); ! if (r == 0) ! { ! /* ! * In async mode, and no data available. We block on reading but ! * not more than the specified timeout, so that we can send a ! * response back to the client. ! */ ! fd_set input_mask; ! struct timeval timeout; ! ! FD_ZERO(&input_mask); ! FD_SET(PQsocket(conn), &input_mask); ! timeout.tv_sec = 0; ! timeout.tv_usec = NAPTIME_PER_CYCLE*1000; + r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, &timeout); + if (r == 0 || (r < 0 && errno == EINTR)) + { + /* + * Got a timeout or signal. Before Continuing the loop, check for timeout. + */ + if (standby_recv_timeout > 0) + { + now = localGetCurrentTimestamp(); + if (localTimestampDifferenceExceeds(last_recv_timestamp, now, standby_recv_timeout)) + { + fprintf(stderr, _("%s: terminating DB File receive due to timeout\n"), + progname); + disconnect_and_exit(1); + } + } + + continue; + } + else if (r < 0) + { + fprintf(stderr, _("%s: select() failed: %s\n"), + progname, strerror(errno)); + disconnect_and_exit(1); + } + /* Else there is actually data on the socket */ + if (PQconsumeInput(conn) == 0) + { + fprintf(stderr, + _("%s: could not receive data from WAL Sender: %s"), + progname, PQerrorMessage(conn)); + disconnect_and_exit(1); + } + + /* Set the last reply timestamp */ + last_recv_timestamp = localGetCurrentTimestamp(); + + /* Some data is received, so go back read them in buffer*/ + continue; + } if (r == -1) { /* *************** *** 755,765 **** ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) } else if (r == -2) { fprintf(stderr, _("%s: could not read COPY data: %s"), progname, PQerrorMessage(conn)); disconnect_and_exit(1); } ! if (file == NULL) { int filemode; --- 885,898 ---- } else if (r == -2) { + fprintf(stderr, "\n"); fprintf(stderr, _("%s: could not read COPY data: %s"), progname, PQerrorMessage(conn)); disconnect_and_exit(1); } ! ! /* Set the last reply timestamp */ ! last_recv_timestamp = localGetCurrentTimestamp(); if (file == NULL) { int filemode; *************** *** 953,961 **** BaseBackup(void) char xlogend[64]; /* ! * Connect in replication mode to the server */ ! conn = GetConnection(); if (!conn) /* Error message already written in GetConnection() */ exit(1); --- 1086,1094 ---- char xlogend[64]; /* ! * Connect in replication mode to the server. Sending connect_timeout. */ ! conn = GetConnection(standby_connect_timeout); if (!conn) /* Error message already written in GetConnection() */ exit(1); *************** *** 1254,1259 **** main(int argc, char **argv) --- 1387,1394 ---- {"no-password", no_argument, NULL, 'w'}, {"password", no_argument, NULL, 'W'}, {"status-interval", required_argument, NULL, 's'}, + {"recvtimeout", required_argument, NULL, 'r'}, + {"conntimeout", required_argument, NULL, 't'}, {"verbose", no_argument, NULL, 'v'}, {"progress", no_argument, NULL, 'P'}, {NULL, 0, NULL, 0} *************** *** 1280,1286 **** main(int argc, char **argv) } } ! while ((c = getopt_long(argc, argv, "D:F:xX:l:zZ:c:h:p:U:s:wWvP", long_options, &option_index)) != -1) { switch (c) --- 1415,1421 ---- } } ! while ((c = getopt_long(argc, argv, "D:F:xX:l:zZ:c:h:p:U:s:r:t:wWvP", long_options, &option_index)) != -1) { switch (c) *************** *** 1392,1397 **** main(int argc, char **argv) --- 1527,1552 ---- exit(1); } break; + case 'r': + standby_recv_timeout = atoi(optarg)*1000; + if (standby_recv_timeout < 0) + { + fprintf(stderr, _("%s: invalid recv timeout \"%s\"\n"), + progname, optarg); + exit(1); + } + + break; + case 't': + if (atoi(optarg) < 0) + { + fprintf(stderr, _("%s: invalid connect timeout \"%s\"\n"), + progname, optarg); + exit(1); + } + + standby_connect_timeout = pg_strdup(optarg); + break; case 'v': verbose++; break; *** a/src/bin/pg_basebackup/pg_receivexlog.c --- b/src/bin/pg_basebackup/pg_receivexlog.c *************** *** 41,46 **** char *basedir = NULL; --- 41,48 ---- int verbose = 0; int noloop = 0; int standby_message_timeout = 10 * 1000; /* 10 sec = default */ + int standby_recv_timeout = 60*1000; /* 60 sec = default */ + char *standby_connect_timeout = NULL; volatile bool time_to_abort = false; *************** *** 69,74 **** usage(void) --- 71,80 ---- printf(_(" -p, --port=PORT database server port number\n")); printf(_(" -s, --status-interval=INTERVAL\n" " time between status packets sent to server (in seconds)\n")); + printf(_(" -r, --recvtimeout=INTERVAL time that receiver waits for communication from\n" + " server (in seconds)\n")); + printf(_(" -t, --conntimeout=INTERVAL time that client wait for connection to establish\n" + " with server (in seconds)\n")); printf(_(" -U, --username=NAME connect as specified database user\n")); printf(_(" -w, --no-password never prompt for password\n")); printf(_(" -W, --password force password prompt (should happen automatically)\n")); *************** *** 224,232 **** StreamLog(void) lo; /* ! * Connect in replication mode to the server */ ! conn = GetConnection(); if (!conn) /* Error message already written in GetConnection() */ return; --- 230,239 ---- lo; /* ! * Connect in replication mode to the server, Sending connect_timeout ! * as configured, there is no need for rw_timeout. */ ! conn = GetConnection(standby_connect_timeout); if (!conn) /* Error message already written in GetConnection() */ return; *************** *** 280,286 **** StreamLog(void) timeline); ReceiveXlogStream(conn, startpos, timeline, NULL, basedir, ! stop_streaming, standby_message_timeout, false); PQfinish(conn); } --- 287,294 ---- timeline); ReceiveXlogStream(conn, startpos, timeline, NULL, basedir, ! stop_streaming, standby_message_timeout, ! standby_recv_timeout, false); PQfinish(conn); } *************** *** 312,317 **** main(int argc, char **argv) --- 320,327 ---- {"no-password", no_argument, NULL, 'w'}, {"password", no_argument, NULL, 'W'}, {"status-interval", required_argument, NULL, 's'}, + {"recvtimeout", required_argument, NULL, 'r'}, + {"conntimeout", required_argument, NULL, 't'}, {"verbose", no_argument, NULL, 'v'}, {NULL, 0, NULL, 0} }; *************** *** 336,342 **** main(int argc, char **argv) } } ! while ((c = getopt_long(argc, argv, "D:h:p:U:s:nwWv", long_options, &option_index)) != -1) { switch (c) --- 346,352 ---- } } ! while ((c = getopt_long(argc, argv, "D:h:p:U:s:r:t:nwWv", long_options, &option_index)) != -1) { switch (c) *************** *** 374,379 **** main(int argc, char **argv) --- 384,409 ---- exit(1); } break; + case 'r': + standby_recv_timeout = atoi(optarg)*1000; + if (standby_recv_timeout < 0) + { + fprintf(stderr, _("%s: invalid recv timeout \"%s\"\n"), + progname, optarg); + exit(1); + } + break; + case 't': + if (atoi(optarg) < 0) + { + fprintf(stderr, _("%s: invalid connect timeout \"%s\"\n"), + progname, optarg); + exit(1); + } + + standby_connect_timeout = pg_strdup(optarg); + break; + case 'n': noloop = 1; break; *** a/src/bin/pg_basebackup/receivelog.c --- b/src/bin/pg_basebackup/receivelog.c *************** *** 190,196 **** close_walfile(char *basedir, char *walname, bool segment_complete) * backend code. The protocol always uses integer timestamps, regardless of * server setting. */ ! static int64 localGetCurrentTimestamp(void) { int64 result; --- 190,196 ---- * backend code. The protocol always uses integer timestamps, regardless of * server setting. */ ! int64 localGetCurrentTimestamp(void) { int64 result; *************** *** 210,216 **** localGetCurrentTimestamp(void) * Local version of TimestampDifference(), since we are not linked with * backend code. */ ! static void localTimestampDifference(int64 start_time, int64 stop_time, long *secs, int *microsecs) { --- 210,216 ---- * Local version of TimestampDifference(), since we are not linked with * backend code. */ ! void localTimestampDifference(int64 start_time, int64 stop_time, long *secs, int *microsecs) { *************** *** 232,238 **** localTimestampDifference(int64 start_time, int64 stop_time, * Local version of TimestampDifferenceExceeds(), since we are not * linked with backend code. */ ! static bool localTimestampDifferenceExceeds(int64 start_time, int64 stop_time, int msec) --- 232,238 ---- * Local version of TimestampDifferenceExceeds(), since we are not * linked with backend code. */ ! bool localTimestampDifferenceExceeds(int64 start_time, int64 stop_time, int msec) *************** *** 342,348 **** bool ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, ! int standby_message_timeout, bool rename_partial) { char query[128]; char current_walfile_name[MAXPGPATH]; --- 342,349 ---- ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, ! int standby_message_timeout, ! int standby_recv_timeout, bool rename_partial) { char query[128]; char current_walfile_name[MAXPGPATH]; *************** *** 350,355 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, --- 351,358 ---- char *copybuf = NULL; int64 last_status = -1; XLogRecPtr blockpos = InvalidXLogRecPtr; + int64 last_recv_timestamp; + bool ping_sent = false; if (sysidentifier != NULL) { *************** *** 403,408 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, --- 406,415 ---- } PQclear(res); + /* Set the last reply timestamp */ + last_recv_timestamp = localGetCurrentTimestamp(); + ping_sent = false; + /* * Receive the actual xlog data */ *************** *** 486,495 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, if (r == 0 || (r < 0 && errno == EINTR)) { /* ! * Got a timeout or signal. Continue the loop and either ! * deliver a status packet to the server or just go back into * blocking. */ continue; } else if (r < 0) --- 493,531 ---- if (r == 0 || (r < 0 && errno == EINTR)) { /* ! * Got a timeout or signal. Before Continuing the loop, check for timeout. ! * and then either deliver a status packet to the server or just go back into * blocking. */ + if (standby_recv_timeout > 0) + { + now = localGetCurrentTimestamp(); + if (localTimestampDifferenceExceeds(last_recv_timestamp, now, standby_recv_timeout)) + { + fprintf(stderr, _("%s: terminating XLogStream receiver due to timeout\n"), + progname); + goto error; + } + + /* + * We didn't receive anything new, for half of receiver + * replication timeout. Ping the server, if not already done. + */ + if (!ping_sent) + { + if (localTimestampDifferenceExceeds(last_recv_timestamp, now, (standby_recv_timeout/2))) + { + if (!sendFeedback(conn, blockpos, now, true)) + { + goto error; + } + + last_status = now; + ping_sent = true; + } + } + } + continue; } else if (r < 0) *************** *** 506,511 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, --- 542,552 ---- progname, PQerrorMessage(conn)); goto error; } + + /* Set the last reply timestamp */ + last_recv_timestamp = localGetCurrentTimestamp(); + ping_sent = false; + continue; } if (r == -1) *************** *** 518,524 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, goto error; } ! /* Check the message type. */ if (copybuf[0] == 'k') { int pos; --- 559,568 ---- goto error; } ! /* Set the last reply timestamp */ ! last_recv_timestamp = localGetCurrentTimestamp(); ! ping_sent = false; ! if (copybuf[0] == 'k') { int pos; *** a/src/bin/pg_basebackup/receivelog.h --- b/src/bin/pg_basebackup/receivelog.h *************** *** 7,16 **** typedef bool (*stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished); extern bool ReceiveXlogStream(PGconn *conn, ! XLogRecPtr startpos, ! uint32 timeline, ! char *sysidentifier, ! char *basedir, ! stream_stop_callback stream_stop, ! int standby_message_timeout, ! bool rename_partial); --- 7,24 ---- typedef bool (*stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished); extern bool ReceiveXlogStream(PGconn *conn, ! XLogRecPtr startpos, ! uint32 timeline, ! char *sysidentifier, ! char *basedir, ! stream_stop_callback stream_stop, ! int standby_message_timeout, ! int standby_recv_timeout, ! bool rename_partial); ! ! extern int64 localGetCurrentTimestamp(void); ! extern void localTimestampDifference(int64 start_time, int64 stop_time, ! long *secs, int *microsecs); ! extern bool localTimestampDifferenceExceeds(int64 start_time, ! int64 stop_time, ! int msec); *** a/src/bin/pg_basebackup/streamutil.c --- b/src/bin/pg_basebackup/streamutil.c *************** *** 72,80 **** pg_malloc0(size_t size) * Connect to the server. Returns a valid PGconn pointer if connected, * or NULL on non-permanent error. On permanent error, the function will * call exit(1) directly. */ PGconn * ! GetConnection(void) { PGconn *tmpconn; int argcount = 4; /* dbname, replication, fallback_app_name, --- 72,82 ---- * Connect to the server. Returns a valid PGconn pointer if connected, * or NULL on non-permanent error. On permanent error, the function will * call exit(1) directly. + * Set conn_timeout to PGconn structure if their value + * is not NULL. */ PGconn * ! GetConnection(char *conn_timeout) { PGconn *tmpconn; int argcount = 4; /* dbname, replication, fallback_app_name, *************** *** 91,96 **** GetConnection(void) --- 93,100 ---- argcount++; if (dbport) argcount++; + if (conn_timeout) + argcount++; keywords = pg_malloc0((argcount + 1) * sizeof(*keywords)); values = pg_malloc0((argcount + 1) * sizeof(*values)); *************** *** 120,125 **** GetConnection(void) --- 124,135 ---- values[i] = dbport; i++; } + if (conn_timeout != NULL) + { + keywords[i] = "connect_timeout"; + values[i] = conn_timeout; + i++; + } while (true) { *** a/src/bin/pg_basebackup/streamutil.h --- b/src/bin/pg_basebackup/streamutil.h *************** *** 19,22 **** extern PGconn *conn; extern char *pg_strdup(const char *s); extern void *pg_malloc0(size_t size); ! extern PGconn *GetConnection(void); --- 19,22 ---- extern char *pg_strdup(const char *s); extern void *pg_malloc0(size_t size); ! PGconn *GetConnection(char *conn_timeout);