Re: Parallel Append implementation

From: Amit Khandekar <amitdkhan(dot)pg(at)gmail(dot)com>
To: Andres Freund <andres(at)anarazel(dot)de>
Cc: Robert Haas <robertmhaas(at)gmail(dot)com>, Ashutosh Bapat <ashutosh(dot)bapat(at)enterprisedb(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Parallel Append implementation
Date: 2017-04-04 07:07:59
Message-ID: CAJ3gD9dD9-VcT9T+DMtYE6ZKRZmmKoWjCgxbTKcT-34NULy17w@mail.gmail.com
Lists: pgsql-hackers

On 4 April 2017 at 01:47, Andres Freund <andres(at)anarazel(dot)de> wrote:
>> +typedef struct ParallelAppendDescData
>> +{
>> + LWLock pa_lock; /* mutual exclusion to choose next subplan */
>> + int pa_first_plan; /* plan to choose while wrapping around plans */
>> + int pa_next_plan; /* next plan to choose by any worker */
>> +
>> + /*
>> + * pa_finished: per-subplan "done" flags. A worker which finishes a
>> + * subplan should set its pa_finished entry to true, so that no new
>> + * worker picks this subplan. For a non-partial subplan, a worker which
>> + * picks up that subplan should immediately set it to true, so as to make
>> + * sure no more than one worker is assigned to this subplan.
>> + */
>> + bool pa_finished[FLEXIBLE_ARRAY_MEMBER];
>> +} ParallelAppendDescData;
>
>
>> +typedef ParallelAppendDescData *ParallelAppendDesc;
>
> Pointer hiding typedefs make this Andres sad.

Yeah, I was trying to be consistent with other parts of the code where
we have typedefs for both the structure and a pointer to that structure.

>
>
>
>> @@ -291,6 +362,276 @@ ExecReScanAppend(AppendState *node)
>> if (subnode->chgParam == NULL)
>> ExecReScan(subnode);
>> }
>> +
>> + if (padesc)
>> + {
>> + padesc->pa_first_plan = padesc->pa_next_plan = 0;
>> + memset(padesc->pa_finished, 0, sizeof(bool) * node->as_nplans);
>> + }
>> +
>
> Is it actually guaranteed that none of the parallel workers are doing
> something at that point?

ExecReScanAppend() would be called by ExecReScanGather().
ExecReScanGather() shuts down all the parallel workers before
rescanning its child node (i.e. before calling ExecReScanAppend()).
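
For reference, the ordering in nodeGather.c is roughly the following
(a paraphrased sketch, not the exact source):

void
ExecReScanGather(GatherState *node)
{
    /* Gracefully shut down all the workers before anything else */
    ExecShutdownGatherWorkers(node);

    /* Shared state will be rebuilt on the next execution */
    node->initialized = false;

    /*
     * Only after the workers are gone is the child rescanned, so nobody
     * can still be touching the shared ParallelAppendDescData here.
     */
    ExecReScan(outerPlanState(node));
}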

>> +static bool
>> +exec_append_parallel_next(AppendState *state)
>> +{
>> + ParallelAppendDesc padesc = state->as_padesc;
>> + int whichplan;
>> + int initial_plan;
>> + int first_partial_plan = ((Append *)state->ps.plan)->first_partial_plan;
>> + bool found;
>> +
>> + Assert(padesc != NULL);
>> +
>> + /* Backward scan is not supported by parallel-aware plans */
>> + Assert(ScanDirectionIsForward(state->ps.state->es_direction));
>> +
>> + /* The parallel leader chooses its next subplan differently */
>> + if (!IsParallelWorker())
>> + return exec_append_leader_next(state);
>
> It's a bit weird that the leader's case is so separate, and does
> its own lock acquisition.

Since we wanted to prevent the leader from taking the most expensive
non-partial plans first, I thought it would be better to keep its logic
simple and separate, rather than merging it into the main logic.
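
To illustrate the intent, here is a simplified, hypothetical sketch of
the leader's policy (the patch's actual exec_append_leader_next() has a
bit more to it, e.g. marking a non-partial subplan as taken):

static bool
exec_append_leader_next(AppendState *state)
{
    ParallelAppendDesc padesc = state->as_padesc;
    int         whichplan;

    LWLockAcquire(&padesc->pa_lock, LW_EXCLUSIVE);

    /*
     * Walk the subplans from the end of the array, where the partial /
     * cheaper subplans are kept, so the leader never grabs one of the
     * expensive non-partial subplans ahead of the workers.
     */
    for (whichplan = state->as_nplans - 1; whichplan >= 0; whichplan--)
    {
        if (!padesc->pa_finished[whichplan])
            break;
    }

    LWLockRelease(&padesc->pa_lock);

    if (whichplan < 0)
        return false;           /* nothing left to do */

    state->as_whichplan = whichplan;
    return true;
}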

>
>
>> + found = false;
>> + for (whichplan = initial_plan; whichplan != PA_INVALID_PLAN;)
>> + {
>> + /*
>> + * Ignore plans that are already done processing. These also include
>> + * non-partial subplans which have already been taken by a worker.
>> + */
>> + if (!padesc->pa_finished[whichplan])
>> + {
>> + found = true;
>> + break;
>> + }
>> +
>> + /*
>> + * Note: There is a chance that just after the child plan node is
>> + * chosen above, some other worker finishes this node and sets
>> + * pa_finished to true. In that case, this worker will go ahead and
>> + * call ExecProcNode(child_node), which will return NULL tuple since it
>> + * is already finished, and then once again this worker will try to
>> + * choose next subplan; but this is ok : it's just an extra
>> + * "choose_next_subplan" operation.
>> + */
>
> IIRC not all node types are safe against being executed again when
> they've previously returned NULL. That's why e.g. nodeMaterial.c
> contains the following blurb:
> /*
> * If necessary, try to fetch another row from the subplan.
> *
> * Note: the eof_underlying state variable exists to short-circuit further
> * subplan calls. It's not optional, unfortunately, because some plan
> * node types are not robust about being called again when they've already
> * returned NULL.
> */

This scenario is different from the parallel-append scenario described
in my comment. A worker sets pa_finished to true only when it itself
gets a NULL tuple for a given subplan. So in
exec_append_parallel_next(), suppose a worker W1 finds a subplan with
pa_finished=false and chooses it. Now a different worker W2 sets this
subplan's pa_finished=true because W2 has got a NULL tuple, but W1
hasn't yet got a NULL tuple from it. If W1 had got a NULL tuple
earlier, it would have set pa_finished to true itself, and then it
would never have chosen this subplan again. So effectively, a worker
never executes a subplan again once that subplan has returned NULL to
it.
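
In other words, the per-worker flow is conceptually like this (a
condensed, hypothetical sketch of what happens inside ExecAppend, not
the patch's exact code; the locking around pa_finished is elided):

static TupleTableSlot *
exec_append_worker_loop(AppendState *state)   /* hypothetical helper */
{
    for (;;)
    {
        TupleTableSlot *slot;

        /* Pick an unfinished subplan; false means nothing is left */
        if (!exec_append_parallel_next(state))
            return ExecClearTuple(state->ps.ps_ResultTupleSlot);

        slot = ExecProcNode(state->appendplans[state->as_whichplan]);
        if (!TupIsNull(slot))
            return slot;

        /*
         * Only the worker that actually got the NULL marks the subplan
         * as finished, and it then immediately goes on to pick another
         * subplan; it never returns to a subplan it has itself
         * exhausted.
         */
        state->as_padesc->pa_finished[state->as_whichplan] = true;
    }
}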

>
>
>> + else if (IsA(subpath, MergeAppendPath))
>> + {
>> + MergeAppendPath *mpath = (MergeAppendPath *) subpath;
>> +
>> + /*
>> + * If at all MergeAppend is partial, all its child plans have to be
>> + * partial : we don't currently support a mix of partial and
>> + * non-partial MergeAppend subpaths.
>> + */
>
> Why is that?

The mix of partial and non-partial subplans is being implemented only
for the Append plan. In the future, if and when we extend this support
to MergeAppend, we would need to change this. Till then, we can assume
that if a MergeAppend is partial, all its child plans have to be
partial; otherwise there wouldn't have been a partial MergeAppendPath
in the first place.

BTW, MergeAppendPath itself is currently never partial, which is why
the comment says "If at all MergeAppend is partial".
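
If we ever did build a partial MergeAppendPath, the path-generation
code would have to enforce that assumption, roughly along these lines
(a hypothetical sketch; live_childrels stands for the append rel's
child relations):

ListCell   *lc;
bool        all_children_partial = true;

foreach(lc, live_childrels)
{
    RelOptInfo *childrel = lfirst(lc);

    /* A child with no partial path would force a non-partial subpath... */
    if (childrel->partial_pathlist == NIL)
    {
        all_children_partial = false;
        break;
    }
}

/*
 * ... so a partial MergeAppendPath would be built only when
 * all_children_partial is true, which is what the comment relies on.
 */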

>
>
>
>> +int
>> +get_append_num_workers(List *partial_subpaths, List *nonpartial_subpaths)
>> +{
>> + ListCell *lc;
>> + double log2w;
>> + int num_workers;
>> + int max_per_plan_workers;
>> +
>> + /*
>> + * log2(number_of_subpaths)+1 formula seems to give an appropriate number of
>> + * workers for Append path either having high number of children (> 100) or
>> + * having all non-partial subpaths or subpaths with 1-2 parallel_workers.
>> + * Whereas, if the subpaths->parallel_workers is high, this formula is not
>> + * suitable, because it does not take into account per-subpath workers.
>> + * For e.g., with workers (2, 8, 8),
>
> That's the per-subplan workers for three subplans? That's not
> necessarily clear.

Right. Corrected it to: "3 subplans having per-subplan workers such
as (2, 8, 8)".

>
>
>> the Append workers should be at least
>> + * 8, whereas the formula gives 2. In this case, it seems better to follow
>> + * the method used for calculating parallel_workers of an unpartitioned
>> + * table : log3(table_size). So we treat the UNION query as if the data
>
> Which "UNION query"?

Changed it to "partitioned table". The idea is: treat all the data of
a partitioned table as if it belonged to a single non-partitioned
table, and then calculate the workers for such a table. That doesn't
exactly apply to a UNION query, because a UNION can involve different
tables, and joins too. So I replaced "UNION query" with "partitioned
table".
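
To make the "just a bit greater than the maximum" claim concrete with
the (2, 8, 8) example above (a quick sanity check, taking b = 2):

    \log_2(2^2 + 2^8 + 2^8) = \log_2(516) \approx 9.01
                            \approx \max(2, 8, 8) + 1

which is why the code simply uses the maximum of the per-subplan
workers plus one.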

>
>
>> + * belongs to a single unpartitioned table, and then derive its workers. So
>> + * it will be : logb(b^w1 + b^w2 + b^w3) where w1, w2.. are per-subplan
>> + * workers and b is some logarithmic base such as 2 or 3. It turns out that
>> + * this evaluates to a value just a bit greater than max(w1,w2, w3). So, we
>> + * just use the maximum of workers formula. But this formula gives too few
>> + * workers when all paths have single worker (meaning they are non-partial)
>> + * For e.g. with workers : (1, 1, 1, 1, 1, 1), it is better to allocate 3
>> + * workers, whereas this method allocates only 1.
>> + * So we use whichever method that gives higher number of workers.
>> + */
>> +
>> + /* Get log2(num_subpaths) */
>> + log2w = fls(list_length(partial_subpaths) +
>> + list_length(nonpartial_subpaths));
>> +
>> + /* Avoid further calculations if we already crossed max workers limit */
>> + if (max_parallel_workers_per_gather <= log2w + 1)
>> + return max_parallel_workers_per_gather;
>> +
>> +
>> + /*
>> + * Get the parallel_workers value of the partial subpath having the highest
>> + * parallel_workers.
>> + */
>> + max_per_plan_workers = 1;
>> + foreach(lc, partial_subpaths)
>> + {
>> + Path *subpath = lfirst(lc);
>> + max_per_plan_workers = Max(max_per_plan_workers,
>> + subpath->parallel_workers);
>> + }
>> +
>> + /* Choose the higher of the results of the two formulae */
>> + num_workers = rint(Max(log2w, max_per_plan_workers) + 1);
>> +
>> + /* In no case use more than max_parallel_workers_per_gather workers. */
>> + num_workers = Min(num_workers, max_parallel_workers_per_gather);
>> +
>> + return num_workers;
>> +}
>
> Hm. I'm not really convinced by the logic here. Wouldn't it be better
> to try to compute the minimum total cost across all workers for
> 1..#max_workers for the plans in an iterative manner? I.e. try to map
> each of the subplans to 1 (if non-partial) or N workers (partial) using
> some fitting algorith (e.g. always choosing the worker(s) that currently
> have the least work assigned). I think the current algorithm doesn't
> lead to useful #workers for e.g. cases with a lot of non-partial,
> high-startup plans - imo a quite reasonable scenario.

Have responded in a separate reply.

>
>
> I'm afraid this is too late for v10 - do you agree?

I am not exactly sure; maybe it depends upon how many more review
comments follow this week. I anticipate there would not be any
high-level/design-level changes now.

Attached is an updated patch v13 that has some comments changed as per
your review, and is also rebased on the latest master.

Attachment: ParallelAppend_v13.patch (application/octet-stream, 57.3 KB)
