From fe64d52c836eb1120d7446d9f1ea1ea5b27b0bbf Mon Sep 17 00:00:00 2001
From: Alexander Pyhalov <a.pyhalov@postgrespro.ru>
Date: Sat, 26 Jul 2025 10:43:57 +0300
Subject: [PATCH 2/2] MergeAppend should support Async Foreign Scan subplans

---
 .../postgres_fdw/expected/postgres_fdw.out    | 247 +++++++++
 contrib/postgres_fdw/postgres_fdw.c           |  10 +-
 contrib/postgres_fdw/sql/postgres_fdw.sql     |  78 +++
 doc/src/sgml/config.sgml                      |  14 +
 src/backend/executor/execAsync.c              |   4 +
 src/backend/executor/nodeMergeAppend.c        | 481 +++++++++++++++++-
 src/backend/optimizer/path/costsize.c         |   1 +
 src/backend/optimizer/plan/createplan.c       |   9 +
 src/backend/utils/misc/guc_tables.c           |  10 +
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/executor/nodeMergeAppend.h        |   1 +
 src/include/nodes/execnodes.h                 |  56 ++
 src/include/optimizer/cost.h                  |   1 +
 src/test/regress/expected/sysviews.out        |   3 +-
 14 files changed, 910 insertions(+), 6 deletions(-)

diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 4b6e49a5d95..ca5f0926e22 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -11453,6 +11453,46 @@ SELECT * FROM result_tbl ORDER BY a;
 (2 rows)
 
 DELETE FROM result_tbl;
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
+                                                          QUERY PLAN                                                          
+------------------------------------------------------------------------------------------------------------------------------
+ Merge Append
+   Sort Key: async_pt.b, async_pt.a
+   ->  Async Foreign Scan on public.async_p1 async_pt_1
+         Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+         Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE (((b % 100) = 0)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+   ->  Async Foreign Scan on public.async_p2 async_pt_2
+         Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+         Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE (((b % 100) = 0)) ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+(8 rows)
+
+SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
+  a   |  b  |  c   
+------+-----+------
+ 1000 |   0 | 0000
+ 2000 |   0 | 0000
+ 1100 | 100 | 0100
+ 2100 | 100 | 0100
+ 1200 | 200 | 0200
+ 2200 | 200 | 0200
+ 1300 | 300 | 0300
+ 2300 | 300 | 0300
+ 1400 | 400 | 0400
+ 2400 | 400 | 0400
+ 1500 | 500 | 0500
+ 2500 | 500 | 0500
+ 1600 | 600 | 0600
+ 2600 | 600 | 0600
+ 1700 | 700 | 0700
+ 2700 | 700 | 0700
+ 1800 | 800 | 0800
+ 2800 | 800 | 0800
+ 1900 | 900 | 0900
+ 2900 | 900 | 0900
+(20 rows)
+
 -- Test error handling, if accessing one of the foreign partitions errors out
 CREATE FOREIGN TABLE async_p_broken PARTITION OF async_pt FOR VALUES FROM (10000) TO (10001)
   SERVER loopback OPTIONS (table_name 'non_existent_table');
@@ -11496,6 +11536,35 @@ SELECT * FROM result_tbl ORDER BY a;
 (3 rows)
 
 DELETE FROM result_tbl;
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+                                              QUERY PLAN                                              
+------------------------------------------------------------------------------------------------------
+ Merge Append
+   Sort Key: async_pt.b, async_pt.a
+   ->  Async Foreign Scan on public.async_p1 async_pt_1
+         Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+         Filter: (async_pt_1.b === 505)
+         Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+   ->  Async Foreign Scan on public.async_p2 async_pt_2
+         Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+         Filter: (async_pt_2.b === 505)
+         Remote SQL: SELECT a, b, c FROM public.base_tbl2 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+   ->  Async Foreign Scan on public.async_p3 async_pt_3
+         Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+         Filter: (async_pt_3.b === 505)
+         Remote SQL: SELECT a, b, c FROM public.base_tbl3 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+(14 rows)
+
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+  a   |  b  |  c   
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+ 3505 | 505 | 0505
+(3 rows)
+
 DROP FOREIGN TABLE async_p3;
 DROP TABLE base_tbl3;
 -- Check case where the partitioned table has local/remote partitions
@@ -11531,6 +11600,37 @@ SELECT * FROM result_tbl ORDER BY a;
 (3 rows)
 
 DELETE FROM result_tbl;
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+                                              QUERY PLAN                                              
+------------------------------------------------------------------------------------------------------
+ Merge Append
+   Sort Key: async_pt.b, async_pt.a
+   ->  Async Foreign Scan on public.async_p1 async_pt_1
+         Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+         Filter: (async_pt_1.b === 505)
+         Remote SQL: SELECT a, b, c FROM public.base_tbl1 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+   ->  Async Foreign Scan on public.async_p2 async_pt_2
+         Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+         Filter: (async_pt_2.b === 505)
+         Remote SQL: SELECT a, b, c FROM public.base_tbl2 ORDER BY b ASC NULLS LAST, a ASC NULLS LAST
+   ->  Sort
+         Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+         Sort Key: async_pt_3.b, async_pt_3.a
+         ->  Seq Scan on public.async_p3 async_pt_3
+               Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+               Filter: (async_pt_3.b === 505)
+(16 rows)
+
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+  a   |  b  |  c   
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+ 3505 | 505 | 0505
+(3 rows)
+
 -- partitionwise joins
 SET enable_partitionwise_join TO true;
 CREATE TABLE join_tbl (a1 int, b1 int, c1 text, a2 int, b2 int, c2 text);
@@ -12313,6 +12413,153 @@ SELECT a FROM base_tbl WHERE (a, random() > 0) IN (SELECT a, random() > 0 FROM f
 DROP FOREIGN TABLE foreign_tbl CASCADE;
 NOTICE:  drop cascades to foreign table foreign_tbl2
 DROP TABLE base_tbl;
+-- Test async Merge Append
+CREATE TABLE distr1 (i int, j int, k text) PARTITION BY HASH (i);
+CREATE TABLE base1 (i int, j int, k text);
+CREATE TABLE base2 (i int, j int, k text);
+CREATE FOREIGN TABLE distr1_p1 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+SERVER loopback OPTIONS (table_name 'base1');
+CREATE FOREIGN TABLE distr1_p2 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+SERVER loopback OPTIONS (table_name 'base2');
+CREATE TABLE distr2 (i int, j int, k text) PARTITION BY HASH (i);
+CREATE TABLE base3 (i int, j int, k text);
+CREATE TABLE base4 (i int, j int, k text);
+CREATE FOREIGN TABLE distr2_p1 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+SERVER loopback OPTIONS (table_name 'base3');
+CREATE FOREIGN TABLE distr2_p2 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+SERVER loopback OPTIONS (table_name 'base4');
+INSERT INTO distr1
+SELECT i, i*10, 'data_' || i FROM generate_series(1, 1000) i;
+INSERT INTO distr2
+SELECT i, i*10, 'data_' || i FROM generate_series(1, 100) i;
+ANALYZE distr1_p1;
+ANALYZE distr1_p2;
+ANALYZE distr2_p1;
+ANALYZE distr2_p2;
+SET enable_partitionwise_join TO ON;
+-- Test joins with async Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
+ORDER BY distr2.i LIMIT 10;
+                                                                                                    QUERY PLAN                                                                                                     
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Limit
+   Output: distr1.i, distr1.j, distr1.k, distr2.i, distr2.j, distr2.k
+   ->  Merge Append
+         Sort Key: distr1.i
+         ->  Async Foreign Scan
+               Output: distr1_1.i, distr1_1.j, distr1_1.k, distr2_1.i, distr2_1.j, distr2_1.k
+               Relations: (public.distr1_p1 distr1_1) INNER JOIN (public.distr2_p1 distr2_1)
+               Remote SQL: SELECT r3.i, r3.j, r3.k, r5.i, r5.j, r5.k FROM (public.base1 r3 INNER JOIN public.base3 r5 ON (((r3.i = r5.i)) AND ((r5.j > 90)) AND ((r5.k ~~ 'data%')))) ORDER BY r3.i ASC NULLS LAST
+         ->  Async Foreign Scan
+               Output: distr1_2.i, distr1_2.j, distr1_2.k, distr2_2.i, distr2_2.j, distr2_2.k
+               Relations: (public.distr1_p2 distr1_2) INNER JOIN (public.distr2_p2 distr2_2)
+               Remote SQL: SELECT r4.i, r4.j, r4.k, r6.i, r6.j, r6.k FROM (public.base2 r4 INNER JOIN public.base4 r6 ON (((r4.i = r6.i)) AND ((r6.j > 90)) AND ((r6.k ~~ 'data%')))) ORDER BY r4.i ASC NULLS LAST
+(12 rows)
+
+SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
+ORDER BY distr2.i LIMIT 10;
+ i  |  j  |    k    | i  |  j  |    k    
+----+-----+---------+----+-----+---------
+ 10 | 100 | data_10 | 10 | 100 | data_10
+ 11 | 110 | data_11 | 11 | 110 | data_11
+ 12 | 120 | data_12 | 12 | 120 | data_12
+ 13 | 130 | data_13 | 13 | 130 | data_13
+ 14 | 140 | data_14 | 14 | 140 | data_14
+ 15 | 150 | data_15 | 15 | 150 | data_15
+ 16 | 160 | data_16 | 16 | 160 | data_16
+ 17 | 170 | data_17 | 17 | 170 | data_17
+ 18 | 180 | data_18 | 18 | 180 | data_18
+ 19 | 190 | data_19 | 19 | 190 | data_19
+(10 rows)
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE  distr1.i > 90
+ORDER BY distr1.i LIMIT 20;
+                                                                                                     QUERY PLAN                                                                                                     
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Limit
+   Output: distr1.i, distr1.j, distr1.k, distr2.i, distr2.j, distr2.k
+   ->  Merge Append
+         Sort Key: distr1.i
+         ->  Async Foreign Scan
+               Output: distr1_1.i, distr1_1.j, distr1_1.k, distr2_1.i, distr2_1.j, distr2_1.k
+               Relations: (public.distr1_p1 distr1_1) LEFT JOIN (public.distr2_p1 distr2_1)
+               Remote SQL: SELECT r4.i, r4.j, r4.k, r6.i, r6.j, r6.k FROM (public.base1 r4 LEFT JOIN public.base3 r6 ON (((r4.i = r6.i)) AND ((r6.k ~~ 'data%')))) WHERE ((r4.i > 90)) ORDER BY r4.i ASC NULLS LAST
+         ->  Async Foreign Scan
+               Output: distr1_2.i, distr1_2.j, distr1_2.k, distr2_2.i, distr2_2.j, distr2_2.k
+               Relations: (public.distr1_p2 distr1_2) LEFT JOIN (public.distr2_p2 distr2_2)
+               Remote SQL: SELECT r5.i, r5.j, r5.k, r7.i, r7.j, r7.k FROM (public.base2 r5 LEFT JOIN public.base4 r7 ON (((r5.i = r7.i)) AND ((r7.k ~~ 'data%')))) WHERE ((r5.i > 90)) ORDER BY r5.i ASC NULLS LAST
+(12 rows)
+
+SELECT * FROM distr1 LEFT JOIN distr2 ON  distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90
+ORDER BY distr1.i LIMIT 20;
+  i  |  j   |    k     |  i  |  j   |    k     
+-----+------+----------+-----+------+----------
+  91 |  910 | data_91  |  91 |  910 | data_91
+  92 |  920 | data_92  |  92 |  920 | data_92
+  93 |  930 | data_93  |  93 |  930 | data_93
+  94 |  940 | data_94  |  94 |  940 | data_94
+  95 |  950 | data_95  |  95 |  950 | data_95
+  96 |  960 | data_96  |  96 |  960 | data_96
+  97 |  970 | data_97  |  97 |  970 | data_97
+  98 |  980 | data_98  |  98 |  980 | data_98
+  99 |  990 | data_99  |  99 |  990 | data_99
+ 100 | 1000 | data_100 | 100 | 1000 | data_100
+ 101 | 1010 | data_101 |     |      | 
+ 102 | 1020 | data_102 |     |      | 
+ 103 | 1030 | data_103 |     |      | 
+ 104 | 1040 | data_104 |     |      | 
+ 105 | 1050 | data_105 |     |      | 
+ 106 | 1060 | data_106 |     |      | 
+ 107 | 1070 | data_107 |     |      | 
+ 108 | 1080 | data_108 |     |      | 
+ 109 | 1090 | data_109 |     |      | 
+ 110 | 1100 | data_110 |     |      | 
+(20 rows)
+
+-- Test pruning with async Merge Append
+DELETE FROM distr2;
+INSERT INTO distr2
+SELECT i%10, i*10, 'data_' || i FROM generate_series(1, 1000) i;
+DEALLOCATE ALL;
+SET plan_cache_mode TO force_generic_plan;
+PREPARE async_pt_query (int, int) AS
+  SELECT * FROM distr2 WHERE i = ANY(ARRAY[$1, $2])
+  ORDER BY i,j
+  LIMIT 10;
+EXPLAIN (VERBOSE, COSTS OFF)
+	EXECUTE async_pt_query(1, 1);
+                                                                         QUERY PLAN                                                                         
+------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Limit
+   Output: distr2.i, distr2.j, distr2.k
+   ->  Merge Append
+         Sort Key: distr2.i, distr2.j
+         Subplans Removed: 1
+         ->  Async Foreign Scan on public.distr2_p1 distr2_1
+               Output: distr2_1.i, distr2_1.j, distr2_1.k
+               Remote SQL: SELECT i, j, k FROM public.base3 WHERE ((i = ANY (ARRAY[$1::integer, $2::integer]))) ORDER BY i ASC NULLS LAST, j ASC NULLS LAST
+(8 rows)
+
+EXECUTE async_pt_query(1, 1);
+ i |  j  |    k    
+---+-----+---------
+ 1 |  10 | data_1
+ 1 | 110 | data_11
+ 1 | 210 | data_21
+ 1 | 310 | data_31
+ 1 | 410 | data_41
+ 1 | 510 | data_51
+ 1 | 610 | data_61
+ 1 | 710 | data_71
+ 1 | 810 | data_81
+ 1 | 910 | data_91
+(10 rows)
+
+RESET plan_cache_mode;
+RESET enable_partitionwise_join;
+DROP TABLE distr1, distr2, base1, base2, base3, base4;
 ALTER SERVER loopback OPTIONS (DROP async_capable);
 ALTER SERVER loopback2 OPTIONS (DROP async_capable);
 -- ===================================================================
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 25b287be069..8438d67a6be 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -7213,12 +7213,16 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
 	ForeignScanState *node = (ForeignScanState *) areq->requestee;
 	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
 	AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
-	AppendState *requestor = (AppendState *) areq->requestor;
-	WaitEventSet *set = requestor->as_eventset;
+	PlanState  *requestor = areq->requestor;
+	WaitEventSet *set;
+	Bitmapset  *needrequest;
 
 	/* This should not be called unless callback_pending */
 	Assert(areq->callback_pending);
 
+	set = GetAppendEventSet(requestor);
+	needrequest = GetNeedRequest(requestor);
+
 	/*
 	 * If process_pending_request() has been invoked on the given request
 	 * before we get here, we might have some tuples already; in which case
@@ -7256,7 +7260,7 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
 		 * below, because we might otherwise end up with no configured events
 		 * other than the postmaster death event.
 		 */
-		if (!bms_is_empty(requestor->as_needrequest))
+		if (!bms_is_empty(needrequest))
 			return;
 		if (GetNumRegisteredWaitEvents(set) > 1)
 			return;
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 31b6c685b55..55708af4736 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -3861,6 +3861,11 @@ INSERT INTO result_tbl SELECT a, b, 'AAA' || c FROM async_pt WHERE b === 505;
 SELECT * FROM result_tbl ORDER BY a;
 DELETE FROM result_tbl;
 
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
+SELECT * FROM async_pt WHERE b % 100 = 0 ORDER BY b, a;
+
 -- Test error handling, if accessing one of the foreign partitions errors out
 CREATE FOREIGN TABLE async_p_broken PARTITION OF async_pt FOR VALUES FROM (10000) TO (10001)
   SERVER loopback OPTIONS (table_name 'non_existent_table');
@@ -3881,6 +3886,11 @@ INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
 SELECT * FROM result_tbl ORDER BY a;
 DELETE FROM result_tbl;
 
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+
 DROP FOREIGN TABLE async_p3;
 DROP TABLE base_tbl3;
 
@@ -3896,6 +3906,11 @@ INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
 SELECT * FROM result_tbl ORDER BY a;
 DELETE FROM result_tbl;
 
+-- Test Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+SELECT * FROM async_pt WHERE b === 505 ORDER BY b, a;
+
 -- partitionwise joins
 SET enable_partitionwise_join TO true;
 
@@ -4134,6 +4149,69 @@ SELECT a FROM base_tbl WHERE (a, random() > 0) IN (SELECT a, random() > 0 FROM f
 DROP FOREIGN TABLE foreign_tbl CASCADE;
 DROP TABLE base_tbl;
 
+-- Test async Merge Append
+CREATE TABLE distr1 (i int, j int, k text) PARTITION BY HASH (i);
+CREATE TABLE base1 (i int, j int, k text);
+CREATE TABLE base2 (i int, j int, k text);
+CREATE FOREIGN TABLE distr1_p1 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+SERVER loopback OPTIONS (table_name 'base1');
+CREATE FOREIGN TABLE distr1_p2 PARTITION OF distr1 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+SERVER loopback OPTIONS (table_name 'base2');
+
+CREATE TABLE distr2 (i int, j int, k text) PARTITION BY HASH (i);
+CREATE TABLE base3 (i int, j int, k text);
+CREATE TABLE base4 (i int, j int, k text);
+CREATE FOREIGN TABLE distr2_p1 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 0)
+SERVER loopback OPTIONS (table_name 'base3');
+CREATE FOREIGN TABLE distr2_p2 PARTITION OF distr2 FOR VALUES WITH (MODULUS 2, REMAINDER 1)
+SERVER loopback OPTIONS (table_name 'base4');
+
+INSERT INTO distr1
+SELECT i, i*10, 'data_' || i FROM generate_series(1, 1000) i;
+
+INSERT INTO distr2
+SELECT i, i*10, 'data_' || i FROM generate_series(1, 100) i;
+
+ANALYZE distr1_p1;
+ANALYZE distr1_p2;
+ANALYZE distr2_p1;
+ANALYZE distr2_p2;
+
+SET enable_partitionwise_join TO ON;
+
+-- Test joins with async Merge Append
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
+ORDER BY distr2.i LIMIT 10;
+SELECT * FROM distr1, distr2 WHERE distr1.i=distr2.i AND distr2.j > 90 and distr2.k like 'data%'
+ORDER BY distr2.i LIMIT 10;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM distr1 LEFT JOIN distr2 ON distr1.i=distr2.i AND distr2.k like 'data%' WHERE  distr1.i > 90
+ORDER BY distr1.i LIMIT 20;
+SELECT * FROM distr1 LEFT JOIN distr2 ON  distr1.i=distr2.i AND distr2.k like 'data%' WHERE distr1.i > 90
+ORDER BY distr1.i LIMIT 20;
+
+-- Test pruning with async Merge Append
+DELETE FROM distr2;
+INSERT INTO distr2
+SELECT i%10, i*10, 'data_' || i FROM generate_series(1, 1000) i;
+
+DEALLOCATE ALL;
+SET plan_cache_mode TO force_generic_plan;
+PREPARE async_pt_query (int, int) AS
+  SELECT * FROM distr2 WHERE i = ANY(ARRAY[$1, $2])
+  ORDER BY i,j
+  LIMIT 10;
+EXPLAIN (VERBOSE, COSTS OFF)
+	EXECUTE async_pt_query(1, 1);
+EXECUTE async_pt_query(1, 1);
+RESET plan_cache_mode;
+
+RESET enable_partitionwise_join;
+
+DROP TABLE distr1, distr2, base1, base2, base3, base4;
+
 ALTER SERVER loopback OPTIONS (DROP async_capable);
 ALTER SERVER loopback2 OPTIONS (DROP async_capable);
 
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 20ccb2d6b54..3a83f2eddd9 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -5443,6 +5443,20 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-enable-async-merge-append" xreflabel="enable_async_merge_append">
+      <term><varname>enable_async_merge_append</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>enable_async_merge_append</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Enables or disables the query planner's use of async-aware
+        merge append plan types. The default is <literal>on</literal>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-enable-bitmapscan" xreflabel="enable_bitmapscan">
       <term><varname>enable_bitmapscan</varname> (<type>boolean</type>)
       <indexterm>
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
index 5d3cabe73e3..6dc19ebc374 100644
--- a/src/backend/executor/execAsync.c
+++ b/src/backend/executor/execAsync.c
@@ -17,6 +17,7 @@
 #include "executor/execAsync.h"
 #include "executor/executor.h"
 #include "executor/nodeAppend.h"
+#include "executor/nodeMergeAppend.h"
 #include "executor/nodeForeignscan.h"
 
 /*
@@ -121,6 +122,9 @@ ExecAsyncResponse(AsyncRequest *areq)
 		case T_AppendState:
 			ExecAsyncAppendResponse(areq);
 			break;
+		case T_MergeAppendState:
+			ExecAsyncMergeAppendResponse(areq);
+			break;
 		default:
 			/* If the node doesn't support async, caller messed up. */
 			elog(ERROR, "unrecognized node type: %d",
diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c
index 405e8f94285..6f899a2d5f5 100644
--- a/src/backend/executor/nodeMergeAppend.c
+++ b/src/backend/executor/nodeMergeAppend.c
@@ -39,10 +39,15 @@
 #include "postgres.h"
 
 #include "executor/executor.h"
+#include "executor/execAsync.h"
 #include "executor/execPartition.h"
 #include "executor/nodeMergeAppend.h"
 #include "lib/binaryheap.h"
 #include "miscadmin.h"
+#include "storage/latch.h"
+#include "utils/wait_event.h"
+
+#define EVENT_BUFFER_SIZE                     16
 
 /*
  * We have one slot for each item in the heap array.  We use SlotNumber
@@ -54,6 +59,12 @@ typedef int32 SlotNumber;
 static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
 static int	heap_compare_slots(Datum a, Datum b, void *arg);
 
+static void classify_matching_subplans(MergeAppendState *node);
+static void ExecMergeAppendAsyncBegin(MergeAppendState *node);
+static void ExecMergeAppendAsyncGetNext(MergeAppendState *node, int mplan);
+static bool ExecMergeAppendAsyncRequest(MergeAppendState *node, int mplan);
+static void ExecMergeAppendAsyncEventWait(MergeAppendState *node);
+
 
 /* ----------------------------------------------------------------
  *		ExecInitMergeAppend
@@ -71,6 +82,8 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 	int			nplans;
 	int			i,
 				j;
+	Bitmapset  *asyncplans;
+	int			nasyncplans;
 
 	/* check for unsupported flags */
 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
@@ -106,7 +119,10 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 		 * later calls to ExecFindMatchingSubPlans.
 		 */
 		if (!prunestate->do_exec_prune && nplans > 0)
+		{
 			mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
+			mergestate->ms_valid_subplans_identified = true;
+		}
 	}
 	else
 	{
@@ -119,6 +135,7 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 		Assert(nplans > 0);
 		mergestate->ms_valid_subplans = validsubplans =
 			bms_add_range(NULL, 0, nplans - 1);
+		mergestate->ms_valid_subplans_identified = true;
 		mergestate->ms_prune_state = NULL;
 	}
 
@@ -135,11 +152,25 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 	 * the results into the mergeplanstates array.
 	 */
 	j = 0;
+	asyncplans = NULL;
+	nasyncplans = 0;
+
 	i = -1;
 	while ((i = bms_next_member(validsubplans, i)) >= 0)
 	{
 		Plan	   *initNode = (Plan *) list_nth(node->mergeplans, i);
 
+		/*
+		 * Record async subplans.  When executing EvalPlanQual, we treat them
+		 * as sync ones; don't do this when initializing an EvalPlanQual plan
+		 * tree.
+		 */
+		if (initNode->async_capable && estate->es_epq_active == NULL)
+		{
+			asyncplans = bms_add_member(asyncplans, j);
+			nasyncplans++;
+		}
+
 		mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
 	}
 
@@ -170,6 +201,45 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
 	 */
 	mergestate->ps.ps_ProjInfo = NULL;
 
+	/* Initialize async state */
+	mergestate->ms_asyncplans = asyncplans;
+	mergestate->ms_nasyncplans = nasyncplans;
+	mergestate->ms_asyncrequests = NULL;
+	mergestate->ms_asyncresults = NULL;
+	mergestate->ms_has_asyncresults = NULL;
+	mergestate->ms_asyncremain = NULL;
+	mergestate->ms_needrequest = NULL;
+	mergestate->ms_eventset = NULL;
+	mergestate->ms_valid_asyncplans = NULL;
+
+	if (nasyncplans > 0)
+	{
+		mergestate->ms_asyncrequests = (AsyncRequest **)
+			palloc0(nplans * sizeof(AsyncRequest *));
+
+		i = -1;
+		while ((i = bms_next_member(asyncplans, i)) >= 0)
+		{
+			AsyncRequest *areq;
+
+			areq = palloc(sizeof(AsyncRequest));
+			areq->requestor = (PlanState *) mergestate;
+			areq->requestee = mergeplanstates[i];
+			areq->request_index = i;
+			areq->callback_pending = false;
+			areq->request_complete = false;
+			areq->result = NULL;
+
+			mergestate->ms_asyncrequests[i] = areq;
+		}
+
+		mergestate->ms_asyncresults = (TupleTableSlot **)
+			palloc0(nplans * sizeof(TupleTableSlot *));
+
+		if (mergestate->ms_valid_subplans_identified)
+			classify_matching_subplans(mergestate);
+	}
+
 	/*
 	 * initialize sort-key information
 	 */
@@ -231,9 +301,16 @@ ExecMergeAppend(PlanState *pstate)
 		 * run-time pruning is disabled then the valid subplans will always be
 		 * set to all subplans.
 		 */
-		if (node->ms_valid_subplans == NULL)
+		if (!node->ms_valid_subplans_identified)
+		{
 			node->ms_valid_subplans =
 				ExecFindMatchingSubPlans(node->ms_prune_state, false, NULL);
+			node->ms_valid_subplans_identified = true;
+		}
+
+		/* If there are any async subplans, begin executing them. */
+		if (node->ms_nasyncplans > 0)
+			ExecMergeAppendAsyncBegin(node);
 
 		/*
 		 * First time through: pull the first tuple from each valid subplan,
@@ -246,6 +323,16 @@ ExecMergeAppend(PlanState *pstate)
 			if (!TupIsNull(node->ms_slots[i]))
 				binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
 		}
+
+		/* Look at valid async subplans */
+		i = -1;
+		while ((i = bms_next_member(node->ms_valid_asyncplans, i)) >= 0)
+		{
+			ExecMergeAppendAsyncGetNext(node, i);
+			if (!TupIsNull(node->ms_slots[i]))
+				binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
+		}
+
 		binaryheap_build(node->ms_heap);
 		node->ms_initialized = true;
 	}
@@ -260,7 +347,13 @@ ExecMergeAppend(PlanState *pstate)
 		 * to not pull tuples until necessary.)
 		 */
 		i = DatumGetInt32(binaryheap_first(node->ms_heap));
-		node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
+		if (bms_is_member(i, node->ms_asyncplans))
+			ExecMergeAppendAsyncGetNext(node, i);
+		else
+		{
+			Assert(bms_is_member(i, node->ms_valid_subplans));
+			node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
+		}
 		if (!TupIsNull(node->ms_slots[i]))
 			binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
 		else
@@ -276,6 +369,8 @@ ExecMergeAppend(PlanState *pstate)
 	{
 		i = DatumGetInt32(binaryheap_first(node->ms_heap));
 		result = node->ms_slots[i];
+		/* For an async plan, record that we can now fetch its next tuple */
+		node->ms_has_asyncresults = bms_del_member(node->ms_has_asyncresults, i);
 	}
 
 	return result;
@@ -355,6 +450,7 @@ void
 ExecReScanMergeAppend(MergeAppendState *node)
 {
 	int			i;
+	int			nasyncplans = node->ms_nasyncplans;
 
 	/*
 	 * If any PARAM_EXEC Params used in pruning expressions have changed, then
@@ -365,8 +461,11 @@ ExecReScanMergeAppend(MergeAppendState *node)
 		bms_overlap(node->ps.chgParam,
 					node->ms_prune_state->execparamids))
 	{
+		node->ms_valid_subplans_identified = false;
 		bms_free(node->ms_valid_subplans);
 		node->ms_valid_subplans = NULL;
+		bms_free(node->ms_valid_asyncplans);
+		node->ms_valid_asyncplans = NULL;
 	}
 
 	for (i = 0; i < node->ms_nplans; i++)
@@ -387,6 +486,384 @@ ExecReScanMergeAppend(MergeAppendState *node)
 		if (subnode->chgParam == NULL)
 			ExecReScan(subnode);
 	}
+
+	/* Reset async state */
+	if (nasyncplans > 0)
+	{
+		i = -1;
+		while ((i = bms_next_member(node->ms_asyncplans, i)) >= 0)
+		{
+			AsyncRequest *areq = node->ms_asyncrequests[i];
+
+			areq->callback_pending = false;
+			areq->request_complete = false;
+			areq->result = NULL;
+		}
+
+		bms_free(node->ms_asyncremain);
+		node->ms_asyncremain = NULL;
+		bms_free(node->ms_needrequest);
+		node->ms_needrequest = NULL;
+		bms_free(node->ms_has_asyncresults);
+		node->ms_has_asyncresults = NULL;
+	}
 	binaryheap_reset(node->ms_heap);
 	node->ms_initialized = false;
 }
+
+/* ----------------------------------------------------------------
+ *              classify_matching_subplans
+ *
+ *              Classify the node's ms_valid_subplans into sync ones and
+ *              async ones, adjust it to contain sync ones only, and save
+ *              async ones in the node's ms_valid_asyncplans.
+ * ----------------------------------------------------------------
+ */
+static void
+classify_matching_subplans(MergeAppendState *node)
+{
+	Bitmapset  *valid_asyncplans;
+
+	Assert(node->ms_valid_subplans_identified);
+	Assert(node->ms_valid_asyncplans == NULL);
+
+	/* Nothing to do if there are no valid subplans. */
+	if (bms_is_empty(node->ms_valid_subplans))
+	{
+		node->ms_asyncremain = NULL;
+		return;
+	}
+
+	/* Nothing to do if there are no valid async subplans. */
+	if (!bms_overlap(node->ms_valid_subplans, node->ms_asyncplans))
+	{
+		node->ms_asyncremain = NULL;
+		return;
+	}
+
+	/* Get valid async subplans. */
+	valid_asyncplans = bms_intersect(node->ms_asyncplans,
+									 node->ms_valid_subplans);
+
+	/* Adjust the valid subplans to contain sync subplans only. */
+	node->ms_valid_subplans = bms_del_members(node->ms_valid_subplans,
+											  valid_asyncplans);
+
+	/* Save valid async subplans. */
+	node->ms_valid_asyncplans = valid_asyncplans;
+}
+
+/* ----------------------------------------------------------------
+ *              ExecMergeAppendAsyncBegin
+ *
+ *              Begin executing designed async-capable subplans.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecMergeAppendAsyncBegin(MergeAppendState *node)
+{
+	int			i;
+
+	/* Backward scan is not supported by async-aware MergeAppends. */
+	Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+
+	/* We should never be called when there are no subplans */
+	Assert(node->ms_nplans > 0);
+
+	/* We should never be called when there are no async subplans. */
+	Assert(node->ms_nasyncplans > 0);
+
+	/* If we've yet to determine the valid subplans then do so now. */
+	if (!node->ms_valid_subplans_identified)
+	{
+		node->ms_valid_subplans =
+			ExecFindMatchingSubPlans(node->ms_prune_state, false, NULL);
+		node->ms_valid_subplans_identified = true;
+
+		classify_matching_subplans(node);
+	}
+
+	/* Initialize state variables. */
+	node->ms_asyncremain = bms_copy(node->ms_valid_asyncplans);
+
+	/* Nothing to do if there are no valid async subplans. */
+	if (bms_is_empty(node->ms_asyncremain))
+		return;
+
+	/* Make a request for each of the valid async subplans. */
+	i = -1;
+	while ((i = bms_next_member(node->ms_valid_asyncplans, i)) >= 0)
+	{
+		AsyncRequest *areq = node->ms_asyncrequests[i];
+
+		Assert(areq->request_index == i);
+		Assert(!areq->callback_pending);
+
+		/* Do the actual work. */
+		ExecAsyncRequest(areq);
+	}
+}
+
+/* ----------------------------------------------------------------
+ *              ExecMergeAppendAsyncGetNext
+ *
+ *              Get the next tuple from specified asynchronous subplan.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecMergeAppendAsyncGetNext(MergeAppendState *node, int mplan)
+{
+	node->ms_slots[mplan] = NULL;
+
+	/* Request a tuple asynchronously. */
+	if (ExecMergeAppendAsyncRequest(node, mplan))
+		return;
+
+	/*
+	 * node->ms_asyncremain can be NULL if we have fetched tuples, but haven't
+	 * returned them yet. In this case ExecMergeAppendAsyncRequest() above
+	 * just returns tuples without performing a request.
+	 */
+	while (bms_is_member(mplan, node->ms_asyncremain))
+	{
+		CHECK_FOR_INTERRUPTS();
+
+		/* Wait or poll for async events. */
+		ExecMergeAppendAsyncEventWait(node);
+
+		/* Request a tuple asynchronously. */
+		if (ExecMergeAppendAsyncRequest(node, mplan))
+			return;
+
+		/*
+		 * Keep waiting until no async requests are pending or we have
+		 * received a tuple for our request.
+		 */
+	}
+
+	/* No tuples */
+	return;
+}
+
+/* ----------------------------------------------------------------
+ *              ExecMergeAppendAsyncRequest
+ *
+ *              Request a tuple asynchronously.
+ * ----------------------------------------------------------------
+ */
+static bool
+ExecMergeAppendAsyncRequest(MergeAppendState *node, int mplan)
+{
+	Bitmapset  *needrequest;
+	int			i;
+
+	/*
+	 * If we've already fetched necessary data, just return it
+	 */
+	if (bms_is_member(mplan, node->ms_has_asyncresults))
+	{
+		node->ms_slots[mplan] = node->ms_asyncresults[mplan];
+		return true;
+	}
+
+	/*
+	 * Get the set of subplans that can accept a new request and don't
+	 * already have data ready.
+	 */
+	needrequest = NULL;
+	i = -1;
+	while ((i = bms_next_member(node->ms_needrequest, i)) >= 0)
+	{
+		if (!bms_is_member(i, node->ms_has_asyncresults))
+			needrequest = bms_add_member(needrequest, i);
+	}
+
+	/*
+	 * If no subplans still need a request, there is nothing to send.
+	 */
+	if (bms_is_empty(needrequest))
+		return false;
+
+	/* Clear ms_needrequest flag, as we are going to send requests now */
+	node->ms_needrequest = bms_del_members(node->ms_needrequest, needrequest);
+
+	/* Make a new request for each of the async subplans that need it. */
+	i = -1;
+	while ((i = bms_next_member(needrequest, i)) >= 0)
+	{
+		AsyncRequest *areq = node->ms_asyncrequests[i];
+
+		/*
+		 * We've just checked that this subplan doesn't already have
+		 * fetched data.
+		 */
+		Assert(!bms_is_member(i, node->ms_has_asyncresults));
+
+		/* Do the actual work. */
+		ExecAsyncRequest(areq);
+	}
+	bms_free(needrequest);
+
+	/* Return needed asynchronously-generated results if any. */
+	if (bms_is_member(mplan, node->ms_has_asyncresults))
+	{
+		node->ms_slots[mplan] = node->ms_asyncresults[mplan];
+		return true;
+	}
+
+	return false;
+}
+
+/* ----------------------------------------------------------------
+ *              ExecAsyncMergeAppendResponse
+ *
+ *              Receive a response from an asynchronous request we made.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncMergeAppendResponse(AsyncRequest *areq)
+{
+	MergeAppendState *node = (MergeAppendState *) areq->requestor;
+	TupleTableSlot *slot = areq->result;
+
+	/* The result should be a TupleTableSlot or NULL. */
+	Assert(slot == NULL || IsA(slot, TupleTableSlot));
+	Assert(!bms_is_member(areq->request_index, node->ms_has_asyncresults));
+
+	node->ms_asyncresults[areq->request_index] = NULL;
+	/* Nothing to do if the request is pending. */
+	if (!areq->request_complete)
+	{
+		/* The request would have been pending for a callback. */
+		Assert(areq->callback_pending);
+		return;
+	}
+
+	/* If the result is NULL or an empty slot, there's nothing more to do. */
+	if (TupIsNull(slot))
+	{
+		/* The ending subplan wouldn't have been pending for a callback. */
+		Assert(!areq->callback_pending);
+		node->ms_asyncremain = bms_del_member(node->ms_asyncremain, areq->request_index);
+		return;
+	}
+
+	node->ms_has_asyncresults = bms_add_member(node->ms_has_asyncresults, areq->request_index);
+	/* Save result so we can return it. */
+	node->ms_asyncresults[areq->request_index] = slot;
+
+	/*
+	 * Mark the subplan that returned a result as ready for a new request.  We
+	 * don't launch another one here immediately because it might complete.
+	 */
+	node->ms_needrequest = bms_add_member(node->ms_needrequest,
+										  areq->request_index);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecMergeAppendAsyncEventWait
+ *
+ *		Wait or poll for file descriptor events and fire callbacks.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecMergeAppendAsyncEventWait(MergeAppendState *node)
+{
+	int			nevents = node->ms_nasyncplans + 2; /* one for PM death and
+													 * one for latch */
+	WaitEvent	occurred_event[EVENT_BUFFER_SIZE];
+	int			noccurred;
+	int			i;
+
+	/* We should never be called when there are no valid async subplans. */
+	Assert(bms_num_members(node->ms_asyncremain) > 0);
+
+	node->ms_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
+	AddWaitEventToSet(node->ms_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+					  NULL, NULL);
+
+	/* Give each waiting subplan a chance to add an event. */
+	i = -1;
+	while ((i = bms_next_member(node->ms_asyncplans, i)) >= 0)
+	{
+		AsyncRequest *areq = node->ms_asyncrequests[i];
+
+		if (areq->callback_pending)
+			ExecAsyncConfigureWait(areq);
+	}
+
+	/*
+	 * No need for further processing if none of the subplans configured any
+	 * events.
+	 */
+	if (GetNumRegisteredWaitEvents(node->ms_eventset) == 1)
+	{
+		FreeWaitEventSet(node->ms_eventset);
+		node->ms_eventset = NULL;
+		return;
+	}
+
+	/*
+	 * Add the process latch to the set, so that we wake up to process the
+	 * standard interrupts with CHECK_FOR_INTERRUPTS().
+	 *
+	 * NOTE: For historical reasons, it's important that this is added to the
+	 * WaitEventSet after the ExecAsyncConfigureWait() calls.  Namely,
+	 * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if
+	 * any other events are in the set.  That's a poor design, it's
+	 * questionable for postgres_fdw to be doing that in the first place, but
+	 * we cannot change it now.  The pattern has possibly been copied to other
+	 * extensions too.
+	 */
+	AddWaitEventToSet(node->ms_eventset, WL_LATCH_SET, PGINVALID_SOCKET,
+					  MyLatch, NULL);
+
+	/* Return at most EVENT_BUFFER_SIZE events in one call. */
+	if (nevents > EVENT_BUFFER_SIZE)
+		nevents = EVENT_BUFFER_SIZE;
+
+	/*
+	 * Wait until at least one event occurs.
+	 */
+	noccurred = WaitEventSetWait(node->ms_eventset, -1 /* no timeout */ , occurred_event,
+								 nevents, WAIT_EVENT_APPEND_READY);
+	FreeWaitEventSet(node->ms_eventset);
+	node->ms_eventset = NULL;
+	if (noccurred == 0)
+		return;
+
+	/* Deliver notifications. */
+	for (i = 0; i < noccurred; i++)
+	{
+		WaitEvent  *w = &occurred_event[i];
+
+		/*
+		 * Each waiting subplan should have registered its wait event with
+		 * user_data pointing back to its AsyncRequest.
+		 */
+		if ((w->events & WL_SOCKET_READABLE) != 0)
+		{
+			AsyncRequest *areq = (AsyncRequest *) w->user_data;
+
+			if (areq->callback_pending)
+			{
+				/*
+				 * Mark it as no longer needing a callback.  We must do this
+				 * before dispatching the callback in case the callback resets
+				 * the flag.
+				 */
+				areq->callback_pending = false;
+
+				/* Do the actual work. */
+				ExecAsyncNotify(areq);
+			}
+		}
+
+		/* Handle standard interrupts */
+		if ((w->events & WL_LATCH_SET) != 0)
+		{
+			ResetLatch(MyLatch);
+			CHECK_FOR_INTERRUPTS();
+		}
+	}
+}
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 1f04a2c182c..4c2c2a92ec9 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -163,6 +163,7 @@ bool		enable_parallel_hash = true;
 bool		enable_partition_pruning = true;
 bool		enable_presorted_aggregate = true;
 bool		enable_async_append = true;
+bool		enable_async_merge_append = true;
 
 typedef struct
 {
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index deaf763fd44..bea58bd46f8 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -1471,6 +1471,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
 	List	   *subplans = NIL;
 	ListCell   *subpaths;
 	RelOptInfo *rel = best_path->path.parent;
+	bool		consider_async = false;
 
 	/*
 	 * We don't have the actual creation of the MergeAppend node split out
@@ -1485,6 +1486,10 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
 	plan->righttree = NULL;
 	node->apprelids = rel->relids;
 
+	consider_async = (enable_async_merge_append &&
+					  !best_path->path.parallel_safe &&
+					  list_length(best_path->subpaths) > 1);
+
 	/*
 	 * Compute sort column info, and adjust MergeAppend's tlist as needed.
 	 * Because we pass adjust_tlist_in_place = true, we may ignore the
@@ -1585,6 +1590,10 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
 			subplan = sort_plan;
 		}
 
+		/* If needed, check to see if subplan can be executed asynchronously */
+		if (consider_async)
+			mark_async_capable_plan(subplan, subpath);
+
 		subplans = lappend(subplans, subplan);
 	}
 
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index d14b1678e7f..00d967c0f24 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -1006,6 +1006,16 @@ struct config_bool ConfigureNamesBool[] =
 		true,
 		NULL, NULL, NULL
 	},
+	{
+		{"enable_async_merge_append", PGC_USERSET, QUERY_TUNING_METHOD,
+			gettext_noop("Enables the planner's use of async merge append plans."),
+			NULL,
+			GUC_EXPLAIN
+		},
+		&enable_async_merge_append,
+		true,
+		NULL, NULL, NULL
+	},
 	{
 		{"enable_self_join_elimination", PGC_USERSET, QUERY_TUNING_METHOD,
 			gettext_noop("Enables removal of unique self-joins."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index a9d8293474a..bfa332a098c 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -405,6 +405,7 @@
 # - Planner Method Configuration -
 
 #enable_async_append = on
+#enable_async_merge_append = on
 #enable_bitmapscan = on
 #enable_gathermerge = on
 #enable_hashagg = on
diff --git a/src/include/executor/nodeMergeAppend.h b/src/include/executor/nodeMergeAppend.h
index 4eb05dc30d6..e3fdb26ece6 100644
--- a/src/include/executor/nodeMergeAppend.h
+++ b/src/include/executor/nodeMergeAppend.h
@@ -19,5 +19,6 @@
 extern MergeAppendState *ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags);
 extern void ExecEndMergeAppend(MergeAppendState *node);
 extern void ExecReScanMergeAppend(MergeAppendState *node);
+extern void ExecAsyncMergeAppendResponse(AsyncRequest *areq);
 
 #endif							/* NODEMERGEAPPEND_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index e107d6e5f81..097559006af 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1544,10 +1544,66 @@ typedef struct MergeAppendState
 	TupleTableSlot **ms_slots;	/* array of length ms_nplans */
 	struct binaryheap *ms_heap; /* binary heap of slot indices */
 	bool		ms_initialized; /* are subplans started? */
+	Bitmapset  *ms_asyncplans;	/* asynchronous plans indexes */
+	int			ms_nasyncplans; /* # of asynchronous plans */
+	AsyncRequest **ms_asyncrequests;	/* array of AsyncRequests */
+	TupleTableSlot **ms_asyncresults;	/* unreturned results of async plans */
+	Bitmapset   *ms_has_asyncresults;	/* plans which have async results */
+	Bitmapset  *ms_asyncremain; /* remaining asynchronous plans */
+	Bitmapset  *ms_needrequest; /* asynchronous plans needing a new request */
+	struct WaitEventSet *ms_eventset;	/* WaitEventSet used to configure file
+										 * descriptor wait events */
 	struct PartitionPruneState *ms_prune_state;
+	bool		ms_valid_subplans_identified;   /* is ms_valid_subplans valid? */
 	Bitmapset  *ms_valid_subplans;
+	Bitmapset  *ms_valid_asyncplans;	/* valid asynchronous plans indexes */
 } MergeAppendState;
 
+/* Getters for AppendState and MergeAppendState */
+static inline struct WaitEventSet *
+GetAppendEventSet(PlanState *ps)
+{
+	Assert (IsA(ps, AppendState) || IsA(ps, MergeAppendState));
+
+	if (IsA(ps, AppendState))
+		return ((AppendState *)ps)->as_eventset;
+	else
+		return ((MergeAppendState *)ps)->ms_eventset;
+}
+
+static inline Bitmapset *
+GetNeedRequest(PlanState *ps)
+{
+	Assert (IsA(ps, AppendState) || IsA(ps, MergeAppendState));
+
+	if (IsA(ps, AppendState))
+		return ((AppendState *)ps)->as_needrequest;
+	else
+		return ((MergeAppendState *)ps)->ms_needrequest;
+}
+
+static inline Bitmapset *
+GetValidAsyncplans(PlanState *ps)
+{
+	Assert (IsA(ps, AppendState) || IsA(ps, MergeAppendState));
+
+	if (IsA(ps, AppendState))
+		return ((AppendState *)ps)->as_valid_asyncplans;
+	else
+		return ((MergeAppendState *)ps)->ms_valid_asyncplans;
+}
+
+static inline AsyncRequest*
+GetValidAsyncRequest(PlanState *ps, int nreq)
+{
+	Assert (IsA(ps, AppendState) || IsA(ps, MergeAppendState));
+
+	if (IsA(ps, AppendState))
+		return ((AppendState *)ps)->as_asyncrequests[nreq];
+	else
+		return ((MergeAppendState *)ps)->ms_asyncrequests[nreq];
+}
+
 /* ----------------
  *	 RecursiveUnionState information
  *
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index b523bcda8f3..fee491b77ad 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -70,6 +70,7 @@ extern PGDLLIMPORT bool enable_parallel_hash;
 extern PGDLLIMPORT bool enable_partition_pruning;
 extern PGDLLIMPORT bool enable_presorted_aggregate;
 extern PGDLLIMPORT bool enable_async_append;
+extern PGDLLIMPORT bool enable_async_merge_append;
 extern PGDLLIMPORT int constraint_exclusion;
 
 extern double index_pages_fetched(double tuples_fetched, BlockNumber pages,
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index 83228cfca29..2a55efd6605 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -149,6 +149,7 @@ select name, setting from pg_settings where name like 'enable%';
               name              | setting 
 --------------------------------+---------
  enable_async_append            | on
+ enable_async_merge_append      | on
  enable_bitmapscan              | on
  enable_distinct_reordering     | on
  enable_gathermerge             | on
@@ -172,7 +173,7 @@ select name, setting from pg_settings where name like 'enable%';
  enable_seqscan                 | on
  enable_sort                    | on
  enable_tidscan                 | on
-(24 rows)
+(25 rows)
 
 -- There are always wait event descriptions for various types.  InjectionPoint
 -- may be present or absent, depending on history since last postmaster start.
-- 
2.43.0

