*** a/doc/src/sgml/ref/pg_receivexlog.sgml --- b/doc/src/sgml/ref/pg_receivexlog.sgml *************** *** 66,71 **** PostgreSQL documentation --- 66,78 ---- as possible. To avoid this behavior, use the -n parameter. + + + Synchronous mode offers the ability to confirm WAL have been streamed + in the same way as synchronous replication. To use synchronous mode, + set up synchronous replication as described in + , and set parameter(that is, -m and --slot). + *************** *** 106,111 **** PostgreSQL documentation --- 113,129 ---- + + + + + Enables synchronous mode. If replication slot is disabled then + this setting is irrelevant. + + + + + *** a/src/bin/pg_basebackup/pg_basebackup.c --- b/src/bin/pg_basebackup/pg_basebackup.c *************** *** 370,376 **** LogStreamerMain(logstreamer_param *param) if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, param->sysidentifier, param->xlogdir, reached_end_position, standby_message_timeout, ! NULL)) /* * Any errors will already have been reported in the function process, --- 370,376 ---- if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, param->sysidentifier, param->xlogdir, reached_end_position, standby_message_timeout, ! NULL, 0)) /* * Any errors will already have been reported in the function process, *** a/src/bin/pg_basebackup/pg_receivexlog.c --- b/src/bin/pg_basebackup/pg_receivexlog.c *************** *** 35,40 **** --- 35,41 ---- static char *basedir = NULL; static int verbose = 0; static int noloop = 0; + static int syncmode = 0; static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ static volatile bool time_to_abort = false; *************** *** 62,67 **** usage(void) --- 63,69 ---- printf(_("\nOptions:\n")); printf(_(" -D, --directory=DIR receive transaction log files into this directory\n")); printf(_(" -n, --no-loop do not loop on connection lost\n")); + printf(_(" -m, --sync-mode synchronous mode\n")); printf(_(" -v, --verbose output verbose messages\n")); printf(_(" -V, --version output version information, then exit\n")); printf(_(" -?, --help show this help, then exit\n")); *************** *** 330,336 **** StreamLog(void) starttli); ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, ! stop_streaming, standby_message_timeout, ".partial"); PQfinish(conn); } --- 332,338 ---- starttli); ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, ! stop_streaming, standby_message_timeout, ".partial", syncmode); PQfinish(conn); } *************** *** 360,365 **** main(int argc, char **argv) --- 362,368 ---- {"port", required_argument, NULL, 'p'}, {"username", required_argument, NULL, 'U'}, {"no-loop", no_argument, NULL, 'n'}, + {"sync-mode", no_argument, NULL, 'm'}, {"no-password", no_argument, NULL, 'w'}, {"password", no_argument, NULL, 'W'}, {"status-interval", required_argument, NULL, 's'}, *************** *** 389,395 **** main(int argc, char **argv) } } ! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv", long_options, &option_index)) != -1) { switch (c) --- 392,398 ---- } } ! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWvm", long_options, &option_index)) != -1) { switch (c) *************** *** 436,441 **** main(int argc, char **argv) --- 439,447 ---- case 'n': noloop = 1; break; + case 'm': + syncmode = 1; + break; case 'v': verbose++; break; *** a/src/bin/pg_basebackup/receivelog.c --- b/src/bin/pg_basebackup/receivelog.c *************** *** 34,40 **** static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, ! char *partial_suffix, XLogRecPtr *stoppos); static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline); --- 34,40 ---- static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, ! char *partial_suffix, XLogRecPtr *stoppos, int syncmode); static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline); *************** *** 417,429 **** CheckServerVersionForStreaming(PGconn *conn) * allows you to tell the difference between partial and completed files, * so that you can continue later where you left. * * Note: The log position *must* be at a log segment start! */ bool ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, ! int standby_message_timeout, char *partial_suffix) { char query[128]; char slotcmd[128]; --- 417,432 ---- * allows you to tell the difference between partial and completed files, * so that you can continue later where you left. * + * If 'syncmode' is not zero, synchronous mode. Flush is executed after all + * received WAL is written, and reply flush position. + * * Note: The log position *must* be at a log segment start! */ bool ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, ! int standby_message_timeout, char *partial_suffix, int syncmode) { char query[128]; char slotcmd[128]; *************** *** 568,574 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* Stream the WAL */ res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, standby_message_timeout, partial_suffix, ! &stoppos); if (res == NULL) goto error; --- 571,577 ---- /* Stream the WAL */ res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, standby_message_timeout, partial_suffix, ! &stoppos, syncmode); if (res == NULL) goto error; *************** *** 729,740 **** static PGresult * HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! XLogRecPtr *stoppos) { char *copybuf = NULL; int64 last_status = -1; XLogRecPtr blockpos = startpos; bool still_sending = true; while (1) { --- 732,744 ---- HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! XLogRecPtr *stoppos, int syncmode) { char *copybuf = NULL; int64 last_status = -1; XLogRecPtr blockpos = startpos; bool still_sending = true; + bool flush_flg = false; while (1) { *************** *** 813,818 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, --- 817,827 ---- else timeout.tv_sec = secs; timeout.tv_usec = usecs; + if (syncmode) + { + timeout.tv_sec = 0; + timeout.tv_usec = 100000;/* sync mode sleep at 100 msec*/ + } timeoutptr = &timeout; } else *************** *** 826,831 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, --- 835,853 ---- * deliver a status packet to the server or just go back into * blocking. */ + if (flush_flg && walfile != -1 && syncmode) + { + if (fsync(walfile) != 0) + { + fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), + progname, current_walfile_name, strerror(errno)); + goto error; + } + lastFlushPosition = blockpos; + if (!sendFeedback(conn, blockpos, now, false)) + goto error; + flush_flg = false; + } continue; } else if (r < 0) *************** *** 1041,1046 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, --- 1063,1069 ---- } } } + flush_flg = true; /* No more data left to write, receive next copy packet */ } else *** a/src/bin/pg_basebackup/receivelog.h --- b/src/bin/pg_basebackup/receivelog.h *************** *** 16,19 **** extern bool ReceiveXlogStream(PGconn *conn, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, ! char *partial_suffix); --- 16,20 ---- char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, ! char *partial_suffix, ! int syncmode);