From 8b205bd8a109892779dc804fb75d06d9a71db396 Mon Sep 17 00:00:00 2001 From: Timothy Garnett Date: Tue, 23 Apr 2013 11:21:25 -0400 Subject: [PATCH 1/1] patch pg_restore to support partial parallel restore (parallel restore of indexes/constraints, but not data) of custom format dumps received vai stdin --- src/bin/pg_dump/pg_backup_archiver.c | 66 ++++++++++++++++++++++-------------- src/bin/pg_dump/pg_backup_custom.c | 5 +++ 2 files changed, 45 insertions(+), 26 deletions(-) diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index c176b65..537b0c1 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -115,7 +115,6 @@ typedef struct _outputContext /* translator: this is a module name */ static const char *modulename = gettext_noop("archiver"); - static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt, const int compression, ArchiveMode mode); static void _getObjectDescription(PQExpBuffer buf, TocEntry *te, @@ -316,18 +315,12 @@ RestoreArchive(Archive *AHX) if (parallel_mode) { /* We haven't got round to making this work for all archive formats */ - if (AH->ClonePtr == NULL || AH->ReopenPtr == NULL) + if (AH->ClonePtr == NULL) exit_horribly(modulename, "parallel restore is not supported with this archive file format\n"); /* Doesn't work if the archive represents dependencies as OIDs */ if (AH->version < K_VERS_1_8) exit_horribly(modulename, "parallel restore is not supported with archives made by pre-8.0 pg_dump\n"); - - /* - * It's also not gonna work if we can't reopen the input file, so - * let's try that immediately. - */ - (AH->ReopenPtr) (AH); } /* @@ -3465,12 +3458,19 @@ on_exit_close_archive(Archive *AHX) * Main engine for parallel restore. * * Work is done in three phases. - * First we process all SECTION_PRE_DATA tocEntries, in a single connection, - * just as for a standard restore. Second we process the remaining non-ACL - * steps in parallel worker children (threads on Windows, processes on Unix), - * each of which connects separately to the database. Finally we process all - * the ACL entries in a single connection (that happens back in - * RestoreArchive). + * 1) First we process all SECTION_PRE_DATA tocEntries, in a single connection, + * just as for a standard restore. If we are unable to reopen the archive + * (say from STDIN) we also do SECTION_DATA tocEntries as well. + * 2) Second we process the remaining non-ACL steps in parallel worker children + * (threads on Windows, processes on Unix), each of which connects separately + * to the database. + * 3) Finally we process all the ACL entries in a single connection + * (that happens back in RestoreArchive). + * + * As a future improvement, in the case of unable to reopen the archive, it + * should be possible do SECTION_POST_DATA entries in parallel with the + * SECTION_DATA entries as their dependencies are met. But, that requires + * some re-working of the flow here. */ static void restore_toc_entries_parallel(ArchiveHandle *AH) @@ -3520,23 +3520,32 @@ restore_toc_entries_parallel(ArchiveHandle *AH) /* NB: process-or-continue logic must be the inverse of loop below */ if (next_work_item->section != SECTION_PRE_DATA) { - /* DATA and POST_DATA items are just ignored for now */ - if (next_work_item->section == SECTION_DATA || - next_work_item->section == SECTION_POST_DATA) + /* Detect if we can re-open the archive (needed for parallel data + * restore). If not process SECTION_DATA entries as well. */ + if (next_work_item->section == SECTION_DATA && AH->ReopenPtr == NULL) { - skipped_some = true; - continue; + /* fall through to process toc entry */ } else { - /* - * SECTION_NONE items, such as comments, can be processed now - * if we are still in the PRE_DATA part of the archive. Once - * we've skipped any items, we have to consider whether the - * comment's dependencies are satisfied, so skip it for now. - */ - if (skipped_some) + /* DATA and POST_DATA items are just ignored for now */ + if (next_work_item->section == SECTION_DATA || + next_work_item->section == SECTION_POST_DATA) + { + skipped_some = true; continue; + } + else + { + /* + * SECTION_NONE items, such as comments, can be processed now + * if we are still in the PRE_DATA part of the archive. Once + * we've skipped any items, we have to consider whether the + * comment's dependencies are satisfied, so skip it for now. + */ + if (skipped_some) + continue; + } } } @@ -3595,6 +3604,11 @@ restore_toc_entries_parallel(ArchiveHandle *AH) /* All PRE_DATA items were dealt with above */ continue; } + if (next_work_item->section == SECTION_DATA && AH->ReopenPtr == NULL) + { + /* If not able to reopen we handled SECTION_DATA above */ + continue; + } if (next_work_item->section == SECTION_DATA || next_work_item->section == SECTION_POST_DATA) { diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c index f7dc5be..4c9aa44 100644 --- a/src/bin/pg_dump/pg_backup_custom.c +++ b/src/bin/pg_dump/pg_backup_custom.c @@ -184,6 +184,11 @@ InitArchiveFmt_Custom(ArchiveHandle *AH) ctx->dataStart = _getFilePos(AH, ctx); } + /* clear the ReopenPtr if we can't actually reopen the file (say STDIN) */ + if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0 || !ctx->hasSeek) + { + AH->ReopenPtr = NULL; + } } /* -- 1.8.2.1