From d935bbb70565d70f1b0f547bf37e71ffc6fa2ef2 Mon Sep 17 00:00:00 2001 From: "Andrey V. Lepikhov" Date: Tue, 29 Jun 2021 22:09:54 +0300 Subject: [PATCH] Choose async append subplans at the initial execution stage --- contrib/file_fdw/file_fdw.c | 3 +- .../postgres_fdw/expected/postgres_fdw.out | 81 ++++++++++++++++++- contrib/postgres_fdw/postgres_fdw.c | 13 +-- contrib/postgres_fdw/sql/postgres_fdw.sql | 29 +++++++ src/backend/commands/explain.c | 2 +- src/backend/executor/execAmi.c | 4 - src/backend/executor/nodeAppend.c | 27 ++++--- src/backend/executor/nodeForeignscan.c | 7 -- src/backend/nodes/copyfuncs.c | 1 - src/backend/nodes/outfuncs.c | 1 - src/backend/nodes/readfuncs.c | 1 - src/backend/optimizer/path/costsize.c | 1 - src/backend/optimizer/plan/createplan.c | 45 +---------- src/backend/utils/misc/guc.c | 1 + src/include/executor/nodeAppend.h | 2 + src/include/nodes/plannodes.h | 1 - src/include/optimizer/cost.h | 1 - src/include/optimizer/planmain.h | 2 +- 18 files changed, 141 insertions(+), 81 deletions(-) diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c index 2c2f149fb0..5f67e1ca94 100644 --- a/contrib/file_fdw/file_fdw.c +++ b/contrib/file_fdw/file_fdw.c @@ -609,7 +609,8 @@ fileGetForeignPlan(PlannerInfo *root, best_path->fdw_private, NIL, /* no custom tlist */ NIL, /* no remote quals */ - outer_plan); + outer_plan, + false); } /* diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 31b5de91ad..30c38c6992 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -10169,7 +10169,7 @@ EXECUTE async_pt_query (2000, 505); Insert on public.result_tbl -> Append Subplans Removed: 2 - -> Async Foreign Scan on public.async_p1 async_pt_1 + -> Foreign Scan on public.async_p1 async_pt_1 Output: async_pt_1.a, async_pt_1.b, async_pt_1.c Filter: (async_pt_1.b === $2) Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < $1::integer)) @@ -10237,6 +10237,85 @@ SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c 2505 | 505 | bar | 2505 | 505 | 0505 (1 row) +-- Subquery flattening must be done before choosing of async plans. +EXPLAIN (VERBOSE, COSTS OFF) +(SELECT * FROM async_p1 LIMIT 1) + UNION ALL +(SELECT * FROM async_p2 WHERE a < 5) + UNION ALL +(SELECT * FROM async_p2) + UNION ALL +(SELECT * FROM async_p3 LIMIT 3); + QUERY PLAN +-------------------------------------------------------------------------- + Append + -> Async Foreign Scan on public.async_p1 + Output: async_p1.a, async_p1.b, async_p1.c + Remote SQL: SELECT a, b, c FROM public.base_tbl1 LIMIT 1::bigint + -> Async Foreign Scan on public.async_p2 async_p2_1 + Output: async_p2_1.a, async_p2_1.b, async_p2_1.c + Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < 5)) + -> Async Foreign Scan on public.async_p2 + Output: async_p2.a, async_p2.b, async_p2.c + Remote SQL: SELECT a, b, c FROM public.base_tbl2 + -> Limit + Output: async_p3.a, async_p3.b, async_p3.c + -> Seq Scan on public.async_p3 + Output: async_p3.a, async_p3.b, async_p3.c +(14 rows) + +-- Check that async append doesn't break the scrollable cursors logic: +-- If the query plan doesn't support backward scan, a materialize node will be +-- inserted in the head of this plan. +BEGIN; +EXPLAIN (COSTS OFF) +DECLARE curs1 SCROLL CURSOR FOR (SELECT * FROM async_p3); + QUERY PLAN +---------------------- + Seq Scan on async_p3 +(1 row) + +EXPLAIN (COSTS OFF) +DECLARE curs1 SCROLL CURSOR FOR (SELECT * FROM async_pt); + QUERY PLAN +------------------------------------------------------- + Materialize + -> Append + -> Async Foreign Scan on async_p1 async_pt_1 + -> Async Foreign Scan on async_p2 async_pt_2 + -> Seq Scan on async_p3 async_pt_3 +(5 rows) + +EXPLAIN (COSTS OFF) +DECLARE curs1 NO SCROLL CURSOR FOR (SELECT * FROM async_p1); + QUERY PLAN +-------------------------- + Foreign Scan on async_p1 +(1 row) + +EXPLAIN (VERBOSE, COSTS OFF) +DECLARE curs2 SCROLL CURSOR FOR + (SELECT * FROM async_p1) + UNION ALL + (SELECT * FROM async_p2 WHERE a < 5) + UNION ALL + (SELECT * FROM async_p3); + QUERY PLAN +-------------------------------------------------------------------------------- + Materialize + Output: async_p1.a, async_p1.b, async_p1.c + -> Append + -> Async Foreign Scan on public.async_p1 + Output: async_p1.a, async_p1.b, async_p1.c + Remote SQL: SELECT a, b, c FROM public.base_tbl1 + -> Async Foreign Scan on public.async_p2 + Output: async_p2.a, async_p2.b, async_p2.c + Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < 5)) + -> Seq Scan on public.async_p3 + Output: async_p3.a, async_p3.b, async_p3.c +(11 rows) + +ROLLBACK; ALTER FOREIGN TABLE async_p1 OPTIONS (DROP use_remote_estimate); ALTER FOREIGN TABLE async_p2 OPTIONS (DROP use_remote_estimate); DROP TABLE local_tbl; diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index fafbab6b02..6a52859f5e 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -163,9 +163,6 @@ typedef struct PgFdwScanState int fetch_ct_2; /* Min(# of fetches done, 2) */ bool eof_reached; /* true if last fetch reached EOF */ - /* for asynchronous execution */ - bool async_capable; /* engage asynchronous-capable logic? */ - /* working memory contexts */ MemoryContext batch_cxt; /* context holding current batch of tuples */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ @@ -1436,7 +1433,8 @@ postgresGetForeignPlan(PlannerInfo *root, fdw_private, fdw_scan_tlist, fdw_recheck_quals, - outer_plan); + outer_plan, + fpinfo->async_capable); } /* @@ -1591,9 +1589,6 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) &fsstate->param_flinfo, &fsstate->param_exprs, &fsstate->param_values); - - /* Set the async-capable flag */ - fsstate->async_capable = node->ss.ps.async_capable; } /* @@ -1622,7 +1617,7 @@ postgresIterateForeignScan(ForeignScanState *node) if (fsstate->next_tuple >= fsstate->num_tuples) { /* In async mode, just clear tuple slot. */ - if (fsstate->async_capable) + if (node->ss.ps.async_capable) return ExecClearTuple(slot); /* No point in another fetch if we already detected EOF, though. */ if (!fsstate->eof_reached) @@ -3781,7 +3776,7 @@ fetch_more_data(ForeignScanState *node) int numrows; int i; - if (fsstate->async_capable) + if (node->ss.ps.async_capable) { Assert(fsstate->conn_state->pendingAreq); diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 286dd99573..1c1c152bcf 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -3252,6 +3252,35 @@ EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar'; SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar'; +-- Subquery flattening must be done before choosing of async plans. +EXPLAIN (VERBOSE, COSTS OFF) +(SELECT * FROM async_p1 LIMIT 1) + UNION ALL +(SELECT * FROM async_p2 WHERE a < 5) + UNION ALL +(SELECT * FROM async_p2) + UNION ALL +(SELECT * FROM async_p3 LIMIT 3); + +-- Check that async append doesn't break the scrollable cursors logic: +-- If the query plan doesn't support backward scan, a materialize node will be +-- inserted in the head of this plan. +BEGIN; +EXPLAIN (COSTS OFF) +DECLARE curs1 SCROLL CURSOR FOR (SELECT * FROM async_p3); +EXPLAIN (COSTS OFF) +DECLARE curs1 SCROLL CURSOR FOR (SELECT * FROM async_pt); +EXPLAIN (COSTS OFF) +DECLARE curs1 NO SCROLL CURSOR FOR (SELECT * FROM async_p1); +EXPLAIN (VERBOSE, COSTS OFF) +DECLARE curs2 SCROLL CURSOR FOR + (SELECT * FROM async_p1) + UNION ALL + (SELECT * FROM async_p2 WHERE a < 5) + UNION ALL + (SELECT * FROM async_p3); +ROLLBACK; + ALTER FOREIGN TABLE async_p1 OPTIONS (DROP use_remote_estimate); ALTER FOREIGN TABLE async_p2 OPTIONS (DROP use_remote_estimate); diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index e81b990092..6c7f8e9d9f 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -1411,7 +1411,7 @@ ExplainNode(PlanState *planstate, List *ancestors, } if (plan->parallel_aware) appendStringInfoString(es->str, "Parallel "); - if (plan->async_capable) + if (planstate->async_capable) appendStringInfoString(es->str, "Async "); appendStringInfoString(es->str, pname); es->indent++; diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c index 10f0b349b5..ddeb028cf1 100644 --- a/src/backend/executor/execAmi.c +++ b/src/backend/executor/execAmi.c @@ -537,10 +537,6 @@ ExecSupportsBackwardScan(Plan *node) { ListCell *l; - /* With async, tuples may be interleaved, so can't back up. */ - if (((Append *) node)->nasyncplans > 0) - return false; - foreach(l, ((Append *) node)->appendplans) { if (!ExecSupportsBackwardScan((Plan *) lfirst(l))) diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index 755c1392f0..0f2148e097 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -83,6 +83,8 @@ struct ParallelAppendState #define INVALID_SUBPLAN_INDEX -1 #define EVENT_BUFFER_SIZE 16 +bool enable_async_append = true; + static TupleTableSlot *ExecAppend(PlanState *pstate); static bool choose_next_subplan_locally(AppendState *node); static bool choose_next_subplan_for_leader(AppendState *node); @@ -117,6 +119,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags) int firstvalid; int i, j; + bool consider_async; /* check for unsupported flags */ Assert(!(eflags & EXEC_FLAG_MARK)); @@ -197,6 +200,8 @@ ExecInitAppend(Append *node, EState *estate, int eflags) appendplanstates = (PlanState **) palloc(nplans * sizeof(PlanState *)); + consider_async = (enable_async_append && !node->plan.parallel_safe && + bms_num_members(validsubplans) > 1); /* * call ExecInitNode on each of the valid plans to be executed and save * the results into the appendplanstates array. @@ -212,24 +217,28 @@ ExecInitAppend(Append *node, EState *estate, int eflags) { Plan *initNode = (Plan *) list_nth(node->appendplans, i); + /* + * Record the lowest appendplans index which is a valid partial plan. + */ + if (i >= node->first_partial_plan && j < firstvalid) + firstvalid = j; + + appendplanstates[j] = ExecInitNode(initNode, estate, eflags); + /* * Record async subplans. When executing EvalPlanQual, we treat them * as sync ones; don't do this when initializing an EvalPlanQual plan * tree. */ - if (initNode->async_capable && estate->es_epq_active == NULL) + if (consider_async && initNode->async_capable && + estate->es_epq_active == NULL) { asyncplans = bms_add_member(asyncplans, j); nasyncplans++; + appendplanstates[j++]->async_capable = true; } - - /* - * Record the lowest appendplans index which is a valid partial plan. - */ - if (i >= node->first_partial_plan && j < firstvalid) - firstvalid = j; - - appendplanstates[j++] = ExecInitNode(initNode, estate, eflags); + else + appendplanstates[j++]->async_capable = false; } appendstate->as_first_partial_plan = firstvalid; diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c index 9dc38d47ea..898890fb08 100644 --- a/src/backend/executor/nodeForeignscan.c +++ b/src/backend/executor/nodeForeignscan.c @@ -209,13 +209,6 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags) scanstate->fdw_recheck_quals = ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate); - /* - * Determine whether to scan the foreign relation asynchronously or not; - * this has to be kept in sync with the code in ExecInitAppend(). - */ - scanstate->ss.ps.async_capable = (((Plan *) node)->async_capable && - estate->es_epq_active == NULL); - /* * Initialize FDW-related state. */ diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index bd87f23784..aca4a7cce4 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -243,7 +243,6 @@ _copyAppend(const Append *from) */ COPY_BITMAPSET_FIELD(apprelids); COPY_NODE_FIELD(appendplans); - COPY_SCALAR_FIELD(nasyncplans); COPY_SCALAR_FIELD(first_partial_plan); COPY_NODE_FIELD(part_prune_info); diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index e32b92e299..8e72c1333f 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -433,7 +433,6 @@ _outAppend(StringInfo str, const Append *node) WRITE_BITMAPSET_FIELD(apprelids); WRITE_NODE_FIELD(appendplans); - WRITE_INT_FIELD(nasyncplans); WRITE_INT_FIELD(first_partial_plan); WRITE_NODE_FIELD(part_prune_info); } diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index f0b34ecfac..a2aafcd2ce 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -1717,7 +1717,6 @@ _readAppend(void) READ_BITMAPSET_FIELD(apprelids); READ_NODE_FIELD(appendplans); - READ_INT_FIELD(nasyncplans); READ_INT_FIELD(first_partial_plan); READ_NODE_FIELD(part_prune_info); diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 8577c7b138..21e0dd0049 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -149,7 +149,6 @@ bool enable_partitionwise_aggregate = false; bool enable_parallel_append = true; bool enable_parallel_hash = true; bool enable_partition_pruning = true; -bool enable_async_append = true; typedef struct { diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 439e6b6426..f2baa58269 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -82,7 +82,6 @@ static List *get_gating_quals(PlannerInfo *root, List *quals); static Plan *create_gating_plan(PlannerInfo *root, Path *path, Plan *plan, List *gating_quals); static Plan *create_join_plan(PlannerInfo *root, JoinPath *best_path); -static bool is_async_capable_path(Path *path); static Plan *create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags); static Plan *create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path, @@ -1093,31 +1092,6 @@ create_join_plan(PlannerInfo *root, JoinPath *best_path) return plan; } -/* - * is_async_capable_path - * Check whether a given Path node is async-capable. - */ -static bool -is_async_capable_path(Path *path) -{ - switch (nodeTag(path)) - { - case T_ForeignPath: - { - FdwRoutine *fdwroutine = path->parent->fdwroutine; - - Assert(fdwroutine != NULL); - if (fdwroutine->IsForeignPathAsyncCapable != NULL && - fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path)) - return true; - } - break; - default: - break; - } - return false; -} - /* * create_append_plan * Create an Append plan for 'best_path' and (recursively) plans @@ -1135,7 +1109,6 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) List *pathkeys = best_path->path.pathkeys; List *subplans = NIL; ListCell *subpaths; - int nasyncplans = 0; RelOptInfo *rel = best_path->path.parent; PartitionPruneInfo *partpruneinfo = NULL; int nodenumsortkeys = 0; @@ -1143,7 +1116,6 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) Oid *nodeSortOperators = NULL; Oid *nodeCollations = NULL; bool *nodeNullsFirst = NULL; - bool consider_async = false; /* * The subpaths list could be empty, if every child was proven empty by @@ -1207,11 +1179,6 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) tlist_was_changed = (orig_tlist_length != list_length(plan->plan.targetlist)); } - /* If appropriate, consider async append */ - consider_async = (enable_async_append && pathkeys == NIL && - !best_path->path.parallel_safe && - list_length(best_path->subpaths) > 1); - /* Build the plan for each child */ foreach(subpaths, best_path->subpaths) { @@ -1279,13 +1246,6 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) } subplans = lappend(subplans, subplan); - - /* Check to see if subplan can be executed asynchronously */ - if (consider_async && is_async_capable_path(subpath)) - { - subplan->async_capable = true; - ++nasyncplans; - } } /* @@ -1318,7 +1278,6 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) } plan->appendplans = subplans; - plan->nasyncplans = nasyncplans; plan->first_partial_plan = best_path->first_partial_path; plan->part_prune_info = partpruneinfo; @@ -5685,7 +5644,8 @@ make_foreignscan(List *qptlist, List *fdw_private, List *fdw_scan_tlist, List *fdw_recheck_quals, - Plan *outer_plan) + Plan *outer_plan, + bool async_capable) { ForeignScan *node = makeNode(ForeignScan); Plan *plan = &node->scan.plan; @@ -5695,6 +5655,7 @@ make_foreignscan(List *qptlist, plan->qual = qpqual; plan->lefttree = outer_plan; plan->righttree = NULL; + plan->async_capable = async_capable; /* set support of async opts */ node->scan.scanrelid = scanrelid; /* these may be overridden by the FDW's PlanDirectModify callback. */ diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 480e8cd199..b14fe74050 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -51,6 +51,7 @@ #include "commands/vacuum.h" #include "commands/variable.h" #include "common/string.h" +#include "executor/nodeAppend.h" #include "funcapi.h" #include "jit/jit.h" #include "libpq/auth.h" diff --git a/src/include/executor/nodeAppend.h b/src/include/executor/nodeAppend.h index fa54ac6ad2..1831d6e021 100644 --- a/src/include/executor/nodeAppend.h +++ b/src/include/executor/nodeAppend.h @@ -17,6 +17,8 @@ #include "access/parallel.h" #include "nodes/execnodes.h" +extern PGDLLIMPORT bool enable_async_append; + extern AppendState *ExecInitAppend(Append *node, EState *estate, int eflags); extern void ExecEndAppend(AppendState *node); extern void ExecReScanAppend(AppendState *node); diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index aaa3b65d04..4d7595a7b1 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -251,7 +251,6 @@ typedef struct Append Plan plan; Bitmapset *apprelids; /* RTIs of appendrel(s) formed by this node */ List *appendplans; - int nasyncplans; /* # of asynchronous plans */ /* * All 'appendplans' preceding this index are non-partial plans. All diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 0fe60d82e4..67f925e793 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -66,7 +66,6 @@ extern PGDLLIMPORT bool enable_partitionwise_aggregate; extern PGDLLIMPORT bool enable_parallel_append; extern PGDLLIMPORT bool enable_parallel_hash; extern PGDLLIMPORT bool enable_partition_pruning; -extern PGDLLIMPORT bool enable_async_append; extern PGDLLIMPORT int constraint_exclusion; extern double index_pages_fetched(double tuples_fetched, BlockNumber pages, diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h index bf1adfc52a..710503e501 100644 --- a/src/include/optimizer/planmain.h +++ b/src/include/optimizer/planmain.h @@ -42,7 +42,7 @@ extern Plan *create_plan(PlannerInfo *root, Path *best_path); extern ForeignScan *make_foreignscan(List *qptlist, List *qpqual, Index scanrelid, List *fdw_exprs, List *fdw_private, List *fdw_scan_tlist, List *fdw_recheck_quals, - Plan *outer_plan); + Plan *outer_plan, bool async_capable); extern Plan *change_plan_targetlist(Plan *subplan, List *tlist, bool tlist_parallel_safe); extern Plan *materialize_finished_plan(Plan *subplan); -- 2.31.1