diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 54d9ea7be05..b24a415f622 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -550,6 +550,13 @@ LaunchParallelWorkers(ParallelContext *pcxt) /* Restore previous memory context. */ MemoryContextSwitchTo(oldcontext); + + /* + * Cause the next CHECK_FOR_INTERRUPTS() to behave as if we've received + * PROCSIG_PARALLEL_MESSAGE, as part of the protocol for detecting + * failed forks. + */ + ParallelMessagePending = true; } /* @@ -874,7 +881,26 @@ HandleParallelMessages(void) res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes, &data, true); - if (res == SHM_MQ_WOULD_BLOCK) + if (res == SHM_MQ_WOULD_BLOCK_NOT_YET_ATTACHED) + { + /* + * Until we've heard from every single worker, we'll + * keep behaving as if we've received PROCSIG_PARALLEL_MESSAGE. + * This means that every CHECK_FOR_INTERRUPTS() will cause + * us to check for backends that never started up. + * This is necessary because the postmaster isn't allowed + * to set PROCSIG_PARALLEL_MESSAGE to tell us about fork + * failure, but it will signal us causing any correctly + * coded latch wait loop to return here so that we can + * poll for workers that failed to fork. In other words, + * we'll take the extremely pessmistic view that every + * latch-set might be such a failure, and keep checking + * for it until we know it's definitely not the case. + */ + ParallelMessagePending = true; + break; + } + else if (res == SHM_MQ_WOULD_BLOCK) break; else if (res == SHM_MQ_SUCCESS) { diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c index ecdbe7f79f6..feaf999a2f0 100644 --- a/src/backend/executor/tqueue.c +++ b/src/backend/executor/tqueue.c @@ -191,7 +191,8 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done) } /* In non-blocking mode, bail out if no message ready yet. */ - if (result == SHM_MQ_WOULD_BLOCK) + if (result == SHM_MQ_WOULD_BLOCK || + result == SHM_MQ_WOULD_BLOCK_NOT_YET_ATTACHED) return NULL; Assert(result == SHM_MQ_SUCCESS); diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c index 201075dd477..90cded1e425 100644 --- a/src/backend/libpq/pqmq.c +++ b/src/backend/libpq/pqmq.c @@ -165,7 +165,8 @@ mq_putmessage(char msgtype, const char *s, size_t len) PROCSIG_PARALLEL_MESSAGE, pq_mq_parallel_master_backend_id); - if (result != SHM_MQ_WOULD_BLOCK) + if (result != SHM_MQ_WOULD_BLOCK && + result != SHM_MQ_WOULD_BLOCK_NOT_YET_ATTACHED) break; WaitLatch(MyLatch, WL_LATCH_SET, 0, diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h index f85f2eb7d17..f9800c7edec 100644 --- a/src/include/storage/shm_mq.h +++ b/src/include/storage/shm_mq.h @@ -37,6 +37,7 @@ typedef enum { SHM_MQ_SUCCESS, /* Sent or received a message. */ SHM_MQ_WOULD_BLOCK, /* Not completed; retry later. */ + SHM_MQ_WOULD_BLOCK_NOT_YET_ATTACHED, SHM_MQ_DETACHED /* Other process has detached queue. */ } shm_mq_result;