Re: parallel pg_restore - WIP patch - Mailing list pgsql-hackers

From Andrew Dunstan
Subject Re: parallel pg_restore - WIP patch
Date
Msg-id 48E04450.9080501@dunslane.net
Whole thread Raw
In response to Re: parallel pg_restore - WIP patch  (Andrew Dunstan <andrew@dunslane.net>)
Responses Re: parallel pg_restore - WIP patch  (Stefan Kaltenbrunner <stefan@kaltenbrunner.cc>)
List pgsql-hackers

Andrew Dunstan wrote:
>
>
>>
>> this works better but there is something fishy still - using the same
>> dump file I get a proper restore using pg_restore normally. If I
>> however use -m for a parallel one I only get parts (in this case only
>> 243 of the 709 tables) of the database restored ...
>>
>>
>>
>
> Yes, there are several funny things going on, including some stuff
> with dependencies. I'll have a new patch tomorrow with luck. Thanks
> for testing.
>
>

OK, in this version a whole heap of bugs are fixed, mainly those to do
with dependencies and saved state. I get identical row counts in the
source and destination now, quite reliably.

cheers

andrew
Index: pg_backup.h
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup.h,v
retrieving revision 1.47
diff -c -r1.47 pg_backup.h
*** pg_backup.h    13 Apr 2008 03:49:21 -0000    1.47
--- pg_backup.h    29 Sep 2008 02:43:51 -0000
***************
*** 123,128 ****
--- 123,130 ----
      int            suppressDumpWarnings;    /* Suppress output of WARNING entries
                                           * to stderr */
      bool        single_txn;
+     int         number_of_threads;
+     bool        truncate_before_load;

      bool       *idWanted;        /* array showing which dump IDs to emit */
  } RestoreOptions;
***************
*** 165,170 ****
--- 167,173 ----
  extern void CloseArchive(Archive *AH);

  extern void RestoreArchive(Archive *AH, RestoreOptions *ropt);
+ extern void RestoreArchiveParallel(Archive *AH, RestoreOptions *ropt);

  /* Open an existing archive */
  extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt);
Index: pg_backup_archiver.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.c,v
retrieving revision 1.158
diff -c -r1.158 pg_backup_archiver.c
*** pg_backup_archiver.c    5 Sep 2008 23:53:42 -0000    1.158
--- pg_backup_archiver.c    29 Sep 2008 02:43:52 -0000
***************
*** 27,38 ****
--- 27,50 ----

  #include <unistd.h>

+ #include <sys/types.h>
+ #include <sys/wait.h>
+
+
  #ifdef WIN32
  #include <io.h>
  #endif

  #include "libpq/libpq-fs.h"

+ typedef struct _parallel_slot  /* bookkeeping for one worker process */
+ {
+     pid_t   pid;        /* worker's process id; 0 marks the slot as free */
+     TocEntry *te;       /* TOC entry handed to that worker */
+     DumpId  dumpId;     /* dump id of that TOC entry */
+ } ParallelSlot;
+
+ #define NO_SLOT (-1)

  const char *progname;

***************
*** 70,76 ****
--- 82,99 ----
  static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
  static OutputContext SetOutput(ArchiveHandle *AH, char *filename, int compression);
  static void ResetOutput(ArchiveHandle *AH, OutputContext savedContext);
+ static bool work_is_being_done(ParallelSlot *slot, int n_slots);
+ static int get_next_slot(ParallelSlot *slots, int n_slots);
+ static TocEntry *get_next_work_item(ArchiveHandle *AH);
+ static void prestore(ArchiveHandle *AH, TocEntry *te);
+ static void mark_work_done(ArchiveHandle *AH, pid_t worker, ParallelSlot *slots, int n_slots);
+ static int _restore_one_te(ArchiveHandle *ah, TocEntry *te, RestoreOptions *ropt,bool is_parallel);
+ static void _reduce_dependencies(ArchiveHandle * AH, TocEntry *te);
+ static void _fix_dependency_counts(ArchiveHandle *AH);
+ static void _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te);
+

+ static ArchiveHandle *GAH;

  /*
   *    Wrapper functions.
***************
*** 125,137 ****

  /* Public */
  void
  RestoreArchive(Archive *AHX, RestoreOptions *ropt)
  {
      ArchiveHandle *AH = (ArchiveHandle *) AHX;
      TocEntry   *te;
      teReqs        reqs;
      OutputContext sav;
-     bool        defnDumped;

      AH->ropt = ropt;
      AH->stage = STAGE_INITIALIZING;
--- 148,579 ----

  /* Public */
+ /*
+  * RestoreArchiveParallel
+  *
+  * Restore an archive straight into a database, using up to
+  * ropt->number_of_threads forked worker processes.
+  *
+  * The parent first restores, over its own connection, every ready item it
+  * finds until the next ready item is a TABLE DATA entry.  It then drops
+  * its connection and hands the remaining items to workers, one TOC entry
+  * per worker (see prestore), as their dependency counts reach zero.  Once
+  * everything has been dispatched and reaped, the parent reconnects and
+  * emits ownership commands and ACLs serially.
+  *
+  * Dies if --clean was requested, if no direct database connection was
+  * requested, if a worker cannot be started, or if a worker ends abnormally.
+  */
  void
+ RestoreArchiveParallel(Archive *AHX, RestoreOptions *ropt)
+ {
+
+     ArchiveHandle *AH = (ArchiveHandle *) AHX;
+     ParallelSlot  *slots;
+     int next_slot;
+     TocEntry *next_work_item = NULL;
+     int work_status;
+     pid_t ret_child;
+     int n_slots = ropt->number_of_threads;
+     TocEntry *te;
+     teReqs    reqs;
+
+
+     /*     AH->debugLevel = 99; */
+     /* some routines that use ahlog() don't get passed AH */
+     GAH = AH;
+
+     ahlog(AH,1,"entering RestoreArchiveParallel\n");
+
+     AH->ropt = ropt;
+
+     /*
+      * With no worker slots nothing could ever be dispatched and the main
+      * loop below would never terminate.
+      */
+     if (n_slots < 1)
+         die_horribly(AH,modulename,
+                      "parallel restore requires at least one worker process\n");
+
+     /* calloc zero-fills, so every slot starts out free (pid == 0) */
+     slots = (ParallelSlot *) calloc(n_slots, sizeof(ParallelSlot));
+     if (slots == NULL)
+         die_horribly(AH,modulename,"out of memory\n");
+
+ /*
+     if (ropt->create)
+         die_horribly(AH,modulename,
+                      "parallel restore is incompatible with --create\n");
+ */
+
+
+     if (ropt->dropSchema)
+         die_horribly(AH,modulename,
+                      "parallel restore is incompatible with --clean\n");
+
+     if (!ropt->useDB)
+         die_horribly(AH,modulename,
+                      "parallel restore requires direct database connection\n");
+
+
+ #ifndef HAVE_LIBZ
+
+     /* make sure we won't need (de)compression we haven't got */
+     if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
+     {
+         for (te = AH->toc->next; te != AH->toc; te = te->next)
+         {
+             reqs = _tocEntryRequired(te, ropt, false);
+             if (te->hadDumper && (reqs & REQ_DATA) != 0)
+                 die_horribly(AH, modulename,
+                              "cannot restore from compressed archive (compression not supported in this installation)\n");
+         }
+     }
+ #endif
+
+     ahlog(AH, 1, "connecting to database for restore\n");
+     if (AH->version < K_VERS_1_3)
+         die_horribly(AH, modulename,
+                      "direct database connections are not supported in pre-1.3 archives\n");
+
+     /* XXX Should get this from the archive */
+     AHX->minRemoteVersion = 070100;
+     AHX->maxRemoteVersion = 999999;
+
+     /* correct dependency counts in case we're doing a partial restore */
+     if (ropt->idWanted == NULL)
+         InitDummyWantedList(AHX,ropt);
+     _fix_dependency_counts(AH);
+
+     /*
+      * Since we're talking to the DB directly, don't send comments since they
+      * obscure SQL when displaying errors
+      */
+     AH->noTocComments = 1;
+
+     /* Do all the early stuff in a single connection in the parent.
+      * There's no great point in running it in parallel and it will actually
+      * run faster in a single connection because we avoid all the connection
+      * and setup overhead, including the 0.5s sleep below.
+      */
+     ConnectDatabase(AHX, ropt->dbname,
+                     ropt->pghost, ropt->pgport, ropt->username,
+                     ropt->requirePassword);
+
+
+     /*
+      * Establish important parameter values right away.
+      */
+     _doSetFixedOutputState(AH);
+
+     while((next_work_item = get_next_work_item(AH)) != NULL)
+     {
+         /* XXX need to improve this test in case there is no table data */
+         /* need to test for indexes, FKs, PK, Unique, etc */
+         if(strcmp(next_work_item->desc,"TABLE DATA") == 0)
+             break;
+         (void) _restore_one_te(AH, next_work_item, ropt, false);
+
+         next_work_item->prestored = true;
+
+         _reduce_dependencies(AH,next_work_item);
+     }
+
+
+     /*
+      * now close parent connection in prep for parallel step.
+      */
+     PQfinish(AH->connection);
+     AH->connection = NULL;
+
+     /* blow away any preserved state from the previous connection */
+
+     if (AH->currSchema)
+         free(AH->currSchema);
+     AH->currSchema = strdup("");
+     if (AH->currUser)
+         free(AH->currUser);
+     AH->currUser = strdup("");
+     if (AH->currTablespace)
+         free(AH->currTablespace);
+     AH->currTablespace = NULL;
+     AH->currWithOids = -1;
+
+     /* main parent loop */
+
+     ahlog(AH,1,"entering main loop\n");
+
+     while (((next_work_item = get_next_work_item(AH)) != NULL) ||
+            (work_is_being_done(slots,n_slots)))
+     {
+         if (next_work_item != NULL &&
+             ((next_slot = get_next_slot(slots,n_slots)) != NO_SLOT))
+         {
+             /* there is work still to do and a worker slot available */
+
+             pid_t child;
+
+             next_work_item->prestored = true;
+
+             child = fork();
+             if (child == 0)
+             {
+                 prestore(AH,next_work_item);
+                 /* should not happen ... we expect prestore to exit */
+                 exit(1);
+             }
+             else if (child > 0)
+             {
+                 slots[next_slot].pid = child;
+                 slots[next_slot].te = next_work_item;
+                 slots[next_slot].dumpId = next_work_item->dumpId;
+             }
+             else
+             {
+                 /*
+                  * The item is already marked prestored, so carrying on
+                  * would silently drop it and everything that depends on it.
+                  */
+                 die_horribly(AH,modulename,
+                              "could not create worker process\n");
+             }
+             /* delay just long enough between forks to give the catalog some
+              * breathing space. Without this sleep I got
+              * "tuple concurrently updated" errors.
+              */
+             /* pg_usleep(500000); */
+             continue; /* in case the slots are not yet full */
+         }
+         /* if we get here there must be work being done */
+         ret_child = wait(&work_status);
+
+         /* on failure work_status is not set, so don't inspect it */
+         if (ret_child < 0)
+             die_horribly(AH,modulename,
+                          "could not wait for worker process\n");
+
+         if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 0)
+         {
+             mark_work_done(AH, ret_child, slots, n_slots);
+         }
+         else if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 1)
+         {
+             int i;
+
+             /*
+              * Exit status 1 is how a worker reports that a table could not
+              * be created; find the worker's slot and suppress that table's
+              * data before releasing its dependents.
+              */
+             for (i = 0; i < n_slots; i++)
+             {
+                 if (slots[i].pid == ret_child)
+                 {
+                     _inhibit_data_for_failed_table(AH, slots[i].te);
+                     break;
+                 }
+             }
+             mark_work_done(AH, ret_child, slots, n_slots);
+         }
+         else
+         {
+             /*
+              * Killed by a signal or an unexpected exit status.  Leaving the
+              * slot occupied would keep this loop spinning forever.
+              */
+             die_horribly(AH,modulename,
+                          "worker process failed unexpectedly\n");
+         }
+     }
+
+     free(slots);
+
+     /*
+      * now process the ACLs - no need to do this in parallel
+      */
+
+     /* reconnect from parent */
+     ConnectDatabase(AHX, ropt->dbname,
+                     ropt->pghost, ropt->pgport, ropt->username,
+                     ropt->requirePassword);
+
+     /*
+      * Scan TOC to output ownership commands and ACLs
+      */
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+     {
+         AH->currentTE = te;
+
+         /* Work out what, if anything, we want from this entry */
+         reqs = _tocEntryRequired(te, ropt, true);
+
+         if ((reqs & REQ_SCHEMA) != 0)    /* We want the schema */
+         {
+             ahlog(AH, 1, "setting owner and privileges for %s %s\n",
+                   te->desc, te->tag);
+             _printTocEntry(AH, te, ropt, false, true);
+         }
+     }
+
+     /* clean up */
+     PQfinish(AH->connection);
+     AH->connection = NULL;
+
+ }
+
+ /*
+  * work_is_being_done
+  *
+  * Report whether any of the n_slots entries starting at "slot" currently
+  * holds a worker, i.e. has a pid greater than zero.
+  */
+ static bool
+ work_is_being_done(ParallelSlot *slot, int n_slots)
+ {
+     int idx;
+
+     ahlog(GAH,1,"is work being done?\n");
+
+     for (idx = 0; idx < n_slots; idx++)
+     {
+         if (slot[idx].pid > 0)
+             return true;
+     }
+
+     ahlog(GAH,1,"work is not being done\n");
+     return false;
+ }
+
+ /*
+  * get_next_slot
+  *
+  * Return the index of the first slot whose pid is zero, or NO_SLOT when
+  * every slot is taken.
+  */
+ static int
+ get_next_slot(ParallelSlot *slots, int n_slots)
+ {
+     int candidate = 0;
+
+     while (candidate < n_slots)
+     {
+         if (slots[candidate].pid == 0)
+         {
+             ahlog(GAH,1,"available slots is %d\n",candidate);
+             return candidate;
+         }
+         candidate++;
+     }
+
+     ahlog(GAH,1,"No slot available\n");
+     return NO_SLOT;
+ }
+
+ /*
+  * get_next_work_item
+  *
+  * Return the first TOC entry, in TOC order, that has not been handed out
+  * yet (prestored is false), has no outstanding dependencies (depCount < 1)
+  * and for which _tocEntryRequired reports schema or data to restore.
+  * Returns NULL when no entry qualifies right now.
+  *
+  * The queue is always searched from the top and is never reordered.  If
+  * it is ever reordered, code that walks from the current item to the end
+  * of the queue will probably need another look.
+  */
+ static TocEntry*
+ get_next_work_item(ArchiveHandle *AH)
+ {
+     TocEntry *cand;
+
+     for (cand = AH->toc->next; cand != AH->toc; cand = cand->next)
+     {
+         teReqs wanted;
+
+         /* already dispatched, or still waiting on something */
+         if (cand->prestored || cand->depCount >= 1)
+             continue;
+
+         /* make sure it's not an ACL */
+         wanted = _tocEntryRequired (cand, AH->ropt, false);
+         if ((wanted & (REQ_SCHEMA | REQ_DATA)) == 0)
+             continue;
+
+         ahlog(AH,1,"next item is %d\n",cand->dumpId);
+         return cand;
+     }
+
+     ahlog(AH,1,"No item ready\n");
+     return NULL;
+ }
+
+ /*
+  * prestore
+  *
+  * Restore the single TOC entry "te": reopen the archive, open a database
+  * connection, restore the entry, then exit the process with the value
+  * returned by _restore_one_te.  Does not return to its caller.
+  */
+ static void
+ prestore(ArchiveHandle *AH, TocEntry *te)
+ {
+     RestoreOptions *opts = AH->ropt;
+     int status;
+
+     /*
+      * Reopen the archive so this process has a private copy whose file
+      * pointer nobody else can move.
+      */
+     (*AH->ReopenPtr) (AH);
+
+     ConnectDatabase((Archive *) AH, opts->dbname, opts->pghost,
+                     opts->pgport, opts->username, opts->requirePassword);
+
+     /* establish important parameter values right away */
+     _doSetFixedOutputState(AH);
+
+     status = _restore_one_te(AH, te, opts, true);
+
+     PQfinish(AH->connection);
+     exit(status);
+ }
+
+ /*
+  * mark_work_done
+  *
+  * Called once the worker with process id "worker" has been reaped: clear
+  * that worker's slot so it can be reused, then release the items that
+  * were waiting on the TOC entry it restored.
+  *
+  * If no slot records that pid, nothing is changed.
+  */
+ static void
+ mark_work_done(ArchiveHandle *AH, pid_t worker,
+                ParallelSlot *slots, int n_slots)
+ {
+     TocEntry *te = NULL;
+     int i;
+
+     for (i = 0; i < n_slots; i++)
+     {
+         if (slots[i].pid == worker)
+         {
+             te = slots[i].te;
+             slots[i].pid = 0;
+             slots[i].te = NULL;
+             slots[i].dumpId = 0;
+             break;
+         }
+     }
+
+     /*
+      * A pid we never recorded has no TOC entry attached; there is nothing
+      * to release, and _reduce_dependencies dereferences its argument.
+      */
+     if (te == NULL)
+     {
+         ahlog(AH,1,"no slot found for worker process %d\n",(int) worker);
+         return;
+     }
+
+     _reduce_dependencies(AH,te);
+ }
+
+
+ /*
+  * Make sure the head of each dependency chain is a live item
+  *
+  * Two adjustments are made to the depCount of the TOC entries:
+  *
+  * 1. Dependencies on dump ids that do not occur in the archive at all are
+  *    discounted, since nothing will ever satisfy them.
+  * 2. Every entry that then has no outstanding dependencies but is not
+  *    wanted (per ropt->idWanted) is treated as already done, by calling
+  *    _reduce_dependencies on it.
+  *
+  * Step 1 runs first so that an unwanted entry whose only dependencies are
+  * absent ids is still recognised as a chain head in step 2.
+  *
+  * Once this is established the property will be maintained by
+  * _reduce_dependencies called as items are done.
+  */
+ static void
+ _fix_dependency_counts(ArchiveHandle *AH)
+ {
+     TocEntry * te;
+     RestoreOptions * ropt = AH->ropt;
+     bool * RealDumpIds;
+     bool * deadHeads;
+     int i;
+
+     /* nothing to index; also avoids a zero-sized allocation */
+     if (AH->maxDumpId < 1)
+         return;
+
+     RealDumpIds = calloc(AH->maxDumpId, sizeof(bool));
+     deadHeads = calloc(AH->maxDumpId, sizeof(bool));
+     if (RealDumpIds == NULL || deadHeads == NULL)
+         die_horribly(AH, modulename, "out of memory\n");
+
+     /* note which dump ids really occur in the archive */
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+         RealDumpIds[te->dumpId-1] = true;
+
+     /*
+      * It is possible that the dependencies list items that are
+      * not in the archive at all. Reduce the depcounts so those get
+      * ignored.  An id outside 1..maxDumpId cannot be in the archive
+      * either, and must not be used as an index.
+      */
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+     {
+         for (i = 0; i < te->nDeps; i++)
+         {
+             DumpId dep = te->dependencies[i];
+
+             if (dep < 1 || dep > AH->maxDumpId || !RealDumpIds[dep-1])
+                 te->depCount--;
+         }
+     }
+
+     /*
+      * Record the unwanted chain heads before releasing any of them, so
+      * that an entry which only reaches zero as a result of the releases
+      * below is not picked up and released again by this loop.
+      */
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+     {
+         if (te->depCount == 0 && ! ropt->idWanted[te->dumpId -1])
+             deadHeads[te->dumpId-1] = true;
+     }
+
+     for (te = AH->toc->next; te != AH->toc; te = te->next)
+     {
+         if (deadHeads[te->dumpId-1])
+             _reduce_dependencies(AH,te);
+     }
+
+     free(deadHeads);
+     free(RealDumpIds);
+ }
+
+ /*
+  * _reduce_dependencies
+  *
+  * Record that the TOC entry "te" is done: walk every entry after it in
+  * the TOC and take one off depCount for each dependency on te's dump id.
+  *
+  * For an entry whose depCount this call brings to zero, two further
+  * steps apply:
+  *
+  * - If it is a TABLE DATA entry, later entries that depend on the item
+  *   just completed are re-pointed at the TABLE DATA entry instead, so
+  *   they are not scheduled until the data is loaded.
+  * - If it is not wanted (per ropt->idWanted) it will never run, so it is
+  *   treated as done straight away by a recursive call.
+  *
+  * Both steps are limited to entries this call actually released.  An
+  * entry that was already at zero has been handled before; handling it
+  * again would decrement its dependents a second time, or redirect
+  * dependencies to an unrelated table's data.
+  */
+ static void
+ _reduce_dependencies(ArchiveHandle * AH, TocEntry *te)
+ {
+     DumpId item = te->dumpId;
+     RestoreOptions * ropt = AH->ropt;
+     int i;
+
+     for (te = te->next; te != AH->toc; te = te->next)
+     {
+         bool released = false;
+
+         for (i = 0; i < te->nDeps; i++)
+         {
+             if (te->dependencies[i] == item)
+             {
+                 te->depCount = te->depCount - 1;
+                 released = true;
+             }
+         }
+
+         /* only act on entries this call has just made available */
+         if (!released || te->depCount != 0)
+             continue;
+
+         /* If this is a table data item we are making available,
+          * make the table's dependencies depend on this item instead of
+          * the table definition, so they
+          * don't get scheduled until the data is loaded.
+          * Have to do this now before the main loop gets to anything
+          * further down the list.
+          */
+         if (strcmp(te->desc,"TABLE DATA") == 0)
+         {
+             TocEntry *tes;
+             int j;
+
+             for (tes = te->next; tes != AH->toc; tes = tes->next)
+                 for (j = 0; j < tes->nDeps; j++)
+                     if (tes->dependencies[j] == item)
+                         tes->dependencies[j] = te->dumpId;
+         }
+
+         /*
+          * If this item won't in fact be done, and is now  at
+          * 0 dependency count, we pretend it's been done and
+          * reduce the dependency counts of all the things that
+          * depend on it, by a recursive call
+          */
+         if (! ropt->idWanted[te->dumpId -1])
+             _reduce_dependencies(AH,te);
+     }
+
+ }
+
+
+ /* Public */
+ void
  RestoreArchive(Archive *AHX, RestoreOptions *ropt)
  {
      ArchiveHandle *AH = (ArchiveHandle *) AHX;
      TocEntry   *te;
      teReqs        reqs;
      OutputContext sav;

      AH->ropt = ropt;
      AH->stage = STAGE_INITIALIZING;
***************
*** 171,176 ****
--- 613,632 ----
          AH->noTocComments = 1;
      }

+ #ifndef HAVE_LIBZ
+
+     /* make sure we won't need (de)compression we haven't got */
+     if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
+     {
+         for (te = AH->toc->next; te != AH->toc; te = te->next)
+         {
+             reqs = _tocEntryRequired(te, ropt, false);
+             if (te->hadDumper && (reqs & REQ_DATA) != 0)
+                 die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in this installation)\n");
+         }
+     }
+ #endif
+
      /*
       * Work out if we have an implied data-only restore. This can happen if
       * the dump was data only or if the user has used a toc list to exclude
***************
*** 270,409 ****
       */
      for (te = AH->toc->next; te != AH->toc; te = te->next)
      {
!         AH->currentTE = te;
!
!         /* Work out what, if anything, we want from this entry */
!         reqs = _tocEntryRequired(te, ropt, false);
!
!         /* Dump any relevant dump warnings to stderr */
!         if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
!         {
!             if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
!                 write_msg(modulename, "warning from original dump file: %s\n", te->defn);
!             else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
!                 write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
!         }
!
!         defnDumped = false;
!
!         if ((reqs & REQ_SCHEMA) != 0)    /* We want the schema */
!         {
!             ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag);
!
!             _printTocEntry(AH, te, ropt, false, false);
!             defnDumped = true;
!
!             /*
!              * If we could not create a table and --no-data-for-failed-tables
!              * was given, ignore the corresponding TABLE DATA
!              */
!             if (ropt->noDataForFailedTables &&
!                 AH->lastErrorTE == te &&
!                 strcmp(te->desc, "TABLE") == 0)
!             {
!                 TocEntry   *tes;
!
!                 ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
!                       te->tag);
!
!                 for (tes = te->next; tes != AH->toc; tes = tes->next)
!                 {
!                     if (strcmp(tes->desc, "TABLE DATA") == 0 &&
!                         strcmp(tes->tag, te->tag) == 0 &&
!                         strcmp(tes->namespace ? tes->namespace : "",
!                                te->namespace ? te->namespace : "") == 0)
!                     {
!                         /* mark it unwanted */
!                         ropt->idWanted[tes->dumpId - 1] = false;
!                         break;
!                     }
!                 }
!             }
!
!             /* If we created a DB, connect to it... */
!             if (strcmp(te->desc, "DATABASE") == 0)
!             {
!                 ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
!                 _reconnectToDB(AH, te->tag);
!             }
!         }
!
!         /*
!          * If we have a data component, then process it
!          */
!         if ((reqs & REQ_DATA) != 0)
!         {
!             /*
!              * hadDumper will be set if there is genuine data component for
!              * this node. Otherwise, we need to check the defn field for
!              * statements that need to be executed in data-only restores.
!              */
!             if (te->hadDumper)
!             {
!                 /*
!                  * If we can output the data, then restore it.
!                  */
!                 if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0)
!                 {
! #ifndef HAVE_LIBZ
!                     if (AH->compression != 0)
!                         die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in this installation)\n");
! #endif
!
!                     _printTocEntry(AH, te, ropt, true, false);
!
!                     if (strcmp(te->desc, "BLOBS") == 0 ||
!                         strcmp(te->desc, "BLOB COMMENTS") == 0)
!                     {
!                         ahlog(AH, 1, "restoring %s\n", te->desc);
!
!                         _selectOutputSchema(AH, "pg_catalog");
!
!                         (*AH->PrintTocDataPtr) (AH, te, ropt);
!                     }
!                     else
!                     {
!                         _disableTriggersIfNecessary(AH, te, ropt);
!
!                         /* Select owner and schema as necessary */
!                         _becomeOwner(AH, te);
!                         _selectOutputSchema(AH, te->namespace);
!
!                         ahlog(AH, 1, "restoring data for table \"%s\"\n",
!                               te->tag);
!
!                         /*
!                          * If we have a copy statement, use it. As of V1.3,
!                          * these are separate to allow easy import from
!                          * withing a database connection. Pre 1.3 archives can
!                          * not use DB connections and are sent to output only.
!                          *
!                          * For V1.3+, the table data MUST have a copy
!                          * statement so that we can go into appropriate mode
!                          * with libpq.
!                          */
!                         if (te->copyStmt && strlen(te->copyStmt) > 0)
!                         {
!                             ahprintf(AH, "%s", te->copyStmt);
!                             AH->writingCopyData = true;
!                         }
!
!                         (*AH->PrintTocDataPtr) (AH, te, ropt);
!
!                         AH->writingCopyData = false;
!
!                         _enableTriggersIfNecessary(AH, te, ropt);
!                     }
!                 }
!             }
!             else if (!defnDumped)
!             {
!                 /* If we haven't already dumped the defn part, do so now */
!                 ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
!                 _printTocEntry(AH, te, ropt, false, false);
!             }
!         }
!     }                            /* end loop over TOC entries */

      /*
       * Scan TOC again to output ownership commands and ACLs
--- 726,733 ----
       */
      for (te = AH->toc->next; te != AH->toc; te = te->next)
      {
!         (void) _restore_one_te(AH, te, ropt, false);
!     }

      /*
       * Scan TOC again to output ownership commands and ACLs
***************
*** 451,456 ****
--- 775,955 ----
      }
  }

+ /*
+  * _restore_one_te
+  *
+  * Restore one TOC entry: emit its definition if the schema is wanted,
+  * then its data (or, for entries that had no dumper, its definition) if
+  * data is wanted.
+  *
+  * Returns 0 normally.  Returns 1 only when is_parallel is true,
+  * --no-data-for-failed-tables is in effect and creating a TABLE failed;
+  * the caller is then expected to suppress that table's data.  When
+  * is_parallel is false that suppression is done here instead.
+  */
+ static int
+ _restore_one_te(ArchiveHandle *AH, TocEntry *te,
+                 RestoreOptions *ropt, bool is_parallel)
+ {
+     teReqs      reqs;
+     bool        schema_emitted = false;
+     int         status = 0;
+
+     AH->currentTE = te;
+
+     /* Work out what, if anything, we want from this entry */
+     reqs = _tocEntryRequired(te, ropt, false);
+
+     /* Pass on any warning recorded in the dump file */
+     if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
+     {
+         if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
+             write_msg(modulename, "warning from original dump file: %s\n", te->defn);
+         else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
+             write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
+     }
+
+     if ((reqs & REQ_SCHEMA) != 0)    /* We want the schema */
+     {
+         ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag);
+
+         _printTocEntry(AH, te, ropt, false, false);
+         schema_emitted = true;
+
+         /*
+          * A table that could not be created must not have its TABLE DATA
+          * restored when --no-data-for-failed-tables was given.
+          *
+          * In the parallel case the bookkeeping has to happen in the
+          * parent, so all we do here is report it through the return value.
+          */
+         if (ropt->noDataForFailedTables &&
+             AH->lastErrorTE == te &&
+             strcmp(te->desc, "TABLE") == 0)
+         {
+             if (!is_parallel)
+                 _inhibit_data_for_failed_table(AH,te);
+             else
+                 status = 1;
+         }
+
+         /* If we created a DB, connect to it... */
+         /* won't happen in parallel restore */
+         if (strcmp(te->desc, "DATABASE") == 0)
+         {
+             ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
+             _reconnectToDB(AH, te->tag);
+         }
+     }
+
+     /* Nothing more to do unless there is a data component */
+     if ((reqs & REQ_DATA) == 0)
+         return status;
+
+     /*
+      * hadDumper is set when the entry carries genuine data.  Without it,
+      * the defn field may hold statements that need executing in a
+      * data-only restore.
+      */
+     if (!te->hadDumper)
+     {
+         if (!schema_emitted)
+         {
+             /* If we haven't already dumped the defn part, do so now */
+             ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
+             _printTocEntry(AH, te, ropt, false, false);
+         }
+         return status;
+     }
+
+     /* Can only restore the data if the format knows how to print it */
+     if (AH->PrintTocDataPtr == NULL)
+         return status;
+
+     _printTocEntry(AH, te, ropt, true, false);
+
+     if (strcmp(te->desc, "BLOBS") == 0 ||
+         strcmp(te->desc, "BLOB COMMENTS") == 0)
+     {
+         ahlog(AH, 1, "restoring %s\n", te->desc);
+
+         _selectOutputSchema(AH, "pg_catalog");
+
+         (*AH->PrintTocDataPtr) (AH, te, ropt);
+
+         return status;
+     }
+
+     /* Ordinary table data from here on */
+     _disableTriggersIfNecessary(AH, te, ropt);
+
+     /* Select owner and schema as necessary */
+     _becomeOwner(AH, te);
+     _selectOutputSchema(AH, te->namespace);
+
+     ahlog(AH, 1, "restoring data for table \"%s\"\n",
+           te->tag);
+
+     /* Optionally empty the table first, in one transaction with the load */
+     if (ropt->truncate_before_load)
+     {
+         if (AH->connection)
+             StartTransaction(AH);
+         else
+             ahprintf(AH, "BEGIN;\n\n");
+
+         ahprintf(AH, "TRUNCATE TABLE %s;\n\n",
+                  fmtId(te->tag));
+     }
+
+     /*
+      * If we have a copy statement, use it. As of V1.3, these are kept
+      * separate to allow easy import from within a database connection.
+      * Pre 1.3 archives can not use DB connections and are sent to
+      * output only.
+      *
+      * For V1.3+, the table data MUST have a copy statement so that we
+      * can go into appropriate mode with libpq.
+      */
+     if (te->copyStmt && strlen(te->copyStmt) > 0)
+     {
+         ahprintf(AH, "%s", te->copyStmt);
+         AH->writingCopyData = true;
+     }
+
+     (*AH->PrintTocDataPtr) (AH, te, ropt);
+
+     AH->writingCopyData = false;
+
+     if (ropt->truncate_before_load)
+     {
+         if (AH->connection)
+             CommitTransaction(AH);
+         else
+             ahprintf(AH, "COMMIT;\n\n");
+     }
+
+     _enableTriggersIfNecessary(AH, te, ropt);
+
+     return status;
+ }
+
+ /*
+  * _inhibit_data_for_failed_table
+  *
+  * The table described by "te" could not be created.  Find the first
+  * TABLE DATA entry after it with the same tag and namespace, mark it
+  * unwanted in ropt->idWanted, and call _reduce_dependencies on it.
+  */
+ static void
+ _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te)
+ {
+     RestoreOptions *ropt = AH->ropt;
+     const char *table_ns = te->namespace ? te->namespace : "";
+     TocEntry   *cur;
+
+     ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
+           te->tag);
+
+     for (cur = te->next; cur != AH->toc; cur = cur->next)
+     {
+         const char *cur_ns;
+
+         if (strcmp(cur->desc, "TABLE DATA") != 0)
+             continue;
+         if (strcmp(cur->tag, te->tag) != 0)
+             continue;
+
+         /* a missing namespace compares equal to an empty one */
+         cur_ns = cur->namespace ? cur->namespace : "";
+         if (strcmp(cur_ns, table_ns) != 0)
+             continue;
+
+         /* mark it unwanted */
+         ropt->idWanted[cur->dumpId - 1] = false;
+
+         _reduce_dependencies(AH, cur);
+         break;
+     }
+ }
+
  /*
   * Allocate a new RestoreOptions block.
   * This is mainly so we can initialize it, but also for future expansion,
***************
*** 653,662 ****
      while (te != AH->toc)
      {
          if (_tocEntryRequired(te, ropt, true) != 0)
!             ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
                       te->catalogId.tableoid, te->catalogId.oid,
                       te->desc, te->namespace ? te->namespace : "-",
                       te->tag, te->owner);
          te = te->next;
      }

--- 1152,1167 ----
      while (te != AH->toc)
      {
          if (_tocEntryRequired(te, ropt, true) != 0)
!         {
!             int i;
!             ahprintf(AH, "%d;[%d: ",te->dumpId, te->nDeps);
!             for (i=0 ;i<te->nDeps; i++)
!                 ahprintf(AH, "%d ",te->dependencies[i]);
!             ahprintf(AH, "] %u %u %s %s %s %s\n",
                       te->catalogId.tableoid, te->catalogId.oid,
                       te->desc, te->namespace ? te->namespace : "-",
                       te->tag, te->owner);
+         }
          te = te->next;
      }

***************
*** 1948,1965 ****
--- 2453,2473 ----
                  deps = (DumpId *) realloc(deps, sizeof(DumpId) * depIdx);
                  te->dependencies = deps;
                  te->nDeps = depIdx;
+                 te->depCount = depIdx;
              }
              else
              {
                  free(deps);
                  te->dependencies = NULL;
                  te->nDeps = 0;
+                 te->depCount = 0;
              }
          }
          else
          {
              te->dependencies = NULL;
              te->nDeps = 0;
+             te->depCount = 0;
          }

          if (AH->ReadExtraTocPtr)
Index: pg_backup_archiver.h
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.h,v
retrieving revision 1.76
diff -c -r1.76 pg_backup_archiver.h
*** pg_backup_archiver.h    7 Nov 2007 12:24:24 -0000    1.76
--- pg_backup_archiver.h    29 Sep 2008 02:43:52 -0000
***************
*** 99,104 ****
--- 99,105 ----
  struct _restoreList;

  typedef void (*ClosePtr) (struct _archiveHandle * AH);
+ typedef void (*ReopenPtr) (struct _archiveHandle * AH);
  typedef void (*ArchiveEntryPtr) (struct _archiveHandle * AH, struct _tocEntry * te);

  typedef void (*StartDataPtr) (struct _archiveHandle * AH, struct _tocEntry * te);
***************
*** 212,217 ****
--- 213,219 ----
      WriteBufPtr WriteBufPtr;    /* Write a buffer of output to the archive */
      ReadBufPtr ReadBufPtr;        /* Read a buffer of input from the archive */
      ClosePtr ClosePtr;            /* Close the archive */
+     ReopenPtr ReopenPtr;            /* Reopen the archive */
      WriteExtraTocPtr WriteExtraTocPtr;    /* Write extra TOC entry data
                                           * associated with the current archive
                                           * format */
***************
*** 231,236 ****
--- 233,239 ----
      char       *archdbname;        /* DB name *read* from archive */
      bool        requirePassword;
      PGconn       *connection;
+     char       *cachepw;
      int            connectToDB;    /* Flag to indicate if direct DB connection is
                                   * required */
      bool        writingCopyData;    /* True when we are sending COPY data */
***************
*** 284,289 ****
--- 287,293 ----
      DumpId        dumpId;
      bool        hadDumper;        /* Archiver was passed a dumper routine (used
                                   * in restore) */
+     bool        prestored;      /* keep track of parallel restore */
      char       *tag;            /* index tag */
      char       *namespace;        /* null or empty string if not in a schema */
      char       *tablespace;        /* null if not in a tablespace; empty string
***************
*** 296,301 ****
--- 300,306 ----
      char       *copyStmt;
      DumpId       *dependencies;    /* dumpIds of objects this one depends on */
      int            nDeps;            /* number of dependencies */
+     int         depCount;       /* adjustable tally of dependencies */

      DataDumperPtr dataDumper;    /* Routine to dump data for object */
      void       *dataDumperArg;    /* Arg for above routine */
Index: pg_backup_custom.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_custom.c,v
retrieving revision 1.40
diff -c -r1.40 pg_backup_custom.c
*** pg_backup_custom.c    28 Oct 2007 21:55:52 -0000    1.40
--- pg_backup_custom.c    29 Sep 2008 02:43:52 -0000
***************
*** 40,45 ****
--- 40,46 ----
  static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
  static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
  static void _CloseArchive(ArchiveHandle *AH);
+ static void _ReopenArchive(ArchiveHandle *AH);
  static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
  static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
  static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
***************
*** 120,125 ****
--- 121,127 ----
      AH->WriteBufPtr = _WriteBuf;
      AH->ReadBufPtr = _ReadBuf;
      AH->ClosePtr = _CloseArchive;
+     AH->ReopenPtr = _ReopenArchive;
      AH->PrintTocDataPtr = _PrintTocData;
      AH->ReadExtraTocPtr = _ReadExtraToc;
      AH->WriteExtraTocPtr = _WriteExtraToc;
***************
*** 835,840 ****
--- 837,879 ----
      AH->FH = NULL;
  }

+ /*
+  * Close and reopen the input archive file, restoring the read position.
+  * Dies if the archive is open for writing, is stdin, or is not seekable.
+  */
+ static void
+ _ReopenArchive(ArchiveHandle *AH)
+ {
+     lclContext *ctx = (lclContext *) AH->formatData;
+     pgoff_t        tpos;
+
+     /* Validate every precondition before giving up the current handle. */
+     if (AH->mode == archModeWrite)
+         die_horribly(AH, modulename, "Can only reopen input archives\n");
+     if (AH->fSpec == NULL || strcmp(AH->fSpec, "") == 0)
+         die_horribly(AH, modulename, "Cannot reopen stdin\n");
+     if (!ctx->hasSeek)
+         die_horribly(AH, modulename, "cannot reopen non-seekable file\n");
+
+     tpos = ftello(AH->FH);
+     if (tpos < 0)
+         die_horribly(AH, modulename, "could not determine seek position: %s\n",
+                      strerror(errno));
+
+     if (fclose(AH->FH) != 0)
+         die_horribly(AH, modulename, "could not close archive file: %s\n",
+                      strerror(errno));
+
+     AH->FH = fopen(AH->fSpec, PG_BINARY_R);
+     if (!AH->FH)
+         die_horribly(AH, modulename, "could not open input file \"%s\": %s\n",
+                      AH->fSpec, strerror(errno));
+
+     if (fseeko(AH->FH, tpos, SEEK_SET) != 0)
+         die_horribly(AH, modulename, "could not set seek position: %s\n",
+                      strerror(errno));
+ }
+
  /*--------------------------------------------------
   * END OF FORMAT CALLBACKS
   *--------------------------------------------------
Index: pg_backup_db.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_db.c,v
retrieving revision 1.80
diff -c -r1.80 pg_backup_db.c
*** pg_backup_db.c    16 Aug 2008 02:25:06 -0000    1.80
--- pg_backup_db.c    29 Sep 2008 02:43:52 -0000
***************
*** 138,148 ****

      ahlog(AH, 1, "connecting to database \"%s\" as user \"%s\"\n", newdb, newuser);

!     if (AH->requirePassword)
      {
          password = simple_prompt("Password: ", 100, false);
          if (password == NULL)
              die_horribly(AH, modulename, "out of memory\n");
      }

      do
--- 138,153 ----

      ahlog(AH, 1, "connecting to database \"%s\" as user \"%s\"\n", newdb, newuser);

!     if (AH->requirePassword && AH->cachepw == NULL)
      {
          password = simple_prompt("Password: ", 100, false);
          if (password == NULL)
              die_horribly(AH, modulename, "out of memory\n");
+         AH->requirePassword = true;
+     }
+     else if (AH->requirePassword)
+     {
+         password = AH->cachepw;
      }

      do
***************
*** 174,180 ****
          }
      } while (new_pass);

!     if (password)
          free(password);

      /* check for version mismatch */
--- 179,185 ----
          }
      } while (new_pass);

!     if (password != AH->cachepw)
          free(password);

      /* check for version mismatch */
***************
*** 206,220 ****
      if (AH->connection)
          die_horribly(AH, modulename, "already connected to a database\n");

!     if (reqPwd)
      {
          password = simple_prompt("Password: ", 100, false);
          if (password == NULL)
              die_horribly(AH, modulename, "out of memory\n");
          AH->requirePassword = true;
      }
      else
          AH->requirePassword = false;

      /*
       * Start the connection.  Loop until we have a password if requested by
--- 211,231 ----
      if (AH->connection)
          die_horribly(AH, modulename, "already connected to a database\n");

!     if (reqPwd && AH->cachepw == NULL)
      {
          password = simple_prompt("Password: ", 100, false);
          if (password == NULL)
              die_horribly(AH, modulename, "out of memory\n");
          AH->requirePassword = true;
      }
+     else if (reqPwd)
+     {
+         password = AH->cachepw;
+     }
      else
+     {
          AH->requirePassword = false;
+     }

      /*
       * Start the connection.  Loop until we have a password if requested by
***************
*** 241,247 ****
      } while (new_pass);

      if (password)
!         free(password);

      /* check to see that the backend connection was successfully made */
      if (PQstatus(AH->connection) == CONNECTION_BAD)
--- 252,258 ----
      } while (new_pass);

      if (password)
!         AH->cachepw = password;

      /* check to see that the backend connection was successfully made */
      if (PQstatus(AH->connection) == CONNECTION_BAD)
Index: pg_backup_files.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_files.c,v
retrieving revision 1.34
diff -c -r1.34 pg_backup_files.c
*** pg_backup_files.c    28 Oct 2007 21:55:52 -0000    1.34
--- pg_backup_files.c    29 Sep 2008 02:43:52 -0000
***************
*** 87,92 ****
--- 87,93 ----
      AH->WriteBufPtr = _WriteBuf;
      AH->ReadBufPtr = _ReadBuf;
      AH->ClosePtr = _CloseArchive;
+     AH->ReopenPtr = NULL;
      AH->PrintTocDataPtr = _PrintTocData;
      AH->ReadExtraTocPtr = _ReadExtraToc;
      AH->WriteExtraTocPtr = _WriteExtraToc;
Index: pg_backup_tar.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_tar.c,v
retrieving revision 1.62
diff -c -r1.62 pg_backup_tar.c
*** pg_backup_tar.c    15 Nov 2007 21:14:41 -0000    1.62
--- pg_backup_tar.c    29 Sep 2008 02:43:52 -0000
***************
*** 143,148 ****
--- 143,149 ----
      AH->WriteBufPtr = _WriteBuf;
      AH->ReadBufPtr = _ReadBuf;
      AH->ClosePtr = _CloseArchive;
+     AH->ReopenPtr = NULL;
      AH->PrintTocDataPtr = _PrintTocData;
      AH->ReadExtraTocPtr = _ReadExtraToc;
      AH->WriteExtraTocPtr = _WriteExtraToc;
Index: pg_restore.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_restore.c,v
retrieving revision 1.88
diff -c -r1.88 pg_restore.c
*** pg_restore.c    13 Apr 2008 03:49:22 -0000    1.88
--- pg_restore.c    29 Sep 2008 02:43:52 -0000
***************
*** 78,83 ****
--- 78,84 ----
      static int    no_data_for_failed_tables = 0;
      static int  outputNoTablespaces = 0;
      static int    use_setsessauth = 0;
+     static int  truncate_before_load = 0;

      struct option cmdopts[] = {
          {"clean", 0, NULL, 'c'},
***************
*** 92,97 ****
--- 93,99 ----
          {"ignore-version", 0, NULL, 'i'},
          {"index", 1, NULL, 'I'},
          {"list", 0, NULL, 'l'},
+         {"multi-thread",1,NULL,'m'},
          {"no-privileges", 0, NULL, 'x'},
          {"no-acl", 0, NULL, 'x'},
          {"no-owner", 0, NULL, 'O'},
***************
*** 114,119 ****
--- 116,122 ----
          {"disable-triggers", no_argument, &disable_triggers, 1},
          {"no-data-for-failed-tables", no_argument, &no_data_for_failed_tables, 1},
          {"no-tablespaces", no_argument, &outputNoTablespaces, 1},
+         {"truncate-before-load", no_argument, &truncate_before_load, 1},
          {"use-set-session-authorization", no_argument, &use_setsessauth, 1},

          {NULL, 0, NULL, 0}
***************
*** 139,145 ****
          }
      }

!     while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:n:Op:P:RsS:t:T:U:vWxX:1",
                              cmdopts, NULL)) != -1)
      {
          switch (c)
--- 142,148 ----
          }
      }

!     while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:m:n:Op:P:RsS:t:T:U:vWxX:1",
                              cmdopts, NULL)) != -1)
      {
          switch (c)
***************
*** 182,187 ****
--- 185,194 ----
                  opts->tocFile = strdup(optarg);
                  break;

+             case 'm':
+                 opts->number_of_threads = atoi(optarg); /* XXX fix error checking */
+                 break;
+
              case 'n':            /* Dump data for this schema only */
                  opts->schemaNames = strdup(optarg);
                  break;
***************
*** 262,268 ****
                  break;

              case 0:
!                 /* This covers the long options equivalent to -X xxx. */
                  break;

              case '1':            /* Restore data in a single transaction */
--- 269,278 ----
                  break;

              case 0:
!                 /*
!                  * This covers the long options without a short equivalent,
!                  * including those equivalent to -X xxx.
!                  */
                  break;

              case '1':            /* Restore data in a single transaction */
***************
*** 299,304 ****
--- 309,329 ----
      opts->noDataForFailedTables = no_data_for_failed_tables;
      opts->noTablespace = outputNoTablespaces;
      opts->use_setsessauth = use_setsessauth;
+     opts->truncate_before_load = truncate_before_load;
+
+     if (opts->single_txn)
+     {
+         if (opts->number_of_threads > 1)
+         {
+             write_msg(NULL, "single transaction not compatible with multi-threading");
+             exit(1);
+         }
+         else if (opts->truncate_before_load)
+         {
+             write_msg(NULL, "single transaction not compatible with truncate-before-load");
+             exit(1);
+         }
+     }

      if (opts->formatName)
      {
***************
*** 330,335 ****
--- 355,362 ----

      AH = OpenArchive(inputFileSpec, opts->format);

+     /* XXX looks like we'll have to do sanity checks in the parallel archiver */
+
      /* Let the archiver know how noisy to be */
      AH->verbose = opts->verbose;

***************
*** 351,356 ****
--- 378,385 ----

      if (opts->tocSummary)
          PrintTOCSummary(AH, opts);
+     else if (opts->number_of_threads > 1)
+         RestoreArchiveParallel(AH, opts);
      else
          RestoreArchive(AH, opts);


pgsql-hackers by date:

Previous
From: ITAGAKI Takahiro
Date:
Subject: Operation needed for datfrozenxid bug?
Next
From: "David E. Wheeler"
Date:
Subject: Re: Ad-hoc table type?