Thread: pg_restore crash when there is a failure before all child processes are created

Hi,

I found a crash in pg_restore that occurs when there is a failure before all of the child workers have been created. The backtrace is given below:
#0  0x00007f9c6d31e337 in raise () from /lib64/libc.so.6
#1  0x00007f9c6d31fa28 in abort () from /lib64/libc.so.6
#2  0x00007f9c6d317156 in __assert_fail_base () from /lib64/libc.so.6
#3  0x00007f9c6d317202 in __assert_fail () from /lib64/libc.so.6
#4  0x0000000000407c9e in WaitForTerminatingWorkers (pstate=0x14af7f0) at parallel.c:515
#5  0x0000000000407bf9 in ShutdownWorkersHard (pstate=0x14af7f0) at parallel.c:451
#6  0x0000000000407ae9 in archive_close_connection (code=1, arg=0x6315a0 <shutdown_info>) at parallel.c:368
#7  0x000000000041a7c7 in exit_nicely (code=1) at pg_backup_utils.c:99
#8  0x0000000000408180 in ParallelBackupStart (AH=0x14972e0) at parallel.c:967
#9  0x000000000040a3dd in RestoreArchive (AHX=0x14972e0) at pg_backup_archiver.c:661
#10 0x0000000000404125 in main (argc=6, argv=0x7ffd5146f308) at pg_restore.c:443

The problem is as follows (a standalone sketch is given after the list):
  • ParallelBackupStart sets pstate->numWorkers to the requested number of workers up front.
  • The workers are then created one by one.
  • A failure occurs before all of the child processes have been created.
  • The parent terminates the child processes that do exist and waits for all of them to terminate.
  • WaitForTerminatingWorkers checks whether every process has terminated by calling HasEveryWorkerTerminated.
  • HasEveryWorkerTerminated always returns false, because it checks against numWorkers rather than the number of processes actually forked, so the wait loop eventually hits the assertion "Assert(j < pstate->numWorkers);".
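
For illustration, here is a minimal standalone sketch of the same pattern (my own toy program, not code from parallel.c; the worker count and the point of failure are invented). The parent records the intended worker count up front, stops forking partway through, and then waits for children that were never created:

#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#define NUM_WORKERS 4

int
main(void)
{
    int         numWorkers = NUM_WORKERS;  /* recorded before any fork() */
    int         i;

    for (i = 0; i < numWorkers; i++)
    {
        pid_t       pid;

        if (i == 2)             /* simulate fork() failing mid-loop */
            break;
        pid = fork();
        if (pid == 0)
            _exit(0);           /* worker exits immediately */
    }

    /*
     * A wait loop in the spirit of WaitForTerminatingWorkers: it insists
     * on reaping numWorkers children, but only two were ever forked, so
     * the third wait() returns -1 with errno == ECHILD and the assertion
     * fails, much as the Assert in parallel.c does.
     */
    for (i = 0; i < numWorkers; i++)
    {
        pid_t       reaped = wait(NULL);

        assert(reaped != -1);
    }
    return 0;
}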

The attached patch fixes this by updating pstate->numWorkers with the actual worker count as each child process is created.

Thoughts?

Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com
On Wed, Jan 29, 2020 at 6:54 PM Ahsan Hadi <ahsan.hadi@gmail.com> wrote:
>
> Hi Vignesh,
>
> Can you share a test case or steps that you are using to reproduce this issue? Are you reproducing this using a
> debugger?
>

I could reproduce it with the following steps:
Set up a cluster.
Create a few tables.
Take a dump in directory format using pg_dump.
Restore the dump generated above using pg_restore with a very high
value for the --jobs option, around 600.
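
For example, something along these lines (a sketch only; database and directory names are placeholders, and a running cluster is assumed):

    createdb sourcedb
    psql -d sourcedb -c "CREATE TABLE t1 (a int); CREATE TABLE t2 (b int);"
    pg_dump -Fd -f /tmp/dumpdir sourcedb
    createdb targetdb
    pg_restore -d targetdb --jobs=600 /tmp/dumpdir

With that many jobs the parent typically fails partway through starting workers (fork() or pipe creation runs into process or file-descriptor limits), and the assertion fires during cleanup.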

Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com



vignesh C <vignesh21@gmail.com> writes:
> On Wed, Jan 29, 2020 at 6:54 PM Ahsan Hadi <ahsan.hadi@gmail.com> wrote:
>> Can you share a test case or steps that you are using to reproduce this issue? Are you reproducing this using a
>> debugger?

> I could reproduce it with the following steps:
> Set up a cluster.
> Create a few tables.
> Take a dump in directory format using pg_dump.
> Restore the dump generated above using pg_restore with a very high
> value for the --jobs option, around 600.

I agree this is quite broken.  Another way to observe the crash is
to make the fork() call randomly fail, as per booby-trap-fork.patch
below (not intended for commit, obviously).

I don't especially like the proposed patch, though, as it introduces
a great deal of confusion into what ParallelState.numWorkers means.
I think it's better to leave that as being the allocated array size,
and instead clean up all the fuzzy thinking about whether workers
are actually running or not.  Like 0001-fix-worker-status.patch below.

            regards, tom lane

diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index 1410bcd..aa2b8be 100644
--- a/src/bin/pg_dump/parallel.c
+++ b/src/bin/pg_dump/parallel.c
@@ -994,7 +994,10 @@ ParallelBackupStart(ArchiveHandle *AH)
                                 wi, 0, &(slot->threadId));
         slot->hThread = handle;
 #else                            /* !WIN32 */
-        pid = fork();
+        if (random() > 1000000000)
+            pid = fork();
+        else
+            pid = -1, errno = EINVAL;
         if (pid == 0)
         {
             /* we are the worker */
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index 1410bcd..c25e3f7 100644
--- a/src/bin/pg_dump/parallel.c
+++ b/src/bin/pg_dump/parallel.c
@@ -42,6 +42,7 @@
  *
  * In the master process, the workerStatus field for each worker has one of
  * the following values:
+ *        WRKR_NOT_STARTED: we've not yet forked this worker
  *        WRKR_IDLE: it's waiting for a command
  *        WRKR_WORKING: it's working on a command
  *        WRKR_TERMINATED: process ended
@@ -75,11 +76,15 @@
 /* Worker process statuses */
 typedef enum
 {
+    WRKR_NOT_STARTED = 0,
     WRKR_IDLE,
     WRKR_WORKING,
     WRKR_TERMINATED
 } T_WorkerStatus;

+#define WORKER_IS_RUNNING(workerStatus) \
+    ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
+
 /*
  * Private per-parallel-worker state (typedef for this is in parallel.h).
  *
@@ -412,7 +417,9 @@ ShutdownWorkersHard(ParallelState *pstate)

     /*
      * Close our write end of the sockets so that any workers waiting for
-     * commands know they can exit.
+     * commands know they can exit.  (Note: some of the pipeWrite fields might
+     * still be zero, if we failed to initialize all the workers.  Hence, just
+     * ignore errors here.)
      */
     for (i = 0; i < pstate->numWorkers; i++)
         closesocket(pstate->parallelSlot[i].pipeWrite);
@@ -486,7 +493,7 @@ WaitForTerminatingWorkers(ParallelState *pstate)

         for (j = 0; j < pstate->numWorkers; j++)
         {
-            if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED)
+            if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
             {
                 lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
                 nrun++;
@@ -922,6 +929,7 @@ ParallelBackupStart(ArchiveHandle *AH)
     if (AH->public.numWorkers == 1)
         return pstate;

+    /* Create status arrays, being sure to initialize all fields to 0 */
     pstate->te = (TocEntry **)
         pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
     pstate->parallelSlot = (ParallelSlot *)
@@ -969,13 +977,6 @@ ParallelBackupStart(ArchiveHandle *AH)
         if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
             fatal("could not create communication channels: %m");

-        pstate->te[i] = NULL;    /* just for safety */
-
-        slot->workerStatus = WRKR_IDLE;
-        slot->AH = NULL;
-        slot->callback = NULL;
-        slot->callback_data = NULL;
-
         /* master's ends of the pipes */
         slot->pipeRead = pipeWM[PIPE_READ];
         slot->pipeWrite = pipeMW[PIPE_WRITE];
@@ -993,6 +994,7 @@ ParallelBackupStart(ArchiveHandle *AH)
         handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
                                 wi, 0, &(slot->threadId));
         slot->hThread = handle;
+        slot->workerStatus = WRKR_IDLE;
 #else                            /* !WIN32 */
         pid = fork();
         if (pid == 0)
@@ -1035,6 +1037,7 @@ ParallelBackupStart(ArchiveHandle *AH)

         /* In Master after successful fork */
         slot->pid = pid;
+        slot->workerStatus = WRKR_IDLE;

         /* close read end of Master -> Worker */
         closesocket(pipeMW[PIPE_READ]);
@@ -1262,7 +1265,7 @@ GetIdleWorker(ParallelState *pstate)
 }

 /*
- * Return true iff every worker is in the WRKR_TERMINATED state.
+ * Return true iff no worker is running.
  */
 static bool
 HasEveryWorkerTerminated(ParallelState *pstate)
@@ -1271,7 +1274,7 @@ HasEveryWorkerTerminated(ParallelState *pstate)

     for (i = 0; i < pstate->numWorkers; i++)
     {
-        if (pstate->parallelSlot[i].workerStatus != WRKR_TERMINATED)
+        if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
             return false;
     }
     return true;
@@ -1603,7 +1606,7 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
     FD_ZERO(&workerset);
     for (i = 0; i < pstate->numWorkers; i++)
     {
-        if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED)
+        if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
             continue;
         FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
         if (pstate->parallelSlot[i].pipeRead > maxFd)
@@ -1628,6 +1631,8 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
     {
         char       *msg;

+        if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
+            continue;
         if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
             continue;


I have applied and tested both patches separately and ran the regression tests with both. No new test cases fail with either patch.

The issue is fixed by both patches; however, the fix from Tom looks more elegant. I haven't done a detailed code review.

On Fri, Jan 31, 2020 at 12:39 AM Tom Lane <tgl@sss.pgh.pa.us> wrote:
> vignesh C <vignesh21@gmail.com> writes:
>> On Wed, Jan 29, 2020 at 6:54 PM Ahsan Hadi <ahsan.hadi@gmail.com> wrote:
>>> Can you share a test case or steps that you are using to reproduce this issue? Are you reproducing this using a debugger?
>
>> I could reproduce it with the following steps:
>> Set up a cluster.
>> Create a few tables.
>> Take a dump in directory format using pg_dump.
>> Restore the dump generated above using pg_restore with a very high
>> value for the --jobs option, around 600.
>
> I agree this is quite broken.  Another way to observe the crash is
> to make the fork() call randomly fail, as per booby-trap-fork.patch
> below (not intended for commit, obviously).
>
> I don't especially like the proposed patch, though, as it introduces
> a great deal of confusion into what ParallelState.numWorkers means.
> I think it's better to leave that as being the allocated array size,
> and instead clean up all the fuzzy thinking about whether workers
> are actually running or not.  Like 0001-fix-worker-status.patch below.
>
>                         regards, tom lane



--
Highgo Software (Canada/China/Pakistan)
URL : http://www.highgo.ca
ADDR: 10318 WHALLEY BLVD, Surrey, BC
EMAIL: ahsan.hadi@highgo.ca
On Fri, Jan 31, 2020 at 1:09 AM Tom Lane <tgl@sss.pgh.pa.us> wrote:
>
> vignesh C <vignesh21@gmail.com> writes:
> > On Wed, Jan 29, 2020 at 6:54 PM Ahsan Hadi <ahsan.hadi@gmail.com> wrote:
> >> Can you share a test case or steps that you are using to reproduce this issue? Are you reproducing this using a
> >> debugger?
>
> > I could reproduce it with the following steps:
> > Set up a cluster.
> > Create a few tables.
> > Take a dump in directory format using pg_dump.
> > Restore the dump generated above using pg_restore with a very high
> > value for the --jobs option, around 600.
>
> I agree this is quite broken.  Another way to observe the crash is
> to make the fork() call randomly fail, as per booby-trap-fork.patch
> below (not intended for commit, obviously).
>
> I don't especially like the proposed patch, though, as it introduces
> a great deal of confusion into what ParallelState.numWorkers means.
> I think it's better to leave that as being the allocated array size,
> and instead clean up all the fuzzy thinking about whether workers
> are actually running or not.  Like 0001-fix-worker-status.patch below.
>

The patch looks fine to me. My test scenario is also fixed by the patch.

Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com



The following review has been posted through the commitfest application:
make installcheck-world:  tested, passed
Implements feature:       not tested
Spec compliant:           tested, passed
Documentation:            not tested

I have applied and tested both patches separately and ran the regression tests with both. No new test cases fail with either patch.

The issue is fixed by both patches; however, the fix from Tom (0001-fix-worker-status.patch) looks more elegant. I haven't done a detailed code review.

ahsan hadi <ahsan.hadi@gmail.com> writes:
> I have applied and tested both patches separately and ran the regression tests with both. No new test cases fail with
> either patch.
> The issue is fixed by both patches; however, the fix from Tom (0001-fix-worker-status.patch) looks more elegant. I
> haven't done a detailed code review.

Pushed, thanks for looking!

            regards, tom lane