Re: pg_restore crash when there is a failure before all child process is created - Mailing list pgsql-hackers

vignesh C <vignesh21@gmail.com> writes:
> On Wed, Jan 29, 2020 at 6:54 PM Ahsan Hadi <ahsan.hadi@gmail.com> wrote:
>> Can you share a test case or steps that you are using to reproduce this issue? Are you reproducing this using a
debugger?

> I could reproduce with the following steps:
> Make cluster setup.
> Create few tables.
> Take a dump in directory format using pg_dump.
> Restore the dump generated above using pg_restore with very high
> number for --jobs options around 600.

I agree this is quite broken.  Another way to observe the crash is
to make the fork() call randomly fail, as per booby-trap-fork.patch
below (not intended for commit, obviously).

I don't especially like the proposed patch, though, as it introduces
a great deal of confusion into what ParallelState.numWorkers means.
I think it's better to leave that as being the allocated array size,
and instead clean up all the fuzzy thinking about whether workers
are actually running or not.  Like 0001-fix-worker-status.patch below.

            regards, tom lane

diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index 1410bcd..aa2b8be 100644
--- a/src/bin/pg_dump/parallel.c
+++ b/src/bin/pg_dump/parallel.c
@@ -994,7 +994,10 @@ ParallelBackupStart(ArchiveHandle *AH)
                                 wi, 0, &(slot->threadId));
         slot->hThread = handle;
 #else                            /* !WIN32 */
-        pid = fork();
+        if (random() > 1000000000)
+            pid = fork();
+        else
+            pid = -1, errno = EINVAL;
         if (pid == 0)
         {
             /* we are the worker */
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index 1410bcd..c25e3f7 100644
--- a/src/bin/pg_dump/parallel.c
+++ b/src/bin/pg_dump/parallel.c
@@ -42,6 +42,7 @@
  *
  * In the master process, the workerStatus field for each worker has one of
  * the following values:
+ *        WRKR_NOT_STARTED: we've not yet forked this worker
  *        WRKR_IDLE: it's waiting for a command
  *        WRKR_WORKING: it's working on a command
  *        WRKR_TERMINATED: process ended
@@ -75,11 +76,15 @@
 /* Worker process statuses */
 typedef enum
 {
+    WRKR_NOT_STARTED = 0,
     WRKR_IDLE,
     WRKR_WORKING,
     WRKR_TERMINATED
 } T_WorkerStatus;

+#define WORKER_IS_RUNNING(workerStatus) \
+    ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
+
 /*
  * Private per-parallel-worker state (typedef for this is in parallel.h).
  *
@@ -412,7 +417,9 @@ ShutdownWorkersHard(ParallelState *pstate)

     /*
      * Close our write end of the sockets so that any workers waiting for
-     * commands know they can exit.
+     * commands know they can exit.  (Note: some of the pipeWrite fields might
+     * still be zero, if we failed to initialize all the workers.  Hence, just
+     * ignore errors here.)
      */
     for (i = 0; i < pstate->numWorkers; i++)
         closesocket(pstate->parallelSlot[i].pipeWrite);
@@ -486,7 +493,7 @@ WaitForTerminatingWorkers(ParallelState *pstate)

         for (j = 0; j < pstate->numWorkers; j++)
         {
-            if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED)
+            if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
             {
                 lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
                 nrun++;
@@ -922,6 +929,7 @@ ParallelBackupStart(ArchiveHandle *AH)
     if (AH->public.numWorkers == 1)
         return pstate;

+    /* Create status arrays, being sure to initialize all fields to 0 */
     pstate->te = (TocEntry **)
         pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
     pstate->parallelSlot = (ParallelSlot *)
@@ -969,13 +977,6 @@ ParallelBackupStart(ArchiveHandle *AH)
         if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
             fatal("could not create communication channels: %m");

-        pstate->te[i] = NULL;    /* just for safety */
-
-        slot->workerStatus = WRKR_IDLE;
-        slot->AH = NULL;
-        slot->callback = NULL;
-        slot->callback_data = NULL;
-
         /* master's ends of the pipes */
         slot->pipeRead = pipeWM[PIPE_READ];
         slot->pipeWrite = pipeMW[PIPE_WRITE];
@@ -993,6 +994,7 @@ ParallelBackupStart(ArchiveHandle *AH)
         handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
                                 wi, 0, &(slot->threadId));
         slot->hThread = handle;
+        slot->workerStatus = WRKR_IDLE;
 #else                            /* !WIN32 */
         pid = fork();
         if (pid == 0)
@@ -1035,6 +1037,7 @@ ParallelBackupStart(ArchiveHandle *AH)

         /* In Master after successful fork */
         slot->pid = pid;
+        slot->workerStatus = WRKR_IDLE;

         /* close read end of Master -> Worker */
         closesocket(pipeMW[PIPE_READ]);
@@ -1262,7 +1265,7 @@ GetIdleWorker(ParallelState *pstate)
 }

 /*
- * Return true iff every worker is in the WRKR_TERMINATED state.
+ * Return true iff no worker is running.
  */
 static bool
 HasEveryWorkerTerminated(ParallelState *pstate)
@@ -1271,7 +1274,7 @@ HasEveryWorkerTerminated(ParallelState *pstate)

     for (i = 0; i < pstate->numWorkers; i++)
     {
-        if (pstate->parallelSlot[i].workerStatus != WRKR_TERMINATED)
+        if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
             return false;
     }
     return true;
@@ -1603,7 +1606,7 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
     FD_ZERO(&workerset);
     for (i = 0; i < pstate->numWorkers; i++)
     {
-        if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED)
+        if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
             continue;
         FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
         if (pstate->parallelSlot[i].pipeRead > maxFd)
@@ -1628,6 +1631,8 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
     {
         char       *msg;

+        if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
+            continue;
         if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
             continue;


pgsql-hackers by date:

Previous
From: Peter Geoghegan
Date:
Subject: Re: Enabling B-Tree deduplication by default
Next
From: Peter Geoghegan
Date:
Subject: Re: Enabling B-Tree deduplication by default