Re: pg_restore crash when there is a failure before all child process is created - Mailing list pgsql-hackers
From | Tom Lane |
---|---|
Subject | Re: pg_restore crash when there is a failure before all child process is created |
Date | |
Msg-id | 11160.1580413187@sss.pgh.pa.us |
In response to | Re: pg_restore crash when there is a failure before all child process is created (vignesh C <vignesh21@gmail.com>) |
Responses |
Re: pg_restore crash when there is a failure before all child process is created
Re: pg_restore crash when there is a failure before all child process is created
Re: pg_restore crash when there is a failure before all child process is created |
List | pgsql-hackers |
vignesh C <vignesh21@gmail.com> writes:
> On Wed, Jan 29, 2020 at 6:54 PM Ahsan Hadi <ahsan.hadi@gmail.com> wrote:
>> Can you share a test case or steps that you are using to reproduce this issue? Are you reproducing this using a debugger?

> I could reproduce with the following steps:
> Make cluster setup.
> Create few tables.
> Take a dump in directory format using pg_dump.
> Restore the dump generated above using pg_restore with very high
> number for --jobs options around 600.

I agree this is quite broken. Another way to observe the crash is to make the fork() call randomly fail, as per booby-trap-fork.patch below (not intended for commit, obviously).

I don't especially like the proposed patch, though, as it introduces a great deal of confusion into what ParallelState.numWorkers means. I think it's better to leave that as being the allocated array size, and instead clean up all the fuzzy thinking about whether workers are actually running or not. Like 0001-fix-worker-status.patch below.
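To see how aggressive the booby trap is, note that it attempts fork() only when `random() > 1000000000`. The hypothetical helpers below, p_fork_attempted() and p_all_started(), sketch the arithmetic under two stated assumptions: random() yields uniformly distributed values in 0 to 2^31 - 1 (the range POSIX specifies), and genuine fork() failures are ignored.

```c
#include <assert.h>

/* Assumption: POSIX random() returns values in [0, 2^31 - 1], uniformly. */
#define RANDOM_RANGE 2147483648.0          /* 2^31 possible values */
#define BOOBY_TRAP_THRESHOLD 1000000000.0  /* fork() only if random() > this */

/* Probability that one booby-trapped call actually attempts fork(). */
static double
p_fork_attempted(void)
{
	/* Passing values are threshold+1 .. 2^31-1: (2^31 - 1) - threshold of them. */
	return (RANDOM_RANGE - 1.0 - BOOBY_TRAP_THRESHOLD) / RANDOM_RANGE;
}

/* Probability that all n workers get forked, treating draws as independent. */
static double
p_all_started(int n)
{
	double		p = 1.0;

	assert(n >= 0);
	for (int i = 0; i < n; i++)
		p *= p_fork_attempted();
	return p;
}
```

Each fork() is attempted with probability of roughly 0.53, so with even 8 workers the chance that every one of them starts is well under 1%. In practice almost every run then exercises the cleanup path for a partially started worker set.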
regards, tom lane

booby-trap-fork.patch

diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index 1410bcd..aa2b8be 100644
--- a/src/bin/pg_dump/parallel.c
+++ b/src/bin/pg_dump/parallel.c
@@ -994,7 +994,10 @@ ParallelBackupStart(ArchiveHandle *AH)
 								wi, 0, &(slot->threadId));
 		slot->hThread = handle;
 #else /* !WIN32 */
-		pid = fork();
+		if (random() > 1000000000)
+			pid = fork();
+		else
+			pid = -1, errno = EINVAL;
 		if (pid == 0)
 		{
 			/* we are the worker */

0001-fix-worker-status.patch

diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index 1410bcd..c25e3f7 100644
--- a/src/bin/pg_dump/parallel.c
+++ b/src/bin/pg_dump/parallel.c
@@ -42,6 +42,7 @@
  *
  * In the master process, the workerStatus field for each worker has one of
  * the following values:
+ *		WRKR_NOT_STARTED: we've not yet forked this worker
  *		WRKR_IDLE: it's waiting for a command
  *		WRKR_WORKING: it's working on a command
  *		WRKR_TERMINATED: process ended
@@ -75,11 +76,15 @@
 /* Worker process statuses */
 typedef enum
 {
+	WRKR_NOT_STARTED = 0,
 	WRKR_IDLE,
 	WRKR_WORKING,
 	WRKR_TERMINATED
 } T_WorkerStatus;
 
+#define WORKER_IS_RUNNING(workerStatus) \
+	((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
+
 /*
  * Private per-parallel-worker state (typedef for this is in parallel.h).
  *
@@ -412,7 +417,9 @@ ShutdownWorkersHard(ParallelState *pstate)
 
 	/*
 	 * Close our write end of the sockets so that any workers waiting for
-	 * commands know they can exit.
+	 * commands know they can exit. (Note: some of the pipeWrite fields might
+	 * still be zero, if we failed to initialize all the workers. Hence, just
+	 * ignore errors here.)
 	 */
 	for (i = 0; i < pstate->numWorkers; i++)
 		closesocket(pstate->parallelSlot[i].pipeWrite);
@@ -486,7 +493,7 @@ WaitForTerminatingWorkers(ParallelState *pstate)
 
 		for (j = 0; j < pstate->numWorkers; j++)
 		{
-			if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED)
+			if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
 			{
 				lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
 				nrun++;
@@ -922,6 +929,7 @@ ParallelBackupStart(ArchiveHandle *AH)
 	if (AH->public.numWorkers == 1)
 		return pstate;
 
+	/* Create status arrays, being sure to initialize all fields to 0 */
 	pstate->te = (TocEntry **)
 		pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
 	pstate->parallelSlot = (ParallelSlot *)
@@ -969,13 +977,6 @@ ParallelBackupStart(ArchiveHandle *AH)
 		if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
 			fatal("could not create communication channels: %m");
 
-		pstate->te[i] = NULL; /* just for safety */
-
-		slot->workerStatus = WRKR_IDLE;
-		slot->AH = NULL;
-		slot->callback = NULL;
-		slot->callback_data = NULL;
-
 		/* master's ends of the pipes */
 		slot->pipeRead = pipeWM[PIPE_READ];
 		slot->pipeWrite = pipeMW[PIPE_WRITE];
@@ -993,6 +994,7 @@ ParallelBackupStart(ArchiveHandle *AH)
 		handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
 								wi, 0, &(slot->threadId));
 		slot->hThread = handle;
+		slot->workerStatus = WRKR_IDLE;
 #else /* !WIN32 */
 		pid = fork();
 		if (pid == 0)
@@ -1035,6 +1037,7 @@ ParallelBackupStart(ArchiveHandle *AH)
 
 		/* In Master after successful fork */
 		slot->pid = pid;
+		slot->workerStatus = WRKR_IDLE;
 
 		/* close read end of Master -> Worker */
 		closesocket(pipeMW[PIPE_READ]);
@@ -1262,7 +1265,7 @@ GetIdleWorker(ParallelState *pstate)
 }
 
 /*
- * Return true iff every worker is in the WRKR_TERMINATED state.
+ * Return true iff no worker is running.
  */
 static bool
 HasEveryWorkerTerminated(ParallelState *pstate)
@@ -1271,7 +1274,7 @@ HasEveryWorkerTerminated(ParallelState *pstate)
 
 	for (i = 0; i < pstate->numWorkers; i++)
 	{
-		if (pstate->parallelSlot[i].workerStatus != WRKR_TERMINATED)
+		if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
 			return false;
 	}
 	return true;
@@ -1603,7 +1606,7 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
 	FD_ZERO(&workerset);
 	for (i = 0; i < pstate->numWorkers; i++)
 	{
-		if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED)
+		if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
 			continue;
 		FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
 		if (pstate->parallelSlot[i].pipeRead > maxFd)
@@ -1628,6 +1631,8 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
 	{
 		char *msg;
 
+		if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
+			continue;
 		if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
 			continue;
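The status-tracking idea in the fix patch can be illustrated with a small standalone C sketch. This is not the pg_dump code. The State and Slot types and the start_workers(), count_running(), count_not_terminated(), reap_running() and has_every_worker_terminated() helpers are hypothetical simplifications, and a failed fork() is simulated by stopping at index fail_at rather than by calling fork(). The sketch shows numWorkers staying the allocated array size while WORKER_IS_RUNNING decides which slots cleanup code should touch. Because WRKR_NOT_STARTED is the zero value, a zero-filled slot array starts out entirely "not started".

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Same values as the patched T_WorkerStatus; zero means "never forked". */
typedef enum
{
	WRKR_NOT_STARTED = 0,
	WRKR_IDLE,
	WRKR_WORKING,
	WRKR_TERMINATED
} T_WorkerStatus;

#define WORKER_IS_RUNNING(workerStatus) \
	((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)

/* Hypothetical, stripped-down stand-ins for ParallelSlot and ParallelState. */
typedef struct
{
	T_WorkerStatus workerStatus;
} Slot;

typedef struct
{
	int			numWorkers;		/* allocated array size, never decremented */
	Slot	   *slots;
} State;

/*
 * Allocate n zeroed slots (all WRKR_NOT_STARTED), then "start" workers in
 * order.  Reaching index fail_at simulates a failed fork(); pass fail_at >= n
 * for no failure.  A slot becomes WRKR_IDLE only after its worker started.
 * Returns how many workers were started.
 */
static int
start_workers(State *st, int n, int fail_at)
{
	st->numWorkers = n;
	st->slots = calloc((size_t) n, sizeof(Slot));
	assert(st->slots != NULL);

	for (int i = 0; i < n; i++)
	{
		if (i == fail_at)
			return i;			/* real code would bail out with an error here */
		st->slots[i].workerStatus = WRKR_IDLE;
	}
	return n;
}

/* Slots that cleanup code should signal or wait for (patched predicate). */
static int
count_running(const State *st)
{
	int			nrun = 0;

	for (int i = 0; i < st->numWorkers; i++)
		if (WORKER_IS_RUNNING(st->slots[i].workerStatus))
			nrun++;
	return nrun;
}

/* Slots that the pre-patch "!= WRKR_TERMINATED" test would treat as live. */
static int
count_not_terminated(const State *st)
{
	int			n = 0;

	for (int i = 0; i < st->numWorkers; i++)
		if (st->slots[i].workerStatus != WRKR_TERMINATED)
			n++;
	return n;
}

/* Mark every running worker as exited, as if it had been reaped. */
static void
reap_running(State *st)
{
	for (int i = 0; i < st->numWorkers; i++)
		if (WORKER_IS_RUNNING(st->slots[i].workerStatus))
			st->slots[i].workerStatus = WRKR_TERMINATED;
}

/* Like the patched HasEveryWorkerTerminated(): true iff no worker runs. */
static bool
has_every_worker_terminated(const State *st)
{
	return count_running(st) == 0;
}
```

Consider 8 slots with a simulated failure at index 3. Three slots end up WRKR_IDLE and five stay WRKR_NOT_STARTED. After the running ones are reaped, has_every_worker_terminated() returns true. The old-style count, by contrast, still reports five live slots. That is the kind of mismatch that can leave cleanup code waiting on, or closing handles for, workers that never existed.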