Re: Parallel Append implementation

From: Ashutosh Bapat <ashutosh(dot)bapat(at)enterprisedb(dot)com>
To: Amit Khandekar <amitdkhan(dot)pg(at)gmail(dot)com>
Cc: pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Parallel Append implementation
Date: 2017-01-06 11:34:09
Message-ID: CAFjFpRe_ROid1Bnv=kSi_6Ahr90aB1ngTA8xBzr3f290FgO5xw@mail.gmail.com
Lists: pgsql-hackers

On Fri, Dec 23, 2016 at 10:51 AM, Amit Khandekar <amitdkhan(dot)pg(at)gmail(dot)com> wrote:
> Currently an Append plan node does not execute its subplans in
> parallel. There is no distribution of workers across its subplans. The
> second subplan starts running only after the first subplan finishes,
> although the individual subplans may be running parallel scans.
>
> Secondly, we create a partial Append path for an appendrel, but we do
> that only if all of its member subpaths are partial paths. If one or
> more of the subplans is a non-parallel path, there will be only a
> non-parallel Append. So whatever node is sitting on top of Append is
> not going to do a parallel plan; for example, a select count(*) won't
> divide it into partial aggregates if the underlying Append is not
> partial.
>
> The attached patch removes both of the above restrictions. There has
> already been a mail thread [1] that discusses an approach suggested by
> Robert Haas for implementing this feature. This patch uses this same
> approach.

The first goal requires some kind of synchronization to allow workers to be
distributed across the subplans. The second goal requires some kind of
synchronization to prevent multiple workers from executing the same
non-parallel subplan. The patch uses different mechanisms to achieve the two
goals. If we create two different patches, one for each goal, they may be
easier to handle.

We may want to think about a third goal: preventing too many workers
from executing the same plan. As per the comment in get_parallel_divisor(),
we do not see any benefit if more than 4 workers execute the same
node. So an Append node that has been allotted more than 4 workers can
distribute the extra workers equally among the available subplans. It might
be better to do that as a separate patch.
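
For example, something like the following could cap the per-subplan workers
at 4 while spreading the rest around (purely illustrative, not from the
patch; the function and constant names are made up):

#define MAX_USEFUL_WORKERS_PER_SUBPLAN 4

static void
distribute_append_workers(int nworkers, int nsubplans, int *workers_per_subplan)
{
    int     remaining = nworkers;
    int     i;

    for (i = 0; i < nsubplans; i++)
        workers_per_subplan[i] = 0;

    /* Hand out one worker at a time, round-robin, capping each subplan. */
    while (remaining > 0)
    {
        bool    assigned = false;

        for (i = 0; i < nsubplans && remaining > 0; i++)
        {
            if (workers_per_subplan[i] < MAX_USEFUL_WORKERS_PER_SUBPLAN)
            {
                workers_per_subplan[i]++;
                remaining--;
                assigned = true;
            }
        }

        /* Every subplan is already at the cap; extra workers add nothing. */
        if (!assigned)
            break;
    }
}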

>
> Attached is pgbench_create_partition.sql (derived from the one
> included in the above thread) that distributes the pgbench_accounts
> table data into 3 partitions pgbench_accounts_[1-3]. The queries below
> use this schema.
>
> Consider a query such as :
> select count(*) from pgbench_accounts;
>
> Now suppose these two partitions do not allow a parallel scan :
> alter table pgbench_accounts_1 set (parallel_workers=0);
> alter table pgbench_accounts_2 set (parallel_workers=0);
>
> On HEAD, due to some of the partitions having non-parallel scans, the
> whole Append is executed without any parallelism :
>
> Aggregate
> -> Append
> -> Index Only Scan using pgbench_accounts_pkey on pgbench_accounts
> -> Seq Scan on pgbench_accounts_1
> -> Seq Scan on pgbench_accounts_2
> -> Seq Scan on pgbench_accounts_3
>
> Whereas, with the patch, the Append looks like this :
>
> Finalize Aggregate
> -> Gather
> Workers Planned: 6
> -> Partial Aggregate
> -> Parallel Append
> -> Parallel Seq Scan on pgbench_accounts
> -> Seq Scan on pgbench_accounts_1
> -> Seq Scan on pgbench_accounts_2
> -> Parallel Seq Scan on pgbench_accounts_3
>
> Above, Parallel Append is generated, and it executes all these
> subplans in parallel, with 1 worker executing each of the sequential
> scans, and multiple workers executing each of the parallel subplans.
>
>
> ======= Implementation details ========
>
> ------- Adding parallel-awareness -------
>
> In a given worker, this Append plan node will be executing just like
> the usual partial Append node. It will run a subplan until completion.
> The subplan may or may not be a partial parallel-aware plan like
> parallelScan. After the subplan is done, Append will choose the next
> subplan. This is where it differs from the current partial Append
> plan: it is parallel-aware. The Append nodes in the workers will be
> aware that there are other Append nodes running in parallel, and the
> partial Append will have to coordinate with them while choosing the
> next subplan.
>
> ------- Distribution of workers --------
>
> The coordination info is stored in a shared array, each element of
> which describes the per-subplan info. This info contains the number of
> workers currently executing the subplan, and the maximum number of
> workers that should be executing it at the same time. For non-partial
> subplans, max workers would always be 1. To choose the next subplan,
> the Append executor sequentially iterates over the array to find a
> subplan that currently has the fewest workers executing it AND is not
> already being executed by its maximum assigned number of workers. Once
> it finds one, it increments that subplan's current_workers and releases
> the spinlock, so that any waiting workers can choose their own next
> subplan.
>
> This way, workers would be fairly distributed across subplans.
>
> The shared array needs to be initialized and made available to
> workers. For this, we can do exactly what sequential scan does for
> being parallel-aware : Using function ExecAppendInitializeDSM()
> similar to ExecSeqScanInitializeDSM() in the backend to allocate the
> array. Similarly, for workers, have ExecAppendInitializeWorker() to
> retrieve the shared array.
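
To make sure I have understood the scheme, here is a minimal sketch of
the coordination described above. The struct names follow the hunks
quoted later in this mail (using the ParallelAppendInfo naming I suggest
in comment 1 below), but pa_max_workers, choose_next_subplan() and the
exact locking details are only my reading, not necessarily what the
patch does:

typedef struct ParallelAppendInfo
{
    int         pa_num_workers;     /* workers currently executing this subplan */
    int         pa_max_workers;     /* at most this many; 1 for non-partial */
} ParallelAppendInfo;

typedef struct ParallelAppendDescData
{
    slock_t             pa_mutex;   /* protects pa_info[] */
    ParallelAppendInfo  pa_info[FLEXIBLE_ARRAY_MEMBER];
} ParallelAppendDescData;

/*
 * Pick the subplan that has the fewest workers and still has room,
 * claim a slot in it, and return its index (or PA_INVALID_PLAN if
 * nothing is left to execute).
 */
static int
choose_next_subplan(ParallelAppendDescData *padesc, int nplans)
{
    int     min_whichplan = PA_INVALID_PLAN;
    int     i;

    SpinLockAcquire(&padesc->pa_mutex);
    for (i = 0; i < nplans; i++)
    {
        ParallelAppendInfo *pinfo = &padesc->pa_info[i];

        if (pinfo->pa_num_workers >= pinfo->pa_max_workers)
            continue;       /* already has as many workers as allowed */
        if (min_whichplan == PA_INVALID_PLAN ||
            pinfo->pa_num_workers <
            padesc->pa_info[min_whichplan].pa_num_workers)
            min_whichplan = i;
    }
    if (min_whichplan != PA_INVALID_PLAN)
        padesc->pa_info[min_whichplan].pa_num_workers++;
    SpinLockRelease(&padesc->pa_mutex);

    return min_whichplan;
}
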
>
>
> -------- Generating Partial Append plan having non-partial subplans --------
>
> In set_append_rel_pathlist(), while generating a partial path for
> Append, also include the non-partial child subpaths, besides the
> partial subpaths. This way, it can contain a mix of partial and
> non-partial children paths. But for a given child, its path would be
> either the cheapest partial path or the cheapest non-partial path.
>
> For a non-partial child path, it will only be included if it is
> parallel-safe. If there is no parallel-safe path, a partial Append
> path would not be generated. This behaviour also automatically
> prevents paths that have a Gather node beneath.
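
If I am reading this right, the per-child selection amounts to something
like the following sketch (live_childrels, subpaths etc. are the usual
set_append_rel_pathlist() variables; this is only my paraphrase of the
text above, not the patch's code):

    foreach(l, live_childrels)
    {
        RelOptInfo *childrel = (RelOptInfo *) lfirst(l);
        Path       *subpath;

        if (childrel->partial_pathlist != NIL)
            subpath = (Path *) linitial(childrel->partial_pathlist);
        else if (childrel->cheapest_total_path->parallel_safe)
            subpath = childrel->cheapest_total_path;
        else
        {
            /* No parallel-safe path for this child: don't build a
             * partial Append path at all. */
            subpaths = NIL;
            break;
        }

        subpaths = lappend(subpaths, subpath);
    }
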
>
> Finally when it comes to create a partial append path using these
> child paths, we also need to store a bitmap set indicating which of
> the child paths are non-partial paths. For this, have a new BitmapSet
> field : Append->partial_subplans. At execution time, this will be used
> to set the maximum workers for a non-partial subpath to 1.
>

We may be able to eliminate this field. Please check comment 7 below.

>
> -------- Costing -------
>
> For calculating per-worker parallel Append path cost, it first
> calculates a total of child subplan costs considering all of their
> workers, and then divides it by the Append node's parallel_divisor,
> similar to how parallel scan uses this "parallel_divisor".
>
> For startup cost, it is assumed that Append would start returning
> tuples when the child node having the lowest startup cost is done
> setting up. So Append startup cost is equal to startup cost of the
> child with minimum startup cost.
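
In other words, something like the sketch below, if I am reading it
right (this is not the patch's cost_append(); the parallel_divisor is
assumed to be computed elsewhere, and the leader-contribution detail is
glossed over):

static void
cost_parallel_append_sketch(Path *path, List *subpaths,
                            double parallel_divisor)
{
    Cost        startup_cost = 0;
    Cost        total_cost = 0;
    bool        first = true;
    ListCell   *lc;

    foreach(lc, subpaths)
    {
        Path   *subpath = (Path *) lfirst(lc);

        /* Append can start returning tuples once the cheapest-to-start
         * child is set up. */
        if (first || subpath->startup_cost < startup_cost)
            startup_cost = subpath->startup_cost;
        first = false;

        /* Scale each child's per-worker cost back up by its own worker
         * count, so we sum the "whole" cost of every child ... */
        total_cost += subpath->total_cost *
            (subpath->parallel_workers > 0 ? subpath->parallel_workers : 1);
    }

    path->startup_cost = startup_cost;
    /* ... and then divide by the Append node's own parallel_divisor. */
    path->total_cost = total_cost / parallel_divisor;
}
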
>
>
> -------- Scope --------
>
> There are two different code paths where an Append path is generated.
> 1. One is where an append rel is generated : inheritance tables and the
> UNION ALL clause.
> 2. The second code path is in prepunion.c. This gets executed for UNION
> (without ALL) and INTERSECT/EXCEPT [ALL]. The patch does not support
> Parallel Append in this scenario. It can be taken up later as an
> extension, once this patch is reviewed.
>
>

Here are some review comments

1. struct ParallelAppendDescData is used in other places as well, and this
declaration style doesn't seem to be very common in the code base or in the
directory where the file is located.
+struct ParallelAppendDescData
+{
+    slock_t     pa_mutex;       /* mutual exclusion to choose next subplan */
+    parallel_append_info pa_info[FLEXIBLE_ARRAY_MEMBER];
+};
Defining it like

typedef struct ParallelAppendDescData
{
    slock_t     pa_mutex;       /* mutual exclusion to choose next subplan */
    parallel_append_info pa_info[FLEXIBLE_ARRAY_MEMBER];
} ParallelAppendDescData;

will make its use handier: instead of writing struct ParallelAppendDescData,
you will need just ParallelAppendDescData. Maybe we also want to rename
parallel_append_info to ParallelAppendInfo and change its style to match the
other declarations.

2. The comment below refers to the constant which it describes, which looks
odd. Maybe it should be worded as "A special value of
AppendState::as_whichplan, to indicate no plans left to be executed.". Also,
using INVALID for "no plans left ..." seems to be a misnomer.
/*
* For Parallel Append, AppendState::as_whichplan can have PA_INVALID_PLAN
* value, which indicates there are no plans left to be executed.
*/
#define PA_INVALID_PLAN -1

3. The sentence "We have got NULL" looks odd. We probably don't need it, as
it's clear from the code above that this deals with the case where the
current subplan didn't return any row.
/*
* We have got NULL. There might be other workers still processing the
* last chunk of rows for this same node, but there's no point for new
* workers to run this node, so mark this node as finished.
*/
4. In the same comment, I guess the word "node" refers to the subnode and not
the node pointed to by the variable "node". Maybe you want to use the word
"subplan" here.

5. set_finished()'s prologue has different indentation compared to other
functions in the file.

6. Per the project style, a multiline comment should start with a line
containing just "/*":
+ /* Keep track of the node with the least workers so far. For the very

7. By looking at the parallel_workers field of a path, we can tell whether it
is partial or not, so we probably do not need to maintain a bitmap for that in
the Append path. The bitmap can be constructed, if required, at the time of
creating the partial Append plan. The reasons for taking this small step are:
1. we want to minimize the work done at the time of creating paths, and 2.
when a path is freed in add_path(), its internal structures, in this case the
Bitmapset, are not freed, which wastes memory if the path is not chosen during
planning.
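
Something along these lines at plan-creation time should be enough (a sketch
only; I am assuming create_append_plan() has the AppendPath at hand as
best_path, and nonpartial_subplans is an illustrative name):

    Bitmapset  *nonpartial_subplans = NULL;
    ListCell   *lc;
    int         i = 0;

    foreach(lc, best_path->subpaths)
    {
        Path   *subpath = (Path *) lfirst(lc);

        /* A subpath with no workers of its own is a non-partial path. */
        if (subpath->parallel_workers == 0)
            nonpartial_subplans = bms_add_member(nonpartial_subplans, i);
        i++;
    }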

8. If we consider 7, we don't need concat_append_subpaths() at all, but here
are some comments about that function anyway. Instead of accepting two
separate arguments, childpaths and child_partial_subpaths_set, which need to
be kept in sync, we can just pass the path which contains both of them.
Similarly, the following code may be optimized by adding a utility function to
Bitmapset which advances all members by a given offset, and using that
function together with bms_union() to merge the bitmapsets, e.g.

bms_union(*partial_subpaths_set,
          bms_advance_members(bms_copy(child_partial_subpaths_set),
                              append_subpath_len));

instead of the current loop:

if (partial_subpaths_set)
{
    for (i = 0; i < list_length(childpaths); i++)
    {
        /*
         * The child paths themselves may or may not be partial paths. The
         * bitmapset numbers of these paths will need to be set considering
         * that these are to be appended onto the partial_subpaths_set.
         */
        if (!child_partial_subpaths_set ||
            bms_is_member(i, child_partial_subpaths_set))
        {
            *partial_subpaths_set = bms_add_member(*partial_subpaths_set,
                                                   append_subpath_len + i);
        }
    }
}
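
No such utility exists in bitmapset.c today; it could look roughly like this
(sketch only):

Bitmapset *
bms_advance_members(Bitmapset *a, int offset)
{
    Bitmapset  *result = NULL;
    int         x = -1;

    /* Rebuild the set with every member shifted upward by 'offset'. */
    while ((x = bms_next_member(a, x)) >= 0)
        result = bms_add_member(result, x + offset);

    bms_free(a);        /* recycle the (copied) input */
    return result;
}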

9.
- parallel_workers = Max(parallel_workers, path->parallel_workers);
+ /*
+ * partial_subpaths can have non-partial subpaths so
+ * path->parallel_workers can be 0. For such paths, allocate one
+ * worker.
+ */
+ parallel_workers +=
+ (path->parallel_workers > 0 ? path->parallel_workers : 1);

This looks odd. The earlier code was choosing the maximum of all the subpaths'
parallel_workers, whereas the new code adds them all up. E.g. if
parallel_workers for the subpaths is 3, 4, 3, without your change it would
pick 4, but with your change it picks 10. I think you intended to write this
as

parallel_workers = Max(parallel_workers,
                       path->parallel_workers ? path->parallel_workers : 1);

If you do that, you probably don't need the clamp below either, since
parallel_workers is never set higher than max_parallel_workers_per_gather.
+ /* In no case use more than max_parallel_workers_per_gather. */
+ parallel_workers = Min(parallel_workers,
+ max_parallel_workers_per_gather);
+

10. Shouldn't this function return double?
int
get_parallel_divisor(int parallel_workers)

11. In get_parallel_divisor(), if parallel_workers is 0, i.e. the subpath is
not a partial path, the return value will be 2, which isn't correct. This
function is being called for all the subpaths to recover the original number
of rows and costs of the partial paths. I think we either shouldn't call this
function on subpaths which are not partial paths, or we should make it work
for parallel_workers = 0.

12. We should probably move the parallel_safe calculation out of cost_append().
+ path->parallel_safe = path->parallel_safe &&
+ subpath->parallel_safe;

13. This check shouldn't be part of cost_append().
+ /* All child paths must have same parameterization */
+ Assert(bms_equal(PATH_REQ_OUTER(subpath), required_outer));

14. cost_append() essentially adds up the costs of all the subpaths and then
divides by the parallel_divisor. This might work if all the subpaths are
partial paths, but for the subpaths which are not partial, a single worker
will incur the whole cost of that subpath. Hence simply dividing the summed
total cost doesn't seem right. We should apply different logic for costing
non-partial subpaths and partial subpaths.
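
For instance, the costing could at least keep the two kinds of subpaths
apart, e.g. (a sketch of the idea only; subpaths, partial_total and
nonpartial_total are illustrative names):

    Cost        partial_total = 0;
    Cost        nonpartial_total = 0;
    ListCell   *lc;

    foreach(lc, subpaths)
    {
        Path   *subpath = (Path *) lfirst(lc);

        if (subpath->parallel_workers > 0)
            partial_total += subpath->total_cost * subpath->parallel_workers;
        else
            nonpartial_total += subpath->total_cost;
    }

Only partial_total can then be divided among the Append's workers; how to
charge nonpartial_total (e.g. the cost seen by the busiest worker) needs
separate treatment.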

15. No braces are required for a single-line block:
+ /* Increment worker count for the chosen node, if at all we found one. */
+ if (min_whichplan != PA_INVALID_PLAN)
+ {
+ padesc->pa_info[min_whichplan].pa_num_workers++;
+ }

16. exec_append_scan_first() is a one-liner function; should we just inline it?

17. This patch replaces exec_append_initialize_next() with
exec_append_scan_first(). The earlier function handled backward and
forward scans separately, but the latter function doesn't. Why?

--
Best Wishes,
Ashutosh Bapat
EnterpriseDB Corporation
The Postgres Database Company
