*** a/doc/src/sgml/ref/pg_receivexlog.sgml --- b/doc/src/sgml/ref/pg_receivexlog.sgml *************** *** 196,201 **** PostgreSQL documentation --- 196,212 ---- + + + + + status packets sent to server as soon as after fsync. + If you want to report the flush position to the server, should use -S option. + + + + + *** a/src/bin/pg_basebackup/pg_basebackup.c --- b/src/bin/pg_basebackup/pg_basebackup.c *************** *** 370,376 **** LogStreamerMain(logstreamer_param *param) if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, param->sysidentifier, param->xlogdir, reached_end_position, standby_message_timeout, ! NULL, 0)) /* * Any errors will already have been reported in the function process, --- 370,376 ---- if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, param->sysidentifier, param->xlogdir, reached_end_position, standby_message_timeout, ! NULL, 0, false)) /* * Any errors will already have been reported in the function process, *** a/src/bin/pg_basebackup/pg_receivexlog.c --- b/src/bin/pg_basebackup/pg_receivexlog.c *************** *** 38,44 **** static int noloop = 0; static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ static int fsync_interval = 0; /* 0 = default */ static volatile bool time_to_abort = false; ! static void usage(void); static XLogRecPtr FindStreamingStart(uint32 *tli); --- 38,44 ---- static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ static int fsync_interval = 0; /* 0 = default */ static volatile bool time_to_abort = false; ! static bool reply_fsync = false; static void usage(void); static XLogRecPtr FindStreamingStart(uint32 *tli); *************** *** 74,79 **** usage(void) --- 74,81 ---- printf(_(" -p, --port=PORT database server port number\n")); printf(_(" -s, --status-interval=INTERVAL\n" " time between status packets sent to server (in seconds)\n")); + printf(_(" -r, --reply-fsync status packets sent to server as soon as after fsync\n" + " If you want to report the flush position to the server, should use -S option.\n")); printf(_(" -U, --username=NAME connect as specified database user\n")); printf(_(" -w, --no-password never prompt for password\n")); printf(_(" -W, --password force password prompt (should happen automatically)\n")); *************** *** 334,340 **** StreamLog(void) ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, stop_streaming, standby_message_timeout, ".partial", ! fsync_interval); PQfinish(conn); } --- 336,342 ---- ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, stop_streaming, standby_message_timeout, ".partial", ! fsync_interval, reply_fsync); PQfinish(conn); } *************** *** 368,373 **** main(int argc, char **argv) --- 370,376 ---- {"no-password", no_argument, NULL, 'w'}, {"password", no_argument, NULL, 'W'}, {"status-interval", required_argument, NULL, 's'}, + {"reply-fsync", no_argument, NULL, 'r'}, {"slot", required_argument, NULL, 'S'}, {"verbose", no_argument, NULL, 'v'}, {NULL, 0, NULL, 0} *************** *** 394,400 **** main(int argc, char **argv) } } ! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nF:wWv", long_options, &option_index)) != -1) { switch (c) --- 397,403 ---- } } ! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:rS:nF:wWv", long_options, &option_index)) != -1) { switch (c) *************** *** 435,440 **** main(int argc, char **argv) --- 438,446 ---- exit(1); } break; + case 'r': + reply_fsync = true; + break; case 'S': replication_slot = pg_strdup(optarg); break; *** a/src/bin/pg_basebackup/receivelog.c --- b/src/bin/pg_basebackup/receivelog.c *************** *** 38,44 **** static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, XLogRecPtr *stoppos, ! int fsync_interval); static int CopyStreamPoll(PGconn *conn, long timeout_ms); static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, --- 38,44 ---- uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, XLogRecPtr *stoppos, ! int fsync_interval, bool reply_fsync); static int CopyStreamPoll(PGconn *conn, long timeout_ms); static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, *************** *** 443,448 **** CheckServerVersionForStreaming(PGconn *conn) --- 443,451 ---- * fsync_interval controls how often we flush to the received WAL file, * in milliseconds. * + * If 'reply_fsync' is true, status packets sent to server as soon as + * after fsync. + * * Note: The log position *must* be at a log segment start! */ bool *************** *** 450,456 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! int fsync_interval) { char query[128]; char slotcmd[128]; --- 453,459 ---- char *sysidentifier, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! int fsync_interval, bool reply_fsync) { char query[128]; char slotcmd[128]; *************** *** 595,601 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* Stream the WAL */ res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, standby_message_timeout, partial_suffix, ! &stoppos, fsync_interval); if (res == NULL) goto error; --- 598,604 ---- /* Stream the WAL */ res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, standby_message_timeout, partial_suffix, ! &stoppos, fsync_interval, reply_fsync); if (res == NULL) goto error; *************** *** 760,770 **** static PGresult * HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! XLogRecPtr *stoppos, int fsync_interval) { char *copybuf = NULL; int64 last_status = -1; XLogRecPtr blockpos = startpos; still_sending = true; --- 763,774 ---- HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! XLogRecPtr *stoppos, int fsync_interval, bool reply_fsync) { char *copybuf = NULL; int64 last_status = -1; XLogRecPtr blockpos = startpos; + XLogRecPtr lastFeedbackFlushPosition = startpos; still_sending = true; *************** *** 807,820 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* * Potentially send a status message to the master */ ! if (still_sending && standby_message_timeout > 0 && ! feTimestampDifferenceExceeds(last_status, now, ! standby_message_timeout)) { /* Time to send feedback! */ if (!sendFeedback(conn, blockpos, now, false)) goto error; last_status = now; } /* --- 811,826 ---- /* * Potentially send a status message to the master */ ! if ((still_sending && standby_message_timeout > 0 && ! feTimestampDifferenceExceeds(last_status, now, standby_message_timeout)) || ! (reply_fsync && lastFeedbackFlushPosition != lastFlushPosition)) { /* Time to send feedback! */ if (!sendFeedback(conn, blockpos, now, false)) goto error; last_status = now; + if (reply_fsync) + lastFeedbackFlushPosition = lastFlushPosition; } /* *** a/src/bin/pg_basebackup/receivelog.h --- b/src/bin/pg_basebackup/receivelog.h *************** *** 31,36 **** extern bool ReceiveXlogStream(PGconn *conn, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! int fsync_interval); #endif /* RECEIVELOG_H */ --- 31,37 ---- stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! int fsync_interval, ! bool reply_fsync); #endif /* RECEIVELOG_H */