diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index d841cec39b..b5b6d30c39 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -59,6 +59,7 @@ typedef struct ConnCacheEntry
bool invalidated; /* true if reconnect is pending */
uint32 server_hashvalue; /* hash value of foreign server OID */
uint32 mapping_hashvalue; /* hash value of user mapping OID */
+ PgFdwConnState state; /* extra per-connection state */
} ConnCacheEntry;
/*
@@ -106,7 +107,7 @@ static bool UserMappingPasswordRequired(UserMapping *user);
* (not even on error), we need this flag to cue manual cleanup.
*/
PGconn *
-GetConnection(UserMapping *user, bool will_prep_stmt)
+GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
{
bool found;
bool retry = false;
@@ -253,6 +254,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
/* Remember if caller will prepare statements */
entry->have_prep_stmt |= will_prep_stmt;
+ /* If caller needs access to the per-connection state, return it. */
+ if (state)
+ *state = &entry->state;
+
return entry->conn;
}
@@ -279,6 +284,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
entry->mapping_hashvalue =
GetSysCacheHashValue1(USERMAPPINGOID,
ObjectIdGetDatum(user->umid));
+ memset(&entry->state, 0, sizeof(entry->state));
/* Now try to make the connection */
entry->conn = connect_pg_server(server, user);
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index c11092f8cc..ac931e56e9 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -6986,7 +6986,7 @@ INSERT INTO a(aa) VALUES('aaaaa');
INSERT INTO b(aa) VALUES('bbb');
INSERT INTO b(aa) VALUES('bbbb');
INSERT INTO b(aa) VALUES('bbbbb');
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+-------
a | aaa
@@ -7014,7 +7014,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
(3 rows)
UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+--------
a | aaa
@@ -7042,7 +7042,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
(3 rows)
UPDATE b SET aa = 'new';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+--------
a | aaa
@@ -7070,7 +7070,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
(3 rows)
UPDATE a SET aa = 'newtoo';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+--------
a | newtoo
@@ -7098,7 +7098,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
(3 rows)
DELETE FROM a;
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+----
(0 rows)
@@ -7140,35 +7140,40 @@ insert into bar2 values(3,33,33);
insert into bar2 values(4,44,44);
insert into bar2 values(7,77,77);
explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for update;
- QUERY PLAN
-----------------------------------------------------------------------------------------------
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
+ QUERY PLAN
+-----------------------------------------------------------------------------------------------------------------
LockRows
Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
- -> Hash Join
+ -> Merge Join
Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
Inner Unique: true
- Hash Cond: (bar.f1 = foo.f1)
- -> Append
- -> Seq Scan on public.bar bar_1
+ Merge Cond: (bar.f1 = foo.f1)
+ -> Merge Append
+ Sort Key: bar.f1
+ -> Sort
Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
+ Sort Key: bar_1.f1
+ -> Seq Scan on public.bar bar_1
+ Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
-> Foreign Scan on public.bar2 bar_2
Output: bar_2.f1, bar_2.f2, bar_2.ctid, bar_2.*, bar_2.tableoid
- Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE
- -> Hash
+ Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR UPDATE
+ -> Sort
Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+ Sort Key: foo.f1
-> HashAggregate
Output: foo.ctid, foo.f1, foo.*, foo.tableoid
Group Key: foo.f1
-> Append
-> Seq Scan on public.foo foo_1
Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
- -> Foreign Scan on public.foo2 foo_2
+ -> Async Foreign Scan on public.foo2 foo_2
Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid
Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(23 rows)
+(28 rows)
-select * from bar where f1 in (select f1 from foo) for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
f1 | f2
----+----
1 | 11
@@ -7178,35 +7183,40 @@ select * from bar where f1 in (select f1 from foo) for update;
(4 rows)
explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for share;
- QUERY PLAN
-----------------------------------------------------------------------------------------------
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
+ QUERY PLAN
+----------------------------------------------------------------------------------------------------------------
LockRows
Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
- -> Hash Join
+ -> Merge Join
Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
Inner Unique: true
- Hash Cond: (bar.f1 = foo.f1)
- -> Append
- -> Seq Scan on public.bar bar_1
+ Merge Cond: (bar.f1 = foo.f1)
+ -> Merge Append
+ Sort Key: bar.f1
+ -> Sort
Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
+ Sort Key: bar_1.f1
+ -> Seq Scan on public.bar bar_1
+ Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
-> Foreign Scan on public.bar2 bar_2
Output: bar_2.f1, bar_2.f2, bar_2.ctid, bar_2.*, bar_2.tableoid
- Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR SHARE
- -> Hash
+ Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR SHARE
+ -> Sort
Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+ Sort Key: foo.f1
-> HashAggregate
Output: foo.ctid, foo.f1, foo.*, foo.tableoid
Group Key: foo.f1
-> Append
-> Seq Scan on public.foo foo_1
Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
- -> Foreign Scan on public.foo2 foo_2
+ -> Async Foreign Scan on public.foo2 foo_2
Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid
Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(23 rows)
+(28 rows)
-select * from bar where f1 in (select f1 from foo) for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
f1 | f2
----+----
1 | 11
@@ -7238,7 +7248,7 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
-> Append
-> Seq Scan on public.foo foo_1
Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
- -> Foreign Scan on public.foo2 foo_2
+ -> Async Foreign Scan on public.foo2 foo_2
Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid
Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-> Hash Join
@@ -7256,7 +7266,7 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
-> Append
-> Seq Scan on public.foo foo_1
Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
- -> Foreign Scan on public.foo2 foo_2
+ -> Async Foreign Scan on public.foo2 foo_2
Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid
Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
(39 rows)
@@ -7274,6 +7284,7 @@ select tableoid::regclass, * from bar order by 1,2;
(6 rows)
-- Check UPDATE with inherited target and an appendrel subquery
+SET enable_async_append TO false;
explain (verbose, costs off)
update bar set f2 = f2 + 100
from
@@ -7332,6 +7343,7 @@ update bar set f2 = f2 + 100
from
( select f1 from foo union all select f1+3 from foo ) ss
where bar.f1 = ss.f1;
+RESET enable_async_append;
select tableoid::regclass, * from bar order by 1,2;
tableoid | f1 | f2
----------+----+-----
@@ -8571,9 +8583,9 @@ SELECT t1.a,t2.b,t3.c FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) INNER J
Sort
Sort Key: t1.a, t3.c
-> Append
- -> Foreign Scan
+ -> Async Foreign Scan
Relations: ((ftprt1_p1 t1_1) INNER JOIN (ftprt2_p1 t2_1)) INNER JOIN (ftprt1_p1 t3_1)
- -> Foreign Scan
+ -> Async Foreign Scan
Relations: ((ftprt1_p2 t1_2) INNER JOIN (ftprt2_p2 t2_2)) INNER JOIN (ftprt1_p2 t3_2)
(7 rows)
@@ -8610,19 +8622,19 @@ SELECT t1.a,t2.b,t2.c FROM fprt1 t1 LEFT JOIN (SELECT * FROM fprt2 WHERE a < 10)
-- with whole-row reference; partitionwise join does not apply
EXPLAIN (COSTS OFF)
SELECT t1.wr, t2.wr FROM (SELECT t1 wr, a FROM fprt1 t1 WHERE t1.a % 25 = 0) t1 FULL JOIN (SELECT t2 wr, b FROM fprt2 t2 WHERE t2.b % 25 = 0) t2 ON (t1.a = t2.b) ORDER BY 1,2;
- QUERY PLAN
---------------------------------------------------------
+ QUERY PLAN
+--------------------------------------------------------------
Sort
Sort Key: ((t1.*)::fprt1), ((t2.*)::fprt2)
-> Hash Full Join
Hash Cond: (t1.a = t2.b)
-> Append
- -> Foreign Scan on ftprt1_p1 t1_1
- -> Foreign Scan on ftprt1_p2 t1_2
+ -> Async Foreign Scan on ftprt1_p1 t1_1
+ -> Async Foreign Scan on ftprt1_p2 t1_2
-> Hash
-> Append
- -> Foreign Scan on ftprt2_p1 t2_1
- -> Foreign Scan on ftprt2_p2 t2_2
+ -> Async Foreign Scan on ftprt2_p1 t2_1
+ -> Async Foreign Scan on ftprt2_p2 t2_2
(11 rows)
SELECT t1.wr, t2.wr FROM (SELECT t1 wr, a FROM fprt1 t1 WHERE t1.a % 25 = 0) t1 FULL JOIN (SELECT t2 wr, b FROM fprt2 t2 WHERE t2.b % 25 = 0) t2 ON (t1.a = t2.b) ORDER BY 1,2;
@@ -8652,9 +8664,9 @@ SELECT t1.a,t1.b FROM fprt1 t1, LATERAL (SELECT t2.a, t2.b FROM fprt2 t2 WHERE t
Sort
Sort Key: t1.a, t1.b
-> Append
- -> Foreign Scan
+ -> Async Foreign Scan
Relations: (ftprt1_p1 t1_1) INNER JOIN (ftprt2_p1 t2_1)
- -> Foreign Scan
+ -> Async Foreign Scan
Relations: (ftprt1_p2 t1_2) INNER JOIN (ftprt2_p2 t2_2)
(7 rows)
@@ -8707,6 +8719,7 @@ SELECT t1.a, t1.phv, t2.b, t2.phv FROM (SELECT 't1_phv' phv, * FROM fprt1 WHERE
(14 rows)
-- test FOR UPDATE; partitionwise join does not apply
+SET enable_async_append TO false;
EXPLAIN (COSTS OFF)
SELECT t1.a, t2.b FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) WHERE t1.a % 25 = 0 ORDER BY 1,2 FOR UPDATE OF t1;
QUERY PLAN
@@ -8734,6 +8747,7 @@ SELECT t1.a, t2.b FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) WHERE t1.a
400 | 400
(4 rows)
+RESET enable_async_append;
RESET enable_partitionwise_join;
-- ===================================================================
-- test partitionwise aggregates
@@ -8758,17 +8772,17 @@ ANALYZE fpagg_tab_p3;
SET enable_partitionwise_aggregate TO false;
EXPLAIN (COSTS OFF)
SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
- QUERY PLAN
------------------------------------------------------------
+ QUERY PLAN
+-----------------------------------------------------------------
Sort
Sort Key: pagg_tab.a
-> HashAggregate
Group Key: pagg_tab.a
Filter: (avg(pagg_tab.b) < '22'::numeric)
-> Append
- -> Foreign Scan on fpagg_tab_p1 pagg_tab_1
- -> Foreign Scan on fpagg_tab_p2 pagg_tab_2
- -> Foreign Scan on fpagg_tab_p3 pagg_tab_3
+ -> Async Foreign Scan on fpagg_tab_p1 pagg_tab_1
+ -> Async Foreign Scan on fpagg_tab_p2 pagg_tab_2
+ -> Async Foreign Scan on fpagg_tab_p3 pagg_tab_3
(9 rows)
-- Plan with partitionwise aggregates is enabled
@@ -8780,11 +8794,11 @@ SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 O
Sort
Sort Key: pagg_tab.a
-> Append
- -> Foreign Scan
+ -> Async Foreign Scan
Relations: Aggregate on (fpagg_tab_p1 pagg_tab)
- -> Foreign Scan
+ -> Async Foreign Scan
Relations: Aggregate on (fpagg_tab_p2 pagg_tab_1)
- -> Foreign Scan
+ -> Async Foreign Scan
Relations: Aggregate on (fpagg_tab_p3 pagg_tab_2)
(9 rows)
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index b6c72e1d1e..de01cba130 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -21,6 +21,7 @@
#include "commands/defrem.h"
#include "commands/explain.h"
#include "commands/vacuum.h"
+#include "executor/execAsync.h"
#include "foreign/fdwapi.h"
#include "funcapi.h"
#include "miscadmin.h"
@@ -37,6 +38,7 @@
#include "optimizer/tlist.h"
#include "parser/parsetree.h"
#include "postgres_fdw.h"
+#include "storage/latch.h"
#include "utils/builtins.h"
#include "utils/float.h"
#include "utils/guc.h"
@@ -155,6 +157,11 @@ typedef struct PgFdwScanState
int fetch_ct_2; /* Min(# of fetches done, 2) */
bool eof_reached; /* true if last fetch reached EOF */
+ /* for asynchronous execution */
+ bool async_capable; /* engage asynchronous-capable logic? */
+ PgFdwConnState *conn_state; /* extra per-connection state */
+ ForeignScanState *next_node; /* next ForeignScan node to activate */
+
/* working memory contexts */
MemoryContext batch_cxt; /* context holding current batch of tuples */
MemoryContext temp_cxt; /* context for per-tuple temporary data */
@@ -392,6 +399,10 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
RelOptInfo *input_rel,
RelOptInfo *output_rel,
void *extra);
+static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
+static void postgresForeignAsyncRequest(AsyncRequest *areq);
+static void postgresForeignAsyncConfigureWait(AsyncRequest *areq);
+static void postgresForeignAsyncNotify(AsyncRequest *areq);
/*
* Helper functions
@@ -420,6 +431,7 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
EquivalenceClass *ec, EquivalenceMember *em,
void *arg);
static void create_cursor(ForeignScanState *node);
+static void fetch_more_data_begin(ForeignScanState *node);
static void fetch_more_data(ForeignScanState *node);
static void close_cursor(PGconn *conn, unsigned int cursor_number);
static PgFdwModifyState *create_foreign_modify(EState *estate,
@@ -471,6 +483,7 @@ static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
double *totaldeadrows);
static void analyze_row_processor(PGresult *res, int row,
PgFdwAnalyzeState *astate);
+static void request_tuple_asynchronously(AsyncRequest *areq);
static HeapTuple make_tuple_from_result_row(PGresult *res,
int row,
Relation rel,
@@ -560,6 +573,12 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
/* Support functions for upper relation push-down */
routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
+ /* Support functions for asynchronous execution */
+ routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable;
+ routine->ForeignAsyncRequest = postgresForeignAsyncRequest;
+ routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait;
+ routine->ForeignAsyncNotify = postgresForeignAsyncNotify;
+
PG_RETURN_POINTER(routine);
}
@@ -1435,7 +1454,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
- fsstate->conn = GetConnection(user, false);
+ fsstate->conn = GetConnection(user, false, &fsstate->conn_state);
/* Assign a unique ID for my cursor */
fsstate->cursor_number = GetCursorNumber(fsstate->conn);
@@ -1486,6 +1505,12 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
&fsstate->param_flinfo,
&fsstate->param_exprs,
&fsstate->param_values);
+
+ /* Initialize async state */
+ fsstate->async_capable = node->ss.ps.plan->async_capable;
+ fsstate->conn_state->activated = NULL;
+ fsstate->conn_state->async_query_sent = false;
+ fsstate->next_node = NULL;
}
/*
@@ -1500,8 +1525,10 @@ postgresIterateForeignScan(ForeignScanState *node)
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
/*
- * If this is the first call after Begin or ReScan, we need to create the
- * cursor on the remote side.
+ * In sync mode, if this is the first call after Begin or ReScan, we need
+ * to create the cursor on the remote side. In async mode, we would have
+ * already created the cursor before we get here, even if this is the first
+ * call after Begin or ReScan.
*/
if (!fsstate->cursor_exists)
create_cursor(node);
@@ -1511,6 +1538,9 @@ postgresIterateForeignScan(ForeignScanState *node)
*/
if (fsstate->next_tuple >= fsstate->num_tuples)
{
+ /* In async mode, just clear tuple slot. */
+ if (fsstate->async_capable)
+ return ExecClearTuple(slot);
/* No point in another fetch if we already detected EOF, though. */
if (!fsstate->eof_reached)
fetch_more_data(node);
@@ -1540,6 +1570,14 @@ postgresReScanForeignScan(ForeignScanState *node)
char sql[64];
PGresult *res;
+ /* Reset async state */
+ if (fsstate->async_capable)
+ {
+ fsstate->conn_state->activated = NULL;
+ fsstate->conn_state->async_query_sent = false;
+ fsstate->next_node = NULL;
+ }
+
/* If we haven't created the cursor yet, nothing to do. */
if (!fsstate->cursor_exists)
return;
@@ -1598,6 +1636,14 @@ postgresEndForeignScan(ForeignScanState *node)
if (fsstate == NULL)
return;
+ /*
+ * If we're ending before we've collected a response from an asynchronous
+ * query, we have to consume the response.
+ */
+ if (fsstate->conn_state->activated == node &&
+ fsstate->conn_state->async_query_sent)
+ fetch_more_data(node);
+
/* Close the cursor if open, to prevent accumulation of cursors */
if (fsstate->cursor_exists)
close_cursor(fsstate->conn, fsstate->cursor_number);
@@ -2374,7 +2420,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
- dmstate->conn = GetConnection(user, false);
+ dmstate->conn = GetConnection(user, false, NULL);
/* Update the foreign-join-related fields. */
if (fsplan->scan.scanrelid == 0)
@@ -2748,7 +2794,7 @@ estimate_path_cost_size(PlannerInfo *root,
false, &retrieved_attrs, NULL);
/* Get the remote estimate */
- conn = GetConnection(fpinfo->user, false);
+ conn = GetConnection(fpinfo->user, false, NULL);
get_remote_estimate(sql.data, conn, &rows, &width,
&startup_cost, &total_cost);
ReleaseConnection(conn);
@@ -3376,6 +3422,34 @@ create_cursor(ForeignScanState *node)
pfree(buf.data);
}
+/*
+ * Begin an asynchronous data fetch.
+ * fetch_more_data must be called later to collect the results.
+ */
+static void
+fetch_more_data_begin(ForeignScanState *node)
+{
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PGconn *conn = fsstate->conn;
+ char sql[64];
+
+ Assert(fsstate->conn_state->activated == node);
+ Assert(!fsstate->conn_state->async_query_sent);
+
+ /* Create the cursor synchronously. */
+ if (!fsstate->cursor_exists)
+ create_cursor(node);
+
+ /* Send the FETCH without waiting; PQsendQuery returns 0 on failure. */
+ snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+ fsstate->fetch_size, fsstate->cursor_number);
+
+ if (!PQsendQuery(conn, sql))
+ pgfdw_report_error(ERROR, NULL, conn, false, fsstate->query);
+
+ fsstate->conn_state->async_query_sent = true;
+}
+}
+
/*
* Fetch some more rows from the node's cursor.
*/
@@ -3398,17 +3472,36 @@ fetch_more_data(ForeignScanState *node)
PG_TRY();
{
PGconn *conn = fsstate->conn;
- char sql[64];
int numrows;
int i;
- snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
- fsstate->fetch_size, fsstate->cursor_number);
+ if (fsstate->async_capable)
+ {
+ Assert(fsstate->conn_state->activated == node);
+ Assert(fsstate->conn_state->async_query_sent);
- res = pgfdw_exec_query(conn, sql);
- /* On error, report the original query, not the FETCH. */
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+ /*
+ * The query was already sent by an earlier call to
+ * fetch_more_data_begin. So now we just fetch the result.
+ */
+ res = PQgetResult(conn);
+ /* On error, report the original query, not the FETCH. */
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+ }
+ else
+ {
+ char sql[64];
+
+ /* This is a regular synchronous fetch. */
+ snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+ fsstate->fetch_size, fsstate->cursor_number);
+
+ res = pgfdw_exec_query(conn, sql);
+ /* On error, report the original query, not the FETCH. */
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+ }
/* Convert the data into HeapTuples */
numrows = PQntuples(res);
@@ -3435,6 +3528,15 @@ fetch_more_data(ForeignScanState *node)
/* Must be EOF if we didn't get as many tuples as we asked for. */
fsstate->eof_reached = (numrows < fsstate->fetch_size);
+
+ /* If this was the second part of an async request, we must fetch until NULL. */
+ if (fsstate->async_capable)
+ {
+ /* call once and raise error if not NULL as expected? */
+ while (PQgetResult(conn) != NULL)
+ ;
+ fsstate->conn_state->async_query_sent = false;
+ }
}
PG_FINALLY();
{
@@ -3559,7 +3661,7 @@ create_foreign_modify(EState *estate,
user = GetUserMapping(userid, table->serverid);
/* Open connection; report that we'll create a prepared statement. */
- fmstate->conn = GetConnection(user, true);
+ fmstate->conn = GetConnection(user, true, NULL);
fmstate->p_name = NULL; /* prepared statement not made yet */
/* Set up remote query information. */
@@ -4434,7 +4536,7 @@ postgresAnalyzeForeignTable(Relation relation,
*/
table = GetForeignTable(RelationGetRelid(relation));
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
- conn = GetConnection(user, false);
+ conn = GetConnection(user, false, NULL);
/*
* Construct command to get page count for relation.
@@ -4520,7 +4622,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
table = GetForeignTable(RelationGetRelid(relation));
server = GetForeignServer(table->serverid);
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
- conn = GetConnection(user, false);
+ conn = GetConnection(user, false, NULL);
/*
* Construct cursor that retrieves whole rows from remote.
@@ -4748,7 +4850,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
*/
server = GetForeignServer(serverOid);
mapping = GetUserMapping(GetUserId(), server->serverid);
- conn = GetConnection(mapping, false);
+ conn = GetConnection(mapping, false, NULL);
/* Don't attempt to import collation if remote server hasn't got it */
if (PQserverVersion(conn) < 90100)
@@ -6294,6 +6396,177 @@ add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel,
add_path(final_rel, (Path *) final_path);
}
+/*
+ * postgresIsForeignPathAsyncCapable
+ * Report a ForeignPath as async-capable; always true, 'path' is not examined.
+ */
+static bool
+postgresIsForeignPathAsyncCapable(ForeignPath *path)
+{
+ return true;
+}
+
+/*
+ * postgresForeignAsyncRequest
+ * Asynchronously request the next tuple from a foreign PostgreSQL table.
+ */
+static void
+postgresForeignAsyncRequest(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+
+ /*
+ * If no ForeignScan node is activated on the connection yet, mark the
+ * connection as used by this ForeignScan node.
+ */
+ if (fsstate->conn_state->activated == NULL)
+ fsstate->conn_state->activated = node;
+
+ /*
+ * If the connection is in use by another ForeignScan node, put this node
+ * at the end of the chain of waiting ForeignScan nodes, and then return.
+ */
+ if (node != fsstate->conn_state->activated)
+ {
+ ForeignScanState *curr_node = fsstate->conn_state->activated;
+ PgFdwScanState *curr_fsstate = (PgFdwScanState *) curr_node->fdw_state;
+
+ /* Walk next_node links from the activated node to the chain's tail */
+ while (curr_fsstate->next_node)
+ {
+ curr_node = curr_fsstate->next_node;
+ Assert(node != curr_node);
+ curr_fsstate = (PgFdwScanState *) curr_node->fdw_state;
+ }
+ /* Link this node in as the new tail of the chain */
+ curr_fsstate->next_node = node;
+ /* Mark the request as needing a callback */
+ areq->callback_pending = true;
+ areq->request_complete = false;
+ return;
+ }
+
+ request_tuple_asynchronously(areq);
+}
+
+/*
+ * postgresForeignAsyncConfigureWait
+ * Add a socket-readable wait event for the connection to the requestor's set.
+ */
+static void
+postgresForeignAsyncConfigureWait(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ AppendState *requestor = (AppendState *) areq->requestor;
+ WaitEventSet *set = requestor->as_eventset;
+
+ /* This function should not be called unless callback_pending */
+ Assert(areq->callback_pending);
+
+ /* If this ForeignScan node isn't the activated one, nothing to do */
+ if (fsstate->conn_state->activated != node)
+ return;
+
+ AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn),
+ NULL, areq);
+}
+
+/*
+ * postgresForeignAsyncNotify
+ * Collect the tuples of the pending fetch via fetch_more_data, then
+ * request the next tuple.
+ */
+static void
+postgresForeignAsyncNotify(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+
+ /* The core code would have cleared the callback_pending flag */
+ Assert(!areq->callback_pending);
+
+ fetch_more_data(node);
+
+ request_tuple_asynchronously(areq);
+}
+
+/*
+ * Produce the next tuple for an async request, or begin a fetch to get more.
+ */
+static void
+request_tuple_asynchronously(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ TupleTableSlot *result;
+
+ /* The current batch is used up: fetch more, or finish if at EOF */
+ if (fsstate->next_tuple >= fsstate->num_tuples)
+ {
+ /* No point in another fetch if we already detected EOF, though */
+ if (!fsstate->eof_reached)
+ {
+ /* Begin another fetch */
+ fetch_more_data_begin(node);
+ /* Mark the request as needing a callback */
+ areq->callback_pending = true;
+ areq->request_complete = false;
+ return;
+ }
+ fsstate->conn_state->activated = NULL; /* EOF: this node is done with the connection */
+
+ /* Activate the next ForeignScan node if any */
+ if (fsstate->next_node)
+ {
+ /* Mark the connection as used by the next ForeignScan node */
+ fsstate->conn_state->activated = fsstate->next_node;
+ Assert(!fsstate->conn_state->async_query_sent);
+ /* Begin an asynchronous fetch for that node */
+ fetch_more_data_begin(fsstate->next_node);
+ }
+
+ /* There's nothing more to do; just return a NULL pointer */
+ result = NULL;
+ /* Mark the request as complete */
+ ExecAsyncRequestDone(areq, result);
+ return;
+ }
+
+ /* Get a tuple from the ForeignScan node */
+ result = ExecProcNode((PlanState *) node);
+
+ if (TupIsNull(result))
+ {
+ Assert(fsstate->next_tuple >= fsstate->num_tuples);
+
+ /* Request some more tuples, if we've not detected EOF yet */
+ if (!fsstate->eof_reached)
+ {
+ /* Begin another fetch */
+ fetch_more_data_begin(node);
+ /* Mark the request as needing a callback */
+ areq->callback_pending = true;
+ areq->request_complete = false;
+ return;
+ }
+ fsstate->conn_state->activated = NULL; /* EOF: this node is done with the connection */
+
+ /* Activate the next ForeignScan node if any */
+ if (fsstate->next_node)
+ {
+ /* Mark the connection as used by the next ForeignScan node */
+ fsstate->conn_state->activated = fsstate->next_node;
+ Assert(!fsstate->conn_state->async_query_sent);
+ /* Begin an asynchronous fetch for that node */
+ fetch_more_data_begin(fsstate->next_node);
+ }
+ }
+
+ /* Mark the request as complete, passing back whatever is in result */
+ ExecAsyncRequestDone(areq, result);
+}
+
/*
* Create a tuple from the specified row of the PGresult.
*
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index eef410db39..15c9750f8b 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -16,6 +16,7 @@
#include "foreign/foreign.h"
#include "lib/stringinfo.h"
#include "libpq-fe.h"
+#include "nodes/execnodes.h"
#include "nodes/pathnodes.h"
#include "utils/relcache.h"
@@ -124,12 +125,22 @@ typedef struct PgFdwRelationInfo
int relation_index;
} PgFdwRelationInfo;
+/*
+ * Extra per-connection control state used by asynchronous ForeignScans.
+ */
+typedef struct PgFdwConnState
+{
+ ForeignScanState *activated; /* currently-activated ForeignScan node, or NULL */
+ bool async_query_sent; /* has an asynchronous query been sent? */
+} PgFdwConnState;
+
/* in postgres_fdw.c */
extern int set_transmission_modes(void);
extern void reset_transmission_modes(int nestlevel);
/* in connection.c */
-extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
+extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt,
+ PgFdwConnState **state);
extern void ReleaseConnection(PGconn *conn);
extern unsigned int GetCursorNumber(PGconn *conn);
extern unsigned int GetPrepStmtNumber(PGconn *conn);
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 25dbc08b98..ad7bf90bb0 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -1799,31 +1799,31 @@ INSERT INTO b(aa) VALUES('bbb');
INSERT INTO b(aa) VALUES('bbbb');
INSERT INTO b(aa) VALUES('bbbbb');
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
UPDATE b SET aa = 'new';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
UPDATE a SET aa = 'newtoo';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
DELETE FROM a;
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
@@ -1859,12 +1859,12 @@ insert into bar2 values(4,44,44);
insert into bar2 values(7,77,77);
explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for update;
-select * from bar where f1 in (select f1 from foo) for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for share;
-select * from bar where f1 in (select f1 from foo) for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
-- Check UPDATE with inherited target and an inherited source table
explain (verbose, costs off)
@@ -1874,6 +1874,7 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
select tableoid::regclass, * from bar order by 1,2;
-- Check UPDATE with inherited target and an appendrel subquery
+SET enable_async_append TO false;
explain (verbose, costs off)
update bar set f2 = f2 + 100
from
@@ -1883,6 +1884,7 @@ update bar set f2 = f2 + 100
from
( select f1 from foo union all select f1+3 from foo ) ss
where bar.f1 = ss.f1;
+RESET enable_async_append;
select tableoid::regclass, * from bar order by 1,2;
@@ -2492,9 +2494,11 @@ SELECT t1.a, t1.phv, t2.b, t2.phv FROM (SELECT 't1_phv' phv, * FROM fprt1 WHERE
SELECT t1.a, t1.phv, t2.b, t2.phv FROM (SELECT 't1_phv' phv, * FROM fprt1 WHERE a % 25 = 0) t1 FULL JOIN (SELECT 't2_phv' phv, * FROM fprt2 WHERE b % 25 = 0) t2 ON (t1.a = t2.b) ORDER BY t1.a, t2.b;
-- test FOR UPDATE; partitionwise join does not apply
+SET enable_async_append TO false;
EXPLAIN (COSTS OFF)
SELECT t1.a, t2.b FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) WHERE t1.a % 25 = 0 ORDER BY 1,2 FOR UPDATE OF t1;
SELECT t1.a, t2.b FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) WHERE t1.a % 25 = 0 ORDER BY 1,2 FOR UPDATE OF t1;
+RESET enable_async_append;
RESET enable_partitionwise_join;
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 67de4150b8..1427ffc82b 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4733,6 +4733,20 @@ ANY num_sync (
+ enable_async_append (boolean)
+
+ enable_async_append configuration parameter
+
+
+
+
+ Enables or disables the query planner's use of async-aware
+ append plan types. The default is on.
+
+
+
+
enable_bitmapscan (boolean)
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 3d6c901306..c07d6a4d95 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1557,6 +1557,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
+
+ AppendReady
+ Waiting for a subplan of Append to be ready.
+
BackupWaitWalArchive
Waiting for WAL files required for a backup to be successfully
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index d797b5f53e..895c4d8a4c 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -1390,6 +1390,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
}
if (plan->parallel_aware)
appendStringInfoString(es->str, "Parallel ");
+ if (plan->async_capable)
+ appendStringInfoString(es->str, "Async ");
appendStringInfoString(es->str, pname);
es->indent++;
}
@@ -1409,6 +1411,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
if (custom_name)
ExplainPropertyText("Custom Plan Provider", custom_name, es);
ExplainPropertyBool("Parallel Aware", plan->parallel_aware, es);
+ ExplainPropertyBool("Async Capable", plan->async_capable, es);
}
switch (nodeTag(plan))
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index f990c6473a..1004647d4f 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global
OBJS = \
execAmi.o \
+ execAsync.o \
execCurrent.o \
execExpr.o \
execExprInterp.o \
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index 0c10f1d35c..07f028bdf9 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -526,6 +526,10 @@ ExecSupportsBackwardScan(Plan *node)
{
ListCell *l;
+ /* With async, tuples may be interleaved, so can't back up. */
+ if (((Append *) node)->nasyncplans != 0)
+ return false;
+
foreach(l, ((Append *) node)->appendplans)
{
if (!ExecSupportsBackwardScan((Plan *) lfirst(l)))
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
index e69de29bb2..6174ea1eb6 100644
--- a/src/backend/executor/execAsync.c
+++ b/src/backend/executor/execAsync.c
@@ -0,0 +1,113 @@
+/*-------------------------------------------------------------------------
+ *
+ * execAsync.c
+ * Support routines for asynchronous execution
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/executor/execAsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "executor/execAsync.h"
+#include "executor/nodeAppend.h"
+#include "executor/nodeForeignscan.h"
+
+static void ExecAsyncResponse(AsyncRequest *areq);
+
+/*
+ * Asynchronously request a tuple from a designated async-capable node.
+ *
+ * The request is routed according to the requestee's node type; afterwards
+ * the requestor is given a chance to look at whatever state the requestee
+ * left behind in 'areq'.
+ */
+void
+ExecAsyncRequest(AsyncRequest *areq)
+{
+	PlanState  *target = areq->requestee;
+
+	if (nodeTag(target) == T_ForeignScanState)
+		ExecAsyncForeignScanRequest(areq);
+	else
+	{
+		/* If the node doesn't support async, caller messed up. */
+		elog(ERROR, "unrecognized node type: %d",
+			 (int) nodeTag(target));
+	}
+
+	ExecAsyncResponse(areq);
+}
+
+/*
+ * Let the asynchronous node register the file descriptor event it wants to
+ * wait for.  The node-type specific callback is expected to make a single
+ * call of the following form:
+ *
+ * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
+ */
+void
+ExecAsyncConfigureWait(AsyncRequest *areq)
+{
+	PlanState  *target = areq->requestee;
+
+	if (nodeTag(target) == T_ForeignScanState)
+		ExecAsyncForeignScanConfigureWait(areq);
+	else
+	{
+		/* If the node doesn't support async, caller messed up. */
+		elog(ERROR, "unrecognized node type: %d",
+			 (int) nodeTag(target));
+	}
+}
+
+/*
+ * Call the asynchronous node back when a relevant event has occurred, then
+ * pass the outcome on to the requestor.
+ */
+void
+ExecAsyncNotify(AsyncRequest *areq)
+{
+	PlanState  *target = areq->requestee;
+
+	if (nodeTag(target) == T_ForeignScanState)
+		ExecAsyncForeignScanNotify(areq);
+	else
+	{
+		/* If the node doesn't support async, caller messed up. */
+		elog(ERROR, "unrecognized node type: %d",
+			 (int) nodeTag(target));
+	}
+
+	ExecAsyncResponse(areq);
+}
+
+/*
+ * Call the requestor back after an asynchronous node has been given a chance
+ * to produce a result.  Dispatch is on the requestor's node type.
+ */
+static void
+ExecAsyncResponse(AsyncRequest *areq)
+{
+	PlanState  *consumer = areq->requestor;
+
+	if (nodeTag(consumer) == T_AppendState)
+		ExecAsyncAppendResponse(areq);
+	else
+	{
+		/* If the node doesn't support async, caller messed up. */
+		elog(ERROR, "unrecognized node type: %d",
+			 (int) nodeTag(consumer));
+	}
+}
+
+/*
+ * Record the outcome of a request: store 'result' in the request and flag
+ * the request as complete.  A requestee node calls this to deliver a tuple
+ * to its requestor; it may do so straight from its ExecAsyncRequest callback
+ * when the requested tuple is available immediately.
+ */
+void
+ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
+{
+	areq->result = result;
+	areq->request_complete = true;
+}
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 88919e62fa..2120481743 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -57,10 +57,13 @@
#include "postgres.h"
+#include "executor/execAsync.h"
#include "executor/execdebug.h"
#include "executor/execPartition.h"
#include "executor/nodeAppend.h"
#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/latch.h"
/* Shared state for parallel-aware Append. */
struct ParallelAppendState
@@ -78,12 +81,17 @@ struct ParallelAppendState
};
#define INVALID_SUBPLAN_INDEX -1
+#define EVENT_BUFFER_SIZE 16
static TupleTableSlot *ExecAppend(PlanState *pstate);
static bool choose_next_subplan_locally(AppendState *node);
static bool choose_next_subplan_for_leader(AppendState *node);
static bool choose_next_subplan_for_worker(AppendState *node);
static void mark_invalid_subplans_as_finished(AppendState *node);
+static void ExecAppendAsyncBegin(AppendState *node);
+static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
+static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
+static void ExecAppendAsyncEventWait(AppendState *node);
/* ----------------------------------------------------------------
* ExecInitAppend
@@ -102,7 +110,9 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
AppendState *appendstate = makeNode(AppendState);
PlanState **appendplanstates;
Bitmapset *validsubplans;
+ Bitmapset *asyncplans;
int nplans;
+ int nasyncplans;
int firstvalid;
int i,
j;
@@ -119,6 +129,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
/* Let choose_next_subplan_* function handle setting the first subplan */
appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
+ appendstate->as_syncdone = false;
/* If run-time partition pruning is enabled, then set that up now */
if (node->part_prune_info != NULL)
@@ -191,12 +202,24 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
* While at it, find out the first valid partial plan.
*/
j = 0;
+ asyncplans = NULL;
+ nasyncplans = 0;
firstvalid = nplans;
i = -1;
while ((i = bms_next_member(validsubplans, i)) >= 0)
{
Plan *initNode = (Plan *) list_nth(node->appendplans, i);
+ /*
+ * Record async subplans. When executing EvalPlanQual, we process
+ * async subplans synchronously, so don't do this in that case.
+ */
+ if (initNode->async_capable && estate->es_epq_active == NULL)
+ {
+ asyncplans = bms_add_member(asyncplans, j);
+ nasyncplans++;
+ }
+
/*
* Record the lowest appendplans index which is a valid partial plan.
*/
@@ -210,6 +233,38 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
appendstate->appendplans = appendplanstates;
appendstate->as_nplans = nplans;
+ /* Initialize async state */
+ appendstate->as_asyncplans = asyncplans;
+ appendstate->as_nasyncplans = nasyncplans;
+ appendstate->as_asyncrequests = NULL;
+ appendstate->as_asyncresults = (TupleTableSlot **)
+ palloc0(nasyncplans * sizeof(TupleTableSlot *));
+ appendstate->as_nasyncremain = nasyncplans;
+ appendstate->as_needrequest = NULL;
+ appendstate->as_eventset = NULL;
+
+ if (nasyncplans > 0)
+ {
+ appendstate->as_asyncrequests = (AsyncRequest **)
+ palloc0(nplans * sizeof(AsyncRequest *));
+
+ i = -1;
+ while ((i = bms_next_member(asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq;
+
+ areq = palloc(sizeof(AsyncRequest));
+ areq->requestor = (PlanState *) appendstate;
+ areq->requestee = appendplanstates[i];
+ areq->request_index = i;
+ areq->callback_pending = false;
+ areq->request_complete = false;
+ areq->result = NULL;
+
+ appendstate->as_asyncrequests[i] = areq;
+ }
+ }
+
/*
* Miscellaneous initialization
*/
@@ -232,31 +287,45 @@ static TupleTableSlot *
ExecAppend(PlanState *pstate)
{
AppendState *node = castNode(AppendState, pstate);
+ TupleTableSlot *result;
- if (node->as_whichplan < 0)
+ if (!node->as_syncdone && node->as_whichplan == INVALID_SUBPLAN_INDEX)
{
/* Nothing to do if there are no subplans */
if (node->as_nplans == 0)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ /* If there are any async subplans, begin execution of them */
+ if (node->as_nasyncplans > 0)
+ ExecAppendAsyncBegin(node);
+
/*
- * If no subplan has been chosen, we must choose one before
+ * If no sync subplan has been chosen, we must choose one before
* proceeding.
*/
- if (node->as_whichplan == INVALID_SUBPLAN_INDEX &&
- !node->choose_next_subplan(node))
+ if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
}
for (;;)
{
PlanState *subnode;
- TupleTableSlot *result;
CHECK_FOR_INTERRUPTS();
/*
- * figure out which subplan we are currently processing
+ * try to get a tuple from any of the async subplans
+ */
+ if (!bms_is_empty(node->as_needrequest) ||
+ (node->as_syncdone && node->as_nasyncremain > 0))
+ {
+ if (ExecAppendAsyncGetNext(node, &result))
+ return result;
+ Assert(bms_is_empty(node->as_needrequest));
+ }
+
+ /*
+ * figure out which sync subplan we are currently processing
*/
Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
subnode = node->appendplans[node->as_whichplan];
@@ -276,8 +345,16 @@ ExecAppend(PlanState *pstate)
return result;
}
- /* choose new subplan; if none, we're done */
- if (!node->choose_next_subplan(node))
+ /* wait or poll async events */
+ if (node->as_nasyncremain > 0)
+ {
+ Assert(!node->as_syncdone);
+ Assert(bms_is_empty(node->as_needrequest));
+ ExecAppendAsyncEventWait(node);
+ }
+
+ /* choose new sync subplan; if no sync/async subplans, we're done */
+ if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
}
}
@@ -313,6 +390,7 @@ ExecEndAppend(AppendState *node)
void
ExecReScanAppend(AppendState *node)
{
+ int nasyncplans = node->as_nasyncplans;
int i;
/*
@@ -347,8 +425,27 @@ ExecReScanAppend(AppendState *node)
ExecReScan(subnode);
}
+ /* Reset async state */
+ node->as_nasyncremain = nasyncplans;
+ bms_free(node->as_needrequest);
+ node->as_needrequest = NULL;
+
+ if (nasyncplans > 0)
+ {
+ i = -1;
+ while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->as_asyncrequests[i];
+
+ areq->callback_pending = false;
+ areq->request_complete = false;
+ areq->result = NULL;
+ }
+ }
+
/* Let choose_next_subplan_* function handle setting the first subplan */
node->as_whichplan = INVALID_SUBPLAN_INDEX;
+ node->as_syncdone = false;
}
/* ----------------------------------------------------------------
@@ -429,7 +526,7 @@ ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
/* ----------------------------------------------------------------
* choose_next_subplan_locally
*
- * Choose next subplan for a non-parallel-aware Append,
+ * Choose next sync subplan for a non-parallel-aware Append,
* returning false if there are no more.
* ----------------------------------------------------------------
*/
@@ -444,9 +541,9 @@ choose_next_subplan_locally(AppendState *node)
/*
* If first call then have the bms member function choose the first valid
- * subplan by initializing whichplan to -1. If there happen to be no
- * valid subplans then the bms member function will handle that by
- * returning a negative number which will allow us to exit returning a
+ * sync subplan by initializing whichplan to -1. If there happen to be
+ * no valid sync subplans then the bms member function will handle that
+ * by returning a negative number which will allow us to exit returning a
* false value.
*/
if (whichplan == INVALID_SUBPLAN_INDEX)
@@ -467,7 +564,10 @@ choose_next_subplan_locally(AppendState *node)
nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
if (nextplan < 0)
+ {
+ node->as_syncdone = true;
return false;
+ }
node->as_whichplan = nextplan;
@@ -709,3 +809,265 @@ mark_invalid_subplans_as_finished(AppendState *node)
node->as_pstate->pa_finished[i] = true;
}
}
+
+/* ----------------------------------------------------------------
+ *		ExecAppendAsyncBegin
+ *
+ *		Begin execution of designated async-capable subplans: issue one
+ *		request to every async subplan that is currently valid, and leave
+ *		only the sync subplans in as_valid_subplans.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecAppendAsyncBegin(AppendState *node)
+{
+	Bitmapset  *runnable;
+	int			plan_idx;
+
+	/* We should never be called when there are no async subplans. */
+	Assert(node->as_nasyncplans > 0);
+
+	/* Determine the valid subplans now if that has not happened yet. */
+	if (node->as_valid_subplans == NULL)
+		node->as_valid_subplans =
+			ExecFindMatchingSubPlans(node->as_prune_state);
+
+	/* Only proceed when at least one async subplan is valid. */
+	if (bms_overlap(node->as_valid_subplans, node->as_asyncplans))
+	{
+		/* The async subplans to start are those that are also valid. */
+		runnable = bms_int_members(bms_copy(node->as_asyncplans),
+								   node->as_valid_subplans);
+
+		/*
+		 * Adjust the node's as_valid_subplans to only contain sync subplans.
+		 *
+		 * NOTE(review): the async members are removed for good here.  If a
+		 * rescan keeps as_valid_subplans as-is, the bms_overlap() test above
+		 * would presumably fail next time and no async request would be
+		 * re-issued -- confirm against ExecReScanAppend.
+		 */
+		node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
+												  runnable);
+
+		/* Make a request for each of the async subplans. */
+		for (plan_idx = bms_next_member(runnable, -1);
+			 plan_idx >= 0;
+			 plan_idx = bms_next_member(runnable, plan_idx))
+		{
+			AsyncRequest *areq = node->as_asyncrequests[plan_idx];
+
+			Assert(areq->request_index == plan_idx);
+			Assert(!areq->callback_pending);
+
+			/* Do the actual work. */
+			ExecAsyncRequest(areq);
+		}
+		bms_free(runnable);
+	}
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAppendAsyncGetNext
+ *
+ *		Get the next tuple from any of the asynchronous subplans.
+ *
+ *		Returns true with *result set when the caller should return
+ *		*result (a tuple, or the cleared result slot once everything is
+ *		exhausted); returns false when the caller must carry on with the
+ *		sync subplans.
+ * ----------------------------------------------------------------
+ */
+static bool
+ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
+{
+	*result = NULL;
+
+	/* Make new async requests; a result may already be available. */
+	if (ExecAppendAsyncRequest(node, result))
+		return true;
+
+	for (;;)
+	{
+		/* Stop once every async subplan has run dry. */
+		if (node->as_nasyncremain <= 0)
+			break;
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* Wait or poll async events. */
+		ExecAppendAsyncEventWait(node);
+
+		/* Make new async requests. */
+		if (ExecAppendAsyncRequest(node, result))
+			return true;
+
+		/* Go back to the sync subplans if any of them is not complete. */
+		if (!node->as_syncdone)
+			break;
+	}
+
+	/*
+	 * With sync subplans still pending we're done with the asynchronous
+	 * stuff for now, but the caller must continue scanning the sync ones.
+	 */
+	if (!node->as_syncdone)
+		return false;
+
+	/*
+	 * All sync subplans are complete too, so we're totally done scanning the
+	 * given node.
+	 */
+	Assert(node->as_nasyncremain == 0);
+	*result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
+	return true;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAppendAsyncRequest
+ *
+ *		If there are any asynchronous subplans that need a new asynchronous
+ *		request, make all of them.  Returns true, with *result set, when an
+ *		asynchronously-generated tuple is available to hand back.
+ * ----------------------------------------------------------------
+ */
+static bool
+ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
+{
+	/* Nothing to do if there are no async subplans needing a new request. */
+	if (bms_is_empty(node->as_needrequest))
+		return false;
+
+	/*
+	 * Only fire new requests when no asynchronously-generated result is
+	 * still waiting to be returned; otherwise we just hand one of those back
+	 * below.
+	 */
+	if (node->as_nasyncresults <= 0)
+	{
+		Bitmapset  *todo = node->as_needrequest;
+		int			plan_idx;
+
+		/* Detach the set first; the responses may add members to it again. */
+		node->as_needrequest = NULL;
+
+		/* Make a new request for each of the async subplans that need it. */
+		for (plan_idx = bms_next_member(todo, -1);
+			 plan_idx >= 0;
+			 plan_idx = bms_next_member(todo, plan_idx))
+		{
+			/* Do the actual work. */
+			ExecAsyncRequest(node->as_asyncrequests[plan_idx]);
+		}
+		bms_free(todo);
+	}
+
+	/* Still nothing buffered: the caller has to wait for events. */
+	if (node->as_nasyncresults <= 0)
+		return false;
+
+	/* Return the most recently stored asynchronously-generated result. */
+	*result = node->as_asyncresults[--node->as_nasyncresults];
+	return true;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAppendAsyncEventWait
+ *
+ *		Wait or poll for file descriptor wait events and fire callbacks.
+ *
+ *		While sync subplans remain (as_syncdone is false) this only polls
+ *		(timeout 0); once they are done it blocks (timeout -1) until an
+ *		event occurs.  A fresh WaitEventSet is built on every call and
+ *		freed again before the callbacks are dispatched.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecAppendAsyncEventWait(AppendState *node)
+{
+	long		timeout = node->as_syncdone ? -1 : 0;
+	WaitEvent	occurred_event[EVENT_BUFFER_SIZE];
+	int			npending = 0;
+	int			noccurred;
+	int			i;
+
+	/* Nothing to do if there are no remaining async subplans. */
+	if (node->as_nasyncremain == 0)
+		return;
+
+	/* One slot per async subplan, plus one for postmaster death. */
+	node->as_eventset = CreateWaitEventSet(CurrentMemoryContext,
+										   node->as_nasyncplans + 1);
+	AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+					  NULL, NULL);
+
+	/* Give each waiting subplan a chance to add an event. */
+	i = -1;
+	while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
+	{
+		AsyncRequest *areq = node->as_asyncrequests[i];
+
+		if (areq->callback_pending)
+		{
+			ExecAsyncConfigureWait(areq);
+			npending++;
+		}
+	}
+
+	/*
+	 * If no subplan is waiting for a callback, the set contains nothing but
+	 * the postmaster-death event.  Waiting on it with timeout -1 would block
+	 * with nothing able to wake us up, so bail out instead; there is no
+	 * event to deliver anyway.
+	 */
+	if (npending == 0)
+	{
+		FreeWaitEventSet(node->as_eventset);
+		node->as_eventset = NULL;
+		return;
+	}
+
+	/* Wait for at least one event to occur (or just poll, see timeout). */
+	noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
+								 EVENT_BUFFER_SIZE, WAIT_EVENT_APPEND_READY);
+	FreeWaitEventSet(node->as_eventset);
+	node->as_eventset = NULL;
+	if (noccurred == 0)
+		return;
+
+	/* Deliver notifications. */
+	for (i = 0; i < noccurred; i++)
+	{
+		WaitEvent  *w = &occurred_event[i];
+
+		/*
+		 * Each waiting subplan should have registered its wait event with
+		 * user_data pointing back to its AsyncRequest.
+		 */
+		if ((w->events & WL_SOCKET_READABLE) != 0)
+		{
+			AsyncRequest *areq = (AsyncRequest *) w->user_data;
+
+			/*
+			 * Mark it as no longer needing a callback.  We must do this
+			 * before dispatching the callback in case the callback resets
+			 * the flag.
+			 */
+			Assert(areq->callback_pending);
+			areq->callback_pending = false;
+
+			/* Do the actual work. */
+			ExecAsyncNotify(areq);
+		}
+	}
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAsyncAppendResponse
+ *
+ *		Receive a response from an asynchronous request we made.  The
+ *		request is either still pending, finished without a tuple (that
+ *		subplan is exhausted), or finished with a tuple that gets buffered
+ *		for ExecAppend to return.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncAppendResponse(AsyncRequest *areq)
+{
+	TupleTableSlot *tuple = areq->result;
+	AppendState *append = (AppendState *) areq->requestor;
+
+	/* The result should be a TupleTableSlot or NULL. */
+	Assert(tuple == NULL || IsA(tuple, TupleTableSlot));
+
+	if (!areq->request_complete)
+	{
+		/*
+		 * Nothing to do while the request is pending; the subplan it was
+		 * made for would be waiting for a callback.
+		 */
+		Assert(areq->callback_pending);
+	}
+	else if (TupIsNull(tuple))
+	{
+		/*
+		 * A NULL or empty slot means this subplan has nothing more to give;
+		 * it would no longer be pending for a callback either.
+		 */
+		Assert(!areq->callback_pending);
+		append->as_nasyncremain--;
+	}
+	else
+	{
+		/* Save result so we can return it */
+		Assert(append->as_nasyncresults < append->as_nasyncplans);
+		append->as_asyncresults[append->as_nasyncresults] = tuple;
+		append->as_nasyncresults++;
+
+		/*
+		 * Mark the subplan that returned a result as ready for a new
+		 * request.  We don't launch another one here immediately because it
+		 * might complete.
+		 */
+		append->as_needrequest = bms_add_member(append->as_needrequest,
+												areq->request_index);
+	}
+}
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 0b20f94035..aacd3464ce 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -391,3 +391,51 @@ ExecShutdownForeignScan(ForeignScanState *node)
if (fdwroutine->ShutdownForeignScan)
fdwroutine->ShutdownForeignScan(node);
}
+
+/* ----------------------------------------------------------------
+ *		ExecAsyncForeignScanRequest
+ *
+ *		Asynchronously request a tuple from a designated async-capable
+ *		node by forwarding the request to the FDW's ForeignAsyncRequest
+ *		callback.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncForeignScanRequest(AsyncRequest *areq)
+{
+	ForeignScanState *fsstate = (ForeignScanState *) areq->requestee;
+
+	/* The FDW must provide the callback. */
+	Assert(fsstate->fdwroutine->ForeignAsyncRequest != NULL);
+	fsstate->fdwroutine->ForeignAsyncRequest(areq);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAsyncForeignScanConfigureWait
+ *
+ *		In async mode, configure for a wait by forwarding to the FDW's
+ *		ForeignAsyncConfigureWait callback.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncForeignScanConfigureWait(AsyncRequest *areq)
+{
+	ForeignScanState *fsstate = (ForeignScanState *) areq->requestee;
+
+	/* The FDW must provide the callback. */
+	Assert(fsstate->fdwroutine->ForeignAsyncConfigureWait != NULL);
+	fsstate->fdwroutine->ForeignAsyncConfigureWait(areq);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAsyncForeignScanNotify
+ *
+ *		Callback invoked when a relevant event has occurred; forwards to
+ *		the FDW's ForeignAsyncNotify callback.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncForeignScanNotify(AsyncRequest *areq)
+{
+	ForeignScanState *fsstate = (ForeignScanState *) areq->requestee;
+
+	/* The FDW must provide the callback. */
+	Assert(fsstate->fdwroutine->ForeignAsyncNotify != NULL);
+	fsstate->fdwroutine->ForeignAsyncNotify(areq);
+}
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 70f8b718e0..ea8f0ecfed 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -120,6 +120,7 @@ CopyPlanFields(const Plan *from, Plan *newnode)
COPY_SCALAR_FIELD(plan_width);
COPY_SCALAR_FIELD(parallel_aware);
COPY_SCALAR_FIELD(parallel_safe);
+ COPY_SCALAR_FIELD(async_capable);
COPY_SCALAR_FIELD(plan_node_id);
COPY_NODE_FIELD(targetlist);
COPY_NODE_FIELD(qual);
@@ -241,6 +242,7 @@ _copyAppend(const Append *from)
*/
COPY_BITMAPSET_FIELD(apprelids);
COPY_NODE_FIELD(appendplans);
+ COPY_SCALAR_FIELD(nasyncplans);
COPY_SCALAR_FIELD(first_partial_plan);
COPY_NODE_FIELD(part_prune_info);
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index d78b16ed1d..d8a9ec5be1 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -333,6 +333,7 @@ _outPlanInfo(StringInfo str, const Plan *node)
WRITE_INT_FIELD(plan_width);
WRITE_BOOL_FIELD(parallel_aware);
WRITE_BOOL_FIELD(parallel_safe);
+ WRITE_BOOL_FIELD(async_capable);
WRITE_INT_FIELD(plan_node_id);
WRITE_NODE_FIELD(targetlist);
WRITE_NODE_FIELD(qual);
@@ -431,6 +432,7 @@ _outAppend(StringInfo str, const Append *node)
WRITE_BITMAPSET_FIELD(apprelids);
WRITE_NODE_FIELD(appendplans);
+ WRITE_INT_FIELD(nasyncplans);
WRITE_INT_FIELD(first_partial_plan);
WRITE_NODE_FIELD(part_prune_info);
}
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 0f6a77afc4..56638a0437 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1574,6 +1574,7 @@ ReadCommonPlan(Plan *local_node)
READ_INT_FIELD(plan_width);
READ_BOOL_FIELD(parallel_aware);
READ_BOOL_FIELD(parallel_safe);
+ READ_BOOL_FIELD(async_capable);
READ_INT_FIELD(plan_node_id);
READ_NODE_FIELD(targetlist);
READ_NODE_FIELD(qual);
@@ -1670,6 +1671,7 @@ _readAppend(void)
READ_BITMAPSET_FIELD(apprelids);
READ_NODE_FIELD(appendplans);
+ READ_INT_FIELD(nasyncplans);
READ_INT_FIELD(first_partial_plan);
READ_NODE_FIELD(part_prune_info);
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 22d6935824..97f28227cb 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -147,6 +147,7 @@ bool enable_partitionwise_aggregate = false;
bool enable_parallel_append = true;
bool enable_parallel_hash = true;
bool enable_partition_pruning = true;
+bool enable_async_append = true;
typedef struct
{
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index f7a8dae3c6..7a29086f83 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -81,6 +81,7 @@ static List *get_gating_quals(PlannerInfo *root, List *quals);
static Plan *create_gating_plan(PlannerInfo *root, Path *path, Plan *plan,
List *gating_quals);
static Plan *create_join_plan(PlannerInfo *root, JoinPath *best_path);
+static bool is_async_capable_path(Path *path);
static Plan *create_append_plan(PlannerInfo *root, AppendPath *best_path,
int flags);
static Plan *create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
@@ -1066,6 +1067,30 @@ create_join_plan(PlannerInfo *root, JoinPath *best_path)
return plan;
}
+/*
+ * is_async_capable_path
+ *		Check whether a given Path node is async-capable.
+ *
+ * Only ForeignPath is considered, and only when its FDW provides an
+ * IsForeignPathAsyncCapable callback that returns true for the path.  Every
+ * other path type yields false.
+ */
+static bool
+is_async_capable_path(Path *path)
+{
+	switch (nodeTag(path))
+	{
+		case T_ForeignPath:
+			{
+				FdwRoutine *fdwroutine = path->parent->fdwroutine;
+
+				Assert(fdwroutine != NULL);
+				if (fdwroutine->IsForeignPathAsyncCapable != NULL &&
+					fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path))
+					return true;
+			}
+			/* Don't fall through into whatever case follows. */
+			break;
+		default:
+			break;
+	}
+	return false;
+}
+
/*
* create_append_plan
* Create an Append plan for 'best_path' and (recursively) plans
@@ -1083,6 +1108,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
List *pathkeys = best_path->path.pathkeys;
List *subplans = NIL;
ListCell *subpaths;
+ int nasyncplans = 0;
RelOptInfo *rel = best_path->path.parent;
PartitionPruneInfo *partpruneinfo = NULL;
int nodenumsortkeys = 0;
@@ -1090,6 +1116,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
Oid *nodeSortOperators = NULL;
Oid *nodeCollations = NULL;
bool *nodeNullsFirst = NULL;
+ bool consider_async = false;
/*
* The subpaths list could be empty, if every child was proven empty by
@@ -1153,6 +1180,11 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
tlist_was_changed = (orig_tlist_length != list_length(plan->plan.targetlist));
}
+ /* If appropriate, consider async append */
+ consider_async = (enable_async_append && pathkeys == NIL &&
+ !best_path->path.parallel_safe &&
+ list_length(best_path->subpaths) > 1);
+
/* Build the plan for each child */
foreach(subpaths, best_path->subpaths)
{
@@ -1220,6 +1252,13 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
}
subplans = lappend(subplans, subplan);
+
+ /* Check to see if subplan can be executed asynchronously */
+ if (consider_async && is_async_capable_path(subpath))
+ {
+ subplan->async_capable = true;
+ ++nasyncplans;
+ }
}
/*
@@ -1254,6 +1293,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
}
plan->appendplans = subplans;
+ plan->nasyncplans = nasyncplans;
plan->first_partial_plan = best_path->first_partial_path;
plan->part_prune_info = partpruneinfo;
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 123369f4fa..d438e4cd17 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3919,6 +3919,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
switch (w)
{
+ case WAIT_EVENT_APPEND_READY:
+ event_name = "AppendReady";
+ break;
case WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE:
event_name = "BackupWaitWalArchive";
break;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 878fcc2236..a4d4b2027a 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1108,6 +1108,16 @@ static struct config_bool ConfigureNamesBool[] =
true,
NULL, NULL, NULL
},
+ {
+ {"enable_async_append", PGC_USERSET, QUERY_TUNING_METHOD,
+ gettext_noop("Enables the planner's use of async append plans."),
+ NULL,
+ GUC_EXPLAIN
+ },
+ &enable_async_append,
+ true,
+ NULL, NULL, NULL
+ },
{
{"geqo", PGC_USERSET, QUERY_TUNING_GEQO,
gettext_noop("Enables genetic query optimization."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 5298e18ecd..e5415772be 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -370,6 +370,7 @@
#enable_partitionwise_aggregate = off
#enable_parallel_hash = on
#enable_partition_pruning = on
+#enable_async_append = on
# - Planner Cost Constants -
diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h
index e69de29bb2..f7275fd154 100644
--- a/src/include/executor/execAsync.h
+++ b/src/include/executor/execAsync.h
@@ -0,0 +1,23 @@
+/*-------------------------------------------------------------------------
+ * execAsync.h
+ * Support functions for asynchronous execution
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/include/executor/execAsync.h
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef EXECASYNC_H
+#define EXECASYNC_H
+
+#include "nodes/execnodes.h"
+
+extern void ExecAsyncRequest(AsyncRequest *areq);
+extern void ExecAsyncConfigureWait(AsyncRequest *areq);
+extern void ExecAsyncNotify(AsyncRequest *areq);
+extern void ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result);
+
+#endif /* EXECASYNC_H */
diff --git a/src/include/executor/nodeAppend.h b/src/include/executor/nodeAppend.h
index be222ebff6..3d36096304 100644
--- a/src/include/executor/nodeAppend.h
+++ b/src/include/executor/nodeAppend.h
@@ -25,4 +25,6 @@ extern void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt);
extern void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt);
extern void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt);
+extern void ExecAsyncAppendResponse(AsyncRequest *areq);
+
#endif /* NODEAPPEND_H */
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index 326d713ebf..abd782a6f3 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -31,4 +31,8 @@ extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
ParallelWorkerContext *pwcxt);
extern void ExecShutdownForeignScan(ForeignScanState *node);
+extern void ExecAsyncForeignScanRequest(AsyncRequest *areq);
+extern void ExecAsyncForeignScanConfigureWait(AsyncRequest *areq);
+extern void ExecAsyncForeignScanNotify(AsyncRequest *areq);
+
#endif /* NODEFOREIGNSCAN_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 95556dfb15..03cdfa12c1 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -170,6 +170,14 @@ typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root,
List *fdw_private,
RelOptInfo *child_rel);
+typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path);
+
+typedef void (*ForeignAsyncRequest_function) (AsyncRequest *areq);
+
+typedef void (*ForeignAsyncConfigureWait_function) (AsyncRequest *areq);
+
+typedef void (*ForeignAsyncNotify_function) (AsyncRequest *areq);
+
/*
* FdwRoutine is the struct returned by a foreign-data wrapper's handler
* function. It provides pointers to the callback functions needed by the
@@ -246,6 +254,12 @@ typedef struct FdwRoutine
/* Support functions for path reparameterization. */
ReparameterizeForeignPathByChild_function ReparameterizeForeignPathByChild;
+
+ /* Support functions for asynchronous execution */
+ IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable;
+ ForeignAsyncRequest_function ForeignAsyncRequest;
+ ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait;
+ ForeignAsyncNotify_function ForeignAsyncNotify;
} FdwRoutine;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 61ba4c3666..6e2db12895 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -502,6 +502,22 @@ typedef struct ResultRelInfo
struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer;
} ResultRelInfo;
+/* ----------------
+ * AsyncRequest
+ *
+ * State for one asynchronous tuple request from a requestor to a requestee.
+ * ----------------
+ */
+typedef struct AsyncRequest
+{
+ struct PlanState *requestor; /* Node that wants a tuple */
+ struct PlanState *requestee; /* Node from which a tuple is wanted */
+ int request_index; /* Scratch space for requestor */
+ bool callback_pending; /* Callback is needed */
+ bool request_complete; /* Request complete, result valid */
+ TupleTableSlot *result; /* Result (NULL if no more tuples) */
+} AsyncRequest;
+
/* ----------------
* EState information
*
@@ -1207,6 +1223,16 @@ struct AppendState
PlanState **appendplans; /* array of PlanStates for my inputs */
int as_nplans;
int as_whichplan;
+ bool as_syncdone; /* all synchronous plans done? */
+ Bitmapset *as_asyncplans; /* indexes of asynchronous plans */
+ int as_nasyncplans; /* # of asynchronous plans */
+ AsyncRequest **as_asyncrequests; /* array of AsyncRequests */
+ TupleTableSlot **as_asyncresults; /* unreturned results of async plans */
+ int as_nasyncresults; /* # of valid entries in as_asyncresults */
+ int as_nasyncremain; /* # of remaining async plans */
+ Bitmapset *as_needrequest; /* async plans ready for a request */
+ struct WaitEventSet *as_eventset; /* WaitEventSet used to configure
+ * file descriptor wait events */
int as_first_partial_plan; /* Index of 'appendplans' containing
* the first partial plan */
ParallelAppendState *as_pstate; /* parallel coordination info */
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 7e6b10f86b..6c5396e6a3 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -129,6 +129,11 @@ typedef struct Plan
bool parallel_aware; /* engage parallel-aware logic? */
bool parallel_safe; /* OK to use as part of parallel plan? */
+ /*
+ * information needed for asynchronous execution
+ */
+ bool async_capable; /* engage asynchronous-capable logic? */
+
/*
* Common structural data for all Plan types.
*/
@@ -245,6 +250,7 @@ typedef struct Append
Plan plan;
Bitmapset *apprelids; /* RTIs of appendrel(s) formed by this node */
List *appendplans;
+ int nasyncplans; /* # of asynchronous plans */
/*
* All 'appendplans' preceding this index are non-partial plans. All
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 8e621d2f76..33bc133dd4 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -65,6 +65,7 @@ extern PGDLLIMPORT bool enable_partitionwise_aggregate;
extern PGDLLIMPORT bool enable_parallel_append;
extern PGDLLIMPORT bool enable_parallel_hash;
extern PGDLLIMPORT bool enable_partition_pruning;
+extern PGDLLIMPORT bool enable_async_append;
extern PGDLLIMPORT int constraint_exclusion;
extern double index_pages_fetched(double tuples_fetched, BlockNumber pages,
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 5954068dec..3249570a18 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -923,6 +923,7 @@ typedef enum
*/
typedef enum
{
- WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE = PG_WAIT_IPC,
+ WAIT_EVENT_APPEND_READY = PG_WAIT_IPC,
+ WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE,
WAIT_EVENT_BGWORKER_SHUTDOWN,
WAIT_EVENT_BGWORKER_STARTUP,
diff --git a/src/test/regress/expected/explain.out b/src/test/regress/expected/explain.out
index dc7ab2ce8b..e78ca7bddb 100644
--- a/src/test/regress/expected/explain.out
+++ b/src/test/regress/expected/explain.out
@@ -87,6 +87,7 @@ select explain_filter('explain (analyze, buffers, format json) select * from int
"Plan": { +
"Node Type": "Seq Scan", +
"Parallel Aware": false, +
+ "Async Capable": false, +
"Relation Name": "int8_tbl",+
"Alias": "i8", +
"Startup Cost": N.N, +
@@ -136,6 +137,7 @@ select explain_filter('explain (analyze, buffers, format xml) select * from int8
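<Plan> +
<Node-Type>Seq Scan</Node-Type> +
<Parallel-Aware>false</Parallel-Aware> +
+ <Async-Capable>false</Async-Capable> +
<Relation-Name>int8_tbl</Relation-Name> +
<Alias>i8</Alias> +
<Startup-Cost>N.N</Startup-Cost> +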
@@ -183,6 +185,7 @@ select explain_filter('explain (analyze, buffers, format yaml) select * from int
- Plan: +
Node Type: "Seq Scan" +
Parallel Aware: false +
+ Async Capable: false +
Relation Name: "int8_tbl"+
Alias: "i8" +
Startup Cost: N.N +
@@ -233,6 +236,7 @@ select explain_filter('explain (buffers, format json) select * from int8_tbl i8'
"Plan": { +
"Node Type": "Seq Scan", +
"Parallel Aware": false, +
+ "Async Capable": false, +
"Relation Name": "int8_tbl",+
"Alias": "i8", +
"Startup Cost": N.N, +
@@ -348,6 +352,7 @@ select jsonb_pretty(
"Actual Rows": 0, +
"Actual Loops": 0, +
"Startup Cost": 0.0, +
+ "Async Capable": false, +
"Relation Name": "tenk1", +
"Parallel Aware": true, +
"Local Hit Blocks": 0, +
@@ -393,6 +398,7 @@ select jsonb_pretty(
"Actual Rows": 0, +
"Actual Loops": 0, +
"Startup Cost": 0.0, +
+ "Async Capable": false, +
"Parallel Aware": false, +
"Sort Space Used": 0, +
"Local Hit Blocks": 0, +
@@ -435,6 +441,7 @@ select jsonb_pretty(
"Actual Rows": 0, +
"Actual Loops": 0, +
"Startup Cost": 0.0, +
+ "Async Capable": false, +
"Parallel Aware": false, +
"Workers Planned": 0, +
"Local Hit Blocks": 0, +
diff --git a/src/test/regress/expected/incremental_sort.out b/src/test/regress/expected/incremental_sort.out
index a8cbfd9f5f..af38f3b93c 100644
--- a/src/test/regress/expected/incremental_sort.out
+++ b/src/test/regress/expected/incremental_sort.out
@@ -558,6 +558,7 @@ select jsonb_pretty(explain_analyze_inc_sort_nodes_without_memory('select * from
"Node Type": "Incremental Sort", +
"Actual Rows": 55, +
"Actual Loops": 1, +
+ "Async Capable": false, +
"Presorted Key": [ +
"t.a" +
], +
@@ -734,6 +735,7 @@ select jsonb_pretty(explain_analyze_inc_sort_nodes_without_memory('select * from
"Node Type": "Incremental Sort", +
"Actual Rows": 70, +
"Actual Loops": 1, +
+ "Async Capable": false, +
"Presorted Key": [ +
"t.a" +
], +
diff --git a/src/test/regress/expected/insert_conflict.out b/src/test/regress/expected/insert_conflict.out
index ff157ceb1c..499245068a 100644
--- a/src/test/regress/expected/insert_conflict.out
+++ b/src/test/regress/expected/insert_conflict.out
@@ -204,6 +204,7 @@ explain (costs off, format json) insert into insertconflicttest values (0, 'Bilb
"Node Type": "ModifyTable", +
"Operation": "Insert", +
"Parallel Aware": false, +
+ "Async Capable": false, +
"Relation Name": "insertconflicttest", +
"Alias": "insertconflicttest", +
"Conflict Resolution": "UPDATE", +
@@ -213,7 +214,8 @@ explain (costs off, format json) insert into insertconflicttest values (0, 'Bilb
{ +
"Node Type": "Result", +
"Parent Relationship": "Member", +
- "Parallel Aware": false +
+ "Parallel Aware": false, +
+ "Async Capable": false +
} +
] +
} +
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index 81bdacf59d..b7818c0637 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -88,6 +88,7 @@ select count(*) = 1 as ok from pg_stat_wal;
select name, setting from pg_settings where name like 'enable%';
name | setting
--------------------------------+---------
+ enable_async_append | on
enable_bitmapscan | on
enable_gathermerge | on
enable_hashagg | on
@@ -106,7 +107,7 @@ select name, setting from pg_settings where name like 'enable%';
enable_seqscan | on
enable_sort | on
enable_tidscan | on
-(18 rows)
+(19 rows)
-- Test that the pg_timezone_names and pg_timezone_abbrevs views are
-- more-or-less working. We can't test their contents in any great detail