
From: Andres Freund
Subject: Re: Parallel Append implementation
Msg-id: 20170403201755.gdytjulvytzjf2sr@alap3.anarazel.de
In response to: Re: Parallel Append implementation (Amit Khandekar <amitdkhan.pg@gmail.com>)
Responses: Re: Parallel Append implementation (Robert Haas <robertmhaas@gmail.com>)
List: pgsql-hackers
Hi,


On 2017-03-24 21:32:57 +0530, Amit Khandekar wrote:
> diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
> index a107545..e9e8676 100644
> --- a/src/backend/executor/nodeAppend.c
> +++ b/src/backend/executor/nodeAppend.c
> @@ -59,9 +59,47 @@
>  
>  #include "executor/execdebug.h"
>  #include "executor/nodeAppend.h"
> +#include "miscadmin.h"
> +#include "optimizer/cost.h"
> +#include "storage/spin.h"
> +
> +/*
> + * Shared state for Parallel Append.
> + *
> + * Each backend participating in a Parallel Append has its own
> + * descriptor in backend-private memory, and those objects all contain
> + * a pointer to this structure.
> + */
> +typedef struct ParallelAppendDescData
> +{
> +    LWLock        pa_lock;        /* mutual exclusion to choose next subplan */
> +    int            pa_first_plan;    /* plan to choose while wrapping around plans */
> +    int            pa_next_plan;    /* next plan to choose by any worker */
> +
> +    /*
> +     * pa_finished : workers currently executing the subplan. A worker which
> +     * finishes a subplan should set pa_finished to true, so that no new
> +     * worker picks this subplan. For non-partial subplan, a worker which picks
> +     * up that subplan should immediately set to true, so as to make sure
> +     * there are no more than 1 worker assigned to this subplan.
> +     */
> +    bool        pa_finished[FLEXIBLE_ARRAY_MEMBER];
> +} ParallelAppendDescData;


> +typedef ParallelAppendDescData *ParallelAppendDesc;

Pointer hiding typedefs make this Andres sad.
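
As an aside, here's a rough sketch of the claiming protocol the comment
above describes, written with the struct pointer spelled out rather than
hidden behind the ParallelAppendDesc typedef.  The helper name is invented
and the pa_first_plan wraparound bookkeeping is left out -- this is not the
patch's code, just an illustration:

static int
pa_claim_next_subplan(ParallelAppendDescData *padesc, int nplans,
                      int first_partial_plan)
{
    int         result = -1;    /* -1: no unfinished subplan left */
    int         i;

    LWLockAcquire(&padesc->pa_lock, LW_EXCLUSIVE);
    for (i = 0; i < nplans; i++)
    {
        int         plan = (padesc->pa_next_plan + i) % nplans;

        if (padesc->pa_finished[plan])
            continue;           /* done, or already owned by another worker */

        if (plan < first_partial_plan)
            padesc->pa_finished[plan] = true;   /* non-partial: at most one worker */

        padesc->pa_next_plan = (plan + 1) % nplans;
        result = plan;
        break;
    }
    LWLockRelease(&padesc->pa_lock);

    return result;
}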



> @@ -291,6 +362,276 @@ ExecReScanAppend(AppendState *node)
>          if (subnode->chgParam == NULL)
>              ExecReScan(subnode);
>      }
> +
> +    if (padesc)
> +    {
> +        padesc->pa_first_plan = padesc->pa_next_plan = 0;
> +        memset(padesc->pa_finished, 0, sizeof(bool) * node->as_nplans);
> +    }
> +

Is it actually guaranteed that none of the parallel workers are doing
something at that point?
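
FWIW, a minimal defensive sketch (purely hypothetical, and it doesn't by
itself answer the question) would be to at least do the reset under
pa_lock, so it cannot interleave with a worker choosing its next subplan:

    if (padesc)
    {
        /*
         * Hypothetical hardening only: pa_lock serializes the reset against
         * a concurrent exec_append_parallel_next(), but it does not prove
         * that no worker is still pulling tuples from a previously chosen
         * subplan -- that still has to be guaranteed by whatever drives the
         * rescan (e.g. the Gather node above).
         */
        LWLockAcquire(&padesc->pa_lock, LW_EXCLUSIVE);
        padesc->pa_first_plan = padesc->pa_next_plan = 0;
        memset(padesc->pa_finished, 0, sizeof(bool) * node->as_nplans);
        LWLockRelease(&padesc->pa_lock);
    }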


> +/* ----------------------------------------------------------------
> + *        exec_append_parallel_next
> + *
> + *        Determine the next subplan that should be executed. Each worker uses a
> + *        shared next_subplan counter index to start looking for unfinished plan,
> + *        executes the subplan, then shifts ahead this counter to the next
> + *        subplan, so that other workers know which next plan to choose. This
> + *        way, workers choose the subplans in round robin order, and thus they
> + *        get evenly distributed among the subplans.
> + *
> + *        Returns false if and only if all subplans are already finished
> + *        processing.
> + * ----------------------------------------------------------------
> + */
> +static bool
> +exec_append_parallel_next(AppendState *state)
> +{
> +    ParallelAppendDesc padesc = state->as_padesc;
> +    int        whichplan;
> +    int        initial_plan;
> +    int        first_partial_plan = ((Append *)state->ps.plan)->first_partial_plan;
> +    bool    found;
> +
> +    Assert(padesc != NULL);
> +
> +    /* Backward scan is not supported by parallel-aware plans */
> +    Assert(ScanDirectionIsForward(state->ps.state->es_direction));
> +
> +    /* The parallel leader chooses its next subplan differently */
> +    if (!IsParallelWorker())
> +        return exec_append_leader_next(state);

It's a bit weird that the leader's case is so separate, and does its
own lock acquisition.
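
For comparison, a unified shape (a sketch only; exec_append_next_subplan
and pa_claim_next_subplan_locked are invented names, nothing from the
patch) could route the leader through the same locked selection and differ
only in where the search starts:

static bool
exec_append_next_subplan(AppendState *state)
{
    ParallelAppendDescData *padesc = state->as_padesc;
    int         start;

    LWLockAcquire(&padesc->pa_lock, LW_EXCLUSIVE);

    /*
     * Workers continue the shared round-robin; the leader could instead
     * start from the front (or prefer the non-partial subplans) so it
     * spends less time stuck inside a single long-running child.
     */
    start = IsParallelWorker() ? padesc->pa_next_plan : 0;
    state->as_whichplan = pa_claim_next_subplan_locked(padesc, state, start);

    LWLockRelease(&padesc->pa_lock);

    return state->as_whichplan != PA_INVALID_PLAN;
}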


> +    found = false;
> +    for (whichplan = initial_plan; whichplan != PA_INVALID_PLAN;)
> +    {
> +        /*
> +         * Ignore plans that are already done processing. These also include
> +         * non-partial subplans which have already been taken by a worker.
> +         */
> +        if (!padesc->pa_finished[whichplan])
> +        {
> +            found = true;
> +            break;
> +        }
> +
> +        /*
> +         * Note: There is a chance that just after the child plan node is
> +         * chosen above, some other worker finishes this node and sets
> +         * pa_finished to true. In that case, this worker will go ahead and
> +         * call ExecProcNode(child_node), which will return NULL tuple since it
> +         * is already finished, and then once again this worker will try to
> +         * choose next subplan; but this is ok : it's just an extra
> +         * "choose_next_subplan" operation.
> +         */

IIRC not all node types are safe against being executed again when
they've previously returned NULL.  That's why e.g. nodeMaterial.c
contains the following blurb:

    /*
     * If necessary, try to fetch another row from the subplan.
     *
     * Note: the eof_underlying state variable exists to short-circuit
     * further subplan calls.  It's not optional, unfortunately, because
     * some plan node types are not robust about being called again when
     * they've already returned NULL.
     */
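
One way around that (again just a sketch; as_subplan_done is a made-up
field, not something the patch has) is to track locally, per Append node,
which subplans have already returned NULL and never call ExecProcNode on
them again, much like nodeMaterial's eof_underlying:

    for (;;)
    {
        PlanState  *subnode = node->appendplans[node->as_whichplan];
        TupleTableSlot *slot;

        /* hypothetical flag array: never re-execute an exhausted subplan */
        if (node->as_subplan_done[node->as_whichplan])
            slot = NULL;
        else
            slot = ExecProcNode(subnode);

        if (!TupIsNull(slot))
            return slot;

        node->as_subplan_done[node->as_whichplan] = true;

        /* pick another subplan; give up when none are left */
        if (!exec_append_parallel_next(node))
            return ExecClearTuple(node->ps.ps_ResultTupleSlot);
    }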
 


> +    else if (IsA(subpath, MergeAppendPath))
> +    {
> +        MergeAppendPath *mpath = (MergeAppendPath *) subpath;
> +
> +        /*
> +         * If at all MergeAppend is partial, all its child plans have to be
> +         * partial : we don't currently support a mix of partial and
> +         * non-partial MergeAppend subpaths.
> +         */

Why is that?



> +int
> +get_append_num_workers(List *partial_subpaths, List *nonpartial_subpaths)
> +{
> +    ListCell   *lc;
> +    double        log2w;
> +    int            num_workers;
> +    int            max_per_plan_workers;
> +
> +    /*
> +     * log2(number_of_subpaths)+1 formula seems to give an appropriate number of
> +     * workers for Append path either having high number of children (> 100) or
> +     * having all non-partial subpaths or subpaths with 1-2 parallel_workers.
> +     * Whereas, if the subpaths->parallel_workers is high, this formula is not
> +     * suitable, because it does not take into account per-subpath workers.
> +     * For e.g., with workers (2, 8, 8),

That's the per-subplan workers for three subplans?  That's not
necessarily clear.


> the Append workers should be at least
> +     * 8, whereas the formula gives 2. In this case, it seems better to follow
> +     * the method used for calculating parallel_workers of an unpartitioned
> +     * table : log3(table_size). So we treat the UNION query as if the data

Which "UNION query"?


> +     * belongs to a single unpartitioned table, and then derive its workers. So
> +     * it will be : logb(b^w1 + b^w2 + b^w3) where w1, w2.. are per-subplan
> +     * workers and b is some logarithmic base such as 2 or 3. It turns out that
> +     * this evaluates to a value just a bit greater than max(w1,w2, w3). So, we
> +     * just use the maximum of workers formula. But this formula gives too few
> +     * workers when all paths have single worker (meaning they are non-partial)
> +     * For e.g. with workers : (1, 1, 1, 1, 1, 1), it is better to allocate 3
> +     * workers, whereas this method allocates only 1.
> +     * So we use whichever method that gives higher number of workers.
> +     */
> +
> +    /* Get log2(num_subpaths) */
> +    log2w = fls(list_length(partial_subpaths) +
> +                list_length(nonpartial_subpaths));
> +
> +    /* Avoid further calculations if we already crossed max workers limit */
> +    if (max_parallel_workers_per_gather <= log2w + 1)
> +        return max_parallel_workers_per_gather;
> +
> +
> +    /*
> +     * Get the parallel_workers value of the partial subpath having the highest
> +     * parallel_workers.
> +     */
> +    max_per_plan_workers = 1;
> +    foreach(lc, partial_subpaths)
> +    {
> +        Path       *subpath = lfirst(lc);
> +        max_per_plan_workers = Max(max_per_plan_workers,
> +                                   subpath->parallel_workers);
> +    }
> +
> +    /* Choose the higher of the results of the two formulae */
> +    num_workers = rint(Max(log2w, max_per_plan_workers) + 1);
> +
> +    /* In no case use more than max_parallel_workers_per_gather workers. */
> +    num_workers = Min(num_workers, max_parallel_workers_per_gather);
> +
> +    return num_workers;
> +}

Hm.  I'm not really convinced by the logic here.  Wouldn't it be better
to try to compute the minimum total cost across all workers for
1..#max_workers for the plans in an iterative manner?  I.e. try to map
each of the subplans to 1 (if non-partial) or N workers (partial) using
some fitting algorithm (e.g. always choosing the worker(s) that currently
have the least work assigned).  I think the current algorithm doesn't
lead to useful #workers for e.g. cases with a lot of non-partial,
high-startup plans - imo a quite reasonable scenario.
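
To make that concrete, here's a toy, standalone model of such a fitting
pass (nothing from the patch; the names and the perfect-scaling assumption
for partial subplans are mine): non-partial subplans go to the currently
least-loaded worker, partial ones are spread across everyone, and one then
picks the smallest worker count whose estimated finish time stops
improving:

/*
 * Toy model: estimate how long an Append with the given subplan costs
 * would take with a given number of workers, using a greedy least-loaded
 * assignment for non-partial subplans.
 */
#include <stdbool.h>
#include <stdio.h>

#define MAX_WORKERS 8           /* stand-in for max_parallel_workers_per_gather */

static double
append_finish_time(const double *costs, const bool *is_partial,
                   int nplans, int nworkers)
{
    double      load[MAX_WORKERS] = {0};
    double      finish = 0;
    int         i, w;

    for (i = 0; i < nplans; i++)
    {
        if (is_partial[i])
        {
            /* partial subplan: assume it scales perfectly across workers */
            for (w = 0; w < nworkers; w++)
                load[w] += costs[i] / nworkers;
        }
        else
        {
            /* non-partial subplan: give it to the least-loaded worker */
            int         best = 0;

            for (w = 1; w < nworkers; w++)
                if (load[w] < load[best])
                    best = w;
            load[best] += costs[i];
        }
    }

    for (w = 0; w < nworkers; w++)
        if (load[w] > finish)
            finish = load[w];

    return finish;
}

int
main(void)
{
    /* e.g. six non-partial subplans of similar cost, high startup */
    double      costs[] = {10, 10, 10, 10, 10, 10};
    bool        is_partial[] = {false, false, false, false, false, false};
    int         nworkers;

    for (nworkers = 1; nworkers <= MAX_WORKERS; nworkers++)
        printf("workers=%d  finish=%.1f\n", nworkers,
               append_finish_time(costs, is_partial, 6, nworkers));
    return 0;
}

For that example the estimated finish time improves at 2, 3 and 6 workers
but not at 4 or 5 -- exactly the kind of information a log2-or-max formula
cannot express.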


I'm afraid this is too late for v10 - do you agree?

- Andres


