From 16b77550d4e4e185b6bb45176301212db0edb09b Mon Sep 17 00:00:00 2001 From: Asif Rehman Date: Mon, 14 Oct 2019 17:28:58 +0500 Subject: [PATCH 5/6] pg_basebackup changes for parallel backup. --- src/bin/pg_basebackup/pg_basebackup.c | 710 ++++++++++++++++++++++++-- 1 file changed, 672 insertions(+), 38 deletions(-) diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index a9d162a7da..9dd7c62933 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -19,6 +19,7 @@ #include #include #include +#include #ifdef HAVE_SYS_SELECT_H #include #endif @@ -41,6 +42,7 @@ #include "receivelog.h" #include "replication/basebackup.h" #include "streamutil.h" +#include "fe_utils/simple_list.h" #define ERRCODE_DATA_CORRUPTED "XX001" @@ -57,6 +59,57 @@ typedef struct TablespaceList TablespaceListCell *tail; } TablespaceList; +typedef struct +{ + char path[MAXPGPATH]; + char type; + int32 size; + time_t mtime; + + int tsIndex; /* index of tsInfo this file belongs to. */ +} BackupFile; + +typedef struct +{ + Oid tblspcOid; + char *tablespace; /* tablespace name or NULL if 'base' tablespace */ + int numFiles; /* number of files */ + BackupFile *backupFiles; /* list of files in a tablespace */ +} TablespaceInfo; + +typedef struct +{ + int tablespacecount; + int totalfiles; + int numWorkers; + + char xlogstart[64]; + char *backup_label; + char *tablespace_map; + + TablespaceInfo *tsInfo; + BackupFile **files; /* list of BackupFile pointers */ + int fileIndex; /* index of file to be fetched */ + + PGconn **workerConns; +} BackupInfo; + +typedef struct +{ + BackupInfo *backupInfo; + uint64 bytesRead; + + int workerid; + pthread_t worker; + + bool terminated; +} WorkerState; + +BackupInfo *backupInfo = NULL; +WorkerState *workers = NULL; + +static pthread_mutex_t fetch_mutex = PTHREAD_MUTEX_INITIALIZER; + /* * pg_xlog has been renamed to pg_wal in version 10. This version number * should be compared with PQserverVersion(). @@ -110,6 +163,9 @@ static bool found_existing_xlogdir = false; static bool made_tablespace_dirs = false; static bool found_tablespace_dirs = false; +static int numWorkers = 1; +static PGresult *tablespacehdr; + /* Progress counters */ static uint64 totalsize_kb; static uint64 totaldone; @@ -140,9 +196,10 @@ static PQExpBuffer recoveryconfcontents = NULL; static void usage(void); static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found); static void progress_report(int tablespacenum, const char *filename, bool force); +static void workers_progress_report(uint64 totalBytesRead, const char *filename, bool force); static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum); -static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum); +static int ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum); static void BaseBackup(void); static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline, @@ -151,6 +208,17 @@ static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline, static const char *get_tablespace_mapping(const char *dir); static void tablespace_list_append(const char *arg); +static void ParallelBackupRun(BackupInfo *backupInfo); +static void StopBackup(BackupInfo *backupInfo); +static void GetBackupFileList(PGconn *conn, BackupInfo *backupInfo); +static int GetBackupFile(WorkerState *wstate); +static BackupFile *getNextFile(BackupInfo *backupInfo); +static int compareFileSize(const void *a, const void *b); +static void read_label_tblspcmap(PGconn *conn, char **backup_label, char **tablespace_map); +static void create_backup_dirs(bool basetablespace, char *tablespace, char *name); +static void writefile(char *path, char *buf); +static void *workerRun(void *arg); + static void cleanup_directories_atexit(void) @@ -202,6 +270,17 @@ cleanup_directories_atexit(void) static void disconnect_atexit(void) { + /* close worker connections */ + if (backupInfo && backupInfo->workerConns != NULL) + { + int i; + for (i = 0; i < numWorkers; i++) + { + if (backupInfo->workerConns[i] != NULL) + PQfinish(backupInfo->workerConns[i]); + } + } + if (conn != NULL) PQfinish(conn); } @@ -349,6 +428,7 @@ usage(void) printf(_(" --no-slot prevent creation of temporary replication slot\n")); printf(_(" --no-verify-checksums\n" " do not verify checksums\n")); + printf(_(" -j, --jobs=NUM use this many parallel jobs to backup\n")); printf(_(" -?, --help show this help, then exit\n")); printf(_("\nConnection options:\n")); printf(_(" -d, --dbname=CONNSTR connection string\n")); @@ -695,6 +775,93 @@ verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found) } } +/* + * Print a progress report of worker threads. If verbose output + * is enabled, also print the current file name. + * + * Progress report is written at maximum once per second, unless the + * force parameter is set to true. + */ +static void +workers_progress_report(uint64 totalBytesRead, const char *filename, bool force) +{ + int percent; + char totalBytesRead_str[32]; + char totalsize_str[32]; + pg_time_t now; + + if (!showprogress) + return; + + now = time(NULL); + if (now == last_progress_report && !force) + return; /* Max once per second */ + + last_progress_report = now; + percent = totalsize_kb ? (int) ((totalBytesRead / 1024) * 100 / totalsize_kb) : 0; + + /* + * Avoid overflowing past 100% or the full size. This may make the total + * size number change as we approach the end of the backup (the estimate + * will always be wrong if WAL is included), but that's better than having + * the done column be bigger than the total. + */ + if (percent > 100) + percent = 100; + if (totalBytesRead / 1024 > totalsize_kb) + totalsize_kb = totalBytesRead / 1024; + + /* + * Separate step to keep platform-dependent format code out of + * translatable strings. And we only test for INT64_FORMAT availability + * in snprintf, not fprintf. + */ + snprintf(totalBytesRead_str, sizeof(totalBytesRead_str), INT64_FORMAT, + totalBytesRead / 1024); + snprintf(totalsize_str, sizeof(totalsize_str), INT64_FORMAT, totalsize_kb); + +#define VERBOSE_FILENAME_LENGTH 35 + + if (verbose) + { + if (!filename) + + /* + * No filename given, so clear the status line (used for last + * call) + */ + fprintf(stderr, _("%*s/%s kB (%d%%) copied %*s"), + (int) strlen(totalsize_str), + totalBytesRead_str, totalsize_str, + percent, + VERBOSE_FILENAME_LENGTH + 5, ""); + else + { + bool truncate = (strlen(filename) > VERBOSE_FILENAME_LENGTH); + fprintf(stderr, _("%*s/%s kB (%d%%) copied, current file (%s%-*.*s)"), + (int) strlen(totalsize_str), totalBytesRead_str, totalsize_str, + percent, + /* Prefix with "..." if we do leading truncation */ + truncate ? "..." : "", + truncate ? VERBOSE_FILENAME_LENGTH - 3 : VERBOSE_FILENAME_LENGTH, + truncate ? VERBOSE_FILENAME_LENGTH - 3 : VERBOSE_FILENAME_LENGTH, + /* Truncate filename at beginning if it's too long */ + truncate ? filename + strlen(filename) - VERBOSE_FILENAME_LENGTH + 3 : filename); + } + } + else + { + fprintf(stderr, _("%*s/%s kB (%d%%) copied"), + (int) strlen(totalsize_str), + totalBytesRead_str, totalsize_str, + percent); + } + + if (isatty(fileno(stderr))) + fprintf(stderr, "\r"); + else + fprintf(stderr, "\n"); +} /* * Print a progress report based on the global variables. If verbose output @@ -711,7 +878,7 @@ progress_report(int tablespacenum, const char *filename, bool force) char totalsize_str[32]; pg_time_t now; - if (!showprogress) + if (!showprogress || numWorkers > 1) return; now = time(NULL); @@ -1381,7 +1548,7 @@ get_tablespace_mapping(const char *dir) * specified directory. If it's for another tablespace, it will be restored * in the original or mapped directory. */ -static void +static int ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) { char current_path[MAXPGPATH]; @@ -1392,6 +1559,7 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) bool basetablespace; char *copybuf = NULL; FILE *file = NULL; + int readBytes = 0; basetablespace = PQgetisnull(res, rownum, 0); if (basetablespace) @@ -1455,7 +1623,7 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) pg_log_error("invalid tar block header size: %d", r); exit(1); } - totaldone += 512; + readBytes += 512; current_len_left = read_tar_number(©buf[124], 12); @@ -1486,21 +1654,14 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) * Directory */ filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */ + + /* + * In parallel mode, we create directories before fetching + * files so its Ok if a directory already exist. + */ if (mkdir(filename, pg_dir_create_mode) != 0) { - /* - * When streaming WAL, pg_wal (or pg_xlog for pre-9.6 - * clusters) will have been created by the wal - * receiver process. Also, when the WAL directory - * location was specified, pg_wal (or pg_xlog) has - * already been created as a symbolic link before - * starting the actual backup. So just ignore creation - * failures on related directories. - */ - if (!((pg_str_endswith(filename, "/pg_wal") || - pg_str_endswith(filename, "/pg_xlog") || - pg_str_endswith(filename, "/archive_status")) && - errno == EEXIST)) + if (errno != EEXIST) { pg_log_error("could not create directory \"%s\": %m", filename); @@ -1585,7 +1746,7 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) */ fclose(file); file = NULL; - totaldone += r; + readBytes += r; continue; } @@ -1594,7 +1755,8 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) pg_log_error("could not write to file \"%s\": %m", filename); exit(1); } - totaldone += r; + readBytes += r; + totaldone = readBytes; progress_report(rownum, filename, false); current_len_left -= r; @@ -1622,13 +1784,11 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) if (copybuf != NULL) PQfreemem(copybuf); - if (basetablespace && writerecoveryconf) - WriteRecoveryConfig(conn, basedir, recoveryconfcontents); - /* * No data is synced here, everything is done for all tablespaces at the * end. */ + return readBytes; } @@ -1716,7 +1876,8 @@ BaseBackup(void) } basebkp = - psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s", + psprintf("%s LABEL '%s' %s %s %s %s %s %s %s", + (numWorkers > 1) ? "START_BACKUP" : "BASE_BACKUP", escaped_label, showprogress ? "PROGRESS" : "", includewal == FETCH_WAL ? "WAL" : "", @@ -1774,7 +1935,7 @@ BaseBackup(void) /* * Get the header */ - res = PQgetResult(conn); + tablespacehdr = res = PQgetResult(conn); if (PQresultStatus(res) != PGRES_TUPLES_OK) { pg_log_error("could not get backup header: %s", @@ -1830,24 +1991,74 @@ BaseBackup(void) StartLogStreamer(xlogstart, starttli, sysidentifier); } - /* - * Start receiving chunks - */ - for (i = 0; i < PQntuples(res); i++) + if (numWorkers > 1) { - if (format == 't') - ReceiveTarFile(conn, res, i); - else - ReceiveAndUnpackTarFile(conn, res, i); - } /* Loop over all tablespaces */ + int j = 0, + k = 0; - if (showprogress) + backupInfo = palloc0(sizeof(BackupInfo)); + backupInfo->workerConns = (PGconn **) palloc0(sizeof(PGconn *) * numWorkers); + backupInfo->tablespacecount = tablespacecount; + backupInfo->numWorkers = numWorkers; + strlcpy(backupInfo->xlogstart, xlogstart, sizeof(backupInfo->xlogstart)); + + read_label_tblspcmap(conn, &backupInfo->backup_label, &backupInfo->tablespace_map); + + /* retrieve backup file list from the server. **/ + GetBackupFileList(conn, backupInfo); + + /* + * add backup_label in backup, (for tar format, ReceiveTarFile() will + * take care of it). + */ + if (format == 'p') + writefile("backup_label", backupInfo->backup_label); + + /* + * Flatten the file list to avoid unnecessary locks and enable the sequential + * access to file list. (Creating an array of BackupFile structre pointers). + */ + backupInfo->files = + (BackupFile **) palloc0(sizeof(BackupFile *) * backupInfo->totalfiles); + for (i = 0; i < backupInfo->tablespacecount; i++) + { + TablespaceInfo *curTsInfo = &backupInfo->tsInfo[i]; + + for (j = 0; j < curTsInfo->numFiles; j++) + { + backupInfo->files[k] = &curTsInfo->backupFiles[j]; + k++; + } + } + + ParallelBackupRun(backupInfo); + StopBackup(backupInfo); + } + else { - progress_report(PQntuples(res), NULL, true); - if (isatty(fileno(stderr))) - fprintf(stderr, "\n"); /* Need to move to next line */ + /* + * Start receiving chunks + */ + for (i = 0; i < PQntuples(res); i++) + { + if (format == 't') + ReceiveTarFile(conn, res, i); + else + ReceiveAndUnpackTarFile(conn, res, i); + } /* Loop over all tablespaces */ + + if (showprogress) + { + progress_report(PQntuples(tablespacehdr), NULL, true); + if (isatty(fileno(stderr))) + fprintf(stderr, "\n"); /* Need to move to next line */ + } } + /* Write recovery contents */ + if (format == 'p' && writerecoveryconf) + WriteRecoveryConfig(conn, basedir, recoveryconfcontents); + PQclear(res); /* @@ -2043,6 +2254,7 @@ main(int argc, char **argv) {"waldir", required_argument, NULL, 1}, {"no-slot", no_argument, NULL, 2}, {"no-verify-checksums", no_argument, NULL, 3}, + {"jobs", required_argument, NULL, 'j'}, {NULL, 0, NULL, 0} }; int c; @@ -2070,7 +2282,7 @@ main(int argc, char **argv) atexit(cleanup_directories_atexit); - while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvP", + while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvPj:", long_options, &option_index)) != -1) { switch (c) @@ -2211,6 +2423,9 @@ main(int argc, char **argv) case 3: verify_checksums = false; break; + case 'j': /* number of jobs */ + numWorkers = atoi(optarg); + break; default: /* @@ -2325,6 +2540,22 @@ main(int argc, char **argv) } } + if (numWorkers <= 0) + { + pg_log_error("invalid number of parallel jobs"); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + + if (format != 'p' && numWorkers > 1) + { + pg_log_error("parallel jobs are only supported with 'plain' format"); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + #ifndef HAVE_LIBZ if (compresslevel != 0) { @@ -2397,3 +2628,406 @@ main(int argc, char **argv) success = true; return 0; } + +/* + * Thread worker + */ +static void * +workerRun(void *arg) +{ + WorkerState *wstate = (WorkerState *) arg; + + GetBackupFile(wstate); + + wstate->terminated = true; + return NULL; +} + +/* + * Runs the worker threads and updates progress until all workers have + * terminated/completed. + */ +static void +ParallelBackupRun(BackupInfo *backupInfo) +{ + int status, + i; + bool threadsActive = true; + uint64 totalBytes = 0; + + workers = (WorkerState *) palloc0(sizeof(WorkerState) * numWorkers); + + for (i = 0; i < numWorkers; i++) + { + WorkerState *worker = &workers[i]; + + worker->backupInfo = backupInfo; + worker->workerid = i; + worker->bytesRead = 0; + worker->terminated = false; + + backupInfo->workerConns[i] = GetConnection(); + status = pthread_create(&worker->worker, NULL, workerRun, worker); + if (status != 0) + { + pg_log_error("failed to create thread: %m"); + exit(1); + } + + if (verbose) + pg_log_info("backup worker (%d) created, %d", i, status); + } + + /* + * This is the main thread for updating progrsss. It waits for workers to + * complete and gets updated status during every loop iteration. + */ + while(threadsActive) + { + char *filename = NULL; + + threadsActive = false; + totalBytes = 0; + + for (i = 0; i < numWorkers; i++) + { + WorkerState *worker = &workers[i]; + + totalBytes += worker->bytesRead; + threadsActive |= !worker->terminated; + } + + if (backupInfo->fileIndex < backupInfo->totalfiles) + filename = backupInfo->files[backupInfo->fileIndex]->path; + + workers_progress_report(totalBytes, filename, false); + pg_usleep(100000); + } + + if (showprogress) + { + workers_progress_report(totalBytes, NULL, true); + if (isatty(fileno(stderr))) + fprintf(stderr, "\n"); /* Need to move to next line */ + } +} + +/* + * Take the system out of backup mode. + */ +static void +StopBackup(BackupInfo *backupInfo) +{ + PGresult *res = NULL; + char *basebkp; + + basebkp = psprintf("STOP_BACKUP LABEL '%s' %s %s", + backupInfo->backup_label, + includewal == FETCH_WAL ? "WAL" : "", + includewal == NO_WAL ? "" : "NOWAIT"); + if (PQsendQuery(conn, basebkp) == 0) + { + pg_log_error("could not execute STOP BACKUP \"%s\"", + PQerrorMessage(conn)); + exit(1); + } + + /* receive pg_control and wal files */ + ReceiveAndUnpackTarFile(conn, res, tablespacecount); + PQclear(res); +} + +/* + * Retrive backup file list from the server and populate TablespaceInfo struct + * to keep track of tablespaces and its files. + */ +static void +GetBackupFileList(PGconn *conn, BackupInfo *backupInfo) +{ + int i; + PGresult *res = NULL; + char *basebkp; + + backupInfo->tsInfo = palloc0(sizeof(TablespaceInfo) * backupInfo->tablespacecount); + TablespaceInfo *tsInfo = backupInfo->tsInfo; + + /* + * Get list of files. + */ + basebkp = psprintf("SEND_BACKUP_FILELIST %s", + format == 't' ? "TABLESPACE_MAP" : ""); + if (PQsendQuery(conn, basebkp) == 0) + { + pg_log_error("could not send replication command \"%s\": %s", + "SEND_BACKUP_FILELIST", PQerrorMessage(conn)); + exit(1); + } + + /* + * The list of files is grouped by tablespaces, and we want to fetch them + * in the same order. + */ + for (i = 0; i < backupInfo->tablespacecount; i++) + { + bool basetablespace; + + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not get backup header: %s", + PQerrorMessage(conn)); + exit(1); + } + if (PQntuples(res) < 1) + { + pg_log_error("no data returned from server"); + exit(1); + } + + basetablespace = PQgetisnull(tablespacehdr, i, 0); + tsInfo[i].tblspcOid = atol(PQgetvalue(tablespacehdr, i, 0)); + tsInfo[i].tablespace = PQgetvalue(tablespacehdr, i, 1); + tsInfo[i].numFiles = PQntuples(res); + tsInfo[i].backupFiles = palloc0(sizeof(BackupFile) * tsInfo[i].numFiles); + + /* keep count of all files in backup */ + backupInfo->totalfiles += tsInfo[i].numFiles; + + for (int j = 0; j < tsInfo[i].numFiles; j++) + { + char *path = PQgetvalue(res, j, 0); + char type = PQgetvalue(res, j, 1)[0]; + int32 size = atol(PQgetvalue(res, j, 2)); + time_t mtime = atol(PQgetvalue(res, j, 3)); + + /* + * In 'plain' format, create backup directories first. + */ + if (format == 'p' && type == 'd') + create_backup_dirs(basetablespace, tsInfo[i].tablespace, path); + + strlcpy(tsInfo[i].backupFiles[j].path, path, MAXPGPATH); + tsInfo[i].backupFiles[j].type = type; + tsInfo[i].backupFiles[j].size = size; + tsInfo[i].backupFiles[j].mtime = mtime; + tsInfo[i].backupFiles[j].tsIndex = i; + } + + /* sort files in descending order, based on size */ + qsort(tsInfo[i].backupFiles, tsInfo[i].numFiles, + sizeof(BackupFile), &compareFileSize); + PQclear(res); + } + + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + pg_log_error("could not get data: %s", PQerrorMessage(conn)); + exit(1); + } + res = PQgetResult(conn); +} + +/* + * Retrive and write backup file from the server. The file list is provided by + * worker state. It pulls a single file from this list and writes it to the + * backup directory. + */ +static int +GetBackupFile(WorkerState *wstate) +{ + PGresult *res = NULL; + PGconn *worker_conn = NULL; + BackupFile *fetchFile = NULL; + BackupInfo *backupInfo = NULL; + + backupInfo = wstate->backupInfo; + worker_conn = backupInfo->workerConns[wstate->workerid]; + while ((fetchFile = getNextFile(backupInfo)) != NULL) + { + PQExpBuffer buf = createPQExpBuffer(); + TablespaceInfo *curTsInfo = &backupInfo->tsInfo[fetchFile->tsIndex]; + + + /* + * build query in form of: SEND_BACKUP_FILES ('base/1/1245/32683', + * 'base/1/1245/32683', ...) [options] + */ + appendPQExpBuffer(buf, "SEND_BACKUP_FILES ( '%s' )", fetchFile->path); + + /* add options */ + appendPQExpBuffer(buf, " TABLESPACE_PATH '%s' START_WAL_LOCATION '%s' %s %s", + curTsInfo->tablespace, + backupInfo->xlogstart, + format == 't' ? "TABLESPACE_MAP" : "", + verify_checksums ? "" : "NOVERIFY_CHECKSUMS"); + if (maxrate > 0) + appendPQExpBuffer(buf, " MAX_RATE %u", maxrate); + + if (!worker_conn) + return 1; + + if (PQsendQuery(worker_conn, buf->data) == 0) + { + pg_log_error("could not send files list \"%s\"", + PQerrorMessage(worker_conn)); + return 1; + } + + destroyPQExpBuffer(buf); + + /* process file contents, also count bytesRead for progress */ + wstate->bytesRead += + ReceiveAndUnpackTarFile(worker_conn, tablespacehdr, fetchFile->tsIndex); + + res = PQgetResult(worker_conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + pg_log_error("could not get data stream: %s", + PQerrorMessage(worker_conn)); + exit(1); + } + + res = PQgetResult(worker_conn); + } + + PQclear(res); + return 0; +} + +/* + * Increment fileIndex and store it in a local variable so that even a + * context switch does not affect the file index value and we don't accidentally + * increment the value twice and therefore skip some files. + */ +static BackupFile* +getNextFile(BackupInfo *backupInfo) +{ + int fileIndex = 0; + + pthread_mutex_lock(&fetch_mutex); + fileIndex = backupInfo->fileIndex++; + pthread_mutex_unlock(&fetch_mutex); + + if (fileIndex >= backupInfo->totalfiles) + return NULL; + + return backupInfo->files[fileIndex]; +} + +/* qsort comparator for BackupFile (sort descending order) */ +static int +compareFileSize(const void *a, const void *b) +{ + const BackupFile *v1 = (BackupFile *) a; + const BackupFile *v2 = (BackupFile *) b; + + if (v1->size > v2->size) + return -1; + if (v1->size < v2->size) + return 1; + + return 0; +} + +static void +read_label_tblspcmap(PGconn *conn, char **backuplabel, char **tblspc_map) +{ + PGresult *res = NULL; + + Assert(backuplabel != NULL); + Assert(tblspc_map != NULL); + + /* + * Get Backup label and tablespace map data. + */ + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not get data: %s", + PQerrorMessage(conn)); + exit(1); + } + if (PQntuples(res) < 1) + { + pg_log_error("no data returned from server"); + exit(1); + } + + *backuplabel = PQgetvalue(res, 0, 0); /* backup_label */ + if (!PQgetisnull(res, 0, 1)) + *tblspc_map = PQgetvalue(res, 0, 1); /* tablespace_map */ + + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + pg_log_error("could not get data: %s", + PQerrorMessage(conn)); + exit(1); + } + + res = PQgetResult(conn); + PQclear(res); +} + +/* + * Create backup direcotries while taking care of tablespace path. If tablespace + * mapping (with -T) is given then the directory will be created on the mapped + * path. + */ +static void +create_backup_dirs(bool basetablespace, char *tablespace, char *name) +{ + char dirpath[MAXPGPATH]; + + Assert(name != NULL); + + if (basetablespace) + snprintf(dirpath, sizeof(dirpath), "%s/%s", basedir, name); + else + { + Assert(tablespace != NULL); + snprintf(dirpath, sizeof(dirpath), "%s/%s", + get_tablespace_mapping(tablespace), (name + strlen(tablespace) + 1)); + } + + if (pg_mkdir_p(dirpath, pg_dir_create_mode) != 0) + { + if (errno != EEXIST) + { + pg_log_error("could not create directory \"%s\": %m", + dirpath); + exit(1); + } + } +} + +/* + * General function for writing to a file; creates one if it doesn't exist + */ +static void +writefile(char *path, char *buf) +{ + FILE *f; + char pathbuf[MAXPGPATH]; + + snprintf(pathbuf, MAXPGPATH, "%s/%s", basedir, path); + f = fopen(pathbuf, "w"); + if (f == NULL) + { + pg_log_error("could not open file \"%s\": %m", pathbuf); + exit(1); + } + + if (fwrite(buf, strlen(buf), 1, f) != 1) + { + pg_log_error("could not write to file \"%s\": %m", pathbuf); + exit(1); + } + + if (fclose(f)) + { + pg_log_error("could not write to file \"%s\": %m", pathbuf); + exit(1); + } +} -- 2.21.0 (Apple Git-122)