From 67e45f26a2035f714afdfa3c40e09a250448a228 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Thu, 9 Sep 2021 14:53:04 -0400 Subject: [PATCH v4 5/7] Modify pg_basebackup to use a new COPY subprotocol for base backups. In the new approach, all files across all tablespaces are sent in a single COPY OUT operation. The CopyData messages are no longer raw archive content; rather, each message is prefixed with a type byte that describes its purpose, e.g. 'n' signifies the start of a new archive and 'd' signifies archive or manifest data. This protocol is significantly more extensible than the old approach, since we can later create more message types, though not without concern for backward compatibility. The new protocol sends a few things to the client that the old one did not. First, it sends the name of each archive explicitly, instead of letting the client compute it. This is intended to make it easier to write future patches that might send archives in a format other than tar (e.g. cpio, pax, tar.gz). Second, it sends explicit progress messages rather than allowing the client to assume that progress is defined by the number of bytes received. This will help with future features where the server compresses the data, or sends it someplace directly rather than transmitting it to the client. When the new protocol is used, the server generates properly terminated tar archives, in contrast to the old one which intentionally leaves out the two blocks of zero bytes that are supposed to occur at the end of each tar file. Any version of pg_basebackup new enough to support the new protocol is also smart enough not to be confused by these padding blocks, so we need not propagate this kluge. The old protocol is still supported for compatibility with previous releases. The new protocol is selected by means of a new TARGET option to the BASE_BACKUP command. Currently, the only supported target is 'client'. Support for additional targets will be added in a later commit. 
--- src/backend/replication/basebackup.c | 62 ++- src/backend/replication/basebackup_copy.c | 266 ++++++++++++- src/bin/pg_basebackup/pg_basebackup.c | 443 +++++++++++++++++++--- src/include/replication/basebackup_sink.h | 1 + src/tools/pgindent/typedefs.list | 3 + 5 files changed, 722 insertions(+), 53 deletions(-) diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index ecd32e8436..aefa7cb17e 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -53,6 +53,12 @@ */ #define SINK_BUFFER_LENGTH Max(32768, BLCKSZ) +typedef enum +{ + BACKUP_TARGET_COMPAT, + BACKUP_TARGET_CLIENT +} backup_target_type; + typedef struct { const char *label; @@ -62,6 +68,7 @@ typedef struct bool includewal; uint32 maxrate; bool sendtblspcmapfile; + backup_target_type target; backup_manifest_option manifest; pg_checksum_type manifest_checksum_type; } basebackup_options; @@ -81,6 +88,7 @@ static int64 _tarWriteHeader(bbsink *sink, const char *filename, const char *linktarget, struct stat *statbuf, bool sizeonly); static void _tarWritePadding(bbsink *sink, int len); +static void _tarEndArchive(bbsink *sink, backup_target_type target); static void convert_link_to_directory(const char *pathbuf, struct stat *statbuf); static void perform_base_backup(basebackup_options *opt); static void parse_basebackup_options(List *options, basebackup_options *opt); @@ -233,7 +241,7 @@ perform_base_backup(basebackup_options *opt) StringInfo tblspc_map_file; backup_manifest_info manifest; int datadirpathlen; - bbsink *sink = bbsink_copytblspc_new(); + bbsink *sink; bbsink *progress_sink; /* Initial backup state, insofar as we know it now. */ @@ -243,6 +251,16 @@ perform_base_backup(basebackup_options *opt) state.bytes_total = 0; state.bytes_total_is_valid = false; + /* + * If the TARGET option was specified, then we can use the new copy-stream + * protocol. 
If not, we must fall back to the old and less capable + * copy-tablespace protocol. + */ + if (opt->target != BACKUP_TARGET_COMPAT) + sink = bbsink_copystream_new(); + else + sink = bbsink_copytblspc_new(); + /* Set up network throttling, if client requested it */ if (opt->maxrate > 0) sink = bbsink_throttle_new(sink, opt->maxrate); @@ -383,7 +401,10 @@ perform_base_backup(basebackup_options *opt) Assert(lnext(state.tablespaces, lc) == NULL); } else + { + _tarEndArchive(sink, opt->target); bbsink_end_archive(sink); + } } basebackup_progress_wait_wal_archive(progress_sink); @@ -621,6 +642,7 @@ perform_base_backup(basebackup_options *opt) sendFileWithContent(sink, pathbuf, "", &manifest); } + _tarEndArchive(sink, opt->target); bbsink_end_archive(sink); } @@ -688,8 +710,10 @@ parse_basebackup_options(List *options, basebackup_options *opt) bool o_noverify_checksums = false; bool o_manifest = false; bool o_manifest_checksums = false; + bool o_target = false; MemSet(opt, 0, sizeof(*opt)); + opt->target = BACKUP_TARGET_COMPAT; opt->manifest = MANIFEST_OPTION_NO; opt->manifest_checksum_type = CHECKSUM_TYPE_CRC32C; @@ -820,6 +844,22 @@ parse_basebackup_options(List *options, basebackup_options *opt) optval))); o_manifest_checksums = true; } + else if (strcmp(defel->defname, "target") == 0) + { + char *optval = defGetString(defel); + + if (o_target) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("duplicate option \"%s\"", defel->defname))); + if (strcmp(optval, "client") == 0) + opt->target = BACKUP_TARGET_CLIENT; + else + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("unrecognized target: \"%s\"", optval))); + o_target = true; + } else ereport(ERROR, errcode(ERRCODE_SYNTAX_ERROR), @@ -1672,6 +1712,26 @@ _tarWritePadding(bbsink *sink, int len) } } +/* + * Tar archives are supposed to end with two blocks of zeroes, so add those, + * unless we're using the old copy-tablespace protocol. 
In that system, the + * server must not properly terminate the client archive, and the client is + * instead responsible for adding those two blocks of zeroes. + */ +static void +_tarEndArchive(bbsink *sink, backup_target_type target) +{ + if (target != BACKUP_TARGET_COMPAT) + { + /* See comments in _tarWriteHeader for why this must be true. */ + Assert(sink->bbs_buffer_length >= TAR_BLOCK_SIZE); + + MemSet(sink->bbs_buffer, 0, TAR_BLOCK_SIZE); + bbsink_archive_contents(sink, TAR_BLOCK_SIZE); + bbsink_archive_contents(sink, TAR_BLOCK_SIZE); + } +} + /* * If the entry in statbuf is a link, then adjust statbuf to make it look like a * directory, so that it will be written that way. diff --git a/src/backend/replication/basebackup_copy.c b/src/backend/replication/basebackup_copy.c index 564f010188..389a520417 100644 --- a/src/backend/replication/basebackup_copy.c +++ b/src/backend/replication/basebackup_copy.c @@ -1,8 +1,27 @@ /*------------------------------------------------------------------------- * * basebackup_copy.c - * send basebackup archives using one COPY OUT operation per - * tablespace, and an additional COPY OUT for the backup manifest + * send basebackup archives using COPY OUT + * + * We have two different ways of doing this. + * + * 'copytblspc' is an older method still supported for compatibility + * with releases prior to v15. In this method, a separate COPY OUT + * operation is used for each tablespace. The manifest, if it is sent, + * uses an additional COPY OUT operation. + * + * 'copystream' starts a single COPY OUT operation and transmits + * all the archives and the manifest if present during the course of that + * single COPY OUT. Each CopyData message begins with a type byte, + * allowing us to signal the start of a new archive, or the manifest, + * by some means other than ending the COPY stream. 
This also allows + * this protocol to be extended more easily, since we can include + * arbitrary information in the message stream as long as we're certain + * that the client will know what to do with it. + * + * Regardless of which method is used, we send a result set with + * information about the tablespaces to be included in the backup before + * starting COPY OUT. This result has the same format in every method. * * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group * @@ -18,6 +37,51 @@ #include "libpq/pqformat.h" #include "replication/basebackup.h" #include "replication/basebackup_sink.h" +#include "utils/timestamp.h" + +typedef struct bbsink_copystream +{ + /* Common information for all types of sink. */ + bbsink base; + + /* + * Protocol message buffer. We assemble CopyData protocol messages by + * setting the first character of this buffer to 'd' (archive or manifest + * data) and then making base.bbs_buffer point to the second character so + * that the rest of the data gets copied into the message just where we + * want it. + */ + char *msgbuffer; + + /* + * When did we last report progress to the client, and how much progress + * did we report? + */ + TimestampTz last_progress_report_time; + uint64 bytes_done_at_last_time_check; +} bbsink_copystream; + +/* + * We don't want to send progress messages to the client excessively + * frequently. Ideally, we'd like to send a message when the time since the + * last message reaches PROGRESS_REPORT_MILLISECOND_THRESHOLD, but checking + * the system time every time we send a tiny bit of data seems too expensive. + * So we only check it after the number of bytes since the last check reaches + * PROGRESS_REPORT_BYTE_INTERVAL. 
+ */ +#define PROGRESS_REPORT_BYTE_INTERVAL 65536 +#define PROGRESS_REPORT_MILLISECOND_THRESHOLD 1000 + +static void bbsink_copystream_begin_backup(bbsink *sink); +static void bbsink_copystream_begin_archive(bbsink *sink, + const char *archive_name); +static void bbsink_copystream_archive_contents(bbsink *sink, size_t len); +static void bbsink_copystream_end_archive(bbsink *sink); +static void bbsink_copystream_begin_manifest(bbsink *sink); +static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len); +static void bbsink_copystream_end_manifest(bbsink *sink); +static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr, + TimeLineID endtli); static void bbsink_copytblspc_begin_backup(bbsink *sink); static void bbsink_copytblspc_begin_archive(bbsink *sink, @@ -37,6 +101,17 @@ static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli); static void SendTablespaceList(List *tablespaces); static void send_int8_string(StringInfoData *buf, int64 intval); +const bbsink_ops bbsink_copystream_ops = { + .begin_backup = bbsink_copystream_begin_backup, + .begin_archive = bbsink_copystream_begin_archive, + .archive_contents = bbsink_copystream_archive_contents, + .end_archive = bbsink_copystream_end_archive, + .begin_manifest = bbsink_copystream_begin_manifest, + .manifest_contents = bbsink_copystream_manifest_contents, + .end_manifest = bbsink_copystream_end_manifest, + .end_backup = bbsink_copystream_end_backup +}; + const bbsink_ops bbsink_copytblspc_ops = { .begin_backup = bbsink_copytblspc_begin_backup, .begin_archive = bbsink_copytblspc_begin_archive, @@ -48,6 +123,193 @@ const bbsink_ops bbsink_copytblspc_ops = { .end_backup = bbsink_copytblspc_end_backup }; +/* + * Create a new 'copystream' bbsink. + */ +bbsink * +bbsink_copystream_new(void) +{ + bbsink_copystream *sink = palloc0(sizeof(bbsink_copystream)); + + *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops; + + /* Set up for periodic progress reporting. 
*/ + sink->last_progress_report_time = GetCurrentTimestamp(); + sink->bytes_done_at_last_time_check = UINT64CONST(0); + + return &sink->base; +} + +/* + * Send start-of-backup wire protocol messages. + */ +static void +bbsink_copystream_begin_backup(bbsink *sink) +{ + bbsink_copystream *mysink = (bbsink_copystream *) sink; + bbsink_state *state = sink->bbs_state; + + /* + * Initialize buffer. We ultimately want to send the archive and manifest + * data by means of CopyData messages where the payload portion of each + * message begins with a type byte, so we set up a buffer that begins + * with the type byte we're going to need, and then arrange things so + * that the data we're given will be written just after that type byte. + * That will allow us to ship the data with a single call to pq_putmessage + * and without needing any extra copying. + */ + mysink->msgbuffer = palloc(mysink->base.bbs_buffer_length + 1); + mysink->base.bbs_buffer = mysink->msgbuffer + 1; + mysink->msgbuffer[0] = 'd'; /* archive or manifest data */ + + /* Tell client the backup start location. */ + SendXlogRecPtrResult(state->startptr, state->starttli); + + /* Send client a list of tablespaces. */ + SendTablespaceList(state->tablespaces); + + /* Send a CommandComplete message */ + pq_puttextmessage('C', "SELECT"); + + /* Begin COPY stream. This will be used for all archives + manifest. */ + SendCopyOutResponse(); +} + +/* + * Send a CopyData message announcing the beginning of a new archive. + */ +static void +bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name) +{ + bbsink_state *state = sink->bbs_state; + tablespaceinfo *ti; + StringInfoData buf; + + ti = list_nth(state->tablespaces, state->tablespace_num); + pq_beginmessage(&buf, 'd'); /* CopyData */ + pq_sendbyte(&buf, 'n'); /* New archive */ + pq_sendstring(&buf, archive_name); + pq_sendstring(&buf, ti->path == NULL ? 
"" : ti->path); + pq_endmessage(&buf); +} + +/* + * Send a CopyData message containing a chunk of archive content. + */ +static void +bbsink_copystream_archive_contents(bbsink *sink, size_t len) +{ + bbsink_copystream *mysink = (bbsink_copystream *) sink; + bbsink_state *state = mysink->base.bbs_state; + StringInfoData buf; + uint64 targetbytes; + + /* Send the archive content to the client (with leading type byte). */ + pq_putmessage('d', mysink->msgbuffer, len + 1); + + /* Consider whether to send a progress report to the client. */ + targetbytes = mysink->bytes_done_at_last_time_check + + PROGRESS_REPORT_BYTE_INTERVAL; + if (targetbytes <= state->bytes_done) + { + TimestampTz now = GetCurrentTimestamp(); + long ms; + + /* + * OK, we've sent a decent number of bytes, so check the system time + * to see whether we're due to send a progress report. + */ + mysink->bytes_done_at_last_time_check = state->bytes_done; + ms = TimestampDifferenceMilliseconds(mysink->last_progress_report_time, + now); + + /* + * Send a progress report if enough time has passed. Also send one if + * the system clock was set backward, so that such occurrences don't + * have the effect of suppressing further progress messages. + */ + if (ms < 0 || ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD) + { + mysink->last_progress_report_time = now; + + pq_beginmessage(&buf, 'd'); /* CopyData */ + pq_sendbyte(&buf, 'p'); /* Progress report */ + pq_sendint64(&buf, state->bytes_done); + pq_endmessage(&buf); + pq_flush_if_writable(); + } + } +} + +/* + * We don't need to explicitly signal the end of the archive; the client + * will figure out that we've reached the end when we begin the next one, + * or begin the manifest, or end the COPY stream. However, this seems like + * a good time to force out a progress report. One reason for that is that + * if this is the last archive, and we don't force a progress report now, + * the client will never be told that we sent all the bytes. 
+ */ +static void +bbsink_copystream_end_archive(bbsink *sink) +{ + bbsink_copystream *mysink = (bbsink_copystream *) sink; + bbsink_state *state = mysink->base.bbs_state; + StringInfoData buf; + + mysink->bytes_done_at_last_time_check = state->bytes_done; + mysink->last_progress_report_time = GetCurrentTimestamp(); + pq_beginmessage(&buf, 'd'); /* CopyData */ + pq_sendbyte(&buf, 'p'); /* Progress report */ + pq_sendint64(&buf, state->bytes_done); + pq_endmessage(&buf); + pq_flush_if_writable(); +} + +/* + * Send a CopyData message announcing the beginning of the backup manifest. + */ +static void +bbsink_copystream_begin_manifest(bbsink *sink) +{ + StringInfoData buf; + + pq_beginmessage(&buf, 'd'); /* CopyData */ + pq_sendbyte(&buf, 'm'); /* Manifest */ + pq_endmessage(&buf); +} + +/* + * Each chunk of manifest data is sent using a CopyData message. + */ +static void +bbsink_copystream_manifest_contents(bbsink *sink, size_t len) +{ + bbsink_copystream *mysink = (bbsink_copystream *) sink; + + /* Send the manifest content to the client (with leading type byte). */ + pq_putmessage('d', mysink->msgbuffer, len + 1); +} + +/* + * We don't need an explicit terminator for the backup manifest. + */ +static void +bbsink_copystream_end_manifest(bbsink *sink) +{ + /* Do nothing. */ +} + +/* + * Send end-of-backup wire protocol messages. + */ +static void +bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr, + TimeLineID endtli) +{ + SendCopyDone(); + SendXlogRecPtrResult(endptr, endtli); +} + /* * Create a new 'copytblspc' bbsink. 
*/ diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index 947a182e86..8221a8c9ac 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -54,6 +54,16 @@ typedef struct TablespaceList TablespaceListCell *tail; } TablespaceList; +typedef struct ArchiveStreamState +{ + int tablespacenum; + bbstreamer *streamer; + bbstreamer *manifest_inject_streamer; + PQExpBuffer manifest_buffer; + char manifest_filename[MAXPGPATH]; + FILE *manifest_file; +} ArchiveStreamState; + typedef struct WriteTarState { int tablespacenum; @@ -167,6 +177,13 @@ static void progress_report(int tablespacenum, bool force, bool finished); static bbstreamer *CreateBackupStreamer(char *archive_name, char *spclocation, bbstreamer **manifest_inject_streamer_p, bool is_recovery_guc_supported); +static void ReceiveArchiveStreamChunk(size_t r, char *copybuf, + void *callback_data); +static char GetCopyDataByte(size_t r, char *copybuf, size_t *cursor); +static char *GetCopyDataString(size_t r, char *copybuf, size_t *cursor); +static uint64 GetCopyDataUInt64(size_t r, char *copybuf, size_t *cursor); +static void GetCopyDataEnd(size_t r, char *copybuf, size_t cursor); +static void ReportCopyDataParseError(size_t r, char *copybuf); static void ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation, bool tablespacenum); static void ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data); @@ -983,10 +1000,11 @@ CreateBackupStreamer(char *archive_name, char *spclocation, /* * We have to parse the archive if (1) we're suppose to extract it, or if - * (2) we need to inject backup_manifest or recovery configuration into it. + * (2) we need to inject backup_manifest or recovery configuration into + * it. 
*/ must_parse_archive = (format == 'p' || inject_manifest || - (spclocation == NULL && writerecoveryconf)); + (spclocation == NULL && writerecoveryconf)); if (format == 'p') { @@ -1013,8 +1031,8 @@ CreateBackupStreamer(char *archive_name, char *spclocation, /* * In tar format, we just write the archive without extracting it. * Normally, we write it to the archive name provided by the caller, - * but when the base directory is "-" that means we need to write - * to standard output. + * but when the base directory is "-" that means we need to write to + * standard output. */ if (strcmp(basedir, "-") == 0) { @@ -1054,16 +1072,16 @@ CreateBackupStreamer(char *archive_name, char *spclocation, } /* - * If we're supposed to inject the backup manifest into the results, - * it should be done here, so that the file content can be injected - * directly, without worrying about the details of the tar format. + * If we're supposed to inject the backup manifest into the results, it + * should be done here, so that the file content can be injected directly, + * without worrying about the details of the tar format. */ if (inject_manifest) manifest_inject_streamer = streamer; /* - * If this is the main tablespace and we're supposed to write - * recovery information, arrange to do that. + * If this is the main tablespace and we're supposed to write recovery + * information, arrange to do that. */ if (spclocation == NULL && writerecoveryconf) { @@ -1074,8 +1092,8 @@ CreateBackupStreamer(char *archive_name, char *spclocation, } /* - * If we're doing anything that involves understanding the contents of - * the archive, we'll need to parse it. + * If we're doing anything that involves understanding the contents of the + * archive, we'll need to parse it. 
*/ if (must_parse_archive) streamer = bbstreamer_tar_parser_new(streamer); @@ -1085,6 +1103,317 @@ CreateBackupStreamer(char *archive_name, char *spclocation, return streamer; } +/* + * Receive all of the archives the server wants to send - and the backup + * manifest if present - as a single COPY stream. + */ +static void +ReceiveArchiveStream(PGconn *conn) +{ + ArchiveStreamState state; + + /* Set up initial state. */ + memset(&state, 0, sizeof(state)); + state.tablespacenum = -1; + + /* All the real work happens in ReceiveArchiveStreamChunk. */ + ReceiveCopyData(conn, ReceiveArchiveStreamChunk, &state); + + /* If we wrote the backup manifest to a file, close the file. */ + if (state.manifest_file !=NULL) + { + fclose(state.manifest_file); + state.manifest_file = NULL; + } + + /* + * If we buffered the backup manifest in order to inject it into the + * output tarfile, do that now. + */ + if (state.manifest_inject_streamer != NULL && + state.manifest_buffer != NULL) + { + bbstreamer_inject_file(state.manifest_inject_streamer, + "backup_manifest", + state.manifest_buffer->data, + state.manifest_buffer->len); + destroyPQExpBuffer(state.manifest_buffer); + state.manifest_buffer = NULL; + } + + /* If there's still an archive in progress, end processing. */ + if (state.streamer != NULL) + { + bbstreamer_finalize(state.streamer); + bbstreamer_free(state.streamer); + state.streamer = NULL; + } +} + +/* + * Receive one chunk of data sent by the server as part of a single COPY + * stream that includes all archives and the manifest. + */ +static void +ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data) +{ + ArchiveStreamState *state = callback_data; + size_t cursor = 0; + + /* Each CopyData message begins with a type byte. */ + switch (GetCopyDataByte(r, copybuf, &cursor)) + { + case 'n': + { + /* New archive. */ + char *archive_name; + char *spclocation; + + /* + * We force a progress report at the end of each tablespace. 
A + * new tablespace starts when the previous one ends, except in + * the case of the very first one. + */ + if (++state->tablespacenum > 0) + progress_report(state->tablespacenum, true, false); + + /* Sanity check. */ + if (state->manifest_buffer != NULL || + state->manifest_file !=NULL) + { + pg_log_error("archives should precede manifest"); + exit(1); + } + + /* Parse the rest of the CopyData message. */ + archive_name = GetCopyDataString(r, copybuf, &cursor); + spclocation = GetCopyDataString(r, copybuf, &cursor); + GetCopyDataEnd(r, copybuf, cursor); + + /* + * Basic sanity checks on the archive name: it shouldn't be + * empty, it shouldn't start with a dot, and it shouldn't + * contain a path separator. + */ + if (archive_name[0] == '\0' || archive_name[0] == '.' || + strchr(archive_name, '/') != NULL || + strchr(archive_name, '\\') != NULL) + { + pg_log_error("invalid archive name: \"%s\"", + archive_name); + exit(1); + } + + /* + * An empty spclocation is treated as NULL. We expect this + * case to occur for the data directory itself, but not for + * any archives that correspond to tablespaces. + */ + if (spclocation[0] == '\0') + spclocation = NULL; + + /* End processing of any prior archive. */ + if (state->streamer != NULL) + { + bbstreamer_finalize(state->streamer); + bbstreamer_free(state->streamer); + state->streamer = NULL; + } + + /* + * Create an appropriate backup streamer. We know that + * recovery GUCs are supported, because this protocol can only + * be used on v15+. + */ + state->streamer = + CreateBackupStreamer(archive_name, + spclocation, + &state->manifest_inject_streamer, + true); + break; + } + + case 'd': + { + /* Archive or manifest data. */ + if (state->manifest_buffer != NULL) + { + /* Manifest data: buffer the raw bytes (not a format string). */ + appendBinaryPQExpBuffer(state->manifest_buffer, copybuf + 1, + r - 1); + } + else if (state->manifest_file !=NULL) + { + /* Manifest data, write to disk. 
*/ + if (fwrite(copybuf + 1, r - 1, 1, + state->manifest_file) != 1) + { + /* + * If fwrite() didn't set errno, assume that the + * problem is that we're out of disk space. + */ + if (errno == 0) + errno = ENOSPC; + pg_log_error("could not write to file \"%s\": %m", + state->manifest_filename); + exit(1); + } + } + else if (state->streamer != NULL) + { + /* Archive data. */ + bbstreamer_content(state->streamer, NULL, copybuf + 1, + r - 1, BBSTREAMER_UNKNOWN); + } + else + { + pg_log_error("unexpected payload data"); + exit(1); + } + break; + } + + case 'p': + { + /* + * Progress report. + * + * The remainder of the message is expected to be an 8-byte + * count of bytes completed. + */ + totaldone = GetCopyDataUInt64(r, copybuf, &cursor); + GetCopyDataEnd(r, copybuf, cursor); + + /* + * The server shouldn't send progress report messages too + * often, so we force an update each time we receive one. + */ + progress_report(state->tablespacenum, true, false); + break; + } + + case 'm': + { + /* + * Manifest data will be sent next. This message is not + * expected to have any further payload data. + */ + GetCopyDataEnd(r, copybuf, cursor); + + /* + * If we're supposed to inject the manifest into the archive, we + * prepare to buffer it in memory; otherwise, we prepare to + * write it to a temporary file. + */ + if (state->manifest_inject_streamer != NULL) + state->manifest_buffer = createPQExpBuffer(); + else + { + snprintf(state->manifest_filename, + sizeof(state->manifest_filename), + "%s/backup_manifest.tmp", basedir); + state->manifest_file = + fopen(state->manifest_filename, "wb"); + if (state->manifest_file == NULL) + { + pg_log_error("could not create file \"%s\": %m", + state->manifest_filename); + exit(1); + } + } + break; + } + + default: + ReportCopyDataParseError(r, copybuf); + break; + } +} + +/* + * Get a single byte from a CopyData message. + * + * Bail out if none remain. 
+ */ +static char +GetCopyDataByte(size_t r, char *copybuf, size_t *cursor) +{ + if (*cursor >= r) + ReportCopyDataParseError(r, copybuf); + + return copybuf[(*cursor)++]; +} + +/* + * Get a NUL-terminated string from a CopyData message. + * + * Bail out if the terminating NUL cannot be found. + */ +static char * +GetCopyDataString(size_t r, char *copybuf, size_t *cursor) +{ + size_t startpos = *cursor; + size_t endpos = startpos; + + while (1) + { + if (endpos >= r) + ReportCopyDataParseError(r, copybuf); + if (copybuf[endpos] == '\0') + break; + ++endpos; + } + + *cursor = endpos + 1; + return &copybuf[startpos]; +} + +/* + * Get an unsigned 64-bit integer from a CopyData message. + * + * Bail out if there are not at least 8 bytes remaining. + */ +static uint64 +GetCopyDataUInt64(size_t r, char *copybuf, size_t *cursor) +{ + uint64 result; + + if (*cursor + sizeof(uint64) > r) + ReportCopyDataParseError(r, copybuf); + memcpy(&result, &copybuf[*cursor], sizeof(uint64)); + *cursor += sizeof(uint64); + return pg_ntoh64(result); +} + +/* + * Bail out if we didn't parse the whole message. + */ +static void +GetCopyDataEnd(size_t r, char *copybuf, size_t cursor) +{ + if (r != cursor) + ReportCopyDataParseError(r, copybuf); +} + +/* + * Report failure to parse a CopyData message from the server. Then exit. + * + * As a debugging aid, we try to give some hint about what kind of message + * provoked the failure. Perhaps this is not detailed enough, but it's not + * clear that it's worth expending any more code on what should be a + * can't-happen case. + */ +static void +ReportCopyDataParseError(size_t r, char *copybuf) +{ + if (r == 0) + pg_log_error("empty COPY message"); + else + pg_log_error("malformed COPY message of type %d, length %zu", + copybuf[0], r); + exit(1); +} + /* * Receive raw tar data from the server, and stream it to the appropriate * location. 
If we're writing a single tarfile to standard output, also @@ -1332,28 +1661,32 @@ BaseBackup(void) } if (maxrate > 0) AppendIntegerCommandOption(&buf, use_new_option_syntax, "MAX_RATE", - maxrate); + maxrate); if (format == 't') AppendPlainCommandOption(&buf, use_new_option_syntax, "TABLESPACE_MAP"); if (!verify_checksums) { if (use_new_option_syntax) AppendIntegerCommandOption(&buf, use_new_option_syntax, - "VERIFY_CHECKSUMS", 0); + "VERIFY_CHECKSUMS", 0); else AppendPlainCommandOption(&buf, use_new_option_syntax, - "NOVERIFY_CHECKSUMS"); + "NOVERIFY_CHECKSUMS"); } if (manifest) { AppendStringCommandOption(&buf, use_new_option_syntax, "MANIFEST", - manifest_force_encode ? "force-encode" : "yes"); + manifest_force_encode ? "force-encode" : "yes"); if (manifest_checksums != NULL) AppendStringCommandOption(&buf, use_new_option_syntax, - "MANIFEST_CHECKSUMS", manifest_checksums); + "MANIFEST_CHECKSUMS", manifest_checksums); } + if (serverMajor >= 1500) + AppendStringCommandOption(&buf, use_new_option_syntax, + "TARGET", "client"); + if (verbose) pg_log_info("initiating base backup, waiting for checkpoint to complete"); @@ -1476,46 +1809,56 @@ BaseBackup(void) StartLogStreamer(xlogstart, starttli, sysidentifier); } - /* Receive a tar file for each tablespace in turn */ - for (i = 0; i < PQntuples(res); i++) + if (serverMajor >= 1500) { - char archive_name[MAXPGPATH]; - char *spclocation; - - /* - * If we write the data out to a tar file, it will be named base.tar - * if it's the main data directory or <tablespaceoid>.tar if it's for - * another tablespace. CreateBackupStreamer() will arrange to add .gz - * to the archive name if pg_basebackup is performing compression. - */ - if (PQgetisnull(res, i, 0)) - { - strlcpy(archive_name, "base.tar", sizeof(archive_name)); - spclocation = NULL; - } - else + /* Receive a single tar stream with everything. 
*/ + ReceiveArchiveStream(conn); + } + else + { + /* Receive a tar file for each tablespace in turn */ + for (i = 0; i < PQntuples(res); i++) { - snprintf(archive_name, sizeof(archive_name), - "%s.tar", PQgetvalue(res, i, 0)); - spclocation = PQgetvalue(res, i, 1); + char archive_name[MAXPGPATH]; + char *spclocation; + + /* + * If we write the data out to a tar file, it will be named + * base.tar if it's the main data directory or <tablespaceoid>.tar + * if it's for another tablespace. CreateBackupStreamer() will + * arrange to add .gz to the archive name if pg_basebackup is + * performing compression. + */ + if (PQgetisnull(res, i, 0)) + { + strlcpy(archive_name, "base.tar", sizeof(archive_name)); + spclocation = NULL; + } + else + { + snprintf(archive_name, sizeof(archive_name), + "%s.tar", PQgetvalue(res, i, 0)); + spclocation = PQgetvalue(res, i, 1); + } + + ReceiveTarFile(conn, archive_name, spclocation, i); } - ReceiveTarFile(conn, archive_name, spclocation, i); + /* + * Now receive backup manifest, if appropriate. + * + * If we're writing a tarfile to stdout, ReceiveTarFile will have + * already processed the backup manifest and included it in the output + * tarfile. Such a configuration doesn't allow for writing multiple + * files. + * + * If we're talking to an older server, it won't send a backup + * manifest, so don't try to receive one. + */ + if (!writing_to_stdout && manifest) + ReceiveBackupManifest(conn); } - /* - * Now receive backup manifest, if appropriate. - * - * If we're writing a tarfile to stdout, ReceiveTarFile will have already - * processed the backup manifest and included it in the output tarfile. - * Such a configuration doesn't allow for writing multiple files. - * - * If we're talking to an older server, it won't send a backup manifest, - * so don't try to receive one. 
- */ - if (!writing_to_stdout && manifest) - ReceiveBackupManifest(conn); - if (showprogress) { progress_filename = NULL; diff --git a/src/include/replication/basebackup_sink.h b/src/include/replication/basebackup_sink.h index 3a2206d82f..2047d0fa7a 100644 --- a/src/include/replication/basebackup_sink.h +++ b/src/include/replication/basebackup_sink.h @@ -261,6 +261,7 @@ extern void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli); /* Constructors for various types of sinks. */ +extern bbsink *bbsink_copystream_new(void); extern bbsink *bbsink_copytblspc_new(void); extern bbsink *bbsink_progress_new(bbsink *next, bool estimate_backup_size); extern bbsink *bbsink_throttle_new(bbsink *next, uint32 maxrate); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index b916f09165..54c67982f5 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3765,7 +3765,10 @@ yyscan_t z_stream z_streamp zic_t +ArchiveStreamState +backup_target_type bbsink +bbsink_copystream bbsink_ops bbsink_state bbsink_throttle -- 2.24.3 (Apple Git-128)