From: | Andy Fan <zhihuifan1213(at)163(dot)com> |
---|---|
To: | Postgres hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>,Robert Haas <robertmhaas(at)gmail(dot)com> |
Subject: | parallel safety of correlated subquery (was: parallel_safe) |
Date: | 2025-07-02 07:02:52 |
Message-ID: | 871pqzm5wj.fsf@163.com |
Lists: | pgsql-hackers |
Andy Fan <zhihuifan1213(at)163(dot)com> writes:
Hi,
After some coding on this subject, I think it is better to redefine
the problem and the solution.
Problem:
--------
SubPlans are commonly inefficient, *AND* recently I found that they are
hard to make work with the parallel framework, e.g.
create table bigt (a int, b int, c int);
insert into bigt select i, i, i from generate_series(1, 1000000)i;
analyze bigt;
q1:
select * from bigt o where b = 1
and c > (select avg(c) from bigt i where c = o.c);
We get the plan:
QUERY PLAN
-------------------------------------------
Seq Scan on bigt o
Filter: ((b = 1) AND (c > (SubPlan 1)))
SubPlan 1
-> Aggregate
-> Seq Scan on bigt i
Filter: (c = o.c)
(6 rows)
Here we can see there is no parallelism at all. However, if we split
query q1 into queries q2 and q3, both of them can be parallelized.
q2:
explain (costs off) select * from bigt o where b = 1 and c > 2;
QUERY PLAN
---------------------------------------
Gather
Workers Planned: 2
-> Parallel Seq Scan on bigt o
Filter: ((c > 2) AND (b = 1))
(4 rows)
q3:
explain (costs off) select avg(c) from bigt o where c = 2;
QUERY PLAN
-----------------------------------------
Aggregate
-> Gather
Workers Planned: 2
-> Parallel Seq Scan on bigt o
Filter: (c = 2)
(5 rows)
Analysis
--------
The major reason q1 can't be parallelized is that the subplan is parameterized.
The comment from add_partial_path:
* We don't generate parameterized partial paths for several reasons. Most
* importantly, they're not safe to execute, because there's nothing to
* make sure that a parallel scan within the parameterized portion of the
* plan is running with the same value in every worker at the same time.
The comment from max_parallel_hazard_walker:
* We can't pass Params to workers at the moment either .. unless
* they are listed in safe_param_ids, meaning they could be
* either generated within workers or can be computed by the leader and
* then their value can be passed to workers.
Solutions
----------
There are two foundations for this solution in my mind:
1. It is not safe to execute a partial parameterized plan with different
parameter values, as we have already handled and documented well. But
this doesn't apply to a parameterized complete plan: in that case each
worker runs the complete plan, so it always generates the same result
no matter whether it runs in a parallel worker or in the leader.
2. A SubPlan is never a partial plan. In make_subplan:
best_path = get_cheapest_fractional_path(final_rel, tuple_fraction);
plan = create_plan(subroot, best_path);
/* And convert to SubPlan or InitPlan format. */
result = build_subplan(root, plan, best_path,
subroot, plan_params,
subLinkType, subLinkId,
testexpr, NIL, isTopQual);
get_cheapest_fractional_path never reads rel->partial_pathlist.
So I think it is safe to ignore the PARAM_EXEC check
(max_parallel_hazard_context.safe_param_ids) for SubPlans. See attached
patch 1.
Benefit:
--------
After this patch, we get the plan below, in which the correlated subplan
is parallelized.
explain (costs off) select * from bigt o where b = 1
and c > (select avg(c) from bigt i where c = o.c);
QUERY PLAN
------------------------------------------------------
Seq Scan on bigt o
Filter: ((b = 1) AND ((c)::numeric > (SubPlan 1)))
SubPlan 1
-> Aggregate
-> Gather
Workers Planned: 2
-> Parallel Seq Scan on bigt i
Filter: (c = o.c)
(8 rows)
Continuing the test to show the impact of this patch, if we remove the
"Gather" from the SubPlan, we get the plan below: the scan with a
parallel-safe SubPlan is parallelized.
create table t (a int, b int);
explain (costs off) select * from bigt o where b = 1
and c > (select avg(a) from t i where b = o.c);
QUERY PLAN
------------------------------------------------------------
Gather
Workers Planned: 2
-> Parallel Seq Scan on bigt o
Filter: ((b = 1) AND ((c)::numeric > (SubPlan 1)))
SubPlan 1
-> Aggregate
-> Seq Scan on t i
Filter: (b = o.c)
(8 rows)
incremental_sort.sql shows another impact of this patch: it helps
parallel sort.
Query:
select distinct
unique1,
(select t.unique1 from tenk1 where tenk1.unique1 = t.unique1)
from tenk1 t, generate_series(1, 1000);
From (master)
QUERY PLAN
----------------------------------------------------------------------------------------
Unique
Output: t.unique1, ((SubPlan 1))
-> Sort
Output: t.unique1, ((SubPlan 1))
Sort Key: t.unique1, ((SubPlan 1))
-> Gather
Output: t.unique1, (SubPlan 1)
Workers Planned: 2
-> Nested Loop
Output: t.unique1
-> Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
Output: t.unique1
-> Function Scan on pg_catalog.generate_series
Output: generate_series.generate_series
Function Call: generate_series(1, 1000)
SubPlan 1
-> Index Only Scan using tenk1_unique1 on public.tenk1
Output: t.unique1
Index Cond: (tenk1.unique1 = t.unique1)
(19 rows)
To (patched)
QUERY PLAN
----------------------------------------------------------------------------------------------
Unique
Output: t.unique1, ((SubPlan 1))
-> Gather Merge    <-- merge gather at the end
Output: t.unique1, ((SubPlan 1))
Workers Planned: 2
-> Unique
Output: t.unique1, ((SubPlan 1))
-> Sort    <-- sort in worker
Output: t.unique1, ((SubPlan 1))
Sort Key: t.unique1, ((SubPlan 1))
-> Nested Loop    <-- SubPlan in worker
Output: t.unique1, (SubPlan 1)
-> Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t
Output: t.unique1
-> Function Scan on pg_catalog.generate_series
Output: generate_series.generate_series
Function Call: generate_series(1, 1000)
SubPlan 1
-> Index Only Scan using tenk1_unique1 on public.tenk1
Output: t.unique1
Index Cond: (tenk1.unique1 = t.unique1)
(21 rows)
The execution time for the above query also decreased from 13351.928 ms
to 4814.043 ms, a reduction of 64%. The major differences are:
(1) master: the correlated subquery is parallel unsafe, so it runs only
in the leader, followed by the sort.
(2) patched: the correlated subquery is parallel safe, so it runs in the
workers (in the Nested Loop), then the *sort runs in the parallel
workers*, and finally "Gather Merge" combines the results.
About the implementation, I know of at least 2 issues (it is at the PoC
stage now):
1. Query.is_in_sublink should be set in the parser and kept unchanged
afterwards.
2. The comment below in incremental_sort.sql should be changed, since it
conflicts with this patch.
"""
-- Parallel sort but with expression (correlated subquery) that
-- is prohibited in parallel plans.
"""
Hope I have made myself clear, any feedback is welcome!
--
Best Regards
Andy Fan
Attachment | Content-Type | Size |
---|---|---|
v0-0001-Revisit-Subplan-s-parallel-safety.patch | text/x-diff | 9.8 KB |