From 3a6ccb18a97bce531b0c16eb0db48ef182bdc0ad Mon Sep 17 00:00:00 2001 From: Matheus Alcantara Date: Tue, 16 Dec 2025 16:32:14 -0300 Subject: [PATCH v9 3/3] Create execAppend.c to avoid duplicated code on [Merge]Append --- contrib/pg_overexplain/pg_overexplain.c | 4 +- contrib/postgres_fdw/postgres_fdw.c | 8 +- src/backend/commands/explain.c | 26 +- src/backend/executor/Makefile | 1 + src/backend/executor/execAmi.c | 2 +- src/backend/executor/execAppend.c | 409 +++++++++++++++++++ src/backend/executor/execCurrent.c | 4 +- src/backend/executor/execProcnode.c | 8 +- src/backend/executor/meson.build | 1 + src/backend/executor/nodeAppend.c | 498 +++++------------------- src/backend/executor/nodeMergeAppend.c | 417 +++----------------- src/backend/nodes/nodeFuncs.c | 8 +- src/backend/optimizer/plan/createplan.c | 34 +- src/backend/optimizer/plan/setrefs.c | 44 +-- src/backend/optimizer/plan/subselect.c | 4 +- src/backend/utils/adt/ruleutils.c | 8 +- src/include/executor/execAppend.h | 33 ++ src/include/nodes/execnodes.h | 80 ++-- src/include/nodes/plannodes.h | 45 +-- 19 files changed, 721 insertions(+), 913 deletions(-) create mode 100644 src/backend/executor/execAppend.c create mode 100644 src/include/executor/execAppend.h diff --git a/contrib/pg_overexplain/pg_overexplain.c b/contrib/pg_overexplain/pg_overexplain.c index fcdc17012da..7f18c2ab06c 100644 --- a/contrib/pg_overexplain/pg_overexplain.c +++ b/contrib/pg_overexplain/pg_overexplain.c @@ -228,12 +228,12 @@ overexplain_per_node_hook(PlanState *planstate, List *ancestors, break; case T_Append: overexplain_bitmapset("Append RTIs", - ((Append *) plan)->apprelids, + ((Append *) plan)->ap.apprelids, es); break; case T_MergeAppend: overexplain_bitmapset("Append RTIs", - ((MergeAppend *) plan)->apprelids, + ((MergeAppend *) plan)->ap.apprelids, es); break; case T_Result: diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index bd551a1db72..b01ad40ad17 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -2412,8 +2412,8 @@ find_modifytable_subplan(PlannerInfo *root, { Append *appendplan = (Append *) subplan; - if (subplan_index < list_length(appendplan->appendplans)) - subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index); + if (subplan_index < list_length(appendplan->ap.subplans)) + subplan = (Plan *) list_nth(appendplan->ap.subplans, subplan_index); } else if (IsA(subplan, Result) && outerPlan(subplan) != NULL && @@ -2421,8 +2421,8 @@ find_modifytable_subplan(PlannerInfo *root, { Append *appendplan = (Append *) outerPlan(subplan); - if (subplan_index < list_length(appendplan->appendplans)) - subplan = (Plan *) list_nth(appendplan->appendplans, subplan_index); + if (subplan_index < list_length(appendplan->ap.subplans)) + subplan = (Plan *) list_nth(appendplan->ap.subplans, subplan_index); } /* Now, have we got a ForeignScan on the desired rel? 
*/ diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 5a6390631eb..3eaa1f7459e 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -1224,11 +1224,11 @@ ExplainPreScanNode(PlanState *planstate, Bitmapset **rels_used) break; case T_Append: *rels_used = bms_add_members(*rels_used, - ((Append *) plan)->apprelids); + ((Append *) plan)->ap.apprelids); break; case T_MergeAppend: *rels_used = bms_add_members(*rels_used, - ((MergeAppend *) plan)->apprelids); + ((MergeAppend *) plan)->ap.apprelids); break; case T_Result: *rels_used = bms_add_members(*rels_used, @@ -1272,7 +1272,7 @@ plan_is_disabled(Plan *plan) * includes any run-time pruned children. Ignoring those could give * us the incorrect number of disabled nodes. */ - foreach(lc, aplan->appendplans) + foreach(lc, aplan->ap.subplans) { Plan *subplan = lfirst(lc); @@ -1289,7 +1289,7 @@ plan_is_disabled(Plan *plan) * includes any run-time pruned children. Ignoring those could give * us the incorrect number of disabled nodes. */ - foreach(lc, maplan->mergeplans) + foreach(lc, maplan->ap.subplans) { Plan *subplan = lfirst(lc); @@ -2336,13 +2336,13 @@ ExplainNode(PlanState *planstate, List *ancestors, switch (nodeTag(plan)) { case T_Append: - ExplainMissingMembers(((AppendState *) planstate)->as_nplans, - list_length(((Append *) plan)->appendplans), + ExplainMissingMembers(((AppendState *) planstate)->as.nplans, + list_length(((Append *) plan)->ap.subplans), es); break; case T_MergeAppend: - ExplainMissingMembers(((MergeAppendState *) planstate)->ms_nplans, - list_length(((MergeAppend *) plan)->mergeplans), + ExplainMissingMembers(((MergeAppendState *) planstate)->ms.nplans, + list_length(((MergeAppend *) plan)->ap.subplans), es); break; default: @@ -2386,13 +2386,13 @@ ExplainNode(PlanState *planstate, List *ancestors, switch (nodeTag(plan)) { case T_Append: - ExplainMemberNodes(((AppendState *) planstate)->appendplans, - ((AppendState *) planstate)->as_nplans, + ExplainMemberNodes(((AppendState *) planstate)->as.plans, + ((AppendState *) planstate)->as.nplans, ancestors, es); break; case T_MergeAppend: - ExplainMemberNodes(((MergeAppendState *) planstate)->mergeplans, - ((MergeAppendState *) planstate)->ms_nplans, + ExplainMemberNodes(((MergeAppendState *) planstate)->ms.plans, + ((MergeAppendState *) planstate)->ms.nplans, ancestors, es); break; case T_BitmapAnd: @@ -2606,7 +2606,7 @@ static void show_merge_append_keys(MergeAppendState *mstate, List *ancestors, ExplainState *es) { - MergeAppend *plan = (MergeAppend *) mstate->ps.plan; + MergeAppend *plan = (MergeAppend *) mstate->ms.ps.plan; show_sort_group_keys((PlanState *) mstate, "Sort Key", plan->numCols, 0, plan->sortColIdx, diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile index 11118d0ce02..66b62fca921 100644 --- a/src/backend/executor/Makefile +++ b/src/backend/executor/Makefile @@ -15,6 +15,7 @@ include $(top_builddir)/src/Makefile.global OBJS = \ execAmi.o \ execAsync.o \ + execAppend.o \ execCurrent.o \ execExpr.o \ execExprInterp.o \ diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c index 1d0e8ad57b4..5c897048ba3 100644 --- a/src/backend/executor/execAmi.c +++ b/src/backend/executor/execAmi.c @@ -537,7 +537,7 @@ ExecSupportsBackwardScan(Plan *node) if (((Append *) node)->nasyncplans > 0) return false; - foreach(l, ((Append *) node)->appendplans) + foreach(l, ((Append *) node)->ap.subplans) { if (!ExecSupportsBackwardScan((Plan *) lfirst(l))) return false; diff --git 
a/src/backend/executor/execAppend.c b/src/backend/executor/execAppend.c new file mode 100644 index 00000000000..624fb207882 --- /dev/null +++ b/src/backend/executor/execAppend.c @@ -0,0 +1,409 @@ +/*------------------------------------------------------------------------- + * + * execAppend.c + * This code provides support functions for executing MergeAppend and Append + * nodes. + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/executor/execAppend.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" +#include "executor/executor.h" +#include "executor/execAppend.h" +#include "executor/execAsync.h" +#include "executor/execPartition.h" +#include "storage/latch.h" +#include "storage/waiteventset.h" +#include "miscadmin.h" + +#define EVENT_BUFFER_SIZE 16 + +void +ExecInitAppender(AppenderState * state, + Appender * node, + EState *estate, + int eflags, + int first_partial_plan, + int *first_valid_partial_plan) +{ + PlanState **appendplanstates; + const TupleTableSlotOps *appendops; + Bitmapset *validsubplans; + Bitmapset *asyncplans; + int nplans; + int nasyncplans; + int firstvalid; + int i, + j; + + /* If run-time partition pruning is enabled, then set that up now */ + if (node->part_prune_index >= 0) + { + PartitionPruneState *prunestate; + + /* + * Set up pruning data structure. This also initializes the set of + * subplans to initialize (validsubplans) by taking into account the + * result of performing initial pruning if any. + */ + prunestate = ExecInitPartitionExecPruning(&state->ps, + list_length(node->subplans), + node->part_prune_index, + node->apprelids, + &validsubplans); + state->prune_state = prunestate; + nplans = bms_num_members(validsubplans); + + /* + * When no run-time pruning is required and there's at least one + * subplan, we can fill as_valid_subplans immediately, preventing + * later calls to ExecFindMatchingSubPlans. + */ + if (!prunestate->do_exec_prune && nplans > 0) + { + state->valid_subplans = bms_add_range(NULL, 0, nplans - 1); + state->valid_subplans_identified = true; + } + } + else + { + nplans = list_length(node->subplans); + + /* + * When run-time partition pruning is not enabled we can just mark all + * subplans as valid; they must also all be initialized. + */ + Assert(nplans > 0); + state->valid_subplans = validsubplans = + bms_add_range(NULL, 0, nplans - 1); + state->valid_subplans_identified = true; + state->prune_state = NULL; + } + + appendplanstates = (PlanState **) palloc(nplans * sizeof(PlanState *)); + + /* + * call ExecInitNode on each of the valid plans to be executed and save + * the results into the appendplanstates array. + * + * While at it, find out the first valid partial plan. + */ + j = 0; + asyncplans = NULL; + nasyncplans = 0; + firstvalid = nplans; + i = -1; + while ((i = bms_next_member(validsubplans, i)) >= 0) + { + Plan *initNode = (Plan *) list_nth(node->subplans, i); + + /* + * Record async subplans. When executing EvalPlanQual, we treat them + * as sync ones; don't do this when initializing an EvalPlanQual plan + * tree. + */ + if (initNode->async_capable && estate->es_epq_active == NULL) + { + asyncplans = bms_add_member(asyncplans, j); + nasyncplans++; + } + + /* + * Record the lowest appendplans index which is a valid partial plan. 
+		 */
+		if (first_valid_partial_plan && i >= first_partial_plan && j < firstvalid)
+			firstvalid = j;
+
+		appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
+	}
+
+	if (first_valid_partial_plan)
+		*first_valid_partial_plan = firstvalid;
+
+	state->plans = appendplanstates;
+	state->nplans = nplans;
+
+	/*
+	 * Initialize the result tuple type and slot.  If the child plans all
+	 * produce the same fixed slot type, we can use that slot type; otherwise
+	 * make a virtual slot.  (Note that the result slot itself is used only to
+	 * return a null tuple at end of execution; real tuples are returned to
+	 * the caller in the children's own result slots.  What we are doing here
+	 * is allowing the parent plan node to optimize if the Append or
+	 * MergeAppend will return only one kind of slot.)
+	 */
+	appendops = ExecGetCommonSlotOps(appendplanstates, j);
+	if (appendops != NULL)
+	{
+		ExecInitResultTupleSlotTL(&state->ps, appendops);
+	}
+	else
+	{
+		ExecInitResultTupleSlotTL(&state->ps, &TTSOpsVirtual);
+		/* show that the output slot type is not fixed */
+		state->ps.resultopsset = true;
+		state->ps.resultopsfixed = false;
+	}
+
+	/* Initialize async state */
+	state->asyncplans = asyncplans;
+	state->nasyncplans = nasyncplans;
+	state->asyncrequests = NULL;
+	state->asyncresults = NULL;
+	state->needrequest = NULL;
+	state->eventset = NULL;
+	state->valid_asyncplans = NULL;
+
+	if (nasyncplans > 0)
+	{
+		state->asyncrequests = (AsyncRequest **)
+			palloc0(nplans * sizeof(AsyncRequest *));
+
+		i = -1;
+		while ((i = bms_next_member(asyncplans, i)) >= 0)
+		{
+			AsyncRequest *areq;
+
+			areq = palloc_object(AsyncRequest);
+			areq->requestor = (PlanState *) state;
+			areq->requestee = appendplanstates[i];
+			areq->request_index = i;
+			areq->callback_pending = false;
+			areq->request_complete = false;
+			areq->result = NULL;
+
+			state->asyncrequests[i] = areq;
+		}
+
+		/*
+		 * Historically, Append sized asyncresults by the number of async
+		 * subplans while MergeAppend sized it by the total number of
+		 * subplans.  Allocate nplans entries here, which is sufficient for
+		 * both callers.
+		 */
+		state->asyncresults = (TupleTableSlot **)
+			palloc0(nplans * sizeof(TupleTableSlot *));
+	}
+
+	/*
+	 * Miscellaneous initialization
+	 */
+	state->ps.ps_ProjInfo = NULL;
+}
+
+void
+ExecReScanAppender(AppenderState *node)
+{
+	int			i;
+	int			nasyncplans = node->nasyncplans;
+
+	/*
+	 * If any PARAM_EXEC Params used in pruning expressions have changed, then
+	 * we'd better unset the valid subplans so that they are reselected for
+	 * the new parameter values.
+	 */
+	if (node->prune_state &&
+		bms_overlap(node->ps.chgParam,
+					node->prune_state->execparamids))
+	{
+		node->valid_subplans_identified = false;
+		bms_free(node->valid_subplans);
+		node->valid_subplans = NULL;
+		bms_free(node->valid_asyncplans);
+		node->valid_asyncplans = NULL;
+	}
+
+	for (i = 0; i < node->nplans; i++)
+	{
+		PlanState  *subnode = node->plans[i];
+
+		/*
+		 * ExecReScan doesn't know about my subplans, so I have to do
+		 * changed-parameter signaling myself.
+		 */
+		if (node->ps.chgParam != NULL)
+			UpdateChangedParamSet(subnode, node->ps.chgParam);
+
+		/*
+		 * If chgParam of subnode is not null then plan will be re-scanned by
+		 * first ExecProcNode or by first ExecAsyncRequest.
+		 */
+		if (subnode->chgParam == NULL)
+			ExecReScan(subnode);
+	}
+
+	/* Reset async state */
+	if (nasyncplans > 0)
+	{
+		i = -1;
+		while ((i = bms_next_member(node->asyncplans, i)) >= 0)
+		{
+			AsyncRequest *areq = node->asyncrequests[i];
+
+			areq->callback_pending = false;
+			areq->request_complete = false;
+			areq->result = NULL;
+		}
+
+		bms_free(node->needrequest);
+		node->needrequest = NULL;
+	}
+}
+
+void
+ExecAppenderAsyncEventWait(AppenderState *node, int timeout, uint32 wait_event_info)
+{
+	int			nevents = node->nasyncplans + 2;	/* one for PM death and
+													 * one for latch */
+	int			noccurred;
+	int			i;
+	WaitEvent	occurred_event[EVENT_BUFFER_SIZE];
+
+	Assert(node->eventset == NULL);
+
+	node->eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
+	AddWaitEventToSet(node->eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+					  NULL, NULL);
+
+	/* Give each waiting subplan a chance to add an event. */
+	i = -1;
+	while ((i = bms_next_member(node->asyncplans, i)) >= 0)
+	{
+		AsyncRequest *areq = node->asyncrequests[i];
+
+		if (areq->callback_pending)
+			ExecAsyncConfigureWait(areq);
+	}
+
+	/*
+	 * No need for further processing if none of the subplans configured any
+	 * events.
+	 */
+	if (GetNumRegisteredWaitEvents(node->eventset) == 1)
+	{
+		FreeWaitEventSet(node->eventset);
+		node->eventset = NULL;
+		return;
+	}
+
+	/*
+	 * Add the process latch to the set, so that we wake up to process the
+	 * standard interrupts with CHECK_FOR_INTERRUPTS().
+	 *
+	 * NOTE: For historical reasons, it's important that this is added to the
+	 * WaitEventSet after the ExecAsyncConfigureWait() calls.  Namely,
+	 * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if
+	 * any other events are in the set.  That's a poor design, it's
+	 * questionable for postgres_fdw to be doing that in the first place, but
+	 * we cannot change it now.  The pattern has possibly been copied to other
+	 * extensions too.
+	 */
+	AddWaitEventToSet(node->eventset, WL_LATCH_SET, PGINVALID_SOCKET,
+					  MyLatch, NULL);
+
+	/* Return at most EVENT_BUFFER_SIZE events in one call. */
+	if (nevents > EVENT_BUFFER_SIZE)
+		nevents = EVENT_BUFFER_SIZE;
+
+	/*
+	 * Wait for events.  The timeout and the wait event to report are chosen
+	 * by the caller: Append polls (timeout 0) while it still has sync
+	 * subplans to run and blocks (timeout -1) otherwise; MergeAppend always
+	 * blocks.
+	 */
+	noccurred = WaitEventSetWait(node->eventset, timeout, occurred_event,
+								 nevents, wait_event_info);
+	FreeWaitEventSet(node->eventset);
+	node->eventset = NULL;
+	if (noccurred == 0)
+		return;
+
+	/* Deliver notifications. */
+	for (i = 0; i < noccurred; i++)
+	{
+		WaitEvent  *w = &occurred_event[i];
+
+		/*
+		 * Each waiting subplan should have registered its wait event with
+		 * user_data pointing back to its AsyncRequest.
+		 */
+		if ((w->events & WL_SOCKET_READABLE) != 0)
+		{
+			AsyncRequest *areq = (AsyncRequest *) w->user_data;
+
+			if (areq->callback_pending)
+			{
+				/*
+				 * Mark it as no longer needing a callback.  We must do this
+				 * before dispatching the callback in case the callback resets
+				 * the flag.
+				 */
+				areq->callback_pending = false;
+
+				/* Do the actual work. */
+				ExecAsyncNotify(areq);
+			}
+		}
+
+		/* Handle standard interrupts */
+		if ((w->events & WL_LATCH_SET) != 0)
+		{
+			ResetLatch(MyLatch);
+			CHECK_FOR_INTERRUPTS();
+		}
+	}
+}
+
+void
+ExecAppenderAsyncBegin(AppenderState *node)
+{
+	int			i;
+
+	/* Backward scan is not supported by async-aware Appends. */
+	Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+
+	/* We should never be called when there are no subplans */
+	Assert(node->nplans > 0);
+
+	/* We should never be called when there are no async subplans.
*/ + Assert(node->nasyncplans > 0); + + /* Make a request for each of the valid async subplans. */ + i = -1; + while ((i = bms_next_member(node->valid_asyncplans, i)) >= 0) + { + AsyncRequest *areq = node->asyncrequests[i]; + + Assert(areq->request_index == i); + Assert(!areq->callback_pending); + + /* Do the actual work. */ + ExecAsyncRequest(areq); + } +} + +void +ExecEndAppender(AppenderState *node) +{ + PlanState **mergeplans; + int nplans; + int i; + + /* + * get information from the node + */ + mergeplans = node->plans; + nplans = node->nplans; + + /* + * shut down each of the subscans + */ + for (i = 0; i < nplans; i++) + ExecEndNode(mergeplans[i]); +} diff --git a/src/backend/executor/execCurrent.c b/src/backend/executor/execCurrent.c index 3bfdc0230ff..e8cf2ead8a8 100644 --- a/src/backend/executor/execCurrent.c +++ b/src/backend/executor/execCurrent.c @@ -375,9 +375,9 @@ search_plan_tree(PlanState *node, Oid table_oid, AppendState *astate = (AppendState *) node; int i; - for (i = 0; i < astate->as_nplans; i++) + for (i = 0; i < astate->as.nplans; i++) { - ScanState *elem = search_plan_tree(astate->appendplans[i], + ScanState *elem = search_plan_tree(astate->as.plans[i], table_oid, pending_rescan); diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index f5f9cfbeead..3eb1de1cd30 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -910,8 +910,8 @@ ExecSetTupleBound(int64 tuples_needed, PlanState *child_node) AppendState *aState = (AppendState *) child_node; int i; - for (i = 0; i < aState->as_nplans; i++) - ExecSetTupleBound(tuples_needed, aState->appendplans[i]); + for (i = 0; i < aState->as.nplans; i++) + ExecSetTupleBound(tuples_needed, aState->as.plans[i]); } else if (IsA(child_node, MergeAppendState)) { @@ -923,8 +923,8 @@ ExecSetTupleBound(int64 tuples_needed, PlanState *child_node) MergeAppendState *maState = (MergeAppendState *) child_node; int i; - for (i = 0; i < maState->ms_nplans; i++) - ExecSetTupleBound(tuples_needed, maState->mergeplans[i]); + for (i = 0; i < maState->ms.nplans; i++) + ExecSetTupleBound(tuples_needed, maState->ms.plans[i]); } else if (IsA(child_node, ResultState)) { diff --git a/src/backend/executor/meson.build b/src/backend/executor/meson.build index 2cea41f8771..b5cb710a59f 100644 --- a/src/backend/executor/meson.build +++ b/src/backend/executor/meson.build @@ -3,6 +3,7 @@ backend_sources += files( 'execAmi.c', 'execAsync.c', + 'execAppend.c', 'execCurrent.c', 'execExpr.c', 'execExprInterp.c', diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index dfbc7b510c4..873e4cb559e 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -57,13 +57,13 @@ #include "postgres.h" +#include "executor/execAppend.h" #include "executor/execAsync.h" #include "executor/execPartition.h" #include "executor/executor.h" #include "executor/nodeAppend.h" #include "miscadmin.h" #include "pgstat.h" -#include "storage/latch.h" /* Shared state for parallel-aware Append. 
*/ struct ParallelAppendState @@ -109,15 +109,6 @@ AppendState * ExecInitAppend(Append *node, EState *estate, int eflags) { AppendState *appendstate = makeNode(AppendState); - PlanState **appendplanstates; - const TupleTableSlotOps *appendops; - Bitmapset *validsubplans; - Bitmapset *asyncplans; - int nplans; - int nasyncplans; - int firstvalid; - int i, - j; /* check for unsupported flags */ Assert(!(eflags & EXEC_FLAG_MARK)); @@ -125,167 +116,27 @@ ExecInitAppend(Append *node, EState *estate, int eflags) /* * create new AppendState for our append node */ - appendstate->ps.plan = (Plan *) node; - appendstate->ps.state = estate; - appendstate->ps.ExecProcNode = ExecAppend; + appendstate->as.ps.plan = (Plan *) node; + appendstate->as.ps.state = estate; + appendstate->as.ps.ExecProcNode = ExecAppend; /* Let choose_next_subplan_* function handle setting the first subplan */ appendstate->as_whichplan = INVALID_SUBPLAN_INDEX; appendstate->as_syncdone = false; appendstate->as_begun = false; - /* If run-time partition pruning is enabled, then set that up now */ - if (node->part_prune_index >= 0) - { - PartitionPruneState *prunestate; - - /* - * Set up pruning data structure. This also initializes the set of - * subplans to initialize (validsubplans) by taking into account the - * result of performing initial pruning if any. - */ - prunestate = ExecInitPartitionExecPruning(&appendstate->ps, - list_length(node->appendplans), - node->part_prune_index, - node->apprelids, - &validsubplans); - appendstate->as_prune_state = prunestate; - nplans = bms_num_members(validsubplans); - - /* - * When no run-time pruning is required and there's at least one - * subplan, we can fill as_valid_subplans immediately, preventing - * later calls to ExecFindMatchingSubPlans. - */ - if (!prunestate->do_exec_prune && nplans > 0) - { - appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1); - appendstate->as_valid_subplans_identified = true; - } - } - else - { - nplans = list_length(node->appendplans); - - /* - * When run-time partition pruning is not enabled we can just mark all - * subplans as valid; they must also all be initialized. - */ - Assert(nplans > 0); - appendstate->as_valid_subplans = validsubplans = - bms_add_range(NULL, 0, nplans - 1); - appendstate->as_valid_subplans_identified = true; - appendstate->as_prune_state = NULL; - } - - appendplanstates = (PlanState **) palloc(nplans * - sizeof(PlanState *)); - - /* - * call ExecInitNode on each of the valid plans to be executed and save - * the results into the appendplanstates array. - * - * While at it, find out the first valid partial plan. - */ - j = 0; - asyncplans = NULL; - nasyncplans = 0; - firstvalid = nplans; - i = -1; - while ((i = bms_next_member(validsubplans, i)) >= 0) - { - Plan *initNode = (Plan *) list_nth(node->appendplans, i); - - /* - * Record async subplans. When executing EvalPlanQual, we treat them - * as sync ones; don't do this when initializing an EvalPlanQual plan - * tree. - */ - if (initNode->async_capable && estate->es_epq_active == NULL) - { - asyncplans = bms_add_member(asyncplans, j); - nasyncplans++; - } - - /* - * Record the lowest appendplans index which is a valid partial plan. 
- */ - if (i >= node->first_partial_plan && j < firstvalid) - firstvalid = j; - - appendplanstates[j++] = ExecInitNode(initNode, estate, eflags); - } - - appendstate->as_first_partial_plan = firstvalid; - appendstate->appendplans = appendplanstates; - appendstate->as_nplans = nplans; + /* Initialize common fields */ + ExecInitAppender(&appendstate->as, + &node->ap, + estate, + eflags, + node->first_partial_plan, + &appendstate->as_first_partial_plan); - /* - * Initialize Append's result tuple type and slot. If the child plans all - * produce the same fixed slot type, we can use that slot type; otherwise - * make a virtual slot. (Note that the result slot itself is used only to - * return a null tuple at end of execution; real tuples are returned to - * the caller in the children's own result slots. What we are doing here - * is allowing the parent plan node to optimize if the Append will return - * only one kind of slot.) - */ - appendops = ExecGetCommonSlotOps(appendplanstates, j); - if (appendops != NULL) - { - ExecInitResultTupleSlotTL(&appendstate->ps, appendops); - } - else - { - ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual); - /* show that the output slot type is not fixed */ - appendstate->ps.resultopsset = true; - appendstate->ps.resultopsfixed = false; - } + if (appendstate->as.nasyncplans > 0 && appendstate->as.valid_subplans_identified) + classify_matching_subplans(appendstate); - /* Initialize async state */ - appendstate->as_asyncplans = asyncplans; - appendstate->as_nasyncplans = nasyncplans; - appendstate->as_asyncrequests = NULL; - appendstate->as_asyncresults = NULL; - appendstate->as_nasyncresults = 0; appendstate->as_nasyncremain = 0; - appendstate->as_needrequest = NULL; - appendstate->as_eventset = NULL; - appendstate->as_valid_asyncplans = NULL; - - if (nasyncplans > 0) - { - appendstate->as_asyncrequests = (AsyncRequest **) - palloc0(nplans * sizeof(AsyncRequest *)); - - i = -1; - while ((i = bms_next_member(asyncplans, i)) >= 0) - { - AsyncRequest *areq; - - areq = palloc_object(AsyncRequest); - areq->requestor = (PlanState *) appendstate; - areq->requestee = appendplanstates[i]; - areq->request_index = i; - areq->callback_pending = false; - areq->request_complete = false; - areq->result = NULL; - - appendstate->as_asyncrequests[i] = areq; - } - - appendstate->as_asyncresults = (TupleTableSlot **) - palloc0(nasyncplans * sizeof(TupleTableSlot *)); - - if (appendstate->as_valid_subplans_identified) - classify_matching_subplans(appendstate); - } - - /* - * Miscellaneous initialization - */ - - appendstate->ps.ps_ProjInfo = NULL; /* For parallel query, this will be overridden later. */ appendstate->choose_next_subplan = choose_next_subplan_locally; @@ -315,11 +166,11 @@ ExecAppend(PlanState *pstate) Assert(!node->as_syncdone); /* Nothing to do if there are no subplans */ - if (node->as_nplans == 0) - return ExecClearTuple(node->ps.ps_ResultTupleSlot); + if (node->as.nplans == 0) + return ExecClearTuple(node->as.ps.ps_ResultTupleSlot); /* If there are any async subplans, begin executing them. */ - if (node->as_nasyncplans > 0) + if (node->as.nasyncplans > 0) ExecAppendAsyncBegin(node); /* @@ -327,11 +178,11 @@ ExecAppend(PlanState *pstate) * proceeding. 
*/ if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0) - return ExecClearTuple(node->ps.ps_ResultTupleSlot); + return ExecClearTuple(node->as.ps.ps_ResultTupleSlot); Assert(node->as_syncdone || (node->as_whichplan >= 0 && - node->as_whichplan < node->as_nplans)); + node->as_whichplan < node->as.nplans)); /* And we're initialized. */ node->as_begun = true; @@ -346,19 +197,19 @@ ExecAppend(PlanState *pstate) /* * try to get a tuple from an async subplan if any */ - if (node->as_syncdone || !bms_is_empty(node->as_needrequest)) + if (node->as_syncdone || !bms_is_empty(node->as.needrequest)) { if (ExecAppendAsyncGetNext(node, &result)) return result; Assert(!node->as_syncdone); - Assert(bms_is_empty(node->as_needrequest)); + Assert(bms_is_empty(node->as.needrequest)); } /* * figure out which sync subplan we are currently processing */ - Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans); - subnode = node->appendplans[node->as_whichplan]; + Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as.nplans); + subnode = node->as.plans[node->as_whichplan]; /* * get a tuple from the subplan @@ -385,7 +236,7 @@ ExecAppend(PlanState *pstate) /* choose new sync subplan; if no sync/async subplans, we're done */ if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0) - return ExecClearTuple(node->ps.ps_ResultTupleSlot); + return ExecClearTuple(node->as.ps.ps_ResultTupleSlot); } } @@ -400,81 +251,23 @@ ExecAppend(PlanState *pstate) void ExecEndAppend(AppendState *node) { - PlanState **appendplans; - int nplans; - int i; - - /* - * get information from the node - */ - appendplans = node->appendplans; - nplans = node->as_nplans; - - /* - * shut down each of the subscans - */ - for (i = 0; i < nplans; i++) - ExecEndNode(appendplans[i]); + ExecEndAppender(&node->as); } void ExecReScanAppend(AppendState *node) { - int nasyncplans = node->as_nasyncplans; - int i; - - /* - * If any PARAM_EXEC Params used in pruning expressions have changed, then - * we'd better unset the valid subplans so that they are reselected for - * the new parameter values. - */ - if (node->as_prune_state && - bms_overlap(node->ps.chgParam, - node->as_prune_state->execparamids)) - { - node->as_valid_subplans_identified = false; - bms_free(node->as_valid_subplans); - node->as_valid_subplans = NULL; - bms_free(node->as_valid_asyncplans); - node->as_valid_asyncplans = NULL; - } - - for (i = 0; i < node->as_nplans; i++) - { - PlanState *subnode = node->appendplans[i]; - /* - * ExecReScan doesn't know about my subplans, so I have to do - * changed-parameter signaling myself. - */ - if (node->ps.chgParam != NULL) - UpdateChangedParamSet(subnode, node->ps.chgParam); + int nasyncplans = node->as.nasyncplans; - /* - * If chgParam of subnode is not null then plan will be re-scanned by - * first ExecProcNode or by first ExecAsyncRequest. 
- */ - if (subnode->chgParam == NULL) - ExecReScan(subnode); - } + ExecReScanAppender(&node->as); - /* Reset async state */ + /* Reset specific append async state */ if (nasyncplans > 0) { - i = -1; - while ((i = bms_next_member(node->as_asyncplans, i)) >= 0) - { - AsyncRequest *areq = node->as_asyncrequests[i]; - - areq->callback_pending = false; - areq->request_complete = false; - areq->result = NULL; - } - node->as_nasyncresults = 0; node->as_nasyncremain = 0; - bms_free(node->as_needrequest); - node->as_needrequest = NULL; + bms_free(node->as.needrequest); } /* Let choose_next_subplan_* function handle setting the first subplan */ @@ -501,7 +294,7 @@ ExecAppendEstimate(AppendState *node, { node->pstate_len = add_size(offsetof(ParallelAppendState, pa_finished), - sizeof(bool) * node->as_nplans); + sizeof(bool) * node->as.nplans); shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len); shm_toc_estimate_keys(&pcxt->estimator, 1); @@ -523,7 +316,7 @@ ExecAppendInitializeDSM(AppendState *node, pstate = shm_toc_allocate(pcxt->toc, node->pstate_len); memset(pstate, 0, node->pstate_len); LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND); - shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate); + shm_toc_insert(pcxt->toc, node->as.ps.plan->plan_node_id, pstate); node->as_pstate = pstate; node->choose_next_subplan = choose_next_subplan_for_leader; @@ -541,7 +334,7 @@ ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt) ParallelAppendState *pstate = node->as_pstate; pstate->pa_next_plan = 0; - memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans); + memset(pstate->pa_finished, 0, sizeof(bool) * node->as.nplans); } /* ---------------------------------------------------------------- @@ -554,7 +347,7 @@ ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt) void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt) { - node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false); + node->as_pstate = shm_toc_lookup(pwcxt->toc, node->as.ps.plan->plan_node_id, false); node->choose_next_subplan = choose_next_subplan_for_worker; } @@ -572,7 +365,7 @@ choose_next_subplan_locally(AppendState *node) int nextplan; /* We should never be called when there are no subplans */ - Assert(node->as_nplans > 0); + Assert(node->as.nplans > 0); /* Nothing to do if syncdone */ if (node->as_syncdone) @@ -587,33 +380,33 @@ choose_next_subplan_locally(AppendState *node) */ if (whichplan == INVALID_SUBPLAN_INDEX) { - if (node->as_nasyncplans > 0) + if (node->as.nasyncplans > 0) { /* We'd have filled as_valid_subplans already */ - Assert(node->as_valid_subplans_identified); + Assert(node->as.valid_subplans_identified); } - else if (!node->as_valid_subplans_identified) + else if (!node->as.valid_subplans_identified) { - node->as_valid_subplans = - ExecFindMatchingSubPlans(node->as_prune_state, false, NULL); - node->as_valid_subplans_identified = true; + node->as.valid_subplans = + ExecFindMatchingSubPlans(node->as.prune_state, false, NULL); + node->as.valid_subplans_identified = true; } whichplan = -1; } /* Ensure whichplan is within the expected range */ - Assert(whichplan >= -1 && whichplan <= node->as_nplans); + Assert(whichplan >= -1 && whichplan <= node->as.nplans); - if (ScanDirectionIsForward(node->ps.state->es_direction)) - nextplan = bms_next_member(node->as_valid_subplans, whichplan); + if (ScanDirectionIsForward(node->as.ps.state->es_direction)) + nextplan = bms_next_member(node->as.valid_subplans, whichplan); 
else - nextplan = bms_prev_member(node->as_valid_subplans, whichplan); + nextplan = bms_prev_member(node->as.valid_subplans, whichplan); if (nextplan < 0) { /* Set as_syncdone if in async mode */ - if (node->as_nasyncplans > 0) + if (node->as.nasyncplans > 0) node->as_syncdone = true; return false; } @@ -637,10 +430,10 @@ choose_next_subplan_for_leader(AppendState *node) ParallelAppendState *pstate = node->as_pstate; /* Backward scan is not supported by parallel-aware plans */ - Assert(ScanDirectionIsForward(node->ps.state->es_direction)); + Assert(ScanDirectionIsForward(node->as.ps.state->es_direction)); /* We should never be called when there are no subplans */ - Assert(node->as_nplans > 0); + Assert(node->as.nplans > 0); LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE); @@ -652,18 +445,18 @@ choose_next_subplan_for_leader(AppendState *node) else { /* Start with last subplan. */ - node->as_whichplan = node->as_nplans - 1; + node->as_whichplan = node->as.nplans - 1; /* * If we've yet to determine the valid subplans then do so now. If * run-time pruning is disabled then the valid subplans will always be * set to all subplans. */ - if (!node->as_valid_subplans_identified) + if (!node->as.valid_subplans_identified) { - node->as_valid_subplans = - ExecFindMatchingSubPlans(node->as_prune_state, false, NULL); - node->as_valid_subplans_identified = true; + node->as.valid_subplans = + ExecFindMatchingSubPlans(node->as.prune_state, false, NULL); + node->as.valid_subplans_identified = true; /* * Mark each invalid plan as finished to allow the loop below to @@ -719,10 +512,10 @@ choose_next_subplan_for_worker(AppendState *node) ParallelAppendState *pstate = node->as_pstate; /* Backward scan is not supported by parallel-aware plans */ - Assert(ScanDirectionIsForward(node->ps.state->es_direction)); + Assert(ScanDirectionIsForward(node->as.ps.state->es_direction)); /* We should never be called when there are no subplans */ - Assert(node->as_nplans > 0); + Assert(node->as.nplans > 0); LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE); @@ -735,11 +528,11 @@ choose_next_subplan_for_worker(AppendState *node) * run-time pruning is disabled then the valid subplans will always be set * to all subplans. */ - else if (!node->as_valid_subplans_identified) + else if (!node->as.valid_subplans_identified) { - node->as_valid_subplans = - ExecFindMatchingSubPlans(node->as_prune_state, false, NULL); - node->as_valid_subplans_identified = true; + node->as.valid_subplans = + ExecFindMatchingSubPlans(node->as.prune_state, false, NULL); + node->as.valid_subplans_identified = true; mark_invalid_subplans_as_finished(node); } @@ -759,7 +552,7 @@ choose_next_subplan_for_worker(AppendState *node) { int nextplan; - nextplan = bms_next_member(node->as_valid_subplans, + nextplan = bms_next_member(node->as.valid_subplans, pstate->pa_next_plan); if (nextplan >= 0) { @@ -772,7 +565,7 @@ choose_next_subplan_for_worker(AppendState *node) * Try looping back to the first valid partial plan, if there is * one. If there isn't, arrange to bail out below. */ - nextplan = bms_next_member(node->as_valid_subplans, + nextplan = bms_next_member(node->as.valid_subplans, node->as_first_partial_plan - 1); pstate->pa_next_plan = nextplan < 0 ? node->as_whichplan : nextplan; @@ -797,7 +590,7 @@ choose_next_subplan_for_worker(AppendState *node) /* Pick the plan we found, and advance pa_next_plan one more time. 
*/ node->as_whichplan = pstate->pa_next_plan; - pstate->pa_next_plan = bms_next_member(node->as_valid_subplans, + pstate->pa_next_plan = bms_next_member(node->as.valid_subplans, pstate->pa_next_plan); /* @@ -806,7 +599,7 @@ choose_next_subplan_for_worker(AppendState *node) */ if (pstate->pa_next_plan < 0) { - int nextplan = bms_next_member(node->as_valid_subplans, + int nextplan = bms_next_member(node->as.valid_subplans, node->as_first_partial_plan - 1); if (nextplan >= 0) @@ -848,16 +641,16 @@ mark_invalid_subplans_as_finished(AppendState *node) Assert(node->as_pstate); /* Shouldn't have been called when run-time pruning is not enabled */ - Assert(node->as_prune_state); + Assert(node->as.prune_state); /* Nothing to do if all plans are valid */ - if (bms_num_members(node->as_valid_subplans) == node->as_nplans) + if (bms_num_members(node->as.valid_subplans) == node->as.nplans) return; /* Mark all non-valid plans as finished */ - for (i = 0; i < node->as_nplans; i++) + for (i = 0; i < node->as.nplans; i++) { - if (!bms_is_member(i, node->as_valid_subplans)) + if (!bms_is_member(i, node->as.valid_subplans)) node->as_pstate->pa_finished[i] = true; } } @@ -876,47 +669,25 @@ mark_invalid_subplans_as_finished(AppendState *node) static void ExecAppendAsyncBegin(AppendState *node) { - int i; - - /* Backward scan is not supported by async-aware Appends. */ - Assert(ScanDirectionIsForward(node->ps.state->es_direction)); - - /* We should never be called when there are no subplans */ - Assert(node->as_nplans > 0); - - /* We should never be called when there are no async subplans. */ - Assert(node->as_nasyncplans > 0); - /* If we've yet to determine the valid subplans then do so now. */ - if (!node->as_valid_subplans_identified) + if (!node->as.valid_subplans_identified) { - node->as_valid_subplans = - ExecFindMatchingSubPlans(node->as_prune_state, false, NULL); - node->as_valid_subplans_identified = true; + node->as.valid_subplans = + ExecFindMatchingSubPlans(node->as.prune_state, false, NULL); + node->as.valid_subplans_identified = true; classify_matching_subplans(node); } /* Initialize state variables. */ - node->as_syncdone = bms_is_empty(node->as_valid_subplans); - node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans); + node->as_syncdone = bms_is_empty(node->as.valid_subplans); + node->as_nasyncremain = bms_num_members(node->as.valid_asyncplans); /* Nothing to do if there are no valid async subplans. */ if (node->as_nasyncremain == 0) return; - /* Make a request for each of the valid async subplans. */ - i = -1; - while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0) - { - AsyncRequest *areq = node->as_asyncrequests[i]; - - Assert(areq->request_index == i); - Assert(!areq->callback_pending); - - /* Do the actual work. */ - ExecAsyncRequest(areq); - } + ExecAppenderAsyncBegin(&node->as); } /* ---------------------------------------------------------------- @@ -961,7 +732,7 @@ ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result) if (node->as_syncdone) { Assert(node->as_nasyncremain == 0); - *result = ExecClearTuple(node->ps.ps_ResultTupleSlot); + *result = ExecClearTuple(node->as.ps.ps_ResultTupleSlot); return true; } @@ -981,7 +752,7 @@ ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result) int i; /* Nothing to do if there are no async subplans needing a new request. 
*/ - if (bms_is_empty(node->as_needrequest)) + if (bms_is_empty(node->as.needrequest)) { Assert(node->as_nasyncresults == 0); return false; @@ -994,17 +765,17 @@ ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result) if (node->as_nasyncresults > 0) { --node->as_nasyncresults; - *result = node->as_asyncresults[node->as_nasyncresults]; + *result = node->as.asyncresults[node->as_nasyncresults]; return true; } /* Make a new request for each of the async subplans that need it. */ - needrequest = node->as_needrequest; - node->as_needrequest = NULL; + needrequest = node->as.needrequest; + node->as.needrequest = NULL; i = -1; while ((i = bms_next_member(needrequest, i)) >= 0) { - AsyncRequest *areq = node->as_asyncrequests[i]; + AsyncRequest *areq = node->as.asyncrequests[i]; /* Do the actual work. */ ExecAsyncRequest(areq); @@ -1015,7 +786,7 @@ ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result) if (node->as_nasyncresults > 0) { --node->as_nasyncresults; - *result = node->as_asyncresults[node->as_nasyncresults]; + *result = node->as.asyncresults[node->as_nasyncresults]; return true; } @@ -1031,105 +802,12 @@ ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result) static void ExecAppendAsyncEventWait(AppendState *node) { - int nevents = node->as_nasyncplans + 2; long timeout = node->as_syncdone ? -1 : 0; - WaitEvent occurred_event[EVENT_BUFFER_SIZE]; - int noccurred; - int i; /* We should never be called when there are no valid async subplans. */ Assert(node->as_nasyncremain > 0); - Assert(node->as_eventset == NULL); - node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents); - AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, - NULL, NULL); - - /* Give each waiting subplan a chance to add an event. */ - i = -1; - while ((i = bms_next_member(node->as_asyncplans, i)) >= 0) - { - AsyncRequest *areq = node->as_asyncrequests[i]; - - if (areq->callback_pending) - ExecAsyncConfigureWait(areq); - } - - /* - * No need for further processing if none of the subplans configured any - * events. - */ - if (GetNumRegisteredWaitEvents(node->as_eventset) == 1) - { - FreeWaitEventSet(node->as_eventset); - node->as_eventset = NULL; - return; - } - - /* - * Add the process latch to the set, so that we wake up to process the - * standard interrupts with CHECK_FOR_INTERRUPTS(). - * - * NOTE: For historical reasons, it's important that this is added to the - * WaitEventSet after the ExecAsyncConfigureWait() calls. Namely, - * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if - * any other events are in the set. That's a poor design, it's - * questionable for postgres_fdw to be doing that in the first place, but - * we cannot change it now. The pattern has possibly been copied to other - * extensions too. - */ - AddWaitEventToSet(node->as_eventset, WL_LATCH_SET, PGINVALID_SOCKET, - MyLatch, NULL); - - /* Return at most EVENT_BUFFER_SIZE events in one call. */ - if (nevents > EVENT_BUFFER_SIZE) - nevents = EVENT_BUFFER_SIZE; - - /* - * If the timeout is -1, wait until at least one event occurs. If the - * timeout is 0, poll for events, but do not wait at all. - */ - noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event, - nevents, WAIT_EVENT_APPEND_READY); - FreeWaitEventSet(node->as_eventset); - node->as_eventset = NULL; - if (noccurred == 0) - return; - - /* Deliver notifications. 
*/ - for (i = 0; i < noccurred; i++) - { - WaitEvent *w = &occurred_event[i]; - - /* - * Each waiting subplan should have registered its wait event with - * user_data pointing back to its AsyncRequest. - */ - if ((w->events & WL_SOCKET_READABLE) != 0) - { - AsyncRequest *areq = (AsyncRequest *) w->user_data; - - if (areq->callback_pending) - { - /* - * Mark it as no longer needing a callback. We must do this - * before dispatching the callback in case the callback resets - * the flag. - */ - areq->callback_pending = false; - - /* Do the actual work. */ - ExecAsyncNotify(areq); - } - } - - /* Handle standard interrupts */ - if ((w->events & WL_LATCH_SET) != 0) - { - ResetLatch(MyLatch); - CHECK_FOR_INTERRUPTS(); - } - } + ExecAppenderAsyncEventWait(&node->as, timeout, WAIT_EVENT_APPEND_READY); } /* ---------------------------------------------------------------- @@ -1165,14 +843,14 @@ ExecAsyncAppendResponse(AsyncRequest *areq) } /* Save result so we can return it. */ - Assert(node->as_nasyncresults < node->as_nasyncplans); - node->as_asyncresults[node->as_nasyncresults++] = slot; + Assert(node->as_nasyncresults < node->as.nasyncplans); + node->as.asyncresults[node->as_nasyncresults++] = slot; /* * Mark the subplan that returned a result as ready for a new request. We * don't launch another one here immediately because it might complete. */ - node->as_needrequest = bms_add_member(node->as_needrequest, + node->as.needrequest = bms_add_member(node->as.needrequest, areq->request_index); } @@ -1187,10 +865,10 @@ ExecAsyncAppendResponse(AsyncRequest *areq) static void classify_matching_subplans(AppendState *node) { - Assert(node->as_valid_subplans_identified); + Assert(node->as.valid_subplans_identified); /* Nothing to do if there are no valid subplans. */ - if (bms_is_empty(node->as_valid_subplans)) + if (bms_is_empty(node->as.valid_subplans)) { node->as_syncdone = true; node->as_nasyncremain = 0; @@ -1199,8 +877,8 @@ classify_matching_subplans(AppendState *node) /* No valid async subplans identified. */ if (!classify_matching_subplans_common( - &node->as_valid_subplans, - node->as_asyncplans, - &node->as_valid_asyncplans)) + &node->as.valid_subplans, + node->as.asyncplans, + &node->as.valid_asyncplans)) node->as_nasyncremain = 0; } diff --git a/src/backend/executor/nodeMergeAppend.c b/src/backend/executor/nodeMergeAppend.c index f1c267eb9eb..6eb8c80bc4c 100644 --- a/src/backend/executor/nodeMergeAppend.c +++ b/src/backend/executor/nodeMergeAppend.c @@ -38,6 +38,7 @@ #include "postgres.h" +#include "executor/execAppend.h" #include "executor/executor.h" #include "executor/execAsync.h" #include "executor/execPartition.h" @@ -76,14 +77,7 @@ MergeAppendState * ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags) { MergeAppendState *mergestate = makeNode(MergeAppendState); - PlanState **mergeplanstates; - const TupleTableSlotOps *mergeops; - Bitmapset *validsubplans; - int nplans; - int i, - j; - Bitmapset *asyncplans; - int nasyncplans; + int i; /* check for unsupported flags */ Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); @@ -91,154 +85,27 @@ ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags) /* * create new MergeAppendState for our node */ - mergestate->ps.plan = (Plan *) node; - mergestate->ps.state = estate; - mergestate->ps.ExecProcNode = ExecMergeAppend; - - /* If run-time partition pruning is enabled, then set that up now */ - if (node->part_prune_index >= 0) - { - PartitionPruneState *prunestate; - - /* - * Set up pruning data structure. 
This also initializes the set of - * subplans to initialize (validsubplans) by taking into account the - * result of performing initial pruning if any. - */ - prunestate = ExecInitPartitionExecPruning(&mergestate->ps, - list_length(node->mergeplans), - node->part_prune_index, - node->apprelids, - &validsubplans); - mergestate->ms_prune_state = prunestate; - nplans = bms_num_members(validsubplans); - - /* - * When no run-time pruning is required and there's at least one - * subplan, we can fill ms_valid_subplans immediately, preventing - * later calls to ExecFindMatchingSubPlans. - */ - if (!prunestate->do_exec_prune && nplans > 0) - { - mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1); - mergestate->ms_valid_subplans_identified = true; - } - } - else - { - nplans = list_length(node->mergeplans); - - /* - * When run-time partition pruning is not enabled we can just mark all - * subplans as valid; they must also all be initialized. - */ - Assert(nplans > 0); - mergestate->ms_valid_subplans = validsubplans = - bms_add_range(NULL, 0, nplans - 1); - mergestate->ms_valid_subplans_identified = true; - mergestate->ms_prune_state = NULL; - } - - mergeplanstates = palloc_array(PlanState *, nplans); - mergestate->mergeplans = mergeplanstates; - mergestate->ms_nplans = nplans; - - mergestate->ms_slots = palloc0_array(TupleTableSlot *, nplans); - mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots, + mergestate->ms.ps.plan = (Plan *) node; + mergestate->ms.ps.state = estate; + mergestate->ms.ps.ExecProcNode = ExecMergeAppend; + + /* Initialize common fields */ + ExecInitAppender(&mergestate->ms, + &node->ap, + estate, + eflags, + -1, + NULL); + + if (mergestate->ms.nasyncplans > 0 && mergestate->ms.valid_subplans_identified) + classify_matching_subplans(mergestate); + + mergestate->ms_slots = palloc0_array(TupleTableSlot *, mergestate->ms.nplans); + mergestate->ms_heap = binaryheap_allocate(mergestate->ms.nplans, heap_compare_slots, mergestate); - /* - * call ExecInitNode on each of the valid plans to be executed and save - * the results into the mergeplanstates array. - */ - j = 0; - asyncplans = NULL; - nasyncplans = 0; - - i = -1; - while ((i = bms_next_member(validsubplans, i)) >= 0) - { - Plan *initNode = (Plan *) list_nth(node->mergeplans, i); - - /* - * Record async subplans. When executing EvalPlanQual, we treat them - * as sync ones; don't do this when initializing an EvalPlanQual plan - * tree. - */ - if (initNode->async_capable && estate->es_epq_active == NULL) - { - asyncplans = bms_add_member(asyncplans, j); - nasyncplans++; - } - - mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags); - } - - /* - * Initialize MergeAppend's result tuple type and slot. If the child - * plans all produce the same fixed slot type, we can use that slot type; - * otherwise make a virtual slot. (Note that the result slot itself is - * used only to return a null tuple at end of execution; real tuples are - * returned to the caller in the children's own result slots. What we are - * doing here is allowing the parent plan node to optimize if the - * MergeAppend will return only one kind of slot.) 
- */ - mergeops = ExecGetCommonSlotOps(mergeplanstates, j); - if (mergeops != NULL) - { - ExecInitResultTupleSlotTL(&mergestate->ps, mergeops); - } - else - { - ExecInitResultTupleSlotTL(&mergestate->ps, &TTSOpsVirtual); - /* show that the output slot type is not fixed */ - mergestate->ps.resultopsset = true; - mergestate->ps.resultopsfixed = false; - } - - /* - * Miscellaneous initialization - */ - mergestate->ps.ps_ProjInfo = NULL; - - /* Initialize async state */ - mergestate->ms_asyncplans = asyncplans; - mergestate->ms_nasyncplans = nasyncplans; - mergestate->ms_asyncrequests = NULL; - mergestate->ms_asyncresults = NULL; mergestate->ms_has_asyncresults = NULL; mergestate->ms_asyncremain = NULL; - mergestate->ms_needrequest = NULL; - mergestate->ms_eventset = NULL; - mergestate->ms_valid_asyncplans = NULL; - - if (nasyncplans > 0) - { - mergestate->ms_asyncrequests = (AsyncRequest **) - palloc0(nplans * sizeof(AsyncRequest *)); - - i = -1; - while ((i = bms_next_member(asyncplans, i)) >= 0) - { - AsyncRequest *areq; - - areq = palloc(sizeof(AsyncRequest)); - areq->requestor = (PlanState *) mergestate; - areq->requestee = mergeplanstates[i]; - areq->request_index = i; - areq->callback_pending = false; - areq->request_complete = false; - areq->result = NULL; - - mergestate->ms_asyncrequests[i] = areq; - } - - mergestate->ms_asyncresults = (TupleTableSlot **) - palloc0(nplans * sizeof(TupleTableSlot *)); - - if (mergestate->ms_valid_subplans_identified) - classify_matching_subplans(mergestate); - } /* * initialize sort-key information @@ -293,20 +160,20 @@ ExecMergeAppend(PlanState *pstate) if (!node->ms_initialized) { /* Nothing to do if all subplans were pruned */ - if (node->ms_nplans == 0) - return ExecClearTuple(node->ps.ps_ResultTupleSlot); + if (node->ms.nplans == 0) + return ExecClearTuple(node->ms.ps.ps_ResultTupleSlot); /* If we've yet to determine the valid subplans then do so now. */ - if (!node->ms_valid_subplans_identified) + if (!node->ms.valid_subplans_identified) { - node->ms_valid_subplans = - ExecFindMatchingSubPlans(node->ms_prune_state, false, NULL); - node->ms_valid_subplans_identified = true; + node->ms.valid_subplans = + ExecFindMatchingSubPlans(node->ms.prune_state, false, NULL); + node->ms.valid_subplans_identified = true; classify_matching_subplans(node); } /* If there are any async subplans, begin executing them. */ - if (node->ms_nasyncplans > 0) + if (node->ms.nasyncplans > 0) ExecMergeAppendAsyncBegin(node); /* @@ -314,16 +181,16 @@ ExecMergeAppend(PlanState *pstate) * and set up the heap. */ i = -1; - while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0) + while ((i = bms_next_member(node->ms.valid_subplans, i)) >= 0) { - node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); + node->ms_slots[i] = ExecProcNode(node->ms.plans[i]); if (!TupIsNull(node->ms_slots[i])) binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i)); } /* Look at valid async subplans */ i = -1; - while ((i = bms_next_member(node->ms_valid_asyncplans, i)) >= 0) + while ((i = bms_next_member(node->ms.valid_asyncplans, i)) >= 0) { ExecMergeAppendAsyncGetNext(node, i); if (!TupIsNull(node->ms_slots[i])) @@ -344,12 +211,12 @@ ExecMergeAppend(PlanState *pstate) * to not pull tuples until necessary.) 
*/ i = DatumGetInt32(binaryheap_first(node->ms_heap)); - if (bms_is_member(i, node->ms_asyncplans)) + if (bms_is_member(i, node->ms.asyncplans)) ExecMergeAppendAsyncGetNext(node, i); else { - Assert(bms_is_member(i, node->ms_valid_subplans)); - node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); + Assert(bms_is_member(i, node->ms.valid_subplans)); + node->ms_slots[i] = ExecProcNode(node->ms.plans[i]); } if (!TupIsNull(node->ms_slots[i])) binaryheap_replace_first(node->ms_heap, Int32GetDatum(i)); @@ -360,7 +227,7 @@ ExecMergeAppend(PlanState *pstate) if (binaryheap_empty(node->ms_heap)) { /* All the subplans are exhausted, and so is the heap */ - result = ExecClearTuple(node->ps.ps_ResultTupleSlot); + result = ExecClearTuple(node->ms.ps.ps_ResultTupleSlot); } else { @@ -426,81 +293,22 @@ heap_compare_slots(Datum a, Datum b, void *arg) void ExecEndMergeAppend(MergeAppendState *node) { - PlanState **mergeplans; - int nplans; - int i; - - /* - * get information from the node - */ - mergeplans = node->mergeplans; - nplans = node->ms_nplans; - - /* - * shut down each of the subscans - */ - for (i = 0; i < nplans; i++) - ExecEndNode(mergeplans[i]); + ExecEndAppender(&node->ms); } void ExecReScanMergeAppend(MergeAppendState *node) { - int i; - int nasyncplans = node->ms_nasyncplans; + int nasyncplans = node->ms.nasyncplans; - /* - * If any PARAM_EXEC Params used in pruning expressions have changed, then - * we'd better unset the valid subplans so that they are reselected for - * the new parameter values. - */ - if (node->ms_prune_state && - bms_overlap(node->ps.chgParam, - node->ms_prune_state->execparamids)) - { - node->ms_valid_subplans_identified = false; - bms_free(node->ms_valid_subplans); - node->ms_valid_subplans = NULL; - bms_free(node->ms_valid_asyncplans); - node->ms_valid_asyncplans = NULL; - } - - for (i = 0; i < node->ms_nplans; i++) - { - PlanState *subnode = node->mergeplans[i]; - - /* - * ExecReScan doesn't know about my subplans, so I have to do - * changed-parameter signaling myself. - */ - if (node->ps.chgParam != NULL) - UpdateChangedParamSet(subnode, node->ps.chgParam); - - /* - * If chgParam of subnode is not null then plan will be re-scanned by - * first ExecProcNode. - */ - if (subnode->chgParam == NULL) - ExecReScan(subnode); - } + ExecReScanAppender(&node->ms); - /* Reset async state */ + /* Reset specific merge append async state */ if (nasyncplans > 0) { - i = -1; - while ((i = bms_next_member(node->ms_asyncplans, i)) >= 0) - { - AsyncRequest *areq = node->ms_asyncrequests[i]; - - areq->callback_pending = false; - areq->request_complete = false; - areq->result = NULL; - } - bms_free(node->ms_asyncremain); node->ms_asyncremain = NULL; - bms_free(node->ms_needrequest); - node->ms_needrequest = NULL; + bms_free(node->ms.needrequest); bms_free(node->ms_has_asyncresults); node->ms_has_asyncresults = NULL; } @@ -519,10 +327,10 @@ ExecReScanMergeAppend(MergeAppendState *node) static void classify_matching_subplans(MergeAppendState *node) { - Assert(node->ms_valid_subplans_identified); + Assert(node->ms.valid_subplans_identified); /* Nothing to do if there are no valid subplans. */ - if (bms_is_empty(node->ms_valid_subplans)) + if (bms_is_empty(node->ms.valid_subplans)) { node->ms_asyncremain = NULL; return; @@ -530,9 +338,9 @@ classify_matching_subplans(MergeAppendState *node) /* No valid async subplans identified. 
*/ if (!classify_matching_subplans_common( - &node->ms_valid_subplans, - node->ms_asyncplans, - &node->ms_valid_asyncplans)) + &node->ms.valid_subplans, + node->ms.asyncplans, + &node->ms.valid_asyncplans)) node->ms_asyncremain = NULL; } @@ -545,39 +353,17 @@ classify_matching_subplans(MergeAppendState *node) static void ExecMergeAppendAsyncBegin(MergeAppendState *node) { - int i; - - /* Backward scan is not supported by async-aware MergeAppends. */ - Assert(ScanDirectionIsForward(node->ps.state->es_direction)); - - /* We should never be called when there are no subplans */ - Assert(node->ms_nplans > 0); - - /* We should never be called when there are no async subplans. */ - Assert(node->ms_nasyncplans > 0); - /* ExecMergeAppend() identifies valid subplans */ - Assert(node->ms_valid_subplans_identified); + Assert(node->ms.valid_subplans_identified); /* Initialize state variables. */ - node->ms_asyncremain = bms_copy(node->ms_valid_asyncplans); + node->ms_asyncremain = bms_copy(node->ms.valid_asyncplans); /* Nothing to do if there are no valid async subplans. */ if (bms_is_empty(node->ms_asyncremain)) return; - /* Make a request for each of the valid async subplans. */ - i = -1; - while ((i = bms_next_member(node->ms_valid_asyncplans, i)) >= 0) - { - AsyncRequest *areq = node->ms_asyncrequests[i]; - - Assert(areq->request_index == i); - Assert(!areq->callback_pending); - - /* Do the actual work. */ - ExecAsyncRequest(areq); - } + ExecAppenderAsyncBegin(&node->ms); } /* ---------------------------------------------------------------- @@ -638,7 +424,7 @@ ExecMergeAppendAsyncRequest(MergeAppendState *node, int mplan) */ if (bms_is_member(mplan, node->ms_has_asyncresults)) { - node->ms_slots[mplan] = node->ms_asyncresults[mplan]; + node->ms_slots[mplan] = node->ms.asyncresults[mplan]; return true; } @@ -648,7 +434,7 @@ ExecMergeAppendAsyncRequest(MergeAppendState *node, int mplan) */ needrequest = NULL; i = -1; - while ((i = bms_next_member(node->ms_needrequest, i)) >= 0) + while ((i = bms_next_member(node->ms.needrequest, i)) >= 0) { if (!bms_is_member(i, node->ms_has_asyncresults)) needrequest = bms_add_member(needrequest, i); @@ -661,13 +447,13 @@ ExecMergeAppendAsyncRequest(MergeAppendState *node, int mplan) return false; /* Clear ms_needrequest flag, as we are going to send requests now */ - node->ms_needrequest = bms_del_members(node->ms_needrequest, needrequest); + node->ms.needrequest = bms_del_members(node->ms.needrequest, needrequest); /* Make a new request for each of the async subplans that need it. */ i = -1; while ((i = bms_next_member(needrequest, i)) >= 0) { - AsyncRequest *areq = node->ms_asyncrequests[i]; + AsyncRequest *areq = node->ms.asyncrequests[i]; /* * We've just checked that subplan doesn't already have some fetched @@ -683,7 +469,7 @@ ExecMergeAppendAsyncRequest(MergeAppendState *node, int mplan) /* Return needed asynchronously-generated results if any. */ if (bms_is_member(mplan, node->ms_has_asyncresults)) { - node->ms_slots[mplan] = node->ms_asyncresults[mplan]; + node->ms_slots[mplan] = node->ms.asyncresults[mplan]; return true; } @@ -707,7 +493,7 @@ ExecAsyncMergeAppendResponse(AsyncRequest *areq) /* We should handle previous async result prior to getting new one */ Assert(!bms_is_member(areq->request_index, node->ms_has_asyncresults)); - node->ms_asyncresults[areq->request_index] = NULL; + node->ms.asyncresults[areq->request_index] = NULL; /* Nothing to do if the request is pending. 
*/ if (!areq->request_complete) { @@ -730,13 +516,13 @@ ExecAsyncMergeAppendResponse(AsyncRequest *areq) node->ms_has_asyncresults = bms_add_member(node->ms_has_asyncresults, areq->request_index); /* Save result so we can return it. */ - node->ms_asyncresults[areq->request_index] = slot; + node->ms.asyncresults[areq->request_index] = slot; /* * Mark the subplan that returned a result as ready for a new request. We * don't launch another one here immediately because it might complete. */ - node->ms_needrequest = bms_add_member(node->ms_needrequest, + node->ms.needrequest = bms_add_member(node->ms.needrequest, areq->request_index); } @@ -749,101 +535,8 @@ ExecAsyncMergeAppendResponse(AsyncRequest *areq) static void ExecMergeAppendAsyncEventWait(MergeAppendState *node) { - int nevents = node->ms_nasyncplans + 2; /* one for PM death and - * one for latch */ - WaitEvent occurred_event[EVENT_BUFFER_SIZE]; - int noccurred; - int i; - /* We should never be called when there are no valid async subplans. */ Assert(bms_num_members(node->ms_asyncremain) > 0); - node->ms_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents); - AddWaitEventToSet(node->ms_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, - NULL, NULL); - - /* Give each waiting subplan a chance to add an event. */ - i = -1; - while ((i = bms_next_member(node->ms_asyncplans, i)) >= 0) - { - AsyncRequest *areq = node->ms_asyncrequests[i]; - - if (areq->callback_pending) - ExecAsyncConfigureWait(areq); - } - - /* - * No need for further processing if none of the subplans configured any - * events. - */ - if (GetNumRegisteredWaitEvents(node->ms_eventset) == 1) - { - FreeWaitEventSet(node->ms_eventset); - node->ms_eventset = NULL; - return; - } - - /* - * Add the process latch to the set, so that we wake up to process the - * standard interrupts with CHECK_FOR_INTERRUPTS(). - * - * NOTE: For historical reasons, it's important that this is added to the - * WaitEventSet after the ExecAsyncConfigureWait() calls. Namely, - * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if - * any other events are in the set. That's a poor design, it's - * questionable for postgres_fdw to be doing that in the first place, but - * we cannot change it now. The pattern has possibly been copied to other - * extensions too. - */ - AddWaitEventToSet(node->ms_eventset, WL_LATCH_SET, PGINVALID_SOCKET, - MyLatch, NULL); - - /* Return at most EVENT_BUFFER_SIZE events in one call. */ - if (nevents > EVENT_BUFFER_SIZE) - nevents = EVENT_BUFFER_SIZE; - - /* - * Wait until at least one event occurs. - */ - noccurred = WaitEventSetWait(node->ms_eventset, -1 /* no timeout */ , occurred_event, - nevents, WAIT_EVENT_APPEND_READY); - FreeWaitEventSet(node->ms_eventset); - node->ms_eventset = NULL; - if (noccurred == 0) - return; - - /* Deliver notifications. */ - for (i = 0; i < noccurred; i++) - { - WaitEvent *w = &occurred_event[i]; - - /* - * Each waiting subplan should have registered its wait event with - * user_data pointing back to its AsyncRequest. - */ - if ((w->events & WL_SOCKET_READABLE) != 0) - { - AsyncRequest *areq = (AsyncRequest *) w->user_data; - - if (areq->callback_pending) - { - /* - * Mark it as no longer needing a callback. We must do this - * before dispatching the callback in case the callback resets - * the flag. - */ - areq->callback_pending = false; - - /* Do the actual work. 
*/ - ExecAsyncNotify(areq); - } - } - - /* Handle standard interrupts */ - if ((w->events & WL_LATCH_SET) != 0) - { - ResetLatch(MyLatch); - CHECK_FOR_INTERRUPTS(); - } - } + ExecAppenderAsyncEventWait(&node->ms, -1 /* no timeout */, WAIT_EVENT_APPEND_READY); } diff --git a/src/backend/nodes/nodeFuncs.c b/src/backend/nodes/nodeFuncs.c index 024a2b2fd84..2f4e2ae6d39 100644 --- a/src/backend/nodes/nodeFuncs.c +++ b/src/backend/nodes/nodeFuncs.c @@ -4751,14 +4751,14 @@ planstate_tree_walker_impl(PlanState *planstate, switch (nodeTag(plan)) { case T_Append: - if (planstate_walk_members(((AppendState *) planstate)->appendplans, - ((AppendState *) planstate)->as_nplans, + if (planstate_walk_members(((AppendState *) planstate)->as.plans, + ((AppendState *) planstate)->as.nplans, walker, context)) return true; break; case T_MergeAppend: - if (planstate_walk_members(((MergeAppendState *) planstate)->mergeplans, - ((MergeAppendState *) planstate)->ms_nplans, + if (planstate_walk_members(((MergeAppendState *) planstate)->ms.plans, + ((MergeAppendState *) planstate)->ms.nplans, walker, context)) return true; break; diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 24325d42f0d..bb84040e8f9 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -1262,11 +1262,11 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) * child plans, to make cross-checking the sort info easier. */ plan = makeNode(Append); - plan->plan.targetlist = tlist; - plan->plan.qual = NIL; - plan->plan.lefttree = NULL; - plan->plan.righttree = NULL; - plan->apprelids = rel->relids; + plan->ap.plan.targetlist = tlist; + plan->ap.plan.qual = NIL; + plan->ap.plan.lefttree = NULL; + plan->ap.plan.righttree = NULL; + plan->ap.apprelids = rel->relids; if (pathkeys != NIL) { @@ -1285,7 +1285,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) &nodeSortOperators, &nodeCollations, &nodeNullsFirst); - tlist_was_changed = (orig_tlist_length != list_length(plan->plan.targetlist)); + tlist_was_changed = (orig_tlist_length != list_length(plan->ap.plan.targetlist)); } /* If appropriate, consider async append */ @@ -1395,7 +1395,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) } /* Set below if we find quals that we can use to run-time prune */ - plan->part_prune_index = -1; + plan->ap.part_prune_index = -1; /* * If any quals exist, they may be useful to perform further partition @@ -1420,16 +1420,16 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) } if (prunequal != NIL) - plan->part_prune_index = make_partition_pruneinfo(root, rel, + plan->ap.part_prune_index = make_partition_pruneinfo(root, rel, best_path->subpaths, prunequal); } - plan->appendplans = subplans; + plan->ap.subplans = subplans; plan->nasyncplans = nasyncplans; plan->first_partial_plan = best_path->first_partial_path; - copy_generic_path_info(&plan->plan, (Path *) best_path); + copy_generic_path_info(&plan->ap.plan, (Path *) best_path); /* * If prepare_sort_from_pathkeys added sort columns, but we were told to @@ -1438,9 +1438,9 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) */ if (tlist_was_changed && (flags & (CP_EXACT_TLIST | CP_SMALL_TLIST))) { - tlist = list_copy_head(plan->plan.targetlist, orig_tlist_length); + tlist = list_copy_head(plan->ap.plan.targetlist, orig_tlist_length); return inject_projection_plan((Plan *) plan, tlist, - 
plan->plan.parallel_safe); + plan->ap.plan.parallel_safe); } else return (Plan *) plan; @@ -1458,7 +1458,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path, int flags) { MergeAppend *node = makeNode(MergeAppend); - Plan *plan = &node->plan; + Plan *plan = &node->ap.plan; List *tlist = build_path_tlist(root, &best_path->path); int orig_tlist_length = list_length(tlist); bool tlist_was_changed; @@ -1479,7 +1479,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path, plan->qual = NIL; plan->lefttree = NULL; plan->righttree = NULL; - node->apprelids = rel->relids; + node->ap.apprelids = rel->relids; consider_async = (enable_async_merge_append && !best_path->path.parallel_safe && @@ -1593,7 +1593,7 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path, } /* Set below if we find quals that we can use to run-time prune */ - node->part_prune_index = -1; + node->ap.part_prune_index = -1; /* * If any quals exist, they may be useful to perform further partition @@ -1610,12 +1610,12 @@ create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path, Assert(best_path->path.param_info == NULL); if (prunequal != NIL) - node->part_prune_index = make_partition_pruneinfo(root, rel, + node->ap.part_prune_index = make_partition_pruneinfo(root, rel, best_path->subpaths, prunequal); } - node->mergeplans = subplans; + node->ap.subplans = subplans; /* * If prepare_sort_from_pathkeys added sort columns, but we were told to diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index cd7ea1e6b58..a595f34c87b 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -1850,10 +1850,10 @@ set_append_references(PlannerInfo *root, * check quals. If it's got exactly one child plan, then it's not doing * anything useful at all, and we can strip it out. */ - Assert(aplan->plan.qual == NIL); + Assert(aplan->ap.plan.qual == NIL); /* First, we gotta recurse on the children */ - foreach(l, aplan->appendplans) + foreach(l, aplan->ap.subplans) { lfirst(l) = set_plan_refs(root, (Plan *) lfirst(l), rtoffset); } @@ -1866,11 +1866,11 @@ set_append_references(PlannerInfo *root, * plan may execute the non-parallel aware child multiple times. (If you * change these rules, update create_append_path to match.) */ - if (list_length(aplan->appendplans) == 1) + if (list_length(aplan->ap.subplans) == 1) { - Plan *p = (Plan *) linitial(aplan->appendplans); + Plan *p = (Plan *) linitial(aplan->ap.subplans); - if (p->parallel_aware == aplan->plan.parallel_aware) + if (p->parallel_aware == aplan->ap.plan.parallel_aware) return clean_up_removed_plan_level((Plan *) aplan, p); } @@ -1881,19 +1881,19 @@ set_append_references(PlannerInfo *root, */ set_dummy_tlist_references((Plan *) aplan, rtoffset); - aplan->apprelids = offset_relid_set(aplan->apprelids, rtoffset); + aplan->ap.apprelids = offset_relid_set(aplan->ap.apprelids, rtoffset); /* * Add PartitionPruneInfo, if any, to PlannerGlobal and update the index. * Also update the RT indexes present in it to add the offset. */ - if (aplan->part_prune_index >= 0) - aplan->part_prune_index = - register_partpruneinfo(root, aplan->part_prune_index, rtoffset); + if (aplan->ap.part_prune_index >= 0) + aplan->ap.part_prune_index = + register_partpruneinfo(root, aplan->ap.part_prune_index, rtoffset); /* We don't need to recurse to lefttree or righttree ... 
*/ - Assert(aplan->plan.lefttree == NULL); - Assert(aplan->plan.righttree == NULL); + Assert(aplan->ap.plan.lefttree == NULL); + Assert(aplan->ap.plan.righttree == NULL); return (Plan *) aplan; } @@ -1917,10 +1917,10 @@ set_mergeappend_references(PlannerInfo *root, * or check quals. If it's got exactly one child plan, then it's not * doing anything useful at all, and we can strip it out. */ - Assert(mplan->plan.qual == NIL); + Assert(mplan->ap.plan.qual == NIL); /* First, we gotta recurse on the children */ - foreach(l, mplan->mergeplans) + foreach(l, mplan->ap.subplans) { lfirst(l) = set_plan_refs(root, (Plan *) lfirst(l), rtoffset); } @@ -1934,11 +1934,11 @@ set_mergeappend_references(PlannerInfo *root, * multiple times. (If you change these rules, update * create_merge_append_path to match.) */ - if (list_length(mplan->mergeplans) == 1) + if (list_length(mplan->ap.subplans) == 1) { - Plan *p = (Plan *) linitial(mplan->mergeplans); + Plan *p = (Plan *) linitial(mplan->ap.subplans); - if (p->parallel_aware == mplan->plan.parallel_aware) + if (p->parallel_aware == mplan->ap.plan.parallel_aware) return clean_up_removed_plan_level((Plan *) mplan, p); } @@ -1949,19 +1949,19 @@ set_mergeappend_references(PlannerInfo *root, */ set_dummy_tlist_references((Plan *) mplan, rtoffset); - mplan->apprelids = offset_relid_set(mplan->apprelids, rtoffset); + mplan->ap.apprelids = offset_relid_set(mplan->ap.apprelids, rtoffset); /* * Add PartitionPruneInfo, if any, to PlannerGlobal and update the index. * Also update the RT indexes present in it to add the offset. */ - if (mplan->part_prune_index >= 0) - mplan->part_prune_index = - register_partpruneinfo(root, mplan->part_prune_index, rtoffset); + if (mplan->ap.part_prune_index >= 0) + mplan->ap.part_prune_index = + register_partpruneinfo(root, mplan->ap.part_prune_index, rtoffset); /* We don't need to recurse to lefttree or righttree ... */ - Assert(mplan->plan.lefttree == NULL); - Assert(mplan->plan.righttree == NULL); + Assert(mplan->ap.plan.lefttree == NULL); + Assert(mplan->ap.plan.righttree == NULL); return (Plan *) mplan; } diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c index ff63d20f8d5..eb616c977bc 100644 --- a/src/backend/optimizer/plan/subselect.c +++ b/src/backend/optimizer/plan/subselect.c @@ -2759,7 +2759,7 @@ finalize_plan(PlannerInfo *root, Plan *plan, case T_Append: { - foreach(l, ((Append *) plan)->appendplans) + foreach(l, ((Append *) plan)->ap.subplans) { context.paramids = bms_add_members(context.paramids, @@ -2774,7 +2774,7 @@ finalize_plan(PlannerInfo *root, Plan *plan, case T_MergeAppend: { - foreach(l, ((MergeAppend *) plan)->mergeplans) + foreach(l, ((MergeAppend *) plan)->ap.subplans) { context.paramids = bms_add_members(context.paramids, diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index 9f85eb86da1..ce57f80e5e3 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -5163,9 +5163,9 @@ set_deparse_plan(deparse_namespace *dpns, Plan *plan) * natural choice. 
*/ if (IsA(plan, Append)) - dpns->outer_plan = linitial(((Append *) plan)->appendplans); + dpns->outer_plan = linitial(((Append *) plan)->ap.subplans); else if (IsA(plan, MergeAppend)) - dpns->outer_plan = linitial(((MergeAppend *) plan)->mergeplans); + dpns->outer_plan = linitial(((MergeAppend *) plan)->ap.subplans); else dpns->outer_plan = outerPlan(plan); @@ -7955,10 +7955,10 @@ resolve_special_varno(Node *node, deparse_context *context, if (IsA(dpns->plan, Append)) context->appendparents = bms_union(context->appendparents, - ((Append *) dpns->plan)->apprelids); + ((Append *) dpns->plan)->ap.apprelids); else if (IsA(dpns->plan, MergeAppend)) context->appendparents = bms_union(context->appendparents, - ((MergeAppend *) dpns->plan)->apprelids); + ((MergeAppend *) dpns->plan)->ap.apprelids); push_child_plan(dpns, dpns->outer_plan, &save_dpns); resolve_special_varno((Node *) tle->expr, context, diff --git a/src/include/executor/execAppend.h b/src/include/executor/execAppend.h new file mode 100644 index 00000000000..c1030dc5282 --- /dev/null +++ b/src/include/executor/execAppend.h @@ -0,0 +1,33 @@ +/*------------------------------------------------------------------------- + * execAppend.h + * Support functions for MergeAppend and Append nodes. + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/include/executor/execAppend.h + *------------------------------------------------------------------------- + */ + +#ifndef EXECAPPEND_H +#define EXECAPPEND_H + +#include "nodes/execnodes.h" + +void ExecInitAppender(AppenderState * state, + Appender * node, + EState *estate, + int eflags, + int first_partial_plan, + int *first_valid_partial_plan); + +void ExecEndAppender(AppenderState * node); + +void ExecReScanAppender(AppenderState * node); + +void ExecAppenderAsyncBegin(AppenderState * node); + +void ExecAppenderAsyncEventWait(AppenderState * node, int timeout, uint32 wait_event_info); + +#endif /* EXECAPPEND_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 5887cbf4f16..69123a31bbd 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1472,6 +1472,27 @@ typedef struct ModifyTableState List *mt_mergeJoinConditions; } ModifyTableState; +typedef struct AppenderState +{ + PlanState ps; /* its first field is NodeTag */ + PlanState **plans; /* array of PlanStates for my inputs */ + int nplans; + + /* Asynchronous execution state */ + Bitmapset *asyncplans; /* asynchronous plans indexes */ + int nasyncplans; /* # of asynchronous plans */ + AsyncRequest **asyncrequests; /* array of AsyncRequests */ + TupleTableSlot **asyncresults; /* unreturned results of async plans */ + Bitmapset *needrequest; /* asynchronous plans needing a new request */ + struct WaitEventSet *eventset; /* WaitEventSet for file descriptor waits */ + + /* Partition pruning state */ + struct PartitionPruneState *prune_state; + bool valid_subplans_identified; + Bitmapset *valid_subplans; + Bitmapset *valid_asyncplans; /* valid asynchronous plans indexes */ +} AppenderState; + /* ---------------- * AppendState information * @@ -1493,31 +1514,20 @@ struct PartitionPruneState; struct AppendState { - PlanState ps; /* its first field is NodeTag */ - PlanState **appendplans; /* array of PlanStates for my inputs */ - int as_nplans; + AppenderState as; + int as_whichplan; bool as_begun; /* false means need to initialize */ - Bitmapset *as_asyncplans; /* 
asynchronous plans indexes */ - int as_nasyncplans; /* # of asynchronous plans */ - AsyncRequest **as_asyncrequests; /* array of AsyncRequests */ - TupleTableSlot **as_asyncresults; /* unreturned results of async plans */ - int as_nasyncresults; /* # of valid entries in as_asyncresults */ - bool as_syncdone; /* true if all synchronous plans done in - * asynchronous mode, else false */ + int as_nasyncresults; /* # of valid entries in asyncresults */ + bool as_syncdone; /* all sync plans done in async mode? */ int as_nasyncremain; /* # of remaining asynchronous plans */ - Bitmapset *as_needrequest; /* asynchronous plans needing a new request */ - struct WaitEventSet *as_eventset; /* WaitEventSet used to configure file - * descriptor wait events */ - int as_first_partial_plan; /* Index of 'appendplans' containing - * the first partial plan */ - ParallelAppendState *as_pstate; /* parallel coordination info */ - Size pstate_len; /* size of parallel coordination info */ - struct PartitionPruneState *as_prune_state; - bool as_valid_subplans_identified; /* is as_valid_subplans valid? */ - Bitmapset *as_valid_subplans; - Bitmapset *as_valid_asyncplans; /* valid asynchronous plans indexes */ - bool (*choose_next_subplan) (AppendState *); + int as_first_partial_plan; + + /* Parallel append specific */ + ParallelAppendState *as_pstate; + Size pstate_len; + + bool (*choose_next_subplan) (struct AppendState *); }; /* ---------------- @@ -1537,27 +1547,17 @@ struct AppendState */ typedef struct MergeAppendState { - PlanState ps; /* its first field is NodeTag */ - PlanState **mergeplans; /* array of PlanStates for my inputs */ - int ms_nplans; + AppenderState ms; + int ms_nkeys; SortSupport ms_sortkeys; /* array of length ms_nkeys */ TupleTableSlot **ms_slots; /* array of length ms_nplans */ struct binaryheap *ms_heap; /* binary heap of slot indices */ bool ms_initialized; /* are subplans started? */ - Bitmapset *ms_asyncplans; /* asynchronous plans indexes */ - int ms_nasyncplans; /* # of asynchronous plans */ - AsyncRequest **ms_asyncrequests; /* array of AsyncRequests */ - TupleTableSlot **ms_asyncresults; /* unreturned results of async plans */ + + /* Merge-specific async tracking */ Bitmapset *ms_has_asyncresults; /* plans which have async results */ Bitmapset *ms_asyncremain; /* remaining asynchronous plans */ - Bitmapset *ms_needrequest; /* asynchronous plans needing a new request */ - struct WaitEventSet *ms_eventset; /* WaitEventSet used to configure file - * descriptor wait events */ - struct PartitionPruneState *ms_prune_state; - bool ms_valid_subplans_identified; /* is ms_valid_subplans valid? 
*/ - Bitmapset *ms_valid_subplans; - Bitmapset *ms_valid_asyncplans; /* valid asynchronous plans indexes */ } MergeAppendState; /* Getters for AppendState and MergeAppendState */ @@ -1567,9 +1567,9 @@ GetAppendEventSet(PlanState *ps) Assert(IsA(ps, AppendState) || IsA(ps, MergeAppendState)); if (IsA(ps, AppendState)) - return ((AppendState *) ps)->as_eventset; + return ((AppendState *) ps)->as.eventset; else - return ((MergeAppendState *) ps)->ms_eventset; + return ((MergeAppendState *) ps)->ms.eventset; } static inline Bitmapset * @@ -1578,9 +1578,9 @@ GetNeedRequest(PlanState *ps) Assert(IsA(ps, AppendState) || IsA(ps, MergeAppendState)); if (IsA(ps, AppendState)) - return ((AppendState *) ps)->as_needrequest; + return ((AppendState *) ps)->as.needrequest; else - return ((MergeAppendState *) ps)->ms_needrequest; + return ((MergeAppendState *) ps)->ms.needrequest; } /* Common part of classify_matching_subplans() for Append and MergeAppend */ diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index c4393a94321..30c20e80b40 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -380,6 +380,20 @@ typedef struct ModifyTable struct PartitionPruneInfo; /* forward reference to struct below */ +typedef struct Appender +{ + Plan plan; /* its first field is NodeTag */ + Bitmapset *apprelids; /* RTIs of appendrel(s) formed by this node */ + List *subplans; /* List of Plans (formerly + * appendplans/mergeplans) */ + + /* + * Index into PlannedStmt.partPruneInfos and parallel lists in EState. Set + * to -1 if no run-time pruning is used. + */ + int part_prune_index; +} Appender; + /* ---------------- * Append node - * Generate the concatenation of the results of sub-plans. @@ -387,25 +401,16 @@ struct PartitionPruneInfo; /* forward reference to struct below */ */ typedef struct Append { - Plan plan; - /* RTIs of appendrel(s) formed by this node */ - Bitmapset *apprelids; - List *appendplans; + Appender ap; + /* # of asynchronous plans */ int nasyncplans; /* - * All 'appendplans' preceding this index are non-partial plans. All - * 'appendplans' from this index onwards are partial plans. + * All 'subplans' preceding this index are non-partial plans. All + * 'subplans' from this index onwards are partial plans. */ int first_partial_plan; - - /* - * Index into PlannedStmt.partPruneInfos and parallel lists in EState: - * es_part_prune_states and es_part_prune_results. Set to -1 if no - * run-time pruning is used. - */ - int part_prune_index; } Append; /* ---------------- @@ -415,12 +420,7 @@ typedef struct Append */ typedef struct MergeAppend { - Plan plan; - - /* RTIs of appendrel(s) formed by this node */ - Bitmapset *apprelids; - - List *mergeplans; + Appender ap; /* these fields are just like the sort-key info in struct Sort: */ @@ -438,13 +438,6 @@ typedef struct MergeAppend /* NULLS FIRST/LAST directions */ bool *nullsFirst pg_node_attr(array_size(numCols)); - - /* - * Index into PlannedStmt.partPruneInfos and parallel lists in EState: - * es_part_prune_states and es_part_prune_results. Set to -1 if no - * run-time pruning is used. - */ - int part_prune_index; } MergeAppend; /* ---------------- -- 2.51.2
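
Reviewer note, not part of the patch: below is a minimal sketch of how an executor node built on the new shared Appender layer might delegate the common work to execAppend.c. It assumes only what the patch itself declares: the AppenderState fields (ps, plans, nplans) from src/include/nodes/execnodes.h and the ExecEndAppender()/ExecReScanAppender() entry points from src/include/executor/execAppend.h. The names MyAppendLikeState, MyAppendLikeEnd, MyAppendLikeReScan, MyAppendLikeNext, and my_whichplan are invented for illustration and do not exist in the tree.

/*
 * Illustrative sketch only -- not part of the patch.  A hypothetical node
 * embeds AppenderState as its first member (as AppendState/MergeAppendState
 * now do) and hands the shared shutdown/rescan work to execAppend.c.
 */
#include "postgres.h"

#include "executor/execAppend.h"	/* AppenderState, ExecEndAppender(), ... */
#include "executor/executor.h"		/* ExecProcNode() */

typedef struct MyAppendLikeState
{
	AppenderState as;			/* shared Append/MergeAppend state; its
								 * first field is a PlanState */
	int			my_whichplan;	/* node-specific bookkeeping (hypothetical) */
} MyAppendLikeState;

static void
MyAppendLikeEnd(MyAppendLikeState *node)
{
	/* Shut down every subplan through the shared helper. */
	ExecEndAppender(&node->as);
}

static void
MyAppendLikeReScan(MyAppendLikeState *node)
{
	/*
	 * The shared helper propagates changed parameters to the subplans and
	 * rescans them as needed; only node-local state remains to reset here.
	 */
	ExecReScanAppender(&node->as);
	node->my_whichplan = 0;
}

static TupleTableSlot *
MyAppendLikeNext(MyAppendLikeState *node)
{
	/* Pull from the current subplan via the shared plans[] array. */
	if (node->my_whichplan < node->as.nplans)
		return ExecProcNode(node->as.plans[node->my_whichplan]);

	/* Exhausted: return an empty result slot, as [Merge]Append does. */
	return ExecClearTuple(node->as.ps.ps_ResultTupleSlot);
}

The point of the sketch is the layering the patch establishes: because AppenderState begins with a PlanState (and is itself the first member of AppendState and MergeAppendState), generic code that holds a PlanState * keeps working, while the per-subplan bookkeeping lives once in execAppend.c instead of being duplicated in nodeAppend.c and nodeMergeAppend.c.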