diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index ac6ac5b..c91824a 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -172,6 +172,7 @@ Complete list of usable sgml source files in this directory.
+
diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml
index bbca5f5..566f506 100644
--- a/doc/src/sgml/ref/pg_basebackup.sgml
+++ b/doc/src/sgml/ref/pg_basebackup.sgml
@@ -144,8 +144,8 @@ PostgreSQL documentation
-
-
+
+
Includes the required transaction log files (WAL files) in the
@@ -155,16 +155,43 @@ PostgreSQL documentation
to consult the log archive, thus making this a completely standalone
backup.
-
-
- The transaction log files are collected at the end of the backup.
- Therefore, it is necessary for the
- parameter to be set high
- enough that the log is not removed before the end of the backup.
- If the log has been rotated when it's time to transfer it, the
- backup will fail and be unusable.
-
-
+
+ The following methods for collecting the transaction logs are
+ supported:
+
+
+
+ f
+ fetch
+
+
+ The transaction log files are collected at the end of the backup.
+ Therefore, it is necessary for the
+ parameter to be set high
+ enough that the log is not removed before the end of the backup.
+ If the log has been rotated when it's time to transfer it, the
+ backup will fail and be unusable.
+
+
+
+
+
+ s
+ stream
+
+
+ Stream the transaction log while the backup is created. This will
+ open a second connection to the server and start streaming the
+ transaction log in parallel while running the backup. Therefore,
+ it will use up two slots configured by the
+ parameter. As long as the
+ client can keep up with transaction log received, using this mode
+ requires no extra transaction logs to be saved on the master.
+
+
+
+
+
diff --git a/doc/src/sgml/ref/pg_receivexlog.sgml b/doc/src/sgml/ref/pg_receivexlog.sgml
new file mode 100644
index 0000000..fc07ae1
--- /dev/null
+++ b/doc/src/sgml/ref/pg_receivexlog.sgml
@@ -0,0 +1,257 @@
+
+
+
+
+ pg_receivexlog
+ 1
+ Application
+
+
+
+ pg_receivexlog
+ streams transaction logs from a PostgreSQL cluster
+
+
+
+ pg_receivexlog
+
+
+
+
+ pg_receivexlog
+ option>
+
+
+
+
+
+ Description
+
+
+ pg_receivexlog is used to stream transaction log
+ from a running PostgreSQL cluster. The transaction
+ log is streamed using the streaming replication protocol, and is written
+ to a local directory of files. This directory can be used as the archive
+ location for doing a restore using point-in-time recovery (see
+ ).
+
+
+
+ pg_receivexlog streams the transaction
+ log in real time as it's being generated on the server, and does not wait
+ for segments to complete like does.
+ For this reason, it is not necessary to set
+ when using
+ pg_receivexlog.
+
+
+
+ The transaction log is streamed over a regular
+ PostgreSQL connection, and uses the
+ replication protocol. The connection must be
+ made with a user having REPLICATION permissions (see
+ ), and the user must be granted explicit
+ permissions in pg_hba.conf. The server must also
+ be configured with set high enough
+ to leave at least one session available for the stream.
+
+
+
+
+ Options
+
+
+ The following command-line options control the location and format of the
+ output.
+
+
+
+
+
+
+
+ Directory to write the output to.
+
+
+ This parameter is required.
+
+
+
+
+
+
+ The following command-line options control the running of the program.
+
+
+
+
+
+
+
+ Enables verbose mode.
+
+
+
+
+
+
+
+
+ The following command-line options control the database connection parameters.
+
+
+
+
+
+
+
+ Specifies the host name of the machine on which the server is
+ running. If the value begins with a slash, it is used as the
+ directory for the Unix domain socket. The default is taken
+ from the PGHOST environment variable, if set,
+ else a Unix domain socket connection is attempted.
+
+
+
+
+
+
+
+
+
+ Specifies the TCP port or local Unix domain socket file
+ extension on which the server is listening for connections.
+ Defaults to the PGPORT environment variable, if
+ set, or a compiled-in default.
+
+
+
+
+
+
+
+
+
+ User name to connect as.
+
+
+
+
+
+
+
+
+
+ Never issue a password prompt. If the server requires
+ password authentication and a password is not available by
+ other means such as a .pgpass file, the
+ connection attempt will fail. This option can be useful in
+ batch jobs and scripts where no user is present to enter a
+ password.
+
+
+
+
+
+
+
+
+
+ Force pg_basebackup to prompt for a
+ password before connecting to a database.
+
+
+
+ This option is never essential, since
+ pg_bsaebackup will automatically prompt
+ for a password if the server demands password authentication.
+ However, pg_basebackup will waste a
+ connection attempt finding out that the server wants a password.
+ In some cases it is worth typing
+
+
+
+
+
+
+ Other, less commonly used, parameters are also available:
+
+
+
+
+
+
+
+ Print the pg_receivexlog version and exit.
+
+
+
+
+
+
+
+
+
+ Show help about pg_receivexlog command line
+ arguments, and exit.
+
+
+
+
+
+
+
+
+
+
+ Environment
+
+
+ This utility, like most other PostgreSQL> utilities,
+ uses the environment variables supported by libpq>
+ (see ).
+
+
+
+
+
+ Notes
+
+
+ When using pg_receivexlog instead of
+ , the server will continue to
+ recycle transaction log files even if the backups are not properly
+ archived, since there is no command that fails. This can be worked
+ around by having an that fails
+ when the file has not been properly archived yet.
+
+
+
+
+
+ Examples
+
+
+ To stream the transaction log from the server at
+ mydbserver and store it in the local directory
+ /usr/local/pgsql/archive:
+
+ $ pg_receivexlog -h mydbserver -D /usr/local/pgsql/archive
+
+
+
+
+
+ See Also
+
+
+
+
+
+
+
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index 9ae8000..91d9820 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -212,6 +212,7 @@
&pgConfig;
&pgDump;
&pgDumpall;
+ &pgReceivexlog;
&pgRestore;
&psqlRef;
&reindexdb;
diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile
index ccb1502..38c9b74 100644
--- a/src/bin/pg_basebackup/Makefile
+++ b/src/bin/pg_basebackup/Makefile
@@ -18,21 +18,26 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
-OBJS= pg_basebackup.o $(WIN32RES)
+OBJS=receivelog.o streamutil.o $(WIN32RES)
-all: pg_basebackup
+all: pg_basebackup pg_receivexlog
-pg_basebackup: $(OBJS) | submake-libpq submake-libpgport
- $(CC) $(CFLAGS) $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport
+ $(CC) $(CFLAGS) pg_basebackup.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+
+pg_receivexlog: pg_receivexlog.o $(OBJS) | submake-libpq submake-libpgport
+ $(CC) $(CFLAGS) pg_receivexlog.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
install: all installdirs
$(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
+ $(INSTALL_PROGRAM) pg_receivexlog$(X) '$(DESTDIR)$(bindir)/pg_receivexlog(X)'
installdirs:
$(MKDIR_P) '$(DESTDIR)$(bindir)'
uninstall:
rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
+ rm -f '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
clean distclean maintainer-clean:
- rm -f pg_basebackup$(X) $(OBJS)
+ rm -f pg_basebackup$(X) pg_receivexlog$(X) $(OBJS) pg_basebackup.o pg_receivexlog.o
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 61aa1d3..e56bbaa 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -17,6 +17,8 @@
#include
#include
#include
+#include
+#include
#ifdef HAVE_LIBZ
#include
@@ -24,9 +26,11 @@
#include "getopt_long.h"
+#include "receivelog.h"
+#include "streamutil.h"
+
/* Global options */
-static const char *progname;
char *basedir = NULL;
char format = 'p'; /* p(lain)/t(ar) */
char *label = "pg_basebackup base backup";
@@ -34,38 +38,37 @@ bool showprogress = false;
int verbose = 0;
int compresslevel = 0;
bool includewal = false;
+bool streamwal = false;
bool fastcheckpoint = false;
-char *dbhost = NULL;
-char *dbuser = NULL;
-char *dbport = NULL;
-int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */
/* Progress counters */
static uint64 totalsize;
static uint64 totaldone;
static int tablespacecount;
-/* Connection kept global so we can disconnect easily */
-static PGconn *conn = NULL;
+/* Pipe to communicate with background wal receiver process */
+#ifndef WIN32
+static int bgpipe[2] = {-1, -1};
+#endif
-#define disconnect_and_exit(code) \
- { \
- if (conn != NULL) PQfinish(conn); \
- exit(code); \
- }
+/* Handle to child process */
+static pid_t bgchild = -1;
+
+/* End position for xlog streaming, empty string if unknown yet */
+static XLogRecPtr xlogendptr;
+static int has_xlogendptr = 0;
/* Function headers */
-static char *xstrdup(const char *s);
-static void *xmalloc0(int size);
static void usage(void);
static void verify_dir_is_empty_or_create(char *dirname);
static void progress_report(int tablespacenum, char *fn);
-static PGconn *GetConnection(void);
static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
static void BaseBackup();
+static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
+
#ifdef HAVE_LIBZ
static const char *
get_gz_error(gzFile *gzf)
@@ -81,39 +84,6 @@ get_gz_error(gzFile *gzf)
}
#endif
-/*
- * strdup() and malloc() replacements that prints an error and exits
- * if something goes wrong. Can never return NULL.
- */
-static char *
-xstrdup(const char *s)
-{
- char *result;
-
- result = strdup(s);
- if (!result)
- {
- fprintf(stderr, _("%s: out of memory\n"), progname);
- exit(1);
- }
- return result;
-}
-
-static void *
-xmalloc0(int size)
-{
- void *result;
-
- result = malloc(size);
- if (!result)
- {
- fprintf(stderr, _("%s: out of memory\n"), progname);
- exit(1);
- }
- MemSet(result, 0, size);
- return result;
-}
-
static void
usage(void)
@@ -125,7 +95,7 @@ usage(void)
printf(_("\nOptions controlling the output:\n"));
printf(_(" -D, --pgdata=directory receive base backup into directory\n"));
printf(_(" -F, --format=p|t output format (plain, tar)\n"));
- printf(_(" -x, --xlog include required WAL files in backup\n"));
+ printf(_(" -x, --xlog=fetch|stream include required WAL files in backup\n"));
printf(_(" -Z, --compress=0-9 compress tar output\n"));
printf(_("\nGeneral options:\n"));
printf(_(" -c, --checkpoint=fast|spread\n"
@@ -146,6 +116,195 @@ usage(void)
/*
+ * Called in the background process whenever a complete segment of WAL
+ * has been received.
+ * On Unix, we check to see if there is any data on our pipe
+ * (which would mean we have a stop position), and if it is, check if
+ * it is time to stop.
+ * On Windows, we are in a single process, so we can just check if it's
+ * time to stop.
+ */
+static bool
+segment_callback(XLogRecPtr segendpos, uint32 timeline)
+{
+ if (!has_xlogendptr)
+ {
+#ifndef WIN32
+ fd_set fds;
+ struct timeval tv;
+ int r;
+
+ /*
+ * Don't have the end pointer yet - check our pipe to see if it has
+ * been sent yet.
+ */
+ FD_ZERO(&fds);
+ FD_SET(bgpipe[0], &fds);
+
+ MemSet(&tv, 0, sizeof(tv));
+
+ r = select(bgpipe[0] + 1, &fds, NULL, NULL, &tv);
+ if (r == 1)
+ {
+ char xlogend[64];
+
+ MemSet(xlogend, 0, sizeof(xlogend));
+ r = piperead(bgpipe[0], xlogend, sizeof(xlogend));
+ if (r < 0)
+ {
+ fprintf(stderr, _("%s: could not read from ready pipe: %s\n"),
+ progname, strerror(errno));
+ exit(1);
+ }
+
+ if (sscanf(xlogend, "%X/%X", &xlogendptr.xlogid, &xlogendptr.xrecoff) != 2)
+ {
+ fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"),
+ progname, xlogend);
+ exit(1);
+ }
+ has_xlogendptr = 1;
+
+ /*
+ * Fall through to check if we've reached the point further
+ * already.
+ */
+ }
+ else
+ {
+ /*
+ * No data received on the pipe means we don't know the end
+ * position yet - so just say it's not time to stop yet.
+ */
+ return false;
+ }
+#else
+
+ /*
+ * On win32, has_xlogendptr is set by the main thread, so if it's not
+ * set here, we just go back and wait until it shows up.
+ */
+ return false;
+#endif
+ }
+
+ /*
+ * At this point we have an end pointer, so compare it to the current
+ * position to figure out if it's time to stop.
+ */
+ if (segendpos.xlogid > xlogendptr.xlogid ||
+ (segendpos.xlogid == xlogendptr.xlogid &&
+ segendpos.xrecoff >= xlogendptr.xrecoff))
+ return true;
+
+ /*
+ * Have end pointer, but haven't reached it yet - so tell the caller to
+ * keep streaming.
+ */
+ return false;
+}
+
+typedef struct
+{
+ PGconn *bgconn;
+ XLogRecPtr startptr;
+ char xlogdir[MAXPGPATH];
+ int timeline;
+} logstreamer_param;
+
+static int
+LogStreamerMain(logstreamer_param * param)
+{
+ if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, param->xlogdir, segment_callback))
+
+ /*
+ * Any errors will already have been reported in the function process,
+ * but we need to tell the parent that we didn't shutdown in a nice
+ * way.
+ */
+ return 1;
+
+ PQfinish(param->bgconn);
+ return 0;
+}
+
+/*
+ * Initiate background process for receiving xlog during the backup.
+ * The background stream will use its own database connection so we can
+ * stream the logfile in parallel with the backups.
+ */
+static void
+StartLogStreamer(char *startpos, uint32 timeline)
+{
+ logstreamer_param *param;
+
+ param = xmalloc0(sizeof(logstreamer_param));
+ param->timeline = timeline;
+
+ /* Convert the starting position */
+ if (sscanf(startpos, "%X/%X", ¶m->startptr.xlogid, ¶m->startptr.xrecoff) != 2)
+ {
+ fprintf(stderr, _("%s: invalid format of xlog location: %s\n"),
+ progname, startpos);
+ disconnect_and_exit(1);
+ }
+ /* Round off to even segment position */
+ param->startptr.xrecoff -= param->startptr.xrecoff % XLOG_SEG_SIZE;
+
+#ifndef WIN32
+ /* Create our background pipe */
+ if (pgpipe(bgpipe) < 0)
+ {
+ fprintf(stderr, _("%s: could not create pipe for background process: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+#endif
+
+ /* Get a second connection */
+ param->bgconn = GetConnection();
+
+ /*
+ * Always in plain format, so we can write to basedir/pg_xlog. But the
+ * directory entry in the tar file may arrive later, so make sure it's
+ * created before we start.
+ */
+ snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
+ verify_dir_is_empty_or_create(param->xlogdir);
+
+ /*
+ * Start a child process and tell it to start streaming. On Unix, this is
+ * a fork(). On Windows, we create a thread.
+ */
+#ifndef WIN32
+ bgchild = fork();
+ if (bgchild == 0)
+ {
+ /* in child process */
+ exit(LogStreamerMain(param));
+ }
+ else if (bgchild < 0)
+ {
+ fprintf(stderr, _("%s: could not create background process: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+
+ /*
+ * Else we are in the parent process and all is well.
+ */
+#else /* WIN32 */
+ bgchild = _beginthreadex(NULL, 0, (void *) LogStreamerMain, param, 0, NULL);
+ if (bgchild == 0)
+ {
+ fprintf(stderr, _("%s: could not create background thread: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+#endif
+}
+
+/*
* Verify that the given directory exists and is empty. If it does not
* exist, it is created. If it exists but is not empty, an error will
* be give and the process ended.
@@ -202,13 +361,19 @@ verify_dir_is_empty_or_create(char *dirname)
static void
progress_report(int tablespacenum, char *fn)
{
- int percent = (int) ((totaldone / 1024) * 100 / totalsize);
+ int percent = (int) ((totaldone / 1024) * 100 / totalsize);
+
if (percent > 100)
percent = 100;
- if (verbose)
+ if (!fn)
+ fprintf(stderr,
+ INT64_FORMAT "/" INT64_FORMAT " kb g(100%%) %i/%i tablespaces %35s\r",
+ totaldone / 1024, totalsize,
+ tablespacenum, tablespacecount, "");
+ else if (verbose)
fprintf(stderr,
- INT64_FORMAT "/" INT64_FORMAT " kB (%i%%) %i/%i tablespaces (%-30s)\r",
+ INT64_FORMAT "/" INT64_FORMAT " kB (%i%%) %i/%i tablespaces (%-30.30s)\r",
totaldone / 1024, totalsize,
percent,
tablespacenum, tablespacecount, fn);
@@ -443,11 +608,6 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
strcpy(current_path, PQgetvalue(res, rownum, 1));
/*
- * Make sure we're unpacking into an empty directory
- */
- verify_dir_is_empty_or_create(current_path);
-
- /*
* Get the COPY data
*/
res = PQgetResult(conn);
@@ -540,10 +700,18 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
fn[strlen(fn) - 1] = '\0'; /* Remove trailing slash */
if (mkdir(fn, S_IRWXU) != 0)
{
- fprintf(stderr,
- _("%s: could not create directory \"%s\": %s\n"),
- progname, fn, strerror(errno));
- disconnect_and_exit(1);
+ /*
+ * When streaming WAL, pg_xlog will have been created
+ * by the wal receiver process, so just ignore failure
+ * on that.
+ */
+ if (!streamwal || strcmp(fn + strlen(fn) - 8, "/pg_xlog") != 0)
+ {
+ fprintf(stderr,
+ _("%s: could not create directory \"%s\": %s\n"),
+ progname, fn, strerror(errno));
+ disconnect_and_exit(1);
+ }
}
#ifndef WIN32
if (chmod(fn, (mode_t) filemode))
@@ -654,90 +822,6 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
}
-static PGconn *
-GetConnection(void)
-{
- PGconn *tmpconn;
- int argcount = 4; /* dbname, replication, fallback_app_name,
- * password */
- int i;
- const char **keywords;
- const char **values;
- char *password = NULL;
-
- if (dbhost)
- argcount++;
- if (dbuser)
- argcount++;
- if (dbport)
- argcount++;
-
- keywords = xmalloc0((argcount + 1) * sizeof(*keywords));
- values = xmalloc0((argcount + 1) * sizeof(*values));
-
- keywords[0] = "dbname";
- values[0] = "replication";
- keywords[1] = "replication";
- values[1] = "true";
- keywords[2] = "fallback_application_name";
- values[2] = progname;
- i = 3;
- if (dbhost)
- {
- keywords[i] = "host";
- values[i] = dbhost;
- i++;
- }
- if (dbuser)
- {
- keywords[i] = "user";
- values[i] = dbuser;
- i++;
- }
- if (dbport)
- {
- keywords[i] = "port";
- values[i] = dbport;
- i++;
- }
-
- while (true)
- {
- if (dbgetpassword == 1)
- {
- /* Prompt for a password */
- password = simple_prompt(_("Password: "), 100, false);
- keywords[argcount - 1] = "password";
- values[argcount - 1] = password;
- }
-
- tmpconn = PQconnectdbParams(keywords, values, true);
- if (password)
- free(password);
-
- if (PQstatus(tmpconn) == CONNECTION_BAD &&
- PQconnectionNeedsPassword(tmpconn) &&
- dbgetpassword != -1)
- {
- dbgetpassword = 1; /* ask for password next time */
- PQfinish(tmpconn);
- continue;
- }
-
- if (PQstatus(tmpconn) != CONNECTION_OK)
- {
- fprintf(stderr, _("%s: could not connect to server: %s\n"),
- progname, PQerrorMessage(tmpconn));
- exit(1);
- }
-
- /* Connection ok! */
- free(values);
- free(keywords);
- return tmpconn;
- }
-}
-
static void
BaseBackup()
{
@@ -780,7 +864,7 @@ BaseBackup()
snprintf(current_path, sizeof(current_path), "BASE_BACKUP LABEL '%s' %s %s %s %s",
escaped_label,
showprogress ? "PROGRESS" : "",
- includewal ? "WAL" : "",
+ includewal && !streamwal ? "WAL" : "",
fastcheckpoint ? "FAST" : "",
includewal ? "NOWAIT" : "");
@@ -859,6 +943,18 @@ BaseBackup()
}
/*
+ * If we're streaming WAL, start the streaming session before we start
+ * receiving the actual data chunks.
+ */
+ if (streamwal)
+ {
+ if (verbose)
+ fprintf(stderr, _("%s: starting background WAL receiver\n"),
+ progname);
+ StartLogStreamer(xlogstart, timeline);
+ }
+
+ /*
* Start receiving chunks
*/
for (i = 0; i < PQntuples(res); i++)
@@ -871,7 +967,7 @@ BaseBackup()
if (showprogress)
{
- progress_report(PQntuples(res), "");
+ progress_report(PQntuples(res), NULL);
fprintf(stderr, "\n"); /* Need to move to next line */
}
PQclear(res);
@@ -905,6 +1001,89 @@ BaseBackup()
disconnect_and_exit(1);
}
+ if (bgchild > 0)
+ {
+ int status;
+ int r;
+
+ if (verbose)
+ fprintf(stderr, _("%s: waiting for background process to finish streaming...\n"), progname);
+
+#ifndef WIN32
+ if (pipewrite(bgpipe[1], xlogend, strlen(xlogend)) != strlen(xlogend))
+ {
+ fprintf(stderr, _("%s: could not send command to background pipe: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+
+ /* Just wait for the background process to exit */
+ r = waitpid(bgchild, &status, 0);
+ if (r == -1)
+ {
+ fprintf(stderr, _("%s: could not wait for child process: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+ if (r != bgchild)
+ {
+ fprintf(stderr, _("%s: child %i died, expected %i\n"),
+ progname, r, bgchild);
+ disconnect_and_exit(1);
+ }
+ if (!WIFEXITED(status))
+ {
+ fprintf(stderr, _("%s: child process did not exit normally\n"),
+ progname);
+ disconnect_and_exit(1);
+ }
+ if (WEXITSTATUS(status) != 0)
+ {
+ fprintf(stderr, _("%s: child process exited with error %i\n"),
+ progname, WEXITSTATUS(status));
+ disconnect_and_exit(1);
+ }
+ /* Exited normally, we're happy! */
+#else /* WIN32 */
+
+ /*
+ * On Windows, since we are in the same thread, we can just store the
+ * value directly in the variable, and then set the flag that says
+ * it's there.
+ */
+ if (sscanf(xlogend, "%X/%X", &xlogendptr.xlogid, &xlogendptr.xrecoff) != 2)
+ {
+ fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"),
+ progname, xlogend);
+ exit(1);
+ }
+ InterlockedIncrement(&has_xlogendptr);
+
+ /* First wait for the thread to exit */
+ if (WaitForSingleObjectEx((HANDLE) bgchild, INFINITE, FALSE) != WAIT_OBJECT_0)
+ {
+ _dosmaperr(GetLastError());
+ fprintf(stderr, _("%s: could not wait for child thread: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+ if (GetExitCodeThread((HANDLE) bgchild, &status) == 0)
+ {
+ _dosmaperr(GetLastError());
+ fprintf(stderr, _("%s: could not get child thread exit status: %s\n"),
+ progname, strerror(errno));
+ disconnect_and_exit(1);
+ }
+ if (status != 0)
+ {
+ fprintf(stderr, _("%s: child thread exited with error %u\n"),
+ progname, status);
+ disconnect_and_exit(1);
+ }
+ /* Exited normally, we're happy */
+#endif
+ }
+
/*
* End of copy data. Final result is already checked inside the loop.
*/
@@ -924,7 +1103,7 @@ main(int argc, char **argv)
{"pgdata", required_argument, NULL, 'D'},
{"format", required_argument, NULL, 'F'},
{"checkpoint", required_argument, NULL, 'c'},
- {"xlog", no_argument, NULL, 'x'},
+ {"xlog", required_argument, NULL, 'x'},
{"compress", required_argument, NULL, 'Z'},
{"label", required_argument, NULL, 'l'},
{"host", required_argument, NULL, 'h'},
@@ -958,7 +1137,7 @@ main(int argc, char **argv)
}
}
- while ((c = getopt_long(argc, argv, "D:F:l:Z:c:h:p:U:xwWvP",
+ while ((c = getopt_long(argc, argv, "D:F:l:Z:c:h:p:U:x:wWvP",
long_options, &option_index)) != -1)
{
switch (c)
@@ -980,6 +1159,18 @@ main(int argc, char **argv)
break;
case 'x':
includewal = true;
+ if (strcmp(optarg, "f") == 0 ||
+ strcmp(optarg, "fetch") == 0)
+ streamwal = false;
+ else if (strcmp(optarg, "s") == 0 ||
+ strcmp(optarg, "stream") == 0)
+ streamwal = true;
+ else
+ {
+ fprintf(stderr, _("%s: invalid xlog option \"%s\", must be empty or \"stream\"\n"),
+ progname, optarg);
+ exit(1);
+ }
break;
case 'l':
label = xstrdup(optarg);
@@ -1080,6 +1271,16 @@ main(int argc, char **argv)
exit(1);
}
+ if (format != 'p' && streamwal)
+ {
+ fprintf(stderr,
+ _("%s: wal streaming can only be used in plain mode\n"),
+ progname);
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
#ifndef HAVE_LIBZ
if (compresslevel > 0)
{
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
new file mode 100644
index 0000000..41b5bb7
--- /dev/null
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -0,0 +1,407 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_receivexlog.c - receive streaming transaction log data and write it
+ * to a local file.
+ *
+ * Author: Magnus Hagander
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/pg_receivexlog.c
+ *-------------------------------------------------------------------------
+ */
+
+
+#include "postgres_fe.h"
+#include "libpq-fe.h"
+
+#include
+#include
+#include
+#include
+
+#include "getopt_long.h"
+
+#include "receivelog.h"
+#include "streamutil.h"
+
+
+/* Global options */
+char *basedir = NULL;
+int verbose = 0;
+
+
+static void usage(void);
+static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline);
+static void StreamLog();
+static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
+
+/*
+ * XXX: from xlog_internal.h
+ */
+#define XLogSegsPerFile (((uint32) 0xffffffff) / XLOG_SEG_SIZE)
+#define PrevLogSeg(logId, logSeg) \
+ do { \
+ if (logSeg) \
+ (logSeg)--; \
+ else \
+ { \
+ (logId)--; \
+ (logSeg) = XLogSegsPerFile-1; \
+ } \
+ } while (0)
+
+
+static void
+usage(void)
+{
+ printf(_("%s receives PostgreSQL streaming transaction logs\n\n"),
+ progname);
+ printf(_("Usage:\n"));
+ printf(_(" %s [OPTION]...\n"), progname);
+ printf(_("\nOptions controlling the output:\n"));
+ printf(_(" -D, --dir=directory receive xlog files into this directory\n"));
+ printf(_("\nGeneral options:\n"));
+ printf(_(" -v, --verbose output verbose messages\n"));
+ printf(_(" -?, --help show this help, then exit\n"));
+ printf(_(" -V, --version output version information, then exit\n"));
+ printf(_("\nConnection options:\n"));
+ printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
+ printf(_(" -p, --port=PORT database server port number\n"));
+ printf(_(" -U, --username=NAME connect as specified database user\n"));
+ printf(_(" -w, --no-password never prompt for password\n"));
+ printf(_(" -W, --password force password prompt (should happen automatically)\n"));
+ printf(_("\nReport bugs to .\n"));
+}
+
+static bool
+segment_callback(XLogRecPtr segendpos, uint32 timeline)
+{
+ char fn[MAXPGPATH];
+ struct stat statbuf;
+
+ if (verbose)
+ fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
+ progname, segendpos.xlogid, segendpos.xrecoff, timeline);
+
+ /*
+ * Check if there is a partial file for the name we just finished, and if
+ * there is, remove it under the assumption that we have now got all the
+ * data we need.
+ */
+ PrevLogSeg(segendpos.xlogid, segendpos.xrecoff);
+ snprintf(fn, sizeof(fn), "%s/%08X%08X%08X.partial",
+ basedir, timeline,
+ segendpos.xlogid,
+ segendpos.xrecoff / XLOG_SEG_SIZE);
+ if (stat(fn, &statbuf) == 0)
+ {
+ /* File existed, get rid of it */
+ if (verbose)
+ fprintf(stderr, _("%s: removing file \"%s\"\n"),
+ progname, fn);
+ unlink(fn);
+ }
+
+ /* Never abort */
+ return false;
+}
+
+/*
+ * Determine starting location for streaming, based on:
+ * 1. If there are existing xlog segments, start at the end of the last one
+ * 2. If the last one is a partial segment, rename it and start over, since
+ * we don't sync after every write.
+ * 3. If no existing xlog exists, start from the beginning of the current
+ * WAL segment.
+ */
+static XLogRecPtr
+FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
+{
+ DIR *dir;
+ struct dirent *dirent;
+ int i;
+ bool b;
+ XLogRecPtr high = {0, 0};
+
+ dir = opendir(basedir);
+ if (dir == NULL)
+ {
+ fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"),
+ progname, basedir, strerror(errno));
+ disconnect_and_exit(1);
+ }
+
+ while ((dirent = readdir(dir)) != NULL)
+ {
+ char fullpath[MAXPGPATH];
+ struct stat statbuf;
+ uint32 tli,
+ log,
+ seg;
+
+ if (!strcmp(dirent->d_name, ".") || !strcmp(dirent->d_name, ".."))
+ continue;
+
+ /* xlog files are always 24 characters */
+ if (strlen(dirent->d_name) != 24)
+ continue;
+
+ /* Filenames are always made out of 0-9 and A-F */
+ b = false;
+ for (i = 0; i < 24; i++)
+ {
+ if (!(dirent->d_name[i] >= '0' && dirent->d_name[i] <= '9') &&
+ !(dirent->d_name[i] >= 'A' && dirent->d_name[i] <= 'F'))
+ {
+ b = true;
+ break;
+ }
+ }
+ if (b)
+ continue;
+
+ /*
+ * Looks like an xlog file. Parse it's position.
+ */
+ if (sscanf(dirent->d_name, "%08X%08X%08X", &tli, &log, &seg) != 3)
+ {
+ fprintf(stderr, _("%s: could not parse xlog filename \"%s\"\n"),
+ progname, dirent->d_name);
+ disconnect_and_exit(1);
+ }
+ log *= XLOG_SEG_SIZE;
+
+ /* Ignore any files that are for another timeline */
+ if (tli != currenttimeline)
+ continue;
+
+ /* Check if this is a completed segment or not */
+ snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
+ if (stat(fullpath, &statbuf) != 0)
+ {
+ fprintf(stderr, _("%s: could not stat file \"%s\": %s\n"),
+ progname, fullpath, strerror(errno));
+ disconnect_and_exit(1);
+ }
+
+ if (statbuf.st_size == 16 * 1024 * 1024)
+ {
+ /* Completed segment */
+ if (log > high.xlogid ||
+ (log == high.xlogid && seg > high.xrecoff))
+ {
+ high.xlogid = log;
+ high.xrecoff = seg;
+ continue;
+ }
+ }
+ else
+ {
+ /*
+ * This is a partial file. Rename it out of the way.
+ */
+ char newfn[MAXPGPATH];
+
+ fprintf(stderr, _("%s: renaming partial file \"%s\" to \"%s.partial\"\n"),
+ progname, dirent->d_name, dirent->d_name);
+
+ snprintf(newfn, sizeof(newfn), "%s/%s.partial",
+ basedir, dirent->d_name);
+
+ if (stat(newfn, &statbuf) == 0)
+ {
+ fprintf(stderr, _("%s: file \"%s\" already exists. Check and clean up manually.\n"),
+ progname, newfn);
+ disconnect_and_exit(1);
+ }
+ if (rename(fullpath, newfn) != 0)
+ {
+ fprintf(stderr, _("%s: could not rename \"%s\" to \"%s\": %s\n"),
+ progname, fullpath, newfn, strerror(errno));
+ disconnect_and_exit(1);
+ }
+
+ /* Don't continue looking for more, we assume this is the last */
+ break;
+ }
+ }
+
+ closedir(dir);
+
+ if (high.xlogid > 0 && high.xrecoff > 0)
+ return high;
+
+ return currentpos;
+}
+
+/*
+ * Start the log streaming
+ */
+static void
+StreamLog(void)
+{
+ PGresult *res;
+ uint32 timeline;
+ XLogRecPtr startpos;
+
+ /*
+ * Connect in replication mode to the server
+ */
+ conn = GetConnection();
+
+ /*
+ * Run IDENFITY_SYSTEM so we can get the timeline and current xlog
+ * position.
+ */
+ res = PQexec(conn, "IDENTIFY_SYSTEM");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ fprintf(stderr, _("%s: could not identify system: %s\n"),
+ progname, PQerrorMessage(conn));
+ disconnect_and_exit(1);
+ }
+ if (PQntuples(res) != 1)
+ {
+ fprintf(stderr, _("%s: could not identify system, got %i rows\n"),
+ progname, PQntuples(res));
+ disconnect_and_exit(1);
+ }
+ timeline = atoi(PQgetvalue(res, 0, 1));
+ if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &startpos.xlogid, &startpos.xrecoff) != 2)
+ {
+ fprintf(stderr, _("%s: could not parse log start position from value \"%s\"\n"),
+ progname, PQgetvalue(res, 0, 2));
+ disconnect_and_exit(1);
+ }
+ PQclear(res);
+
+ /*
+ * Figure out where to start streaming.
+ */
+ startpos = FindStreamingStart(startpos, timeline);
+
+ /*
+ * Always start streaming at the beginning of a segment
+ */
+ startpos.xrecoff -= startpos.xrecoff % XLOG_SEG_SIZE;
+
+ /*
+ * Start the replication
+ */
+ if (verbose)
+ fprintf(stderr, _("%s: starting log streaming at %X/%X (timeline %u)\n"),
+ progname, startpos.xlogid, startpos.xrecoff, timeline);
+
+ ReceiveXlogStream(conn, startpos, timeline, basedir, segment_callback);
+}
+
+int
+main(int argc, char **argv)
+{
+ static struct option long_options[] = {
+ {"help", no_argument, NULL, '?'},
+ {"version", no_argument, NULL, 'V'},
+ {"dir", required_argument, NULL, 'D'},
+ {"host", required_argument, NULL, 'h'},
+ {"port", required_argument, NULL, 'p'},
+ {"username", required_argument, NULL, 'U'},
+ {"no-password", no_argument, NULL, 'w'},
+ {"password", no_argument, NULL, 'W'},
+ {"verbose", no_argument, NULL, 'v'},
+ {NULL, 0, NULL, 0}
+ };
+ int c;
+
+ int option_index;
+
+ progname = get_progname(argv[0]);
+ set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_receivexlog"));
+
+ if (argc > 1)
+ {
+ if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+ {
+ usage();
+ exit(0);
+ }
+ else if (strcmp(argv[1], "-V") == 0
+ || strcmp(argv[1], "--version") == 0)
+ {
+ puts("pg_receivexlog (PostgreSQL) " PG_VERSION);
+ exit(0);
+ }
+ }
+
+ while ((c = getopt_long(argc, argv, "D:h:p:U:wWv",
+ long_options, &option_index)) != -1)
+ {
+ switch (c)
+ {
+ case 'D':
+ basedir = xstrdup(optarg);
+ break;
+ case 'h':
+ dbhost = xstrdup(optarg);
+ break;
+ case 'p':
+ if (atoi(optarg) <= 0)
+ {
+ fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
+ progname, optarg);
+ exit(1);
+ }
+ dbport = xstrdup(optarg);
+ break;
+ case 'U':
+ dbuser = xstrdup(optarg);
+ break;
+ case 'w':
+ dbgetpassword = -1;
+ break;
+ case 'W':
+ dbgetpassword = 1;
+ break;
+ case 'v':
+ verbose++;
+ break;
+ default:
+
+ /*
+ * getopt_long already emitted a complaint
+ */
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+ }
+
+ /*
+ * Any non-option arguments?
+ */
+ if (optind < argc)
+ {
+ fprintf(stderr,
+ _("%s: too many command-line arguments (first is \"%s\")\n"),
+ progname, argv[optind]);
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
+ /*
+ * Required arguments
+ */
+ if (basedir == NULL)
+ {
+ fprintf(stderr, _("%s: no target directory specified\n"), progname);
+ fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+ progname);
+ exit(1);
+ }
+
+ StreamLog();
+
+ exit(0);
+}
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
new file mode 100644
index 0000000..3be9692
--- /dev/null
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -0,0 +1,207 @@
+/*-------------------------------------------------------------------------
+ *
+ * receivelog.c - receive transaction log files using the streaming
+ * replication protocol.
+ *
+ * Author: Magnus Hagander
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/receivelog.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include "libpq-fe.h"
+
+#include
+#include
+
+#include "receivelog.h"
+#include "streamutil.h"
+
+/* XXX: from xlog_internal.h */
+#define MAXFNAMELEN 64
+#define XLogFileName(fname, tli, log, seg) \
+ snprintf(fname, MAXFNAMELEN, "%08X%08X%08X", tli, log, seg)
+
+/* Size of the streaming replication protocol header */
+#define STREAMING_HEADER_SIZE (1+8+8+8)
+
+/*
+ * Open a new WAL file in the specified directory. Store the name
+ * (not including the full directory) in namebuf. Assumes there is
+ * enough room in this buffer...
+ */
+static int
+open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf)
+{
+ int f;
+ char fn[MAXPGPATH];
+
+ XLogFileName(namebuf, timeline, startpoint.xlogid,
+ startpoint.xrecoff / XLOG_SEG_SIZE);
+
+ snprintf(fn, sizeof(fn), "%s/%s", basedir, namebuf);
+ f = open(fn, O_WRONLY | O_CREAT | O_EXCL, 0666);
+ if (f == -1)
+ fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"),
+ progname, namebuf, strerror(errno));
+ return f;
+}
+
+/*
+ * Receive a log stream starting at the specified position.
+ *
+ * Note: The log position *must* be at a log segment change, or we will
+ * end up streaming an incomplete file.
+ */
+bool
+ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, segment_finish_callback segment_finish)
+{
+ char query[128];
+ char current_walfile_name[MAXPGPATH];
+ PGresult *res;
+ char *copybuf = NULL;
+ int walfile = -1;
+
+ /* Initiate the replication stream at specified location */
+ snprintf(query, sizeof(query), "START_REPLICATION %X/%X", startpos.xlogid, startpos.xrecoff);
+ res = PQexec(conn, query);
+ if (PQresultStatus(res) != PGRES_COPY_BOTH)
+ {
+ fprintf(stderr, _("%s: could not start replication: %s\n"),
+ progname, PQresultErrorMessage(res));
+ return false;
+ }
+ PQclear(res);
+
+ /*
+ * Receive the actual xlog data
+ */
+ while (1)
+ {
+ XLogRecPtr blockstart;
+ int r;
+ int xlogoff;
+
+ if (copybuf != NULL)
+ {
+ PQfreemem(copybuf);
+ copybuf = NULL;
+ }
+
+ r = PQgetCopyData(conn, ©buf, 0);
+ if (r == -1)
+ /* End of copy stream */
+ break;
+ if (r == -2)
+ {
+ fprintf(stderr, _("%s: could not read copy data: %s\n"),
+ progname, PQerrorMessage(conn));
+ return false;
+ }
+ if (r < STREAMING_HEADER_SIZE + 1)
+ {
+ fprintf(stderr, _("%s: streaming header too small: %i\n"),
+ progname, r);
+ return false;
+ }
+ if (copybuf[0] != 'w')
+ {
+ fprintf(stderr, _("%s: streaming header corrupt: \"%c\"\n"),
+ progname, copybuf[0]);
+ return false;
+ }
+
+ /* Extract WAL location for this block */
+ memcpy(&blockstart, copybuf + 1, 8);
+
+ xlogoff = blockstart.xrecoff % XLOG_SEG_SIZE;
+
+ if (walfile == -1)
+ {
+ /* No file open yet */
+ if (xlogoff != 0)
+ {
+ fprintf(stderr, _("%s: received xlog record for offset %u with no file open\n"),
+ progname, xlogoff);
+ return false;
+ }
+ walfile = open_walfile(blockstart, timeline,
+ basedir, current_walfile_name);
+ if (walfile == -1)
+ return false;
+ }
+ else
+ {
+ /* More data in existing segment */
+ /* XXX: store seek value don't reseek all the time */
+ if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+ {
+ fprintf(stderr, _("%s: got WAL data offset %i, expected %i\n"),
+ progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+ return false;
+ }
+ /* Position matches, write happens lower down */
+ }
+
+ /* We have a file open in the correct position */
+ if (write(walfile, copybuf + STREAMING_HEADER_SIZE,
+ r - STREAMING_HEADER_SIZE) != r - STREAMING_HEADER_SIZE)
+ {
+ fprintf(stderr, _("%s: could not write %u bytes to WAL file %s: %s\n"),
+ progname,
+ r - STREAMING_HEADER_SIZE,
+ current_walfile_name,
+ strerror(errno));
+ return false;
+ }
+
+ /* XXX: callback after each write */
+
+ /* Check if we are at the end of a segment */
+ if (lseek(walfile, 0, SEEK_CUR) == XLOG_SEG_SIZE)
+ {
+ /* Offset zero in new file, close and sync the old one */
+ fsync(walfile);
+ close(walfile);
+ walfile = -1;
+
+ if (segment_finish != NULL)
+ {
+ /*
+ * Callback when the segment finished, and return if it told
+ * us to.
+ *
+ * A block in the wal stream can never cross a segment
+ * boundary, so we can safely just add the current block size
+ * to the offset, so the xlog pointer points to what we have
+ * actually written.
+ */
+ blockstart.xrecoff += r - STREAMING_HEADER_SIZE;
+ if (segment_finish(blockstart, timeline))
+ return true;
+ }
+ }
+ }
+
+ /*
+ * The only way to get out of the loop is if the server shut down the
+ * replication stream. If it's a controlled shutdown, the server will send
+ * a shutdown message, and we'll return the latest xlog location that has
+ * been streamed.
+ */
+
+ res = PQgetResult(conn);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ fprintf(stderr, _("%s: unexpected termination of replication stream: %s\n"),
+ progname, PQresultErrorMessage(res));
+ return false;
+ }
+ PQclear(res);
+ return true;
+}
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
new file mode 100644
index 0000000..ae34dd6
--- /dev/null
+++ b/src/bin/pg_basebackup/receivelog.h
@@ -0,0 +1,13 @@
+#include "access/xlogdefs.h"
+
+/*
+ * Called whenever a segment is finished, return true to stop
+ * the streaming at this point.
+ */
+typedef bool (*segment_finish_callback)(XLogRecPtr segendpos, uint32 timeline);
+
+bool ReceiveXlogStream(PGconn *conn,
+ XLogRecPtr startpos,
+ uint32 timeline,
+ char *basedir,
+ segment_finish_callback segment_finish);
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
new file mode 100644
index 0000000..9f5c36f
--- /dev/null
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -0,0 +1,160 @@
+/*-------------------------------------------------------------------------
+ *
+ * streamutil.c - utility functions for pg_basebackup and pg_receivelog
+ *
+ * Author: Magnus Hagander
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/bin/pg_basebackup/streamutil.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include
+#include
+
+#include "streamutil.h"
+
+const char *progname;
+char *dbhost = NULL;
+char *dbuser = NULL;
+char *dbport = NULL;
+int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */
+static char *dbpassword = NULL;
+PGconn *conn = NULL;
+
+/*
+ * strdup() and malloc() replacements that prints an error and exits
+ * if something goes wrong. Can never return NULL.
+ */
+char *
+xstrdup(const char *s)
+{
+ char *result;
+
+ result = strdup(s);
+ if (!result)
+ {
+ fprintf(stderr, _("%s: out of memory\n"), progname);
+ exit(1);
+ }
+ return result;
+}
+
+void *
+xmalloc0(int size)
+{
+ void *result;
+
+ result = malloc(size);
+ if (!result)
+ {
+ fprintf(stderr, _("%s: out of memory\n"), progname);
+ exit(1);
+ }
+ MemSet(result, 0, size);
+ return result;
+}
+
+
+PGconn *
+GetConnection(void)
+{
+ PGconn *tmpconn;
+ int argcount = 4; /* dbname, replication, fallback_app_name,
+ * password */
+ int i;
+ const char **keywords;
+ const char **values;
+ char *password = NULL;
+
+ if (dbhost)
+ argcount++;
+ if (dbuser)
+ argcount++;
+ if (dbport)
+ argcount++;
+
+ keywords = xmalloc0((argcount + 1) * sizeof(*keywords));
+ values = xmalloc0((argcount + 1) * sizeof(*values));
+
+ keywords[0] = "dbname";
+ values[0] = "replication";
+ keywords[1] = "replication";
+ values[1] = "true";
+ keywords[2] = "fallback_application_name";
+ values[2] = progname;
+ i = 3;
+ if (dbhost)
+ {
+ keywords[i] = "host";
+ values[i] = dbhost;
+ i++;
+ }
+ if (dbuser)
+ {
+ keywords[i] = "user";
+ values[i] = dbuser;
+ i++;
+ }
+ if (dbport)
+ {
+ keywords[i] = "port";
+ values[i] = dbport;
+ i++;
+ }
+
+ while (true)
+ {
+ if (password)
+ free(password);
+
+ if (dbpassword)
+ {
+ /*
+ * We've saved a password when a previous connection succeeded,
+ * meaning this is the call for a second session to the same
+ * database, so just forcibly reuse that password.
+ */
+ keywords[argcount - 1] = "password";
+ values[argcount - 1] = dbpassword;
+ dbgetpassword = -1; /* Don't try again if this fails */
+ }
+ else if (dbgetpassword == 1)
+ {
+ password = simple_prompt(_("Password: "), 100, false);
+ keywords[argcount - 1] = "password";
+ values[argcount - 1] = password;
+ }
+
+ tmpconn = PQconnectdbParams(keywords, values, true);
+
+ if (PQstatus(tmpconn) == CONNECTION_BAD &&
+ PQconnectionNeedsPassword(tmpconn) &&
+ dbgetpassword != -1)
+ {
+ dbgetpassword = 1; /* ask for password next time */
+ PQfinish(tmpconn);
+ continue;
+ }
+
+ if (PQstatus(tmpconn) != CONNECTION_OK)
+ {
+ fprintf(stderr, _("%s: could not connect to server: %s\n"),
+ progname, PQerrorMessage(tmpconn));
+ exit(1);
+ }
+
+ /* Connection ok! */
+ free(values);
+ free(keywords);
+
+ /* Store the password for next run */
+ if (password)
+ dbpassword = password;
+ return tmpconn;
+ }
+}
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
new file mode 100644
index 0000000..cef529a
--- /dev/null
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -0,0 +1,23 @@
+#include "access/xlogdefs.h"
+#include "libpq-fe.h"
+
+extern const char *progname;
+extern char *dbhost;
+extern char *dbuser;
+extern char *dbport;
+extern int dbgetpassword;
+
+/* Connection kept global so we can disconnect easily */
+extern PGconn *conn;
+
+#define disconnect_and_exit(code) \
+ { \
+ if (conn != NULL) PQfinish(conn); \
+ exit(code); \
+ }
+
+
+char *xstrdup(const char *s);
+void *xmalloc0(int size);
+
+PGconn *GetConnection(void);
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 21c11df..1190ef4 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -277,6 +277,11 @@ sub mkvcbuild
$initdb->AddLibrary('ws2_32.lib');
my $pgbasebackup = AddSimpleFrontend('pg_basebackup', 1);
+ $pgbasebackup->AddFile('src\bin\pg_basebackup\pg_basebackup.c');
+
+ my $pgreceivexlog = AddSimpleFrontend('pg_basebackup', 1);
+ $pgreceivexlog->{name} = 'pg_receivexlog';
+ $pgreceivexlog->AddFile('src\bin\pg_basebackup\pg_receivexlog.c');
my $pgconfig = AddSimpleFrontend('pg_config');