From 245b10802490fafeba7b17779e5c2860fbc1181c Mon Sep 17 00:00:00 2001 From: Asif Rehman Date: Mon, 14 Oct 2019 17:28:58 +0500 Subject: [PATCH 3/4] pg_basebackup changes for parallel backup. --- src/bin/pg_basebackup/pg_basebackup.c | 583 ++++++++++++++++++++++++-- 1 file changed, 548 insertions(+), 35 deletions(-) diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 55ef13926d..311c1f94ca 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -41,6 +41,7 @@ #include "receivelog.h" #include "replication/basebackup.h" #include "streamutil.h" +#include "fe_utils/simple_list.h" #define ERRCODE_DATA_CORRUPTED "XX001" @@ -57,6 +58,37 @@ typedef struct TablespaceList TablespaceListCell *tail; } TablespaceList; +typedef struct +{ + char name[MAXPGPATH]; + char type; + int32 size; + time_t mtime; +} BackupFile; + +typedef struct +{ + Oid tblspcOid; + char *tablespace; /* tablespace name or NULL if 'base' tablespace */ + int numFiles; /* number of files */ + BackupFile *backupFiles; /* list of files in tablespace */ +} TablespaceInfo; + +typedef struct +{ + int tablespacecount; + int numWorkers; + + char xlogstart[64]; + char *backup_label; + char *tablespace_map; + + TablespaceInfo *tsInfo; + SimpleStringList **worker_files; +} BackupInfo; + +static BackupInfo *backupInfo = NULL; + /* * pg_xlog has been renamed to pg_wal in version 10. This version number * should be compared with PQserverVersion(). @@ -110,6 +142,10 @@ static bool found_existing_xlogdir = false; static bool made_tablespace_dirs = false; static bool found_tablespace_dirs = false; +static int numWorkers = 1; +static PGresult *tablespacehdr; +static SimpleOidList workerspid = {NULL, NULL}; + /* Progress counters */ static uint64 totalsize_kb; static uint64 totaldone; @@ -141,7 +177,7 @@ static void usage(void); static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found); static void progress_report(int tablespacenum, const char *filename, bool force); -static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum); +static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum, int worker); static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum); static void BaseBackup(void); @@ -151,6 +187,16 @@ static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline, static const char *get_tablespace_mapping(const char *dir); static void tablespace_list_append(const char *arg); +static void ParallelBackupEnd(void); +static void GetBackupFilesList(PGconn *conn, BackupInfo *binfo); +static int ReceiveFiles(BackupInfo *backupInfo, int worker); +static int compareFileSize(const void *a, const void *b); +static void create_workers_and_fetch(BackupInfo *backupInfo); +static void read_label_tblspcmap(PGconn *conn, char **backup_label, char **tablespace_map); +static void create_backup_dirs(bool basetablespace, char *tablespace, char *name); +static void writefile(char *path, char *buf); +static int simple_list_length(SimpleStringList *list); + static void cleanup_directories_atexit(void) @@ -349,6 +395,7 @@ usage(void) printf(_(" --no-slot prevent creation of temporary replication slot\n")); printf(_(" --no-verify-checksums\n" " do not verify checksums\n")); + printf(_(" -j, --jobs=NUM use this many parallel jobs to backup\n")); printf(_(" -?, --help show this help, then exit\n")); printf(_("\nConnection options:\n")); printf(_(" -d, --dbname=CONNSTR connection string\n")); @@ -921,7 +968,7 @@ writeTarData( * No attempt to inspect or validate the contents of the file is done. */ static void -ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) +ReceiveTarFile(PGconn *conn, PGresult *res, int rownum, int worker) { char filename[MAXPGPATH]; char *copybuf = NULL; @@ -978,7 +1025,10 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) #ifdef HAVE_LIBZ if (compresslevel != 0) { - snprintf(filename, sizeof(filename), "%s/base.tar.gz", basedir); + if (numWorkers > 1) + snprintf(filename, sizeof(filename), "%s/base.%d.tar.gz", basedir, worker); + else + snprintf(filename, sizeof(filename), "%s/base.tar.gz", basedir); ztarfile = gzopen(filename, "wb"); if (gzsetparams(ztarfile, compresslevel, Z_DEFAULT_STRATEGY) != Z_OK) @@ -991,7 +1041,10 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) else #endif { - snprintf(filename, sizeof(filename), "%s/base.tar", basedir); + if (numWorkers > 1) + snprintf(filename, sizeof(filename), "%s/base.%d.tar", basedir, worker); + else + snprintf(filename, sizeof(filename), "%s/base.tar", basedir); tarfile = fopen(filename, "wb"); } } @@ -1004,8 +1057,12 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) #ifdef HAVE_LIBZ if (compresslevel != 0) { - snprintf(filename, sizeof(filename), "%s/%s.tar.gz", basedir, - PQgetvalue(res, rownum, 0)); + if (numWorkers > 1) + snprintf(filename, sizeof(filename), "%s/%s.%d.tar.gz", basedir, + PQgetvalue(res, rownum, 0), worker); + else + snprintf(filename, sizeof(filename), "%s/%s.tar.gz", basedir, + PQgetvalue(res, rownum, 0)); ztarfile = gzopen(filename, "wb"); if (gzsetparams(ztarfile, compresslevel, Z_DEFAULT_STRATEGY) != Z_OK) @@ -1018,8 +1075,12 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) else #endif { - snprintf(filename, sizeof(filename), "%s/%s.tar", basedir, - PQgetvalue(res, rownum, 0)); + if (numWorkers > 1) + snprintf(filename, sizeof(filename), "%s/%s.%d.tar", basedir, + PQgetvalue(res, rownum, 0), worker); + else + snprintf(filename, sizeof(filename), "%s/%s.tar", basedir, + PQgetvalue(res, rownum, 0)); tarfile = fopen(filename, "wb"); } } @@ -1082,6 +1143,45 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) MemSet(zerobuf, 0, sizeof(zerobuf)); + if (numWorkers > 1 && basetablespace && worker == 0) + { + char header[512]; + int padding; + int len; + + /* add backup_label and tablespace_map files to the tar */ + len = strlen(backupInfo->backup_label); + tarCreateHeader(header, + "backup_label", + NULL, + len, + pg_file_create_mode, 04000, 02000, + time(NULL)); + + padding = ((len + 511) & ~511) - len; + WRITE_TAR_DATA(header, sizeof(header)); + WRITE_TAR_DATA(backupInfo->backup_label, len); + if (padding) + WRITE_TAR_DATA(zerobuf, padding); + + if (backupInfo->tablespace_map) + { + len = strlen(backupInfo->tablespace_map); + tarCreateHeader(header, + "tablespace_map", + NULL, + len, + pg_file_create_mode, 04000, 02000, + time(NULL)); + + padding = ((len + 511) & ~511) - len; + WRITE_TAR_DATA(header, sizeof(header)); + WRITE_TAR_DATA(backupInfo->tablespace_map, len); + if (padding) + WRITE_TAR_DATA(zerobuf, padding); + } + } + if (basetablespace && writerecoveryconf) { char header[512]; @@ -1475,6 +1575,7 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) */ snprintf(filename, sizeof(filename), "%s/%s", current_path, copybuf); + if (filename[strlen(filename) - 1] == '/') { /* @@ -1486,21 +1587,14 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) * Directory */ filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */ + + /* + * In parallel mode, we create directories before fetching + * files so its Ok if a directory already exist. + */ if (mkdir(filename, pg_dir_create_mode) != 0) { - /* - * When streaming WAL, pg_wal (or pg_xlog for pre-9.6 - * clusters) will have been created by the wal - * receiver process. Also, when the WAL directory - * location was specified, pg_wal (or pg_xlog) has - * already been created as a symbolic link before - * starting the actual backup. So just ignore creation - * failures on related directories. - */ - if (!((pg_str_endswith(filename, "/pg_wal") || - pg_str_endswith(filename, "/pg_xlog") || - pg_str_endswith(filename, "/archive_status")) && - errno == EEXIST)) + if (errno != EEXIST) { pg_log_error("could not create directory \"%s\": %m", filename); @@ -1528,8 +1622,8 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) * can map them too.) */ filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */ - mapped_tblspc_path = get_tablespace_mapping(©buf[157]); + if (symlink(mapped_tblspc_path, filename) != 0) { pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m", @@ -1716,7 +1810,8 @@ BaseBackup(void) } basebkp = - psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s", + psprintf("%s LABEL '%s' %s %s %s %s %s %s %s", + (numWorkers > 1) ? "START_BACKUP" : "BASE_BACKUP", escaped_label, showprogress ? "PROGRESS" : "", includewal == FETCH_WAL ? "WAL" : "", @@ -1774,7 +1869,7 @@ BaseBackup(void) /* * Get the header */ - res = PQgetResult(conn); + tablespacehdr = res = PQgetResult(conn); if (PQresultStatus(res) != PGRES_TUPLES_OK) { pg_log_error("could not get backup header: %s", @@ -1830,20 +1925,62 @@ BaseBackup(void) StartLogStreamer(xlogstart, starttli, sysidentifier); } - /* - * Start receiving chunks - */ - for (i = 0; i < PQntuples(res); i++) + if (numWorkers > 1) { - if (format == 't') - ReceiveTarFile(conn, res, i); - else - ReceiveAndUnpackTarFile(conn, res, i); - } /* Loop over all tablespaces */ + backupInfo = palloc0(sizeof(BackupInfo)); + + backupInfo->tablespacecount = tablespacecount; + backupInfo->numWorkers = numWorkers; + strlcpy(backupInfo->xlogstart, xlogstart, sizeof(backupInfo->xlogstart)); + read_label_tblspcmap(conn, &backupInfo->backup_label, &backupInfo->tablespace_map); + + /* retrive backup files from server. **/ + GetBackupFilesList(conn, backupInfo); + + /* + * add backup_label in backup, (for tar format, ReceiveTarFile() will + * takecare of it). + */ + if (format == 'p') + writefile("backup_label", backupInfo->backup_label); + + /* + * The backup files list is already in descending order, distribute it + * to workers. + */ + backupInfo->worker_files = palloc0(sizeof(SimpleStringList) * tablespacecount); + for (i = 0; i < backupInfo->tablespacecount; i++) + { + TablespaceInfo *curTsInfo = &backupInfo->tsInfo[i]; + + backupInfo->worker_files[i] = palloc0(sizeof(SimpleStringList) * numWorkers); + for (int j = 0; j < curTsInfo->numFiles; j++) + { + simple_string_list_append(&backupInfo->worker_files[i][j % numWorkers], + curTsInfo->backupFiles[j].name); + } + } + + create_workers_and_fetch(backupInfo); + ParallelBackupEnd(); + } + else + { + /* + * Start receiving chunks + */ + for (i = 0; i < PQntuples(res); i++) + { + if (format == 't') + ReceiveTarFile(conn, res, i, 0); + else + ReceiveAndUnpackTarFile(conn, res, i); + } /* Loop over all tablespaces */ + } if (showprogress) { - progress_report(PQntuples(res), NULL, true); + progress_report(PQntuples(tablespacehdr), NULL, true); if (isatty(fileno(stderr))) fprintf(stderr, "\n"); /* Need to move to next line */ } @@ -2043,6 +2180,7 @@ main(int argc, char **argv) {"waldir", required_argument, NULL, 1}, {"no-slot", no_argument, NULL, 2}, {"no-verify-checksums", no_argument, NULL, 3}, + {"jobs", required_argument, NULL, 'j'}, {NULL, 0, NULL, 0} }; int c; @@ -2070,7 +2208,7 @@ main(int argc, char **argv) atexit(cleanup_directories_atexit); - while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvP", + while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvPj:", long_options, &option_index)) != -1) { switch (c) @@ -2211,6 +2349,9 @@ main(int argc, char **argv) case 3: verify_checksums = false; break; + case 'j': /* number of jobs */ + numWorkers = atoi(optarg); + break; default: /* @@ -2325,6 +2466,14 @@ main(int argc, char **argv) } } + if (numWorkers <= 0) + { + pg_log_error("invalid number of parallel jobs"); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + #ifndef HAVE_LIBZ if (compresslevel != 0) { @@ -2397,3 +2546,367 @@ main(int argc, char **argv) success = true; return 0; } + +static void +ParallelBackupEnd(void) +{ + PGresult *res = NULL; + char *basebkp; + + basebkp = psprintf("STOP_BACKUP LABEL '%s' %s %s", + backupInfo->backup_label, + includewal == FETCH_WAL ? "WAL" : "", + includewal == NO_WAL ? "" : "NOWAIT"); + if (PQsendQuery(conn, basebkp) == 0) + { + pg_log_error("could not execute STOP BACKUP \"%s\"", + PQerrorMessage(conn)); + exit(1); + } + + /* receive pg_control and wal files */ + if (format == 't') + ReceiveTarFile(conn, res, tablespacecount, numWorkers); + else + ReceiveAndUnpackTarFile(conn, res, tablespacecount); + + PQclear(res); +} + +static void +GetBackupFilesList(PGconn *conn, BackupInfo *backupInfo) +{ + int i; + PGresult *res = NULL; + char *basebkp; + + backupInfo->tsInfo = palloc0(sizeof(TablespaceInfo) * backupInfo->tablespacecount); + TablespaceInfo *tsInfo = backupInfo->tsInfo; + + /* + * Get list of files. + */ + basebkp = psprintf("SEND_FILE_LIST %s", + format == 't' ? "TABLESPACE_MAP" : ""); + if (PQsendQuery(conn, basebkp) == 0) + { + pg_log_error("could not send replication command \"%s\": %s", + "SEND_FILE_LIST", PQerrorMessage(conn)); + exit(1); + } + + /* + * The list of files is grouped by tablespaces, and we want to fetch them + * in the same order. + */ + for (i = 0; i < backupInfo->tablespacecount; i++) + { + bool basetablespace; + + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not get backup header: %s", + PQerrorMessage(conn)); + exit(1); + } + if (PQntuples(res) < 1) + { + pg_log_error("no data returned from server"); + exit(1); + } + + basetablespace = PQgetisnull(tablespacehdr, i, 0); + tsInfo[i].tblspcOid = atol(PQgetvalue(tablespacehdr, i, 0)); + tsInfo[i].tablespace = PQgetvalue(tablespacehdr, i, 1); + tsInfo[i].numFiles = PQntuples(res); + tsInfo[i].backupFiles = + palloc0(sizeof(BackupFile) * tsInfo[i].numFiles); + + for (int j = 0; j < tsInfo[i].numFiles; j++) + { + char *name = PQgetvalue(res, j, 0); + char type = PQgetvalue(res, j, 1)[0]; + int32 size = atol(PQgetvalue(res, j, 2)); + time_t mtime = atol(PQgetvalue(res, j, 3)); + + /* + * In 'plain' format, create backup directories first. + */ + if (format == 'p' && type == 'd') + create_backup_dirs(basetablespace, tsInfo[i].tablespace, name); + + strlcpy(tsInfo[i].backupFiles[j].name, name, MAXPGPATH); + tsInfo[i].backupFiles[j].type = type; + tsInfo[i].backupFiles[j].size = size; + tsInfo[i].backupFiles[j].mtime = mtime; + } + + /* sort files in descending order, based on size */ + qsort(tsInfo[i].backupFiles, tsInfo[i].numFiles, + sizeof(BackupFile), &compareFileSize); + PQclear(res); + } + + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + pg_log_error("could not get data: %s", PQerrorMessage(conn)); + exit(1); + } + res = PQgetResult(conn); +} + +static int +ReceiveFiles(BackupInfo *backupInfo, int worker) +{ + SimpleStringListCell *cell; + PGresult *res = NULL; + PGconn *worker_conn; + int i; + + worker_conn = GetConnection(); + for (i = 0; i < backupInfo->tablespacecount; i++) + { + TablespaceInfo *curTsInfo = &backupInfo->tsInfo[i]; + SimpleStringList *files = &backupInfo->worker_files[i][worker]; + PQExpBuffer buf = createPQExpBuffer(); + + if (simple_list_length(files) <= 0) + continue; + + + /* + * build query in form of: SEND_FILES_CONTENT ('base/1/1245/32683', + * 'base/1/1245/32683', ...) [options] + */ + appendPQExpBuffer(buf, "SEND_FILES_CONTENT ("); + for (cell = files->head; cell; cell = cell->next) + { + if (cell != files->tail) + appendPQExpBuffer(buf, "'%s' ,", cell->val); + else + appendPQExpBuffer(buf, "'%s'", cell->val); + } + appendPQExpBufferStr(buf, ")"); + + /* + * Add backup options to the command. we are reusing the LABEL here to + * keep the original tablespace path on the server. + */ + appendPQExpBuffer(buf, " LABEL '%s' LSN '%s' %s %s", + curTsInfo->tablespace, + backupInfo->xlogstart, + format == 't' ? "TABLESPACE_MAP" : "", + verify_checksums ? "" : "NOVERIFY_CHECKSUMS"); + if (maxrate > 0) + appendPQExpBuffer(buf, " MAX_RATE %u", maxrate); + + if (!worker_conn) + return 1; + + if (PQsendQuery(worker_conn, buf->data) == 0) + { + pg_log_error("could not send files list \"%s\"", + PQerrorMessage(worker_conn)); + return 1; + } + + destroyPQExpBuffer(buf); + if (format == 't') + ReceiveTarFile(worker_conn, tablespacehdr, i, worker); + else + ReceiveAndUnpackTarFile(worker_conn, tablespacehdr, i); + + res = PQgetResult(worker_conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + pg_log_error("could not get data stream: %s", + PQerrorMessage(worker_conn)); + exit(1); + } + + res = PQgetResult(worker_conn); + } + + PQclear(res); + PQfinish(worker_conn); + + return 0; +} + +/* qsort comparator for BackupFile (sort descending order) */ +static int +compareFileSize(const void *a, const void *b) +{ + const BackupFile *v1 = (BackupFile *) a; + const BackupFile *v2 = (BackupFile *) b; + + if (v1->size > v2->size) + return -1; + if (v1->size < v2->size) + return 1; + + return 0; +} + +static void +create_workers_and_fetch(BackupInfo *backupInfo) +{ + int status; + int pid, + i; + + for (i = 0; i < numWorkers; i++) + { + pid = fork(); + if (pid == 0) + { + /* in child process */ + _exit(ReceiveFiles(backupInfo, i)); + } + else if (pid < 0) + { + pg_log_error("could not create backup worker: %m"); + exit(1); + } + + simple_oid_list_append(&workerspid, pid); + if (verbose) + pg_log_info("backup worker (%d) created", pid); + + /* + * Else we are in the parent process and all is well. + */ + } + + for (i = 0; i < numWorkers; i++) + { + pid = waitpid(-1, &status, 0); + + if (WIFEXITED(status) && WEXITSTATUS(status) == EXIT_FAILURE) + { + SimpleOidListCell *cell; + + pg_log_error("backup worker (%d) failed with code %d", pid, WEXITSTATUS(status)); + + /* error. kill other workers and exit. */ + for (cell = workerspid.head; cell; cell = cell->next) + { + if (pid != cell->val) + { + kill(cell->val, SIGTERM); + pg_log_error("backup worker killed %d", cell->val); + } + } + + exit(1); + } + } +} + +static void +read_label_tblspcmap(PGconn *conn, char **backuplabel, char **tblspc_map) +{ + PGresult *res = NULL; + + Assert(backuplabel != NULL); + Assert(tblspc_map != NULL); + + /* + * Get Backup label and tablespace map data. + */ + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not get data: %s", + PQerrorMessage(conn)); + exit(1); + } + if (PQntuples(res) < 1) + { + pg_log_error("no data returned from server"); + exit(1); + } + + *backuplabel = PQgetvalue(res, 0, 0); /* backup_label */ + if (!PQgetisnull(res, 0, 1)) + *tblspc_map = PQgetvalue(res, 0, 1); /* tablespae_map */ + + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + pg_log_error("could not get data: %s", + PQerrorMessage(conn)); + exit(1); + } + + res = PQgetResult(conn); + PQclear(res); +} + +static void +create_backup_dirs(bool basetablespace, char *tablespace, char *name) +{ + char dirpath[MAXPGPATH]; + + Assert(name != NULL); + + if (basetablespace) + snprintf(dirpath, sizeof(dirpath), "%s/%s", basedir, name); + else + { + Assert(tablespace != NULL); + snprintf(dirpath, sizeof(dirpath), "%s/%s", + get_tablespace_mapping(tablespace), (name + strlen(tablespace) + 1)); + } + + if (pg_mkdir_p(dirpath, pg_dir_create_mode) != 0) + { + if (errno != EEXIST) + { + pg_log_error("could not create directory \"%s\": %m", + dirpath); + exit(1); + } + } +} + +static void +writefile(char *path, char *buf) +{ + FILE *f; + char pathbuf[MAXPGPATH]; + + snprintf(pathbuf, MAXPGPATH, "%s/%s", basedir, path); + f = fopen(pathbuf, "w"); + if (f == NULL) + { + pg_log_error("could not open file \"%s\": %m", pathbuf); + exit(1); + } + + if (fwrite(buf, strlen(buf), 1, f) != 1) + { + pg_log_error("could not write to file \"%s\": %m", pathbuf); + exit(1); + } + + if (fclose(f)) + { + pg_log_error("could not write to file \"%s\": %m", pathbuf); + exit(1); + } +} + +static int +simple_list_length(SimpleStringList *list) +{ + int len = 0; + SimpleStringListCell *cell; + + for (cell = list->head; cell; cell = cell->next, len++) + ; + + return len; +} -- 2.21.0 (Apple Git-122)