From c1a2b056af1d9b2be16b4f5ee6e2731f6ce7bb3a Mon Sep 17 00:00:00 2001 From: Nitin Motiani Date: Sat, 15 Feb 2025 08:05:25 +0000 Subject: [PATCH v11 2/5] Add pipe-command support in pg_restore * This is same as the pg_dump change. We add support for --pipe-command in directory archive format. This can be used to read from multiple streams and do pre-processing (decompression with a custom algorithm, filtering etc) before restore. Currently that is not possible because the pg_dump output of directory format can't just be piped. * Like pg_dump, here also either filename or --pipe-command can be set. If neither are set, the standard input is used as before. * This is only supported with compression none and archive format directory. * We reuse the inputFileSpec field for the pipe-command. And add a bool to specify if it is a pipe. * The changes made for pg_dump to handle the pipe case with popen and pclose also work here. * The logic of %f format specifier to read from the pg_dump output is the same too. Most of the code from the pg_dump commit works. We add similar logic to the function to read large objects. * The --pipe command works -l and -L option. --- src/bin/pg_dump/compress_io.c | 30 +++++++++------ src/bin/pg_dump/pg_backup_directory.c | 16 +++++++- src/bin/pg_dump/pg_restore.c | 53 ++++++++++++++++++++------- 3 files changed, 72 insertions(+), 27 deletions(-) diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index bc521dd274b..88488186b34 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -260,22 +260,28 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode, fname = pg_strdup(path); - if (hasSuffix(fname, ".gz")) - compression_spec.algorithm = PG_COMPRESSION_GZIP; - else if (hasSuffix(fname, ".lz4")) - compression_spec.algorithm = PG_COMPRESSION_LZ4; - else if (hasSuffix(fname, ".zst")) - compression_spec.algorithm = PG_COMPRESSION_ZSTD; - else + /* + * If the path is a pipe command, the compression algorithm is none. + */ + if (!path_is_pipe_command) { - if (stat(path, &st) == 0) - compression_spec.algorithm = PG_COMPRESSION_NONE; - else if (check_compressed_file(path, &fname, "gz")) + if (hasSuffix(fname, ".gz")) compression_spec.algorithm = PG_COMPRESSION_GZIP; - else if (check_compressed_file(path, &fname, "lz4")) + else if (hasSuffix(fname, ".lz4")) compression_spec.algorithm = PG_COMPRESSION_LZ4; - else if (check_compressed_file(path, &fname, "zst")) + else if (hasSuffix(fname, ".zst")) compression_spec.algorithm = PG_COMPRESSION_ZSTD; + else + { + if (stat(path, &st) == 0) + compression_spec.algorithm = PG_COMPRESSION_NONE; + else if (check_compressed_file(path, &fname, "gz")) + compression_spec.algorithm = PG_COMPRESSION_GZIP; + else if (check_compressed_file(path, &fname, "lz4")) + compression_spec.algorithm = PG_COMPRESSION_LZ4; + else if (check_compressed_file(path, &fname, "zst")) + compression_spec.algorithm = PG_COMPRESSION_ZSTD; + } } CFH = InitCompressFileHandle(compression_spec, path_is_pipe_command); diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c index 74fc651f6f4..2b18c3c8270 100644 --- a/src/bin/pg_dump/pg_backup_directory.c +++ b/src/bin/pg_dump/pg_backup_directory.c @@ -439,7 +439,21 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te) tocfname, line); StartRestoreLO(AH, oid, AH->public.ropt->dropSchema); - snprintf(path, MAXPGPATH, "%s/%s", ctx->directory, lofname); + + /* + * XXX : Create a helper function for blob files naming common to + * _LoadLOs an _StartLO. + */ + if (AH->fSpecIsPipe) + { + pipe = replace_percent_placeholders(ctx->directory, "pipe-command", "f", lofname); + strcpy(path, pipe); + pfree(pipe); + } + else + { + snprintf(path, MAXPGPATH, "%s/%s", ctx->directory, lofname); + } _PrintFileData(AH, path); EndRestoreLO(AH, oid); } diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c index 703b2677b4e..cbac8bc2520 100644 --- a/src/bin/pg_dump/pg_restore.c +++ b/src/bin/pg_dump/pg_restore.c @@ -60,11 +60,11 @@ static void usage(const char *progname); static void read_restore_filters(const char *filename, RestoreOptions *opts); static bool file_exists_in_directory(const char *dir, const char *filename); static int restore_one_database(const char *inputFileSpec, RestoreOptions *opts, - int numWorkers, bool append_data); -static int restore_global_objects(const char *inputFileSpec, RestoreOptions *opts); + int numWorkers, bool append_data, bool filespec_is_pipe); +static int restore_global_objects(const char *inputFileSpec, RestoreOptions *opts, bool filespec_is_pipe); static int restore_all_databases(const char *inputFileSpec, - SimpleStringList db_exclude_patterns, RestoreOptions *opts, int numWorkers); + SimpleStringList db_exclude_patterns, RestoreOptions *opts, int numWorkers, bool filespec_is_pipe); static int get_dbnames_list_to_restore(PGconn *conn, SimplePtrList *dbname_oid_list, SimpleStringList db_exclude_patterns); @@ -93,6 +93,7 @@ main(int argc, char **argv) int n_errors = 0; bool globals_only = false; SimpleStringList db_exclude_patterns = {NULL, NULL}; + bool filespec_is_pipe = false; static int disable_triggers = 0; static int enable_row_security = 0; static int if_exists = 0; @@ -173,6 +174,7 @@ main(int argc, char **argv) {"filter", required_argument, NULL, 4}, {"restrict-key", required_argument, NULL, 6}, {"exclude-database", required_argument, NULL, 7}, + {"pipe-command", required_argument, NULL, 8}, {NULL, 0, NULL, 0} }; @@ -356,6 +358,11 @@ main(int argc, char **argv) simple_string_list_append(&db_exclude_patterns, optarg); break; + case 8: /* pipe-command */ + inputFileSpec = pg_strdup(optarg); + filespec_is_pipe = true; + break; + default: /* getopt_long already emitted a complaint */ pg_log_error_hint("Try \"%s --help\" for more information.", progname); @@ -363,11 +370,29 @@ main(int argc, char **argv) } } - /* Get file name from command line */ + /* + * Get file name from command line. Note that filename argument and + * pipe-command can't both be set. + */ if (optind < argc) + { + if (filespec_is_pipe) + { + pg_log_error_hint("Only one of [filespec, --pipe-command] allowed"); + exit_nicely(1); + } inputFileSpec = argv[optind++]; - else + } + + /* + * Even if the file argument is not provided, if the pipe-command is + * specified, we need to use that as the file arg and not fallback to + * stdio. + */ + else if (!filespec_is_pipe) + { inputFileSpec = NULL; + } /* Complain if any arguments remain */ if (optind < argc) @@ -594,7 +619,7 @@ main(int argc, char **argv) snprintf(global_path, MAXPGPATH, "%s/toc.glo", inputFileSpec); if (!no_globals) - n_errors = restore_global_objects(global_path, tmpopts); + n_errors = restore_global_objects(global_path, tmpopts, filespec_is_pipe); else pg_log_info("skipping restore of global objects because %s was specified", "--no-globals"); @@ -606,7 +631,7 @@ main(int argc, char **argv) { /* Now restore all the databases from map.dat */ n_errors = n_errors + restore_all_databases(inputFileSpec, db_exclude_patterns, - opts, numWorkers); + opts, numWorkers, filespec_is_pipe); } /* Free db pattern list. */ @@ -626,7 +651,7 @@ main(int argc, char **argv) "-g/--globals-only"); /* Process if toc.glo file does not exist. */ - n_errors = restore_one_database(inputFileSpec, opts, numWorkers, false); + n_errors = restore_one_database(inputFileSpec, opts, numWorkers, false, filespec_is_pipe); } /* Done, print a summary of ignored errors during restore. */ @@ -645,7 +670,7 @@ main(int argc, char **argv) * This restore all global objects. */ static int -restore_global_objects(const char *inputFileSpec, RestoreOptions *opts) +restore_global_objects(const char *inputFileSpec, RestoreOptions *opts, bool filespec_is_pipe) { Archive *AH; int nerror = 0; @@ -654,7 +679,7 @@ restore_global_objects(const char *inputFileSpec, RestoreOptions *opts) opts->format = archCustom; opts->txn_size = 0; - AH = OpenArchive(inputFileSpec, opts->format, false); + AH = OpenArchive(inputFileSpec, opts->format, filespec_is_pipe); SetArchiveOptions(AH, NULL, opts); @@ -691,12 +716,12 @@ restore_global_objects(const char *inputFileSpec, RestoreOptions *opts) */ static int restore_one_database(const char *inputFileSpec, RestoreOptions *opts, - int numWorkers, bool append_data) + int numWorkers, bool append_data, bool filespec_is_pipe) { Archive *AH; int n_errors; - AH = OpenArchive(inputFileSpec, opts->format, false); + AH = OpenArchive(inputFileSpec, opts->format, filespec_is_pipe); SetArchiveOptions(AH, NULL, opts); @@ -1145,7 +1170,7 @@ get_dbname_oid_list_from_mfile(const char *dumpdirpath, static int restore_all_databases(const char *inputFileSpec, SimpleStringList db_exclude_patterns, RestoreOptions *opts, - int numWorkers) + int numWorkers, bool filespec_is_pipe) { SimplePtrList dbname_oid_list = {NULL, NULL}; int num_db_restore = 0; @@ -1309,7 +1334,7 @@ restore_all_databases(const char *inputFileSpec, } /* Restore the single database. */ - n_errors = restore_one_database(subdirpath, tmpopts, numWorkers, true); + n_errors = restore_one_database(subdirpath, tmpopts, numWorkers, true, filespec_is_pipe); n_errors_total += n_errors; -- 2.54.0.545.g6539524ca2-goog