*** a/doc/src/sgml/ref/pg_receivexlog.sgml
--- b/doc/src/sgml/ref/pg_receivexlog.sgml
***************
*** 196,201 **** PostgreSQL documentation
--- 196,212 ----
+
+
+
+
+ status packets sent to server as soon as after fsync.
+ If you want to report the flush position to the server, should use -S option.
+
+
+
+
+
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 370,376 **** LogStreamerMain(logstreamer_param *param)
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir,
reached_end_position, standby_message_timeout,
! NULL, 0))
/*
* Any errors will already have been reported in the function process,
--- 370,376 ----
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir,
reached_end_position, standby_message_timeout,
! NULL, 0, false))
/*
* Any errors will already have been reported in the function process,
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 38,44 **** static int noloop = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static int fsync_interval = 0; /* 0 = default */
static volatile bool time_to_abort = false;
!
static void usage(void);
static XLogRecPtr FindStreamingStart(uint32 *tli);
--- 38,44 ----
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static int fsync_interval = 0; /* 0 = default */
static volatile bool time_to_abort = false;
! static bool reply_fsync = false;
static void usage(void);
static XLogRecPtr FindStreamingStart(uint32 *tli);
***************
*** 74,79 **** usage(void)
--- 74,81 ----
printf(_(" -p, --port=PORT database server port number\n"));
printf(_(" -s, --status-interval=INTERVAL\n"
" time between status packets sent to server (in seconds)\n"));
+ printf(_(" -r, --reply-fsync status packets sent to server as soon as after fsync\n"
+ " If you want to report the flush position to the server, should use -S option.\n"));
printf(_(" -U, --username=NAME connect as specified database user\n"));
printf(_(" -w, --no-password never prompt for password\n"));
printf(_(" -W, --password force password prompt (should happen automatically)\n"));
***************
*** 334,340 **** StreamLog(void)
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
stop_streaming, standby_message_timeout, ".partial",
! fsync_interval);
PQfinish(conn);
}
--- 336,342 ----
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
stop_streaming, standby_message_timeout, ".partial",
! fsync_interval, reply_fsync);
PQfinish(conn);
}
***************
*** 368,373 **** main(int argc, char **argv)
--- 370,376 ----
{"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'},
{"status-interval", required_argument, NULL, 's'},
+ {"reply-fsync", no_argument, NULL, 'r'},
{"slot", required_argument, NULL, 'S'},
{"verbose", no_argument, NULL, 'v'},
{NULL, 0, NULL, 0}
***************
*** 394,400 **** main(int argc, char **argv)
}
}
! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nF:wWv",
long_options, &option_index)) != -1)
{
switch (c)
--- 397,403 ----
}
}
! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:rS:nF:wWv",
long_options, &option_index)) != -1)
{
switch (c)
***************
*** 435,440 **** main(int argc, char **argv)
--- 438,446 ----
exit(1);
}
break;
+ case 'r':
+ reply_fsync = true;
+ break;
case 'S':
replication_slot = pg_strdup(optarg);
break;
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 38,44 **** static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
char *partial_suffix, XLogRecPtr *stoppos,
! int fsync_interval);
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
--- 38,44 ----
uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
char *partial_suffix, XLogRecPtr *stoppos,
! int fsync_interval, bool reply_fsync);
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
***************
*** 443,448 **** CheckServerVersionForStreaming(PGconn *conn)
--- 443,451 ----
* fsync_interval controls how often we flush to the received WAL file,
* in milliseconds.
*
+ * If 'reply_fsync' is true, status packets sent to server as soon as
+ * after fsync.
+ *
* Note: The log position *must* be at a log segment start!
*/
bool
***************
*** 450,456 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
! int fsync_interval)
{
char query[128];
char slotcmd[128];
--- 453,459 ----
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
! int fsync_interval, bool reply_fsync)
{
char query[128];
char slotcmd[128];
***************
*** 595,601 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
! &stoppos, fsync_interval);
if (res == NULL)
goto error;
--- 598,604 ----
/* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
! &stoppos, fsync_interval, reply_fsync);
if (res == NULL)
goto error;
***************
*** 760,770 **** static PGresult *
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
! XLogRecPtr *stoppos, int fsync_interval)
{
char *copybuf = NULL;
int64 last_status = -1;
XLogRecPtr blockpos = startpos;
still_sending = true;
--- 763,774 ----
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
! XLogRecPtr *stoppos, int fsync_interval, bool reply_fsync)
{
char *copybuf = NULL;
int64 last_status = -1;
XLogRecPtr blockpos = startpos;
+ XLogRecPtr lastFeedbackFlushPosition = startpos;
still_sending = true;
***************
*** 807,820 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/*
* Potentially send a status message to the master
*/
! if (still_sending && standby_message_timeout > 0 &&
! feTimestampDifferenceExceeds(last_status, now,
! standby_message_timeout))
{
/* Time to send feedback! */
if (!sendFeedback(conn, blockpos, now, false))
goto error;
last_status = now;
}
/*
--- 811,826 ----
/*
* Potentially send a status message to the master
*/
! if ((still_sending && standby_message_timeout > 0 &&
! feTimestampDifferenceExceeds(last_status, now, standby_message_timeout)) ||
! (reply_fsync && lastFeedbackFlushPosition != lastFlushPosition))
{
/* Time to send feedback! */
if (!sendFeedback(conn, blockpos, now, false))
goto error;
last_status = now;
+ if (reply_fsync)
+ lastFeedbackFlushPosition = lastFlushPosition;
}
/*
*** a/src/bin/pg_basebackup/receivelog.h
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 31,36 **** extern bool ReceiveXlogStream(PGconn *conn,
stream_stop_callback stream_stop,
int standby_message_timeout,
char *partial_suffix,
! int fsync_interval);
#endif /* RECEIVELOG_H */
--- 31,37 ----
stream_stop_callback stream_stop,
int standby_message_timeout,
char *partial_suffix,
! int fsync_interval,
! bool reply_fsync);
#endif /* RECEIVELOG_H */