[PATCH 6/6] pg_basebackup: add a single-tar output format. - Mailing list pgsql-hackers

From Joshua Elsasser
Subject [PATCH 6/6] pg_basebackup: add a single-tar output format.
Date
Msg-id 1443564988-17928-7-git-send-email-josh@idealist.org
Whole thread Raw
In response to Re: Add pg_basebackup single tar output format  (Joshua Elsasser <josh@idealist.org>)
List pgsql-hackers
This adds a format that writes one tar file containing all tablespaces;
that file can also be written to stdout.
---
 src/bin/pg_basebackup/pg_basebackup.c | 282 ++++++++++++++++++++++++++++++++--
 1 file changed, 269 insertions(+), 13 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index e29e466..b3534cb 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -29,6 +29,7 @@#include "getopt_long.h"#include "libpq-fe.h"#include "pqexpbuffer.h"
+#include "common/fe_memutils.h"#include "pgtar.h"#include "pgtime.h"#include "receivelog.h"
@@ -71,7 +72,9 @@ typedef struct TarParser {/* Global options */static char *basedir = NULL;
+static char *outpath = NULL;static TablespaceList tablespace_dirs = {NULL, NULL};
+static char *default_basetablespace = NULL;static char *xlog_dir = "";static char format = 'p';        /*
p(lain)/t(ar)*/static char *label = "pg_basebackup base backup";
 
@@ -117,15 +120,19 @@ static void usage(void);static void disconnect_and_exit(int code);static void
verify_dir_is_empty_or_create(char*dirname);static void progress_report(int tablespacenum, const char *filename, bool
force);
+static char *get_base_tablespace_path(void);static void OpenTarFile(TarStream *tarfile, const char *path);static void
CloseTarFile(TarStream*tarfile);
 
-static void TarInsertRecoveryConf(TarStream *stream);
+static void TarInsertRecoveryConf(TarStream *stream, const char *prefix);
+static void TarInsertDirectory(TarStream *stream, const char *path, mode_t mode);static void
IterateAndWriteTar(TarParser*tp, char *inbuf, int buflen,                               bool (*callback)(char *, void
*),void *cbarg);static bool TarIterSkipRecoveryConf(char *h, void *arg);
 
+static bool TarIterRenameForTablespace(char *h, void *arg);static void ReceiveTarFile(PGconn *conn, PGresult *res, int
rownum);
+static void ReceiveAndAppendTarFile(TarStream *tarfile, PGconn *conn, PGresult *res, int rownum);static void
ReceiveAndUnpackTarFile(PGconn*conn, PGresult *res, int rownum);static void GenerateRecoveryConf(PGconn *conn);static
voidWriteRecoveryConf(void);
 
@@ -259,7 +266,7 @@ usage(void)    printf(_("  %s [OPTION]...\n"), progname);    printf(_("\nOptions controlling the
output:\n"));   printf(_("  -D, --pgdata=DIRECTORY receive base backup into directory\n"));
 
-    printf(_("  -F, --format=p|t       output format (plain (default), tar)\n"));
+    printf(_("  -F, --format=p|t|s     output format (plain (default), tar, single-tar)\n"));    printf(_("  -r,
--max-rate=RATE   maximum transfer rate to transfer data directory\n"      "                         (in kB/s, or use
suffix\"k\" or \"M\")\n"));    printf(_("  -R, --write-recovery-conf\n"
 
@@ -746,6 +753,39 @@ parse_max_rate(char *src)    return (int32) result;}
+
+/*
+ * Returns the path of the server's data directory, as reported by the
+ * data_directory row of pg_catalog.pg_settings. The returned string is
+ * allocated with pg_strdup() and owned by the caller.
+ *
+ * The query runs on a separate connection obtained from GetConnection(false),
+ * which is closed before returning. Any failure exits the program through
+ * disconnect_and_exit(1).
+ */
+static char *
+get_base_tablespace_path(void)
+{
+    PGconn     *sqlconn;
+    PGresult   *res;
+    char       *dir;
+
+    sqlconn = GetConnection(false);
+    if (!sqlconn)
+        /* Error message already written in GetConnection() */
+        disconnect_and_exit(1);
+
+    res = PQexec(sqlconn, "SELECT setting FROM pg_catalog.pg_settings "
+                 "WHERE name = 'data_directory';");
+    if (PQresultStatus(res) != PGRES_TUPLES_OK)
+    {
+        fprintf(stderr, _("%s: could not get server data_directory: %s"),
+                progname, PQerrorMessage(sqlconn));
+        /* disconnect_and_exit() only closes the main connection */
+        PQclear(res);
+        PQfinish(sqlconn);
+        disconnect_and_exit(1);
+    }
+
+    /*
+     * The query can legitimately return no row. For example, the connecting
+     * role may not be allowed to see this setting. Reading row 0 of an
+     * empty result is invalid, so report that case explicitly.
+     */
+    if (PQntuples(res) != 1 || PQnfields(res) != 1)
+    {
+        fprintf(stderr,
+                _("%s: could not get server data_directory: got %d rows and %d fields, expected %d rows and %d fields\n"),
+                progname, PQntuples(res), PQnfields(res), 1, 1);
+        PQclear(res);
+        PQfinish(sqlconn);
+        disconnect_and_exit(1);
+    }
+
+    dir = pg_strdup(PQgetvalue(res, 0, 0));
+    PQclear(res);
+    PQfinish(sqlconn);
+    return dir;
+}
+
+/* * Write a piece of tar data */
@@ -891,7 +931,7 @@ CloseTarFile(TarStream *tarfile) * Write a recovery.conf file into the tar stream. */static void
-TarInsertRecoveryConf(TarStream *stream)
+TarInsertRecoveryConf(TarStream *stream, const char *prefix){    static char    zerobuf[512];    char
header[512];
@@ -901,6 +941,8 @@ TarInsertRecoveryConf(TarStream *stream)                    recoveryconfcontents->len,
     0600, 04000, 02000,                    time(NULL));
 
+    if (prefix != NULL)
+        TarIterRenameForTablespace(header, (void *)prefix);    padding = ((recoveryconfcontents->len + 511) & ~511) -
recoveryconfcontents->len;
@@ -912,6 +954,29 @@ TarInsertRecoveryConf(TarStream *stream)/*
+ * Write a directory entry named "path" into the tar stream.
+ *
+ * Only the permission bits (07777) of "mode" are kept. The entry is always
+ * marked as a directory (040000, i.e. S_IFDIR) and has size 0, so only the
+ * 512-byte header is written. The owner/group values (04000, 02000) and
+ * mtime of now match what TarInsertRecoveryConf() passes to
+ * tarCreateHeader(). Exits if "path" does not fit in a tar header.
+ */
+static void
+TarInsertDirectory(TarStream *stream, const char *path, mode_t mode)
+{
+    char        hdr[512];
+
+    /* keep only the permission bits and set the file type to directory */
+    mode = (mode & 07777) | 040000;
+
+    if (tarCreateHeader(hdr, path, NULL, 0,
+                        mode, 04000, 02000, time(NULL)) != TAR_OK)
+    {
+        fprintf(stderr, _("%s: filename too long for tar format: %s\n"),
+                progname, path);
+        disconnect_and_exit(1);
+    }
+
+    /* a directory has no data blocks, so the header is the whole entry */
+    writeTarData(stream, hdr, 512);
+}
+
+
+/* * Process the individual files inside the TAR stream and pass their headers * to a callback which can modify or
choseto skip them. The stream consists * of a header and zero or more chunks, all 512 bytes long. The stream from
 
@@ -1026,6 +1091,52 @@ TarIterSkipRecoveryConf(char *h, void *arg)/*
+ * Moves the member described by the tar header "h" under the directory
+ * passed in "arg" (a NUL-terminated path). NAME becomes "arg/NAME", which is
+ * then canonicalized (e.g. removing "/./") and stored back into the header.
+ *
+ * Used as an IterateAndWriteTar() callback for each tablespace's files. It is
+ * also called directly to relocate headers this program generates itself.
+ * "arg" is the target directory of the tablespace being processed. That may
+ * be the base tablespace's directory or a non-default tablespace's location.
+ *
+ * Never asks for the member to be skipped (always returns false). Exits if
+ * the combined name exceeds MAXPGPATH or does not fit in a tar header.
+ */
+static bool
+TarIterRenameForTablespace(char *h, void *arg)
+{
+    const char *prefix = arg;
+    char        full[MAXPGPATH];
+    size_t      len;
+
+    /*
+     * Build a new path out of the tablespace prefix and the existing name in
+     * the header.
+     */
+    strlcpy(full, prefix, sizeof(full));
+    strlcat(full, "/", sizeof(full));
+    len = strlen(full);
+    if (tarHeaderGetName(h, &full[len], sizeof(full) - len) >= sizeof(full) - len)
+    {
+        /* re-read just the original name so the message shows it intact */
+        tarHeaderGetName(h, full, sizeof(full));
+        fprintf(stderr, _("%s: filename too long: %s/%s\n"),
+                progname, prefix, full);
+        disconnect_and_exit(1);
+    }
+
+    /* clean up the path to remove /./ and such */
+    canonicalize_path(full);
+
+    /* Store the new name in the tar header */
+    if (tarHeaderRename(h, full) != TAR_OK)
+    {
+        fprintf(stderr, _("%s: filename too long for tar format: %s\n"),
+                progname, full);
+        disconnect_and_exit(1);
+    }
+
+    /* Never skip any files */
+    return false;
+}
+
+
+/* * Open a (possibly zlib-compressed) tar file for writing. The filebase * argument should be the desired filename
relativeto basedir, without a .tar * or .tar.gz file extension. If the user specified a basedir of - then stdout
 
@@ -1110,7 +1221,7 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)             * file, as required by some
tarprograms.             */            if (basetablespace && writerecoveryconf)
 
-                TarInsertRecoveryConf(&stream);
+                TarInsertRecoveryConf(&stream, NULL);            CloseTarFile(&stream);            break;
@@ -1151,6 +1262,108 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)}
+/*
+ * IterateAndWriteTar() callback for the base tablespace when -R is in use.
+ * A member that TarIterSkipRecoveryConf() rejects (the server's
+ * recovery.conf) is skipped. Every other member is moved under the
+ * directory passed in "arg" via TarIterRenameForTablespace().
+ *
+ * Returns true when the member should be skipped.
+ */
+static bool
+tablespaceRenameAndSkipRecoveryConf(char *h, void *arg)
+{
+    bool        skip = TarIterSkipRecoveryConf(h, NULL);
+
+    if (!skip)
+        (void) TarIterRenameForTablespace(h, arg);
+
+    return skip;
+}
+
+
+/*
+ * Receive one tablespace's tar stream from the server and append its members
+ * to the already-open single tar file "stream". The caller opens and closes
+ * "stream".
+ *
+ * Every member is moved under the tablespace's target directory. For the
+ * base tablespace that is default_basetablespace; otherwise it is the
+ * location in column 1 of "res". Either may be relocated by -T through
+ * get_tablespace_mapping(). For a non-base tablespace, an entry for that
+ * directory itself is written first.
+ *
+ * When writing recovery.conf (-R), any recovery.conf the server sends for
+ * the base tablespace is dropped. Ours is appended once that tablespace's
+ * COPY stream ends.
+ *
+ * Member data goes through IterateAndWriteTar(), so whatever output handling
+ * the TarStream provides applies here too.
+ */
+static void
+ReceiveAndAppendTarFile(TarStream *stream, PGconn *conn, PGresult *res, int rownum)
+{
+    const char *prefix;
+    char       *copybuf = NULL;
+    int         buflen;
+    bool        basetablespace = PQgetisnull(res, rownum, 0);
+    bool        replace_recoveryconf = basetablespace && writerecoveryconf;
+    bool        (*callback) (char *, void *);
+    PGresult   *copyres;
+    TarParser   parser;
+
+    MemSet(&parser, 0, sizeof(parser));
+    parser.stream = stream;
+    parser.in_tarhdr = true;
+
+    /* Get the directory prefix for the tablespace */
+    prefix = get_tablespace_mapping(basetablespace ? default_basetablespace :
+                                    PQgetvalue(res, rownum, 1));
+
+    if (!basetablespace)
+        TarInsertDirectory(stream, prefix, 0700);
+
+    /*
+     * Every member is moved under "prefix". For the base tablespace with -R,
+     * the server's own recovery.conf is also skipped. That way the copy we
+     * append below is the only one in the archive.
+     */
+    callback = replace_recoveryconf ? tablespaceRenameAndSkipRecoveryConf :
+        TarIterRenameForTablespace;
+
+    /*
+     * Get the COPY data stream. Use a separate variable so "res", which
+     * "prefix" may point into, is not overwritten. The result object itself
+     * is not needed once its status has been checked.
+     */
+    copyres = PQgetResult(conn);
+    if (PQresultStatus(copyres) != PGRES_COPY_OUT)
+    {
+        fprintf(stderr, _("%s: could not get COPY data stream: %s"),
+                progname, PQerrorMessage(conn));
+        disconnect_and_exit(1);
+    }
+    PQclear(copyres);
+
+    while ((buflen = PQgetCopyData(conn, &copybuf, 0)) >= 0)
+    {
+        IterateAndWriteTar(&parser, copybuf, buflen, callback, (void *) prefix);
+
+        if (copybuf != NULL)
+        {
+            PQfreemem(copybuf);
+            copybuf = NULL;
+        }
+
+        totaldone += buflen;
+        progress_report(rownum, stream->path, false);
+    }
+
+    if (buflen == -1)
+    {
+        /*
+         * End of this tablespace's data. If requested, and this is the base
+         * tablespace, append our recovery.conf under the same prefix.
+         */
+        if (replace_recoveryconf)
+            TarInsertRecoveryConf(stream, prefix);
+    }
+    else if (buflen == -2)
+    {
+        fprintf(stderr, _("%s: could not read COPY data: %s"),
+                progname, PQerrorMessage(conn));
+        disconnect_and_exit(1);
+    }
+
+    progress_report(rownum, stream->path, true);
+}
+
+/* * Retrieve tablespace path, either relocated or original depending on whether * -T was passed or not.
@@ -1648,6 +1861,7 @@ BaseBackup(void)    int            minServerMajor,                maxServerMajor;    int
 serverMajor;
 
+    TarStream    tarfile;    /*     * Connect in replication mode to the server
@@ -1683,6 +1897,9 @@ BaseBackup(void)        disconnect_and_exit(1);    }
+    if (format == 's')
+        default_basetablespace = get_base_tablespace_path();
+    /*     * Build contents of recovery.conf if requested     */
@@ -1711,7 +1928,7 @@ BaseBackup(void)                 fastcheckpoint ? "FAST" : "",                 includewal ?
"NOWAIT": "",                 maxrate_clause ? maxrate_clause : "",
 
-                 format == 't' ? "TABLESPACE_MAP" : "");
+                 (format == 't' || format == 's') ? "TABLESPACE_MAP" : "");    if (PQsendQuery(conn, basebkp) == 0)
{
@@ -1802,6 +2019,9 @@ BaseBackup(void)        fprintf(stderr,                _("%s: can only write single tablespace to
stdout,database has %d\n"),                progname, PQntuples(res));
 
+        fprintf(stderr,
+                _("HINT: the -F single-tar option always writes a single tar file which may\n"
+                  "be written to stdout.\n"));        disconnect_and_exit(1);    }
@@ -1817,6 +2037,9 @@ BaseBackup(void)        StartLogStreamer(xlogstart, starttli, sysidentifier);    }
+    if (format == 's')
+        OpenTarFile(&tarfile, outpath);
+    /*     * Start receiving chunks     */
@@ -1824,10 +2047,15 @@ BaseBackup(void)    {        if (format == 't')            ReceiveTarFile(conn, res, i);
+        else if (format == 's')
+            ReceiveAndAppendTarFile(&tarfile, conn, res, i);        else            ReceiveAndUnpackTarFile(conn, res,
i);   }                            /* Loop over all tablespaces */
 
+    if (format == 's')
+        CloseTarFile(&tarfile);
+    if (showprogress)    {        progress_report(PQntuples(res), NULL, true);
@@ -1982,6 +2210,7 @@ main(int argc, char **argv)        {"help", no_argument, NULL, '?'},        {"version",
no_argument,NULL, 'V'},        {"pgdata", required_argument, NULL, 'D'},
 
+        {"file", required_argument, NULL, 'f'},        {"format", required_argument, NULL, 'F'},        {"checkpoint",
required_argument,NULL, 'c'},        {"max-rate", required_argument, NULL, 'r'},
 
@@ -2027,7 +2256,7 @@ main(int argc, char **argv)        }    }
-    while ((c = getopt_long(argc, argv, "D:F:r:RT:xX:l:zZ:d:c:h:p:U:s:S:wWvP",
+    while ((c = getopt_long(argc, argv, "D:f:F:r:RT:xX:l:zZ:d:c:h:p:U:s:S:wWvP",
long_options,&option_index)) != -1)    {        switch (c)
 
@@ -2035,15 +2264,20 @@ main(int argc, char **argv)            case 'D':                basedir = pg_strdup(optarg);
           break;
 
+            case 'f':
+                outpath = pg_strdup(optarg);
+                break;            case 'F':                if (strcmp(optarg, "p") == 0 || strcmp(optarg, "plain") ==
0)                   format = 'p';                else if (strcmp(optarg, "t") == 0 || strcmp(optarg, "tar") == 0)
             format = 't';
 
+                else if (strcmp(optarg, "s") == 0 || strcmp(optarg, "single-tar") == 0)
+                    format = 's';                else                {                    fprintf(stderr,
-                            _("%s: invalid output format \"%s\", must be \"plain\" or \"tar\"\n"),
+                            _("%s: invalid output format \"%s\", must be \"plain\", \"tar\" or \"single-tar\"\n"),
                      progname, optarg);                    exit(1);                }
 
@@ -2190,12 +2424,34 @@ main(int argc, char **argv)    /*     * Required arguments     */
-    if (basedir == NULL)
+    if (format == 's')    {
-        fprintf(stderr, _("%s: no target directory specified\n"), progname);
-        fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
-                progname);
-        exit(1);
+        if (basedir != NULL)
+        {
+            fprintf(stderr, _("%s: target directory cannot be specified in single-tar mode\n"), progname);
+            fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+                    progname);
+            exit(1);
+        }
+        if (outpath == NULL)
+            outpath = pg_strdup("-");
+    }
+    else
+    {
+        if (basedir == NULL)
+        {
+            fprintf(stderr, _("%s: no target directory specified\n"), progname);
+            fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+                    progname);
+            exit(1);
+        }
+        if (outpath != NULL)
+        {
+            fprintf(stderr, _("%s: target file can only be specified in single-tar mode\n"), progname);
+            fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+                    progname);
+            exit(1);
+        }    }    /*
@@ -2270,7 +2526,7 @@ main(int argc, char **argv)     * backups, always require the directory. For tar backups, require
it    * unless we are writing to stdout.     */
 
-    if (format == 'p' || strcmp(basedir, "-") != 0)
+    if (format == 'p' || (basedir != NULL && strcmp(basedir, "-")) != 0)
verify_dir_is_empty_or_create(basedir);   /* Create transaction log symlink, if required */
 
-- 
2.3.0




pgsql-hackers by date:

Previous
From: David Rowley
Date:
Subject: Re: PATCH: use foreign keys to improve join estimates v1
Next
From: Tom Lane
Date:
Subject: Re: Idea for improving buildfarm robustness