From 8c29c68ff24413d8d01478080d9741b0b231d848 Mon Sep 17 00:00:00 2001 From: Asif Rehman Date: Thu, 3 Oct 2019 23:41:55 +0500 Subject: [PATCH] parallel backup --- src/backend/access/transam/xlog.c | 2 +- src/backend/replication/basebackup.c | 1078 +++++++++++++---- src/backend/replication/repl_gram.y | 58 + src/backend/replication/repl_scanner.l | 5 + src/bin/pg_basebackup/pg_basebackup.c | 360 +++++- .../t/040_pg_basebackup_parallel.pl | 571 +++++++++ src/include/nodes/replnodes.h | 9 + src/include/replication/basebackup.h | 2 +- 8 files changed, 1797 insertions(+), 288 deletions(-) create mode 100644 src/bin/pg_basebackup/t/040_pg_basebackup_parallel.pl diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 790e2c8714..3dc2ebd7dc 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -10477,7 +10477,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p, ti->oid = pstrdup(de->d_name); ti->path = pstrdup(buflinkpath.data); ti->rpath = relpath ? pstrdup(relpath) : NULL; - ti->size = infotbssize ? sendTablespace(fullpath, true) : -1; + ti->size = infotbssize ? sendTablespace(fullpath, true, NULL) : -1; if (tablespaces) *tablespaces = lappend(*tablespaces, ti); diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index d0f210de8c..fe906dbfdf 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -52,11 +52,31 @@ typedef struct bool includewal; uint32 maxrate; bool sendtblspcmapfile; + int32 worker; } basebackup_options; +typedef struct +{ + char path[MAXPGPATH]; + bool isdir; + int32 size; +} pathinfo; + +#define STORE_PATHINFO(_filenames, _path, _isdir, _size) \ + do { \ + if (files != NULL) { \ + pathinfo *pi = palloc0(sizeof(pathinfo)); \ + strlcpy(pi->path, _path, sizeof(pi->path)); \ + pi->isdir = _isdir; \ + pi->size = _size; \ + *_filenames = lappend(*_filenames, pi); \ + } \ + } while(0) static int64 sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, bool sendtblspclinks); +static int64 sendDir_(const char *path, int basepathlen, bool sizeonly, + List *tablespaces, bool sendtblspclinks, List **files); static bool sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf, bool missing_ok, Oid dboid); static void sendFileWithContent(const char *filename, const char *content); @@ -71,15 +91,26 @@ static void perform_base_backup(basebackup_options *opt); static void parse_basebackup_options(List *options, basebackup_options *opt); static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli); static int compareWalFileNames(const ListCell *a, const ListCell *b); +static int compareFileSize(const ListCell *a, const ListCell *b); static void throttle(size_t increment); static bool is_checksummed_file(const char *fullpath, const char *filename); +static void StartBackup(basebackup_options *opt); +static void StopBackup(basebackup_options *opt); +static void SendBackupFileList(basebackup_options *opt, List *tablespaces); +static void SendFilesContents(basebackup_options *opt, List *filenames, bool missing_ok); +static void include_wal_files(XLogRecPtr endptr, TimeLineID endtli); +static void setup_throttle(int maxrate); +static char *readfile(const char *readfilename, bool missing_ok); + /* Was the backup currently in-progress initiated in recovery mode? */ static bool backup_started_in_recovery = false; /* Relative path of temporary statistics directory */ static char *statrelpath = NULL; +#define BACKUP_LABEL_FILE_TMP BACKUP_LABEL_FILE ".tmp" +#define TABLESPACE_MAP_TMP TABLESPACE_MAP ".tmp" /* * Size of each block sent into the tar stream for larger files. */ @@ -192,6 +223,14 @@ static const char *const excludeFiles[] = BACKUP_LABEL_FILE, TABLESPACE_MAP, + /* + * Skip backup_label.tmp or tablespace_map.tmp files. These are temporary + * and are injected into the backup by SendFilesList and + * SendFilesContents, will be removed after as well. + */ + BACKUP_LABEL_FILE_TMP, + TABLESPACE_MAP_TMP, + "postmaster.pid", "postmaster.opts", @@ -294,28 +333,7 @@ perform_base_backup(basebackup_options *opt) SendBackupHeader(tablespaces); /* Setup and activate network throttling, if client requested it */ - if (opt->maxrate > 0) - { - throttling_sample = - (int64) opt->maxrate * (int64) 1024 / THROTTLING_FREQUENCY; - - /* - * The minimum amount of time for throttling_sample bytes to be - * transferred. - */ - elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY; - - /* Enable throttling. */ - throttling_counter = 0; - - /* The 'real data' starts now (header was ignored). */ - throttled_last = GetCurrentTimestamp(); - } - else - { - /* Disable throttling. */ - throttling_counter = -1; - } + setup_throttle(opt->maxrate); /* Send off our tablespaces one by one */ foreach(lc, tablespaces) @@ -357,7 +375,7 @@ perform_base_backup(basebackup_options *opt) sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid); } else - sendTablespace(ti->path, false); + sendTablespace(ti->path, false, NULL); /* * If we're including WAL, and this is the main data directory we @@ -384,227 +402,7 @@ perform_base_backup(basebackup_options *opt) * We've left the last tar file "open", so we can now append the * required WAL files to it. */ - char pathbuf[MAXPGPATH]; - XLogSegNo segno; - XLogSegNo startsegno; - XLogSegNo endsegno; - struct stat statbuf; - List *historyFileList = NIL; - List *walFileList = NIL; - char firstoff[MAXFNAMELEN]; - char lastoff[MAXFNAMELEN]; - DIR *dir; - struct dirent *de; - ListCell *lc; - TimeLineID tli; - - /* - * I'd rather not worry about timelines here, so scan pg_wal and - * include all WAL files in the range between 'startptr' and 'endptr', - * regardless of the timeline the file is stamped with. If there are - * some spurious WAL files belonging to timelines that don't belong in - * this server's history, they will be included too. Normally there - * shouldn't be such files, but if there are, there's little harm in - * including them. - */ - XLByteToSeg(startptr, startsegno, wal_segment_size); - XLogFileName(firstoff, ThisTimeLineID, startsegno, wal_segment_size); - XLByteToPrevSeg(endptr, endsegno, wal_segment_size); - XLogFileName(lastoff, ThisTimeLineID, endsegno, wal_segment_size); - - dir = AllocateDir("pg_wal"); - while ((de = ReadDir(dir, "pg_wal")) != NULL) - { - /* Does it look like a WAL segment, and is it in the range? */ - if (IsXLogFileName(de->d_name) && - strcmp(de->d_name + 8, firstoff + 8) >= 0 && - strcmp(de->d_name + 8, lastoff + 8) <= 0) - { - walFileList = lappend(walFileList, pstrdup(de->d_name)); - } - /* Does it look like a timeline history file? */ - else if (IsTLHistoryFileName(de->d_name)) - { - historyFileList = lappend(historyFileList, pstrdup(de->d_name)); - } - } - FreeDir(dir); - - /* - * Before we go any further, check that none of the WAL segments we - * need were removed. - */ - CheckXLogRemoved(startsegno, ThisTimeLineID); - - /* - * Sort the WAL filenames. We want to send the files in order from - * oldest to newest, to reduce the chance that a file is recycled - * before we get a chance to send it over. - */ - list_sort(walFileList, compareWalFileNames); - - /* - * There must be at least one xlog file in the pg_wal directory, since - * we are doing backup-including-xlog. - */ - if (walFileList == NIL) - ereport(ERROR, - (errmsg("could not find any WAL files"))); - - /* - * Sanity check: the first and last segment should cover startptr and - * endptr, with no gaps in between. - */ - XLogFromFileName((char *) linitial(walFileList), - &tli, &segno, wal_segment_size); - if (segno != startsegno) - { - char startfname[MAXFNAMELEN]; - - XLogFileName(startfname, ThisTimeLineID, startsegno, - wal_segment_size); - ereport(ERROR, - (errmsg("could not find WAL file \"%s\"", startfname))); - } - foreach(lc, walFileList) - { - char *walFileName = (char *) lfirst(lc); - XLogSegNo currsegno = segno; - XLogSegNo nextsegno = segno + 1; - - XLogFromFileName(walFileName, &tli, &segno, wal_segment_size); - if (!(nextsegno == segno || currsegno == segno)) - { - char nextfname[MAXFNAMELEN]; - - XLogFileName(nextfname, ThisTimeLineID, nextsegno, - wal_segment_size); - ereport(ERROR, - (errmsg("could not find WAL file \"%s\"", nextfname))); - } - } - if (segno != endsegno) - { - char endfname[MAXFNAMELEN]; - - XLogFileName(endfname, ThisTimeLineID, endsegno, wal_segment_size); - ereport(ERROR, - (errmsg("could not find WAL file \"%s\"", endfname))); - } - - /* Ok, we have everything we need. Send the WAL files. */ - foreach(lc, walFileList) - { - char *walFileName = (char *) lfirst(lc); - FILE *fp; - char buf[TAR_SEND_SIZE]; - size_t cnt; - pgoff_t len = 0; - - snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", walFileName); - XLogFromFileName(walFileName, &tli, &segno, wal_segment_size); - - fp = AllocateFile(pathbuf, "rb"); - if (fp == NULL) - { - int save_errno = errno; - - /* - * Most likely reason for this is that the file was already - * removed by a checkpoint, so check for that to get a better - * error message. - */ - CheckXLogRemoved(segno, tli); - - errno = save_errno; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\": %m", pathbuf))); - } - - if (fstat(fileno(fp), &statbuf) != 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not stat file \"%s\": %m", - pathbuf))); - if (statbuf.st_size != wal_segment_size) - { - CheckXLogRemoved(segno, tli); - ereport(ERROR, - (errcode_for_file_access(), - errmsg("unexpected WAL file size \"%s\"", walFileName))); - } - - /* send the WAL file itself */ - _tarWriteHeader(pathbuf, NULL, &statbuf, false); - - while ((cnt = fread(buf, 1, - Min(sizeof(buf), wal_segment_size - len), - fp)) > 0) - { - CheckXLogRemoved(segno, tli); - /* Send the chunk as a CopyData message */ - if (pq_putmessage('d', buf, cnt)) - ereport(ERROR, - (errmsg("base backup could not send data, aborting backup"))); - - len += cnt; - throttle(cnt); - - if (len == wal_segment_size) - break; - } - - CHECK_FREAD_ERROR(fp, pathbuf); - - if (len != wal_segment_size) - { - CheckXLogRemoved(segno, tli); - ereport(ERROR, - (errcode_for_file_access(), - errmsg("unexpected WAL file size \"%s\"", walFileName))); - } - - /* wal_segment_size is a multiple of 512, so no need for padding */ - - FreeFile(fp); - - /* - * Mark file as archived, otherwise files can get archived again - * after promotion of a new node. This is in line with - * walreceiver.c always doing an XLogArchiveForceDone() after a - * complete segment. - */ - StatusFilePath(pathbuf, walFileName, ".done"); - sendFileWithContent(pathbuf, ""); - } - - /* - * Send timeline history files too. Only the latest timeline history - * file is required for recovery, and even that only if there happens - * to be a timeline switch in the first WAL segment that contains the - * checkpoint record, or if we're taking a base backup from a standby - * server and the target timeline changes while the backup is taken. - * But they are small and highly useful for debugging purposes, so - * better include them all, always. - */ - foreach(lc, historyFileList) - { - char *fname = lfirst(lc); - - snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", fname); - - if (lstat(pathbuf, &statbuf) != 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not stat file \"%s\": %m", pathbuf))); - - sendFile(pathbuf, pathbuf, &statbuf, false, InvalidOid); - - /* unconditionally mark file as archived */ - StatusFilePath(pathbuf, fname, ".done"); - sendFileWithContent(pathbuf, ""); - } + include_wal_files(endptr, endtli); /* Send CopyDone message for the last tar file */ pq_putemptymessage('c'); @@ -637,6 +435,24 @@ compareWalFileNames(const ListCell *a, const ListCell *b) return strcmp(fna + 8, fnb + 8); } +/* + * list_sort comparison function, to compare size attribute of pathinfo + * in descending order. + */ +static int +compareFileSize(const ListCell *a, const ListCell *b) +{ + pathinfo *fna = (pathinfo *) lfirst(a); + pathinfo *fnb = (pathinfo *) lfirst(b); + + if (fna->size > fnb->size) + return -1; + if (fna->size < fnb->size) + return 1; + return 0; + +} + /* * Parse the base backup options passed down by the parser */ @@ -652,8 +468,10 @@ parse_basebackup_options(List *options, basebackup_options *opt) bool o_maxrate = false; bool o_tablespace_map = false; bool o_noverify_checksums = false; + bool o_worker = false; MemSet(opt, 0, sizeof(*opt)); + opt->worker = -1; foreach(lopt, options) { DefElem *defel = (DefElem *) lfirst(lopt); @@ -740,6 +558,16 @@ parse_basebackup_options(List *options, basebackup_options *opt) noverify_checksums = true; o_noverify_checksums = true; } + else if (strcmp(defel->defname, "worker") == 0) + { + if (o_worker) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("duplicate option \"%s\"", defel->defname))); + + opt->worker = intVal(defel->arg); + o_worker = true; + } else elog(ERROR, "option \"%s\" not recognized", defel->defname); @@ -774,7 +602,26 @@ SendBaseBackup(BaseBackupCmd *cmd) set_ps_display(activitymsg, false); } - perform_base_backup(&opt); + switch (cmd->cmdtag) + { + case BASE_BACKUP: + perform_base_backup(&opt); + break; + case START_BACKUP: + StartBackup(&opt); + break; + case SEND_FILES_CONTENT: + SendFilesContents(&opt, cmd->backupfiles, true); + break; + case STOP_BACKUP: + StopBackup(&opt); + break; + + default: + elog(ERROR, "unrecognized replication command tag: %u", + cmd->cmdtag); + break; + } } static void @@ -968,7 +815,7 @@ sendFileWithContent(const char *filename, const char *content) * Only used to send auxiliary tablespaces, not PGDATA. */ int64 -sendTablespace(char *path, bool sizeonly) +sendTablespace(char *path, bool sizeonly, List **files) { int64 size; char pathbuf[MAXPGPATH]; @@ -997,11 +844,11 @@ sendTablespace(char *path, bool sizeonly) return 0; } + STORE_PATHINFO(files, pathbuf, true, -1); size = _tarWriteHeader(TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf, sizeonly); - /* Send all the files in the tablespace version directory */ - size += sendDir(pathbuf, strlen(path), sizeonly, NIL, true); + size += sendDir_(pathbuf, strlen(path), sizeonly, NIL, true, files); return size; } @@ -1019,8 +866,16 @@ sendTablespace(char *path, bool sizeonly) * as it will be sent separately in the tablespace_map file. */ static int64 -sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, - bool sendtblspclinks) +sendDir(const char *path, int basepathlen, bool sizeonly, + List *tablespaces, bool sendtblspclinks) +{ + return sendDir_(path, basepathlen, sizeonly, tablespaces, sendtblspclinks, NULL); +} + +/* Same as sendDir(), except that it also returns a list of filenames in PGDATA */ +static int64 +sendDir_(const char *path, int basepathlen, bool sizeonly, List *tablespaces, + bool sendtblspclinks, List **files) { DIR *dir; struct dirent *de; @@ -1174,6 +1029,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, if (strcmp(de->d_name, excludeDirContents[excludeIdx]) == 0) { elog(DEBUG1, "contents of directory \"%s\" excluded from backup", de->d_name); + + STORE_PATHINFO(files, pathbuf, true, -1); size += _tarWriteDir(pathbuf, basepathlen, &statbuf, sizeonly); excludeFound = true; break; @@ -1190,6 +1047,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, if (statrelpath != NULL && strcmp(pathbuf, statrelpath) == 0) { elog(DEBUG1, "contents of directory \"%s\" excluded from backup", statrelpath); + + STORE_PATHINFO(files, pathbuf, true, -1); size += _tarWriteDir(pathbuf, basepathlen, &statbuf, sizeonly); continue; } @@ -1211,6 +1070,9 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, size += _tarWriteHeader("./pg_wal/archive_status", NULL, &statbuf, sizeonly); + STORE_PATHINFO(files, pathbuf, true, -1); + STORE_PATHINFO(files, "./pg_wal/archive_status", true, -1); + continue; /* don't recurse into pg_wal */ } @@ -1240,6 +1102,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, pathbuf))); linkpath[rllen] = '\0'; + STORE_PATHINFO(files, pathbuf, false, statbuf.st_size); size += _tarWriteHeader(pathbuf + basepathlen + 1, linkpath, &statbuf, sizeonly); #else @@ -1266,6 +1129,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, */ size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, sizeonly); + STORE_PATHINFO(files, pathbuf, true, -1); + /* * Call ourselves recursively for a directory, unless it happens @@ -1296,13 +1161,15 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, skip_this_dir = true; if (!skip_this_dir) - size += sendDir(pathbuf, basepathlen, sizeonly, tablespaces, sendtblspclinks); + size += sendDir_(pathbuf, basepathlen, sizeonly, tablespaces, sendtblspclinks, files); } else if (S_ISREG(statbuf.st_mode)) { bool sent = false; - if (!sizeonly) + STORE_PATHINFO(files, pathbuf, false, statbuf.st_size); + + if (!sizeonly && files == NULL) sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf, true, isDbDir ? pg_atoi(lastDir + 1, sizeof(Oid), 0) : InvalidOid); @@ -1743,3 +1610,710 @@ throttle(size_t increment) */ throttled_last = GetCurrentTimestamp(); } + +/* + * In parallel mode, pg_stop_backup() is not called, nor are the files sent + * right away. Upon receiving the BASE_BACKUP call, it sends out a list of + * files in $PGDATA. + */ +static void +StartBackup(basebackup_options *opt) +{ + TimeLineID starttli; + StringInfo labelfile; + StringInfo tblspc_map_file = NULL; + int datadirpathlen; + List *tablespaces = NIL; + + datadirpathlen = strlen(DataDir); + + backup_started_in_recovery = RecoveryInProgress(); + + labelfile = makeStringInfo(); + tblspc_map_file = makeStringInfo(); + + total_checksum_failures = 0; + + startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli, + labelfile, &tablespaces, + tblspc_map_file, + opt->progress, opt->sendtblspcmapfile); + + /* + * Once do_pg_start_backup has been called, ensure that any failure causes + * us to abort the backup so we don't "leak" a backup counter. For this + * reason, *all* functionality between do_pg_start_backup() and the end of + * do_pg_stop_backup() should be inside the error cleanup block! + */ + + PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0); + { + tablespaceinfo *ti; + FILE *fp; + + SendXlogRecPtrResult(startptr, starttli); + + /* + * Calculate the relative path of temporary statistics directory in + * order to skip the files which are located in that directory later. + */ + if (is_absolute_path(pgstat_stat_directory) && + strncmp(pgstat_stat_directory, DataDir, datadirpathlen) == 0) + statrelpath = psprintf("./%s", pgstat_stat_directory + datadirpathlen + 1); + else if (strncmp(pgstat_stat_directory, "./", 2) != 0) + statrelpath = psprintf("./%s", pgstat_stat_directory); + else + statrelpath = pgstat_stat_directory; + + /* Add a node for the base directory at the end */ + ti = palloc0(sizeof(tablespaceinfo)); + ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true) : -1; + tablespaces = lappend(tablespaces, ti); + + /* Send tablespace header */ + SendBackupHeader(tablespaces); + + /* Setup and activate network throttling, if client requested it */ + setup_throttle(opt->maxrate); + + /* + * backup_label and tablespace_map are stored into temp files for + * their usage are a later stage i.e. during STOP_BACKUP or while + * transfering files to the client. + */ + fp = AllocateFile(BACKUP_LABEL_FILE_TMP, "w"); + if (!fp) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", + BACKUP_LABEL_FILE_TMP))); + if (fwrite(labelfile->data, labelfile->len, 1, fp) != 1 || + fflush(fp) != 0 || + pg_fsync(fileno(fp)) != 0 || + ferror(fp) || + FreeFile(fp)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write file \"%s\": %m", + BACKUP_LABEL_FILE_TMP))); + + if (opt->sendtblspcmapfile && tblspc_map_file->len > 0) + { + fp = AllocateFile(TABLESPACE_MAP_TMP, "w"); + if (!fp) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", + TABLESPACE_MAP_TMP))); + if (fwrite(tblspc_map_file->data, tblspc_map_file->len, 1, fp) != 1 || + fflush(fp) != 0 || + pg_fsync(fileno(fp)) != 0 || + ferror(fp) || + FreeFile(fp)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write file \"%s\": %m", + TABLESPACE_MAP_TMP))); + } + + /* send out the list of file in $PGDATA */ + SendBackupFileList(opt, tablespaces); + } + PG_END_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0); +} + +/* + * StopBackup() - ends a parallel backup + * + * The function is called in parallel mode. It ends a parallel backup session + * established by 'BASE_BACKUP PARALLEL' command. + */ +static void +StopBackup(basebackup_options *opt) +{ + TimeLineID endtli; + XLogRecPtr endptr; + struct stat statbuf; + StringInfoData buf; + char *labelfile; + + /* Setup and activate network throttling, if client requested it */ + setup_throttle(opt->maxrate); + + /* read backup_label file into buffer, we need it for do_pg_stop_backup */ + labelfile = readfile(BACKUP_LABEL_FILE_TMP, false); + + pq_beginmessage(&buf, 'H'); + pq_sendbyte(&buf, 0); /* overall format */ + pq_sendint16(&buf, 0); /* natts */ + pq_endmessage(&buf); + + /* ... and pg_control after everything else. */ + if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", + XLOG_CONTROL_FILE))); + sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid); + + /* stop backup */ + endptr = do_pg_stop_backup(labelfile, !opt->nowait, &endtli); + + if (opt->includewal) + include_wal_files(endptr, endtli); + + pq_putemptymessage('c'); /* CopyDone */ + SendXlogRecPtrResult(endptr, endtli); + + unlink(BACKUP_LABEL_FILE_TMP); + unlink(TABLESPACE_MAP_TMP); +} + +/* + * SendBackupFileList() - sends a list of filenames of PGDATA + * + * The function collects a list of filenames, nessery for a full backup and sends + * this list to the client. + */ +static void +SendBackupFileList(basebackup_options *opt, List *tablespaces) +{ + StringInfoData buf; + ListCell *lc; + + foreach(lc, tablespaces) + { + List *filenames = NULL; + tablespaceinfo *ti = (tablespaceinfo *) lfirst(lc); + + if (ti->path == NULL) + sendDir_(".", 1, false, NIL, !opt->sendtblspcmapfile, &filenames); + else + sendTablespace(ti->path, false, &filenames); + + /* sort the files in desending order, based on file size */ + list_sort(filenames, compareFileSize); + + /* Construct and send the list of filenames */ + pq_beginmessage(&buf, 'T'); /* RowDescription */ + pq_sendint16(&buf, 3); /* 1 field */ + + /* First field - file path */ + pq_sendstring(&buf, "path"); + pq_sendint32(&buf, 0); + pq_sendint16(&buf, 0); + pq_sendint32(&buf, TEXTOID); + pq_sendint16(&buf, -1); + pq_sendint32(&buf, 0); + pq_sendint16(&buf, 0); + + /* Second field - is_dir */ + pq_sendstring(&buf, "isdir"); + pq_sendint32(&buf, 0); + pq_sendint16(&buf, 0); + pq_sendint32(&buf, BOOLOID); + pq_sendint16(&buf, 1); + pq_sendint32(&buf, 0); + pq_sendint16(&buf, 0); + + /* Third field - size */ + pq_sendstring(&buf, "size"); + pq_sendint32(&buf, 0); + pq_sendint16(&buf, 0); + pq_sendint32(&buf, INT8OID); + pq_sendint16(&buf, 8); + pq_sendint32(&buf, 0); + pq_sendint16(&buf, 0); + pq_endmessage(&buf); + + foreach(lc, filenames) + { + pathinfo *pi = (pathinfo *) lfirst(lc); + Size len; + + /* Send one datarow message */ + pq_beginmessage(&buf, 'D'); + pq_sendint16(&buf, 3); /* number of columns */ + + /* send file name */ + len = strlen(pi->path); + pq_sendint32(&buf, len); + pq_sendbytes(&buf, pi->path, len); + + /* send isdir */ + pq_sendint32(&buf, 1); + pq_sendbytes(&buf, pi->isdir ? "t" : "f", 1); + + /* send size */ + send_int8_string(&buf, pi->size); + + pq_endmessage(&buf); + } + + pfree(filenames); + } + + /* Send a CommandComplete message */ + pq_puttextmessage('C', "SELECT"); +} + +/* + * SendFilesContents() - sends the actual files to the caller + * + * The function sends the files over to the caller using the COPY protocol. + */ +static void +SendFilesContents(basebackup_options *opt, List *filenames, bool missing_ok) +{ + StringInfoData buf; + char *labelfile; + ListCell *lc; + char startxlogfilename[MAXFNAMELEN]; + bool basetablespace = true; + int basepathlen = 1; + char ch; + uint32 hi, + lo; + + if (list_length(filenames) <= 0) + return; + + total_checksum_failures = 0; + + /* Setup and activate network throttling, if client requested it */ + setup_throttle(opt->maxrate); + + /* + * LABEL is reused here to identify the tablespace path on server. Its empty + * in case of 'base' tablespace. + */ + if (is_absolute_path(opt->label)) + { + basepathlen = strlen(opt->label); + basetablespace = false; + } + + /* retrive the backup start location from backup_label file. */ + labelfile = readfile(BACKUP_LABEL_FILE_TMP, false); + if (sscanf(labelfile, "START WAL LOCATION: %X/%X (file %24s)%c", + &hi, &lo, startxlogfilename, + &ch) != 4 || ch != '\n') + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE_TMP))); + startptr = ((uint64) hi) << 32 | lo; + + /* Send CopyOutResponse message */ + pq_beginmessage(&buf, 'H'); + pq_sendbyte(&buf, 0); /* overall format */ + pq_sendint16(&buf, 0); /* natts */ + pq_endmessage(&buf); + + if (opt->worker == 0 && basetablespace) /* 'base' tablespace */ + { + /* Send BACKUP_LABEL_FILE file */ + sendFileWithContent(BACKUP_LABEL_FILE, labelfile); + + /* Send TABLESPACE_MAP file */ + if (opt->sendtblspcmapfile) + { + char *mapfile = readfile(TABLESPACE_MAP_TMP, true); + + if (mapfile) + { + sendFileWithContent(TABLESPACE_MAP, mapfile); + pfree(mapfile); + } + } + } + + foreach(lc, filenames) + { + struct stat statbuf; + char *pathbuf; + + pathbuf = (char *) strVal(lfirst(lc)); + if (lstat(pathbuf, &statbuf) != 0) + { + if (errno != ENOENT) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file or directory \"%s\": %m", + pathbuf))); + + /* If the file went away while scanning, it's not an error. */ + continue; + } + + /* Allow symbolic links in pg_tblspc only */ + if (strstr(pathbuf, "./pg_tblspc") != NULL && +#ifndef WIN32 + S_ISLNK(statbuf.st_mode) +#else + pgwin32_is_junction(pathbuf) +#endif + ) + { + char linkpath[MAXPGPATH]; + int rllen; + + rllen = readlink(pathbuf, linkpath, sizeof(linkpath)); + if (rllen < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read symbolic link \"%s\": %m", + pathbuf))); + if (rllen >= sizeof(linkpath)) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("symbolic link \"%s\" target is too long", + pathbuf))); + linkpath[rllen] = '\0'; + + _tarWriteHeader(pathbuf, linkpath, &statbuf, false); + } + else if (S_ISDIR(statbuf.st_mode)) + { + _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, false); + } + else if ( +#ifndef WIN32 + S_ISLNK(statbuf.st_mode) +#else + pgwin32_is_junction(pathbuf) +#endif + ) + { + /* + * If symlink, write it as a directory. file symlinks only allowed + * in pg_tblspc + */ + statbuf.st_mode = S_IFDIR | pg_dir_create_mode; + _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, false); + } + else + { + /* send file to client */ + sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf, true, InvalidOid); + } + } + + pq_putemptymessage('c'); /* CopyDone */ + + /* + * Check for checksum failures. If there are failures across multiple + * processes it may not report totoal checksum count, but it will error + * out,terminating the backup. + */ + if (total_checksum_failures) + { + if (total_checksum_failures > 1) + ereport(WARNING, + (errmsg("%lld total checksum verification failures", total_checksum_failures))); + + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("checksum verification failure during base backup"))); + } +} + +static void +include_wal_files(XLogRecPtr endptr, TimeLineID endtli) +{ + /* + * We've left the last tar file "open", so we can now append the required + * WAL files to it. + */ + char pathbuf[MAXPGPATH]; + XLogSegNo segno; + XLogSegNo startsegno; + XLogSegNo endsegno; + struct stat statbuf; + List *historyFileList = NIL; + List *walFileList = NIL; + char firstoff[MAXFNAMELEN]; + char lastoff[MAXFNAMELEN]; + DIR *dir; + struct dirent *de; + ListCell *lc; + TimeLineID tli; + + /* + * I'd rather not worry about timelines here, so scan pg_wal and include + * all WAL files in the range between 'startptr' and 'endptr', regardless + * of the timeline the file is stamped with. If there are some spurious + * WAL files belonging to timelines that don't belong in this server's + * history, they will be included too. Normally there shouldn't be such + * files, but if there are, there's little harm in including them. + */ + XLByteToSeg(startptr, startsegno, wal_segment_size); + XLogFileName(firstoff, ThisTimeLineID, startsegno, wal_segment_size); + XLByteToPrevSeg(endptr, endsegno, wal_segment_size); + XLogFileName(lastoff, ThisTimeLineID, endsegno, wal_segment_size); + + dir = AllocateDir("pg_wal"); + while ((de = ReadDir(dir, "pg_wal")) != NULL) + { + /* Does it look like a WAL segment, and is it in the range? */ + if (IsXLogFileName(de->d_name) && + strcmp(de->d_name + 8, firstoff + 8) >= 0 && + strcmp(de->d_name + 8, lastoff + 8) <= 0) + { + walFileList = lappend(walFileList, pstrdup(de->d_name)); + } + /* Does it look like a timeline history file? */ + else if (IsTLHistoryFileName(de->d_name)) + { + historyFileList = lappend(historyFileList, pstrdup(de->d_name)); + } + } + FreeDir(dir); + + /* + * Before we go any further, check that none of the WAL segments we need + * were removed. + */ + CheckXLogRemoved(startsegno, ThisTimeLineID); + + /* + * Sort the WAL filenames. We want to send the files in order from oldest + * to newest, to reduce the chance that a file is recycled before we get a + * chance to send it over. + */ + list_sort(walFileList, compareWalFileNames); + + /* + * There must be at least one xlog file in the pg_wal directory, since we + * are doing backup-including-xlog. + */ + if (walFileList == NIL) + ereport(ERROR, + (errmsg("could not find any WAL files"))); + + /* + * Sanity check: the first and last segment should cover startptr and + * endptr, with no gaps in between. + */ + XLogFromFileName((char *) linitial(walFileList), + &tli, &segno, wal_segment_size); + if (segno != startsegno) + { + char startfname[MAXFNAMELEN]; + + XLogFileName(startfname, ThisTimeLineID, startsegno, + wal_segment_size); + ereport(ERROR, + (errmsg("could not find WAL file \"%s\"", startfname))); + } + foreach(lc, walFileList) + { + char *walFileName = (char *) lfirst(lc); + XLogSegNo currsegno = segno; + XLogSegNo nextsegno = segno + 1; + + XLogFromFileName(walFileName, &tli, &segno, wal_segment_size); + if (!(nextsegno == segno || currsegno == segno)) + { + char nextfname[MAXFNAMELEN]; + + XLogFileName(nextfname, ThisTimeLineID, nextsegno, + wal_segment_size); + ereport(ERROR, + (errmsg("could not find WAL file \"%s\"", nextfname))); + } + } + if (segno != endsegno) + { + char endfname[MAXFNAMELEN]; + + XLogFileName(endfname, ThisTimeLineID, endsegno, wal_segment_size); + ereport(ERROR, + (errmsg("could not find WAL file \"%s\"", endfname))); + } + + /* Ok, we have everything we need. Send the WAL files. */ + foreach(lc, walFileList) + { + char *walFileName = (char *) lfirst(lc); + FILE *fp; + char buf[TAR_SEND_SIZE]; + size_t cnt; + pgoff_t len = 0; + + snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", walFileName); + XLogFromFileName(walFileName, &tli, &segno, wal_segment_size); + + fp = AllocateFile(pathbuf, "rb"); + if (fp == NULL) + { + int save_errno = errno; + + /* + * Most likely reason for this is that the file was already + * removed by a checkpoint, so check for that to get a better + * error message. + */ + CheckXLogRemoved(segno, tli); + + errno = save_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", pathbuf))); + } + + if (fstat(fileno(fp), &statbuf) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", + pathbuf))); + if (statbuf.st_size != wal_segment_size) + { + CheckXLogRemoved(segno, tli); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("unexpected WAL file size \"%s\"", walFileName))); + } + + /* send the WAL file itself */ + _tarWriteHeader(pathbuf, NULL, &statbuf, false); + + while ((cnt = fread(buf, 1, + Min(sizeof(buf), wal_segment_size - len), + fp)) > 0) + { + CheckXLogRemoved(segno, tli); + /* Send the chunk as a CopyData message */ + if (pq_putmessage('d', buf, cnt)) + ereport(ERROR, + (errmsg("base backup could not send data, aborting backup"))); + + len += cnt; + throttle(cnt); + + if (len == wal_segment_size) + break; + } + + if (len != wal_segment_size) + { + CheckXLogRemoved(segno, tli); + ereport(ERROR, + (errcode_for_file_access(), + errmsg("unexpected WAL file size \"%s\"", walFileName))); + } + + /* wal_segment_size is a multiple of 512, so no need for padding */ + + FreeFile(fp); + + /* + * Mark file as archived, otherwise files can get archived again after + * promotion of a new node. This is in line with walreceiver.c always + * doing an XLogArchiveForceDone() after a complete segment. + */ + StatusFilePath(pathbuf, walFileName, ".done"); + sendFileWithContent(pathbuf, ""); + } + + /* + * Send timeline history files too. Only the latest timeline history file + * is required for recovery, and even that only if there happens to be a + * timeline switch in the first WAL segment that contains the checkpoint + * record, or if we're taking a base backup from a standby server and the + * target timeline changes while the backup is taken. But they are small + * and highly useful for debugging purposes, so better include them all, + * always. + */ + foreach(lc, historyFileList) + { + char *fname = lfirst(lc); + + snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", fname); + + if (lstat(pathbuf, &statbuf) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", pathbuf))); + + sendFile(pathbuf, pathbuf, &statbuf, false, InvalidOid); + + /* unconditionally mark file as archived */ + StatusFilePath(pathbuf, fname, ".done"); + sendFileWithContent(pathbuf, ""); + } +} + +/* + * Setup and activate network throttling, if client requested it + */ +static void +setup_throttle(int maxrate) +{ + /* Setup and activate network throttling, if client requested it */ + if (maxrate > 0) + { + throttling_sample = + (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY; + + /* + * The minimum amount of time for throttling_sample bytes to be + * transferred. + */ + elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY; + + /* Enable throttling. */ + throttling_counter = 0; + + /* The 'real data' starts now (header was ignored). */ + throttled_last = GetCurrentTimestamp(); + } + else + { + /* Disable throttling. */ + throttling_counter = -1; + } +} + +static char * +readfile(const char *readfilename, bool missing_ok) +{ + struct stat statbuf; + FILE *fp; + char *data; + int r; + + if (stat(readfilename, &statbuf)) + { + if (errno == ENOENT && missing_ok) + return NULL; + + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat file \"%s\": %m", + readfilename))); + } + + fp = AllocateFile(readfilename, "r"); + if (!fp) + { + if (errno == ENOENT && missing_ok) + return NULL; + + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", readfilename))); + } + + data = palloc(statbuf.st_size + 1); + r = fread(data, statbuf.st_size, 1, fp); + data[statbuf.st_size] = '\0'; + + /* Close the file */ + if (r != 1 || ferror(fp) || FreeFile(fp)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", + readfilename))); + + return data; +} diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index c4e11cc4e8..88e384bf3c 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -87,6 +87,10 @@ static SQLCmd *make_sqlcmd(void); %token K_EXPORT_SNAPSHOT %token K_NOEXPORT_SNAPSHOT %token K_USE_SNAPSHOT +%token K_START_BACKUP +%token K_SEND_FILES_CONTENT +%token K_STOP_BACKUP +%token K_WORKER %type command %type base_backup start_replication start_logical_replication @@ -102,6 +106,8 @@ static SQLCmd *make_sqlcmd(void); %type opt_temporary %type create_slot_opt_list %type create_slot_opt +%type backup_files backup_files_list +%type backup_file %% @@ -162,6 +168,29 @@ base_backup: { BaseBackupCmd *cmd = makeNode(BaseBackupCmd); cmd->options = $2; + cmd->cmdtag = BASE_BACKUP; + $$ = (Node *) cmd; + } + | K_START_BACKUP base_backup_opt_list + { + BaseBackupCmd *cmd = makeNode(BaseBackupCmd); + cmd->options = $2; + cmd->cmdtag = START_BACKUP; + $$ = (Node *) cmd; + } + | K_SEND_FILES_CONTENT backup_files base_backup_opt_list + { + BaseBackupCmd *cmd = makeNode(BaseBackupCmd); + cmd->options = $3; + cmd->cmdtag = SEND_FILES_CONTENT; + cmd->backupfiles = $2; + $$ = (Node *) cmd; + } + | K_STOP_BACKUP base_backup_opt_list + { + BaseBackupCmd *cmd = makeNode(BaseBackupCmd); + cmd->options = $2; + cmd->cmdtag = STOP_BACKUP; $$ = (Node *) cmd; } ; @@ -214,6 +243,35 @@ base_backup_opt: $$ = makeDefElem("noverify_checksums", (Node *)makeInteger(true), -1); } + | K_WORKER UCONST + { + $$ = makeDefElem("worker", + (Node *)makeInteger($2), -1); + } + ; + +backup_files: + '(' backup_files_list ')' + { + $$ = $2; + } + | /* EMPTY */ + { $$ = NIL; } + ; + +backup_files_list: + backup_file + { + $$ = list_make1($1); + } + | backup_files_list ',' backup_file + { + $$ = lappend($1, $3); + } + ; + +backup_file: + SCONST { $$ = (Node *) makeString($1); } ; create_replication_slot: diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index 380faeb5f6..4836828c39 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -107,6 +107,11 @@ EXPORT_SNAPSHOT { return K_EXPORT_SNAPSHOT; } NOEXPORT_SNAPSHOT { return K_NOEXPORT_SNAPSHOT; } USE_SNAPSHOT { return K_USE_SNAPSHOT; } WAIT { return K_WAIT; } +START_BACKUP { return K_START_BACKUP; } +SEND_FILES_CONTENT { return K_SEND_FILES_CONTENT; } +STOP_BACKUP { return K_STOP_BACKUP; } +WORKER { return K_WORKER; } + "," { return ','; } ";" { return ';'; } diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 55ef13926d..5139dcbe03 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -41,6 +41,7 @@ #include "receivelog.h" #include "replication/basebackup.h" #include "streamutil.h" +#include "fe_utils/simple_list.h" #define ERRCODE_DATA_CORRUPTED "XX001" @@ -57,6 +58,15 @@ typedef struct TablespaceList TablespaceListCell *tail; } TablespaceList; +typedef struct WorkerFiles +{ + int num_files; + char *tspath; + SimpleStringList *worker_files; + +} WorkerFiles; + + /* * pg_xlog has been renamed to pg_wal in version 10. This version number * should be compared with PQserverVersion(). @@ -110,6 +120,10 @@ static bool found_existing_xlogdir = false; static bool made_tablespace_dirs = false; static bool found_tablespace_dirs = false; +static int numWorkers = 1; +static PGresult *tablespacehdr; +static SimpleOidList workerspid = {NULL, NULL}; + /* Progress counters */ static uint64 totalsize_kb; static uint64 totaldone; @@ -141,7 +155,7 @@ static void usage(void); static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found); static void progress_report(int tablespacenum, const char *filename, bool force); -static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum); +static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum, int worker); static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum); static void BaseBackup(void); @@ -151,6 +165,10 @@ static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline, static const char *get_tablespace_mapping(const char *dir); static void tablespace_list_append(const char *arg); +static void ParallelBackupEnd(void); +static int ReceiveFiles(WorkerFiles * workerFiles, int worker); +static void create_workers_and_fetch(WorkerFiles * workerFiles); +static int simple_list_length(SimpleStringList *list); static void cleanup_directories_atexit(void) @@ -349,6 +367,7 @@ usage(void) printf(_(" --no-slot prevent creation of temporary replication slot\n")); printf(_(" --no-verify-checksums\n" " do not verify checksums\n")); + printf(_(" -j, --jobs=NUM use this many parallel jobs to backup\n")); printf(_(" -?, --help show this help, then exit\n")); printf(_("\nConnection options:\n")); printf(_(" -d, --dbname=CONNSTR connection string\n")); @@ -921,7 +940,7 @@ writeTarData( * No attempt to inspect or validate the contents of the file is done. */ static void -ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) +ReceiveTarFile(PGconn *conn, PGresult *res, int rownum, int worker) { char filename[MAXPGPATH]; char *copybuf = NULL; @@ -978,7 +997,10 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) #ifdef HAVE_LIBZ if (compresslevel != 0) { - snprintf(filename, sizeof(filename), "%s/base.tar.gz", basedir); + if (numWorkers > 1) + snprintf(filename, sizeof(filename), "%s/base.%d.tar.gz", basedir, worker); + else + snprintf(filename, sizeof(filename), "%s/base.tar.gz", basedir); ztarfile = gzopen(filename, "wb"); if (gzsetparams(ztarfile, compresslevel, Z_DEFAULT_STRATEGY) != Z_OK) @@ -991,7 +1013,10 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) else #endif { - snprintf(filename, sizeof(filename), "%s/base.tar", basedir); + if (numWorkers > 1) + snprintf(filename, sizeof(filename), "%s/base.%d.tar", basedir, worker); + else + snprintf(filename, sizeof(filename), "%s/base.tar", basedir); tarfile = fopen(filename, "wb"); } } @@ -1004,8 +1029,12 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) #ifdef HAVE_LIBZ if (compresslevel != 0) { - snprintf(filename, sizeof(filename), "%s/%s.tar.gz", basedir, - PQgetvalue(res, rownum, 0)); + if (numWorkers > 1) + snprintf(filename, sizeof(filename), "%s/%s.%d.tar.gz", basedir, + PQgetvalue(res, rownum, 0), worker); + else + snprintf(filename, sizeof(filename), "%s/%s.tar.gz", basedir, + PQgetvalue(res, rownum, 0)); ztarfile = gzopen(filename, "wb"); if (gzsetparams(ztarfile, compresslevel, Z_DEFAULT_STRATEGY) != Z_OK) @@ -1018,8 +1047,12 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) else #endif { - snprintf(filename, sizeof(filename), "%s/%s.tar", basedir, - PQgetvalue(res, rownum, 0)); + if (numWorkers > 1) + snprintf(filename, sizeof(filename), "%s/%s.%d.tar", basedir, + PQgetvalue(res, rownum, 0), worker); + else + snprintf(filename, sizeof(filename), "%s/%s.tar", basedir, + PQgetvalue(res, rownum, 0)); tarfile = fopen(filename, "wb"); } } @@ -1475,6 +1508,7 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) */ snprintf(filename, sizeof(filename), "%s/%s", current_path, copybuf); + if (filename[strlen(filename) - 1] == '/') { /* @@ -1486,21 +1520,14 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) * Directory */ filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */ + + /* + * In parallel mode, we create directories before fetching + * files so its Ok if a directory already exist. + */ if (mkdir(filename, pg_dir_create_mode) != 0) { - /* - * When streaming WAL, pg_wal (or pg_xlog for pre-9.6 - * clusters) will have been created by the wal - * receiver process. Also, when the WAL directory - * location was specified, pg_wal (or pg_xlog) has - * already been created as a symbolic link before - * starting the actual backup. So just ignore creation - * failures on related directories. - */ - if (!((pg_str_endswith(filename, "/pg_wal") || - pg_str_endswith(filename, "/pg_xlog") || - pg_str_endswith(filename, "/archive_status")) && - errno == EEXIST)) + if (errno != EEXIST) { pg_log_error("could not create directory \"%s\": %m", filename); @@ -1528,8 +1555,8 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) * can map them too.) */ filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */ - mapped_tblspc_path = get_tablespace_mapping(©buf[157]); + if (symlink(mapped_tblspc_path, filename) != 0) { pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m", @@ -1716,7 +1743,8 @@ BaseBackup(void) } basebkp = - psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s", + psprintf("%s LABEL '%s' %s %s %s %s %s %s %s", + (numWorkers > 1) ? "START_BACKUP" : "BASE_BACKUP", escaped_label, showprogress ? "PROGRESS" : "", includewal == FETCH_WAL ? "WAL" : "", @@ -1830,20 +1858,102 @@ BaseBackup(void) StartLogStreamer(xlogstart, starttli, sysidentifier); } - /* - * Start receiving chunks - */ - for (i = 0; i < PQntuples(res); i++) + if (numWorkers > 1) { - if (format == 't') - ReceiveTarFile(conn, res, i); - else - ReceiveAndUnpackTarFile(conn, res, i); - } /* Loop over all tablespaces */ + WorkerFiles *workerFiles = palloc0(sizeof(WorkerFiles) * tablespacecount); + + tablespacehdr = res; + + for (i = 0; i < tablespacecount; i++) + { + bool basetablespace; + + workerFiles[i].worker_files = palloc0(sizeof(SimpleStringList) * numWorkers); + + /* + * Get the header + */ + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not get backup header: %s", + PQerrorMessage(conn)); + exit(1); + } + if (PQntuples(res) < 1) + { + pg_log_error("no data returned from server"); + exit(1); + } + + basetablespace = PQgetisnull(tablespacehdr, i, 0); + workerFiles[i].tspath = PQgetvalue(tablespacehdr, i, 1); + workerFiles[i].num_files = 0; + + for (int j = 0; j < PQntuples(res); j++) + { + const char *path = PQgetvalue(res, j, 0); + bool isdir = PQgetvalue(res, j, 1)[0] == 't'; + + if (format == 'p' && isdir) + { + char dirpath[MAXPGPATH]; + + if (basetablespace) + snprintf(dirpath, sizeof(dirpath), "%s/%s", basedir, path); + else + { + const char *tspath = PQgetvalue(tablespacehdr, i, 1); + + snprintf(dirpath, sizeof(dirpath), "%s/%s", + get_tablespace_mapping(tspath), (path + strlen(tspath) + 1)); + } + + if (pg_mkdir_p(dirpath, pg_dir_create_mode) != 0) + { + if (errno != EEXIST) + { + pg_log_error("could not create directory \"%s\": %m", + dirpath); + exit(1); + } + } + } + + workerFiles[i].num_files++; + simple_string_list_append(&workerFiles[i].worker_files[j % numWorkers], path); + } + PQclear(res); + } + + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + pg_log_error("could not get data: %s", PQerrorMessage(conn)); + exit(1); + } + + res = PQgetResult(conn); + create_workers_and_fetch(workerFiles); + ParallelBackupEnd(); + } + else + { + /* + * Start receiving chunks + */ + for (i = 0; i < PQntuples(res); i++) + { + if (format == 't') + ReceiveTarFile(conn, res, i, 0); + else + ReceiveAndUnpackTarFile(conn, res, i); + } /* Loop over all tablespaces */ + } if (showprogress) { - progress_report(PQntuples(res), NULL, true); + progress_report(PQntuples(tablespacehdr), NULL, true); if (isatty(fileno(stderr))) fprintf(stderr, "\n"); /* Need to move to next line */ } @@ -2043,6 +2153,7 @@ main(int argc, char **argv) {"waldir", required_argument, NULL, 1}, {"no-slot", no_argument, NULL, 2}, {"no-verify-checksums", no_argument, NULL, 3}, + {"jobs", required_argument, NULL, 'j'}, {NULL, 0, NULL, 0} }; int c; @@ -2070,7 +2181,7 @@ main(int argc, char **argv) atexit(cleanup_directories_atexit); - while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvP", + while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvPj:", long_options, &option_index)) != -1) { switch (c) @@ -2211,6 +2322,9 @@ main(int argc, char **argv) case 3: verify_checksums = false; break; + case 'j': /* number of jobs */ + numWorkers = atoi(optarg); + break; default: /* @@ -2325,6 +2439,14 @@ main(int argc, char **argv) } } + if (numWorkers <= 0) + { + pg_log_error("invalid number of parallel jobs"); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + #ifndef HAVE_LIBZ if (compresslevel != 0) { @@ -2397,3 +2519,173 @@ main(int argc, char **argv) success = true; return 0; } + +static void +ParallelBackupEnd(void) +{ + PGresult *res = NULL; + char *basebkp; + + basebkp = psprintf("STOP_BACKUP %s %s", + includewal == FETCH_WAL ? "WAL" : "", + includewal == NO_WAL ? "" : "NOWAIT"); + if (PQsendQuery(conn, basebkp) == 0) + { + pg_log_error("could not execute STOP BACKUP \"%s\"", + PQerrorMessage(conn)); + exit(1); + } + + /* receive pg_control and wal files */ + if (format == 't') + ReceiveTarFile(conn, res, tablespacecount, numWorkers); + else + ReceiveAndUnpackTarFile(conn, res, tablespacecount); + + PQclear(res); +} + +static int +ReceiveFiles(WorkerFiles * workerFiles, int worker) +{ + SimpleStringListCell *cell; + PGresult *res = NULL; + PGconn *worker_conn; + int i; + + worker_conn = GetConnection(); + for (i = 0; i < tablespacecount; i++) + { + SimpleStringList *files = &workerFiles[i].worker_files[worker]; + PQExpBuffer buf = createPQExpBuffer(); + + if (simple_list_length(files) <= 0) + continue; + + + /* + * build query in form of: SEND_FILES_CONTENT ('base/1/1245/32683', + * 'base/1/1245/32683', ...) [options] + */ + appendPQExpBuffer(buf, "SEND_FILES_CONTENT ("); + for (cell = files->head; cell; cell = cell->next) + { + if (cell != files->tail) + appendPQExpBuffer(buf, "'%s' ,", cell->val); + else + appendPQExpBuffer(buf, "'%s'", cell->val); + } + appendPQExpBufferStr(buf, " )"); + + /* + * Add backup options to the command. we are reusing the LABEL here to + * keep the original tablespace path on the server. + */ + appendPQExpBuffer(buf, " LABEL '%s' WORKER %u %s %s", + workerFiles[i].tspath, + worker, + format == 't' ? "TABLESPACE_MAP" : "", + verify_checksums ? "" : "NOVERIFY_CHECKSUMS"); + if (maxrate > 0) + appendPQExpBuffer(buf, " MAX_RATE %u", maxrate); + + if (!worker_conn) + return 1; + + if (PQsendQuery(worker_conn, buf->data) == 0) + { + pg_log_error("could not send files list \"%s\"", + PQerrorMessage(worker_conn)); + return 1; + } + destroyPQExpBuffer(buf); + + if (format == 't') + ReceiveTarFile(worker_conn, tablespacehdr, i, worker); + else + ReceiveAndUnpackTarFile(worker_conn, tablespacehdr, i); + + res = PQgetResult(worker_conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + pg_log_error("could not get data stream: %s", + PQerrorMessage(worker_conn)); + exit(1); + } + + res = PQgetResult(worker_conn); + } + + PQclear(res); + PQfinish(worker_conn); + + return 0; +} + +static void +create_workers_and_fetch(WorkerFiles * workerFiles) +{ + int status; + int pid, + i; + + for (i = 0; i < numWorkers; i++) + { + pid = fork(); + if (pid == 0) + { + /* in child process */ + _exit(ReceiveFiles(workerFiles, i)); + } + else if (pid < 0) + { + pg_log_error("could not create backup worker: %m"); + exit(1); + } + + simple_oid_list_append(&workerspid, pid); + if (verbose) + pg_log_info("backup worker (%d) created", pid); + + /* + * Else we are in the parent process and all is well. + */ + } + + for (i = 0; i < numWorkers; i++) + { + pid = waitpid(-1, &status, 0); + + if (WIFEXITED(status) && WEXITSTATUS(status) == EXIT_FAILURE) + { + SimpleOidListCell *cell; + + pg_log_error("backup worker (%d) failed with code %d", pid, WEXITSTATUS(status)); + + /* error. kill other workers and exit. */ + for (cell = workerspid.head; cell; cell = cell->next) + { + if (pid != cell->val) + { + kill(cell->val, SIGTERM); + pg_log_error("backup worker killed %d", cell->val); + } + } + + exit(1); + } + } +} + + +static int +simple_list_length(SimpleStringList *list) +{ + int len = 0; + SimpleStringListCell *cell; + + for (cell = list->head; cell; cell = cell->next, len++) + ; + + return len; +} diff --git a/src/bin/pg_basebackup/t/040_pg_basebackup_parallel.pl b/src/bin/pg_basebackup/t/040_pg_basebackup_parallel.pl new file mode 100644 index 0000000000..6c31214f3d --- /dev/null +++ b/src/bin/pg_basebackup/t/040_pg_basebackup_parallel.pl @@ -0,0 +1,571 @@ +use strict; +use warnings; +use Cwd; +use Config; +use File::Basename qw(basename dirname); +use File::Path qw(rmtree); +use PostgresNode; +use TestLib; +use Test::More tests => 106; + +program_help_ok('pg_basebackup'); +program_version_ok('pg_basebackup'); +program_options_handling_ok('pg_basebackup'); + +my $tempdir = TestLib::tempdir; + +my $node = get_new_node('main'); + +# Set umask so test directories and files are created with default permissions +umask(0077); + +# Initialize node without replication settings +$node->init(extra => ['--data-checksums']); +$node->start; +my $pgdata = $node->data_dir; + +$node->command_fails(['pg_basebackup'], + 'pg_basebackup needs target directory specified'); + +# Some Windows ANSI code pages may reject this filename, in which case we +# quietly proceed without this bit of test coverage. +if (open my $badchars, '>>', "$tempdir/pgdata/FOO\xe0\xe0\xe0BAR") +{ + print $badchars "test backup of file with non-UTF8 name\n"; + close $badchars; +} + +$node->set_replication_conf(); +system_or_bail 'pg_ctl', '-D', $pgdata, 'reload'; + +$node->command_fails( + [ 'pg_basebackup', '-D', "$tempdir/backup" ], + 'pg_basebackup fails because of WAL configuration'); + +ok(!-d "$tempdir/backup", 'backup directory was cleaned up'); + +# Create a backup directory that is not empty so the next command will fail +# but leave the data directory behind +mkdir("$tempdir/backup") + or BAIL_OUT("unable to create $tempdir/backup"); +append_to_file("$tempdir/backup/dir-not-empty.txt", "Some data"); + +$node->command_fails([ 'pg_basebackup', '-D', "$tempdir/backup", '-n' ], + 'failing run with no-clean option'); + +ok(-d "$tempdir/backup", 'backup directory was created and left behind'); +rmtree("$tempdir/backup"); + +open my $conf, '>>', "$pgdata/postgresql.conf"; +print $conf "max_replication_slots = 10\n"; +print $conf "max_wal_senders = 10\n"; +print $conf "wal_level = replica\n"; +close $conf; +$node->restart; + +# Write some files to test that they are not copied. +foreach my $filename ( + qw(backup_label tablespace_map postgresql.auto.conf.tmp current_logfiles.tmp) + ) +{ + open my $file, '>>', "$pgdata/$filename"; + print $file "DONOTCOPY"; + close $file; +} + +# Connect to a database to create global/pg_internal.init. If this is removed +# the test to ensure global/pg_internal.init is not copied will return a false +# positive. +$node->safe_psql('postgres', 'SELECT 1;'); + +# Create an unlogged table to test that forks other than init are not copied. +$node->safe_psql('postgres', 'CREATE UNLOGGED TABLE base_unlogged (id int)'); + +my $baseUnloggedPath = $node->safe_psql('postgres', + q{select pg_relation_filepath('base_unlogged')}); + +# Make sure main and init forks exist +ok(-f "$pgdata/${baseUnloggedPath}_init", 'unlogged init fork in base'); +ok(-f "$pgdata/$baseUnloggedPath", 'unlogged main fork in base'); + +# Create files that look like temporary relations to ensure they are ignored. +my $postgresOid = $node->safe_psql('postgres', + q{select oid from pg_database where datname = 'postgres'}); + +my @tempRelationFiles = + qw(t999_999 t9999_999.1 t999_9999_vm t99999_99999_vm.1); + +foreach my $filename (@tempRelationFiles) +{ + append_to_file("$pgdata/base/$postgresOid/$filename", 'TEMP_RELATION'); +} + +# Run base backup in parallel mode. +$node->command_ok([ 'pg_basebackup', '-D', "$tempdir/backup", '-X', 'none', "-j 4" ], + 'pg_basebackup runs'); +ok(-f "$tempdir/backup/PG_VERSION", 'backup was created'); + +# Permissions on backup should be default +SKIP: +{ + skip "unix-style permissions not supported on Windows", 1 + if ($windows_os); + + ok(check_mode_recursive("$tempdir/backup", 0700, 0600), + "check backup dir permissions"); +} + +# Only archive_status directory should be copied in pg_wal/. +is_deeply( + [ sort(slurp_dir("$tempdir/backup/pg_wal/")) ], + [ sort qw(. .. archive_status) ], + 'no WAL files copied'); + +# Contents of these directories should not be copied. +foreach my $dirname ( + qw(pg_dynshmem pg_notify pg_replslot pg_serial pg_snapshots pg_stat_tmp pg_subtrans) + ) +{ + is_deeply( + [ sort(slurp_dir("$tempdir/backup/$dirname/")) ], + [ sort qw(. ..) ], + "contents of $dirname/ not copied"); +} + +# These files should not be copied. +foreach my $filename ( + qw(postgresql.auto.conf.tmp postmaster.opts postmaster.pid tablespace_map current_logfiles.tmp + global/pg_internal.init)) +{ + ok(!-f "$tempdir/backup/$filename", "$filename not copied"); +} + +# Unlogged relation forks other than init should not be copied +ok(-f "$tempdir/backup/${baseUnloggedPath}_init", + 'unlogged init fork in backup'); +ok( !-f "$tempdir/backup/$baseUnloggedPath", + 'unlogged main fork not in backup'); + +# Temp relations should not be copied. +foreach my $filename (@tempRelationFiles) +{ + ok( !-f "$tempdir/backup/base/$postgresOid/$filename", + "base/$postgresOid/$filename not copied"); +} + +# Make sure existing backup_label was ignored. +isnt(slurp_file("$tempdir/backup/backup_label"), + 'DONOTCOPY', 'existing backup_label not copied'); +rmtree("$tempdir/backup"); + +$node->command_ok( + [ + 'pg_basebackup', '-D', "$tempdir/backup2", '--waldir', + "$tempdir/xlog2", "-j 4" + ], + 'separate xlog directory'); +ok(-f "$tempdir/backup2/PG_VERSION", 'backup was created'); +ok(-d "$tempdir/xlog2/", 'xlog directory was created'); +rmtree("$tempdir/backup2"); +rmtree("$tempdir/xlog2"); + +$node->command_ok([ 'pg_basebackup', '-D', "$tempdir/tarbackup", '-Ft', "-j 4"], + 'tar format'); +foreach my $filename ( + qw(base.0.tar base.1.tar base.2.tar base.3.tar)) +{ + ok(!-f "$tempdir/backup/$filename", "backup $filename tar created"); +} + +rmtree("$tempdir/tarbackup"); + +$node->command_fails( + [ 'pg_basebackup', '-D', "$tempdir/backup_foo", '-Fp', "-j 4", "-T=/foo" ], + '-T with empty old directory fails'); +$node->command_fails( + [ 'pg_basebackup', '-D', "$tempdir/backup_foo", '-Fp', "-j 4", "-T/foo=" ], + '-T with empty new directory fails'); +$node->command_fails( + [ + 'pg_basebackup', '-D', "$tempdir/backup_foo", '-Fp', "-j 4", + "-T/foo=/bar=/baz" + ], + '-T with multiple = fails'); +$node->command_fails( + [ 'pg_basebackup', '-D', "$tempdir/backup_foo", '-Fp', "-j 4", "-Tfoo=/bar" ], + '-T with old directory not absolute fails'); +$node->command_fails( + [ 'pg_basebackup', '-D', "$tempdir/backup_foo", '-Fp', "-j 4", "-T/foo=bar" ], + '-T with new directory not absolute fails'); +$node->command_fails( + [ 'pg_basebackup', '-D', "$tempdir/backup_foo", '-Fp', "-j 4", "-Tfoo" ], + '-T with invalid format fails'); + +# Tar format doesn't support filenames longer than 100 bytes. +my $superlongname = "superlongname_" . ("x" x 100); +my $superlongpath = "$pgdata/$superlongname"; + +open my $file, '>', "$superlongpath" + or die "unable to create file $superlongpath"; +close $file; +$node->command_fails( + [ 'pg_basebackup', '-D', "$tempdir/tarbackup_l1", '-Ft', "-j 4" ], + 'pg_basebackup tar with long name fails'); +unlink "$pgdata/$superlongname"; + + +# The following tests test symlinks. Windows doesn't have symlinks, so +# skip on Windows. +SKIP: +{ + skip "symlinks not supported on Windows", 18 if ($windows_os); + + # Move pg_replslot out of $pgdata and create a symlink to it. + $node->stop; + + # Set umask so test directories and files are created with group permissions + umask(0027); + + # Enable group permissions on PGDATA + chmod_recursive("$pgdata", 0750, 0640); + + rename("$pgdata/pg_replslot", "$tempdir/pg_replslot") + or BAIL_OUT "could not move $pgdata/pg_replslot"; + symlink("$tempdir/pg_replslot", "$pgdata/pg_replslot") + or BAIL_OUT "could not symlink to $pgdata/pg_replslot"; + + $node->start; + + # Create a temporary directory in the system location and symlink it + # to our physical temp location. That way we can use shorter names + # for the tablespace directories, which hopefully won't run afoul of + # the 99 character length limit. + my $shorter_tempdir = TestLib::tempdir_short . "/tempdir"; + symlink "$tempdir", $shorter_tempdir; + + mkdir "$tempdir/tblspc1"; + $node->safe_psql('postgres', + "CREATE TABLESPACE tblspc1 LOCATION '$shorter_tempdir/tblspc1';"); + $node->safe_psql('postgres', + "CREATE TABLE test1 (a int) TABLESPACE tblspc1;"); + $node->command_ok([ 'pg_basebackup', '-D', "$tempdir/tarbackup2", '-Ft', "-j 4" ], + 'tar format with tablespaces'); + ok(-f "$tempdir/tarbackup2/base.0.tar", 'backup tar was created'); + my @tblspc_tars = glob "$tempdir/tarbackup2/[0-9]*.tar"; + is(scalar(@tblspc_tars), 3, 'one tablespace tar was created'); + rmtree("$tempdir/tarbackup2"); + + # Create an unlogged table to test that forks other than init are not copied. + $node->safe_psql('postgres', + 'CREATE UNLOGGED TABLE tblspc1_unlogged (id int) TABLESPACE tblspc1;' + ); + + my $tblspc1UnloggedPath = $node->safe_psql('postgres', + q{select pg_relation_filepath('tblspc1_unlogged')}); + + # Make sure main and init forks exist + ok( -f "$pgdata/${tblspc1UnloggedPath}_init", + 'unlogged init fork in tablespace'); + ok(-f "$pgdata/$tblspc1UnloggedPath", 'unlogged main fork in tablespace'); + + # Create files that look like temporary relations to ensure they are ignored + # in a tablespace. + my @tempRelationFiles = qw(t888_888 t888888_888888_vm.1); + my $tblSpc1Id = basename( + dirname( + dirname( + $node->safe_psql( + 'postgres', q{select pg_relation_filepath('test1')})))); + + foreach my $filename (@tempRelationFiles) + { + append_to_file( + "$shorter_tempdir/tblspc1/$tblSpc1Id/$postgresOid/$filename", + 'TEMP_RELATION'); + } + + $node->command_fails( + [ 'pg_basebackup', '-D', "$tempdir/backup1", '-Fp', "-j 4" ], + 'plain format with tablespaces fails without tablespace mapping'); + + $node->command_ok( + [ + 'pg_basebackup', '-D', "$tempdir/backup1", '-Fp', "-j 4", + "-T$shorter_tempdir/tblspc1=$tempdir/tbackup/tblspc1" + ], + 'plain format with tablespaces succeeds with tablespace mapping'); + ok(-d "$tempdir/tbackup/tblspc1", 'tablespace was relocated'); + opendir(my $dh, "$pgdata/pg_tblspc") or die; + ok( ( grep { + -l "$tempdir/backup1/pg_tblspc/$_" + and readlink "$tempdir/backup1/pg_tblspc/$_" eq + "$tempdir/tbackup/tblspc1" + } readdir($dh)), + "tablespace symlink was updated"); + closedir $dh; + + # Group access should be enabled on all backup files + ok(check_mode_recursive("$tempdir/backup1", 0750, 0640), + "check backup dir permissions"); + + # Unlogged relation forks other than init should not be copied + my ($tblspc1UnloggedBackupPath) = + $tblspc1UnloggedPath =~ /[^\/]*\/[^\/]*\/[^\/]*$/g; + + ok(-f "$tempdir/tbackup/tblspc1/${tblspc1UnloggedBackupPath}_init", + 'unlogged init fork in tablespace backup'); + ok(!-f "$tempdir/tbackup/tblspc1/$tblspc1UnloggedBackupPath", + 'unlogged main fork not in tablespace backup'); + + # Temp relations should not be copied. + foreach my $filename (@tempRelationFiles) + { + ok( !-f "$tempdir/tbackup/tblspc1/$tblSpc1Id/$postgresOid/$filename", + "[tblspc1]/$postgresOid/$filename not copied"); + + # Also remove temp relation files or tablespace drop will fail. + my $filepath = + "$shorter_tempdir/tblspc1/$tblSpc1Id/$postgresOid/$filename"; + + unlink($filepath) + or BAIL_OUT("unable to unlink $filepath"); + } + + ok( -d "$tempdir/backup1/pg_replslot", + 'pg_replslot symlink copied as directory'); + rmtree("$tempdir/backup1"); + + mkdir "$tempdir/tbl=spc2"; + $node->safe_psql('postgres', "DROP TABLE test1;"); + $node->safe_psql('postgres', "DROP TABLE tblspc1_unlogged;"); + $node->safe_psql('postgres', "DROP TABLESPACE tblspc1;"); + $node->safe_psql('postgres', + "CREATE TABLESPACE tblspc2 LOCATION '$shorter_tempdir/tbl=spc2';"); + $node->command_ok( + [ + 'pg_basebackup', '-D', "$tempdir/backup3", '-Fp', "-j 4", + "-T$shorter_tempdir/tbl\\=spc2=$tempdir/tbackup/tbl\\=spc2" + ], + 'mapping tablespace with = sign in path'); + ok(-d "$tempdir/tbackup/tbl=spc2", + 'tablespace with = sign was relocated'); + $node->safe_psql('postgres', "DROP TABLESPACE tblspc2;"); + rmtree("$tempdir/backup3"); + + mkdir "$tempdir/$superlongname"; + $node->safe_psql('postgres', + "CREATE TABLESPACE tblspc3 LOCATION '$tempdir/$superlongname';"); + $node->command_ok( + [ 'pg_basebackup', '-D', "$tempdir/tarbackup_l3", '-Ft' , '-j 4'], + 'pg_basebackup tar with long symlink target'); + $node->safe_psql('postgres', "DROP TABLESPACE tblspc3;"); + rmtree("$tempdir/tarbackup_l3"); +} + +$node->command_ok([ 'pg_basebackup', '-D', "$tempdir/backupR", '-R' , '-j 4'], + 'pg_basebackup -R runs'); +ok(-f "$tempdir/backupR/postgresql.auto.conf", 'postgresql.auto.conf exists'); +ok(-f "$tempdir/backupR/standby.signal", 'standby.signal was created'); +my $recovery_conf = slurp_file "$tempdir/backupR/postgresql.auto.conf"; +rmtree("$tempdir/backupR"); + +my $port = $node->port; +like( + $recovery_conf, + qr/^primary_conninfo = '.*port=$port.*'\n/m, + 'postgresql.auto.conf sets primary_conninfo'); + +$node->command_ok( + [ 'pg_basebackup', '-D', "$tempdir/backupxd" , "-j 4"], + 'pg_basebackup runs in default xlog mode'); +ok(grep(/^[0-9A-F]{24}$/, slurp_dir("$tempdir/backupxd/pg_wal")), + 'WAL files copied'); +rmtree("$tempdir/backupxd"); + +$node->command_ok( + [ 'pg_basebackup', '-D', "$tempdir/backupxf", '-X', 'fetch' , "-j 4"], + 'pg_basebackup -X fetch runs'); +ok(grep(/^[0-9A-F]{24}$/, slurp_dir("$tempdir/backupxf/pg_wal")), + 'WAL files copied'); +rmtree("$tempdir/backupxf"); +$node->command_ok( + [ 'pg_basebackup', '-D', "$tempdir/backupxs", '-X', 'stream' , "-j 4"], + 'pg_basebackup -X stream runs'); +ok(grep(/^[0-9A-F]{24}$/, slurp_dir("$tempdir/backupxs/pg_wal")), + 'WAL files copied'); +rmtree("$tempdir/backupxs"); +$node->command_ok( + [ 'pg_basebackup', '-D', "$tempdir/backupxst", '-X', 'stream', '-Ft' , "-j 4"], + 'pg_basebackup -X stream runs in tar mode'); +ok(-f "$tempdir/backupxst/pg_wal.tar", "tar file was created"); +rmtree("$tempdir/backupxst"); +$node->command_ok( + [ + 'pg_basebackup', '-D', + "$tempdir/backupnoslot", '-X', + 'stream', '--no-slot', + '-j 4' + ], + 'pg_basebackup -X stream runs with --no-slot'); +rmtree("$tempdir/backupnoslot"); + +$node->command_fails( + [ + 'pg_basebackup', '-D', + "$tempdir/backupxs_sl_fail", '-X', + 'stream', '-S', + 'slot0', + '-j 4' + ], + 'pg_basebackup fails with nonexistent replication slot'); +# +$node->command_fails( + [ 'pg_basebackup', '-D', "$tempdir/backupxs_slot", '-C' , '-j 4'], + 'pg_basebackup -C fails without slot name'); + +$node->command_fails( + [ + 'pg_basebackup', '-D', + "$tempdir/backupxs_slot", '-C', + '-S', 'slot0', + '--no-slot', + '-j 4' + ], + 'pg_basebackup fails with -C -S --no-slot'); + +$node->command_ok( + [ 'pg_basebackup', '-D', "$tempdir/backupxs_slot", '-C', '-S', 'slot0', '-j 4'], + 'pg_basebackup -C runs'); +rmtree("$tempdir/backupxs_slot"); + +is( $node->safe_psql( + 'postgres', + q{SELECT slot_name FROM pg_replication_slots WHERE slot_name = 'slot0'} + ), + 'slot0', + 'replication slot was created'); +isnt( + $node->safe_psql( + 'postgres', + q{SELECT restart_lsn FROM pg_replication_slots WHERE slot_name = 'slot0'} + ), + '', + 'restart LSN of new slot is not null'); + +$node->command_fails( + [ 'pg_basebackup', '-D', "$tempdir/backupxs_slot1", '-C', '-S', 'slot0', '-j 4'], + 'pg_basebackup fails with -C -S and a previously existing slot'); + +$node->safe_psql('postgres', + q{SELECT * FROM pg_create_physical_replication_slot('slot1')}); +my $lsn = $node->safe_psql('postgres', + q{SELECT restart_lsn FROM pg_replication_slots WHERE slot_name = 'slot1'} +); +is($lsn, '', 'restart LSN of new slot is null'); +$node->command_fails( + [ 'pg_basebackup', '-D', "$tempdir/fail", '-S', 'slot1', '-X', 'none', '-j 4'], + 'pg_basebackup with replication slot fails without WAL streaming'); +$node->command_ok( + [ + 'pg_basebackup', '-D', "$tempdir/backupxs_sl", '-X', + 'stream', '-S', 'slot1', '-j 4' + ], + 'pg_basebackup -X stream with replication slot runs'); +$lsn = $node->safe_psql('postgres', + q{SELECT restart_lsn FROM pg_replication_slots WHERE slot_name = 'slot1'} +); +like($lsn, qr!^0/[0-9A-Z]{7,8}$!, 'restart LSN of slot has advanced'); +rmtree("$tempdir/backupxs_sl"); + +$node->command_ok( + [ + 'pg_basebackup', '-D', "$tempdir/backupxs_sl_R", '-X', + 'stream', '-S', 'slot1', '-R', + '-j 4' + ], + 'pg_basebackup with replication slot and -R runs'); +like( + slurp_file("$tempdir/backupxs_sl_R/postgresql.auto.conf"), + qr/^primary_slot_name = 'slot1'\n/m, + 'recovery conf file sets primary_slot_name'); + +my $checksum = $node->safe_psql('postgres', 'SHOW data_checksums;'); +is($checksum, 'on', 'checksums are enabled'); +rmtree("$tempdir/backupxs_sl_R"); + +# create tables to corrupt and get their relfilenodes +my $file_corrupt1 = $node->safe_psql('postgres', + q{SELECT a INTO corrupt1 FROM generate_series(1,10000) AS a; ALTER TABLE corrupt1 SET (autovacuum_enabled=false); SELECT pg_relation_filepath('corrupt1')} +); +my $file_corrupt2 = $node->safe_psql('postgres', + q{SELECT b INTO corrupt2 FROM generate_series(1,2) AS b; ALTER TABLE corrupt2 SET (autovacuum_enabled=false); SELECT pg_relation_filepath('corrupt2')} +); + +# set page header and block sizes +my $pageheader_size = 24; +my $block_size = $node->safe_psql('postgres', 'SHOW block_size;'); + +# induce corruption +system_or_bail 'pg_ctl', '-D', $pgdata, 'stop'; +open $file, '+<', "$pgdata/$file_corrupt1"; +seek($file, $pageheader_size, 0); +syswrite($file, "\0\0\0\0\0\0\0\0\0"); +close $file; +system_or_bail 'pg_ctl', '-D', $pgdata, 'start'; + +$node->command_checks_all( + [ 'pg_basebackup', '-D', "$tempdir/backup_corrupt", '-j 4'], + 1, + [qr{^$}], + [qr/^WARNING.*checksum verification failed/s], + 'pg_basebackup reports checksum mismatch'); +rmtree("$tempdir/backup_corrupt"); + +# induce further corruption in 5 more blocks +system_or_bail 'pg_ctl', '-D', $pgdata, 'stop'; +open $file, '+<', "$pgdata/$file_corrupt1"; +for my $i (1 .. 5) +{ + my $offset = $pageheader_size + $i * $block_size; + seek($file, $offset, 0); + syswrite($file, "\0\0\0\0\0\0\0\0\0"); +} +close $file; +system_or_bail 'pg_ctl', '-D', $pgdata, 'start'; + +$node->command_checks_all( + [ 'pg_basebackup', '-D', "$tempdir/backup_corrupt2", '-j 4'], + 1, + [qr{^$}], + [qr/^WARNING.*further.*failures.*will.not.be.reported/s], + 'pg_basebackup does not report more than 5 checksum mismatches'); +rmtree("$tempdir/backup_corrupt2"); + +# induce corruption in a second file +system_or_bail 'pg_ctl', '-D', $pgdata, 'stop'; +open $file, '+<', "$pgdata/$file_corrupt2"; +seek($file, $pageheader_size, 0); +syswrite($file, "\0\0\0\0\0\0\0\0\0"); +close $file; +system_or_bail 'pg_ctl', '-D', $pgdata, 'start'; + +#$node->command_checks_all( +# [ 'pg_basebackup', '-D', "$tempdir/backup_corrupt3", '-j 4'], +# 1, +# [qr{^$}], +# [qr/^WARNING.*checksum verification failed/s], +# 'pg_basebackup correctly report the total number of checksum mismatches'); +#rmtree("$tempdir/backup_corrupt3"); + +# do not verify checksums, should return ok +$node->command_ok( + [ + 'pg_basebackup', '-D', + "$tempdir/backup_corrupt4", '--no-verify-checksums', + '-j 4' + ], + 'pg_basebackup with -k does not report checksum mismatch'); +rmtree("$tempdir/backup_corrupt4"); + +$node->safe_psql('postgres', "DROP TABLE corrupt1;"); +$node->safe_psql('postgres', "DROP TABLE corrupt2;"); diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index 1e3ed4e19f..f92d593e2e 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -23,6 +23,13 @@ typedef enum ReplicationKind REPLICATION_KIND_LOGICAL } ReplicationKind; +typedef enum BackupCmdTag +{ + BASE_BACKUP, + START_BACKUP, + SEND_FILES_CONTENT, + STOP_BACKUP +} BackupCmdTag; /* ---------------------- * IDENTIFY_SYSTEM command @@ -42,6 +49,8 @@ typedef struct BaseBackupCmd { NodeTag type; List *options; + BackupCmdTag cmdtag; + List *backupfiles; } BaseBackupCmd; diff --git a/src/include/replication/basebackup.h b/src/include/replication/basebackup.h index 503a5b9f0b..9e792af99d 100644 --- a/src/include/replication/basebackup.h +++ b/src/include/replication/basebackup.h @@ -31,6 +31,6 @@ typedef struct extern void SendBaseBackup(BaseBackupCmd *cmd); -extern int64 sendTablespace(char *path, bool sizeonly); +extern int64 sendTablespace(char *path, bool sizeonly, List **files); #endif /* _BASEBACKUP_H */ -- 2.21.0 (Apple Git-122)