diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c index 62c399e..2d7eb71 100644 --- a/src/backend/executor/nodeGatherMerge.c +++ b/src/backend/executor/nodeGatherMerge.c @@ -195,9 +195,9 @@ ExecGatherMerge(GatherMergeState *node) /* Set up tuple queue readers to read the results. */ if (pcxt->nworkers_launched > 0) { - node->nreaders = 0; - node->reader = palloc(pcxt->nworkers_launched * - sizeof(TupleQueueReader *)); + node->reader = (TupleQueueReader **) + palloc(pcxt->nworkers_launched * + sizeof(TupleQueueReader *)); Assert(gm->numCols); @@ -205,7 +205,7 @@ ExecGatherMerge(GatherMergeState *node) { shm_mq_set_handle(node->pei->tqueue[i], pcxt->worker[i].bgwhandle); - node->reader[node->nreaders++] = + node->reader[i] = CreateTupleQueueReader(node->pei->tqueue[i], node->tupDesc); } @@ -298,7 +298,7 @@ ExecShutdownGatherMergeWorkers(GatherMergeState *node) { int i; - for (i = 0; i < node->nreaders; ++i) + for (i = 0; i < node->nworkers_launched; ++i) if (node->reader[i]) DestroyTupleQueueReader(node->reader[i]); @@ -344,28 +344,26 @@ ExecReScanGatherMerge(GatherMergeState *node) static void gather_merge_init(GatherMergeState *gm_state) { - int nreaders = gm_state->nreaders; + int nslots = gm_state->nworkers_launched + 1; bool initialize = true; int i; /* * Allocate gm_slots for the number of worker + one more slot for leader. - * Last slot is always for leader. Leader always calls ExecProcNode() to - * read the tuple which will return the TupleTableSlot. Later it will - * directly get assigned to gm_slot. So just initialize leader gm_slot - * with NULL. For other slots below code will call - * ExecInitExtraTupleSlot() which will do the initialization of worker - * slots. + * The final slot in the array is reserved for the leader process. This + * slot is always populated via ExecProcNode(). This can be set to NULL + * for now. The remaining slots we'll initialize with a call to + * ExecInitExtraTupleSlot(). */ - gm_state->gm_slots = - palloc((gm_state->nreaders + 1) * sizeof(TupleTableSlot *)); - gm_state->gm_slots[gm_state->nreaders] = NULL; + gm_state->gm_slots = (TupleTableSlot **) + palloc(nslots * sizeof(TupleTableSlot *)); + gm_state->gm_slots[nslots - 1] = NULL; /* nullify leader's slot */ - /* Initialize the tuple slot and tuple array for each worker */ + /* Initialize the tuple slot and tuple array for each reader */ gm_state->gm_tuple_buffers = - (GMReaderTupleBuffer *) palloc0(sizeof(GMReaderTupleBuffer) * - (gm_state->nreaders + 1)); - for (i = 0; i < gm_state->nreaders; i++) + (GMReaderTupleBuffer *) palloc0(sizeof(GMReaderTupleBuffer) * nslots); + + for (i = 0; i < gm_state->nworkers_launched; i++) { /* Allocate the tuple array with MAX_TUPLE_STORE size */ gm_state->gm_tuple_buffers[i].tuple = @@ -378,7 +376,7 @@ gather_merge_init(GatherMergeState *gm_state) } /* Allocate the resources for the merge */ - gm_state->gm_heap = binaryheap_allocate(gm_state->nreaders + 1, + gm_state->gm_heap = binaryheap_allocate(nslots, heap_compare_slots, gm_state); @@ -388,10 +386,10 @@ gather_merge_init(GatherMergeState *gm_state) * leader. After this, if all active workers are unable to produce a * tuple, then re-read and this time use wait mode. For workers that were * able to produce a tuple in the earlier loop and are still active, just - * try to fill the tuple array if more tuples are avaiable. + * try to fill the tuple array if more tuples are available. */ reread: - for (i = 0; i < nreaders + 1; i++) + for (i = 0; i < nslots; i++) { if (!gm_state->gm_tuple_buffers[i].done && (TupIsNull(gm_state->gm_slots[i]) || @@ -408,7 +406,7 @@ reread: } initialize = false; - for (i = 0; i < nreaders; i++) + for (i = 0; i < nslots; i++) if (!gm_state->gm_tuple_buffers[i].done && (TupIsNull(gm_state->gm_slots[i]) || gm_state->gm_slots[i]->tts_isempty)) @@ -419,14 +417,14 @@ reread: } /* - * Clear out the tuple table slots for each gather merge input. + * Clear out the tuple table slots for each gather merge workers. */ static void gather_merge_clear_slots(GatherMergeState *gm_state) { int i; - for (i = 0; i < gm_state->nreaders; i++) + for (i = 0; i < gm_state->nworkers_launched; i++) { pfree(gm_state->gm_tuple_buffers[i].tuple); gm_state->gm_slots[i] = ExecClearTuple(gm_state->gm_slots[i]); @@ -492,13 +490,15 @@ gather_merge_getnext(GatherMergeState *gm_state) static void form_tuple_array(GatherMergeState *gm_state, int reader) { - GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[reader]; + GMReaderTupleBuffer *tuple_buffer; int i; /* Last slot is for leader and we don't build tuple array for leader */ - if (reader == gm_state->nreaders) + if (reader == gm_state->nworkers_launched) return; + tuple_buffer = &gm_state->gm_tuple_buffers[reader]; + /* * We here because we already read all the tuples from the tuple array, so * initialize the counter to zero. @@ -537,7 +537,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait) * If we're being asked to generate a tuple from the leader, then we * just call ExecProcNode as normal to produce one. */ - if (gm_state->nreaders == reader) + if (gm_state->nworkers_launched == reader) { if (gm_state->need_to_scan_locally) { diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 11a6850..e8c08c6 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1864,18 +1864,20 @@ typedef struct GatherMergeState PlanState ps; /* its first field is NodeTag */ bool initialized; struct ParallelExecutorInfo *pei; - int nreaders; - int nworkers_launched; - struct TupleQueueReader **reader; + int nworkers_launched; /* number of parallel workers launched */ + struct TupleQueueReader **reader; /* array of readers, nworkers_launched + * long */ TupleDesc tupDesc; - TupleTableSlot **gm_slots; - struct binaryheap *gm_heap; /* binary heap of slot indices */ + TupleTableSlot **gm_slots; /* array of Tuple slots, nworkers_launched + 1 + * long */ + struct binaryheap *gm_heap; /* binary heap of slot indices */ bool gm_initialized; /* gather merge initilized ? */ bool need_to_scan_locally; int gm_nkeys; SortSupport gm_sortkeys; /* array of length ms_nkeys */ - struct GMReaderTupleBuffer *gm_tuple_buffers; /* tuple buffer per - * reader */ + struct GMReaderTupleBuffer *gm_tuple_buffers; /* array of tuple buffers, + * nworkers_launched + 1 + * long */ } GatherMergeState; /* ----------------