From c543f4b20d79cce3bdc77f328a1fb0560c2cdfdb Mon Sep 17 00:00:00 2001 From: Nitin Motiani Date: Sat, 15 Feb 2025 08:05:25 +0000 Subject: [PATCH v5 2/5] Add pipe-command support in pg_restore * This is same as the pg_dump change. We add support for --pipe-command in directory archive format. This can be used to read from multiple streams and do pre-processing (decompression with a custom algorithm, filtering etc) before restore. Currently that is not possible because the pg_dump output of directory format can't just be piped. * Like pg_dump, here also either filename or --pipe-command can be set. If neither are set, the standard input is used as before. * This is only supported with compression none and archive format directory. * We reuse the inputFileSpec field for the pipe-command. And add a bool to specify if it is a pipe. * The changes made for pg_dump to handle the pipe case with popen and pclose also work here. * The logic of %f format specifier to read from the pg_dump output is the same too. Most of the code from the pg_dump commit works. We add similar logic to the function to read large objects. * The --pipe command works -l and -L option. --- src/bin/pg_dump/compress_io.c | 34 +++++++++++++---------- src/bin/pg_dump/pg_backup_directory.c | 13 ++++++++- src/bin/pg_dump/pg_restore.c | 39 ++++++++++++++++++++++----- 3 files changed, 64 insertions(+), 22 deletions(-) diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index b3e1a133af8..7b151265165 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -259,26 +259,32 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode, Assert(strcmp(mode, PG_BINARY_R) == 0); fname = pg_strdup(path); - - if (hasSuffix(fname, ".gz")) - compression_spec.algorithm = PG_COMPRESSION_GZIP; - else if (hasSuffix(fname, ".lz4")) - compression_spec.algorithm = PG_COMPRESSION_LZ4; - else if (hasSuffix(fname, ".zst")) - compression_spec.algorithm = PG_COMPRESSION_ZSTD; - else + /* + If the path is a pipe command, the compression algorithm + is none. + */ + if (!path_is_pipe_command) { - if (stat(path, &st) == 0) - compression_spec.algorithm = PG_COMPRESSION_NONE; - else if (check_compressed_file(path, &fname, "gz")) + if (hasSuffix(fname, ".gz")) compression_spec.algorithm = PG_COMPRESSION_GZIP; - else if (check_compressed_file(path, &fname, "lz4")) + else if (hasSuffix(fname, ".lz4")) compression_spec.algorithm = PG_COMPRESSION_LZ4; - else if (check_compressed_file(path, &fname, "zst")) + else if (hasSuffix(fname, ".zst")) compression_spec.algorithm = PG_COMPRESSION_ZSTD; + else + { + if (stat(path, &st) == 0) + compression_spec.algorithm = PG_COMPRESSION_NONE; + else if (check_compressed_file(path, &fname, "gz")) + compression_spec.algorithm = PG_COMPRESSION_GZIP; + else if (check_compressed_file(path, &fname, "lz4")) + compression_spec.algorithm = PG_COMPRESSION_LZ4; + else if (check_compressed_file(path, &fname, "zst")) + compression_spec.algorithm = PG_COMPRESSION_ZSTD; + } } - CFH = InitCompressFileHandle(compression_spec, path_is_pipe_command); /* TODO: try to make it work also with pipes */ + CFH = InitCompressFileHandle(compression_spec, path_is_pipe_command); if (!CFH->open_func(fname, -1, mode, CFH)) { free_keep_errno(CFH); diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c index db4b627a0b8..35b7d5ab6e7 100644 --- a/src/bin/pg_dump/pg_backup_directory.c +++ b/src/bin/pg_dump/pg_backup_directory.c @@ -440,7 +440,18 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te) tocfname, line); StartRestoreLO(AH, oid, AH->public.ropt->dropSchema); - snprintf(path, MAXPGPATH, "%s/%s", ctx->directory, lofname); + /* TODO: This logic for naming blob files is common betwen _LoadLOs an _StartLO. + * Refactor in a helper function. + */ + if (AH->fSpecIsPipe) + { + pipe = replace_percent_placeholders(ctx->directory, "pipe-command", "f", lofname); + strcpy(path, pipe); + } + else + { + snprintf(path, MAXPGPATH, "%s/%s", ctx->directory, lofname); + } _PrintFileData(AH, path); EndRestoreLO(AH, oid); } diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c index 46d90aec946..f552eabe0d8 100644 --- a/src/bin/pg_dump/pg_restore.c +++ b/src/bin/pg_dump/pg_restore.c @@ -59,7 +59,7 @@ static void usage(const char *progname); static void read_restore_filters(const char *filename, RestoreOptions *opts); static bool file_exists_in_directory(const char *dir, const char *filename); static int restore_one_database(const char *inputFileSpec, RestoreOptions *opts, - int numWorkers, bool append_data, int num); + int numWorkers, bool append_data, int num, bool filespec_is_pipe); static int read_one_statement(StringInfo inBuf, FILE *pfile); static int restore_all_databases(PGconn *conn, const char *dumpdirpath, SimpleStringList db_exclude_patterns, RestoreOptions *opts, int numWorkers); @@ -94,6 +94,7 @@ main(int argc, char **argv) int n_errors = 0; bool globals_only = false; SimpleStringList db_exclude_patterns = {NULL, NULL}; + bool filespec_is_pipe = false; static int disable_triggers = 0; static int enable_row_security = 0; static int if_exists = 0; @@ -175,6 +176,7 @@ main(int argc, char **argv) {"statistics-only", no_argument, &statistics_only, 1}, {"filter", required_argument, NULL, 4}, {"exclude-database", required_argument, NULL, 6}, + {"pipe-command", required_argument, NULL, 7}, {NULL, 0, NULL, 0} }; @@ -356,6 +358,11 @@ main(int argc, char **argv) simple_string_list_append(&db_exclude_patterns, optarg); break; + case 7: /* pipe-command */ + inputFileSpec = pg_strdup(optarg); + filespec_is_pipe = true; + break; + default: /* getopt_long already emitted a complaint */ pg_log_error_hint("Try \"%s --help\" for more information.", progname); @@ -363,11 +370,23 @@ main(int argc, char **argv) } } - /* Get file name from command line */ + /* Get file name from command line. Note that filename argument and pipe-command can't both be set. */ if (optind < argc) + { + if (filespec_is_pipe) + { + pg_log_error_hint("Only one of [filespec, --pipe-command] allowed"); + exit_nicely(1); + } inputFileSpec = argv[optind++]; - else + } + /* Even if the file argument is not provided, if the pipe-command is specified, we need to use that + * as the file arg and not fallback to stdio. + */ + else if (!filespec_is_pipe) + { inputFileSpec = NULL; + } /* Complain if any arguments remain */ if (optind < argc) @@ -577,7 +596,7 @@ main(int argc, char **argv) if (globals_only) pg_fatal("option -g/--globals-only can be used only when restoring an archive created by pg_dumpall"); - n_errors = restore_one_database(inputFileSpec, opts, numWorkers, false, 0); + n_errors = restore_one_database(inputFileSpec, opts, numWorkers, false, 0, filespec_is_pipe); } /* Done, print a summary of ignored errors during restore. */ @@ -599,12 +618,18 @@ main(int argc, char **argv) */ static int restore_one_database(const char *inputFileSpec, RestoreOptions *opts, - int numWorkers, bool append_data, int num) + int numWorkers, bool append_data, int num, bool filespec_is_pipe) { Archive *AH; int n_errors; - AH = OpenArchive(inputFileSpec, opts->format, false); /*TODO: support pipes in restore */ + if (filespec_is_pipe && opts->format != archDirectory) + { + pg_log_error_hint("Option --pipe-command is only supported with directory format."); + exit_nicely(1); + } + + AH = OpenArchive(inputFileSpec, opts->format, filespec_is_pipe); SetArchiveOptions(AH, NULL, opts); @@ -1267,7 +1292,7 @@ restore_all_databases(PGconn *conn, const char *dumpdirpath, opts->dumpStatistics = dumpStatistics; /* Restore the single database. */ - n_errors = restore_one_database(subdirpath, opts, numWorkers, true, count); + n_errors = restore_one_database(subdirpath, opts, numWorkers, true, count, false); /* Print a summary of ignored errors during single database restore. */ if (n_errors) -- 2.49.0.1266.g31b7d2e469-goog