*** a/doc/src/sgml/ref/pg_receivexlog.sgml --- b/doc/src/sgml/ref/pg_receivexlog.sgml *************** *** 106,111 **** PostgreSQL documentation --- 106,126 ---- + + + + + How often should pg_receivexlog issue sync + commands to ensure the received WAL file is safely flushed to disk. + Specifying an interval of 0 issuing fsyncs at + every consecutive data. Not specifying an interval issuing fsyncs + at only WAL file close, while still reporting progress the server. + In this case, data may be lost in the event of a crash. + + + + + *** a/src/bin/pg_basebackup/pg_basebackup.c --- b/src/bin/pg_basebackup/pg_basebackup.c *************** *** 370,376 **** LogStreamerMain(logstreamer_param *param) if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, param->sysidentifier, param->xlogdir, reached_end_position, standby_message_timeout, ! NULL)) /* * Any errors will already have been reported in the function process, --- 370,376 ---- if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, param->sysidentifier, param->xlogdir, reached_end_position, standby_message_timeout, ! NULL, -1)) /* * Any errors will already have been reported in the function process, *** a/src/bin/pg_basebackup/pg_receivexlog.c --- b/src/bin/pg_basebackup/pg_receivexlog.c *************** *** 36,41 **** static char *basedir = NULL; --- 36,42 ---- static int verbose = 0; static int noloop = 0; static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ + static int fsync_interval = -1; /* Invalid = default */ static volatile bool time_to_abort = false; *************** *** 62,67 **** usage(void) --- 63,71 ---- printf(_("\nOptions:\n")); printf(_(" -D, --directory=DIR receive transaction log files into this directory\n")); printf(_(" -n, --no-loop do not loop on connection lost\n")); + printf(_(" -F --fsync-interval=INTERVAL\n" + " frequency of syncs to the transaction log files (in seconds)\n" + " (default: file close only)\n")); printf(_(" -v, --verbose output verbose messages\n")); printf(_(" -V, --version output version information, then exit\n")); printf(_(" -?, --help show this help, then exit\n")); *************** *** 330,336 **** StreamLog(void) starttli); ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, ! stop_streaming, standby_message_timeout, ".partial"); PQfinish(conn); } --- 334,340 ---- starttli); ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, ! stop_streaming, standby_message_timeout, ".partial", fsync_interval); PQfinish(conn); } *************** *** 360,365 **** main(int argc, char **argv) --- 364,370 ---- {"port", required_argument, NULL, 'p'}, {"username", required_argument, NULL, 'U'}, {"no-loop", no_argument, NULL, 'n'}, + {"fsync-interval", required_argument, NULL, 'F'}, {"no-password", no_argument, NULL, 'w'}, {"password", no_argument, NULL, 'W'}, {"status-interval", required_argument, NULL, 's'}, *************** *** 389,395 **** main(int argc, char **argv) } } ! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv", long_options, &option_index)) != -1) { switch (c) --- 394,400 ---- } } ! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nF:wWv", long_options, &option_index)) != -1) { switch (c) *************** *** 436,441 **** main(int argc, char **argv) --- 441,455 ---- case 'n': noloop = 1; break; + case 'F': + fsync_interval = atoi(optarg) * 1000; + if (fsync_interval < 0) + { + fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"), + progname, optarg); + exit(1); + } + break; case 'v': verbose++; break; *** a/src/bin/pg_basebackup/receivelog.c --- b/src/bin/pg_basebackup/receivelog.c *************** *** 30,40 **** static int walfile = -1; static char current_walfile_name[MAXPGPATH] = ""; static bool reportFlushPosition = false; static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, ! char *partial_suffix, XLogRecPtr *stoppos); static int CopyStreamPoll(PGconn *conn, long timeout_ms); static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); --- 30,41 ---- static char current_walfile_name[MAXPGPATH] = ""; static bool reportFlushPosition = false; static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; + static int64 output_last_fsync = -1; static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, ! char *partial_suffix, XLogRecPtr *stoppos,int fsync_interval); static int CopyStreamPoll(PGconn *conn, long timeout_ms); static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); *************** *** 187,193 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos) fprintf(stderr, _("%s: not renaming \"%s%s\", segment is not complete\n"), progname, current_walfile_name, partial_suffix); ! lastFlushPosition = pos; return true; } --- 188,194 ---- fprintf(stderr, _("%s: not renaming \"%s%s\", segment is not complete\n"), progname, current_walfile_name, partial_suffix); ! output_last_fsync = feGetCurrentTimestamp(); lastFlushPosition = pos; return true; } *************** *** 419,431 **** CheckServerVersionForStreaming(PGconn *conn) * allows you to tell the difference between partial and completed files, * so that you can continue later where you left. * * Note: The log position *must* be at a log segment start! */ bool ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, ! int standby_message_timeout, char *partial_suffix) { char query[128]; char slotcmd[128]; --- 420,435 ---- * allows you to tell the difference between partial and completed files, * so that you can continue later where you left. * + * fsync_interval controls how often we flush to the received + * WAL file, in milliseconds. + * * Note: The log position *must* be at a log segment start! */ bool ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, ! int standby_message_timeout, char *partial_suffix, int fsync_interval) { char query[128]; char slotcmd[128]; *************** *** 570,576 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* Stream the WAL */ res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, standby_message_timeout, partial_suffix, ! &stoppos); if (res == NULL) goto error; --- 574,580 ---- /* Stream the WAL */ res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, standby_message_timeout, partial_suffix, ! &stoppos, fsync_interval); if (res == NULL) goto error; *************** *** 731,737 **** static PGresult * HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! XLogRecPtr *stoppos) { char *copybuf = NULL; int64 last_status = -1; --- 735,741 ---- HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! XLogRecPtr *stoppos, int fsync_interval) { char *copybuf = NULL; int64 last_status = -1; *************** *** 747,752 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, --- 751,758 ---- int64 now; int hdr_len; long sleeptime; + int64 message_target = 0; + int64 fsync_target = 0; /* * Check if we should continue streaming, or abort at this point. *************** *** 780,796 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, goto error; last_status = now; } ! ! /* ! * Compute how long send/receive loops should sleep ! */ ! if (standby_message_timeout && still_sending) { int64 targettime; long secs; int usecs; ! targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000); feTimestampDifference(now, targettime, &secs, --- 786,814 ---- goto error; last_status = now; } ! ! /* Compute when we need to wakeup to send a keepalive message. */ ! if (standby_message_timeout) ! message_target = last_status + (standby_message_timeout - 1) * ! ((int64) 1000); ! ! /* Compute when we need to wakeup to fsync the output file. */ ! if (fsync_interval > 0 && lastFlushPosition < blockpos) ! fsync_target = output_last_fsync + (fsync_interval - 1) * ! ((int64) 1000); ! ! /* Now compute when to wakeup. Compute how long send/receive loops should sleep*/ ! if (still_sending && (message_target > 0 || fsync_target > 0)) { int64 targettime; long secs; int usecs; ! targettime = message_target; ! ! if (fsync_target > 0 && fsync_target < targettime) ! targettime = fsync_target; ! feTimestampDifference(now, targettime, &secs, *************** *** 808,1016 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, sleeptime = -1; r = CopyStreamReceive(conn, sleeptime, ©buf); ! if (r == 0) ! continue; ! if (r == -1) ! goto error; ! if (r == -2) { ! PGresult *res = PQgetResult(conn); ! /* ! * The server closed its end of the copy stream. If we haven't ! * closed ours already, we need to do so now, unless the server ! * threw an error, in which case we don't. ! */ ! if (still_sending) { ! if (!close_walfile(basedir, partial_suffix, blockpos)) { ! /* Error message written in close_walfile() */ ! PQclear(res); goto error; } ! if (PQresultStatus(res) == PGRES_COPY_IN) { ! if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) { fprintf(stderr, ! _("%s: could not send copy-end packet: %s"), ! progname, PQerrorMessage(conn)); ! PQclear(res); goto error; } - res = PQgetResult(conn); } - still_sending = false; - } - if (copybuf != NULL) - PQfreemem(copybuf); - copybuf = NULL; - *stoppos = blockpos; - return res; - } ! /* Check the message type. */ ! if (copybuf[0] == 'k') ! { ! int pos; ! bool replyRequested; ! /* ! * Parse the keepalive message, enclosed in the CopyData message. ! * We just check if the server requested a reply, and ignore the ! * rest. ! */ ! pos = 1; /* skip msgtype 'k' */ ! pos += 8; /* skip walEnd */ ! pos += 8; /* skip sendTime */ ! if (r < pos + 1) ! { ! fprintf(stderr, _("%s: streaming header too small: %d\n"), ! progname, r); ! goto error; ! } ! replyRequested = copybuf[pos]; ! /* If the server requested an immediate reply, send one. */ ! if (replyRequested && still_sending) ! { ! now = feGetCurrentTimestamp(); ! if (!sendFeedback(conn, blockpos, now, false)) ! goto error; ! last_status = now; ! } ! } ! else if (copybuf[0] == 'w') ! { ! /* ! * Once we've decided we don't want to receive any more, just ! * ignore any subsequent XLogData messages. ! */ ! if (!still_sending) ! continue; ! /* ! * Read the header of the XLogData message, enclosed in the ! * CopyData message. We only need the WAL location field ! * (dataStart), the rest of the header is ignored. ! */ ! hdr_len = 1; /* msgtype 'w' */ ! hdr_len += 8; /* dataStart */ ! hdr_len += 8; /* walEnd */ ! hdr_len += 8; /* sendTime */ ! if (r < hdr_len) ! { ! fprintf(stderr, _("%s: streaming header too small: %d\n"), ! progname, r); ! goto error; ! } ! blockpos = fe_recvint64(©buf[1]); ! /* Extract WAL location for this block */ ! xlogoff = blockpos % XLOG_SEG_SIZE; ! /* ! * Verify that the initial location in the stream matches where we ! * think we are. ! */ ! if (walfile == -1) ! { ! /* No file open yet */ ! if (xlogoff != 0) ! { ! fprintf(stderr, ! _("%s: received transaction log record for offset %u with no file open\n"), ! progname, xlogoff); ! goto error; } } else { ! /* More data in existing segment */ ! /* XXX: store seek value don't reseek all the time */ ! if (lseek(walfile, 0, SEEK_CUR) != xlogoff) { ! fprintf(stderr, ! _("%s: got WAL data offset %08x, expected %08x\n"), ! progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR)); goto error; } } ! ! bytes_left = r - hdr_len; ! bytes_written = 0; ! ! while (bytes_left) { ! int bytes_to_write; ! ! /* ! * If crossing a WAL boundary, only write up until we reach ! * XLOG_SEG_SIZE. ! */ ! if (xlogoff + bytes_left > XLOG_SEG_SIZE) ! bytes_to_write = XLOG_SEG_SIZE - xlogoff; ! else ! bytes_to_write = bytes_left; ! ! if (walfile == -1) { ! if (!open_walfile(blockpos, timeline, ! basedir, partial_suffix)) { ! /* Error logged by open_walfile */ goto error; } } ! if (write(walfile, ! copybuf + hdr_len + bytes_written, ! bytes_to_write) != bytes_to_write) { ! fprintf(stderr, ! _("%s: could not write %u bytes to WAL file \"%s\": %s\n"), ! progname, bytes_to_write, current_walfile_name, ! strerror(errno)); goto error; } ! ! /* Write was successful, advance our position */ ! bytes_written += bytes_to_write; ! bytes_left -= bytes_to_write; ! blockpos += bytes_to_write; ! xlogoff += bytes_to_write; ! ! /* Did we reach the end of a WAL segment? */ ! if (blockpos % XLOG_SEG_SIZE == 0) { ! if (!close_walfile(basedir, partial_suffix, blockpos)) ! /* Error message written in close_walfile() */ ! goto error; ! ! xlogoff = 0; ! ! if (still_sending && stream_stop(blockpos, timeline, false)) { ! if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) ! { ! fprintf(stderr, _("%s: could not send copy-end packet: %s"), ! progname, PQerrorMessage(conn)); ! goto error; ! } ! still_sending = false; ! break; /* ignore the rest of this XLogData packet */ } } } ! /* No more data left to write, receive next copy packet */ ! } ! else ! { ! fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"), ! progname, copybuf[0]); ! goto error; } } --- 826,1073 ---- sleeptime = -1; r = CopyStreamReceive(conn, sleeptime, ©buf); ! while(r > 0) { ! /* Check the message type. */ ! if (copybuf[0] == 'k') ! { ! int pos; ! bool replyRequested; ! /* ! * Parse the keepalive message, enclosed in the CopyData message. ! * We just check if the server requested a reply, and ignore the ! * rest. ! */ ! pos = 1; /* skip msgtype 'k' */ ! pos += 8; /* skip walEnd */ ! pos += 8; /* skip sendTime */ ! ! if (r < pos + 1) ! { ! fprintf(stderr, _("%s: streaming header too small: %d\n"), ! progname, r); ! goto error; ! } ! replyRequested = copybuf[pos]; ! ! /* If the server requested an immediate reply, send one. */ ! if (replyRequested && still_sending) ! { ! now = feGetCurrentTimestamp(); ! if (!sendFeedback(conn, blockpos, now, false)) ! goto error; ! last_status = now; ! } ! } ! else if (copybuf[0] == 'w') { ! /* ! * Once we've decided we don't want to receive any more, just ! * ignore any subsequent XLogData messages. ! */ ! if (!still_sending) ! break; ! ! /* ! * Read the header of the XLogData message, enclosed in the ! * CopyData message. We only need the WAL location field ! * (dataStart), the rest of the header is ignored. ! */ ! hdr_len = 1; /* msgtype 'w' */ ! hdr_len += 8; /* dataStart */ ! hdr_len += 8; /* walEnd */ ! hdr_len += 8; /* sendTime */ ! if (r < hdr_len) { ! fprintf(stderr, _("%s: streaming header too small: %d\n"), ! progname, r); goto error; } ! blockpos = fe_recvint64(©buf[1]); ! ! /* Extract WAL location for this block */ ! xlogoff = blockpos % XLOG_SEG_SIZE; ! ! /* ! * Verify that the initial location in the stream matches where we ! * think we are. ! */ ! if (walfile == -1) { ! /* No file open yet */ ! if (xlogoff != 0) { fprintf(stderr, ! _("%s: received transaction log record for offset %u with no file open\n"), ! progname, xlogoff); ! goto error; ! } ! } ! else ! { ! /* More data in existing segment */ ! /* XXX: store seek value don't reseek all the time */ ! if (lseek(walfile, 0, SEEK_CUR) != xlogoff) ! { ! fprintf(stderr, ! _("%s: got WAL data offset %08x, expected %08x\n"), ! progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR)); goto error; } } ! bytes_left = r - hdr_len; ! bytes_written = 0; ! while (bytes_left) ! { ! int bytes_to_write; ! ! /* ! * If crossing a WAL boundary, only write up until we reach ! * XLOG_SEG_SIZE. ! */ ! if (xlogoff + bytes_left > XLOG_SEG_SIZE) ! bytes_to_write = XLOG_SEG_SIZE - xlogoff; ! else ! bytes_to_write = bytes_left; ! ! if (walfile == -1) ! { ! if (!open_walfile(blockpos, timeline, ! basedir, partial_suffix)) ! { ! /* Error logged by open_walfile */ ! goto error; ! } ! } ! if (write(walfile, ! copybuf + hdr_len + bytes_written, ! bytes_to_write) != bytes_to_write) ! { ! fprintf(stderr, ! _("%s: could not write %u bytes to WAL file \"%s\": %s\n"), ! progname, bytes_to_write, current_walfile_name, ! strerror(errno)); ! goto error; ! } ! /* Write was successful, advance our position */ ! bytes_written += bytes_to_write; ! bytes_left -= bytes_to_write; ! blockpos += bytes_to_write; ! xlogoff += bytes_to_write; ! /* Did we reach the end of a WAL segment? */ ! if (blockpos % XLOG_SEG_SIZE == 0) ! { ! if (!close_walfile(basedir, partial_suffix, blockpos)) ! /* Error message written in close_walfile() */ ! goto error; ! xlogoff = 0; ! if (still_sending && stream_stop(blockpos, timeline, false)) ! { ! if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) ! { ! fprintf(stderr, _("%s: could not send copy-end packet: %s"), ! progname, PQerrorMessage(conn)); ! goto error; ! } ! still_sending = false; ! break; /* ignore the rest of this XLogData packet */ ! } ! } } + /* No more data left to write, receive next copy packet */ } else { ! fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"), ! progname, copybuf[0]); ! goto error; ! } ! if (still_sending && stream_stop(blockpos, timeline, false)) ! { ! if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) { ! fprintf(stderr, _("%s: could not send copy-end packet: %s"), ! progname, PQerrorMessage(conn)); goto error; } + still_sending = false; + break; /* ignore the rest of this XLogData packet */ } ! r = CopyStreamReceive(conn, -1, ©buf); ! } ! if (r == 0) ! { ! /* --fsync-interval argument has been specified */ ! if (fsync_interval >= 0) { ! /* interval has been specified */ ! if (fsync_interval > 0) ! { ! now = feGetCurrentTimestamp(); ! if (!feTimestampDifferenceExceeds(output_last_fsync, now, fsync_interval)) ! continue; ! output_last_fsync = now; ! } ! /* check the need for flush */ ! if (walfile != -1 && lastFlushPosition < blockpos) { ! if (fsync(walfile) != 0) { ! fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), ! progname, current_walfile_name, strerror(errno)); goto error; } + lastFlushPosition = blockpos; } + } + continue; + } + if (r == -1) + goto error; + if (r == -2) + { + PGresult *res = PQgetResult(conn); ! /* ! * The server closed its end of the copy stream. If we haven't ! * closed ours already, we need to do so now, unless the server ! * threw an error, in which case we don't. ! */ ! if (still_sending) ! { ! if (!close_walfile(basedir, partial_suffix, blockpos)) { ! /* Error message written in close_walfile() */ ! PQclear(res); goto error; } ! if (PQresultStatus(res) == PGRES_COPY_IN) { ! if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) { ! fprintf(stderr, ! _("%s: could not send copy-end packet: %s"), ! progname, PQerrorMessage(conn)); ! PQclear(res); ! goto error; } + res = PQgetResult(conn); } + still_sending = false; } ! if (copybuf != NULL) ! PQfreemem(copybuf); ! copybuf = NULL; ! *stoppos = blockpos; ! return res; } } *** a/src/bin/pg_basebackup/receivelog.h --- b/src/bin/pg_basebackup/receivelog.h *************** *** 16,19 **** extern bool ReceiveXlogStream(PGconn *conn, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, ! char *partial_suffix); --- 16,20 ---- char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, ! char *partial_suffix, ! int fsync_interval);