Re: Re: parallel distinct union and aggregate support patch

From: "bucoo(at)sohu(dot)com" <bucoo(at)sohu(dot)com>
To: pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>
Cc: tgl <tgl(at)sss(dot)pgh(dot)pa(dot)us>, "Dilip Kumar" <dilipbalaut(at)gmail(dot)com>, "Thomas Munro" <thomas(dot)munro(at)gmail(dot)com>, "Tomas Vondra" <tomas(dot)vondra(at)2ndquadrant(dot)com>, hlinnaka <hlinnaka(at)iki(dot)fi>, robertmhaas <robertmhaas(at)gmail(dot)com>, pgsql <pgsql(at)j-davis(dot)com>, "David Steele" <david(at)pgmasters(dot)net>, "David Rowley" <dgrowleyml(at)gmail(dot)com>
Subject: Re: Re: parallel distinct union and aggregate support patch
Date: 2021-09-15 10:34:01
Message-ID: 2021091517250848215321@sohu.com
Lists: pgsql-hackers

These have been busy days, sorry the patches are so late.
Here is an unbuffered Redistribute plan for parallel aggregate/distinct/union,
like this (when the new GUC parameter redistribute_query_size is larger than 0):
Gather
  -> Finalize HashAggregate
     -> Parallel Redistribute
        -> Partial HashAggregate
           -> Parallel Seq Scan on test
0001-xxx.patch:
Fixes cost_subqueryscan() computing the wrong parallel cost; it was always the same as the non-parallel path.
Without this patch a parallel union can never be chosen.
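For context, a minimal sketch of the general idea (illustration only; this is not necessarily how the attached 0001 patch implements it): the Subquery Scan path's row estimate should follow its subpath, whose rows are already divided among the parallel workers, instead of always using the whole relation's row count.

/* Sketch only: base the Subquery Scan row estimate on the subpath.
 * This shows the idea; it may differ from the attached 0001 patch. */
void
cost_subqueryscan(SubqueryScanPath *path, PlannerInfo *root,
                  RelOptInfo *baserel, ParamPathInfo *param_info)
{
    ...
    /* Mark the path with the correct row estimate */
    if (param_info)
        path->path.rows = param_info->ppi_rows;
    else
        path->path.rows = path->subpath->rows;  /* was baserel->rows, which is
                                                 * the same for the parallel
                                                 * and non-parallel paths */
    ...
}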

How Redistribute works (a pseudocode sketch follows this list):
Each has N*MQ + 1*SharedTuplestore, where N is the number of parallel workers (including the leader).
1. Allocate shared memory for Redistribute (using the planned parallel worker number).
2. After all parallel workers are launched, the leader sets "final_worker_num" to the number of launched workers.
3. Each worker tries to claim a unique part number; the part number count is "final_worker_num".
4. If it gets an invalid part number, return a null tuple.
5. Try to read a tuple from the MQ; if a tuple is received, return it, otherwise go to the next step.
6-0. Get a tuple from the outer plan; if a tuple is received, compute mod as "hash value % final_worker_num", otherwise go to step 7.
6-1. If mod equals our part number, return this tuple.
6-2. Use mod to find that part's MQ and try to write the tuple to it; if the write succeeds, go to step 6-0.
6-3. Write the tuple to that part's SharedTuplestore.
7. Read tuples from the MQ; if a tuple is received, return it, otherwise close all opened MQs and go to the next step.
8. Read tuples from the SharedTuplestore; if a tuple is received, return it, otherwise close it and go to the next step.
9. Try to claim the next unique part number; if an invalid part number is returned, return a null tuple, otherwise go to step 7.
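Here is the promised pseudocode sketch of one worker's loop, written as C-style pseudocode. Every identifier below is hypothetical and chosen only to mirror the numbered steps; none of them come from the 0002 patch.

#include <stdbool.h>

/* Hypothetical sketch of one worker's Redistribute loop (steps 3-9 above). */
typedef struct RedistWorkerState RedistWorkerState;

/* hypothetical helpers standing in for MQ / SharedTuplestore / outer-plan access */
extern void *try_read_own_mq(RedistWorkerState *state);    /* NULL if our MQ is empty */
extern void *fetch_outer_tuple(RedistWorkerState *state);  /* NULL at end of outer plan */
extern unsigned tuple_hash(RedistWorkerState *state, void *tup);
extern bool write_mq_no_wait(RedistWorkerState *state, int part, void *tup);
extern void write_sts(RedistWorkerState *state, int part, void *tup);
extern void *drain_remaining_parts(RedistWorkerState *state);
extern int my_part(RedistWorkerState *state);
extern int final_worker_num(RedistWorkerState *state);

void *
redistribute_next(RedistWorkerState *state)
{
    void *tup;
    int   mod;

    if (my_part(state) < 0)
        return NULL;                           /* step 4: no valid part */

    for (;;)
    {
        /* step 5: prefer tuples other workers already queued for us */
        if ((tup = try_read_own_mq(state)) != NULL)
            return tup;

        /* step 6-0: pull the next tuple from the outer plan */
        if ((tup = fetch_outer_tuple(state)) == NULL)
            break;                             /* outer exhausted: steps 7-9 */

        mod = tuple_hash(state, tup) % final_worker_num(state);

        if (mod == my_part(state))             /* step 6-1: ours, return it */
            return tup;

        /* step 6-2: all-or-nothing, non-blocking write to that part's MQ */
        if (write_mq_no_wait(state, mod, tup))
            continue;

        /* step 6-3: MQ full, spill to that part's SharedTuplestore */
        write_sts(state, mod, tup);
    }

    /* steps 7-9: drain our MQ, then our SharedTuplestore, then try to claim
     * another part; returns NULL when no part remains */
    return drain_remaining_parts(state);
}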

In step 6-2 we can't use shm_mq_send(), because it may write partial data;
if that happens we must finish writing the remaining data to this MQ, which means waiting for another worker to read some data from it.
However, we don't want to wait (that could cause all workers to wait for each other).
So I wrote a new function named shm_mq_send_once(). It is like shm_mq_send(), but it returns "would block" immediately when
there is no space to write the data, and it writes no data at all to the MQ.
This causes a problem: when the MQ ring size is smaller than the tuple size, the tuple is never written to the MQ (it always goes to the SharedTuplestore).
So it's best to make sure the MQ has enough space for a tuple (adjust the GUC parameter "redistribute_query_size").
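The intended call pattern looks roughly like this (the signature of shm_mq_send_once() is assumed here to mirror shm_mq_send(), and spill_to_sts() is a hypothetical stand-in for the SharedTuplestore fallback):

/* Assumed call pattern; shm_mq_send_once() is added by the 0002 patch
 * (signature assumed), spill_to_sts() and part are hypothetical. */
shm_mq_result res;

res = shm_mq_send_once(part_mqh, tuple_len, tuple_data, true /* nowait */);

switch (res)
{
    case SHM_MQ_SUCCESS:
        /* whole tuple queued; go fetch the next outer tuple (step 6-0) */
        break;
    case SHM_MQ_WOULD_BLOCK:
        /* nothing at all was written; spill to that part's SharedTuplestore
         * instead of waiting, so workers never block on each other (step 6-3) */
        spill_to_sts(part, tuple_data, tuple_len);
        break;
    case SHM_MQ_DETACHED:
        /* receiver is gone; handling is not described in this mail,
         * spilling shown here is illustrative only */
        spill_to_sts(part, tuple_data, tuple_len);
        break;
}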

Execution comparison
prepare data:
begin;
create table gtest(id integer, txt text);
insert into gtest select t1.id,'txt'||t1.id from (select generate_series(1,10*1000*1000) id) t1,(select generate_series(1,10) id) t2;
analyze gtest;
commit;
set max_parallel_workers_per_gather=8;
set work_mem = '256MB';

hash aggregate
explain (verbose,analyze,costs off)
select sum(id),txt from gtest group by txt;
QUERY PLAN
---------------------------------------------------------------------------------------------------------
 Finalize HashAggregate (actual time=11733.519..19075.309 rows=10000000 loops=1)
   Output: sum(id), txt
   Group Key: gtest.txt
   Batches: 21 Memory Usage: 262201kB Disk Usage: 359808kB
   ->  Gather (actual time=5540.052..8029.550 rows=10000056 loops=1)
         Output: txt, (PARTIAL sum(id))
         Workers Planned: 6
         Workers Launched: 6
         ->  Partial HashAggregate (actual time=5534.690..5914.643 rows=1428579 loops=7)
               Output: txt, PARTIAL sum(id)
               Group Key: gtest.txt
               Batches: 1 Memory Usage: 188433kB
               Worker 0: actual time=5533.956..5913.461 rows=1443740 loops=1
                 Batches: 1 Memory Usage: 188433kB
               Worker 1: actual time=5533.552..5913.595 rows=1400439 loops=1
                 Batches: 1 Memory Usage: 188433kB
               Worker 2: actual time=5533.553..5913.357 rows=1451759 loops=1
                 Batches: 1 Memory Usage: 188433kB
               Worker 3: actual time=5533.834..5907.952 rows=1379830 loops=1
                 Batches: 1 Memory Usage: 180241kB
               Worker 4: actual time=5533.782..5912.408 rows=1428060 loops=1
                 Batches: 1 Memory Usage: 188433kB
               Worker 5: actual time=5534.271..5910.458 rows=1426987 loops=1
                 Batches: 1 Memory Usage: 188433kB
               ->  Parallel Seq Scan on public.gtest (actual time=0.022..1523.231 rows=14285714 loops=7)
                     Output: id, txt
                     Worker 0: actual time=0.032..1487.403 rows=14437315 loops=1
                     Worker 1: actual time=0.016..1635.675 rows=14004315 loops=1
                     Worker 2: actual time=0.015..1482.005 rows=14517505 loops=1
                     Worker 3: actual time=0.017..1664.469 rows=13798225 loops=1
                     Worker 4: actual time=0.018..1471.233 rows=14280520 loops=1
                     Worker 5: actual time=0.030..1463.973 rows=14269790 loops=1
 Planning Time: 0.075 ms
 Execution Time: 19575.976 ms

parallel hash aggregate
set redistribute_query_size = '256kB';
explain (verbose,analyze,costs off)
select sum(id),txt from gtest group by txt;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------
 Gather (actual time=9710.061..11372.560 rows=10000000 loops=1)
   Output: (sum(id)), txt
   Workers Planned: 6
   Workers Launched: 6
   ->  Finalize HashAggregate (actual time=9703.098..10082.575 rows=1428571 loops=7)
         Output: sum(id), txt
         Group Key: gtest.txt
         Batches: 1 Memory Usage: 188433kB
         Worker 0: actual time=9701.365..10077.995 rows=1428857 loops=1
           Batches: 1 Memory Usage: 188433kB
         Worker 1: actual time=9701.415..10095.876 rows=1430065 loops=1
           Batches: 1 Memory Usage: 188433kB
         Worker 2: actual time=9701.315..10077.635 rows=1425811 loops=1
           Batches: 1 Memory Usage: 188433kB
         Worker 3: actual time=9703.047..10088.985 rows=1427745 loops=1
           Batches: 1 Memory Usage: 188433kB
         Worker 4: actual time=9703.166..10077.937 rows=1431644 loops=1
           Batches: 1 Memory Usage: 188433kB
         Worker 5: actual time=9701.809..10076.922 rows=1426156 loops=1
           Batches: 1 Memory Usage: 188433kB
         ->  Parallel Redistribute (actual time=5593.440..9036.392 rows=1428579 loops=7)
               Output: txt, (PARTIAL sum(id))
               Hash Key: gtest.txt
               Parts: 1 Disk Usage: 0kB Disk Rows: 0
               Worker 0: actual time=5591.812..9036.394 rows=1428865 loops=1
                 Parts: 1 Disk Usage: 0kB Disk Rows: 0
               Worker 1: actual time=5591.773..9002.576 rows=1430072 loops=1
                 Parts: 1 Disk Usage: 0kB Disk Rows: 0
               Worker 2: actual time=5591.774..9039.341 rows=1425817 loops=1
                 Parts: 1 Disk Usage: 0kB Disk Rows: 0
               Worker 3: actual time=5593.635..9040.148 rows=1427753 loops=1
                 Parts: 1 Disk Usage: 0kB Disk Rows: 0
               Worker 4: actual time=5593.565..9044.528 rows=1431652 loops=1
                 Parts: 1 Disk Usage: 0kB Disk Rows: 0
               Worker 5: actual time=5592.220..9043.953 rows=1426167 loops=1
                 Parts: 1 Disk Usage: 0kB Disk Rows: 0
               ->  Partial HashAggregate (actual time=5566.237..5990.671 rows=1428579 loops=7)
                     Output: txt, PARTIAL sum(id)
                     Group Key: gtest.txt
                     Batches: 1 Memory Usage: 188433kB
                     Worker 0: actual time=5565.941..5997.635 rows=1449687 loops=1
                       Batches: 1 Memory Usage: 188433kB
                     Worker 1: actual time=5565.930..6073.977 rows=1400013 loops=1
                       Batches: 1 Memory Usage: 188433kB
                     Worker 2: actual time=5565.945..5975.454 rows=1446727 loops=1
                       Batches: 1 Memory Usage: 188433kB
                     Worker 3: actual time=5567.673..5981.978 rows=1396379 loops=1
                       Batches: 1 Memory Usage: 180241kB
                     Worker 4: actual time=5567.622..5972.500 rows=1415832 loops=1
                       Batches: 1 Memory Usage: 188433kB
                     Worker 5: actual time=5566.148..5962.503 rows=1415665 loops=1
                       Batches: 1 Memory Usage: 188433kB
                     ->  Parallel Seq Scan on public.gtest (actual time=0.022..1520.647 rows=14285714 loops=7)
                           Output: id, txt
                           Worker 0: actual time=0.021..1476.653 rows=14496785 loops=1
                           Worker 1: actual time=0.020..1519.023 rows=14000060 loops=1
                           Worker 2: actual time=0.020..1476.707 rows=14467185 loops=1
                           Worker 3: actual time=0.019..1654.088 rows=13963715 loops=1
                           Worker 4: actual time=0.027..1527.803 rows=14158235 loops=1
                           Worker 5: actual time=0.030..1514.247 rows=14156570 loops=1
 Planning Time: 0.080 ms
 Execution Time: 11830.773 ms

Attachment Content-Type Size
0001-fix-cost_subqueryscan-get-worning-parallel-cost.patch application/octet-stream 2.6 KB
0002-parallel-redistribute-plan.patch application/octet-stream 96.1 KB
