From 243965f12c16c8ded348255254a179ec198f6812 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Thu, 19 Mar 2020 03:00:50 +0100 Subject: [PATCH 6/7] Parallel grouping sets MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We used to support grouping sets in one worker only; this PR wants to support parallel grouping sets using multiple workers. The main idea of parallel grouping sets is: like parallel aggregate, we separate grouping sets into two stages: The initial stage: this stage has almost the same plan and execution routines as the current implementation of grouping sets; the differences are 1) it only produces partial aggregate results 2) the output is attached with an extra grouping set id. We know partial aggregate results will be combined in the final stage and we have multiple grouping sets, so only partial aggregate results belonging to the same grouping set can be combined; that is why the grouping set id is introduced to identify the sets. We keep all the optimizations of multiple grouping sets in the initial stage, e.g., 1) the grouping sets (that can be grouped by one single sort) are put into one rollup structure so those sets are computed in one aggregate phase. 2) do hash aggregate concurrently when a sort aggregate is performed. 3) do all hash transitions in one expression state. The final stage: this stage combines the partial aggregate results according to the grouping set id. Obviously, all the optimizations in the initial stage cannot be used, so all rollups are extracted; each rollup contains only one grouping set, then each aggregate phase only processes one set. We do a filter in the final stage, redirecting the tuples to each aggregate phase. 
--- src/backend/commands/explain.c | 10 +- src/backend/executor/execExpr.c | 10 +- src/backend/executor/execExprInterp.c | 11 + src/backend/executor/nodeAgg.c | 261 +++++++++++++++++- src/backend/jit/llvm/llvmjit_expr.c | 40 +++ src/backend/nodes/copyfuncs.c | 56 +++- src/backend/nodes/equalfuncs.c | 3 + src/backend/nodes/nodeFuncs.c | 8 + src/backend/nodes/outfuncs.c | 14 +- src/backend/nodes/readfuncs.c | 53 +++- src/backend/optimizer/path/allpaths.c | 5 +- src/backend/optimizer/plan/createplan.c | 26 +- src/backend/optimizer/plan/planner.c | 343 ++++++++++++++++++------ src/backend/optimizer/plan/setrefs.c | 16 ++ src/backend/optimizer/util/pathnode.c | 27 +- src/backend/utils/adt/ruleutils.c | 6 + src/include/executor/execExpr.h | 1 + src/include/executor/nodeAgg.h | 2 + src/include/nodes/execnodes.h | 8 +- src/include/nodes/nodes.h | 1 + src/include/nodes/pathnodes.h | 2 + src/include/nodes/plannodes.h | 4 +- src/include/nodes/primnodes.h | 6 + src/include/optimizer/pathnode.h | 1 + src/include/optimizer/planmain.h | 2 +- 25 files changed, 791 insertions(+), 125 deletions(-) diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 2c63cdb46c..8b6877c41e 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -2256,12 +2256,16 @@ show_agg_keys(AggState *astate, List *ancestors, { Agg *plan = (Agg *) astate->ss.ps.plan; - if (plan->numCols > 0 || plan->groupingSets) + if (plan->gsetid) + show_expression((Node *) plan->gsetid, "Filtered by", + (PlanState *) astate, ancestors, true, es); + + if (plan->numCols > 0 || plan->rollup) { /* The key columns refer to the tlist of the child plan */ ancestors = lcons(plan, ancestors); - if (plan->groupingSets) + if (plan->rollup) show_grouping_sets(outerPlanState(astate), plan, ancestors, es); else show_sort_group_keys(outerPlanState(astate), "Group Key", @@ -2312,7 +2316,7 @@ show_grouping_set_keys(PlanState *planstate, Plan *plan = planstate->plan; char *exprstr; 
ListCell *lc; - List *gsets = aggnode->groupingSets; + List *gsets = aggnode->rollup->gsets; AttrNumber *keycols = aggnode->grpColIdx; const char *keyname; const char *keysetname; diff --git a/src/backend/executor/execExpr.c b/src/backend/executor/execExpr.c index 669843faf5..bf69fcfe97 100644 --- a/src/backend/executor/execExpr.c +++ b/src/backend/executor/execExpr.c @@ -815,7 +815,7 @@ ExecInitExprRec(Expr *node, ExprState *state, agg = (Agg *) (state->parent->plan); - if (agg->groupingSets) + if (agg->rollup) scratch.d.grouping_func.clauses = grp_node->cols; else scratch.d.grouping_func.clauses = NIL; @@ -824,6 +824,14 @@ ExecInitExprRec(Expr *node, ExprState *state, break; } + case T_GroupingSetId: + { + scratch.opcode = EEOP_GROUPING_SET_ID; + + ExprEvalPushStep(state, &scratch); + break; + } + case T_WindowFunc: { WindowFunc *wfunc = (WindowFunc *) node; diff --git a/src/backend/executor/execExprInterp.c b/src/backend/executor/execExprInterp.c index b0dbba4e55..b3537eb8d9 100644 --- a/src/backend/executor/execExprInterp.c +++ b/src/backend/executor/execExprInterp.c @@ -428,6 +428,7 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) &&CASE_EEOP_XMLEXPR, &&CASE_EEOP_AGGREF, &&CASE_EEOP_GROUPING_FUNC, + &&CASE_EEOP_GROUPING_SET_ID, &&CASE_EEOP_WINDOW_FUNC, &&CASE_EEOP_SUBPLAN, &&CASE_EEOP_ALTERNATIVE_SUBPLAN, @@ -1512,6 +1513,16 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) EEO_NEXT(); } + EEO_CASE(EEOP_GROUPING_SET_ID) + { + AggState *aggstate = castNode(AggState, state->parent); + + *op->resvalue = aggstate->phase->setno_gsetids[aggstate->current_set]; + *op->resnull = false; + + EEO_NEXT(); + } + EEO_CASE(EEOP_WINDOW_FUNC) { /* diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 38d0bd5895..f7b98dd798 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -282,6 +282,7 @@ static void lookup_hash_entry(AggState *aggstate, AggStatePerPhaseHash perhash, 
static TupleTableSlot *agg_retrieve_direct(AggState *aggstate); static void agg_fill_hash_table(AggState *aggstate); static void agg_sort_input(AggState *aggstate); +static void agg_preprocess_groupingsets(AggState *aggstate); static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate); static Datum GetAggInitVal(Datum textInitVal, Oid transtype); static void build_pertrans_for_aggref(AggStatePerTrans pertrans, @@ -341,17 +342,26 @@ initialize_phase(AggState *aggstate, int newphase) * Whatever the previous state, we're now done with whatever input * tuplesort was in use, cleanup them. * - * Note: we keep the first tuplesort/tuplestore, this will benifit the + * Note: we keep the first tuplesort/tuplestore when it's not the + * final stage of partial groupingsets, this will benifit the * rescan in some cases without resorting the input again. */ - if (!current_phase->is_hashed && aggstate->current_phase > 0) + if (!current_phase->is_hashed && + (aggstate->current_phase > 0 || DO_AGGSPLIT_COMBINE(aggstate->aggsplit))) { persort = (AggStatePerPhaseSort) current_phase; + if (persort->sort_in) { tuplesort_end(persort->sort_in); persort->sort_in = NULL; } + + if (persort->store_in) + { + tuplestore_end(persort->store_in); + persort->store_in = NULL; + } } /* advance to next phase */ @@ -420,6 +430,15 @@ fetch_input_tuple(AggState *aggstate) return NULL; slot = aggstate->sort_slot; } + else if (current_phase->store_in) + { + /* make sure we check for interrupts in either path through here */ + CHECK_FOR_INTERRUPTS(); + if (!tuplestore_gettupleslot(current_phase->store_in, true, false, + aggstate->sort_slot)) + return NULL; + slot = aggstate->sort_slot; + } else slot = ExecProcNode(outerPlanState(aggstate)); @@ -1597,6 +1616,9 @@ ExecAgg(PlanState *pstate) CHECK_FOR_INTERRUPTS(); + if (node->groupingsets_preprocess) + agg_preprocess_groupingsets(node); + if (!node->agg_done) { /* Dispatch based on strategy */ @@ -1637,7 +1659,7 @@ agg_retrieve_direct(AggState 
*aggstate) TupleTableSlot *outerslot; TupleTableSlot *firstSlot; TupleTableSlot *result; - bool hasGroupingSets = aggstate->phase->aggnode->groupingSets != NULL; + bool hasGroupingSets = aggstate->phase->aggnode->rollup != NULL; int numGroupingSets = aggstate->phase->numsets; int currentSet; int nextSetSize; @@ -1970,6 +1992,135 @@ agg_retrieve_direct(AggState *aggstate) return NULL; } +/* + * Routine for final phase of partial grouping sets: + * + * Preprocess tuples for final phase of grouping sets. In initial phase, + * tuples is decorated with a grouping set ID and in the final phase, all + * grouping set are fit into different aggregate phases, so we need to + * redirect the tuples to each aggregate phases according to the grouping + * set ID. + */ +static void +agg_preprocess_groupingsets(AggState *aggstate) +{ + AggStatePerPhaseSort persort; + AggStatePerPhaseHash perhash; + AggStatePerPhase phase; + TupleTableSlot *outerslot; + ExprContext *tmpcontext = aggstate->tmpcontext; + int phaseidx; + + Assert(DO_AGGSPLIT_COMBINE(aggstate->aggsplit)); + Assert(aggstate->groupingsets_preprocess); + + /* Initialize tuples storage for each aggregate phases */ + for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++) + { + phase = aggstate->phases[phaseidx]; + + if (!phase->is_hashed) + { + persort = (AggStatePerPhaseSort) phase; + if (phase->aggnode->sortnode) + { + Sort *sortnode = (Sort *) phase->aggnode->sortnode; + PlanState *outerNode = outerPlanState(aggstate); + TupleDesc tupDesc = ExecGetResultType(outerNode); + + persort->sort_in = tuplesort_begin_heap(tupDesc, + sortnode->numCols, + sortnode->sortColIdx, + sortnode->sortOperators, + sortnode->collations, + sortnode->nullsFirst, + work_mem, + NULL, false); + } + else + { + persort->store_in = tuplestore_begin_heap(false, false, work_mem); + } + } + else + { + /* + * If it's a AGG_HASHED, we don't need a storage to store + * the tuples for later process, we can do the transition + * immediately. 
+ */ + } + } + + for (;;) + { + Datum ret; + bool isNull; + int setid; + + outerslot = ExecProcNode(outerPlanState(aggstate)); + if (TupIsNull(outerslot)) + break; + + tmpcontext->ecxt_outertuple = outerslot; + + /* Finger out which group set the tuple belongs to ?*/ + ret = ExecEvalExprSwitchContext(aggstate->gsetid, tmpcontext, &isNull); + + setid = DatumGetInt32(ret); + phase = aggstate->phases[aggstate->gsetid_phaseidxs[setid]]; + + if (!phase->is_hashed) + { + persort = (AggStatePerPhaseSort) phase; + + Assert(persort->sort_in || persort->store_in); + + if (persort->sort_in) + tuplesort_puttupleslot(persort->sort_in, outerslot); + else if (persort->store_in) + tuplestore_puttupleslot(persort->store_in, outerslot); + } + else + { + int hash; + bool dummynull; + + perhash = (AggStatePerPhaseHash) phase; + + /* If it is hashed, we can do the transition now. */ + select_current_set(aggstate, 0, true); + prepare_hash_slot(aggstate, perhash); + hash = TupleHashTableHash(perhash->hashtable, perhash->hashslot); + lookup_hash_entry(aggstate, perhash, hash); + + ExecEvalExprSwitchContext(phase->evaltrans, + tmpcontext, + &dummynull); + } + + ResetExprContext(aggstate->tmpcontext); + } + + /* Sort the first phase if needed */ + if (aggstate->aggstrategy != AGG_HASHED) + { + persort = (AggStatePerPhaseSort) aggstate->phase; + + if (persort->sort_in) + tuplesort_performsort(persort->sort_in); + } + + /* Mark the hash table to be filled */ + aggstate->table_filled = true; + + /* Mark the input table to be sorted */ + aggstate->input_sorted = true; + + /* Mark the flag to not preprocessing groupingsets again */ + aggstate->groupingsets_preprocess = false; +} + static void agg_sort_input(AggState *aggstate) { @@ -2246,21 +2397,22 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggstate->grp_firstTuple = NULL; aggstate->input_sorted = true; aggstate->eflags = eflags; + aggstate->groupingsets_preprocess = false; /* * Calculate the maximum number of grouping sets in any 
phase; this * determines the size of some allocations. */ - if (node->groupingSets) + if (node->rollup) { - numGroupingSets = list_length(node->groupingSets); + numGroupingSets = list_length(node->rollup->gsets); foreach(l, node->chain) { Agg *agg = lfirst(l); numGroupingSets = Max(numGroupingSets, - list_length(agg->groupingSets)); + list_length(agg->rollup->gsets)); if (agg->aggstrategy != AGG_HASHED) need_extra_slot = true; @@ -2270,6 +2422,28 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggstate->maxsets = numGroupingSets; aggstate->numphases = 1 + list_length(node->chain); + /* + * We are doing final stage of partial groupingsets, do preprocess + * to input tuples first, redirect the tuples to according aggregate + * phases. See agg_preprocess_groupingsets(). + */ + if (node->rollup && DO_AGGSPLIT_COMBINE(aggstate->aggsplit)) + { + aggstate->groupingsets_preprocess = true; + + /* + * Allocate gsetid <-> phases mapping, in final stage of + * partial groupingsets, all grouping sets are extracted + * to individual phases, so the number of sets is equal + * to the number of phases + */ + aggstate->gsetid_phaseidxs = + (int *) palloc0(aggstate->numphases * sizeof(int)); + + if (aggstate->aggstrategy != AGG_HASHED) + need_extra_slot = true; + } + /* * The first phase is not sorted, agg need to do its own sort. See * agg_sort_input(), this can only happen in groupingsets case. @@ -2384,6 +2558,12 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggstate->ss.ps.qual = ExecInitQual(node->plan.qual, (PlanState *) aggstate); + /* + * Initialize expression state to fetch grouping set id from + * the partial groupingsets aggregate result. + */ + aggstate->gsetid = + ExecInitExpr(node->gsetid, (PlanState *)aggstate); /* * We should now have found all Aggrefs in the targetlist and quals. 
*/ @@ -2431,6 +2611,22 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) all_grouped_cols = bms_add_members(all_grouped_cols, cols); /* + * In the initial stage of partial grouping sets, it provides extra + * grouping sets ID in the targetlist, fill the setno <-> gsetid + * map, so EEOP_GROUPING_SET_ID can evaluate correct gsetid for + * the output. + */ + if (aggnode->rollup && + DO_AGGSPLIT_SERIALIZE(aggnode->aggsplit)) + { + GroupingSetData *gs; + phasedata->setno_gsetids = palloc(sizeof(int)); + gs = linitial_node(GroupingSetData, + aggnode->rollup->gsets_data); + phasedata->setno_gsetids[0] = gs->setId; + } + + /* * Initialize pergroup state. For AGG_HASHED, all groups do transition * on the fly, all pergroup states are kept in hashtable, everytime * a tuple is processed, lookup_hash_entry() choose one group and @@ -2448,8 +2644,12 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * we can do the transition immediately when a tuple is fetched, * which means we can do the transition concurrently with the * first phase. + * + * Note: this is not work for final phase of partial groupingsets in + * which the partial input tuple has a specified target aggregate + * phase. 
*/ - if (phaseidx > 0) + if (phaseidx > 0 && !aggstate->groupingsets_preprocess) { aggstate->phases[0]->concurrent_hashes = lappend(aggstate->phases[0]->concurrent_hashes, perhash); @@ -2467,17 +2667,19 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) phasedata->aggnode = aggnode; phasedata->aggstrategy = aggnode->aggstrategy; - if (aggnode->groupingSets) + if (aggnode->rollup) { - phasedata->numsets = list_length(aggnode->groupingSets); + phasedata->numsets = list_length(aggnode->rollup->gsets_data); phasedata->gset_lengths = palloc(phasedata->numsets * sizeof(int)); phasedata->grouped_cols = palloc(phasedata->numsets * sizeof(Bitmapset *)); + phasedata->setno_gsetids = palloc(phasedata->numsets * sizeof(int)); i = 0; - foreach(l, aggnode->groupingSets) + foreach(l, aggnode->rollup->gsets_data) { - int current_length = list_length(lfirst(l)); - Bitmapset *cols = NULL; + GroupingSetData *gs = lfirst_node(GroupingSetData, l); + int current_length = list_length(gs->set); + Bitmapset *cols = NULL; /* planner forces this to be correct */ for (j = 0; j < current_length; ++j) @@ -2486,6 +2688,15 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) phasedata->grouped_cols[i] = cols; phasedata->gset_lengths[i] = current_length; + /* + * In the initial stage of partial grouping sets, it provides extra + * grouping sets ID in the targetlist, fill the setno <-> gsetid + * map, so EEOP_GROUPING_SET_ID can evaluate correct gsetid for + * the output. + */ + if (DO_AGGSPLIT_SERIALIZE(aggstate->aggsplit)) + phasedata->setno_gsetids[i] = gs->setId; + ++i; } @@ -2562,8 +2773,11 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * For non-first AGG_SORTED phase, it processes the same input * tuples with previous phase except that it need to resort the * input tuples. Tell the previous phase to copy out the tuples. + * + * Note: it doesn't work for final stage of partial grouping sets + * in which tuple has specified target aggregate phase. 
*/ - if (phaseidx > 0) + if (phaseidx > 0 && !aggstate->groupingsets_preprocess) { AggStatePerPhaseSort prev = (AggStatePerPhaseSort) aggstate->phases[phaseidx - 1]; @@ -2574,6 +2788,18 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) } } + /* + * Fill the gsetid_phaseidxs array, so we can find according phases + * using gsetid. + */ + if (aggstate->groupingsets_preprocess) + { + GroupingSetData *gs = + linitial_node(GroupingSetData, aggnode->rollup->gsets_data); + + aggstate->gsetid_phaseidxs[gs->setId] = phaseidx; + } + aggstate->phases[phaseidx] = phasedata; } @@ -3461,6 +3687,8 @@ ExecEndAgg(AggState *node) persort = (AggStatePerPhaseSort) phase; if (persort->sort_in) tuplesort_end(persort->sort_in); + if (persort->store_in) + tuplestore_end(persort->store_in); } for (transno = 0; transno < node->numtrans; transno++) @@ -3643,6 +3871,13 @@ ExecReScanAgg(AggState *node) } } + /* + * if the agg is doing final stage of partial groupingsets, reset the + * flag to do groupingsets preprocess again. 
+ */ + if (aggnode->rollup && DO_AGGSPLIT_COMBINE(node->aggsplit)) + node->groupingsets_preprocess = true; + /* reset to phase 0 */ initialize_phase(node, 0); diff --git a/src/backend/jit/llvm/llvmjit_expr.c b/src/backend/jit/llvm/llvmjit_expr.c index 066cd59554..f442442269 100644 --- a/src/backend/jit/llvm/llvmjit_expr.c +++ b/src/backend/jit/llvm/llvmjit_expr.c @@ -1882,6 +1882,46 @@ llvm_compile_expr(ExprState *state) LLVMBuildBr(b, opblocks[opno + 1]); break; + case EEOP_GROUPING_SET_ID: + { + LLVMValueRef v_resvalue; + LLVMValueRef v_aggstatep; + LLVMValueRef v_phase; + LLVMValueRef v_current_set; + LLVMValueRef v_setno_gsetids; + + v_aggstatep = + LLVMBuildBitCast(b, v_parent, l_ptr(StructAggState), ""); + + /* + * op->resvalue = + * aggstate->phase->setno_gsetids + * [aggstate->current_set] + */ + v_phase = + l_load_struct_gep(b, v_aggstatep, + FIELDNO_AGGSTATE_PHASE, + "aggstate.phase"); + v_setno_gsetids = + l_load_struct_gep(b, v_phase, + FIELDNO_AGGSTATEPERPHASE_SETNOGSETIDS, + "aggstateperphase.setno_gsetids"); + v_current_set = + l_load_struct_gep(b, v_aggstatep, + FIELDNO_AGGSTATE_CURRENT_SET, + "aggstate.current_set"); + v_resvalue = + l_load_gep1(b, v_setno_gsetids, v_current_set, ""); + v_resvalue = + LLVMBuildZExt(b, v_resvalue, TypeSizeT, ""); + + LLVMBuildStore(b, v_resvalue, v_resvaluep); + LLVMBuildStore(b, l_sbool_const(0), v_resnullp); + + LLVMBuildBr(b, opblocks[opno + 1]); + break; + } + case EEOP_WINDOW_FUNC: { WindowFuncExprState *wfunc = op->d.window_func.wfstate; diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 04b4c65858..de4dcfe165 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -990,8 +990,9 @@ _copyAgg(const Agg *from) COPY_SCALAR_FIELD(numGroups); COPY_SCALAR_FIELD(transitionSpace); COPY_BITMAPSET_FIELD(aggParams); - COPY_NODE_FIELD(groupingSets); + COPY_NODE_FIELD(rollup); COPY_NODE_FIELD(chain); + COPY_NODE_FIELD(gsetid); COPY_NODE_FIELD(sortnode); return newnode; 
@@ -1478,6 +1479,50 @@ _copyGroupingFunc(const GroupingFunc *from) return newnode; } +/* + * _copyGroupingSetId + */ +static GroupingSetId * +_copyGroupingSetId(const GroupingSetId *from) +{ + GroupingSetId *newnode = makeNode(GroupingSetId); + + return newnode; +} + +/* + * _copyRollupData + */ +static RollupData* +_copyRollupData(const RollupData *from) +{ + RollupData *newnode = makeNode(RollupData); + + COPY_NODE_FIELD(groupClause); + COPY_NODE_FIELD(gsets); + COPY_NODE_FIELD(gsets_data); + COPY_SCALAR_FIELD(numGroups); + COPY_SCALAR_FIELD(hashable); + COPY_SCALAR_FIELD(is_hashed); + + return newnode; +} + +/* + * _copyGroupingSetData + */ +static GroupingSetData * +_copyGroupingSetData(const GroupingSetData *from) +{ + GroupingSetData *newnode = makeNode(GroupingSetData); + + COPY_NODE_FIELD(set); + COPY_SCALAR_FIELD(setId); + COPY_SCALAR_FIELD(numGroups); + + return newnode; +} + /* * _copyWindowFunc */ @@ -4972,6 +5017,9 @@ copyObjectImpl(const void *from) case T_GroupingFunc: retval = _copyGroupingFunc(from); break; + case T_GroupingSetId: + retval = _copyGroupingSetId(from); + break; case T_WindowFunc: retval = _copyWindowFunc(from); break; @@ -5608,6 +5656,12 @@ copyObjectImpl(const void *from) case T_SortGroupClause: retval = _copySortGroupClause(from); break; + case T_RollupData: + retval = _copyRollupData(from); + break; + case T_GroupingSetData: + retval = _copyGroupingSetData(from); + break; case T_GroupingSet: retval = _copyGroupingSet(from); break; diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 88b912977e..6aa71d3723 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -3078,6 +3078,9 @@ equal(const void *a, const void *b) case T_GroupingFunc: retval = _equalGroupingFunc(a, b); break; + case T_GroupingSetId: + retval = true; + break; case T_WindowFunc: retval = _equalWindowFunc(a, b); break; diff --git a/src/backend/nodes/nodeFuncs.c b/src/backend/nodes/nodeFuncs.c index 
d85ca9f7c5..877ea0bc16 100644 --- a/src/backend/nodes/nodeFuncs.c +++ b/src/backend/nodes/nodeFuncs.c @@ -62,6 +62,9 @@ exprType(const Node *expr) case T_GroupingFunc: type = INT4OID; break; + case T_GroupingSetId: + type = INT4OID; + break; case T_WindowFunc: type = ((const WindowFunc *) expr)->wintype; break; @@ -740,6 +743,9 @@ exprCollation(const Node *expr) case T_GroupingFunc: coll = InvalidOid; break; + case T_GroupingSetId: + coll = InvalidOid; + break; case T_WindowFunc: coll = ((const WindowFunc *) expr)->wincollid; break; @@ -1869,6 +1875,7 @@ expression_tree_walker(Node *node, case T_NextValueExpr: case T_RangeTblRef: case T_SortGroupClause: + case T_GroupingSetId: /* primitive node types with no expression subnodes */ break; case T_WithCheckOption: @@ -2575,6 +2582,7 @@ expression_tree_mutator(Node *node, case T_NextValueExpr: case T_RangeTblRef: case T_SortGroupClause: + case T_GroupingSetId: return (Node *) copyObject(node); case T_WithCheckOption: { diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 5816d122c1..efcb1c7d4f 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -785,8 +785,9 @@ _outAgg(StringInfo str, const Agg *node) WRITE_LONG_FIELD(numGroups); WRITE_UINT64_FIELD(transitionSpace); WRITE_BITMAPSET_FIELD(aggParams); - WRITE_NODE_FIELD(groupingSets); + WRITE_NODE_FIELD(rollup); WRITE_NODE_FIELD(chain); + WRITE_NODE_FIELD(gsetid); WRITE_NODE_FIELD(sortnode); } @@ -1150,6 +1151,13 @@ _outGroupingFunc(StringInfo str, const GroupingFunc *node) WRITE_LOCATION_FIELD(location); } +static void +_outGroupingSetId(StringInfo str, + const GroupingSetId *node __attribute__((unused))) +{ + WRITE_NODE_TYPE("GROUPINGSETID"); +} + static void _outWindowFunc(StringInfo str, const WindowFunc *node) { @@ -2002,6 +2010,7 @@ _outGroupingSetData(StringInfo str, const GroupingSetData *node) WRITE_NODE_TYPE("GSDATA"); WRITE_NODE_FIELD(set); + WRITE_INT_FIELD(setId); WRITE_FLOAT_FIELD(numGroups, "%.0f"); } 
@@ -3847,6 +3856,9 @@ outNode(StringInfo str, const void *obj) case T_GroupingFunc: _outGroupingFunc(str, obj); break; + case T_GroupingSetId: + _outGroupingSetId(str, obj); + break; case T_WindowFunc: _outWindowFunc(str, obj); break; diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index af4fcfe1ee..c9a3340f58 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -636,6 +636,50 @@ _readGroupingFunc(void) READ_DONE(); } +/* + * _readGroupingSetId + */ +static GroupingSetId * +_readGroupingSetId(void) +{ + READ_LOCALS_NO_FIELDS(GroupingSetId); + + READ_DONE(); +} + +/* + * _readRollupData + */ +static RollupData * +_readRollupData(void) +{ + READ_LOCALS(RollupData); + + READ_NODE_FIELD(groupClause); + READ_NODE_FIELD(gsets); + READ_NODE_FIELD(gsets_data); + READ_FLOAT_FIELD(numGroups); + READ_BOOL_FIELD(hashable); + READ_BOOL_FIELD(is_hashed); + + READ_DONE(); +} + +/* + * _readGroupingSetData + */ +static GroupingSetData * +_readGroupingSetData(void) +{ + READ_LOCALS(GroupingSetData); + + READ_NODE_FIELD(set); + READ_INT_FIELD(setId); + READ_FLOAT_FIELD(numGroups); + + READ_DONE(); +} + /* * _readWindowFunc */ @@ -2205,8 +2249,9 @@ _readAgg(void) READ_LONG_FIELD(numGroups); READ_UINT64_FIELD(transitionSpace); READ_BITMAPSET_FIELD(aggParams); - READ_NODE_FIELD(groupingSets); + READ_NODE_FIELD(rollup); READ_NODE_FIELD(chain); + READ_NODE_FIELD(gsetid); READ_NODE_FIELD(sortnode); READ_DONE(); @@ -2642,6 +2687,12 @@ parseNodeString(void) return_value = _readAggref(); else if (MATCH("GROUPINGFUNC", 12)) return_value = _readGroupingFunc(); + else if (MATCH("GROUPINGSETID", 13)) + return_value = _readGroupingSetId(); + else if (MATCH("ROLLUP", 6)) + return_value = _readRollupData(); + else if (MATCH("GSDATA", 6)) + return_value = _readGroupingSetData(); else if (MATCH("WINDOWFUNC", 10)) return_value = _readWindowFunc(); else if (MATCH("SUBSCRIPTINGREF", 15)) diff --git a/src/backend/optimizer/path/allpaths.c 
b/src/backend/optimizer/path/allpaths.c index 905bbe77d8..e6c7f080e0 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -2710,8 +2710,11 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows) /* * For each useful ordering, we can consider an order-preserving Gather - * Merge. + * Merge. Don't do this for partial groupingsets. */ + if (root->parse->groupingSets) + return; + foreach(lc, rel->partial_pathlist) { Path *subpath = (Path *) lfirst(lc); diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 29f88bf0b7..64205893a3 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -1641,7 +1641,7 @@ create_unique_plan(PlannerInfo *root, UniquePath *best_path, int flags) groupColIdx, groupOperators, groupCollations, - NIL, + NULL, NIL, best_path->path.rows, 0, @@ -2095,7 +2095,7 @@ create_agg_plan(PlannerInfo *root, AggPath *best_path) extract_grouping_ops(best_path->groupClause), extract_grouping_collations(best_path->groupClause, subplan->targetlist), - NIL, + NULL, NIL, best_path->numGroups, best_path->transitionSpace, @@ -2214,7 +2214,6 @@ create_groupingsets_plan(PlannerInfo *root, GroupingSetsPath *best_path) * never be grouping in an UPDATE/DELETE; but let's Assert that. */ Assert(root->inhTargetKind == INHKIND_NONE); - Assert(root->grouping_map == NULL); root->grouping_map = grouping_map; /* @@ -2237,10 +2236,13 @@ create_groupingsets_plan(PlannerInfo *root, GroupingSetsPath *best_path) new_grpColIdx = remap_groupColIdx(root, rollup->groupClause); /* + * In final stage, rollup may contain empty set here + * * FIXME This combination of nested if checks needs some explanation * why we need this particular combination of flags. 
*/ - if (!rollup->is_hashed) + if (!rollup->is_hashed && + list_length(linitial(rollup->gsets)) != 0) { sort_plan = (Plan *) make_sort_from_groupcols(rollup->groupClause, @@ -2264,12 +2266,12 @@ create_groupingsets_plan(PlannerInfo *root, GroupingSetsPath *best_path) agg_plan = (Plan *) make_agg(NIL, NIL, strat, - AGGSPLIT_SIMPLE, + best_path->aggsplit, list_length((List *) linitial(rollup->gsets)), new_grpColIdx, extract_grouping_ops(rollup->groupClause), extract_grouping_collations(rollup->groupClause, subplan->targetlist), - rollup->gsets, + rollup, NIL, rollup->numGroups, best_path->transitionSpace, @@ -2281,8 +2283,7 @@ create_groupingsets_plan(PlannerInfo *root, GroupingSetsPath *best_path) } /* - * Now make the real Agg node - */ + * Now make the real Agg node */ { RollupData *rollup = linitial(rollups); AttrNumber *top_grpColIdx; @@ -2314,12 +2315,12 @@ create_groupingsets_plan(PlannerInfo *root, GroupingSetsPath *best_path) plan = make_agg(build_path_tlist(root, &best_path->path), best_path->qual, best_path->aggstrategy, - AGGSPLIT_SIMPLE, + best_path->aggsplit, numGroupCols, top_grpColIdx, extract_grouping_ops(rollup->groupClause), extract_grouping_collations(rollup->groupClause, subplan->targetlist), - rollup->gsets, + rollup, chain, rollup->numGroups, best_path->transitionSpace, @@ -6221,7 +6222,7 @@ Agg * make_agg(List *tlist, List *qual, AggStrategy aggstrategy, AggSplit aggsplit, int numGroupCols, AttrNumber *grpColIdx, Oid *grpOperators, Oid *grpCollations, - List *groupingSets, List *chain, double dNumGroups, + RollupData *rollup, List *chain, double dNumGroups, Size transitionSpace, Plan *sortnode, Plan *lefttree) { Agg *node = makeNode(Agg); @@ -6240,8 +6241,9 @@ make_agg(List *tlist, List *qual, node->numGroups = numGroups; node->transitionSpace = transitionSpace; node->aggParams = NULL; /* SS_finalize_plan() will fill this */ - node->groupingSets = groupingSets; + node->rollup= rollup; node->chain = chain; + node->gsetid = NULL; node->sortnode 
= sortnode; plan->qual = qual; diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 68d9c88a53..cedd3e1c9d 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -113,6 +113,7 @@ typedef struct Bitmapset *unhashable_refs; List *unsortable_sets; int *tleref_to_colnum_map; + int num_sets; } grouping_sets_data; /* @@ -126,6 +127,8 @@ typedef struct * clauses per Window */ } WindowClauseSortData; +typedef void (*AddPathCallback) (RelOptInfo *parent_rel, Path *new_path); + /* Local functions */ static Node *preprocess_expression(PlannerInfo *root, Node *expr, int kind); static void preprocess_qual_conditions(PlannerInfo *root, Node *jtnode); @@ -142,7 +145,8 @@ static double preprocess_limit(PlannerInfo *root, static void remove_useless_groupby_columns(PlannerInfo *root); static List *preprocess_groupclause(PlannerInfo *root, List *force); static List *extract_rollup_sets(List *groupingSets); -static List *reorder_grouping_sets(List *groupingSets, List *sortclause); +static List *reorder_grouping_sets(grouping_sets_data *gd, + List *groupingSets, List *sortclause); static void standard_qp_callback(PlannerInfo *root, void *extra); static double get_number_of_groups(PlannerInfo *root, double path_rows, @@ -176,7 +180,11 @@ static void consider_groupingsets_paths(PlannerInfo *root, grouping_sets_data *gd, const AggClauseCosts *agg_costs, double dNumGroups, - AggStrategy strat); + List *havingQual, + AggStrategy strat, + AggSplit aggsplit, + AddPathCallback add_path_fn); + static RelOptInfo *create_window_paths(PlannerInfo *root, RelOptInfo *input_rel, PathTarget *input_target, @@ -250,6 +258,9 @@ static bool group_by_has_partkey(RelOptInfo *input_rel, List *groupClause); static int common_prefix_cmp(const void *a, const void *b); +static List *extract_final_rollups(PlannerInfo *root, + grouping_sets_data *gd, + List *rollups); 
/***************************************************************************** * @@ -2494,6 +2505,7 @@ preprocess_grouping_sets(PlannerInfo *root) GroupingSetData *gs = makeNode(GroupingSetData); gs->set = gset; + gs->setId = gd->num_sets++; gd->unsortable_sets = lappend(gd->unsortable_sets, gs); /* @@ -2538,7 +2550,7 @@ preprocess_grouping_sets(PlannerInfo *root) * largest-member-first, and applies the GroupingSetData annotations, * though the data will be filled in later. */ - current_sets = reorder_grouping_sets(current_sets, + current_sets = reorder_grouping_sets(gd, current_sets, (list_length(sets) == 1 ? parse->sortClause : NIL)); @@ -3547,7 +3559,7 @@ extract_rollup_sets(List *groupingSets) * gets implemented in one pass.) */ static List * -reorder_grouping_sets(List *groupingsets, List *sortclause) +reorder_grouping_sets(grouping_sets_data *gd, List *groupingsets, List *sortclause) { ListCell *lc; List *previous = NIL; @@ -3581,6 +3593,7 @@ reorder_grouping_sets(List *groupingsets, List *sortclause) previous = list_concat(previous, new_elems); gs->set = list_copy(previous); + gs->setId = gd->num_sets++; result = lcons(gs, result); } @@ -4191,8 +4204,14 @@ create_ordinary_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, * The caller specifies the preferred aggregate strategy (sorted or hashed) using * the strat aprameter. When the requested strategy is AGG_SORTED, the input path * needs to be sorted accordingly (is_sorted needs to be true). + * + * The caller also needs to specify a callback used to add the path to the + * appropriate list - we can't simply use add_path, because with partial + * aggregation (PARTITIONWISE_AGGREGATE_PARTIAL) the path may need to be + * added to grouped_rel->pathlist. And aggsplit value is not sufficient to + * make a decision. 
*/ -static void +static void consider_groupingsets_paths(PlannerInfo *root, RelOptInfo *grouped_rel, Path *path, @@ -4201,9 +4220,11 @@ consider_groupingsets_paths(PlannerInfo *root, grouping_sets_data *gd, const AggClauseCosts *agg_costs, double dNumGroups, - AggStrategy strat) + List *havingQual, + AggStrategy strat, + AggSplit aggsplit, + AddPathCallback add_path_fn) { - Query *parse = root->parse; Assert(strat == AGG_HASHED || strat == AGG_SORTED); /* @@ -4367,16 +4388,20 @@ consider_groupingsets_paths(PlannerInfo *root, strat = AGG_MIXED; } - add_path(grouped_rel, (Path *) - create_groupingsets_path(root, - grouped_rel, - path, - (List *) parse->havingQual, - strat, - new_rollups, - agg_costs, - dNumGroups, - is_sorted)); + if (DO_AGGSPLIT_COMBINE(aggsplit)) + new_rollups = extract_final_rollups(root, gd, new_rollups); + + add_path_fn(grouped_rel, (Path *) + create_groupingsets_path(root, + grouped_rel, + path, + havingQual, + strat, + new_rollups, + agg_costs, + dNumGroups, + aggsplit, + is_sorted)); return; } @@ -4388,7 +4413,7 @@ consider_groupingsets_paths(PlannerInfo *root, /* * Callers consider AGG_SORTED strategy, the first rollup must - * use non-hashed aggregate, 'is_sorted' tells whether the first + * use non-hashed aggregate, is_sorted tells whether the first * rollup need to do its own sort. 
* * we try and make two paths: one sorted and one mixed @@ -4533,16 +4558,20 @@ consider_groupingsets_paths(PlannerInfo *root, if (rollups) { - add_path(grouped_rel, (Path *) - create_groupingsets_path(root, - grouped_rel, - path, - (List *) parse->havingQual, - AGG_MIXED, - rollups, - agg_costs, - dNumGroups, - is_sorted)); + if (DO_AGGSPLIT_COMBINE(aggsplit)) + rollups = extract_final_rollups(root, gd, rollups); + + add_path_fn(grouped_rel, (Path *) + create_groupingsets_path(root, + grouped_rel, + path, + havingQual, + AGG_MIXED, + rollups, + agg_costs, + dNumGroups, + aggsplit, + is_sorted)); } } @@ -4550,16 +4579,82 @@ consider_groupingsets_paths(PlannerInfo *root, * Now try the simple sorted case. */ if (!gd->unsortable_sets) - add_path(grouped_rel, (Path *) - create_groupingsets_path(root, - grouped_rel, - path, - (List *) parse->havingQual, - AGG_SORTED, - gd->rollups, - agg_costs, - dNumGroups, - is_sorted)); + { + List *rollups; + + if (DO_AGGSPLIT_COMBINE(aggsplit)) + rollups = extract_final_rollups(root, gd, gd->rollups); + else + rollups = gd->rollups; + + add_path_fn(grouped_rel, (Path *) + create_groupingsets_path(root, + grouped_rel, + path, + havingQual, + AGG_SORTED, + rollups, + agg_costs, + dNumGroups, + aggsplit, + is_sorted)); + } +} + +/* + * If we are combining the partial groupingsets aggregation, the input is + * mixed with tuples from different grouping sets; the executor dispatches + * the tuples to different rollups (phases) according to the grouping set id. + * + * We cannot use the same rollups as the initial stage, in which each tuple + * is processed by one or more grouping sets in one rollup, because in the + * combining stage each tuple belongs to one single grouping set. + * In this case, we use final_rollups instead, in which each rollup has + * only one grouping set. 
+ */ +static List * +extract_final_rollups(PlannerInfo *root, + grouping_sets_data *gd, + List *rollups) +{ + ListCell *lc; + List *new_rollups = NIL; + + foreach(lc, rollups) + { + ListCell *lc1; + RollupData *rollup = lfirst_node(RollupData, lc); + + foreach(lc1, rollup->gsets_data) + { + GroupingSetData *gs = lfirst_node(GroupingSetData, lc1); + RollupData *new_rollup = makeNode(RollupData); + + if (gs->set != NIL) + { + new_rollup->groupClause = preprocess_groupclause(root, gs->set); + new_rollup->gsets_data = list_make1(gs); + new_rollup->gsets = remap_to_groupclause_idx(new_rollup->groupClause, + new_rollup->gsets_data, + gd->tleref_to_colnum_map); + new_rollup->hashable = rollup->hashable; + new_rollup->is_hashed = rollup->is_hashed; + } + else + { + new_rollup->groupClause = NIL; + new_rollup->gsets_data = list_make1(gs); + new_rollup->gsets = list_make1(NIL); + new_rollup->hashable = false; + new_rollup->is_hashed = false; + } + + new_rollup->numGroups = gs->numGroups; + new_rollups = lappend(new_rollups, new_rollup); + } + } + + return new_rollups; } /* @@ -5269,6 +5364,17 @@ make_partial_grouping_target(PlannerInfo *root, add_new_columns_to_pathtarget(partial_target, non_group_exprs); + /* + * We are generating a partial groupingsets path, so add an expression to + * show the grouping set ID for a tuple; in the final stage the executor + * can then know which set each tuple belongs to and combine them. + */ + if (parse->groupingSets) + { + GroupingSetId *expr = makeNode(GroupingSetId); + add_new_column_to_pathtarget(partial_target, (Expr *)expr); + } + + /* + * Adjust Aggrefs to put them in partial mode. 
At this point all Aggrefs * are at the top level of the target list, so we can just scan the list @@ -6433,7 +6539,10 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel, consider_groupingsets_paths(root, grouped_rel, path, is_sorted, can_hash, gd, agg_costs, dNumGroups, - AGG_SORTED); + havingQual, + AGG_SORTED, + AGGSPLIT_SIMPLE, + add_path); continue; } @@ -6494,15 +6603,37 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel, foreach(lc, partially_grouped_rel->pathlist) { Path *path = (Path *) lfirst(lc); + bool is_sorted; + + is_sorted = pathkeys_contained_in(root->group_pathkeys, + path->pathkeys); + + /* + * Use any available suitably-sorted path as input, and also + * consider sorting the cheapest-total path. + */ + if (path != partially_grouped_rel->cheapest_total_path && + !is_sorted) + continue; + + if (parse->groupingSets) + { + consider_groupingsets_paths(root, grouped_rel, + path, is_sorted, can_hash, + gd, agg_final_costs, dNumGroups, + havingQual, + AGG_SORTED, + AGGSPLIT_FINAL_DESERIAL, + add_path); + continue; + } /* * Insert a Sort node, if required. But there's no point in * sorting anything but the cheapest path. 
*/ - if (!pathkeys_contained_in(root->group_pathkeys, path->pathkeys)) + if (!is_sorted) { - if (path != partially_grouped_rel->cheapest_total_path) - continue; path = (Path *) create_sort_path(root, grouped_rel, path, @@ -6546,7 +6677,10 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel, consider_groupingsets_paths(root, grouped_rel, cheapest_path, false, true, gd, agg_costs, dNumGroups, - AGG_HASHED); + havingQual, + AGG_HASHED, + AGGSPLIT_SIMPLE, + add_path); } else { @@ -6589,22 +6723,39 @@ add_paths_to_grouping_rel(PlannerInfo *root, RelOptInfo *input_rel, { Path *path = partially_grouped_rel->cheapest_total_path; - hashaggtablesize = estimate_hashagg_tablesize(path, - agg_final_costs, - dNumGroups); + if (parse->groupingSets) + { + /* + * Try for a hash-only groupingsets path over unsorted input. + */ + consider_groupingsets_paths(root, grouped_rel, + path, false, true, + gd, agg_final_costs, dNumGroups, + havingQual, + AGG_HASHED, + AGGSPLIT_FINAL_DESERIAL, + add_path); + } + else + { - if (hashaggtablesize < work_mem * 1024L) - add_path(grouped_rel, (Path *) - create_agg_path(root, - grouped_rel, - path, - grouped_rel->reltarget, - AGG_HASHED, - AGGSPLIT_FINAL_DESERIAL, - parse->groupClause, - havingQual, - agg_final_costs, - dNumGroups)); + hashaggtablesize = estimate_hashagg_tablesize(path, + agg_final_costs, + dNumGroups); + + if (hashaggtablesize < work_mem * 1024L) + add_path(grouped_rel, (Path *) + create_agg_path(root, + grouped_rel, + path, + grouped_rel->reltarget, + AGG_HASHED, + AGGSPLIT_FINAL_DESERIAL, + parse->groupClause, + havingQual, + agg_final_costs, + dNumGroups)); + } } } @@ -6814,6 +6965,19 @@ create_partial_grouping_paths(PlannerInfo *root, path->pathkeys); if (path == cheapest_partial_path || is_sorted) { + if (parse->groupingSets) + { + consider_groupingsets_paths(root, partially_grouped_rel, + path, is_sorted, can_hash, + gd, agg_partial_costs, + dNumPartialPartialGroups, + NIL, + AGG_SORTED, + 
AGGSPLIT_INITIAL_SERIAL, + add_partial_path); + continue; + } + /* Sort the cheapest partial path, if it isn't already */ if (!is_sorted) path = (Path *) create_sort_path(root, @@ -6821,7 +6985,7 @@ create_partial_grouping_paths(PlannerInfo *root, path, root->group_pathkeys, -1.0); - + if (parse->hasAggs) add_partial_path(partially_grouped_rel, (Path *) create_agg_path(root, @@ -6883,26 +7047,41 @@ create_partial_grouping_paths(PlannerInfo *root, { double hashaggtablesize; - hashaggtablesize = - estimate_hashagg_tablesize(cheapest_partial_path, - agg_partial_costs, - dNumPartialPartialGroups); - - /* Do the same for partial paths. */ - if (hashaggtablesize < work_mem * 1024L && - cheapest_partial_path != NULL) + if (parse->groupingSets) { - add_partial_path(partially_grouped_rel, (Path *) - create_agg_path(root, - partially_grouped_rel, - cheapest_partial_path, - partially_grouped_rel->reltarget, - AGG_HASHED, - AGGSPLIT_INITIAL_SERIAL, - parse->groupClause, - NIL, - agg_partial_costs, - dNumPartialPartialGroups)); + consider_groupingsets_paths(root, partially_grouped_rel, + cheapest_partial_path, + false, true, + gd, agg_partial_costs, + dNumPartialPartialGroups, + NIL, + AGG_HASHED, + AGGSPLIT_INITIAL_SERIAL, + add_partial_path); + } + else + { + hashaggtablesize = + estimate_hashagg_tablesize(cheapest_partial_path, + agg_partial_costs, + dNumPartialPartialGroups); + + /* Do the same for partial paths. */ + if (hashaggtablesize < work_mem * 1024L && + cheapest_partial_path != NULL) + { + add_partial_path(partially_grouped_rel, (Path *) + create_agg_path(root, + partially_grouped_rel, + cheapest_partial_path, + partially_grouped_rel->reltarget, + AGG_HASHED, + AGGSPLIT_INITIAL_SERIAL, + parse->groupClause, + NIL, + agg_partial_costs, + dNumPartialPartialGroups)); + } } } @@ -6946,6 +7125,9 @@ gather_grouping_paths(PlannerInfo *root, RelOptInfo *rel) generate_gather_paths(root, rel, true); /* Try cheapest partial path + explicit Sort + Gather Merge. 
*/ + if (root->parse->groupingSets) + return; + cheapest_partial_path = linitial(rel->partial_pathlist); if (!pathkeys_contained_in(root->group_pathkeys, cheapest_partial_path->pathkeys)) @@ -6990,11 +7172,6 @@ can_partial_agg(PlannerInfo *root, const AggClauseCosts *agg_costs) */ return false; } - else if (parse->groupingSets) - { - /* We don't know how to do grouping sets in parallel. */ - return false; - } else if (agg_costs->hasNonPartial || agg_costs->hasNonSerial) { /* Insufficient support for partial mode. */ diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index 3dcded506b..eae7d15701 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -754,6 +754,22 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) plan->qual = (List *) convert_combining_aggrefs((Node *) plan->qual, NULL); + + /* + * If it's groupingsets, we must add expression to evaluate + * the grouping set ID and set the reference from the + * targetlist of child plan node. 
+ */ + if (agg->rollup) + { + GroupingSetId *expr = makeNode(GroupingSetId); + indexed_tlist *subplan_itlist = build_tlist_index(plan->lefttree->targetlist); + + agg->gsetid = (Expr *) fix_upper_expr(root, (Node *)expr, + subplan_itlist, + OUTER_VAR, + rtoffset); + } } set_upper_references(root, plan, rtoffset); diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index ff8f676dfb..9fe6f6a003 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -2994,6 +2994,7 @@ create_groupingsets_path(PlannerInfo *root, List *rollups, const AggClauseCosts *agg_costs, double numGroups, + AggSplit aggsplit, bool is_sorted) { GroupingSetsPath *pathnode = makeNode(GroupingSetsPath); @@ -3011,6 +3012,7 @@ create_groupingsets_path(PlannerInfo *root, subpath->parallel_safe; pathnode->path.parallel_workers = subpath->parallel_workers; pathnode->subpath = subpath; + pathnode->aggsplit = aggsplit; pathnode->is_sorted = is_sorted; /* @@ -3045,11 +3047,27 @@ create_groupingsets_path(PlannerInfo *root, Assert(aggstrategy != AGG_PLAIN || list_length(rollups) == 1); Assert(aggstrategy != AGG_MIXED || list_length(rollups) > 1); + /* + * Estimate the cost of groupingsets. + * + * If we are finalizing grouping sets, the subpath->rows + * contains rows from all sets, we need to estimate the + * number of rows in each rollup. Meanwhile, the cost of + * preprocess groupingsets is not estimated, the expression + * to redirect tuples is a simple Var expression which is + * normally cost zero. 
+ */ foreach(lc, rollups) { RollupData *rollup = lfirst(lc); List *gsets = rollup->gsets; int numGroupCols = list_length(linitial(gsets)); + int rows = 0.0; + + if (DO_AGGSPLIT_COMBINE(aggsplit)) + rows = rollup->numGroups * subpath->rows / numGroups; + else + rows = subpath->rows; /* * In AGG_SORTED or AGG_PLAIN mode, the first rollup do its own @@ -3071,7 +3089,7 @@ create_groupingsets_path(PlannerInfo *root, cost_sort(&sort_path, root, NIL, input_total_cost, - subpath->rows, + rows, subpath->pathtarget->width, 0.0, work_mem, @@ -3089,7 +3107,7 @@ create_groupingsets_path(PlannerInfo *root, having_qual, input_startup_cost, input_total_cost, - subpath->rows); + rows); is_first = false; } @@ -3101,7 +3119,6 @@ create_groupingsets_path(PlannerInfo *root, sort_path.startup_cost = 0; sort_path.total_cost = 0; - sort_path.rows = subpath->rows; rollup_strategy = rollup->is_hashed ? AGG_HASHED : (numGroupCols ? AGG_SORTED : AGG_PLAIN); @@ -3111,7 +3128,7 @@ create_groupingsets_path(PlannerInfo *root, /* Account for cost of sort, but don't charge input cost again */ cost_sort(&sort_path, root, NIL, 0.0, - subpath->rows, + rows, subpath->pathtarget->width, 0.0, work_mem, @@ -3127,7 +3144,7 @@ create_groupingsets_path(PlannerInfo *root, having_qual, sort_path.startup_cost, sort_path.total_cost, - sort_path.rows); + rows); pathnode->path.total_cost += agg_path.total_cost; pathnode->path.rows += agg_path.rows; diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index 5e63238f03..5779d158ba 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -7941,6 +7941,12 @@ get_rule_expr(Node *node, deparse_context *context, } break; + case T_GroupingSetId: + { + appendStringInfoString(buf, "GROUPINGSETID()"); + } + break; + case T_WindowFunc: get_windowfunc_expr((WindowFunc *) node, context); break; diff --git a/src/include/executor/execExpr.h b/src/include/executor/execExpr.h index 4ed5d0a7de..4d36c2d77b 100644 --- 
a/src/include/executor/execExpr.h +++ b/src/include/executor/execExpr.h @@ -216,6 +216,7 @@ typedef enum ExprEvalOp EEOP_XMLEXPR, EEOP_AGGREF, EEOP_GROUPING_FUNC, + EEOP_GROUPING_SET_ID, EEOP_WINDOW_FUNC, EEOP_SUBPLAN, EEOP_ALTERNATIVE_SUBPLAN, diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h index c5d4121c37..967af08af7 100644 --- a/src/include/executor/nodeAgg.h +++ b/src/include/executor/nodeAgg.h @@ -281,6 +281,8 @@ typedef struct AggStatePerPhaseData List *concurrent_hashes; /* hash phases can do transition concurrently */ AggStatePerGroup *pergroups; /* pergroup states for a phase */ bool skip_build_trans; +#define FIELDNO_AGGSTATEPERPHASE_SETNOGSETIDS 10 + int *setno_gsetids; /* setno <-> gsetid map */ } AggStatePerPhaseData; typedef struct AggStatePerPhaseSortData diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 4081a0978e..dea5b10597 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -2047,6 +2047,7 @@ typedef struct AggState int numtrans; /* number of pertrans items */ AggStrategy aggstrategy; /* strategy mode */ AggSplit aggsplit; /* agg-splitting mode, see nodes.h */ +#define FIELDNO_AGGSTATE_PHASE 6 AggStatePerPhase phase; /* pointer to current phase data */ int numphases; /* number of phases (including phase 0) */ int current_phase; /* current phase number */ @@ -2070,8 +2071,6 @@ typedef struct AggState /* These fields are for grouping set phase data */ int maxsets; /* The max number of sets in any phase */ AggStatePerPhase *phases; /* array of all phases */ - Tuplesortstate *sort_in; /* sorted input to phases > 1 */ - Tuplesortstate *sort_out; /* input is copied here for next phase */ TupleTableSlot *sort_slot; /* slot for sort results */ /* these fields are used in AGG_PLAIN and AGG_SORTED modes: */ HeapTuple grp_firstTuple; /* copy of first tuple of current group */ @@ -2083,6 +2082,11 @@ typedef struct AggState int eflags; /* eflags for the first sort */ 
ProjectionInfo *combinedproj; /* projection machinery */ + + /* these field are used in parallel grouping sets */ + bool groupingsets_preprocess; /* groupingsets preprocessed yet? */ + ExprState *gsetid; /* expression state to get grpsetid from input */ + int *gsetid_phaseidxs; /* grpsetid <-> phaseidx mapping */ } AggState; /* ---------------- diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 8a76afe8cc..a48a7af0e3 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -153,6 +153,7 @@ typedef enum NodeTag T_Param, T_Aggref, T_GroupingFunc, + T_GroupingSetId, T_WindowFunc, T_SubscriptingRef, T_FuncExpr, diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h index c1e69c808f..2761fa6d01 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h @@ -1676,6 +1676,7 @@ typedef struct GroupingSetData { NodeTag type; List *set; /* grouping set as list of sortgrouprefs */ + int setId; /* unique grouping set identifier */ double numGroups; /* est. 
number of result groups */ } GroupingSetData; @@ -1702,6 +1703,7 @@ typedef struct GroupingSetsPath List *rollups; /* list of RollupData */ List *qual; /* quals (HAVING quals), if any */ uint64 transitionSpace; /* for pass-by-ref transition data */ + AggSplit aggsplit; /* agg-splitting mode, see nodes.h */ bool is_sorted; /* input sorted in groupcols of first rollup */ } GroupingSetsPath; diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 3cd2537e9e..5b1239adf2 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -20,6 +20,7 @@ #include "nodes/bitmapset.h" #include "nodes/lockoptions.h" #include "nodes/primnodes.h" +#include "nodes/pathnodes.h" /* ---------------------------------------------------------------- @@ -816,8 +817,9 @@ typedef struct Agg uint64 transitionSpace; /* for pass-by-ref transition data */ Bitmapset *aggParams; /* IDs of Params used in Aggref inputs */ /* Note: planner provides numGroups & aggParams only in HASHED/MIXED case */ - List *groupingSets; /* grouping sets to use */ + RollupData *rollup; /* grouping sets to use */ List *chain; /* chained Agg/Sort nodes */ + Expr *gsetid; /* expression to fetch grouping set id */ Plan *sortnode; /* agg does its own sort, only used by grouping sets now */ } Agg; diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h index d73be2ad46..f8f85d431a 100644 --- a/src/include/nodes/primnodes.h +++ b/src/include/nodes/primnodes.h @@ -364,6 +364,12 @@ typedef struct GroupingFunc int location; /* token location */ } GroupingFunc; +/* GroupingSetId */ +typedef struct GroupingSetId +{ + Expr xpr; +} GroupingSetId; + /* * WindowFunc */ diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index f9f388ba06..4fde8b22bf 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -218,6 +218,7 @@ extern GroupingSetsPath *create_groupingsets_path(PlannerInfo *root, List *rollups, const 
AggClauseCosts *agg_costs, double numGroups, + AggSplit aggsplit, bool is_sorted); extern MinMaxAggPath *create_minmaxagg_path(PlannerInfo *root, RelOptInfo *rel, diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h index 5954ff3997..e987011328 100644 --- a/src/include/optimizer/planmain.h +++ b/src/include/optimizer/planmain.h @@ -54,7 +54,7 @@ extern Sort *make_sort_from_sortclauses(List *sortcls, Plan *lefttree); extern Agg *make_agg(List *tlist, List *qual, AggStrategy aggstrategy, AggSplit aggsplit, int numGroupCols, AttrNumber *grpColIdx, Oid *grpOperators, Oid *grpCollations, - List *groupingSets, List *chain, double dNumGroups, + RollupData *rollup, List *chain, double dNumGroups, Size transitionSpace, Plan *sortnode, Plan *lefttree); extern Limit *make_limit(Plan *lefttree, Node *limitOffset, Node *limitCount); -- 2.21.1