diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index aeda826..c6044e1 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3632,6 +3632,20 @@ ANY num_sync (
+ enable_parallel_append (boolean)
+
+ enable_parallel_append> configuration parameter
+
+
+
+
+ Enables or disables the query planner's use of parallel-aware
+ append plan types. The default is on>.
+
+
+
+
enable_partition_wise_join (boolean)
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 6f82033..12a8635 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1117,7 +1117,12 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
Waiting for TBM shared iterator lock.
- Lock
+ parallel_append
+ Waiting to choose the next subplan during Parallel Append plan
+ execution.
+
+
+ Lock
relation
Waiting to acquire a lock on a relation.
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 1b477ba..1445dd4 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -25,6 +25,7 @@
#include "executor/execParallel.h"
#include "executor/executor.h"
+#include "executor/nodeAppend.h"
#include "executor/nodeBitmapHeapscan.h"
#include "executor/nodeCustom.h"
#include "executor/nodeForeignscan.h"
@@ -244,6 +245,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
ExecForeignScanEstimate((ForeignScanState *) planstate,
e->pcxt);
break;
+ case T_AppendState:
+ if (planstate->plan->parallel_aware)
+ ExecAppendEstimate((AppendState *) planstate,
+ e->pcxt);
+ break;
case T_CustomScanState:
if (planstate->plan->parallel_aware)
ExecCustomScanEstimate((CustomScanState *) planstate,
@@ -316,6 +322,11 @@ ExecParallelInitializeDSM(PlanState *planstate,
ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
d->pcxt);
break;
+ case T_AppendState:
+ if (planstate->plan->parallel_aware)
+ ExecAppendInitializeDSM((AppendState *) planstate,
+ d->pcxt);
+ break;
case T_CustomScanState:
if (planstate->plan->parallel_aware)
ExecCustomScanInitializeDSM((CustomScanState *) planstate,
@@ -699,6 +710,10 @@ ExecParallelReInitializeDSM(PlanState *planstate,
ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
pcxt);
break;
+ case T_AppendState:
+ if (planstate->plan->parallel_aware)
+ ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
+ break;
case T_SortState:
/* even when not parallel-aware */
ExecSortReInitializeDSM((SortState *) planstate, pcxt);
@@ -969,6 +984,10 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
toc);
break;
+ case T_AppendState:
+ if (planstate->plan->parallel_aware)
+ ExecAppendInitializeWorker((AppendState *) planstate, toc);
+ break;
case T_CustomScanState:
if (planstate->plan->parallel_aware)
ExecCustomScanInitializeWorker((CustomScanState *) planstate,
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index bed9bb8..fafcccf 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -60,10 +60,43 @@
#include "executor/execdebug.h"
#include "executor/nodeAppend.h"
#include "miscadmin.h"
+#include "optimizer/cost.h"
+#include "storage/spin.h"
-static TupleTableSlot *ExecAppend(PlanState *pstate);
-static bool exec_append_initialize_next(AppendState *appendstate);
+/*
+ * Shared state for Parallel Append.
+ *
+ * Each backend participating in a Parallel Append has its own
+ * descriptor in backend-private memory, and those objects all contain
+ * a pointer to this structure.
+ */
+typedef struct ParallelAppendDescData
+{
+ LWLock pa_lock; /* mutual exclusion to choose next subplan */
+ int pa_next_plan; /* next plan to choose by any worker */
+
+ /*
+ * pa_finished: workers currently executing the subplan. A worker which
+ * finishes a subplan should set pa_finished to true, so that no new worker
+ * picks this subplan. For non-partial subplan, a worker which picks up
+ * that subplan should immediately set to true, so as to make sure there
+ * are no more than 1 worker assigned to this subplan.
+ */
+ bool pa_finished[FLEXIBLE_ARRAY_MEMBER];
+} ParallelAppendDescData;
+typedef ParallelAppendDescData *ParallelAppendDesc;
+
+/*
+ * Special value of AppendState->as_whichplan for Parallel Append, which
+ * indicates there are no plans left to be executed.
+ */
+#define PA_INVALID_PLAN -1
+
+static TupleTableSlot *ExecAppend(PlanState *pstate);
+static bool exec_append_seq_next(AppendState *appendstate);
+static bool exec_append_parallel_next(AppendState *state);
+static bool exec_append_leader_next(AppendState *state);
/* ----------------------------------------------------------------
* exec_append_initialize_next
@@ -74,11 +107,20 @@ static bool exec_append_initialize_next(AppendState *appendstate);
* ----------------------------------------------------------------
*/
static bool
-exec_append_initialize_next(AppendState *appendstate)
+exec_append_seq_next(AppendState *appendstate)
{
int whichplan;
/*
+ * Not parallel-aware. Fine, just go on to the next subplan in the
+ * appropriate direction.
+ */
+ if (ScanDirectionIsForward(appendstate->ps.state->es_direction))
+ appendstate->as_whichplan++;
+ else
+ appendstate->as_whichplan--;
+
+ /*
* get information from the append node
*/
whichplan = appendstate->as_whichplan;
@@ -185,10 +227,10 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
appendstate->ps.ps_ProjInfo = NULL;
/*
- * initialize to scan first subplan
+ * Initialize to scan first subplan (but note that we'll override this
+ * later in the case of a parallel append).
*/
appendstate->as_whichplan = 0;
- exec_append_initialize_next(appendstate);
return appendstate;
}
@@ -204,6 +246,16 @@ ExecAppend(PlanState *pstate)
{
AppendState *node = castNode(AppendState, pstate);
+ /*
+ * If this is the first time we are executing a Parallel Append node,
+ * we need to choose a subplan first.
+ */
+ if (node->as_padesc && node->as_whichplan == PA_INVALID_PLAN)
+ {
+ if (!exec_append_parallel_next(node))
+ return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ }
+
for (;;)
{
PlanState *subnode;
@@ -232,16 +284,34 @@ ExecAppend(PlanState *pstate)
}
/*
- * Go on to the "next" subplan in the appropriate direction. If no
- * more subplans, return the empty slot set up for us by
- * ExecInitAppend.
+ * Go on to the "next" subplan. If no more subplans, return the empty
+ * slot set up for us by ExecInitAppend.
*/
- if (ScanDirectionIsForward(node->ps.state->es_direction))
- node->as_whichplan++;
+ if (!node->as_padesc)
+ {
+ if (!exec_append_seq_next(node))
+ return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ }
else
- node->as_whichplan--;
- if (!exec_append_initialize_next(node))
- return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ {
+ /*
+ * Parallel-aware Append follows different logic for choosing the
+ * next subplan.
+ */
+
+ /*
+ * We are done with this subplan. There might be other workers
+ * still processing the last chunk of rows for this same subplan,
+ * but there's no point for new workers to run this subplan, so
+ * mark this subplan as finished.
+ */
+ LWLockAcquire(&node->as_padesc->pa_lock, LW_EXCLUSIVE);
+ node->as_padesc->pa_finished[node->as_whichplan] = true;
+ LWLockRelease(&node->as_padesc->pa_lock);
+
+ if (!exec_append_parallel_next(node))
+ return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ }
/* Else loop back and try to get a tuple from the new subplan */
}
@@ -299,5 +369,265 @@ ExecReScanAppend(AppendState *node)
ExecReScan(subnode);
}
node->as_whichplan = 0;
- exec_append_initialize_next(node);
+}
+
+/* ----------------------------------------------------------------
+ * Parallel Append Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ * ExecAppendEstimate
+ *
+ * estimates the space required to serialize Append node.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendEstimate(AppendState *node,
+ ParallelContext *pcxt)
+{
+ node->pappend_len =
+ add_size(offsetof(struct ParallelAppendDescData, pa_finished),
+ sizeof(bool) * node->as_nplans);
+
+ shm_toc_estimate_chunk(&pcxt->estimator, node->pappend_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+
+/* ----------------------------------------------------------------
+ * ExecAppendInitializeDSM
+ *
+ * Set up a Parallel Append descriptor.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendInitializeDSM(AppendState *node,
+ ParallelContext *pcxt)
+{
+ ParallelAppendDesc padesc;
+
+ padesc = shm_toc_allocate(pcxt->toc, node->pappend_len);
+
+ /*
+ * Just setting all the fields to 0 is enough. The logic of choosing the
+ * next plan in workers will take care of everything else.
+ */
+ memset(padesc, 0, sizeof(ParallelAppendDescData));
+ memset(padesc->pa_finished, 0, sizeof(bool) * node->as_nplans);
+
+ LWLockInitialize(&padesc->pa_lock, LWTRANCHE_PARALLEL_APPEND);
+
+ node->as_padesc = padesc;
+ node->as_whichplan = PA_INVALID_PLAN;
+
+ shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, padesc);
+}
+
+/* ----------------------------------------------------------------
+ * ExecAppendReInitializeDSM
+ *
+ * Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
+{
+ ParallelAppendDesc padesc = node->as_padesc;
+
+ padesc->pa_next_plan = 0;
+ memset(padesc->pa_finished, 0, sizeof(bool) * node->as_nplans);
+ node->as_whichplan = PA_INVALID_PLAN;
+}
+
+/* ----------------------------------------------------------------
+ * ExecAppendInitializeWorker
+ *
+ * Copy relevant information from TOC into planstate, and initialize
+ * whatever is required to choose and execute the optimal subplan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendInitializeWorker(AppendState *node, shm_toc *toc)
+{
+ node->as_padesc = shm_toc_lookup(toc, node->ps.plan->plan_node_id, false);
+ node->as_whichplan = PA_INVALID_PLAN;
+ node->is_parallel_worker = true;
+}
+
+/* ----------------------------------------------------------------
+ * exec_append_parallel_next
+ *
+ * Determine the next subplan that should be executed. Each worker uses a
+ * shared field 'pa_next_plan' to start looking for unfinished plan,
+ * executes the subplan, then shifts ahead this field to the next
+ * subplan, so that other workers know which next plan to choose. This
+ * way, workers choose the subplans in round robin order, and thus they
+ * get evenly distributed among the subplans.
+ *
+ * Returns false if and only if all subplans are already finished
+ * processing.
+ * ----------------------------------------------------------------
+ */
+static bool
+exec_append_parallel_next(AppendState *state)
+{
+ ParallelAppendDesc padesc = state->as_padesc;
+ int whichplan;
+ int nextplan;
+ int initial_plan;
+ int first_partial_plan = ((Append *)state->ps.plan)->first_partial_plan;
+ bool found;
+
+ Assert(padesc != NULL);
+
+ /* Backward scan is not supported by parallel-aware plans */
+ Assert(ScanDirectionIsForward(state->ps.state->es_direction));
+
+ /* The parallel leader chooses its next subplan differently */
+ if (!state->is_parallel_worker)
+ return exec_append_leader_next(state);
+
+ LWLockAcquire(&padesc->pa_lock, LW_EXCLUSIVE);
+
+ /* If all the plans are already done, we have nothing to do */
+ if (padesc->pa_next_plan == PA_INVALID_PLAN)
+ {
+ LWLockRelease(&padesc->pa_lock);
+ return false;
+ }
+
+ /* Make a note of which subplan we have started with */
+ initial_plan = nextplan = padesc->pa_next_plan;
+
+ /*
+ * Keep going to the next plan until we find an unfinished one or we made a
+ * full circle. Finished ones also include non-partial subplans which are
+ * already taken by a worker.
+ */
+ do
+ {
+ whichplan = nextplan;
+ /*
+ * Either go to the next plan, or if we are at the last plan, wrap
+ * around to the first partial one. We don't have to go back to the
+ * non-partial plans. Due to the round-robin traversal, the fact that
+ * we are wrapping around means that all the non-partial plans are
+ * already taken.
+ */
+ if (whichplan + 1 == state->as_nplans)
+ {
+ nextplan = first_partial_plan;
+ /*
+ * If we had started from a non-partial plan, that means we have
+ * searched all the nonpartial and partial plans.
+ */
+ if (initial_plan <= first_partial_plan)
+ break;
+ }
+ else
+ {
+ nextplan = whichplan + 1;
+
+ /* Have we made a full circle ? */
+ if (nextplan == initial_plan)
+ break;
+ }
+ } while (padesc->pa_finished[whichplan]);
+
+ /*
+ * Note: There is a chance that just after the child plan node is chosen
+ * above, some other worker finishes this node and sets pa_finished to
+ * true. In that case, this worker will go ahead and call
+ * ExecProcNode(child_node), which will return NULL tuple since it is
+ * already finished, and then once again this worker will try to choose
+ * next subplan; but this is ok : it's just an extra "choose_next_subplan"
+ * operation.
+ */
+
+ Assert(0 <= whichplan && whichplan < state->as_nplans);
+ found = !padesc->pa_finished[whichplan];
+
+ /* If we found no plans, indicate the same to other workers */
+ if (!found)
+ state->as_whichplan = padesc->pa_next_plan = PA_INVALID_PLAN;
+ else
+ {
+ /* Set the chosen plan */
+ state->as_whichplan = whichplan;
+
+ /* If this a non-partial plan, immediately mark it finished */
+ if (whichplan < first_partial_plan)
+ padesc->pa_finished[whichplan] = true;
+
+ /*
+ * nextplan can be state->as_nplans if we wrapped around to the first
+ * partial plan but there were no partial plans.
+ */
+ padesc->pa_next_plan = (nextplan == state->as_nplans ?
+ PA_INVALID_PLAN : nextplan);
+ }
+
+ LWLockRelease(&padesc->pa_lock);
+
+ return found;
+}
+
+/* ----------------------------------------------------------------
+ * exec_append_leader_next
+ *
+ * To be used only if it's a parallel leader.
+ * With more workers, the leader is known to do more work servicing the
+ * worker tuple queue, and less work contributing to parallel processing.
+ * Hence, it should not take expensive plans, otherwise it will affect the
+ * total time to finish Append. Since we have non-partial plans sorted in
+ * descending cost, let the leader scan backwards from the last plan, i.e.
+ * the cheapest plan.
+ * ----------------------------------------------------------------
+ */
+static bool
+exec_append_leader_next(AppendState *state)
+{
+ ParallelAppendDesc padesc = state->as_padesc;
+ int whichplan;
+ int worker_next_plan;
+ bool found = false;
+
+ LWLockAcquire(&padesc->pa_lock, LW_EXCLUSIVE);
+
+ worker_next_plan = padesc->pa_next_plan;
+
+ if (worker_next_plan != PA_INVALID_PLAN)
+ {
+ int first_partial_plan = ((Append *)state->ps.plan)->first_partial_plan;
+ bool nonpartial_plans_finished = (worker_next_plan >= first_partial_plan);
+
+ /* The parallel leader should start from the last subplan. */
+ for (whichplan = state->as_nplans - 1; whichplan >= 0; whichplan--)
+ {
+ if (!padesc->pa_finished[whichplan])
+ {
+ found = true;
+ /* If this a non-partial plan, immediately mark it finished */
+ if (whichplan < first_partial_plan)
+ padesc->pa_finished[whichplan] = true;
+ break;
+ }
+
+ /*
+ * If we are into non-partial plans but they are already done, no
+ * point in going back further.
+ */
+ if (whichplan < first_partial_plan && nonpartial_plans_finished)
+ break;
+ }
+
+ }
+
+ LWLockRelease(&padesc->pa_lock);
+
+ state->as_whichplan = (found ? whichplan : PA_INVALID_PLAN);
+
+ /* Return false only if we didn't find any plan to execute */
+ return found;
}
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index c1a83ca..5852484 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -242,6 +242,7 @@ _copyAppend(const Append *from)
*/
COPY_NODE_FIELD(partitioned_rels);
COPY_NODE_FIELD(appendplans);
+ COPY_SCALAR_FIELD(first_partial_plan);
return newnode;
}
diff --git a/src/backend/nodes/list.c b/src/backend/nodes/list.c
index acaf4b5..75761a9 100644
--- a/src/backend/nodes/list.c
+++ b/src/backend/nodes/list.c
@@ -1250,6 +1250,45 @@ list_copy_tail(const List *oldlist, int nskip)
}
/*
+ * Sort a list using qsort. A sorted list is built but the cells of the original
+ * list are re-used. Caller has to pass a copy of the list if the original list
+ * needs to be untouched. Effectively, the comparator function is passed
+ * pointers to ListCell* pointers.
+ */
+List *
+list_qsort(const List *list, list_qsort_comparator cmp)
+{
+ ListCell *cell;
+ int i;
+ int len = list_length(list);
+ ListCell **list_arr;
+ List *new_list;
+
+ if (len == 0)
+ return NIL;
+
+ i = 0;
+ list_arr = palloc(sizeof(ListCell *) * len);
+ foreach(cell, list)
+ list_arr[i++] = cell;
+
+ qsort(list_arr, len, sizeof(ListCell *), cmp);
+
+ new_list = (List *) palloc(sizeof(List));
+ new_list->type = T_List;
+ new_list->length = len;
+ new_list->head = list_arr[0];
+ new_list->tail = list_arr[len-1];
+
+ for (i = 0; i < len-1; i++)
+ list_arr[i]->next = list_arr[i+1];
+
+ list_arr[len-1]->next = NULL;
+ pfree(list_arr);
+ return new_list;
+}
+
+/*
* Temporary compatibility functions
*
* In order to avoid warnings for these function definitions, we need
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 43d6206..3236d58 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -399,6 +399,7 @@ _outAppend(StringInfo str, const Append *node)
WRITE_NODE_FIELD(partitioned_rels);
WRITE_NODE_FIELD(appendplans);
+ WRITE_INT_FIELD(first_partial_plan);
}
static void
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index ccb6a1f..23fcc1b 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1600,6 +1600,7 @@ _readAppend(void)
READ_NODE_FIELD(partitioned_rels);
READ_NODE_FIELD(appendplans);
+ READ_INT_FIELD(first_partial_plan);
READ_DONE();
}
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 5535b63..902ddc1 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -102,6 +102,9 @@ static Path *get_cheapest_parameterized_child_path(PlannerInfo *root,
RelOptInfo *rel,
Relids required_outer);
static List *accumulate_append_subpath(List *subpaths, Path *path);
+static List *accumulate_partialappend_subpath(List *partial_subpaths,
+ Path *subpath, bool is_partial,
+ List **nonpartial_subpaths);
static void set_subquery_pathlist(PlannerInfo *root, RelOptInfo *rel,
Index rti, RangeTblEntry *rte);
static void set_function_pathlist(PlannerInfo *root, RelOptInfo *rel,
@@ -1310,7 +1313,10 @@ set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
* non-dummy children. For every such parameterization or ordering, it creates
* an append path collecting one path from each non-dummy child with given
* parameterization or ordering. Similarly it collects partial paths from
- * non-dummy children to create partial append paths.
+ * non-dummy children to create partial append paths. Furthermore, it creates
+ * a parallel-aware partial Append path that can contain a mix of partial and
+ * non-partial paths of its children.
+ *
*/
static void
add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
@@ -1319,7 +1325,10 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
List *subpaths = NIL;
bool subpaths_valid = true;
List *partial_subpaths = NIL;
+ List *pa_partial_subpaths = NIL;
+ List *pa_nonpartial_subpaths = NIL;
bool partial_subpaths_valid = true;
+ bool pa_subpaths_valid = enable_parallel_append;
List *all_child_pathkeys = NIL;
List *all_child_outers = NIL;
ListCell *l;
@@ -1401,7 +1410,62 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
else
subpaths_valid = false;
- /* Same idea, but for a partial plan. */
+ /* Same idea, but for a parallel append path. */
+ if (pa_subpaths_valid && enable_parallel_append)
+ {
+ Path *chosen_path = NULL;
+ Path *cheapest_partial_path = NULL;
+ Path *cheapest_parallel_safe_path = NULL;
+
+ /*
+ * Extract the cheapest unparameterized, parallel-safe one among
+ * the child paths.
+ */
+ cheapest_parallel_safe_path =
+ get_cheapest_parallel_safe_total_inner(childrel->pathlist);
+
+ /* Get the cheapest partial path */
+ if (childrel->partial_pathlist != NIL)
+ cheapest_partial_path = linitial(childrel->partial_pathlist);
+
+ if (!cheapest_parallel_safe_path && !cheapest_partial_path)
+ {
+ /*
+ * This child rel neither has a partial path, nor has a
+ * parallel-safe path. Drop the idea for parallel append.
+ */
+ pa_subpaths_valid = false;
+ }
+ else if (cheapest_partial_path && cheapest_parallel_safe_path)
+ {
+ /* Both are valid. Choose the cheaper out of the two */
+ if (cheapest_parallel_safe_path->total_cost <
+ cheapest_partial_path->total_cost)
+ chosen_path = cheapest_parallel_safe_path;
+ else
+ chosen_path = cheapest_partial_path;
+ }
+ else
+ {
+ /* Either one is valid. Choose the valid one */
+ chosen_path = cheapest_partial_path ?
+ cheapest_partial_path :
+ cheapest_parallel_safe_path;
+ }
+
+ /* If we got a valid path, add it */
+ if (chosen_path)
+ {
+ pa_partial_subpaths =
+ accumulate_partialappend_subpath(
+ pa_partial_subpaths,
+ chosen_path,
+ chosen_path == cheapest_partial_path,
+ &pa_nonpartial_subpaths);
+ }
+ }
+
+ /* Same idea, but for a non-parallel partial plan. */
if (childrel->partial_pathlist != NIL)
partial_subpaths = accumulate_append_subpath(partial_subpaths,
linitial(childrel->partial_pathlist));
@@ -1479,35 +1543,47 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
* if we have zero or one live subpath due to constraint exclusion.)
*/
if (subpaths_valid)
- add_path(rel, (Path *) create_append_path(rel, subpaths, NULL, 0,
+ add_path(rel, (Path *) create_append_path(rel, subpaths, NIL,
+ NULL, 0, false,
partitioned_rels));
+ /* Consider parallel append path. */
+ if (pa_subpaths_valid)
+ {
+ AppendPath *appendpath;
+ int parallel_workers;
+
+ parallel_workers = get_append_num_workers(pa_partial_subpaths,
+ pa_nonpartial_subpaths,
+ true);
+ appendpath = create_append_path(rel, pa_nonpartial_subpaths,
+ pa_partial_subpaths,
+ NULL, parallel_workers, true,
+ partitioned_rels);
+ add_partial_path(rel, (Path *) appendpath);
+ }
+
/*
- * Consider an append of partial unordered, unparameterized partial paths.
+ * If parallel append has not been added above, or the added one has a mix
+ * of partial and non-partial subpaths, then consider another Parallel
+ * Append path which will have *all* partial subpaths. We can add such a
+ * path only if all childrels have partial paths in the first place. This
+ * new path will be parallel-aware unless enable_parallel_append is off.
*/
- if (partial_subpaths_valid)
+ if (partial_subpaths_valid &&
+ (!pa_subpaths_valid || pa_nonpartial_subpaths != NIL))
{
AppendPath *appendpath;
- ListCell *lc;
- int parallel_workers = 0;
-
- /*
- * Decide on the number of workers to request for this append path.
- * For now, we just use the maximum value from among the members. It
- * might be useful to use a higher number if the Append node were
- * smart enough to spread out the workers, but it currently isn't.
- */
- foreach(lc, partial_subpaths)
- {
- Path *path = lfirst(lc);
+ int parallel_workers;
- parallel_workers = Max(parallel_workers, path->parallel_workers);
- }
- Assert(parallel_workers > 0);
+ parallel_workers = get_append_num_workers(partial_subpaths,
+ NIL,
+ enable_parallel_append);
+ appendpath = create_append_path(rel, NIL, partial_subpaths,
+ NULL, parallel_workers,
+ enable_parallel_append,
+ partitioned_rels);
- /* Generate a partial append path. */
- appendpath = create_append_path(rel, partial_subpaths, NULL,
- parallel_workers, partitioned_rels);
add_partial_path(rel, (Path *) appendpath);
}
@@ -1560,7 +1636,8 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
if (subpaths_valid)
add_path(rel, (Path *)
- create_append_path(rel, subpaths, required_outer, 0,
+ create_append_path(rel, subpaths, NIL,
+ required_outer, 0, false,
partitioned_rels));
}
}
@@ -1778,6 +1855,69 @@ accumulate_append_subpath(List *subpaths, Path *path)
}
/*
+ * accumulate_partialappend_subpath:
+ * Add a subpath to the list being built for a partial Append.
+ *
+ * This is same as accumulate_append_subpath, except that two separate lists
+ * are created, one containing only partial subpaths, and the other containing
+ * only non-partial subpaths. Also, the non-partial paths are kept ordered
+ * by descending total cost.
+ *
+ * is_partial is true if the subpath being added is a partial subpath.
+ */
+static List *
+accumulate_partialappend_subpath(List *partial_subpaths,
+ Path *subpath, bool is_partial,
+ List **nonpartial_subpaths)
+{
+ /* list_copy is important here to avoid sharing list substructure */
+
+ if (IsA(subpath, AppendPath))
+ {
+ AppendPath *apath = (AppendPath *) subpath;
+ List *apath_partial_paths;
+ List *apath_nonpartial_paths;
+
+ /* Split the Append subpaths into partial and non-partial paths */
+ apath_nonpartial_paths = list_truncate(list_copy(apath->subpaths),
+ apath->first_partial_path);
+ apath_partial_paths = list_copy_tail(apath->subpaths,
+ apath->first_partial_path);
+
+ /* Add non-partial subpaths, if any. */
+ *nonpartial_subpaths = list_concat(*nonpartial_subpaths,
+ list_copy(apath_nonpartial_paths));
+
+ /* Add partial subpaths, if any. */
+ partial_subpaths = list_concat(partial_subpaths, apath_partial_paths);
+ }
+ else if (IsA(subpath, MergeAppendPath))
+ {
+ MergeAppendPath *mpath = (MergeAppendPath *) subpath;
+
+ /* We don't create partial MergeAppend path */
+ Assert(!is_partial);
+
+ /*
+ * Since MergePath itself is non-partial, treat all its subpaths
+ * non-partial.
+ */
+ *nonpartial_subpaths = list_concat(*nonpartial_subpaths,
+ list_copy(mpath->subpaths));
+ }
+ else
+ {
+ /* Just add it to the right list depending upon whether it's partial */
+ if (is_partial)
+ partial_subpaths = lappend(partial_subpaths, subpath);
+ else
+ *nonpartial_subpaths = lappend(*nonpartial_subpaths, subpath);
+ }
+
+ return partial_subpaths;
+}
+
+/*
* set_dummy_rel_pathlist
* Build a dummy path for a relation that's been excluded by constraints
*
@@ -1797,7 +1937,8 @@ set_dummy_rel_pathlist(RelOptInfo *rel)
rel->pathlist = NIL;
rel->partial_pathlist = NIL;
- add_path(rel, (Path *) create_append_path(rel, NIL, NULL, 0, NIL));
+ add_path(rel, (Path *) create_append_path(rel, NIL, NIL, NULL,
+ 0, false, NIL));
/*
* We set the cheapest path immediately, to ensure that IS_DUMMY_REL()
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index ce32b8a4..5ae02c8 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -128,6 +128,7 @@ bool enable_mergejoin = true;
bool enable_hashjoin = true;
bool enable_gathermerge = true;
bool enable_partition_wise_join = false;
+bool enable_parallel_append = true;
typedef struct
{
@@ -160,6 +161,8 @@ static Selectivity get_foreign_key_join_selectivity(PlannerInfo *root,
Relids inner_relids,
SpecialJoinInfo *sjinfo,
List **restrictlist);
+static Cost append_nonpartial_cost(List *subpaths, int numpaths,
+ int parallel_workers);
static void set_rel_width(PlannerInfo *root, RelOptInfo *rel);
static double relation_byte_size(double tuples, int width);
static double page_size(double tuples, int width);
@@ -1742,6 +1745,189 @@ cost_sort(Path *path, PlannerInfo *root,
}
/*
+ * append_nonpartial_cost
+ * Determines and returns the cost of non-partial paths of Append node.
+ *
+ * It is the total cost units taken by all the workers to finish all the
+ * non-partial subpaths.
+ * subpaths contains non-partial paths followed by partial paths.
+ * numpaths tells the number of non-partial paths.
+ */
+static Cost
+append_nonpartial_cost(List *subpaths, int numpaths, int parallel_workers)
+{
+ Cost *costarr;
+ int arrlen;
+ ListCell *l;
+ ListCell *cell;
+ int i;
+ int path_index;
+ int min_index;
+ int max_index;
+
+ if (numpaths == 0)
+ return 0;
+
+ /*
+ * Build the cost array containing costs of first n number of subpaths,
+ * where n = parallel_workers. Also, its size is kept only as long as the
+ * number of subpaths, or parallel_workers, whichever is minimum.
+ */
+ arrlen = Min(parallel_workers, numpaths);
+ costarr = (Cost *) palloc(sizeof(Cost) * arrlen);
+ path_index = 0;
+ foreach(cell, subpaths)
+ {
+ Path *subpath = (Path *) lfirst(cell);
+
+ if (path_index == arrlen)
+ break;
+ costarr[path_index++] = subpath->total_cost;
+ }
+
+ /*
+ * Since the subpaths are non-partial paths, the array is initially sorted
+ * by decreasing cost. So choose the last one for the index with minimum
+ * cost.
+ */
+ min_index = arrlen - 1;
+
+ /*
+ * For each of the remaining subpaths, add its cost to the array element
+ * with minimum cost.
+ */
+ for_each_cell(l, cell)
+ {
+ Path *subpath = (Path *) lfirst(l);
+ int i;
+
+ /* Consider only the non-partial paths */
+ if (path_index++ == numpaths)
+ break;
+
+ costarr[min_index] += subpath->total_cost;
+
+ /* Update the new min cost array index */
+ for (min_index = i = 0; i < arrlen; i++)
+ {
+ if (costarr[i] < costarr[min_index])
+ min_index = i;
+ }
+ }
+
+ /* Return the highest cost from the array */
+ for (max_index = i = 0; i < arrlen; i++)
+ {
+ if (costarr[i] > costarr[max_index])
+ max_index = i;
+ }
+
+ return costarr[max_index];
+}
+
+/*
+ * cost_append
+ * Determines and returns the cost of an Append node.
+ *
+ * We charge nothing extra for the Append itself, which perhaps is too
+ * optimistic, but since it doesn't do any selection or projection, it is a
+ * pretty cheap node.
+ */
+void
+cost_append(Path *path, List *subpaths, int num_nonpartial_subpaths)
+{
+ ListCell *l;
+
+ path->rows = 0;
+ path->startup_cost = 0;
+ path->total_cost = 0;
+
+ if (list_length(subpaths) == 0)
+ return;
+
+ if (!path->parallel_aware)
+ {
+ Path *subpath = (Path *) linitial(subpaths);
+
+ /*
+ * Startup cost of non-parallel-aware Append is the startup cost of
+ * first subpath.
+ */
+ path->startup_cost = subpath->startup_cost;
+
+ /* Compute rows and costs as sums of subplan rows and costs. */
+ foreach(l, subpaths)
+ {
+ Path *subpath = (Path *) lfirst(l);
+
+ path->rows += subpath->rows;
+ path->total_cost += subpath->total_cost;
+ }
+ }
+ else /* parallel-aware */
+ {
+ double max_rows = 0;
+ double nonpartial_rows = 0;
+ int i = 0;
+
+ /* Include the non-partial paths total cost */
+ path->total_cost += append_nonpartial_cost(subpaths,
+ num_nonpartial_subpaths,
+ path->parallel_workers);
+
+ /* Calculate startup cost; also add up all the rows for later use */
+ foreach(l, subpaths)
+ {
+ Path *subpath = (Path *) lfirst(l);
+
+ /*
+ * Append would start returning tuples when the child node having
+ * lowest startup cost is done setting up. We consider only the
+ * first few subplans that immediately get a worker assigned.
+ */
+ if (i < path->parallel_workers)
+ {
+ path->startup_cost = Min(path->startup_cost,
+ subpath->startup_cost);
+ }
+
+ if (i < num_nonpartial_subpaths)
+ {
+ nonpartial_rows += subpath->rows;
+
+ /* Also keep track of max rows for any given subpath */
+ max_rows = Max(max_rows, subpath->rows);
+ }
+
+ i++;
+ }
+
+ /*
+ * As an approximation, non-partial rows are calculated as total rows
+ * divided by number of workers. But if there are highly unequal number
+ * of rows across the paths, this figure might not reflect correctly.
+ * So we make a note that it also should not be less than the maximum
+ * of all the path rows.
+ */
+ nonpartial_rows /= path->parallel_workers;
+ path->rows += Max(nonpartial_rows, max_rows);
+
+ /* Calculate partial paths cost. */
+ if (list_length(subpaths) > num_nonpartial_subpaths)
+ {
+ /* Compute rows and costs as sums of subplan rows and costs. */
+ for_each_cell(l, list_nth_cell(subpaths, num_nonpartial_subpaths))
+ {
+ Path *subpath = (Path *) lfirst(l);
+
+ path->rows += subpath->rows;
+ path->total_cost += subpath->total_cost;
+ }
+ }
+ }
+}
+
+/*
* cost_merge_append
* Determines and returns the cost of a MergeAppend node.
*
diff --git a/src/backend/optimizer/path/joinrels.c b/src/backend/optimizer/path/joinrels.c
index 2b868c5..9f457d4 100644
--- a/src/backend/optimizer/path/joinrels.c
+++ b/src/backend/optimizer/path/joinrels.c
@@ -1232,7 +1232,8 @@ mark_dummy_rel(RelOptInfo *rel)
rel->partial_pathlist = NIL;
/* Set up the dummy path */
- add_path(rel, (Path *) create_append_path(rel, NIL, NULL, 0, NIL));
+ add_path(rel, (Path *) create_append_path(rel, NIL, NIL, NULL,
+ 0, false, NIL));
/* Set or update cheapest_total_path and related fields */
set_cheapest(rel);
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index c802d61..93a5296 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -203,7 +203,8 @@ static NamedTuplestoreScan *make_namedtuplestorescan(List *qptlist, List *qpqual
Index scanrelid, char *enrname);
static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual,
Index scanrelid, int wtParam);
-static Append *make_append(List *appendplans, List *tlist, List *partitioned_rels);
+static Append *make_append(List *appendplans, int first_partial_plan,
+ List *tlist, List *partitioned_rels);
static RecursiveUnion *make_recursive_union(List *tlist,
Plan *lefttree,
Plan *righttree,
@@ -1050,7 +1051,8 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path)
* parent-rel Vars it'll be asked to emit.
*/
- plan = make_append(subplans, tlist, best_path->partitioned_rels);
+ plan = make_append(subplans, best_path->first_partial_path,
+ tlist, best_path->partitioned_rels);
copy_generic_path_info(&plan->plan, (Path *) best_path);
@@ -5285,7 +5287,7 @@ make_foreignscan(List *qptlist,
}
static Append *
-make_append(List *appendplans, List *tlist, List *partitioned_rels)
+make_append(List *appendplans, int first_partial_plan, List *tlist, List *partitioned_rels)
{
Append *node = makeNode(Append);
Plan *plan = &node->plan;
@@ -5296,6 +5298,7 @@ make_append(List *appendplans, List *tlist, List *partitioned_rels)
plan->righttree = NULL;
node->partitioned_rels = partitioned_rels;
node->appendplans = appendplans;
+ node->first_partial_plan = first_partial_plan;
return node;
}
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index ecdd728..446d6f6 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -3655,8 +3655,10 @@ create_grouping_paths(PlannerInfo *root,
path = (Path *)
create_append_path(grouped_rel,
paths,
+ NIL,
NULL,
0,
+ false,
NIL);
path->pathtarget = target;
}
diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c
index 1c84a2c..6ea7029 100644
--- a/src/backend/optimizer/prep/prepunion.c
+++ b/src/backend/optimizer/prep/prepunion.c
@@ -590,8 +590,8 @@ generate_union_path(SetOperationStmt *op, PlannerInfo *root,
/*
* Append the child results together.
*/
- path = (Path *) create_append_path(result_rel, pathlist, NULL, 0, NIL);
-
+ path = (Path *) create_append_path(result_rel, pathlist, NIL,
+ NULL, 0, false, NIL);
/* We have to manually jam the right tlist into the path; ick */
path->pathtarget = create_pathtarget(root, tlist);
@@ -702,7 +702,8 @@ generate_nonunion_path(SetOperationStmt *op, PlannerInfo *root,
/*
* Append the child results together.
*/
- path = (Path *) create_append_path(result_rel, pathlist, NULL, 0, NIL);
+ path = (Path *) create_append_path(result_rel, pathlist, NIL,
+ NULL, 0, false, NIL);
/* We have to manually jam the right tlist into the path; ick */
path->pathtarget = create_pathtarget(root, tlist);
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index 2d491eb..454a09d 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -51,6 +51,8 @@ typedef enum
#define STD_FUZZ_FACTOR 1.01
static List *translate_sub_tlist(List *tlist, int relid);
+static int append_total_cost_compare(const void *a, const void *b);
+static int append_startup_cost_compare(const void *a, const void *b);
static List *reparameterize_pathlist_by_child(PlannerInfo *root,
List *pathlist,
RelOptInfo *child_rel);
@@ -1201,6 +1203,82 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals,
}
/*
+ * get_append_num_workers
+ * Return the number of workers to request for partial append path.
+ */
+int
+get_append_num_workers(List *partial_subpaths,
+ List *nonpartial_subpaths,
+ bool parallel_aware)
+{
+ ListCell *lc;
+ double log2w;
+ int num_workers;
+ int max_per_plan_workers;
+
+ /*
+ * log2(number_of_subpaths)+1 formula seems to give an appropriate number of
+ * workers for Append path either having high number of children (> 100) or
+ * having all non-partial subpaths or subpaths with 1-2 parallel_workers.
+ * Whereas, if the subpaths->parallel_workers is high, this formula is not
+ * suitable, because it does not take into account per-subpath workers.
+ * For e.g., with 3 subplans having per-subplan workers such as (2, 8, 8),
+ * the Append workers should be at least 8, whereas the formula gives 2. In
+ * this case, it seems better to follow the method used for calculating
+ * parallel_workers of an unpartitioned table : log3(table_size). So we
+ * treat a partitioned table as if the data belongs to a single
+ * unpartitioned table, and then derive its workers. So it will be :
+ * logb(b^w1 + b^w2 + b^w3) where w1, w2.. are per-subplan workers and
+ * b is some logarithmic base such as 2 or 3. It turns out that this
+ * evaluates to a value just a bit greater than max(w1,w2, w3). So, we
+ * just use the maximum of workers formula. But this formula gives too few
+ * workers when all paths have single worker (meaning they are non-partial)
+ * For e.g. with workers : (1, 1, 1, 1, 1, 1), it is better to allocate 3
+ * workers, whereas this method allocates only 1.
+ * So we use whichever method that gives higher number of workers.
+ */
+
+ /* Get log2(num_subpaths) */
+ log2w = fls(list_length(partial_subpaths) +
+ list_length(nonpartial_subpaths));
+
+ /* Avoid further calculations if we already crossed max workers limit */
+ if (max_parallel_workers_per_gather <= log2w + 1)
+ return max_parallel_workers_per_gather;
+
+
+ /*
+ * Get the parallel_workers value of the partial subpath having the highest
+ * parallel_workers.
+ */
+ max_per_plan_workers = 1;
+ foreach(lc, partial_subpaths)
+ {
+ Path *subpath = lfirst(lc);
+ max_per_plan_workers = Max(max_per_plan_workers,
+ subpath->parallel_workers);
+ }
+
+ /*
+ * For non-parallel-aware Append, all workers run a common subplan at a
+ * time, so it makes sense to just choose the per-subplan max workers as
+ * the Append workers. For parallel-aware Append, choose the higher of the
+ * results of the two formulae.
+ */
+ num_workers = (parallel_aware ?
+ rint(Max(log2w, max_per_plan_workers) + 1)
+ : max_per_plan_workers);
+
+
+ /* In no case use more than max_parallel_workers_per_gather workers. */
+ num_workers = Min(num_workers, max_parallel_workers_per_gather);
+
+ Assert(num_workers > 0);
+
+ return num_workers;
+}
+
+/*
* create_append_path
* Creates a path corresponding to an Append plan, returning the
* pathnode.
@@ -1208,8 +1286,11 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals,
* Note that we must handle subpaths = NIL, representing a dummy access path.
*/
AppendPath *
-create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer,
- int parallel_workers, List *partitioned_rels)
+create_append_path(RelOptInfo *rel,
+ List *subpaths, List *partial_subpaths,
+ Relids required_outer,
+ int parallel_workers, bool parallel_aware,
+ List *partitioned_rels)
{
AppendPath *pathnode = makeNode(AppendPath);
ListCell *l;
@@ -1219,44 +1300,83 @@ create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer,
pathnode->path.pathtarget = rel->reltarget;
pathnode->path.param_info = get_appendrel_parampathinfo(rel,
required_outer);
- pathnode->path.parallel_aware = false;
+ pathnode->path.parallel_aware = (parallel_aware && parallel_workers > 0);
pathnode->path.parallel_safe = rel->consider_parallel;
pathnode->path.parallel_workers = parallel_workers;
pathnode->path.pathkeys = NIL; /* result is always considered unsorted */
pathnode->partitioned_rels = list_copy(partitioned_rels);
- pathnode->subpaths = subpaths;
- /*
- * We don't bother with inventing a cost_append(), but just do it here.
- *
- * Compute rows and costs as sums of subplan rows and costs. We charge
- * nothing extra for the Append itself, which perhaps is too optimistic,
- * but since it doesn't do any selection or projection, it is a pretty
- * cheap node.
+ /* For parallel append, non-partial paths are sorted by descending total
+ * costs. That way, the total time to finish all non-partial paths is
+ * minimized. Also, the partial paths are sorted by descending startup
+ * costs. There may be some paths that require to do startup work by a
+ * single worker. In such case, it's better for workers to choose the
+ * expensive ones first, whereas the leader should choose the cheapest
+ * startup plan.
*/
- pathnode->path.rows = 0;
- pathnode->path.startup_cost = 0;
- pathnode->path.total_cost = 0;
+ if (pathnode->path.parallel_aware)
+ {
+ subpaths = list_qsort(subpaths, append_total_cost_compare);
+ partial_subpaths = list_qsort(partial_subpaths,
+ append_startup_cost_compare);
+ }
+ pathnode->first_partial_path = list_length(subpaths);
+ pathnode->subpaths = list_concat(subpaths, partial_subpaths);
+
foreach(l, subpaths)
{
Path *subpath = (Path *) lfirst(l);
- pathnode->path.rows += subpath->rows;
-
- if (l == list_head(subpaths)) /* first node? */
- pathnode->path.startup_cost = subpath->startup_cost;
- pathnode->path.total_cost += subpath->total_cost;
pathnode->path.parallel_safe = pathnode->path.parallel_safe &&
- subpath->parallel_safe;
+ subpath->parallel_safe;
/* All child paths must have same parameterization */
Assert(bms_equal(PATH_REQ_OUTER(subpath), required_outer));
}
+ cost_append(&pathnode->path, pathnode->subpaths,
+ pathnode->first_partial_path);
+
return pathnode;
}
/*
+ * append_total_cost_compare
+ * list_qsort comparator for sorting append child paths by total_cost
+ */
+static int
+append_total_cost_compare(const void *a, const void *b)
+{
+ Path *path1 = (Path *) lfirst(*(ListCell **) a);
+ Path *path2 = (Path *) lfirst(*(ListCell **) b);
+
+ if (path1->total_cost > path2->total_cost)
+ return -1;
+ if (path1->total_cost < path2->total_cost)
+ return 1;
+
+ return 0;
+}
+
+/*
+ * append_startup_cost_compare
+ * list_qsort comparator for sorting append child paths by startup_cost
+ */
+static int
+append_startup_cost_compare(const void *a, const void *b)
+{
+ Path *path1 = (Path *) lfirst(*(ListCell **) a);
+ Path *path2 = (Path *) lfirst(*(ListCell **) b);
+
+ if (path1->startup_cost > path2->startup_cost)
+ return -1;
+ if (path1->startup_cost < path2->startup_cost)
+ return 1;
+
+ return 0;
+}
+
+/*
* create_merge_append_path
* Creates a path corresponding to a MergeAppend plan, returning the
* pathnode.
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index f1060f9..f2b4474 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -517,6 +517,7 @@ RegisterLWLockTranches(void)
LWLockRegisterTranche(LWTRANCHE_SESSION_TYPMOD_TABLE,
"session_typmod_table");
LWLockRegisterTranche(LWTRANCHE_TBM, "tbm");
+ LWLockRegisterTranche(LWTRANCHE_PARALLEL_APPEND, "parallel_append");
/* Register named tranches. */
for (i = 0; i < NamedLWLockTrancheRequests; i++)
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index ae22185..5251259 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -920,6 +920,15 @@ static struct config_bool ConfigureNamesBool[] =
false,
NULL, NULL, NULL
},
+ {
+ {"enable_parallel_append", PGC_USERSET, QUERY_TUNING_METHOD,
+ gettext_noop("Enables the planner's use of parallel append plans."),
+ NULL
+ },
+ &enable_parallel_append,
+ true,
+ NULL, NULL, NULL
+ },
{
{"geqo", PGC_USERSET, QUERY_TUNING_GEQO,
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 368b280..87c54f0 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -296,6 +296,7 @@
#enable_material = on
#enable_mergejoin = on
#enable_nestloop = on
+#enable_parallel_append = on
#enable_seqscan = on
#enable_sort = on
#enable_tidscan = on
diff --git a/src/include/executor/nodeAppend.h b/src/include/executor/nodeAppend.h
index 4e38a13..78e5943 100644
--- a/src/include/executor/nodeAppend.h
+++ b/src/include/executor/nodeAppend.h
@@ -14,10 +14,15 @@
#ifndef NODEAPPEND_H
#define NODEAPPEND_H
+#include "access/parallel.h"
#include "nodes/execnodes.h"
extern AppendState *ExecInitAppend(Append *node, EState *estate, int eflags);
extern void ExecEndAppend(AppendState *node);
extern void ExecReScanAppend(AppendState *node);
+extern void ExecAppendEstimate(AppendState *node, ParallelContext *pcxt);
+extern void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt);
+extern void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt);
+extern void ExecAppendInitializeWorker(AppendState *node, shm_toc *toc);
#endif /* NODEAPPEND_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 52d3532..d23ff47 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -21,6 +21,7 @@
#include "lib/pairingheap.h"
#include "nodes/params.h"
#include "nodes/plannodes.h"
+#include "storage/spin.h"
#include "utils/hsearch.h"
#include "utils/queryenvironment.h"
#include "utils/reltrigger.h"
@@ -998,12 +999,16 @@ typedef struct ModifyTableState
* whichplan which plan is being executed (0 .. n-1)
* ----------------
*/
+struct ParallelAppendDescData;
typedef struct AppendState
{
PlanState ps; /* its first field is NodeTag */
PlanState **appendplans; /* array of PlanStates for my inputs */
int as_nplans;
int as_whichplan;
+ bool is_parallel_worker;
+ struct ParallelAppendDescData *as_padesc; /* parallel coordination info */
+ Size pappend_len; /* size of parallel coordination info */
} AppendState;
/* ----------------
diff --git a/src/include/nodes/pg_list.h b/src/include/nodes/pg_list.h
index 667d5e2..711db92 100644
--- a/src/include/nodes/pg_list.h
+++ b/src/include/nodes/pg_list.h
@@ -269,6 +269,9 @@ extern void list_free_deep(List *list);
extern List *list_copy(const List *list);
extern List *list_copy_tail(const List *list, int nskip);
+typedef int (*list_qsort_comparator) (const void *a, const void *b);
+extern List *list_qsort(const List *list, list_qsort_comparator cmp);
+
/*
* To ease migration to the new list API, a set of compatibility
* macros are provided that reduce the impact of the list API changes
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index dd74efa..08d486f 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -248,6 +248,7 @@ typedef struct Append
/* RT indexes of non-leaf tables in a partition tree */
List *partitioned_rels;
List *appendplans;
+ int first_partial_plan;
} Append;
/* ----------------
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index e085cef..ec5da88 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -1255,6 +1255,9 @@ typedef struct CustomPath
* AppendPath represents an Append plan, ie, successive execution of
* several member plans.
*
+ * For partial Append, 'subpaths' contains non-partial subpaths followed by
+ * partial subpaths.
+ *
* Note: it is possible for "subpaths" to contain only one, or even no,
* elements. These cases are optimized during create_append_plan.
* In particular, an AppendPath with no subpaths is a "dummy" path that
@@ -1266,6 +1269,9 @@ typedef struct AppendPath
/* RT indexes of non-leaf tables in a partition tree */
List *partitioned_rels;
List *subpaths; /* list of component Paths */
+
+ /* Index of first partial path in subpaths */
+ int first_partial_path;
} AppendPath;
#define IS_DUMMY_PATH(p) \
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 306d923..7d3a547 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -68,6 +68,7 @@ extern bool enable_mergejoin;
extern bool enable_hashjoin;
extern bool enable_gathermerge;
extern bool enable_partition_wise_join;
+extern bool enable_parallel_append;
extern int constraint_exclusion;
extern double clamp_row_est(double nrows);
@@ -106,6 +107,8 @@ extern void cost_sort(Path *path, PlannerInfo *root,
List *pathkeys, Cost input_cost, double tuples, int width,
Cost comparison_cost, int sort_mem,
double limit_tuples);
+extern void cost_append(Path *path, List *subpaths,
+ int num_nonpartial_subpaths);
extern void cost_merge_append(Path *path, PlannerInfo *root,
List *pathkeys, int n_streams,
Cost input_startup_cost, Cost input_total_cost,
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index e9ed16a..51b5096 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -14,6 +14,7 @@
#ifndef PATHNODE_H
#define PATHNODE_H
+#include "nodes/bitmapset.h"
#include "nodes/relation.h"
@@ -63,9 +64,14 @@ extern BitmapOrPath *create_bitmap_or_path(PlannerInfo *root,
List *bitmapquals);
extern TidPath *create_tidscan_path(PlannerInfo *root, RelOptInfo *rel,
List *tidquals, Relids required_outer);
-extern AppendPath *create_append_path(RelOptInfo *rel, List *subpaths,
- Relids required_outer, int parallel_workers,
- List *partitioned_rels);
+extern int get_append_num_workers(List *partial_subpaths,
+ List *nonpartial_subpaths,
+ bool parallel_aware);
+extern AppendPath *create_append_path(RelOptInfo *rel,
+ List *subpaths, List *partial_subpaths,
+ Relids required_outer,
+ int parallel_workers, bool parallel_aware,
+ List *partitioned_rels);
extern MergeAppendPath *create_merge_append_path(PlannerInfo *root,
RelOptInfo *rel,
List *subpaths,
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index f4c4aed..e190155 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -216,6 +216,7 @@ typedef enum BuiltinTrancheIds
LWTRANCHE_SESSION_RECORD_TABLE,
LWTRANCHE_SESSION_TYPMOD_TABLE,
LWTRANCHE_TBM,
+ LWTRANCHE_PARALLEL_APPEND,
LWTRANCHE_FIRST_USER_DEFINED
} BuiltinTrancheIds;
diff --git a/src/test/regress/expected/inherit.out b/src/test/regress/expected/inherit.out
index c698faf..9692155 100644
--- a/src/test/regress/expected/inherit.out
+++ b/src/test/regress/expected/inherit.out
@@ -1404,6 +1404,7 @@ select min(1-id) from matest0;
reset enable_indexscan;
set enable_seqscan = off; -- plan with fewest seqscans should be merge
+set enable_parallel_append = off; -- Don't let parallel-append interfere
explain (verbose, costs off) select * from matest0 order by 1-id;
QUERY PLAN
------------------------------------------------------------------
@@ -1470,6 +1471,7 @@ select min(1-id) from matest0;
(1 row)
reset enable_seqscan;
+reset enable_parallel_append;
drop table matest0 cascade;
NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to table matest1
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
index 2ae600f..b4cf7cb 100644
--- a/src/test/regress/expected/select_parallel.out
+++ b/src/test/regress/expected/select_parallel.out
@@ -11,13 +11,38 @@ set parallel_setup_cost=0;
set parallel_tuple_cost=0;
set min_parallel_table_scan_size=0;
set max_parallel_workers_per_gather=4;
+-- test Parallel Append.
explain (costs off)
- select count(*) from a_star;
+ select round(avg(aa)), sum(aa) from a_star;
QUERY PLAN
-----------------------------------------------------
Finalize Aggregate
-> Gather
- Workers Planned: 1
+ Workers Planned: 4
+ -> Partial Aggregate
+ -> Parallel Append
+ -> Parallel Seq Scan on a_star
+ -> Parallel Seq Scan on b_star
+ -> Parallel Seq Scan on c_star
+ -> Parallel Seq Scan on d_star
+ -> Parallel Seq Scan on e_star
+ -> Parallel Seq Scan on f_star
+(11 rows)
+
+select round(avg(aa)), sum(aa) from a_star;
+ round | sum
+-------+-----
+ 14 | 355
+(1 row)
+
+set enable_parallel_append to off;
+explain (costs off)
+ select round(avg(aa)), sum(aa) from a_star;
+ QUERY PLAN
+-----------------------------------------------------
+ Finalize Aggregate
+ -> Gather
+ Workers Planned: 4
-> Partial Aggregate
-> Append
-> Parallel Seq Scan on a_star
@@ -28,12 +53,63 @@ explain (costs off)
-> Parallel Seq Scan on f_star
(11 rows)
-select count(*) from a_star;
- count
--------
- 50
+select round(avg(aa)), sum(aa) from a_star;
+ round | sum
+-------+-----
+ 14 | 355
+(1 row)
+
+set enable_parallel_append to on;
+-- Mix of partial and non-partial subplans.
+alter table c_star set (parallel_workers = 0);
+alter table d_star set (parallel_workers = 0);
+explain (costs off)
+ select round(avg(aa)), sum(aa) from a_star;
+ QUERY PLAN
+-----------------------------------------------------
+ Finalize Aggregate
+ -> Gather
+ Workers Planned: 4
+ -> Partial Aggregate
+ -> Parallel Append
+ -> Seq Scan on d_star
+ -> Seq Scan on c_star
+ -> Parallel Seq Scan on a_star
+ -> Parallel Seq Scan on b_star
+ -> Parallel Seq Scan on e_star
+ -> Parallel Seq Scan on f_star
+(11 rows)
+
+select round(avg(aa)), sum(aa) from a_star;
+ round | sum
+-------+-----
+ 14 | 355
+(1 row)
+
+set enable_parallel_append to off;
+explain (costs off)
+ select round(avg(aa)), sum(aa) from a_star;
+ QUERY PLAN
+--------------------------------
+ Aggregate
+ -> Append
+ -> Seq Scan on a_star
+ -> Seq Scan on b_star
+ -> Seq Scan on c_star
+ -> Seq Scan on d_star
+ -> Seq Scan on e_star
+ -> Seq Scan on f_star
+(8 rows)
+
+select round(avg(aa)), sum(aa) from a_star;
+ round | sum
+-------+-----
+ 14 | 355
(1 row)
+reset enable_parallel_append;
+alter table c_star reset (parallel_workers);
+alter table d_star reset (parallel_workers);
-- test that parallel_restricted function doesn't run in worker
alter table tenk1 set (parallel_workers = 4);
explain (verbose, costs off)
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index cd1f7f3..2b738aa 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -81,11 +81,12 @@ select name, setting from pg_settings where name like 'enable%';
enable_material | on
enable_mergejoin | on
enable_nestloop | on
+ enable_parallel_append | on
enable_partition_wise_join | off
enable_seqscan | on
enable_sort | on
enable_tidscan | on
-(13 rows)
+(14 rows)
-- Test that the pg_timezone_names and pg_timezone_abbrevs views are
-- more-or-less working. We can't test their contents in any great detail
diff --git a/src/test/regress/sql/inherit.sql b/src/test/regress/sql/inherit.sql
index 169d0dc..3fafc5f 100644
--- a/src/test/regress/sql/inherit.sql
+++ b/src/test/regress/sql/inherit.sql
@@ -508,11 +508,13 @@ select min(1-id) from matest0;
reset enable_indexscan;
set enable_seqscan = off; -- plan with fewest seqscans should be merge
+set enable_parallel_append = off; -- Don't let parallel-append interfere
explain (verbose, costs off) select * from matest0 order by 1-id;
select * from matest0 order by 1-id;
explain (verbose, costs off) select min(1-id) from matest0;
select min(1-id) from matest0;
reset enable_seqscan;
+reset enable_parallel_append;
drop table matest0 cascade;
diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql
index 89fe80a..8de082b 100644
--- a/src/test/regress/sql/select_parallel.sql
+++ b/src/test/regress/sql/select_parallel.sql
@@ -15,9 +15,28 @@ set parallel_tuple_cost=0;
set min_parallel_table_scan_size=0;
set max_parallel_workers_per_gather=4;
-explain (costs off)
- select count(*) from a_star;
-select count(*) from a_star;
+-- test Parallel Append.
+explain (costs off)
+ select round(avg(aa)), sum(aa) from a_star;
+select round(avg(aa)), sum(aa) from a_star;
+set enable_parallel_append to off;
+explain (costs off)
+ select round(avg(aa)), sum(aa) from a_star;
+select round(avg(aa)), sum(aa) from a_star;
+set enable_parallel_append to on;
+-- Mix of partial and non-partial subplans.
+alter table c_star set (parallel_workers = 0);
+alter table d_star set (parallel_workers = 0);
+explain (costs off)
+ select round(avg(aa)), sum(aa) from a_star;
+select round(avg(aa)), sum(aa) from a_star;
+set enable_parallel_append to off;
+explain (costs off)
+ select round(avg(aa)), sum(aa) from a_star;
+select round(avg(aa)), sum(aa) from a_star;
+reset enable_parallel_append;
+alter table c_star reset (parallel_workers);
+alter table d_star reset (parallel_workers);
-- test that parallel_restricted function doesn't run in worker
alter table tenk1 set (parallel_workers = 4);