From 6b52a8697a1c0d9bd515afcf7ed6a331c6a9e056 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Wed, 6 May 2020 12:08:21 -0400 Subject: [PATCH v2 04/11] Convert libpq-related code to a bbsink. --- src/backend/replication/Makefile | 1 + src/backend/replication/backup_manifest.c | 18 +- src/backend/replication/basebackup.c | 286 +++++-------------- src/backend/replication/basebackup_libpq.c | 309 +++++++++++++++++++++ src/include/replication/backup_manifest.h | 4 +- src/include/replication/basebackup_sink.h | 3 + 6 files changed, 388 insertions(+), 233 deletions(-) create mode 100644 src/backend/replication/basebackup_libpq.c diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index 25d56478f4..6adc396501 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -17,6 +17,7 @@ override CPPFLAGS := -I. -I$(srcdir) $(CPPFLAGS) OBJS = \ backup_manifest.o \ basebackup.o \ + basebackup_libpq.o \ basebackup_sink.o \ repl_gram.o \ slot.o \ diff --git a/src/backend/replication/backup_manifest.c b/src/backend/replication/backup_manifest.c index b626004927..ff326bce19 100644 --- a/src/backend/replication/backup_manifest.c +++ b/src/backend/replication/backup_manifest.c @@ -17,6 +17,7 @@ #include "libpq/pqformat.h" #include "mb/pg_wchar.h" #include "replication/backup_manifest.h" +#include "replication/basebackup_sink.h" #include "utils/builtins.h" #include "utils/json.h" @@ -283,9 +284,8 @@ AddWALInfoToBackupManifest(backup_manifest_info *manifest, XLogRecPtr startptr, * Finalize the backup manifest, and send it to the client. */ void -SendBackupManifest(backup_manifest_info *manifest) +SendBackupManifest(backup_manifest_info *manifest, bbsink *sink) { - StringInfoData protobuf; uint8 checksumbuf[PG_SHA256_DIGEST_LENGTH]; char checksumstringbuf[PG_SHA256_DIGEST_STRING_LENGTH]; size_t manifest_bytes_done = 0; @@ -321,19 +321,15 @@ SendBackupManifest(backup_manifest_info *manifest) (errcode_for_file_access(), errmsg("could not rewind temporary file"))); - /* Send CopyOutResponse message */ - pq_beginmessage(&protobuf, 'H'); - pq_sendbyte(&protobuf, 0); /* overall format */ - pq_sendint16(&protobuf, 0); /* natts */ - pq_endmessage(&protobuf); /* - * Send CopyData messages. + * Send the backup manifest. * * We choose to read back the data from the temporary file in chunks of * size BLCKSZ; this isn't necessary, but buffile.c uses that as the I/O * size, so it seems to make sense to match that value here. */ + bbsink_begin_manifest(sink); while (manifest_bytes_done < manifest->manifest_size) { char manifestbuf[BLCKSZ]; @@ -347,12 +343,10 @@ SendBackupManifest(backup_manifest_info *manifest) ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from temporary file: %m"))); - pq_putmessage('d', manifestbuf, bytes_to_read); + bbsink_manifest_contents(sink, manifestbuf, bytes_to_read); manifest_bytes_done += bytes_to_read; } - - /* No more data, so send CopyDone message */ - pq_putemptymessage('c'); + bbsink_end_manifest(sink); /* Release resources */ BufFileClose(manifest->buffile); diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index 6916132400..a56b0e9813 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -17,13 +17,10 @@ #include #include "access/xlog_internal.h" /* for pg_start/stop_backup */ -#include "catalog/pg_type.h" #include "common/file_perm.h" #include "commands/defrem.h" #include "commands/progress.h" #include "lib/stringinfo.h" -#include "libpq/libpq.h" -#include "libpq/pqformat.h" #include "miscadmin.h" #include "nodes/pg_list.h" #include "pgstat.h" @@ -31,6 +28,7 @@ #include "port.h" #include "postmaster/syslogger.h" #include "replication/basebackup.h" +#include "replication/basebackup_sink.h" #include "replication/backup_manifest.h" #include "replication/walsender.h" #include "replication/walsender_private.h" @@ -59,24 +57,23 @@ typedef struct pg_checksum_type manifest_checksum_type; } basebackup_options; -static int64 sendTablespace(char *path, char *oid, bool sizeonly, +static int64 sendTablespace(bbsink *sink, char *path, char *oid, bool sizeonly, struct backup_manifest_info *manifest); -static int64 sendDir(const char *path, int basepathlen, bool sizeonly, +static int64 sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly, List *tablespaces, bool sendtblspclinks, backup_manifest_info *manifest, const char *spcoid); -static bool sendFile(const char *readfilename, const char *tarfilename, +static bool sendFile(bbsink *sink, const char *readfilename, const char *tarfilename, struct stat *statbuf, bool missing_ok, Oid dboid, backup_manifest_info *manifest, const char *spcoid); -static void sendFileWithContent(const char *filename, const char *content, +static void sendFileWithContent(bbsink *sink, const char *filename, + const char *content, backup_manifest_info *manifest); -static int64 _tarWriteHeader(const char *filename, const char *linktarget, - struct stat *statbuf, bool sizeonly); +static int64 _tarWriteHeader(bbsink *sink, const char *filename, + const char *linktarget, struct stat *statbuf, + bool sizeonly); static void convert_link_to_directory(const char *pathbuf, struct stat *statbuf); -static void send_int8_string(StringInfoData *buf, int64 intval); -static void SendBackupHeader(List *tablespaces); static void perform_base_backup(basebackup_options *opt); static void parse_basebackup_options(List *options, basebackup_options *opt); -static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli); static int compareWalFileNames(const ListCell *a, const ListCell *b); static void throttle(size_t increment); static void update_basebackup_progress(int64 delta); @@ -263,6 +260,7 @@ perform_base_backup(basebackup_options *opt) backup_manifest_info manifest; int datadirpathlen; List *tablespaces = NIL; + bbsink *sink = bbsink_libpq_new(); backup_total = 0; backup_streamed = 0; @@ -345,10 +343,10 @@ perform_base_backup(basebackup_options *opt) tablespaceinfo *tmp = (tablespaceinfo *) lfirst(lc); if (tmp->path == NULL) - tmp->size = sendDir(".", 1, true, tablespaces, true, NULL, + tmp->size = sendDir(sink, ".", 1, true, tablespaces, true, NULL, NULL); else - tmp->size = sendTablespace(tmp->path, tmp->oid, true, + tmp->size = sendTablespace(sink, tmp->path, tmp->oid, true, NULL); backup_total += tmp->size; } @@ -369,11 +367,8 @@ perform_base_backup(basebackup_options *opt) pgstat_progress_update_multi_param(3, index, val); } - /* Send the starting position of the backup */ - SendXlogRecPtrResult(startptr, starttli); - - /* Send tablespace header */ - SendBackupHeader(tablespaces); + /* notify basebackup sink about start of backup */ + bbsink_begin_backup(sink, startptr, starttli, tablespaces); /* Setup and activate network throttling, if client requested it */ if (opt->maxrate > 0) @@ -403,33 +398,28 @@ perform_base_backup(basebackup_options *opt) foreach(lc, tablespaces) { tablespaceinfo *ti = (tablespaceinfo *) lfirst(lc); - StringInfoData buf; - - /* Send CopyOutResponse message */ - pq_beginmessage(&buf, 'H'); - pq_sendbyte(&buf, 0); /* overall format */ - pq_sendint16(&buf, 0); /* natts */ - pq_endmessage(&buf); if (ti->path == NULL) { struct stat statbuf; bool sendtblspclinks = true; + bbsink_begin_archive(sink, "base.tar"); + /* In the main tar, include the backup_label first... */ - sendFileWithContent(BACKUP_LABEL_FILE, labelfile->data, + sendFileWithContent(sink, BACKUP_LABEL_FILE, labelfile->data, &manifest); /* Then the tablespace_map file, if required... */ if (opt->sendtblspcmapfile) { - sendFileWithContent(TABLESPACE_MAP, tblspc_map_file->data, + sendFileWithContent(sink, TABLESPACE_MAP, tblspc_map_file->data, &manifest); sendtblspclinks = false; } /* Then the bulk of the files... */ - sendDir(".", 1, false, tablespaces, sendtblspclinks, + sendDir(sink, ".", 1, false, tablespaces, sendtblspclinks, &manifest, NULL); /* ... and pg_control after everything else. */ @@ -438,24 +428,30 @@ perform_base_backup(basebackup_options *opt) (errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", XLOG_CONTROL_FILE))); - sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, + sendFile(sink, XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid, &manifest, NULL); } else - sendTablespace(ti->path, ti->oid, false, &manifest); + { + char *archive_name = psprintf("%s.tar", ti->oid); + + bbsink_begin_archive(sink, archive_name); + + sendTablespace(sink, ti->path, ti->oid, false, &manifest); + } /* * If we're including WAL, and this is the main data directory we - * don't terminate the tar stream here. Instead, we will append - * the xlog files below and terminate it then. This is safe since - * the main data directory is always sent *last*. + * don't treat this as the end of the tablespace. Instead, we will + * include the xlog files below and stop afterwards. This is safe + * since the main data directory is always sent *last*. */ if (opt->includewal && ti->path == NULL) { Assert(lnext(tablespaces, lc) == NULL); } else - pq_putemptymessage('c'); /* CopyDone */ + bbsink_end_archive(sink); tblspc_streamed++; pgstat_progress_update_param(PROGRESS_BASEBACKUP_TBLSPC_STREAMED, @@ -630,7 +626,7 @@ perform_base_backup(basebackup_options *opt) } /* send the WAL file itself */ - _tarWriteHeader(pathbuf, NULL, &statbuf, false); + _tarWriteHeader(sink, pathbuf, NULL, &statbuf, false); while ((cnt = basebackup_read_file(fd, buf, Min(sizeof(buf), @@ -638,10 +634,7 @@ perform_base_backup(basebackup_options *opt) len, pathbuf, true)) > 0) { CheckXLogRemoved(segno, tli); - /* Send the chunk as a CopyData message */ - if (pq_putmessage('d', buf, cnt)) - ereport(ERROR, - (errmsg("base backup could not send data, aborting backup"))); + bbsink_archive_contents(sink, buf, cnt); update_basebackup_progress(cnt); len += cnt; @@ -674,7 +667,7 @@ perform_base_backup(basebackup_options *opt) * complete segment. */ StatusFilePath(pathbuf, walFileName, ".done"); - sendFileWithContent(pathbuf, "", &manifest); + sendFileWithContent(sink, pathbuf, "", &manifest); } /* @@ -697,23 +690,22 @@ perform_base_backup(basebackup_options *opt) (errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", pathbuf))); - sendFile(pathbuf, pathbuf, &statbuf, false, InvalidOid, + sendFile(sink, pathbuf, pathbuf, &statbuf, false, InvalidOid, &manifest, NULL); /* unconditionally mark file as archived */ StatusFilePath(pathbuf, fname, ".done"); - sendFileWithContent(pathbuf, "", &manifest); + sendFileWithContent(sink, pathbuf, "", &manifest); } - /* Send CopyDone message for the last tar file */ - pq_putemptymessage('c'); + bbsink_end_archive(sink); } AddWALInfoToBackupManifest(&manifest, startptr, starttli, endptr, endtli); - SendBackupManifest(&manifest); + SendBackupManifest(&manifest, sink); - SendXlogRecPtrResult(endptr, endtli); + bbsink_end_backup(sink, endptr, endtli); if (total_checksum_failures) { @@ -941,151 +933,11 @@ SendBaseBackup(BaseBackupCmd *cmd) perform_base_backup(&opt); } -static void -send_int8_string(StringInfoData *buf, int64 intval) -{ - char is[32]; - - sprintf(is, INT64_FORMAT, intval); - pq_sendint32(buf, strlen(is)); - pq_sendbytes(buf, is, strlen(is)); -} - -static void -SendBackupHeader(List *tablespaces) -{ - StringInfoData buf; - ListCell *lc; - - /* Construct and send the directory information */ - pq_beginmessage(&buf, 'T'); /* RowDescription */ - pq_sendint16(&buf, 3); /* 3 fields */ - - /* First field - spcoid */ - pq_sendstring(&buf, "spcoid"); - pq_sendint32(&buf, 0); /* table oid */ - pq_sendint16(&buf, 0); /* attnum */ - pq_sendint32(&buf, OIDOID); /* type oid */ - pq_sendint16(&buf, 4); /* typlen */ - pq_sendint32(&buf, 0); /* typmod */ - pq_sendint16(&buf, 0); /* format code */ - - /* Second field - spclocation */ - pq_sendstring(&buf, "spclocation"); - pq_sendint32(&buf, 0); - pq_sendint16(&buf, 0); - pq_sendint32(&buf, TEXTOID); - pq_sendint16(&buf, -1); - pq_sendint32(&buf, 0); - pq_sendint16(&buf, 0); - - /* Third field - size */ - pq_sendstring(&buf, "size"); - pq_sendint32(&buf, 0); - pq_sendint16(&buf, 0); - pq_sendint32(&buf, INT8OID); - pq_sendint16(&buf, 8); - pq_sendint32(&buf, 0); - pq_sendint16(&buf, 0); - pq_endmessage(&buf); - - foreach(lc, tablespaces) - { - tablespaceinfo *ti = lfirst(lc); - - /* Send one datarow message */ - pq_beginmessage(&buf, 'D'); - pq_sendint16(&buf, 3); /* number of columns */ - if (ti->path == NULL) - { - pq_sendint32(&buf, -1); /* Length = -1 ==> NULL */ - pq_sendint32(&buf, -1); - } - else - { - Size len; - - len = strlen(ti->oid); - pq_sendint32(&buf, len); - pq_sendbytes(&buf, ti->oid, len); - - len = strlen(ti->path); - pq_sendint32(&buf, len); - pq_sendbytes(&buf, ti->path, len); - } - if (ti->size >= 0) - send_int8_string(&buf, ti->size / 1024); - else - pq_sendint32(&buf, -1); /* NULL */ - - pq_endmessage(&buf); - } - - /* Send a CommandComplete message */ - pq_puttextmessage('C', "SELECT"); -} - -/* - * Send a single resultset containing just a single - * XLogRecPtr record (in text format) - */ -static void -SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli) -{ - StringInfoData buf; - char str[MAXFNAMELEN]; - Size len; - - pq_beginmessage(&buf, 'T'); /* RowDescription */ - pq_sendint16(&buf, 2); /* 2 fields */ - - /* Field headers */ - pq_sendstring(&buf, "recptr"); - pq_sendint32(&buf, 0); /* table oid */ - pq_sendint16(&buf, 0); /* attnum */ - pq_sendint32(&buf, TEXTOID); /* type oid */ - pq_sendint16(&buf, -1); - pq_sendint32(&buf, 0); - pq_sendint16(&buf, 0); - - pq_sendstring(&buf, "tli"); - pq_sendint32(&buf, 0); /* table oid */ - pq_sendint16(&buf, 0); /* attnum */ - - /* - * int8 may seem like a surprising data type for this, but in theory int4 - * would not be wide enough for this, as TimeLineID is unsigned. - */ - pq_sendint32(&buf, INT8OID); /* type oid */ - pq_sendint16(&buf, -1); - pq_sendint32(&buf, 0); - pq_sendint16(&buf, 0); - pq_endmessage(&buf); - - /* Data row */ - pq_beginmessage(&buf, 'D'); - pq_sendint16(&buf, 2); /* number of columns */ - - len = snprintf(str, sizeof(str), - "%X/%X", (uint32) (ptr >> 32), (uint32) ptr); - pq_sendint32(&buf, len); - pq_sendbytes(&buf, str, len); - - len = snprintf(str, sizeof(str), "%u", tli); - pq_sendint32(&buf, len); - pq_sendbytes(&buf, str, len); - - pq_endmessage(&buf); - - /* Send a CommandComplete message */ - pq_puttextmessage('C', "SELECT"); -} - /* * Inject a file with given name and content in the output tar stream. */ static void -sendFileWithContent(const char *filename, const char *content, +sendFileWithContent(bbsink *sink, const char *filename, const char *content, backup_manifest_info *manifest) { struct stat statbuf; @@ -1113,9 +965,8 @@ sendFileWithContent(const char *filename, const char *content, statbuf.st_mode = pg_file_create_mode; statbuf.st_size = len; - _tarWriteHeader(filename, NULL, &statbuf, false); - /* Send the contents as a CopyData message */ - pq_putmessage('d', content, len); + _tarWriteHeader(sink, filename, NULL, &statbuf, false); + bbsink_archive_contents(sink, content, len); update_basebackup_progress(len); /* Pad to a multiple of the tar block size. */ @@ -1125,7 +976,7 @@ sendFileWithContent(const char *filename, const char *content, char buf[TAR_BLOCK_SIZE]; MemSet(buf, 0, pad); - pq_putmessage('d', buf, pad); + bbsink_archive_contents(sink, buf, pad); update_basebackup_progress(pad); } @@ -1142,7 +993,7 @@ sendFileWithContent(const char *filename, const char *content, * Only used to send auxiliary tablespaces, not PGDATA. */ static int64 -sendTablespace(char *path, char *spcoid, bool sizeonly, +sendTablespace(bbsink *sink, char *path, char *spcoid, bool sizeonly, backup_manifest_info *manifest) { int64 size; @@ -1172,11 +1023,11 @@ sendTablespace(char *path, char *spcoid, bool sizeonly, return 0; } - size = _tarWriteHeader(TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf, + size = _tarWriteHeader(sink, TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf, sizeonly); /* Send all the files in the tablespace version directory */ - size += sendDir(pathbuf, strlen(path), sizeonly, NIL, true, manifest, + size += sendDir(sink, pathbuf, strlen(path), sizeonly, NIL, true, manifest, spcoid); return size; @@ -1195,8 +1046,8 @@ sendTablespace(char *path, char *spcoid, bool sizeonly, * as it will be sent separately in the tablespace_map file. */ static int64 -sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, - bool sendtblspclinks, backup_manifest_info *manifest, +sendDir(bbsink *sink, const char *path, int basepathlen, bool sizeonly, + List *tablespaces, bool sendtblspclinks, backup_manifest_info *manifest, const char *spcoid) { DIR *dir; @@ -1356,8 +1207,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, { elog(DEBUG1, "contents of directory \"%s\" excluded from backup", de->d_name); convert_link_to_directory(pathbuf, &statbuf); - size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, - sizeonly); + size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL, + &statbuf, sizeonly); excludeFound = true; break; } @@ -1374,8 +1225,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, { elog(DEBUG1, "contents of directory \"%s\" excluded from backup", statrelpath); convert_link_to_directory(pathbuf, &statbuf); - size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, - sizeonly); + size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL, + &statbuf, sizeonly); continue; } @@ -1388,15 +1239,15 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, { /* If pg_wal is a symlink, write it as a directory anyway */ convert_link_to_directory(pathbuf, &statbuf); - size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, - sizeonly); + size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL, + &statbuf, sizeonly); /* * Also send archive_status directory (by hackishly reusing * statbuf from above ...). */ - size += _tarWriteHeader("./pg_wal/archive_status", NULL, &statbuf, - sizeonly); + size += _tarWriteHeader(sink, "./pg_wal/archive_status", NULL, + &statbuf, sizeonly); continue; /* don't recurse into pg_wal */ } @@ -1427,7 +1278,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, pathbuf))); linkpath[rllen] = '\0'; - size += _tarWriteHeader(pathbuf + basepathlen + 1, linkpath, + size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, linkpath, &statbuf, sizeonly); #else @@ -1451,7 +1302,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, * Store a directory entry in the tar file so we can get the * permissions right. */ - size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, + size += _tarWriteHeader(sink, pathbuf + basepathlen + 1, NULL, &statbuf, sizeonly); /* @@ -1483,7 +1334,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, skip_this_dir = true; if (!skip_this_dir) - size += sendDir(pathbuf, basepathlen, sizeonly, tablespaces, + size += sendDir(sink, pathbuf, basepathlen, sizeonly, tablespaces, sendtblspclinks, manifest, spcoid); } else if (S_ISREG(statbuf.st_mode)) @@ -1491,7 +1342,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, bool sent = false; if (!sizeonly) - sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf, + sent = sendFile(sink, pathbuf, pathbuf + basepathlen + 1, &statbuf, true, isDbDir ? atooid(lastDir + 1) : InvalidOid, manifest, spcoid); @@ -1568,7 +1419,7 @@ is_checksummed_file(const char *fullpath, const char *filename) * and the file did not exist. */ static bool -sendFile(const char *readfilename, const char *tarfilename, +sendFile(bbsink *sink, const char *readfilename, const char *tarfilename, struct stat *statbuf, bool missing_ok, Oid dboid, backup_manifest_info *manifest, const char *spcoid) { @@ -1601,7 +1452,7 @@ sendFile(const char *readfilename, const char *tarfilename, errmsg("could not open file \"%s\": %m", readfilename))); } - _tarWriteHeader(tarfilename, NULL, statbuf, false); + _tarWriteHeader(sink, tarfilename, NULL, statbuf, false); if (!noverify_checksums && DataChecksumsEnabled()) { @@ -1756,10 +1607,7 @@ sendFile(const char *readfilename, const char *tarfilename, } } - /* Send the chunk as a CopyData message */ - if (pq_putmessage('d', buf, cnt)) - ereport(ERROR, - (errmsg("base backup could not send data, aborting backup"))); + bbsink_archive_contents(sink, buf, cnt); update_basebackup_progress(cnt); /* Also feed it to the checksum machinery. */ @@ -1776,7 +1624,7 @@ sendFile(const char *readfilename, const char *tarfilename, while (len < statbuf->st_size) { cnt = Min(sizeof(buf), statbuf->st_size - len); - pq_putmessage('d', buf, cnt); + bbsink_archive_contents(sink, buf, cnt); pg_checksum_update(&checksum_ctx, (uint8 *) buf, cnt); update_basebackup_progress(cnt); len += cnt; @@ -1793,7 +1641,7 @@ sendFile(const char *readfilename, const char *tarfilename, if (pad > 0) { MemSet(buf, 0, pad); - pq_putmessage('d', buf, pad); + bbsink_archive_contents(sink, buf, pad); update_basebackup_progress(pad); } @@ -1820,7 +1668,7 @@ sendFile(const char *readfilename, const char *tarfilename, static int64 -_tarWriteHeader(const char *filename, const char *linktarget, +_tarWriteHeader(bbsink *sink, const char *filename, const char *linktarget, struct stat *statbuf, bool sizeonly) { char h[TAR_BLOCK_SIZE]; @@ -1851,7 +1699,7 @@ _tarWriteHeader(const char *filename, const char *linktarget, elog(ERROR, "unrecognized tar error: %d", rc); } - pq_putmessage('d', h, sizeof(h)); + bbsink_archive_contents(sink, h, sizeof(h)); update_basebackup_progress(sizeof(h)); } diff --git a/src/backend/replication/basebackup_libpq.c b/src/backend/replication/basebackup_libpq.c new file mode 100644 index 0000000000..f0024a881a --- /dev/null +++ b/src/backend/replication/basebackup_libpq.c @@ -0,0 +1,309 @@ +/*------------------------------------------------------------------------- + * + * basebackup_libpq.c + * send archives and backup manifest to client via libpq + * + * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/basebackup_libpq.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "catalog/pg_type_d.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "replication/basebackup.h" +#include "replication/basebackup_sink.h" + +static void bbsink_libpq_begin_backup(bbsink *sink, XLogRecPtr startptr, + TimeLineID starttli, List *tablespaces); +static void bbsink_libpq_begin_archive(bbsink *sink, const char *archive_name); +static void bbsink_libpq_archive_contents(bbsink *sink, + const char *data, size_t len); +static void bbsink_libpq_end_archive(bbsink *sink); +static void bbsink_libpq_begin_manifest(bbsink *sink); +static void bbsink_libpq_manifest_contents(bbsink *sink, + const char *data, size_t len); +static void bbsink_libpq_end_manifest(bbsink *sink); +static void bbsink_libpq_end_backup(bbsink *sink, XLogRecPtr endptr, + TimeLineID endtli); + +static void SendCopyOutResponse(void); +static void SendCopyData(const char *data, size_t len); +static void SendCopyDone(void); +static void send_int8_string(StringInfoData *buf, int64 intval); +static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli); + +const bbsink_ops bbsink_libpq_ops = { + .begin_backup = bbsink_libpq_begin_backup, + .begin_archive = bbsink_libpq_begin_archive, + .archive_contents = bbsink_libpq_archive_contents, + .end_archive = bbsink_libpq_end_archive, + .begin_manifest = bbsink_libpq_begin_manifest, + .manifest_contents = bbsink_libpq_manifest_contents, + .end_manifest = bbsink_libpq_end_manifest, + .end_backup = bbsink_libpq_end_backup +}; + +/* + * Create a new 'libpq' bbsink. + */ +bbsink * +bbsink_libpq_new(void) +{ + bbsink *sink = palloc(sizeof(bbsink)); + + *((const bbsink_ops **) &sink->bbs_ops) = &bbsink_libpq_ops; + sink->bbs_next = NULL; + + return sink; +} + +/* + * Send start-of-backup wire protocol messages. + */ +static void +bbsink_libpq_begin_backup(bbsink *sink, XLogRecPtr startptr, TimeLineID starttli, + List *tablespaces) +{ + StringInfoData buf; + ListCell *lc; + + SendXlogRecPtrResult(startptr, starttli); + + /* Construct and send the directory information */ + pq_beginmessage(&buf, 'T'); /* RowDescription */ + pq_sendint16(&buf, 3); /* 3 fields */ + + /* First field - spcoid */ + pq_sendstring(&buf, "spcoid"); + pq_sendint32(&buf, 0); /* table oid */ + pq_sendint16(&buf, 0); /* attnum */ + pq_sendint32(&buf, OIDOID); /* type oid */ + pq_sendint16(&buf, 4); /* typlen */ + pq_sendint32(&buf, 0); /* typmod */ + pq_sendint16(&buf, 0); /* format code */ + + /* Second field - spclocation */ + pq_sendstring(&buf, "spclocation"); + pq_sendint32(&buf, 0); + pq_sendint16(&buf, 0); + pq_sendint32(&buf, TEXTOID); + pq_sendint16(&buf, -1); + pq_sendint32(&buf, 0); + pq_sendint16(&buf, 0); + + /* Third field - size */ + pq_sendstring(&buf, "size"); + pq_sendint32(&buf, 0); + pq_sendint16(&buf, 0); + pq_sendint32(&buf, INT8OID); + pq_sendint16(&buf, 8); + pq_sendint32(&buf, 0); + pq_sendint16(&buf, 0); + pq_endmessage(&buf); + + foreach(lc, tablespaces) + { + tablespaceinfo *ti = lfirst(lc); + + /* Send one datarow message */ + pq_beginmessage(&buf, 'D'); + pq_sendint16(&buf, 3); /* number of columns */ + if (ti->path == NULL) + { + pq_sendint32(&buf, -1); /* Length = -1 ==> NULL */ + pq_sendint32(&buf, -1); + } + else + { + Size len; + + len = strlen(ti->oid); + pq_sendint32(&buf, len); + pq_sendbytes(&buf, ti->oid, len); + + len = strlen(ti->path); + pq_sendint32(&buf, len); + pq_sendbytes(&buf, ti->path, len); + } + if (ti->size >= 0) + send_int8_string(&buf, ti->size / 1024); + else + pq_sendint32(&buf, -1); /* NULL */ + + pq_endmessage(&buf); + } + + /* Send a CommandComplete message */ + pq_puttextmessage('C', "SELECT"); +} + +/* + * Each archive is set as a separate stream of COPY data, and thus begins + * with a CopyOutResponse message. + */ +static void +bbsink_libpq_begin_archive(bbsink *sink, const char *archive_name) +{ + SendCopyOutResponse(); +} + +/* + * Each chunk of data within the archive is sent as a CopyData message. + */ +static void +bbsink_libpq_archive_contents(bbsink *sink, const char *data, size_t len) +{ + SendCopyData(data, len); +} + +/* + * The archive is terminated by a CopyDone message. + */ +static void +bbsink_libpq_end_archive(bbsink *sink) +{ + SendCopyDone(); +} + +/* + * The backup manifest is sent as a separate stream of COPY data, and thus + * begins with a CopyOutResponse message. + */ +static void +bbsink_libpq_begin_manifest(bbsink *sink) +{ + SendCopyOutResponse(); +} + +/* + * Each chunk of manifest data is sent using a CopyData message. + */ +static void +bbsink_libpq_manifest_contents(bbsink *sink, const char *data, size_t len) +{ + SendCopyData(data, len); +} + +/* + * When we've finished sending the manifest, send a CopyDone message. + */ +static void +bbsink_libpq_end_manifest(bbsink *sink) +{ + SendCopyDone(); +} + +/* + * Send end-of-backup wire protocol messages. + */ +static void +bbsink_libpq_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli) +{ + SendXlogRecPtrResult(endptr, endtli); +} + +/* + * Send a CopyOutResponse message. + */ +static void +SendCopyOutResponse(void) +{ + StringInfoData buf; + + pq_beginmessage(&buf, 'H'); + pq_sendbyte(&buf, 0); /* overall format */ + pq_sendint16(&buf, 0); /* natts */ + pq_endmessage(&buf); +} + +/* + * Send a CopyData message. + */ +static void +SendCopyData(const char *data, size_t len) +{ + pq_putmessage('d', data, len); +} + +/* + * Send a CopyDone message. + */ +static void +SendCopyDone(void) +{ + pq_putemptymessage('c'); +} + +/* + * Send a single resultset containing just a single + * XLogRecPtr record (in text format) + */ +static void +SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli) +{ + StringInfoData buf; + char str[MAXFNAMELEN]; + Size len; + + pq_beginmessage(&buf, 'T'); /* RowDescription */ + pq_sendint16(&buf, 2); /* 2 fields */ + + /* Field headers */ + pq_sendstring(&buf, "recptr"); + pq_sendint32(&buf, 0); /* table oid */ + pq_sendint16(&buf, 0); /* attnum */ + pq_sendint32(&buf, TEXTOID); /* type oid */ + pq_sendint16(&buf, -1); + pq_sendint32(&buf, 0); + pq_sendint16(&buf, 0); + + pq_sendstring(&buf, "tli"); + pq_sendint32(&buf, 0); /* table oid */ + pq_sendint16(&buf, 0); /* attnum */ + + /* + * int8 may seem like a surprising data type for this, but in theory int4 + * would not be wide enough for this, as TimeLineID is unsigned. + */ + pq_sendint32(&buf, INT8OID); /* type oid */ + pq_sendint16(&buf, -1); + pq_sendint32(&buf, 0); + pq_sendint16(&buf, 0); + pq_endmessage(&buf); + + /* Data row */ + pq_beginmessage(&buf, 'D'); + pq_sendint16(&buf, 2); /* number of columns */ + + len = snprintf(str, sizeof(str), + "%X/%X", (uint32) (ptr >> 32), (uint32) ptr); + pq_sendint32(&buf, len); + pq_sendbytes(&buf, str, len); + + len = snprintf(str, sizeof(str), "%u", tli); + pq_sendint32(&buf, len); + pq_sendbytes(&buf, str, len); + + pq_endmessage(&buf); + + /* Send a CommandComplete message */ + pq_puttextmessage('C', "SELECT"); +} + +/* + * Send a 64-bit integer as a string via the wire protocol. + */ +static void +send_int8_string(StringInfoData *buf, int64 intval) +{ + char is[32]; + + sprintf(is, INT64_FORMAT, intval); + pq_sendint32(buf, strlen(is)); + pq_sendbytes(buf, is, strlen(is)); +} diff --git a/src/include/replication/backup_manifest.h b/src/include/replication/backup_manifest.h index fb1291cbe4..bbd08f1852 100644 --- a/src/include/replication/backup_manifest.h +++ b/src/include/replication/backup_manifest.h @@ -12,9 +12,9 @@ #ifndef BACKUP_MANIFEST_H #define BACKUP_MANIFEST_H -#include "access/xlogdefs.h" #include "common/checksum_helper.h" #include "pgtime.h" +#include "replication/basebackup_sink.h" #include "storage/buffile.h" typedef enum manifest_option @@ -47,6 +47,6 @@ extern void AddWALInfoToBackupManifest(backup_manifest_info *manifest, XLogRecPtr startptr, TimeLineID starttli, XLogRecPtr endptr, TimeLineID endtli); -extern void SendBackupManifest(backup_manifest_info *manifest); +extern void SendBackupManifest(backup_manifest_info *manifest, bbsink *sink); #endif diff --git a/src/include/replication/basebackup_sink.h b/src/include/replication/basebackup_sink.h index 050cf1180d..a8df937957 100644 --- a/src/include/replication/basebackup_sink.h +++ b/src/include/replication/basebackup_sink.h @@ -173,4 +173,7 @@ extern void bbsink_forward_end_manifest(bbsink *sink); extern void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli); +/* Constructors for various types of sinks. */ +extern bbsink *bbsink_libpq_new(void); + #endif -- 2.24.3 (Apple Git-128)