From 15d61fbcceb84d8b8c780309a58893a88a197411 Mon Sep 17 00:00:00 2001 From: Amul Sul Date: Mon, 17 Jun 2024 17:11:26 +0530 Subject: [PATCH v2 09/10] pg_verifybackup: Read tar files and verify its contents --- src/bin/pg_verifybackup/Makefile | 4 +- src/bin/pg_verifybackup/astreamer_verify.c | 250 +++++++++++++++++++++ src/bin/pg_verifybackup/meson.build | 6 +- src/bin/pg_verifybackup/pg_verifybackup.c | 220 +++++++++++++++++- src/bin/pg_verifybackup/pg_verifybackup.h | 10 + src/tools/pgindent/typedefs.list | 1 + 6 files changed, 480 insertions(+), 11 deletions(-) create mode 100644 src/bin/pg_verifybackup/astreamer_verify.c diff --git a/src/bin/pg_verifybackup/Makefile b/src/bin/pg_verifybackup/Makefile index 7c045f142e8..df7aaabd530 100644 --- a/src/bin/pg_verifybackup/Makefile +++ b/src/bin/pg_verifybackup/Makefile @@ -17,11 +17,13 @@ top_builddir = ../../.. include $(top_builddir)/src/Makefile.global # We need libpq only because fe_utils does. +override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS) LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils $(libpq_pgport) OBJS = \ $(WIN32RES) \ - pg_verifybackup.o + pg_verifybackup.o \ + astreamer_verify.o all: pg_verifybackup diff --git a/src/bin/pg_verifybackup/astreamer_verify.c b/src/bin/pg_verifybackup/astreamer_verify.c new file mode 100644 index 00000000000..9be9a9bc04a --- /dev/null +++ b/src/bin/pg_verifybackup/astreamer_verify.c @@ -0,0 +1,250 @@ +/*------------------------------------------------------------------------- + * + * astreamer_verify.c + * + * Extend fe_utils/astreamer.h archive streaming facility to verify TAR + * backup. 
+ *
+ * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
+ *
+ * src/bin/pg_verifybackup/astreamer_verify.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include "common/logging.h"
+#include "fe_utils/astreamer.h"
+#include "pg_verifybackup.h"
+
+typedef struct astreamer_verify
+{
+	astreamer	base;			/* callbacks cast astreamer * to this struct */
+	verifier_context *context;	/* as given to astreamer_verify_content_new */
+	char	   *archive_name;	/* used in log and error messages */
+	Oid			tblspc_oid;		/* if valid, paths get a pg_tblspc/<oid>/ prefix */
+
+	manifest_file *mfile;		/* manifest entry of current member, or NULL */
+	size_t		received_bytes; /* handed to verify_content_checksum() */
+	bool		verify_checksums;	/* decided at the member header */
+	bool		verify_sysid;	/* decided at the member header */
+	pg_checksum_context *checksum_ctx;	/* allocated on first content chunk */
+} astreamer_verify;
+
+static void astreamer_verify_content(astreamer *streamer,
+									 astreamer_member *member,
+									 const char *data, int len,
+									 astreamer_archive_context context);
+static void astreamer_verify_finalize(astreamer *streamer);
+static void astreamer_verify_free(astreamer *streamer);
+
+static const astreamer_ops astreamer_verify_ops = {
+	.content = astreamer_verify_content,
+	.finalize = astreamer_verify_finalize,
+	.free = astreamer_verify_free
+};
+
+/*
+ * Create an astreamer that verifies the contents of a TAR file.
+ */
+astreamer *
+astreamer_verify_content_new(astreamer *next, verifier_context *context,
+							 char *archive_name, Oid tblspc_oid)
+{
+	astreamer_verify *streamer;
+
+	streamer = palloc0(sizeof(astreamer_verify));
+	*((const astreamer_ops **) &streamer->base.bbs_ops) =
+		&astreamer_verify_ops;
+
+	streamer->base.bbs_next = next;
+	streamer->context = context;
+	streamer->archive_name = archive_name;	/* pointer is kept, not copied */
+	streamer->tblspc_oid = tblspc_oid;
+	initStringInfo(&streamer->base.bbs_buffer);
+
+	return &streamer->base;
+}
+
+/*
+ * It verifies each TAR member entry against the manifest data and performs
+ * checksum verification if enabled. Additionally, it validates the backup's
+ * system identifier against the backup_manifest. 
+ */
+static void
+astreamer_verify_content(astreamer *streamer,
+						 astreamer_member *member, const char *data,
+						 int len, astreamer_archive_context context)
+{
+	astreamer_verify *mystreamer = (astreamer_verify *) streamer;
+
+	Assert(context != ASTREAMER_UNKNOWN);
+
+	switch (context)
+	{
+		case ASTREAMER_MEMBER_HEADER:
+			if (!member->is_directory && !member->is_link &&
+				!should_ignore_relpath(mystreamer->context, member->pathname))
+			{
+				manifest_file *m;
+
+				/*
+				 * The backup_manifest records files that belong to a
+				 * tablespace with a path relative to the base directory,
+				 * whereas <tablespaceoid>.tar doesn't.  Prepare the required
+				 * path; otherwise the manifest entry lookup will fail.
+				 */
+				if (OidIsValid(mystreamer->tblspc_oid))
+				{
+					char		temp[MAXPGPATH];
+
+					/* Save the original name; it is overwritten just below */
+					memcpy(temp, member->pathname, MAXPGPATH);
+
+					/* Oid is unsigned, so it must be printed with %u */
+					snprintf(member->pathname, MAXPGPATH, "%s/%u/%s",
+							 "pg_tblspc", mystreamer->tblspc_oid, temp);
+				}
+
+				/* Check the manifest entry */
+				m = verify_manifest_entry(mystreamer->context, member->pathname,
+										  member->size);
+				mystreamer->mfile = m;
+
+				/*
+				 * Prepare for checksum and manifest system identifier
+				 * verification.
+				 *
+				 * We could make these checks while receiving the contents.
+				 * However, since contents are received in multiple
+				 * iterations, that would repeat these checks for every
+				 * chunk.  Deciding once here and remembering the result in
+				 * a flag is more efficient.
+				 */
+				if (m != NULL)
+				{
+					mystreamer->verify_checksums =
+						(!skip_checksums && should_verify_checksum(m));
+					mystreamer->verify_sysid =
+						should_verify_sysid(mystreamer->context->manifest, m);
+				}
+			}
+			break;
+
+		case ASTREAMER_MEMBER_CONTENTS:
+
+			/*
+			 * Perform checksum verification as the file content becomes
+			 * available, since the TAR format does not have random access to
+			 * files like a normal backup directory, where checksum verification
+			 * occurs at different points.
+			 */
+			if (mystreamer->verify_checksums)
+			{
+				/* First chunk of this file: set up the checksum context */
+				if (!mystreamer->checksum_ctx)
+				{
+					mystreamer->checksum_ctx = pg_malloc(sizeof(pg_checksum_context));
+
+					if (pg_checksum_init(mystreamer->checksum_ctx,
+										 mystreamer->mfile->checksum_type) < 0)
+					{
+						report_backup_error(mystreamer->context,
+											"%s: could not initialize checksum of file \"%s\"",
+											mystreamer->archive_name, member->pathname);
+						mystreamer->verify_checksums = false;
+						return;
+					}
+				}
+
+				/* Feed this chunk; a false result stops further checking */
+				mystreamer->verify_checksums =
+					verify_content_checksum(mystreamer->context,
+											mystreamer->checksum_ctx,
+											mystreamer->mfile,
+											(uint8 *) data, len,
+											&mystreamer->received_bytes);
+			}
+
+			/* Do the manifest system identifier verification */
+			if (mystreamer->verify_sysid)
+			{
+				ControlFileData control_file;
+				uint64		manifest_system_identifier;
+				pg_crc32c	crc;
+				bool		crc_ok;
+
+				/* Should be here only for control file */
+				Assert(strcmp(member->pathname, "global/pg_control") == 0);
+				Assert(mystreamer->context->manifest->version != 1);
+
+				/* Wait until sizeof(ControlFileData) bytes are buffered. */
+				if (!astreamer_buffer_until(streamer, &data, &len,
+											sizeof(ControlFileData)))
+					return;
+
+				pg_log_debug("%s: reading \"%s\"", mystreamer->archive_name,
+							 member->pathname);
+
+				if (streamer->bbs_buffer.len != sizeof(ControlFileData))
+					report_fatal_error("%s: could not read control file: read %d of %zu",
+									   mystreamer->archive_name, streamer->bbs_buffer.len,
+									   sizeof(ControlFileData));
+
+				memcpy(&control_file, streamer->bbs_buffer.data,
+					   sizeof(ControlFileData));
+
+				/* Check the CRC. */
+				INIT_CRC32C(crc);
+				COMP_CRC32C(crc,
+							(char *) (&control_file),
+							offsetof(ControlFileData, crc));
+				FIN_CRC32C(crc);
+
+				crc_ok = EQ_CRC32C(crc, control_file.crc);
+
+				manifest_system_identifier =
+					mystreamer->context->manifest->system_identifier;
+
+				verify_control_file_data(&control_file, member->pathname,
+										 crc_ok, manifest_system_identifier);
+			}
+			break;
+
+		case ASTREAMER_MEMBER_TRAILER:
+			/* End of member: drop all per-member state */
+			if (mystreamer->checksum_ctx)
+				pfree(mystreamer->checksum_ctx);
+			mystreamer->checksum_ctx = NULL;
+			mystreamer->mfile = NULL;
+			mystreamer->received_bytes = 0;
+			mystreamer->verify_checksums = false;
+			mystreamer->verify_sysid = false;
+			break;
+
+		case ASTREAMER_ARCHIVE_TRAILER:
+			break;
+
+		default:
+			/* Shouldn't happen. */
+			pg_fatal("unexpected state while parsing tar archive");
+	}
+}
+
+/*
+ * End-of-stream processing for an astreamer_verify stream.
+ */
+static void
+astreamer_verify_finalize(astreamer *streamer)
+{
+	Assert(streamer->bbs_next == NULL);
+}
+
+/*
+ * Free memory associated with an astreamer_verify stream. 
+ */
+static void
+astreamer_verify_free(astreamer *streamer)
+{
+	pfree(streamer->bbs_buffer.data);	/* set up by initStringInfo() */
+	pfree(streamer);			/* NOTE(review): checksum_ctx is not freed here */
+}
diff --git a/src/bin/pg_verifybackup/meson.build b/src/bin/pg_verifybackup/meson.build
index 7c7d31a0350..1e3fcf7ee5a 100644
--- a/src/bin/pg_verifybackup/meson.build
+++ b/src/bin/pg_verifybackup/meson.build
@@ -1,7 +1,8 @@
 # Copyright (c) 2022-2024, PostgreSQL Global Development Group
 
 pg_verifybackup_sources = files(
-  'pg_verifybackup.c'
+  'pg_verifybackup.c',
+  'astreamer_verify.c'
 )
 
 if host_system == 'windows'
@@ -10,9 +11,10 @@ if host_system == 'windows'
     '--FILEDESC', 'pg_verifybackup - verify a backup against using a backup manifest'])
 endif
 
+pg_verifybackup_deps = [frontend_code, libpq, lz4, zlib, zstd]
 pg_verifybackup = executable('pg_verifybackup',
   pg_verifybackup_sources,
-  dependencies: [frontend_code, libpq],
+  dependencies: pg_verifybackup_deps,
   kwargs: default_bin_args,
 )
 bin_targets += pg_verifybackup
diff --git a/src/bin/pg_verifybackup/pg_verifybackup.c b/src/bin/pg_verifybackup/pg_verifybackup.c
index 0d458298f34..21b16c281cb 100644
--- a/src/bin/pg_verifybackup/pg_verifybackup.c
+++ b/src/bin/pg_verifybackup/pg_verifybackup.c
@@ -20,9 +20,12 @@
 
 #include "common/compression.h"
 #include "common/parse_manifest.h"
+#include "common/relpath.h"
+#include "fe_utils/astreamer.h"
 #include "fe_utils/simple_list.h"
 #include "getopt_long.h"
 #include "pg_verifybackup.h"
+#include "pgtar.h"
 #include "pgtime.h"
 
 /*
@@ -39,6 +42,11 @@
  */
 #define ESTIMATED_BYTES_PER_MANIFEST_LINE	100
 
+/* Verifies one backup entry; main() picks the plain or the tar variant. */
+static void (*verify_backup_file_cb) (verifier_context *context,
+									  char *relpath, char *fullpath,
+									  size_t filesize);
+
 static manifest_data *parse_manifest_file(char *manifest_path);
 static void verifybackup_version_cb(JsonManifestParseContext *context,
 									int manifest_version);
@@ -63,6 +71,15 @@ static void verify_backup_directory(verifier_context *context,
 									char *relpath, char *fullpath);
 static void verify_backup_file(verifier_context *context,
 							   char 
*relpath, char *fullpath); +static void verify_plain_file_cb(verifier_context *context, + char *relpath, char *fullpath, + size_t filesize); +static void verify_tar_file_cb(verifier_context *context, + char *relpath, char *fullpath, + size_t filesize); +static void verify_tar_content(verifier_context *context, + char *relpath, char *fullpath, + astreamer *streamer); static void report_extra_backup_files(verifier_context *context); static void verify_backup_checksums(verifier_context *context); static void verify_file_checksum(verifier_context *context, @@ -71,6 +88,9 @@ static void verify_file_checksum(verifier_context *context, static void parse_required_wal(verifier_context *context, char *pg_waldump_path, char *wal_directory); +static astreamer *create_archive_verifier(verifier_context *context, + char *archive_name, + Oid tblspc_oid); static void progress_report(bool finished); static void usage(void); @@ -154,6 +174,10 @@ main(int argc, char **argv) */ simple_string_list_append(&context.ignore_list, "backup_manifest"); simple_string_list_append(&context.ignore_list, "pg_wal"); + simple_string_list_append(&context.ignore_list, "pg_wal.tar"); + simple_string_list_append(&context.ignore_list, "pg_wal.tar.gz"); + simple_string_list_append(&context.ignore_list, "pg_wal.tar.lz4"); + simple_string_list_append(&context.ignore_list, "pg_wal.tar.zst"); simple_string_list_append(&context.ignore_list, "postgresql.auto.conf"); simple_string_list_append(&context.ignore_list, "recovery.signal"); simple_string_list_append(&context.ignore_list, "standby.signal"); @@ -258,6 +282,15 @@ main(int argc, char **argv) if (format == 't' && !tar_compression_specified) compress_algorithm = find_backup_compression(&context); + /* + * Setup the required callback function to verify plain or tar backup + * files. 
+	 */
+	if (format == 'p')
+		verify_backup_file_cb = verify_plain_file_cb;
+	else
+		verify_backup_file_cb = verify_tar_file_cb;
+
 	/* Unless --no-parse-wal was specified, we will need pg_waldump. */
 	if (!no_parse_wal)
 	{
@@ -637,7 +670,8 @@ verify_backup_directory(verifier_context *context, char *relpath,
 }
 
 /*
- * Verify one file (which might actually be a directory or a symlink).
+ * Verify one file (which might actually be a directory, a symlink or an
+ * archive).
  *
  * The arguments to this function have the same meaning as the arguments to
  * verify_backup_directory.
@@ -646,7 +680,6 @@ static void
 verify_backup_file(verifier_context *context, char *relpath, char *fullpath)
 {
 	struct stat sb;
-	manifest_file *m;
 
 	if (stat(fullpath, &sb) != 0)
 	{
@@ -679,8 +712,25 @@ verify_backup_file(verifier_context *context, char *relpath, char *fullpath)
 		return;
 	}
 
+	/* Do the further verifications */
+	verify_backup_file_cb(context, relpath, fullpath, sb.st_size);
+}
+
+/*
+ * Verify one plain file or a symlink.
+ *
+ * The arguments to this function are mostly the same as for
+ * verify_backup_directory.  The additional argument is the file size, for
+ * verifying against the manifest entry.
+ */
+static void
+verify_plain_file_cb(verifier_context *context, char *relpath,
+					 char *fullpath, size_t filesize)
+{
+	manifest_file *m;
+
 	/* Check whether there's an entry in the manifest hash. */
-	m = verify_manifest_entry(context, relpath, sb.st_size);
+	m = verify_manifest_entry(context, relpath, filesize);
 
 	/* Validate the manifest system identifier */
 	if (m != NULL && should_verify_sysid(context->manifest, m))
@@ -698,6 +748,124 @@ verify_backup_file(verifier_context *context, char *relpath, char *fullpath)
 	}
 }
 
+/*
+ * Verify one tar file.
+ *
+ * The arguments to this function are mostly the same as the
+ * verify_backup_directory. The additional argument is the file size for
+ * verifying against manifest entry. 
+ */ +static void +verify_tar_file_cb(verifier_context *context, char *relpath, + char *fullpath, size_t filesize) +{ + astreamer *streamer; + Oid tblspc_oid = InvalidOid; + int file_name_len; + int file_extn_len = 0; /* placate compiler */ + char *file_extn = ""; + + /* Should be tar backup */ + Assert(format == 't'); + + /* Find the tar file extension. */ + if (compress_algorithm == PG_COMPRESSION_NONE) + { + file_extn = ".tar"; + file_extn_len = 4; + + } + else if (compress_algorithm == PG_COMPRESSION_GZIP) + { + file_extn = ".tar.gz"; + file_extn_len = 7; + + } + else if (compress_algorithm == PG_COMPRESSION_LZ4) + { + file_extn = ".tar.lz4"; + file_extn_len = 8; + } + else if (compress_algorithm == PG_COMPRESSION_ZSTD) + { + file_extn = ".tar.zst"; + file_extn_len = 8; + } + + /* + * Ensure that we have the correct file type corresponding to the backup + * format. + */ + file_name_len = strlen(relpath); + if (file_name_len < file_extn_len || + strcmp(relpath + file_name_len - file_extn_len, file_extn) != 0) + { + if (compress_algorithm == PG_COMPRESSION_NONE) + report_backup_error(context, + "\"%s\" is not a valid file, expecting tar file", + relpath); + else + report_backup_error(context, + "\"%s\" is not a valid file, expecting \"%s\" compressed tar file", + relpath, + get_compress_algorithm_name(compress_algorithm)); + return; + } + + /* + * For the tablespace, pg_basebackup writes the data out to + * .tar. If a file matches that format, then extract the + * tablespaceoid, which we need to prepare the paths of the files + * belonging to that tablespace relative to the base directory. + */ + if (strspn(relpath, "0123456789") == (file_name_len - file_extn_len)) + tblspc_oid = strtoi64(relpath, NULL, 10); + + streamer = create_archive_verifier(context, relpath, tblspc_oid); + verify_tar_content(context, relpath, fullpath, streamer); + + /* Cleanup. 
*/ + astreamer_finalize(streamer); + astreamer_free(streamer); +} + +/* + * Reads a given tar file in predefined chunks and pass to astreamer. Which + * initiates routines for decompression (if necessary) then verification + * of each member within the tar archive. + */ +static void +verify_tar_content(verifier_context *context, char *relpath, char *fullpath, + astreamer *streamer) +{ + int fd; + int rc; + char *buffer; + + /* Open the target file. */ + if ((fd = open(fullpath, O_RDONLY | PG_BINARY, 0)) < 0) + { + report_backup_error(context, "could not open file \"%s\": %m", + relpath); + return; + } + + buffer = pg_malloc(READ_CHUNK_SIZE * sizeof(uint8)); + + /* Perform the reads */ + while ((rc = read(fd, buffer, READ_CHUNK_SIZE)) > 0) + astreamer_content(streamer, NULL, buffer, rc, ASTREAMER_UNKNOWN); + + if (rc < 0) + report_backup_error(context, "could not read file \"%s\": %m", + relpath); + + /* Close the file. */ + if (close(fd) != 0) + report_backup_error(context, "could not close file \"%s\": %m", + relpath); +} + /* * Verify file and its size entry in the manifest. */ @@ -742,8 +910,8 @@ verify_manifest_entry(verifier_context *context, char *relpath, size_t filesize) } /* - * Sanity check control file and validate system identifier against manifest - * system identifier. + * Sanity check control file data and validate system identifier against + * manifest system identifier. */ void verify_control_file_data(ControlFileData *control_file, @@ -1124,10 +1292,10 @@ find_backup_format(verifier_context *context) } /* - * To determine the compression format, we will search for the main data - * directory archive and its extension, which starts with base.tar, as * pg_basebackup writes the main data directory to an archive file named - * base.tar followed by a compression type extension like .gz, .lz4, or .zst. + * base.tar, followed by a compression type extension such as .gz, .lz4, or + * .zst. 
To determine the compression format, we need to search for this main
+ * data directory archive file.
  */
 static pg_compress_algorithm
 find_backup_compression(verifier_context *context)
@@ -1178,6 +1346,42 @@ find_backup_compression(verifier_context *context)
 	return PG_COMPRESSION_NONE; /* placate compiler */
 }
 
+/*
+ * Create the astreamers for verifying one tar archive (verifier, tar parser,
+ * and a decompressor if the archive is compressed); returns the last one made.
+ */
+static astreamer *
+create_archive_verifier(verifier_context *context, char *archive_name,
+						Oid tblspc_oid)
+{
+	astreamer  *streamer = NULL;	/* the verifier gets no next astreamer */
+
+	/* Should be here only for tar backup */
+	Assert(format == 't');
+
+	/*
+	 * The content verifier is created first with no next astreamer; it is
+	 * then passed to the tar parser's constructor.
+	 */
+	streamer = astreamer_verify_content_new(streamer, context, archive_name,
+											tblspc_oid);
+	streamer = astreamer_tar_parser_new(streamer);
+
+	/*
+	 * If the tar file is compressed, we must perform the appropriate
+	 * decompression operation before proceeding with the verification of its
+	 * contents.
+	 */
+	if (compress_algorithm == PG_COMPRESSION_GZIP)
+		streamer = astreamer_gzip_decompressor_new(streamer);
+	else if (compress_algorithm == PG_COMPRESSION_LZ4)
+		streamer = astreamer_lz4_decompressor_new(streamer);
+	else if (compress_algorithm == PG_COMPRESSION_ZSTD)
+		streamer = astreamer_zstd_decompressor_new(streamer);
+
+	return streamer;
+}
+
 /*
  * Print out usage information and exit.
  */
diff --git a/src/bin/pg_verifybackup/pg_verifybackup.h b/src/bin/pg_verifybackup/pg_verifybackup.h
index 64508578290..e27fe4da6a2 100644
--- a/src/bin/pg_verifybackup/pg_verifybackup.h
+++ b/src/bin/pg_verifybackup/pg_verifybackup.h
@@ -127,4 +127,14 @@ extern void report_fatal_error(const char *pg_restrict fmt,...)
 			pg_attribute_printf(1, 2) pg_attribute_noreturn();
 extern bool should_ignore_relpath(verifier_context *context, const char *relpath);
 
+/* Forward declarations to avoid fe_utils/astreamer.h include. 
*/ +struct astreamer; +typedef struct astreamer astreamer; + +extern astreamer *astreamer_verify_content_new(astreamer *next, + verifier_context *context, + char *archive_name, + Oid tblspc_oid); + + #endif /* PG_VERIFYBACKUP_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index b982dffa5fc..ab279c9eb39 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3321,6 +3321,7 @@ astreamer_plain_writer astreamer_recovery_injector astreamer_tar_archiver astreamer_tar_parser +astreamer_verify astreamer_zstd_frame bgworker_main_type bh_node_type -- 2.18.0