*** a/doc/src/sgml/ref/pg_receivexlog.sgml
--- b/doc/src/sgml/ref/pg_receivexlog.sgml
***************
*** 106,111 **** PostgreSQL documentation
--- 106,126 ----
+
+
+
+
+ How often should pg_receivexlog issue sync
+ commands to ensure the received WAL file is safely flushed to disk.
+ Specifying an interval of 0 issuing fsyncs at
+ every consecutive data. Not specifying an interval issuing fsyncs
+ at only WAL file close, while still reporting progress the server.
+ In this case, data may be lost in the event of a crash.
+
+
+
+
+
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 370,376 **** LogStreamerMain(logstreamer_param *param)
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir,
reached_end_position, standby_message_timeout,
! NULL))
/*
* Any errors will already have been reported in the function process,
--- 370,376 ----
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir,
reached_end_position, standby_message_timeout,
! NULL, -1))
/*
* Any errors will already have been reported in the function process,
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 36,41 **** static char *basedir = NULL;
--- 36,42 ----
static int verbose = 0;
static int noloop = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
+ static int fsync_interval = -1; /* Invalid = default */
static volatile bool time_to_abort = false;
***************
*** 62,67 **** usage(void)
--- 63,71 ----
printf(_("\nOptions:\n"));
printf(_(" -D, --directory=DIR receive transaction log files into this directory\n"));
printf(_(" -n, --no-loop do not loop on connection lost\n"));
+ printf(_(" -F --fsync-interval=INTERVAL\n"
+ " frequency of syncs to the transaction log files (in seconds)\n"
+ " (default: file close only)\n"));
printf(_(" -v, --verbose output verbose messages\n"));
printf(_(" -V, --version output version information, then exit\n"));
printf(_(" -?, --help show this help, then exit\n"));
***************
*** 330,336 **** StreamLog(void)
starttli);
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! stop_streaming, standby_message_timeout, ".partial");
PQfinish(conn);
}
--- 334,340 ----
starttli);
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! stop_streaming, standby_message_timeout, ".partial", fsync_interval);
PQfinish(conn);
}
***************
*** 360,365 **** main(int argc, char **argv)
--- 364,370 ----
{"port", required_argument, NULL, 'p'},
{"username", required_argument, NULL, 'U'},
{"no-loop", no_argument, NULL, 'n'},
+ {"fsync-interval", required_argument, NULL, 'F'},
{"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'},
{"status-interval", required_argument, NULL, 's'},
***************
*** 389,395 **** main(int argc, char **argv)
}
}
! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
long_options, &option_index)) != -1)
{
switch (c)
--- 394,400 ----
}
}
! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nF:wWv",
long_options, &option_index)) != -1)
{
switch (c)
***************
*** 436,441 **** main(int argc, char **argv)
--- 441,455 ----
case 'n':
noloop = 1;
break;
+ case 'F':
+ fsync_interval = atoi(optarg) * 1000;
+ if (fsync_interval < 0)
+ {
+ fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
+ progname, optarg);
+ exit(1);
+ }
+ break;
case 'v':
verbose++;
break;
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 30,40 **** static int walfile = -1;
static char current_walfile_name[MAXPGPATH] = "";
static bool reportFlushPosition = false;
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
! char *partial_suffix, XLogRecPtr *stoppos);
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
--- 30,41 ----
static char current_walfile_name[MAXPGPATH] = "";
static bool reportFlushPosition = false;
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
+ static int64 output_last_fsync = -1;
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
! char *partial_suffix, XLogRecPtr *stoppos,int fsync_interval);
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
***************
*** 187,193 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
fprintf(stderr,
_("%s: not renaming \"%s%s\", segment is not complete\n"),
progname, current_walfile_name, partial_suffix);
!
lastFlushPosition = pos;
return true;
}
--- 188,194 ----
fprintf(stderr,
_("%s: not renaming \"%s%s\", segment is not complete\n"),
progname, current_walfile_name, partial_suffix);
! output_last_fsync = feGetCurrentTimestamp();
lastFlushPosition = pos;
return true;
}
***************
*** 419,431 **** CheckServerVersionForStreaming(PGconn *conn)
* allows you to tell the difference between partial and completed files,
* so that you can continue later where you left.
*
* Note: The log position *must* be at a log segment start!
*/
bool
ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
! int standby_message_timeout, char *partial_suffix)
{
char query[128];
char slotcmd[128];
--- 420,435 ----
* allows you to tell the difference between partial and completed files,
* so that you can continue later where you left.
*
+ * fsync_interval controls how often we flush to the received
+ * WAL file, in milliseconds.
+ *
* Note: The log position *must* be at a log segment start!
*/
bool
ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
! int standby_message_timeout, char *partial_suffix, int fsync_interval)
{
char query[128];
char slotcmd[128];
***************
*** 570,576 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
! &stoppos);
if (res == NULL)
goto error;
--- 574,580 ----
/* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
! &stoppos, fsync_interval);
if (res == NULL)
goto error;
***************
*** 731,737 **** static PGresult *
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
! XLogRecPtr *stoppos)
{
char *copybuf = NULL;
int64 last_status = -1;
--- 735,741 ----
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
! XLogRecPtr *stoppos, int fsync_interval)
{
char *copybuf = NULL;
int64 last_status = -1;
***************
*** 747,752 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
--- 751,758 ----
int64 now;
int hdr_len;
long sleeptime;
+ int64 message_target = 0;
+ int64 fsync_target = 0;
/*
* Check if we should continue streaming, or abort at this point.
***************
*** 780,796 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
goto error;
last_status = now;
}
!
! /*
! * Compute how long send/receive loops should sleep
! */
! if (standby_message_timeout && still_sending)
{
int64 targettime;
long secs;
int usecs;
! targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
feTimestampDifference(now,
targettime,
&secs,
--- 786,814 ----
goto error;
last_status = now;
}
!
! /* Compute when we need to wakeup to send a keepalive message. */
! if (standby_message_timeout)
! message_target = last_status + (standby_message_timeout - 1) *
! ((int64) 1000);
!
! /* Compute when we need to wakeup to fsync the output file. */
! if (fsync_interval > 0 && lastFlushPosition < blockpos)
! fsync_target = output_last_fsync + (fsync_interval - 1) *
! ((int64) 1000);
!
! /* Now compute when to wakeup. Compute how long send/receive loops should sleep*/
! if (still_sending && (message_target > 0 || fsync_target > 0))
{
int64 targettime;
long secs;
int usecs;
! targettime = message_target;
!
! if (fsync_target > 0 && fsync_target < targettime)
! targettime = fsync_target;
!
feTimestampDifference(now,
targettime,
&secs,
***************
*** 808,1016 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
sleeptime = -1;
r = CopyStreamReceive(conn, sleeptime, ©buf);
! if (r == 0)
! continue;
! if (r == -1)
! goto error;
! if (r == -2)
{
! PGresult *res = PQgetResult(conn);
! /*
! * The server closed its end of the copy stream. If we haven't
! * closed ours already, we need to do so now, unless the server
! * threw an error, in which case we don't.
! */
! if (still_sending)
{
! if (!close_walfile(basedir, partial_suffix, blockpos))
{
! /* Error message written in close_walfile() */
! PQclear(res);
goto error;
}
! if (PQresultStatus(res) == PGRES_COPY_IN)
{
! if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
{
fprintf(stderr,
! _("%s: could not send copy-end packet: %s"),
! progname, PQerrorMessage(conn));
! PQclear(res);
goto error;
}
- res = PQgetResult(conn);
}
- still_sending = false;
- }
- if (copybuf != NULL)
- PQfreemem(copybuf);
- copybuf = NULL;
- *stoppos = blockpos;
- return res;
- }
! /* Check the message type. */
! if (copybuf[0] == 'k')
! {
! int pos;
! bool replyRequested;
! /*
! * Parse the keepalive message, enclosed in the CopyData message.
! * We just check if the server requested a reply, and ignore the
! * rest.
! */
! pos = 1; /* skip msgtype 'k' */
! pos += 8; /* skip walEnd */
! pos += 8; /* skip sendTime */
! if (r < pos + 1)
! {
! fprintf(stderr, _("%s: streaming header too small: %d\n"),
! progname, r);
! goto error;
! }
! replyRequested = copybuf[pos];
! /* If the server requested an immediate reply, send one. */
! if (replyRequested && still_sending)
! {
! now = feGetCurrentTimestamp();
! if (!sendFeedback(conn, blockpos, now, false))
! goto error;
! last_status = now;
! }
! }
! else if (copybuf[0] == 'w')
! {
! /*
! * Once we've decided we don't want to receive any more, just
! * ignore any subsequent XLogData messages.
! */
! if (!still_sending)
! continue;
! /*
! * Read the header of the XLogData message, enclosed in the
! * CopyData message. We only need the WAL location field
! * (dataStart), the rest of the header is ignored.
! */
! hdr_len = 1; /* msgtype 'w' */
! hdr_len += 8; /* dataStart */
! hdr_len += 8; /* walEnd */
! hdr_len += 8; /* sendTime */
! if (r < hdr_len)
! {
! fprintf(stderr, _("%s: streaming header too small: %d\n"),
! progname, r);
! goto error;
! }
! blockpos = fe_recvint64(©buf[1]);
! /* Extract WAL location for this block */
! xlogoff = blockpos % XLOG_SEG_SIZE;
! /*
! * Verify that the initial location in the stream matches where we
! * think we are.
! */
! if (walfile == -1)
! {
! /* No file open yet */
! if (xlogoff != 0)
! {
! fprintf(stderr,
! _("%s: received transaction log record for offset %u with no file open\n"),
! progname, xlogoff);
! goto error;
}
}
else
{
! /* More data in existing segment */
! /* XXX: store seek value don't reseek all the time */
! if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
{
! fprintf(stderr,
! _("%s: got WAL data offset %08x, expected %08x\n"),
! progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
goto error;
}
}
!
! bytes_left = r - hdr_len;
! bytes_written = 0;
!
! while (bytes_left)
{
! int bytes_to_write;
!
! /*
! * If crossing a WAL boundary, only write up until we reach
! * XLOG_SEG_SIZE.
! */
! if (xlogoff + bytes_left > XLOG_SEG_SIZE)
! bytes_to_write = XLOG_SEG_SIZE - xlogoff;
! else
! bytes_to_write = bytes_left;
!
! if (walfile == -1)
{
! if (!open_walfile(blockpos, timeline,
! basedir, partial_suffix))
{
! /* Error logged by open_walfile */
goto error;
}
}
! if (write(walfile,
! copybuf + hdr_len + bytes_written,
! bytes_to_write) != bytes_to_write)
{
! fprintf(stderr,
! _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
! progname, bytes_to_write, current_walfile_name,
! strerror(errno));
goto error;
}
!
! /* Write was successful, advance our position */
! bytes_written += bytes_to_write;
! bytes_left -= bytes_to_write;
! blockpos += bytes_to_write;
! xlogoff += bytes_to_write;
!
! /* Did we reach the end of a WAL segment? */
! if (blockpos % XLOG_SEG_SIZE == 0)
{
! if (!close_walfile(basedir, partial_suffix, blockpos))
! /* Error message written in close_walfile() */
! goto error;
!
! xlogoff = 0;
!
! if (still_sending && stream_stop(blockpos, timeline, false))
{
! if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
! {
! fprintf(stderr, _("%s: could not send copy-end packet: %s"),
! progname, PQerrorMessage(conn));
! goto error;
! }
! still_sending = false;
! break; /* ignore the rest of this XLogData packet */
}
}
}
! /* No more data left to write, receive next copy packet */
! }
! else
! {
! fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
! progname, copybuf[0]);
! goto error;
}
}
--- 826,1073 ----
sleeptime = -1;
r = CopyStreamReceive(conn, sleeptime, ©buf);
! while(r > 0)
{
! /* Check the message type. */
! if (copybuf[0] == 'k')
! {
! int pos;
! bool replyRequested;
! /*
! * Parse the keepalive message, enclosed in the CopyData message.
! * We just check if the server requested a reply, and ignore the
! * rest.
! */
! pos = 1; /* skip msgtype 'k' */
! pos += 8; /* skip walEnd */
! pos += 8; /* skip sendTime */
!
! if (r < pos + 1)
! {
! fprintf(stderr, _("%s: streaming header too small: %d\n"),
! progname, r);
! goto error;
! }
! replyRequested = copybuf[pos];
!
! /* If the server requested an immediate reply, send one. */
! if (replyRequested && still_sending)
! {
! now = feGetCurrentTimestamp();
! if (!sendFeedback(conn, blockpos, now, false))
! goto error;
! last_status = now;
! }
! }
! else if (copybuf[0] == 'w')
{
! /*
! * Once we've decided we don't want to receive any more, just
! * ignore any subsequent XLogData messages.
! */
! if (!still_sending)
! break;
!
! /*
! * Read the header of the XLogData message, enclosed in the
! * CopyData message. We only need the WAL location field
! * (dataStart), the rest of the header is ignored.
! */
! hdr_len = 1; /* msgtype 'w' */
! hdr_len += 8; /* dataStart */
! hdr_len += 8; /* walEnd */
! hdr_len += 8; /* sendTime */
! if (r < hdr_len)
{
! fprintf(stderr, _("%s: streaming header too small: %d\n"),
! progname, r);
goto error;
}
! blockpos = fe_recvint64(©buf[1]);
!
! /* Extract WAL location for this block */
! xlogoff = blockpos % XLOG_SEG_SIZE;
!
! /*
! * Verify that the initial location in the stream matches where we
! * think we are.
! */
! if (walfile == -1)
{
! /* No file open yet */
! if (xlogoff != 0)
{
fprintf(stderr,
! _("%s: received transaction log record for offset %u with no file open\n"),
! progname, xlogoff);
! goto error;
! }
! }
! else
! {
! /* More data in existing segment */
! /* XXX: store seek value don't reseek all the time */
! if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
! {
! fprintf(stderr,
! _("%s: got WAL data offset %08x, expected %08x\n"),
! progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
goto error;
}
}
! bytes_left = r - hdr_len;
! bytes_written = 0;
! while (bytes_left)
! {
! int bytes_to_write;
!
! /*
! * If crossing a WAL boundary, only write up until we reach
! * XLOG_SEG_SIZE.
! */
! if (xlogoff + bytes_left > XLOG_SEG_SIZE)
! bytes_to_write = XLOG_SEG_SIZE - xlogoff;
! else
! bytes_to_write = bytes_left;
!
! if (walfile == -1)
! {
! if (!open_walfile(blockpos, timeline,
! basedir, partial_suffix))
! {
! /* Error logged by open_walfile */
! goto error;
! }
! }
! if (write(walfile,
! copybuf + hdr_len + bytes_written,
! bytes_to_write) != bytes_to_write)
! {
! fprintf(stderr,
! _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
! progname, bytes_to_write, current_walfile_name,
! strerror(errno));
! goto error;
! }
! /* Write was successful, advance our position */
! bytes_written += bytes_to_write;
! bytes_left -= bytes_to_write;
! blockpos += bytes_to_write;
! xlogoff += bytes_to_write;
! /* Did we reach the end of a WAL segment? */
! if (blockpos % XLOG_SEG_SIZE == 0)
! {
! if (!close_walfile(basedir, partial_suffix, blockpos))
! /* Error message written in close_walfile() */
! goto error;
! xlogoff = 0;
! if (still_sending && stream_stop(blockpos, timeline, false))
! {
! if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
! {
! fprintf(stderr, _("%s: could not send copy-end packet: %s"),
! progname, PQerrorMessage(conn));
! goto error;
! }
! still_sending = false;
! break; /* ignore the rest of this XLogData packet */
! }
! }
}
+ /* No more data left to write, receive next copy packet */
}
else
{
! fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
! progname, copybuf[0]);
! goto error;
! }
! if (still_sending && stream_stop(blockpos, timeline, false))
! {
! if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
{
! fprintf(stderr, _("%s: could not send copy-end packet: %s"),
! progname, PQerrorMessage(conn));
goto error;
}
+ still_sending = false;
+ break; /* ignore the rest of this XLogData packet */
}
! r = CopyStreamReceive(conn, -1, ©buf);
! }
! if (r == 0)
! {
! /* --fsync-interval argument has been specified */
! if (fsync_interval >= 0)
{
! /* interval has been specified */
! if (fsync_interval > 0)
! {
! now = feGetCurrentTimestamp();
! if (!feTimestampDifferenceExceeds(output_last_fsync, now, fsync_interval))
! continue;
! output_last_fsync = now;
! }
! /* check the need for flush */
! if (walfile != -1 && lastFlushPosition < blockpos)
{
! if (fsync(walfile) != 0)
{
! fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
! progname, current_walfile_name, strerror(errno));
goto error;
}
+ lastFlushPosition = blockpos;
}
+ }
+ continue;
+ }
+ if (r == -1)
+ goto error;
+ if (r == -2)
+ {
+ PGresult *res = PQgetResult(conn);
! /*
! * The server closed its end of the copy stream. If we haven't
! * closed ours already, we need to do so now, unless the server
! * threw an error, in which case we don't.
! */
! if (still_sending)
! {
! if (!close_walfile(basedir, partial_suffix, blockpos))
{
! /* Error message written in close_walfile() */
! PQclear(res);
goto error;
}
! if (PQresultStatus(res) == PGRES_COPY_IN)
{
! if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
{
! fprintf(stderr,
! _("%s: could not send copy-end packet: %s"),
! progname, PQerrorMessage(conn));
! PQclear(res);
! goto error;
}
+ res = PQgetResult(conn);
}
+ still_sending = false;
}
! if (copybuf != NULL)
! PQfreemem(copybuf);
! copybuf = NULL;
! *stoppos = blockpos;
! return res;
}
}
*** a/src/bin/pg_basebackup/receivelog.h
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 16,19 **** extern bool ReceiveXlogStream(PGconn *conn,
char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout,
! char *partial_suffix);
--- 16,20 ----
char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout,
! char *partial_suffix,
! int fsync_interval);