Parallel Append implementation

From: Amit Khandekar <amitdkhan(dot)pg(at)gmail(dot)com>
To: pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Parallel Append implementation
Date: 2016-12-23 05:21:51
Message-ID: CAJ3gD9dy0K_E8r727heqXoBmWZ83HwLFwdcaSSmBQ1+S+vRuUQ@mail.gmail.com

Currently an Append plan node does not execute its subplans in
parallel. There is no distribution of workers across its subplans. The
second subplan starts running only after the first subplan finishes,
although the individual subplans may be running parallel scans.

Secondly, we create a partial Append path for an appendrel, but only
if all of its member subpaths are partial paths. If one or more
children have only non-parallel paths, we get only a non-parallel
Append, so whatever node sits on top of the Append cannot use a
parallel plan either; for example, a select count(*) will not be
divided into partial aggregates if the underlying Append is not
partial.

The attached patch removes both of the above restrictions. There has
already been a mail thread [1] discussing an approach suggested by
Robert Haas for implementing this feature; the patch uses that same
approach.

Also attached is pgbench_create_partition.sql (derived from the one
included in the above thread), which distributes the pgbench_accounts
table data into 3 partitions pgbench_accounts_[1-3]. The queries below
use this schema.

Consider a query such as:
select count(*) from pgbench_accounts;

Now suppose these two partitions do not allow a parallel scan:
alter table pgbench_accounts_1 set (parallel_workers=0);
alter table pgbench_accounts_2 set (parallel_workers=0);

On HEAD, because some of the partitions have only non-parallel scans,
the whole plan ends up non-parallel:

 Aggregate
   ->  Append
         ->  Index Only Scan using pgbench_accounts_pkey on pgbench_accounts
         ->  Seq Scan on pgbench_accounts_1
         ->  Seq Scan on pgbench_accounts_2
         ->  Seq Scan on pgbench_accounts_3

Whereas with the patch, the plan looks like this:

 Finalize Aggregate
   ->  Gather
         Workers Planned: 6
         ->  Partial Aggregate
               ->  Parallel Append
                     ->  Parallel Seq Scan on pgbench_accounts
                     ->  Seq Scan on pgbench_accounts_1
                     ->  Seq Scan on pgbench_accounts_2
                     ->  Parallel Seq Scan on pgbench_accounts_3

Above, a Parallel Append is generated, and it executes all of these
subplans in parallel, with one worker executing each of the
non-partial sequential scans and multiple workers sharing each of the
parallel subplans.

======= Implementation details ========

------- Adding parallel-awareness -------

In a given worker, this Append plan node will execute just like the
usual partial Append node: it will run a subplan to completion, and
the subplan may or may not itself be a partial parallel-aware plan
such as a parallel scan. It is after a subplan finishes, when Append
chooses the next subplan, that it differs from the current partial
Append: it is parallel-aware. The Append nodes in the workers know
that other Append nodes are running in parallel, and must coordinate
with them while choosing the next subplan.

------- Distribution of workers --------

The coordination info is stored in a shared array, each element of
which describes the per-subplan state: the number of workers currently
executing the subplan, and the maximum number of workers that should
be executing it at the same time. For non-partial subplans, max
workers is always 1. To choose the next subplan, the Append executor
takes a spinlock and sequentially iterates over the array to find a
subplan that has the fewest workers currently executing it AND has not
yet reached its maximum worker count. Once it finds one, it increments
that subplan's current_workers and releases the spinlock, so that
other workers waiting to choose their next subplan can proceed.

This way, workers would be fairly distributed across subplans.
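The selection logic above can be sketched as follows. This is a
minimal, single-threaded illustration with hypothetical names (the
patch's actual structures and identifiers differ, and the real
executor performs the scan and the increment under a spinlock):

```c
#include <stddef.h>

/* Hypothetical per-subplan entry in the shared array. */
typedef struct ParallelAppendEntry
{
    int cur_workers;  /* workers currently executing this subplan */
    int max_workers;  /* cap: 1 for non-partial subplans */
} ParallelAppendEntry;

/*
 * Pick the subplan with the fewest current workers that is still
 * below its cap; return its index, or -1 if every subplan is full.
 * In the real executor this whole routine runs under a spinlock so
 * concurrent workers see a consistent view of the array.
 */
static int
choose_next_subplan(ParallelAppendEntry *entries, int nplans)
{
    int best = -1;

    for (int i = 0; i < nplans; i++)
    {
        if (entries[i].cur_workers >= entries[i].max_workers)
            continue;           /* already at its worker cap */
        if (best < 0 || entries[i].cur_workers < entries[best].cur_workers)
            best = i;
    }
    if (best >= 0)
        entries[best].cur_workers++;    /* claim a slot */
    return best;
}
```

In the actual implementation the array lives in dynamic shared
memory, so every worker's Append node sees the same current-worker
counts.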

The shared array needs to be initialized and made available to the
workers. For this, we can do exactly what a parallel-aware sequential
scan does: use a function ExecAppendInitializeDSM(), similar to
ExecSeqScanInitializeDSM(), in the backend to allocate the array, and
similarly ExecAppendInitializeWorker() in the workers to retrieve the
shared array.

-------- Generating Partial Append plan having non-partial subplans --------

In set_append_rel_pathlist(), while generating a partial path for the
Append, also include the non-partial child subpaths besides the
partial ones. This way, the path can contain a mix of partial and
non-partial child paths; for a given child, the path chosen is either
its cheapest partial path or its cheapest non-partial path.

A non-partial child path is included only if it is parallel-safe. If a
child has no parallel-safe path at all, the partial Append path is not
generated. This behaviour also automatically excludes paths that have
a Gather node beneath them.
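As a rough illustration of the per-child choice described above
(assuming the cheapest partial path is preferred when one exists; the
patch's actual preference logic may differ, and all field names here
are hypothetical):

```c
#include <stdbool.h>

/* Illustrative stand-in for one child rel's available paths. */
typedef struct ChildPaths
{
    bool   has_partial;                  /* child has a partial path? */
    double cheapest_partial_cost;
    bool   nonpartial_parallel_safe;     /* cheapest non-partial path safe? */
    double cheapest_nonpartial_cost;
} ChildPaths;

/*
 * Decide which path to include for one child of a partial Append:
 * the cheapest partial path if the child has one, else the cheapest
 * non-partial path provided it is parallel-safe.  Returns false when
 * no usable path exists, in which case no partial Append is built.
 */
static bool
select_child_path(const ChildPaths *c, bool *is_partial, double *cost)
{
    if (c->has_partial)
    {
        *is_partial = true;
        *cost = c->cheapest_partial_cost;
        return true;
    }
    if (c->nonpartial_parallel_safe)
    {
        *is_partial = false;
        *cost = c->cheapest_nonpartial_cost;
        return true;
    }
    return false;   /* not parallel-safe: give up on the partial Append */
}
```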

Finally, when creating a partial Append path from these child paths,
we also need to store a bitmapset indicating which of the child paths
are non-partial. For this, add a new Bitmapset field:
Append->partial_subplans. At execution time, this is used to cap the
maximum workers for each non-partial subpath at 1.
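A sketch of how that bitmapset translates into a per-subplan worker
cap at execution time, using a plain bitmask in place of PostgreSQL's
Bitmapset API (the function and parameter names are illustrative, not
the patch's):

```c
/*
 * Return the worker cap for one Append subplan: non-partial subplans
 * must be run by at most one worker, while partial subplans can take
 * every available worker.  nonpartial_mask has bit i set when subplan
 * i is non-partial (standing in for the Bitmapset stored in the plan).
 */
static int
subplan_max_workers(unsigned int nonpartial_mask, int subplan_idx,
                    int num_workers)
{
    if (nonpartial_mask & (1u << subplan_idx))
        return 1;               /* non-partial: single worker only */
    return num_workers;         /* partial: no extra cap */
}
```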

-------- Costing -------

To calculate the per-worker Parallel Append path cost, the patch first
totals the child subplan costs, considering all of their workers, and
then divides by the Append node's parallel_divisor, similar to how a
parallel scan uses the parallel_divisor.

For startup cost, it is assumed that the Append can start returning
tuples as soon as the child with the lowest startup cost is done
setting up. So the Append startup cost equals the minimum startup cost
among its children.
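The costing described above amounts to something like the following
sketch (hypothetical names; the patch's actual cost function is more
detailed):

```c
typedef struct ChildCost
{
    double startup_cost;
    double total_cost;   /* cost across all of the child's workers */
} ChildCost;

/*
 * Per-worker Parallel Append costing sketch: total cost is the sum of
 * the children's costs divided by the Append's parallel_divisor, and
 * startup cost is the minimum child startup cost.  Assumes nchildren
 * is at least 1.
 */
static void
cost_parallel_append(const ChildCost *children, int nchildren,
                     double parallel_divisor,
                     double *startup_cost, double *total_cost)
{
    double sum = 0.0;
    double min_startup = children[0].startup_cost;

    for (int i = 0; i < nchildren; i++)
    {
        sum += children[i].total_cost;
        if (children[i].startup_cost < min_startup)
            min_startup = children[i].startup_cost;
    }
    *startup_cost = min_startup;
    *total_cost = sum / parallel_divisor;
}
```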

-------- Scope --------

There are two different code paths where an Append path is generated:
1. Where an append rel is generated: inheritance tables and the UNION
ALL clause.
2. In prepunion.c, which gets executed for UNION (without ALL) and
INTERSECT/EXCEPT [ALL]. The patch does not support Parallel Append in
this scenario; it can be taken up later as an extension, once this
patch is reviewed.

======= Performance =======

There is a clear benefit from Parallel Append in scenarios where one
or more subplans have no partial path, because in such cases HEAD does
not generate a partial Append at all. For example, the query below
took around 30 seconds with the patch (max_parallel_workers_per_gather
should be 3 or more), whereas it took 74 seconds on HEAD.

explain analyze select avg(aid) from (
select aid from pgbench_accounts_1 inner join bid_tab b using (bid)
UNION ALL
select aid from pgbench_accounts_2 inner join bid_tab using (bid)
UNION ALL
select aid from pgbench_accounts_3 inner join bid_tab using (bid)
) p;

--- With HEAD ---

                                                              QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=6415493.67..6415493.67 rows=1 width=32) (actual time=74135.821..74135.822 rows=1 loops=1)
   ->  Append  (cost=1541552.36..6390743.54 rows=9900047 width=4) (actual time=73829.985..74125.048 rows=100000 loops=1)
         ->  Hash Join  (cost=1541552.36..2097249.67 rows=3300039 width=4) (actual time=25758.592..25758.592 rows=0 loops=1)
               Hash Cond: (pgbench_accounts_1.bid = b.bid)
               ->  Seq Scan on pgbench_accounts_1  (cost=0.00..87099.39 rows=3300039 width=8) (actual time=0.118..778.097 rows=3300000 loops=1)
               ->  Hash  (cost=721239.16..721239.16 rows=50000016 width=4) (actual time=24426.433..24426.433 rows=49999902 loops=1)
                     Buckets: 131072  Batches: 1024  Memory Usage: 2744kB
                     ->  Seq Scan on bid_tab b  (cost=0.00..721239.16 rows=50000016 width=4) (actual time=0.105..10112.995 rows=49999902 loops=1)
         ->  Hash Join  (cost=1541552.36..2097249.67 rows=3300039 width=4) (actual time=24063.761..24063.761 rows=0 loops=1)
               Hash Cond: (pgbench_accounts_2.bid = bid_tab.bid)
               ->  Seq Scan on pgbench_accounts_2  (cost=0.00..87099.39 rows=3300039 width=8) (actual time=0.065..779.498 rows=3300000 loops=1)
               ->  Hash  (cost=721239.16..721239.16 rows=50000016 width=4) (actual time=22708.377..22708.377 rows=49999902 loops=1)
                     Buckets: 131072  Batches: 1024  Memory Usage: 2744kB
                     ->  Seq Scan on bid_tab  (cost=0.00..721239.16 rows=50000016 width=4) (actual time=0.024..9513.032 rows=49999902 loops=1)
         ->  Hash Join  (cost=1541552.36..2097243.73 rows=3299969 width=4) (actual time=24007.628..24297.067 rows=100000 loops=1)
               Hash Cond: (pgbench_accounts_3.bid = bid_tab_1.bid)
               ->  Seq Scan on pgbench_accounts_3  (cost=0.00..87098.69 rows=3299969 width=8) (actual time=0.049..782.230 rows=3300000 loops=1)
               ->  Hash  (cost=721239.16..721239.16 rows=50000016 width=4) (actual time=22943.413..22943.413 rows=49999902 loops=1)
                     Buckets: 131072  Batches: 1024  Memory Usage: 2744kB
                     ->  Seq Scan on bid_tab bid_tab_1  (cost=0.00..721239.16 rows=50000016 width=4) (actual time=0.022..9601.753 rows=49999902 loops=1)
 Planning time: 0.366 ms
 Execution time: 74138.043 ms
(22 rows)

--- With Patch ---

                                                                   QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=2139493.66..2139493.67 rows=1 width=32) (actual time=29658.825..29658.825 rows=1 loops=1)
   ->  Gather  (cost=2139493.34..2139493.65 rows=3 width=32) (actual time=29568.957..29658.804 rows=4 loops=1)
         Workers Planned: 3
         Workers Launched: 3
         ->  Partial Aggregate  (cost=2138493.34..2138493.35 rows=1 width=32) (actual time=22086.324..22086.325 rows=1 loops=4)
               ->  Parallel Append  (cost=0.00..2130243.42 rows=3299969 width=4) (actual time=22008.945..22083.536 rows=25000 loops=4)
                     ->  Hash Join  (cost=1541552.36..2097243.73 rows=3299969 width=4) (actual time=29568.605..29568.605 rows=0 loops=1)
                           Hash Cond: (pgbench_accounts_1.bid = b.bid)
                           ->  Seq Scan on pgbench_accounts_1  (cost=0.00..87098.69 rows=3299969 width=8) (actual time=0.024..841.598 rows=3300000 loops=1)
                           ->  Hash  (cost=721239.16..721239.16 rows=50000016 width=4) (actual time=28134.596..28134.596 rows=49999902 loops=1)
                                 Buckets: 131072  Batches: 1024  Memory Usage: 2744kB
                                 ->  Seq Scan on bid_tab b  (cost=0.00..721239.16 rows=50000016 width=4) (actual time=0.076..11598.097 rows=49999902 loops=1)
                     ->  Hash Join  (cost=1541552.36..2097243.73 rows=3299969 width=4) (actual time=29127.085..29127.085 rows=0 loops=1)
                           Hash Cond: (pgbench_accounts_2.bid = bid_tab.bid)
                           ->  Seq Scan on pgbench_accounts_2  (cost=0.00..87098.69 rows=3299969 width=8) (actual time=0.022..837.027 rows=3300000 loops=1)
                           ->  Hash  (cost=721239.16..721239.16 rows=50000016 width=4) (actual time=27658.276..27658.276 rows=49999902 loops=1)
                                 ->  Seq Scan on bid_tab  (cost=0.00..721239.16 rows=50000016 width=4) (actual time=0.022..11561.530 rows=49999902 loops=1)
                     ->  Hash Join  (cost=1541552.36..2097243.73 rows=3299969 width=4) (actual time=29340.081..29632.180 rows=100000 loops=1)
                           Hash Cond: (pgbench_accounts_3.bid = bid_tab_1.bid)
                           ->  Seq Scan on pgbench_accounts_3  (cost=0.00..87098.69 rows=3299969 width=8) (actual time=0.027..821.875 rows=3300000 loops=1)
                           ->  Hash  (cost=721239.16..721239.16 rows=50000016 width=4) (actual time=28186.009..28186.009 rows=49999902 loops=1)
                                 ->  Seq Scan on bid_tab bid_tab_1  (cost=0.00..721239.16 rows=50000016 width=4) (actual time=0.019..11594.461 rows=49999902 loops=1)
 Planning time: 0.493 ms
 Execution time: 29662.791 ms
(24 rows)

Thanks to Robert Haas and Rushabh Lathia for their valuable inputs
while working on this feature.

[1] Old mail thread :
https://www.postgresql.org/message-id/flat/9A28C8860F777E439AA12E8AEA7694F80115DEB8%40BPXM15GP(dot)gisp(dot)nec(dot)co(dot)jp#9A28C8860F777E439AA12E8AEA7694F80115DEB8(at)BPXM15GP(dot)gisp(dot)nec(dot)co(dot)jp

Thanks
-Amit Khandekar

Attachment Content-Type Size
ParallelAppend.patch application/octet-stream 41.6 KB
pgbench_create_partition.sql application/octet-stream 1020 bytes
