Re: Effect of changing the value for PARALLEL_TUPLE_QUEUE_SIZE

From: Rafia Sabih <rafia(dot)sabih(at)enterprisedb(dot)com>
To: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
Cc: Robert Haas <robertmhaas(at)gmail(dot)com>, Ashutosh Bapat <ashutosh(dot)bapat(at)enterprisedb(dot)com>, PostgreSQL Developers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Effect of changing the value for PARALLEL_TUPLE_QUEUE_SIZE
Date: 2017-09-06 10:44:45
Message-ID: CAOGQiiNiMhq5Pg3LiYxjfi2B9eAQ_q5YjS=fHiBJmbSOF74aBQ@mail.gmail.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

On Fri, Jun 2, 2017 at 6:31 PM, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> Your reasoning sounds sensible to me. I think the other way to attack
> this problem is that we can maintain some local queue in each of the
> workers when the shared memory queue becomes full. Basically, we can
> extend your "Faster processing at Gather node" patch [1] such that
> instead of fixed sized local queue, we can extend it when the shm
> queue become full. I think that way we can handle both the problems
> (worker won't stall if shm queues are full and workers can do batched
> writes in shm queue to avoid the shm queue communication overhead) in
> a similar way.
>
>
> [1] - https://www.postgresql.org/message-id/CAOGQiiMwhOd5-iKZnizn%2BEdzZmB0bc3xa6rKXQgvhbnQ29zCJg%40mail.gmail.com
>

I worked on this idea of using local queue as a temporary buffer to
write the tuples when master is busy and shared queue is full, and it
gives quite some improvement in the query performance.

Design:
On a basic level, the design of this patch can be explained as
following, similar to shm_mq, there is a new structure local_mq which
is private for each worker. Once shared queue is full, we write the
tuple in local queue. Since, local queue is never shared we do not
need any sort of locking for writing in it, hence writing in local
queue is one cheap operation.

Once local queue is atleast 5% (for this version, I've kept this, but
we might need to modify it) full we copy the data from local to shared
queue. In case both the queues are full, wait till master reads from
shared queue, then copy some data from local to shared queue, till
required space is available, subsequently write the tuple to local
queue. If at any instant local queue becomes empty then we write the
tuple in shared queue itself, provided there is space. At the time of
worker shutdown we copy all the data from local queue to shared queue.

For this version of the patch I have kept the size of local queue =
100 * PARALLEL_TUPLE_QUEUE_SIZE = 6553600, which might not be the best
and I am open to understand the reasons for modifying it. But it is
kept that way for the scenarios where gather/gather-merge node is
slow. And I expect when a master is busy it might be for some long
time or the data to be processed is high and we would not want our
worker to wait for some long time.

Performance:
These experiments are on TPC-H scale factor 20. The patch is giving
around 20-30% performance improvement in queries with selectivity
something around 20-30%.

Head:
Default plan
explain analyse select * from lineitem where l_extendedprice < 50000
and l_orderkey < 15000000;

QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------
Index Scan using idx_lineitem_orderkey on lineitem
(cost=0.57..334367.85 rows=10313822 width=129) (actual
time=0.057..26389.587 rows=10258702 loops=1)
Index Cond: (l_orderkey < 15000000)
Filter: (l_extendedprice < '50000'::numeric)
Rows Removed by Filter: 4737888
Planning time: 1.686 ms
Execution time: 27402.801 ms
(6 rows)

Force parallelism plan
explain analyse select * from lineitem where l_extendedprice < 50000
and l_orderkey < 15000000;

QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------
Gather (cost=0.57..193789.78 rows=10313822 width=129) (actual
time=0.354..41153.916 rows=10258702 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Parallel Index Scan using idx_lineitem_orderkey on lineitem
(cost=0.57..193789.78 rows=2578456 width=129) (actual
time=0.062..6530.167 rows=2051740 loops=5)
Index Cond: (l_orderkey < 15000000)
Filter: (l_extendedprice < '50000'::numeric)
Rows Removed by Filter: 947578
Planning time: 0.383 ms
Execution time: 42027.645 ms
(9 rows)

Patch:
Force parallelism plan

explain analyse select * from lineitem where l_extendedprice < 50000
and l_orderkey < 15000000;

QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------
Gather (cost=0.57..193789.78 rows=10313822 width=129) (actual
time=0.413..16690.294 rows=10258702 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Parallel Index Scan using idx_lineitem_orderkey on lineitem
(cost=0.57..193789.78 rows=2578456 width=129) (actual
time=0.047..6185.527 rows=2051740 loops=5)
Index Cond: (l_orderkey < 15000000)
Filter: (l_extendedprice < '50000'::numeric)
Rows Removed by Filter: 947578
Planning time: 0.406 ms
Execution time: 17616.750 ms
(9 rows)

Head:
Default plan
explain analyse select * from lineitem where l_extendedprice < 50000
and l_orderkey < 30000000;

QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------
Index Scan using idx_lineitem_orderkey on lineitem
(cost=0.57..684102.33 rows=21101661 width=129) (actual
time=0.131..55532.251 rows=20519918 loops=1)
Index Cond: (l_orderkey < 30000000)
Filter: (l_extendedprice < '50000'::numeric)
Rows Removed by Filter: 9479875
Planning time: 0.318 ms
Execution time: 57436.251 ms
(6 rows)

Force parallelism plan
explain analyse select * from lineitem where l_extendedprice < 50000
and l_orderkey < 30000000;

QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
Gather (cost=0.57..396485.31 rows=21101661 width=129) (actual
time=0.557..69190.640 rows=20519918 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Parallel Index Scan using idx_lineitem_orderkey on lineitem
(cost=0.57..396485.31 rows=5275415 width=129) (actual
time=0.106..12797.711 rows=4103984 loops=5)
Index Cond: (l_orderkey < 30000000)
Filter: (l_extendedprice < '50000'::numeric)
Rows Removed by Filter: 1895975
Planning time: 0.393 ms
Execution time: 70924.801 ms
(9 rows)

Patch:
Force parallelism plan:
explain analyse select * from lineitem where l_extendedprice < 50000
and l_orderkey < 30000000;

QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
Gather (cost=0.57..396485.31 rows=21101661 width=129) (actual
time=0.424..31677.524 rows=20519918 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Parallel Index Scan using idx_lineitem_orderkey on lineitem
(cost=0.57..396485.31 rows=5275415 width=129) (actual
time=0.075..12811.910 rows=4103984 loops=5)
Index Cond: (l_orderkey < 30000000)
Filter: (l_extendedprice < '50000'::numeric)
Rows Removed by Filter: 1895975
Planning time: 0.462 ms
Execution time: 33440.322 ms
(9 rows)

Head:
Default plan
explain analyse select * from lineitem where l_extendedprice < 50000
and l_orderkey < 60000000;

QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------
Index Scan using idx_lineitem_orderkey on lineitem
(cost=0.57..1337265.07 rows=41248987 width=129) (actual
time=0.070..107944.729 rows=41035759 loops=1)
Index Cond: (l_orderkey < 60000000)
Filter: (l_extendedprice < '50000'::numeric)
Rows Removed by Filter: 18950286
Planning time: 2.021 ms
Execution time: 111963.420 ms
(6 rows)

Force parallelism plan
explain analyse select * from lineitem where l_extendedprice < 50000
and l_orderkey < 60000000;
QUERY
PLAN
-----------------------------------------------------------------------------------------------------------------------------------------
Gather (cost=0.00..692896.08 rows=41248987 width=129) (actual
time=0.354..141432.886 rows=41035759 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Parallel Seq Scan on lineitem (cost=0.00..692896.08
rows=10312247 width=129) (actual time=0.029..31678.105 rows=8207152
loops=5)
Filter: ((l_extendedprice < '50000'::numeric) AND (l_orderkey
< 60000000))
Rows Removed by Filter: 15791770
Planning time: 1.883 ms
Execution time: 144859.515 ms
(8 rows)

Patch:
Force parallelism plan
explain analyse select * from lineitem where l_extendedprice < 50000
and l_orderkey < 60000000;
QUERY
PLAN
-----------------------------------------------------------------------------------------------------------------------------------------
Gather (cost=0.00..692896.08 rows=41248987 width=129) (actual
time=0.350..78312.666 rows=41035759 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Parallel Seq Scan on lineitem (cost=0.00..692896.08
rows=10312247 width=129) (actual time=0.027..31867.170 rows=8207152
loops=5)
Filter: ((l_extendedprice < '50000'::numeric) AND (l_orderkey
< 60000000))
Rows Removed by Filter: 15791770
Planning time: 0.439 ms
Execution time: 82057.225 ms
(8 rows)

Apart from these, Q12 from the benchmark queries shows good
improvement with this patch.

Head:
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit (cost=1001.19..426457.34 rows=1 width=27) (actual
time=42770.491..42770.491 rows=1 loops=1)
-> GroupAggregate (cost=1001.19..2979194.24 rows=7 width=27)
(actual time=42770.489..42770.489 rows=1 loops=1)
Group Key: lineitem.l_shipmode
-> Gather Merge (cost=1001.19..2969127.63 rows=575231
width=27) (actual time=11.355..42224.843 rows=311095 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Nested Loop (cost=1.13..2899612.01 rows=143808
width=27) (actual time=0.346..10385.472 rows=62906 loops=5)
-> Parallel Index Scan using idx_l_shipmode on
lineitem (cost=0.57..2796168.46 rows=143808 width=19) (actual
time=0.280..9004.095 rows=62906 loops=5)
Index Cond: (l_shipmode = ANY ('{"REG
AIR",RAIL}'::bpchar[]))
Filter: ((l_commitdate < l_receiptdate) AND
(l_shipdate < l_commitdate) AND (l_receiptdate >= '1995-01-01'::date)
AND (l_receiptdate < '1996-01-01 00:00:00'::timestamp without time
zone))
Rows Removed by Filter: 3402367
-> Index Scan using orders_pkey on orders
(cost=0.56..0.72 rows=1 width=20) (actual time=0.020..0.020 rows=1
loops=314530)
Index Cond: (o_orderkey = lineitem.l_orderkey)
Planning time: 1.202 ms
Execution time: 42841.895 ms
(15 rows)

Patch:
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit (cost=1001.19..426457.34 rows=1 width=27) (actual
time=19461.653..19461.654 rows=1 loops=1)
-> GroupAggregate (cost=1001.19..2979194.24 rows=7 width=27)
(actual time=19461.651..19461.651 rows=1 loops=1)
Group Key: lineitem.l_shipmode
-> Gather Merge (cost=1001.19..2969127.63 rows=575231
width=27) (actual time=10.239..18783.386 rows=311095 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Nested Loop (cost=1.13..2899612.01 rows=143808
width=27) (actual time=0.376..19109.107 rows=66104 loops=5)
-> Parallel Index Scan using idx_l_shipmode on
lineitem (cost=0.57..2796168.46 rows=143808 width=19) (actual
time=0.310..16615.236 rows=66104 loops=5)
Index Cond: (l_shipmode = ANY ('{"REG
AIR",RAIL}'::bpchar[]))
Filter: ((l_commitdate < l_receiptdate) AND
(l_shipdate < l_commitdate) AND (l_receiptdate >= '1995-01-01'::date)
AND (l_receiptdate < '1996-01-01 00:00:00'::timestamp with out time
zone))
Rows Removed by Filter: 3574492
-> Index Scan using orders_pkey on orders
(cost=0.56..0.72 rows=1 width=20) (actual time=0.034..0.034 rows=1
loops=330519)
Index Cond: (o_orderkey = lineitem.l_orderkey)
Planning time: 3.498 ms
Execution time: 19661.054 ms
(15 rows)

This suggests that with such an idea the range of selectivity for
using parallelism can be extended for improving the performance of the
queries.

Credits:
Would like to extend thanks to my colleagues Dilip Kumar, Amit Kapila,
and Robert Haas for their discussions and words of encouragement
throughout the development of this patch.

Feedback and suggestions are welcome.

--
Regards,
Rafia Sabih
EnterpriseDB: http://www.enterprisedb.com/

Attachment Content-Type Size
faster_gather_v1.patch application/octet-stream 15.5 KB

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Thomas Munro 2017-09-06 10:56:45 Re: [PATCH] Pageinspect - add functions on GIN and GiST indexes from gevel
Previous Message Daniel Gustafsson 2017-09-06 10:43:03 Re: Copyright in partition.h and partition.c