From 374f3300523ab351dcccd59eaafb5f311be96a5c Mon Sep 17 00:00:00 2001 From: Alexander Korotkov Date: Sun, 5 Apr 2026 04:47:10 +0300 Subject: [PATCH v19 6/6] MergeAppend should support Async Foreign Scan subplans This commit makes the MergeAppend node async-capable, similar to the existing async support for Append nodes. When the planner chooses MergeAppend for partitioned tables with foreign partitions, asynchronous execution is now possible. The primary benefit is during the initial heap fill: all async subplans are kicked off concurrently, so their first tuples are fetched in parallel rather than sequentially. In steady state, however, the heap merge algorithm needs the next tuple from one specific subplan (the heap top), so execution at that point is effectively synchronous - we block until that particular subplan delivers its result. A new GUC enable_async_merge_append controls this feature (default on). A new wait event MERGE_APPEND_READY is added (separate from APPEND_READY) so that monitoring tools can distinguish the two node types. The postgres_fdw is updated to work generically with both Append and MergeAppend requestors by casting to the shared AppendBaseState type. 
Discussion: https://postgr.es/m/59be194c5a409fb9fc9f2031581b8a44%40postgrespro.ru Author: Alexander Pyhalov Reviewed-by: Matheus Alcantara Reviewed-by: Alena Rybakina --- .../postgres_fdw/expected/postgres_fdw.out | 457 ++++++++++++++++++ contrib/postgres_fdw/postgres_fdw.c | 12 +- contrib/postgres_fdw/sql/postgres_fdw.sql | 137 ++++++ doc/src/sgml/config.sgml | 14 + src/backend/executor/execAppend.c | 4 +- src/backend/executor/execAsync.c | 4 + src/backend/executor/nodeMergeAppend.c | 216 ++++++++- src/backend/optimizer/path/costsize.c | 1 + src/backend/optimizer/plan/createplan.c | 17 + .../utils/activity/wait_event_names.txt | 1 + src/backend/utils/misc/guc_parameters.dat | 8 + src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/executor/nodeMergeAppend.h | 1 + src/include/nodes/execnodes.h | 3 + src/include/optimizer/cost.h | 1 + src/test/regress/expected/sysviews.out | 3 +- 16 files changed, 874 insertions(+), 6 deletions(-) diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 55e49b7d67f..5868b4c76cc 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -11718,12 +11718,83 @@ SELECT * FROM result_tbl ORDER BY a; (2 rows) DELETE FROM result_tbl; +-- Test Merge Append +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------ + Merge Append + Sort Key: async_pt.b, async_pt.a + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c + Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE (((b % 100) = 0)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, async_pt_2.c + Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE (((b % 
100) = 0)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST +(8 rows) + +SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a; + a | b | c +------+-----+------ + 1000 | 0 | 0000 + 2000 | 0 | 0000 + 1100 | 100 | 0100 + 2100 | 100 | 0100 + 1200 | 200 | 0200 + 2200 | 200 | 0200 + 1300 | 300 | 0300 + 2300 | 300 | 0300 + 1400 | 400 | 0400 + 2400 | 400 | 0400 + 1500 | 500 | 0500 + 2500 | 500 | 0500 + 1600 | 600 | 0600 + 2600 | 600 | 0600 + 1700 | 700 | 0700 + 2700 | 700 | 0700 + 1800 | 800 | 0800 + 2800 | 800 | 0800 + 1900 | 900 | 0900 + 2900 | 900 | 0900 +(20 rows) + +-- Test that steady-state execution of an async Merge Append re-fetches from +-- an async subplan instead of just consuming its initial batch and treating +-- exhaustion of that batch as end-of-data. Each partition here has 200 +-- matching rows, more than the FDW's default fetch_size (100), so getting +-- the right count requires more than one fetch from at least one partition +-- after the initial heap fill. The LIMIT keeps the ORDER BY (and therefore +-- the Merge Append) from being optimized away in the subquery. 
+EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt ORDER BY b, a; + QUERY PLAN +------------------------------------------------------------------------------------------------------ + Merge Append + Sort Key: async_pt.b, async_pt.a + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c + Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, async_pt_2.c + Remote SQL: SELECT a, b, c FROM public.base_tbl2 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST +(8 rows) + +SELECT count(*) FROM (SELECT * FROM async_pt ORDER BY b, a LIMIT 100000) ss; + count +------- + 400 +(1 row) + -- Test error handling, if accessing one of the foreign partitions errors out CREATE FOREIGN TABLE async_p_broken PARTITION OF async_pt FOR VALUES FROM (10000) TO (10001) SERVER loopback OPTIONS (table_name 'non_existent_table'); SELECT * FROM async_pt; ERROR: relation "public.non_existent_table" does not exist CONTEXT: remote SQL command: SELECT a, b, c FROM public.non_existent_table +-- Test error handling for async Merge Append +SELECT * FROM async_pt ORDER BY b, a; +ERROR: relation "public.non_existent_table" does not exist +CONTEXT: remote SQL command: SELECT a, b, c FROM public.non_existent_table ORDER BY b ASC NULLS LAST, a ASC NULLS LAST DROP FOREIGN TABLE async_p_broken; -- Check case where multiple partitions use the same connection CREATE TABLE base_tbl3 (a int, b int, c text); @@ -11782,6 +11853,153 @@ COPY async_pt TO stdout; --error ERROR: cannot copy from foreign table "async_p1" DETAIL: Partition "async_p1" is a foreign table in partitioned table "async_pt" HINT: Try the COPY (SELECT ...) TO variant. 
+-- Test Merge Append +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a; + QUERY PLAN +------------------------------------------------------------------------------------------------------ + Merge Append + Sort Key: async_pt.b, async_pt.a + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c + Filter: (async_pt_1.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, async_pt_2.c + Filter: (async_pt_2.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl2 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST + -> Async Foreign Scan on public.async_p3 async_pt_3 + Output: async_pt_3.a, async_pt_3.b, async_pt_3.c + Filter: (async_pt_3.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl3 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST +(14 rows) + +SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a; + a | b | c +------+-----+------ + 1505 | 505 | 0505 + 2505 | 505 | 0505 + 3505 | 505 | 0505 +(3 rows) + +-- Test async Merge Append rescan +EXPLAIN (VERBOSE, COSTS OFF) +SELECT + ARRAY(SELECT f.i FROM (SELECT b + g.i FROM async_pt WHERE a > g.i ORDER BY b) f(i) ORDER BY f.i LIMIT 10) +FROM generate_series(1, 3) g(i); + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------- + Function Scan on pg_catalog.generate_series g + Output: ARRAY(SubPlan array_1) + Function Call: generate_series(1, 3) + SubPlan array_1 + -> Limit + Output: f.i + -> Sort + Output: f.i + Sort Key: f.i + -> Subquery Scan on f + Output: f.i + -> Merge Append + Sort Key: async_pt.b + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: (async_pt_1.b + g.i), async_pt_1.b + Remote SQL: SELECT b FROM public.base_tbl1 WHERE ((a > $1::integer)) ORDER BY b ASC NULLS LAST + -> Async 
Foreign Scan on public.async_p2 async_pt_2 + Output: (async_pt_2.b + g.i), async_pt_2.b + Remote SQL: SELECT b FROM public.base_tbl2 WHERE ((a > $1::integer)) ORDER BY b ASC NULLS LAST + -> Async Foreign Scan on public.async_p3 async_pt_3 + Output: (async_pt_3.b + g.i), async_pt_3.b + Remote SQL: SELECT b FROM public.base_tbl3 WHERE ((a > $1::integer)) ORDER BY b ASC NULLS LAST +(22 rows) + +SELECT + ARRAY(SELECT f.i FROM (SELECT b + g.i FROM async_pt WHERE a > g.i ORDER BY b) f(i) ORDER BY f.i LIMIT 10) +FROM generate_series(1, 3) g(i); + array +--------------------------- + {1,1,1,6,6,6,11,11,11,16} + {2,2,2,7,7,7,12,12,12,17} + {3,3,3,8,8,8,13,13,13,18} +(3 rows) + +-- Test async Merge Append rescan when a LIMIT stops the scan before every +-- async subplan's request has completed, so on the next rescan (for the +-- next outer row) some subplan's request is still in flight. +EXPLAIN (VERBOSE, COSTS OFF) +SELECT g.i, s.b FROM generate_series(1, 3) g(i), + LATERAL (SELECT b FROM async_pt WHERE a > g.i ORDER BY b LIMIT 1) s; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------- + Nested Loop + Output: g.i, async_pt.b + -> Function Scan on pg_catalog.generate_series g + Output: g.i + Function Call: generate_series(1, 3) + -> Limit + Output: async_pt.b + -> Merge Append + Sort Key: async_pt.b + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.b + Remote SQL: SELECT b FROM public.base_tbl1 WHERE ((a > $1::integer)) ORDER BY b ASC NULLS LAST + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.b + Remote SQL: SELECT b FROM public.base_tbl2 WHERE ((a > $1::integer)) ORDER BY b ASC NULLS LAST + -> Async Foreign Scan on public.async_p3 async_pt_3 + Output: async_pt_3.b + Remote SQL: SELECT b FROM public.base_tbl3 WHERE ((a > $1::integer)) ORDER BY b ASC NULLS LAST +(18 rows) + +SELECT g.i, s.b FROM generate_series(1, 3) g(i), + LATERAL (SELECT b FROM 
async_pt WHERE a > g.i ORDER BY b LIMIT 1) s; + i | b +---+--- + 1 | 0 + 2 | 0 + 3 | 0 +(3 rows) + +-- Test async Merge Append rescan when a LIMIT stops the scan before an +-- async subplan's already-delivered result was ever consumed (because it +-- didn't win the race to the heap top). The next rescan (for the next +-- outer row) must not mistake that leftover result for a fresh one; the +-- expected result increases each row, so a stale reused value would show +-- up as the first row's value repeating instead. +EXPLAIN (VERBOSE, COSTS OFF) +SELECT g.i, s.b FROM generate_series(1, 3) g(i), + LATERAL (SELECT b FROM async_pt WHERE a > g.i AND b > g.i * 100 ORDER BY b LIMIT 1) s; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------- + Nested Loop + Output: g.i, async_pt.b + -> Function Scan on pg_catalog.generate_series g + Output: g.i + Function Call: generate_series(1, 3) + -> Limit + Output: async_pt.b + -> Merge Append + Sort Key: async_pt.b + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.b + Remote SQL: SELECT b FROM public.base_tbl1 WHERE ((a > $1::integer)) AND ((b > ($1::integer * 100))) ORDER BY b ASC NULLS LAST + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.b + Remote SQL: SELECT b FROM public.base_tbl2 WHERE ((a > $1::integer)) AND ((b > ($1::integer * 100))) ORDER BY b ASC NULLS LAST + -> Async Foreign Scan on public.async_p3 async_pt_3 + Output: async_pt_3.b + Remote SQL: SELECT b FROM public.base_tbl3 WHERE ((a > $1::integer)) AND ((b > ($1::integer * 100))) ORDER BY b ASC NULLS LAST +(18 rows) + +SELECT g.i, s.b FROM generate_series(1, 3) g(i), + LATERAL (SELECT b FROM async_pt WHERE a > g.i AND b > g.i * 100 ORDER BY b LIMIT 1) s; + i | b +---+----- + 1 | 105 + 2 | 205 + 3 | 305 +(3 rows) + DROP FOREIGN TABLE async_p3; DROP TABLE base_tbl3; -- Check case where the partitioned table has 
local/remote partitions @@ -11817,6 +12035,37 @@ SELECT * FROM result_tbl ORDER BY a; (3 rows) DELETE FROM result_tbl; +-- Test Merge Append +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a; + QUERY PLAN +------------------------------------------------------------------------------------------------------ + Merge Append + Sort Key: async_pt.b, async_pt.a + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c + Filter: (async_pt_1.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, async_pt_2.c + Filter: (async_pt_2.b === 505) + Remote SQL: SELECT a, b, c FROM public.base_tbl2 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST + -> Sort + Output: async_pt_3.a, async_pt_3.b, async_pt_3.c + Sort Key: async_pt_3.b, async_pt_3.a + -> Seq Scan on public.async_p3 async_pt_3 + Output: async_pt_3.a, async_pt_3.b, async_pt_3.c + Filter: (async_pt_3.b === 505) +(16 rows) + +SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a; + a | b | c +------+-----+------ + 1505 | 505 | 0505 + 2505 | 505 | 0505 + 3505 | 505 | 0505 +(3 rows) + -- partitionwise joins SET enable_partitionwise_join TO true; CREATE TABLE join_tbl (a1 int, b1 int, c1 text, a2 int, b2 int, c2 text); @@ -12012,6 +12261,21 @@ SELECT * FROM async_pt WHERE a < 2000; Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < 2000)) (3 rows) +-- Test interaction of async Merge Append with plan-time partition pruning +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt WHERE a < 3000 ORDER BY b, a; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------- + Merge Append + Sort Key: async_pt.b, async_pt.a + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c + Remote 
SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < 3000)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, async_pt_2.c + Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < 3000)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST +(8 rows) + -- Test interaction of async execution with run-time partition pruning SET plan_cache_mode TO force_generic_plan; PREPARE async_pt_query (int, int) AS @@ -12063,6 +12327,52 @@ SELECT * FROM result_tbl ORDER BY a; (1 row) DELETE FROM result_tbl; +-- Test interaction of async Merge Append with run-time partition pruning +PREPARE async_pt_merge_query (int, int) AS + SELECT * FROM async_pt WHERE a < $1 AND b === $2 ORDER BY b, a; +EXPLAIN (VERBOSE, COSTS OFF) +EXECUTE async_pt_merge_query (3000, 505); + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------- + Merge Append + Sort Key: async_pt.b, async_pt.a + Subplans Removed: 1 + -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c + Filter: (async_pt_1.b === $2) + Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < $1::integer)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST + -> Async Foreign Scan on public.async_p2 async_pt_2 + Output: async_pt_2.a, async_pt_2.b, async_pt_2.c + Filter: (async_pt_2.b === $2) + Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < $1::integer)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST +(11 rows) + +EXECUTE async_pt_merge_query (3000, 505); + a | b | c +------+-----+------ + 1505 | 505 | 0505 + 2505 | 505 | 0505 +(2 rows) + +EXPLAIN (VERBOSE, COSTS OFF) +EXECUTE async_pt_merge_query (2000, 505); + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------- + Merge Append + Sort Key: async_pt.b, async_pt.a + Subplans Removed: 2 
+ -> Async Foreign Scan on public.async_p1 async_pt_1 + Output: async_pt_1.a, async_pt_1.b, async_pt_1.c + Filter: (async_pt_1.b === $2) + Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < $1::integer)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST +(7 rows) + +EXECUTE async_pt_merge_query (2000, 505); + a | b | c +------+-----+------ + 1505 | 505 | 0505 +(1 row) + RESET plan_cache_mode; CREATE TABLE local_tbl(a int, b int, c text); INSERT INTO local_tbl VALUES (1505, 505, 'foo'), (2505, 505, 'bar'); @@ -12599,6 +12909,153 @@ SELECT a FROM base_tbl WHERE (a, random() > 0) IN (SELECT a, random() > 0 FROM f DROP FOREIGN TABLE foreign_tbl CASCADE; NOTICE: drop cascades to foreign table foreign_tbl2 DROP TABLE base_tbl; +-- Test async Merge Append +CREATE TABLE distr1 (i int, j int, k text) PARTITION BY HASH (i); +CREATE TABLE base1 (i int, j int, k text); +CREATE TABLE base2 (i int, j int, k text); +CREATE FOREIGN TABLE distr1_p1 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 0) +SERVER loopback OPTIONS (table_name 'base1'); +CREATE FOREIGN TABLE distr1_p2 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 1) +SERVER loopback OPTIONS (table_name 'base2'); +CREATE TABLE distr2 (i int, j int, k text) PARTITION BY HASH (i); +CREATE TABLE base3 (i int, j int, k text); +CREATE TABLE base4 (i int, j int, k text); +CREATE FOREIGN TABLE distr2_p1 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 0) +SERVER loopback OPTIONS (table_name 'base3'); +CREATE FOREIGN TABLE distr2_p2 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 1) +SERVER loopback OPTIONS (table_name 'base4'); +INSERT INTO distr1 +SELECT i, i*10, 'data_' || i FROM generate_series(1, 1000) i; +INSERT INTO distr2 +SELECT i, i*10, 'data_' || i FROM generate_series(1, 100) i; +ANALYZE distr1_p1; +ANALYZE distr1_p2; +ANALYZE distr2_p1; +ANALYZE distr2_p2; +SET enable_partitionwise_join TO ON; +-- Test joins with async Merge Append +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM 
distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%' +ORDER BY distr2.i LIMIT 10; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Limit + Output: distr1.i, distr1.j, distr1.k, distr2.i, distr2.j, distr2.k + -> Merge Append + Sort Key: distr1.i + -> Async Foreign Scan + Output: distr1_1.i, distr1_1.j, distr1_1.k, distr2_1.i, distr2_1.j, distr2_1.k + Relations: (public.distr1_p1 distr1_1) INNER JOIN (public.distr2_p1 distr2_1) + Remote SQL: SELECT r3.i, r3.j, r3.k, r5.i, r5.j, r5.k FROM (public.base1 r3 INNER JOIN public.base3 r5 ON (((r3.i = r5.i)) AND ((r5.j > 90)) AND ((r5.k ~~ 'data%')))) ORDER BY r3.i ASC NULLS LAST + -> Async Foreign Scan + Output: distr1_2.i, distr1_2.j, distr1_2.k, distr2_2.i, distr2_2.j, distr2_2.k + Relations: (public.distr1_p2 distr1_2) INNER JOIN (public.distr2_p2 distr2_2) + Remote SQL: SELECT r4.i, r4.j, r4.k, r6.i, r6.j, r6.k FROM (public.base2 r4 INNER JOIN public.base4 r6 ON (((r4.i = r6.i)) AND ((r6.j > 90)) AND ((r6.k ~~ 'data%')))) ORDER BY r4.i ASC NULLS LAST +(12 rows) + +SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%' +ORDER BY distr2.i LIMIT 10; + i | j | k | i | j | k +----+-----+---------+----+-----+--------- + 10 | 100 | data_10 | 10 | 100 | data_10 + 11 | 110 | data_11 | 11 | 110 | data_11 + 12 | 120 | data_12 | 12 | 120 | data_12 + 13 | 130 | data_13 | 13 | 130 | data_13 + 14 | 140 | data_14 | 14 | 140 | data_14 + 15 | 150 | data_15 | 15 | 150 | data_15 + 16 | 160 | data_16 | 16 | 160 | data_16 + 17 | 170 | data_17 | 17 | 170 | data_17 + 18 | 180 | data_18 | 18 | 180 | data_18 + 19 | 190 | data_19 | 19 | 190 | data_19 +(10 rows) + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90 
+ORDER BY distr1.i LIMIT 20; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Limit + Output: distr1.i, distr1.j, distr1.k, distr2.i, distr2.j, distr2.k + -> Merge Append + Sort Key: distr1.i + -> Async Foreign Scan + Output: distr1_1.i, distr1_1.j, distr1_1.k, distr2_1.i, distr2_1.j, distr2_1.k + Relations: (public.distr1_p1 distr1_1) LEFT JOIN (public.distr2_p1 distr2_1) + Remote SQL: SELECT r4.i, r4.j, r4.k, r6.i, r6.j, r6.k FROM (public.base1 r4 LEFT JOIN public.base3 r6 ON (((r4.i = r6.i)) AND ((r6.k ~~ 'data%')))) WHERE ((r4.i > 90)) ORDER BY r4.i ASC NULLS LAST + -> Async Foreign Scan + Output: distr1_2.i, distr1_2.j, distr1_2.k, distr2_2.i, distr2_2.j, distr2_2.k + Relations: (public.distr1_p2 distr1_2) LEFT JOIN (public.distr2_p2 distr2_2) + Remote SQL: SELECT r5.i, r5.j, r5.k, r7.i, r7.j, r7.k FROM (public.base2 r5 LEFT JOIN public.base4 r7 ON (((r5.i = r7.i)) AND ((r7.k ~~ 'data%')))) WHERE ((r5.i > 90)) ORDER BY r5.i ASC NULLS LAST +(12 rows) + +SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90 +ORDER BY distr1.i LIMIT 20; + i | j | k | i | j | k +-----+------+----------+-----+------+---------- + 91 | 910 | data_91 | 91 | 910 | data_91 + 92 | 920 | data_92 | 92 | 920 | data_92 + 93 | 930 | data_93 | 93 | 930 | data_93 + 94 | 940 | data_94 | 94 | 940 | data_94 + 95 | 950 | data_95 | 95 | 950 | data_95 + 96 | 960 | data_96 | 96 | 960 | data_96 + 97 | 970 | data_97 | 97 | 970 | data_97 + 98 | 980 | data_98 | 98 | 980 | data_98 + 99 | 990 | data_99 | 99 | 990 | data_99 + 100 | 1000 | data_100 | 100 | 1000 | data_100 + 101 | 1010 | data_101 | | | + 102 | 1020 | data_102 | | | + 103 | 1030 | data_103 | | | + 104 | 1040 | data_104 | | | + 105 | 1050 | data_105 | | | + 106 | 1060 | data_106 | | | + 107 | 1070 | data_107 | | | + 
108 | 1080 | data_108 | | | + 109 | 1090 | data_109 | | | + 110 | 1100 | data_110 | | | +(20 rows) + +-- Test pruning with async Merge Append +DELETE FROM distr2; +INSERT INTO distr2 +SELECT i%10, i*10, 'data_' || i FROM generate_series(1, 1000) i; +DEALLOCATE ALL; +SET plan_cache_mode TO force_generic_plan; +PREPARE async_pt_query (int, int) AS + SELECT * FROM distr2 WHERE i = ANY(ARRAY[$1, $2]) + ORDER BY i,j + LIMIT 10; +EXPLAIN (VERBOSE, COSTS OFF) + EXECUTE async_pt_query(1, 1); + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------ + Limit + Output: distr2.i, distr2.j, distr2.k + -> Merge Append + Sort Key: distr2.i, distr2.j + Subplans Removed: 1 + -> Async Foreign Scan on public.distr2_p1 distr2_1 + Output: distr2_1.i, distr2_1.j, distr2_1.k + Remote SQL: SELECT i, j, k FROM public.base3 WHERE ((i = ANY (ARRAY[$1::integer, $2::integer]))) ORDER BY i ASC NULLS LAST, j ASC NULLS LAST +(8 rows) + +EXECUTE async_pt_query(1, 1); + i | j | k +---+-----+--------- + 1 | 10 | data_1 + 1 | 110 | data_11 + 1 | 210 | data_21 + 1 | 310 | data_31 + 1 | 410 | data_41 + 1 | 510 | data_51 + 1 | 610 | data_61 + 1 | 710 | data_71 + 1 | 810 | data_81 + 1 | 910 | data_91 +(10 rows) + +RESET plan_cache_mode; +RESET enable_partitionwise_join; +DROP TABLE distr1, distr2, base1, base2, base3, base4; ALTER SERVER loopback OPTIONS (DROP async_capable); ALTER SERVER loopback2 OPTIONS (DROP async_capable); -- =================================================================== diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 117804cff55..f56c30a1ab5 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -8162,8 +8162,8 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq) ForeignScanState *node = (ForeignScanState *) areq->requestee; PgFdwScanState *fsstate = (PgFdwScanState *) 
node->fdw_state; AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq; - AppendState *requestor = (AppendState *) areq->requestor; - WaitEventSet *set = requestor->as.eventset; + AppendBaseState *requestor = (AppendBaseState *) areq->requestor; + WaitEventSet *set = requestor->eventset; /* This should not be called unless callback_pending */ Assert(areq->callback_pending); @@ -8204,8 +8204,14 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq) * in-process request, then begin a fetch to configure the event * below, because we might otherwise end up with no configured events * other than the postmaster death event. + * + * needrequest is populated only by Append (MergeAppend drives its + * subplans one at a time and has no equivalent notion of a subplan + * being "ready for a new request"), so this check has no effect for + * a MergeAppend requestor and we always fall through to the weaker + * check below. */ - if (!bms_is_empty(requestor->as.needrequest)) + if (!bms_is_empty(requestor->needrequest)) return; if (GetNumRegisteredWaitEvents(set) > 1) return; diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 7aad91b0718..54be12bad61 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -4035,10 +4035,28 @@ INSERT INTO result_tbl SELECT a, b, 'AAA' || c FROM async_pt WHERE b === 505; SELECT * FROM result_tbl ORDER BY a; DELETE FROM result_tbl; +-- Test Merge Append +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a; +SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a; + +-- Test that steady-state execution of an async Merge Append re-fetches from +-- an async subplan instead of just consuming its initial batch and treating +-- exhaustion of that batch as end-of-data. 
Each partition here has 200 +-- matching rows, more than the FDW's default fetch_size (100), so getting +-- the right count requires more than one fetch from at least one partition +-- after the initial heap fill. The LIMIT keeps the ORDER BY (and therefore +-- the Merge Append) from being optimized away in the subquery. +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt ORDER BY b, a; +SELECT count(*) FROM (SELECT * FROM async_pt ORDER BY b, a LIMIT 100000) ss; + -- Test error handling, if accessing one of the foreign partitions errors out CREATE FOREIGN TABLE async_p_broken PARTITION OF async_pt FOR VALUES FROM (10000) TO (10001) SERVER loopback OPTIONS (table_name 'non_existent_table'); SELECT * FROM async_pt; +-- Test error handling for async Merge Append +SELECT * FROM async_pt ORDER BY b, a; DROP FOREIGN TABLE async_p_broken; -- Check case where multiple partitions use the same connection @@ -4069,6 +4087,41 @@ ORDER BY o.x; -- Test COPY TO when foreign table is partition COPY async_pt TO stdout; --error +-- Test Merge Append +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a; +SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a; + +-- Test async Merge Append rescan +EXPLAIN (VERBOSE, COSTS OFF) +SELECT + ARRAY(SELECT f.i FROM (SELECT b + g.i FROM async_pt WHERE a > g.i ORDER BY b) f(i) ORDER BY f.i LIMIT 10) +FROM generate_series(1, 3) g(i); +SELECT + ARRAY(SELECT f.i FROM (SELECT b + g.i FROM async_pt WHERE a > g.i ORDER BY b) f(i) ORDER BY f.i LIMIT 10) +FROM generate_series(1, 3) g(i); + +-- Test async Merge Append rescan when a LIMIT stops the scan before every +-- async subplan's request has completed, so on the next rescan (for the +-- next outer row) some subplan's request is still in flight. 
+EXPLAIN (VERBOSE, COSTS OFF) +SELECT g.i, s.b FROM generate_series(1, 3) g(i), + LATERAL (SELECT b FROM async_pt WHERE a > g.i ORDER BY b LIMIT 1) s; +SELECT g.i, s.b FROM generate_series(1, 3) g(i), + LATERAL (SELECT b FROM async_pt WHERE a > g.i ORDER BY b LIMIT 1) s; + +-- Test async Merge Append rescan when a LIMIT stops the scan before an +-- async subplan's already-delivered result was ever consumed (because it +-- didn't win the race to the heap top). The next rescan (for the next +-- outer row) must not mistake that leftover result for a fresh one; the +-- expected result increases each row, so a stale reused value would show +-- up as the first row's value repeating instead. +EXPLAIN (VERBOSE, COSTS OFF) +SELECT g.i, s.b FROM generate_series(1, 3) g(i), + LATERAL (SELECT b FROM async_pt WHERE a > g.i AND b > g.i * 100 ORDER BY b LIMIT 1) s; +SELECT g.i, s.b FROM generate_series(1, 3) g(i), + LATERAL (SELECT b FROM async_pt WHERE a > g.i AND b > g.i * 100 ORDER BY b LIMIT 1) s; + DROP FOREIGN TABLE async_p3; DROP TABLE base_tbl3; @@ -4084,6 +4137,11 @@ INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505; SELECT * FROM result_tbl ORDER BY a; DELETE FROM result_tbl; +-- Test Merge Append +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a; +SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a; + -- partitionwise joins SET enable_partitionwise_join TO true; @@ -4124,6 +4182,10 @@ SELECT * FROM async_pt WHERE a < 3000; EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM async_pt WHERE a < 2000; +-- Test interaction of async Merge Append with plan-time partition pruning +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM async_pt WHERE a < 3000 ORDER BY b, a; + -- Test interaction of async execution with run-time partition pruning SET plan_cache_mode TO force_generic_plan; @@ -4144,6 +4206,18 @@ EXECUTE async_pt_query (2000, 505); SELECT * FROM result_tbl ORDER BY a; DELETE FROM result_tbl; +-- Test interaction of async Merge Append 
with run-time partition pruning +PREPARE async_pt_merge_query (int, int) AS + SELECT * FROM async_pt WHERE a < $1 AND b === $2 ORDER BY b, a; + +EXPLAIN (VERBOSE, COSTS OFF) +EXECUTE async_pt_merge_query (3000, 505); +EXECUTE async_pt_merge_query (3000, 505); + +EXPLAIN (VERBOSE, COSTS OFF) +EXECUTE async_pt_merge_query (2000, 505); +EXECUTE async_pt_merge_query (2000, 505); + RESET plan_cache_mode; CREATE TABLE local_tbl(a int, b int, c text); @@ -4322,6 +4396,69 @@ SELECT a FROM base_tbl WHERE (a, random() > 0) IN (SELECT a, random() > 0 FROM f DROP FOREIGN TABLE foreign_tbl CASCADE; DROP TABLE base_tbl; +-- Test async Merge Append +CREATE TABLE distr1 (i int, j int, k text) PARTITION BY HASH (i); +CREATE TABLE base1 (i int, j int, k text); +CREATE TABLE base2 (i int, j int, k text); +CREATE FOREIGN TABLE distr1_p1 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 0) +SERVER loopback OPTIONS (table_name 'base1'); +CREATE FOREIGN TABLE distr1_p2 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 1) +SERVER loopback OPTIONS (table_name 'base2'); + +CREATE TABLE distr2 (i int, j int, k text) PARTITION BY HASH (i); +CREATE TABLE base3 (i int, j int, k text); +CREATE TABLE base4 (i int, j int, k text); +CREATE FOREIGN TABLE distr2_p1 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 0) +SERVER loopback OPTIONS (table_name 'base3'); +CREATE FOREIGN TABLE distr2_p2 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 1) +SERVER loopback OPTIONS (table_name 'base4'); + +INSERT INTO distr1 +SELECT i, i*10, 'data_' || i FROM generate_series(1, 1000) i; + +INSERT INTO distr2 +SELECT i, i*10, 'data_' || i FROM generate_series(1, 100) i; + +ANALYZE distr1_p1; +ANALYZE distr1_p2; +ANALYZE distr2_p1; +ANALYZE distr2_p2; + +SET enable_partitionwise_join TO ON; + +-- Test joins with async Merge Append +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%' +ORDER BY distr2.i LIMIT 
10; +SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%' +ORDER BY distr2.i LIMIT 10; + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90 +ORDER BY distr1.i LIMIT 20; +SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90 +ORDER BY distr1.i LIMIT 20; + +-- Test pruning with async Merge Append +DELETE FROM distr2; +INSERT INTO distr2 +SELECT i%10, i*10, 'data_' || i FROM generate_series(1, 1000) i; + +DEALLOCATE ALL; +SET plan_cache_mode TO force_generic_plan; +PREPARE async_pt_query (int, int) AS + SELECT * FROM distr2 WHERE i = ANY(ARRAY[$1, $2]) + ORDER BY i,j + LIMIT 10; +EXPLAIN (VERBOSE, COSTS OFF) + EXECUTE async_pt_query(1, 1); +EXECUTE async_pt_query(1, 1); +RESET plan_cache_mode; + +RESET enable_partitionwise_join; + +DROP TABLE distr1, distr2, base1, base2, base3, base4; + ALTER SERVER loopback OPTIONS (DROP async_capable); ALTER SERVER loopback2 OPTIONS (DROP async_capable); diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 9172a4c5c95..c05b75968c0 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -5784,6 +5784,20 @@ ANY num_sync ( + enable_async_merge_append (boolean) + + enable_async_merge_append configuration parameter + + + + + Enables or disables the query planner's use of async-aware + merge append plan types. The default is on. + + + + enable_bitmapscan (boolean) diff --git a/src/backend/executor/execAppend.c b/src/backend/executor/execAppend.c index cc3b368eb7b..10be317791e 100644 --- a/src/backend/executor/execAppend.c +++ b/src/backend/executor/execAppend.c @@ -224,7 +224,9 @@ ExecReScanAppendBase(AppendBaseState *node) /* * If chgParam of subnode is not null then plan will be re-scanned by - * first ExecProcNode. + * first ExecProcNode. 
For an async-capable subplan, that's by + * ExecAppendBaseAsyncBegin()/ExecAsyncRequest() instead, the next + * time it fires a request for it. */ if (subnode->chgParam == NULL) ExecReScan(subnode); diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c index cf7ddbb01f4..cc95d285221 100644 --- a/src/backend/executor/execAsync.c +++ b/src/backend/executor/execAsync.c @@ -19,6 +19,7 @@ #include "executor/instrument.h" #include "executor/nodeAppend.h" #include "executor/nodeForeignscan.h" +#include "executor/nodeMergeAppend.h" /* * Asynchronously request a tuple from a designed async-capable node. @@ -122,6 +123,9 @@ ExecAsyncResponse(AsyncRequest *areq) case T_AppendState: ExecAsyncAppendResponse(areq); break; + case T_MergeAppendState: + ExecAsyncMergeAppendResponse(areq); + break; default: /* If the node doesn't support async, caller messed up. */ elog(ERROR, "unrecognized node type: %d", diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c index 8d8a502e928..7565a7218e4 100644 --- a/src/backend/executor/nodeMergeAppend.c +++ b/src/backend/executor/nodeMergeAppend.c @@ -25,6 +25,20 @@ * to a common sort key. The MergeAppend node merges these streams * to produce output sorted the same way. * + * MergeAppend supports async-capable subplans (e.g. foreign scans). + * Async execution is beneficial during the initial heap fill, where + * all async subplans are kicked off concurrently and their first + * tuples are fetched in parallel. In steady state, the heap + * algorithm still needs the next tuple from one specific subplan + * (the one at the heap top), so we can only make progress on that + * one subplan at a time. For a synchronous subplan we simply call + * ExecProcNode() again. 
An asynchronous subplan, however, must + * still be driven through the async request/response protocol: a + * plain ExecProcNode() call bypasses it, and FDWs such as + * postgres_fdw treat that as end-of-data once their locally cached + * batch of tuples is exhausted. So for those we issue a fresh + * async request and block until it completes. + * * MergeAppend nodes don't make use of their left and right * subtrees, rather they maintain a list of subplans so * a typical MergeAppend node looks like this in the plan tree: @@ -40,12 +54,14 @@ #include "postgres.h" #include "executor/execAppend.h" +#include "executor/execAsync.h" #include "executor/executor.h" #include "executor/execPartition.h" #include "executor/nodeMergeAppend.h" #include "lib/binaryheap.h" #include "miscadmin.h" #include "utils/sortsupport.h" +#include "utils/wait_event.h" /* * We have one slot for each item in the heap array. We use SlotNumber @@ -57,6 +73,12 @@ typedef int32 SlotNumber; static TupleTableSlot *ExecMergeAppend(PlanState *pstate); static int heap_compare_slots(Datum a, Datum b, void *arg); +static void classify_matching_subplans(MergeAppendState *node); +static void ExecMergeAppendAsyncBegin(MergeAppendState *node); +static void ExecMergeAppendAsyncGetNext(MergeAppendState *node, int mplan); +static void ExecMergeAppendAsyncEventWait(MergeAppendState *node); +static void ExecMergeAppendGetNextSlot(MergeAppendState *node, int mplan); + /* ---------------------------------------------------------------- * ExecInitMergeAppend @@ -88,10 +110,15 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags) -1, NULL); + if (mergestate->as.nasyncplans > 0 && mergestate->as.valid_subplans_identified) + classify_matching_subplans(mergestate); + mergestate->ms_slots = palloc0_array(TupleTableSlot *, mergestate->as.nplans); mergestate->ms_heap = binaryheap_allocate(mergestate->as.nplans, heap_compare_slots, mergestate); + mergestate->ms_asyncremain = NULL; + /* * initialize sort-key 
information */ @@ -158,8 +185,13 @@ ExecMergeAppend(PlanState *pstate) node->as.valid_subplans = ExecFindMatchingSubPlans(node->as.prune_state, false, NULL); node->as.valid_subplans_identified = true; + classify_matching_subplans(node); } + /* If there are any async subplans, begin executing them. */ + if (node->as.nasyncplans > 0) + ExecMergeAppendAsyncBegin(node); + /* * First time through: pull the first tuple from each valid subplan, * and set up the heap. @@ -171,6 +203,16 @@ ExecMergeAppend(PlanState *pstate) if (!TupIsNull(node->ms_slots[i])) binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i)); } + + /* Look at valid async subplans */ + i = -1; + while ((i = bms_next_member(node->as.valid_asyncplans, i)) >= 0) + { + ExecMergeAppendAsyncGetNext(node, i); + if (!TupIsNull(node->ms_slots[i])) + binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i)); + } + binaryheap_build(node->ms_heap); node->ms_initialized = true; } @@ -185,7 +227,7 @@ ExecMergeAppend(PlanState *pstate) * to not pull tuples until necessary.) */ i = DatumGetInt32(binaryheap_first(node->ms_heap)); - node->ms_slots[i] = ExecProcNode(node->as.plans[i]); + ExecMergeAppendGetNextSlot(node, i); if (!TupIsNull(node->ms_slots[i])) binaryheap_replace_first(node->ms_heap, Int32GetDatum(i)); else @@ -206,6 +248,34 @@ ExecMergeAppend(PlanState *pstate) return result; } +/* ---------------------------------------------------------------- + * ExecMergeAppendGetNextSlot + * + * Retrieve the next tuple for the given subplan into ms_slots[mplan]. + * Synchronous subplans are pulled directly; asynchronous ones must go + * through the async request/response protocol, since a plain + * ExecProcNode() call bypasses it and would be mistaken by an + * async-capable FDW for end-of-data (see the file header comment). 
+ * ---------------------------------------------------------------- + */ +static void +ExecMergeAppendGetNextSlot(MergeAppendState *node, int mplan) +{ + if (bms_is_member(mplan, node->as.valid_asyncplans)) + { + AsyncRequest *areq = node->as.asyncrequests[mplan]; + + Assert(!areq->callback_pending); + + /* Discard the previously-returned result before requesting a new one. */ + node->ms_slots[mplan] = NULL; + ExecAsyncRequest(areq); + ExecMergeAppendAsyncGetNext(node, mplan); + } + else + node->ms_slots[mplan] = ExecProcNode(node->as.plans[mplan]); +} + /* * Compare the tuples in the two given slots. */ @@ -265,8 +335,152 @@ ExecEndMergeAppend(MergeAppendState *node) void ExecReScanMergeAppend(MergeAppendState *node) { + int nasyncplans = node->as.nasyncplans; + ExecReScanAppendBase(&node->as); + /* Reset MergeAppend-specific state */ + if (nasyncplans > 0) + { + bms_free(node->ms_asyncremain); + node->ms_asyncremain = NULL; + } binaryheap_reset(node->ms_heap); node->ms_initialized = false; + + /* + * Discard any results left over from before the rescan. An async + * subplan's slot can still hold a tuple that was delivered but never + * consumed (e.g. because a LIMIT above us stopped pulling before that + * subplan reached the heap top); if we didn't clear it here, the next + * scan's initial fill would mistake it for a fresh result via its + * TupIsNull() check, instead of waiting for the subplan's new async + * response. + */ + memset(node->ms_slots, 0, node->as.nplans * sizeof(TupleTableSlot *)); +} + +/* ---------------------------------------------------------------- + * classify_matching_subplans + * + * Classify the node's as.valid_subplans into sync ones and + * async ones, adjust it to contain sync ones only, and save + * async ones in the node's as.valid_asyncplans.
+ * ---------------------------------------------------------------- + */ +static void +classify_matching_subplans(MergeAppendState *node) +{ + Assert(node->as.valid_subplans_identified); + + /* Nothing to do if there are no valid subplans. */ + if (bms_is_empty(node->as.valid_subplans)) + { + node->ms_asyncremain = NULL; + return; + } + + /* No valid async subplans identified. */ + if (!classify_matching_subplans_common(&node->as.valid_subplans, + node->as.asyncplans, + &node->as.valid_asyncplans)) + node->ms_asyncremain = NULL; +} + +/* ---------------------------------------------------------------- + * ExecMergeAppendAsyncBegin + * + * Begin executing designed async-capable subplans. + * ---------------------------------------------------------------- + */ +static void +ExecMergeAppendAsyncBegin(MergeAppendState *node) +{ + /* ExecMergeAppend() identifies valid subplans */ + Assert(node->as.valid_subplans_identified); + + /* Initialize state variables. */ + node->ms_asyncremain = bms_copy(node->as.valid_asyncplans); + + /* Nothing to do if there are no valid async subplans. */ + if (bms_is_empty(node->ms_asyncremain)) + return; + + ExecAppendBaseAsyncBegin(&node->as); +} + +/* ---------------------------------------------------------------- + * ExecMergeAppendAsyncGetNext + * + * Get the next tuple from the specified asynchronous subplan. + * ---------------------------------------------------------------- + */ +static void +ExecMergeAppendAsyncGetNext(MergeAppendState *node, int mplan) +{ + /* + * The request was already fired by ExecMergeAppendAsyncBegin() or + * ExecMergeAppendGetNextSlot(). If its result has already arrived in + * ms_slots[mplan], there is nothing to do; otherwise wait until this + * subplan delivers a tuple or reports exhaustion.
+ */ + while (TupIsNull(node->ms_slots[mplan]) && + bms_is_member(mplan, node->ms_asyncremain)) + { + CHECK_FOR_INTERRUPTS(); + ExecMergeAppendAsyncEventWait(node); + } +} + +/* ---------------------------------------------------------------- + * ExecAsyncMergeAppendResponse + * + * Receive a response from an asynchronous request we made. + * ---------------------------------------------------------------- + */ +void +ExecAsyncMergeAppendResponse(AsyncRequest *areq) +{ + MergeAppendState *node = (MergeAppendState *) areq->requestor; + TupleTableSlot *slot = areq->result; + + /* The result should be a TupleTableSlot or NULL. */ + Assert(slot == NULL || IsA(slot, TupleTableSlot)); + + /* Nothing to do if the request is pending. */ + if (!areq->request_complete) + { + /* The request would have been pending for a callback. */ + Assert(areq->callback_pending); + return; + } + + /* If the result is NULL or an empty slot, the subplan is exhausted. */ + if (TupIsNull(slot)) + { + /* The ending subplan wouldn't have been pending for a callback. */ + Assert(!areq->callback_pending); + node->ms_asyncremain = bms_del_member(node->ms_asyncremain, + areq->request_index); + return; + } + + /* Save result directly into the merge slot array. */ + node->ms_slots[areq->request_index] = slot; +} + +/* ---------------------------------------------------------------- + * ExecMergeAppendAsyncEventWait + * + * Wait or poll for file descriptor events and fire callbacks. + * ---------------------------------------------------------------- + */ +static void +ExecMergeAppendAsyncEventWait(MergeAppendState *node) +{ + /* We should never be called when there are no valid async subplans. 
*/ + Assert(bms_num_members(node->ms_asyncremain) > 0); + + ExecAppendBaseAsyncEventWait(&node->as, -1 /* no timeout */ , + WAIT_EVENT_MERGE_APPEND_READY); } diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 1c575e56ff6..b6109e5b91e 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -164,6 +164,7 @@ bool enable_parallel_hash = true; bool enable_partition_pruning = true; bool enable_presorted_aggregate = true; bool enable_async_append = true; +bool enable_async_merge_append = true; typedef struct { diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 828f468f65d..c18e7260a2a 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -1451,6 +1451,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path, List *subplans = NIL; ListCell *subpaths; RelOptInfo *rel = best_path->path.parent; + bool consider_async = false; /* * We don't have the actual creation of the MergeAppend node split out @@ -1466,6 +1467,10 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path, node->ab.apprelids = rel->relids; node->ab.child_append_relid_sets = best_path->child_append_relid_sets; + consider_async = (enable_async_merge_append && + !best_path->path.parallel_safe && + list_length(best_path->subpaths) > 1); + /* * Compute sort column info, and adjust MergeAppend's tlist as needed. * Because we pass adjust_tlist_in_place = true, we may ignore the @@ -1566,6 +1571,18 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path, subplan = sort_plan; } + /* + * If needed, check to see if subplan can be executed asynchronously. 
+ * Unlike create_append_plan(), we don't keep a count of the result: + * MergeAppend has no nasyncplans field to keep it in (its executor + * instead recomputes the set of async-capable subplans from each + * subplan's async_capable flag at ExecInit time), since it drives + * its subplans one at a time and has no planning-time consumer for + * such a count. + */ + if (consider_async) + mark_async_capable_plan(subplan, subpath); + subplans = lappend(subplans, subplan); } diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 560659f9568..dc4c14249c3 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -141,6 +141,7 @@ LOGICAL_APPLY_SEND_DATA "Waiting for a logical replication leader apply process LOGICAL_PARALLEL_APPLY_STATE_CHANGE "Waiting for a logical replication parallel apply process to change state." LOGICAL_SYNC_DATA "Waiting for a logical replication remote server to send data for initial table synchronization." LOGICAL_SYNC_STATE_CHANGE "Waiting for a logical replication remote server to change state." +MERGE_APPEND_READY "Waiting for subplan nodes of a MergeAppend plan node to be ready." MESSAGE_QUEUE_INTERNAL "Waiting for another process to be attached to a shared message queue." MESSAGE_QUEUE_PUT_MESSAGE "Waiting to write a protocol message to a shared message queue." MESSAGE_QUEUE_RECEIVE "Waiting to receive bytes from a shared message queue." 
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index c9118e71988..ab2ff217688 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -870,6 +870,14 @@ boot_val => 'true', }, +{ name => 'enable_async_merge_append', type => 'bool', context => 'PGC_USERSET', group => 'QUERY_TUNING_METHOD', + short_desc => 'Enables the planner\'s use of async merge append plans.', + flags => 'GUC_EXPLAIN', + variable => 'enable_async_merge_append', + boot_val => 'true', +}, + + { name => 'enable_bitmapscan', type => 'bool', context => 'PGC_USERSET', group => 'QUERY_TUNING_METHOD', short_desc => 'Enables the planner\'s use of bitmap-scan plans.', flags => 'GUC_EXPLAIN', diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index d7942f50a70..297ab2ffb3f 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -424,6 +424,7 @@ # - Planner Method Configuration - #enable_async_append = on +#enable_async_merge_append = on #enable_bitmapscan = on #enable_gathermerge = on #enable_hashagg = on diff --git a/src/include/executor/nodeMergeAppend.h b/src/include/executor/nodeMergeAppend.h index dfcf45099ba..2255cc68b21 100644 --- a/src/include/executor/nodeMergeAppend.h +++ b/src/include/executor/nodeMergeAppend.h @@ -19,5 +19,6 @@ extern MergeAppendState *ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags); extern void ExecEndMergeAppend(MergeAppendState *node); extern void ExecReScanMergeAppend(MergeAppendState *node); +extern void ExecAsyncMergeAppendResponse(AsyncRequest *areq); #endif /* NODEMERGEAPPEND_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 006be3ccf3e..08ac74aaf4f 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1609,6 +1609,9 @@ typedef struct MergeAppendState TupleTableSlot **ms_slots; 
/* array of length ms_nplans */ struct binaryheap *ms_heap; /* binary heap of slot indices */ bool ms_initialized; /* are subplans started? */ + + /* Merge-specific async tracking */ + Bitmapset *ms_asyncremain; /* remaining asynchronous plans */ } MergeAppendState; /* ---------------- diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index f2fd5d31507..798af1fcd5c 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -70,6 +70,7 @@ extern PGDLLIMPORT bool enable_parallel_hash; extern PGDLLIMPORT bool enable_partition_pruning; extern PGDLLIMPORT bool enable_presorted_aggregate; extern PGDLLIMPORT bool enable_async_append; +extern PGDLLIMPORT bool enable_async_merge_append; extern PGDLLIMPORT int constraint_exclusion; extern double index_pages_fetched(double tuples_fetched, BlockNumber pages, diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out index 132b56a5864..422ca8b7d1f 100644 --- a/src/test/regress/expected/sysviews.out +++ b/src/test/regress/expected/sysviews.out @@ -156,6 +156,7 @@ select name, setting from pg_settings where name like 'enable%'; name | setting --------------------------------+--------- enable_async_append | on + enable_async_merge_append | on enable_bitmapscan | on enable_distinct_reordering | on enable_eager_aggregate | on @@ -180,7 +181,7 @@ select name, setting from pg_settings where name like 'enable%'; enable_seqscan | on enable_sort | on enable_tidscan | on -(25 rows) +(26 rows) -- There are always wait event descriptions for various types. InjectionPoint -- may be present or absent, depending on history since last postmaster start. -- 2.39.5 (Apple Git-154)