diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index b776ad1..1a9e39a 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -73,6 +73,8 @@ static void show_sort_keys(SortState *sortstate, List *ancestors, static void show_sort_info(SortState *sortstate, ExplainState *es); static void show_hash_info(HashState *hashstate, ExplainState *es); static const char *explain_get_index_name(Oid indexId); +static void ExplainMergeActions(ModifyTableState *mt_planstate, + List *ancestors, ExplainState *es); static void ExplainScanTarget(Scan *plan, ExplainState *es); static void ExplainMemberNodes(List *plans, PlanState **planstates, List *ancestors, ExplainState *es); @@ -636,6 +638,9 @@ ExplainNode(PlanState *planstate, List *ancestors, case CMD_DELETE: pname = operation = "Delete"; break; + case CMD_MERGE: + pname = operation = "Merge"; + break; default: pname = "???"; break; @@ -1190,6 +1195,8 @@ ExplainNode(PlanState *planstate, List *ancestors, switch (nodeTag(plan)) { case T_ModifyTable: + ExplainMergeActions((ModifyTableState *)planstate, ancestors, es); + ExplainMemberNodes(((ModifyTable *) plan)->plans, ((ModifyTableState *) planstate)->mt_plans, ancestors, es); @@ -1482,6 +1489,64 @@ explain_get_index_name(Oid indexId) return result; } +static void +ExplainMergeActions(ModifyTableState *mt_planstate, List *ancestors, ExplainState *es) +{ + ListCell *l; + StringInfo buf = makeStringInfo(); + + if(mt_planstate->operation != CMD_MERGE || mt_planstate->mergeActPstates == NIL) + return; + + foreach(l,mt_planstate->mergeActPstates) + { + ModifyTableState *mt_state = (ModifyTableState *)lfirst(l); + + MergeActionState *act_pstate = (MergeActionState *)mt_state->mt_plans[0]; + + MergeAction *act_plan = (MergeAction *)act_pstate->ps.plan; + + resetStringInfo(buf); + + /*prepare the string for printing*/ + switch(act_pstate->operation) + { + case CMD_INSERT: + appendStringInfoString(buf, "INSERT WHEN "); + break; + case CMD_UPDATE: + appendStringInfoString(buf, "UPDATE WHEN "); + break; + case CMD_DELETE: + appendStringInfoString(buf, "DELETE WHEN "); + break; + case CMD_DONOTHING: + appendStringInfoString(buf, "DO NOTHING WHEN "); + break; + case CMD_RAISEERR: + appendStringInfoString(buf, "RAISE ERROR WHEN "); + break; + default: + elog(ERROR, "unknown merge action type when explain"); + } + + if(act_plan->matched) + appendStringInfoString(buf, "MATCHED "); + else + appendStringInfoString(buf, "NOT MATCHED "); + + if(act_plan->flattenedqual) + appendStringInfoString(buf, "AND "); + + /*print it*/ + ExplainPropertyText("ACTION", buf->data, es); + + show_qual(act_plan->flattenedqual, " qual", &act_pstate->ps, ancestors, true, es); + + } + +} + /* * Show the target of a Scan node */ diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c index 2cbc192..99cb221 100644 --- a/src/backend/commands/trigger.c +++ b/src/backend/commands/trigger.c @@ -2315,6 +2315,107 @@ ExecASTruncateTriggers(EState *estate, ResultRelInfo *relinfo) false, NULL, NULL, NIL, NULL); } +void +ExecBSMergeTriggers(ModifyTableState *mt_state) +{ + ListCell *l; + + bool doUpdateTriggers = false; + bool doInsertTriggers = false; + bool doDeleteTriggers = false; + + foreach(l, mt_state->mergeActPstates) + { + ModifyTableState *actmtstate; + MergeActionState *actPstate; + MergeAction *actplan; + + actmtstate = (ModifyTable *)lfirst(l); + + actPstate = (MergeActionState *)actmtstate->mt_plans[0]; + + actplan = (MergeAction *)actPstate->ps.plan; + /*the replace action does not fire triggers*/ + if(actplan->replaced) + continue; + + if(actplan->operation == CMD_UPDATE) + doUpdateTriggers = true; + else if(actplan->operation == CMD_INSERT) + doInsertTriggers = true; + else if(actplan->operation == CMD_DELETE) + doDeleteTriggers = true; + + } + + /*fire the triggers*/ + if(doUpdateTriggers) + ExecBSUpdateTriggers(mt_state->ps.state, + mt_state->ps.state->es_result_relations); + + + if(doInsertTriggers) + ExecBSInsertTriggers(mt_state->ps.state, + mt_state->ps.state->es_result_relations); + + + if(doDeleteTriggers) + ExecBSDeleteTriggers(mt_state->ps.state, + mt_state->ps.state->es_result_relations); + + return; +} + +void +ExecASMergeTriggers(ModifyTableState *mt_state) +{ + ListCell *l; + + bool doUpdateTriggers = false; + bool doInsertTriggers = false; + bool doDeleteTriggers = false; + + foreach(l, mt_state->mergeActPstates) + { + ModifyTableState *actmtstate; + MergeActionState *actPstate; + MergeAction *actplan; + + actmtstate = (ModifyTable *)lfirst(l); + + actPstate = (MergeActionState *)actmtstate->mt_plans[0]; + + actplan = (MergeAction *)actPstate->ps.plan; + /*the replace action does not fire triggers*/ + if(actplan->replaced) + continue; + + if(actplan->operation == CMD_UPDATE) + doUpdateTriggers = true; + else if(actplan->operation == CMD_INSERT) + doInsertTriggers = true; + else if(actplan->operation == CMD_DELETE) + doDeleteTriggers = true; + + } + + /*fire the triggers*/ + if(doUpdateTriggers) + ExecASUpdateTriggers(mt_state->ps.state, + mt_state->ps.state->es_result_relations); + + + if(doInsertTriggers) + ExecASInsertTriggers(mt_state->ps.state, + mt_state->ps.state->es_result_relations); + + + if(doDeleteTriggers) + ExecASDeleteTriggers(mt_state->ps.state, + mt_state->ps.state->es_result_relations); + + return; +} static HeapTuple GetTupleForTrigger(EState *estate, diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 2f33fdb..56c13ee 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -171,6 +171,7 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) case CMD_INSERT: case CMD_DELETE: case CMD_UPDATE: + case CMD_MERGE: estate->es_output_cid = GetCurrentCommandId(true); break; diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index f4cc7d9..2d1a4e7 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -153,6 +153,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags) estate, eflags); break; + case T_MergeAction: + result = (PlanState *) ExecInitMergeAction((MergeAction*) node, + estate, eflags); + break; + case T_Append: result = (PlanState *) ExecInitAppend((Append *) node, estate, eflags); diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 8619ce3..8ab81ae 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -582,6 +582,143 @@ lreplace:; return NULL; } +static TupleTableSlot * +MergeRaiseErr(void) +{ + elog(NOTICE, "one tuple is ERROR"); + return NULL; +} + +static TupleTableSlot * +ExecMerge(ItemPointer tupleid, + TupleTableSlot *slot, + TupleTableSlot *planSlot, + ModifyTableState *node, + EState *estate) +{ + + TupleTableSlot *actslot = NULL; + TupleTableSlot *res = NULL; + ListCell *each; + + /* + * try the merge actions one by one + */ + foreach(each, node->mergeActPstates) + { + ModifyTableState *mt_pstate; + + MergeActionState *action_pstate; + + ExprContext *econtext; + + bool matched; + + + mt_pstate = (ModifyTableState *)lfirst(each); + + /* + * mt_pstate is supposed to have only ONE mt_plans, + * which is a MergeActionState + */ + Assert(mt_pstate->mt_nplans == 1); + + action_pstate = (MergeActionState *)mt_pstate->mt_plans[0]; + + matched = ((MergeAction *)action_pstate->ps.plan)->matched; + + + /* + * If tupleid == NULL, it is a NOT MATCHED case, + * else, it is a MATCHED case, + */ + if((tupleid == NULL && matched) || (tupleid != NULL && !matched)) + { + continue; + } + + /*Setup the expression context*/ + econtext = action_pstate->ps.ps_ExprContext; + + /* + If the action has an additional qual, + which is not satisfied, skip it + */ + if(action_pstate->ps.qual) + { + ResetExprContext(econtext); + + econtext->ecxt_scantuple = slot; + econtext->ecxt_outertuple = planSlot; + + if(!ExecQual(action_pstate->ps.qual, econtext,false)) + { + continue; + } + } + + + /* + * OK, the input tuple is caugth by current action. + * If this action is "replaced" by rules, we will skip it + * AND THE REMAINING ACTIONS. + */ + Assert(IsA(action_pstate->ps.plan, MergeAction)); + if(((MergeAction *)action_pstate->ps.plan)->replaced) + return NULL; + + + /*Now we start to exec this action. + We have 5 action types*/ + + /*1. do nothing for a DO NOTHING action*/ + if(action_pstate->operation == CMD_DONOTHING) + return NULL; + + /*2. throw an error for a RAISE ERROR action*/ + if(action_pstate->operation == CMD_RAISEERR) + return MergeRaiseErr(); + + /*3. project the result tuple slot, for INSERT/UPDATE action*/ + if(action_pstate->operation != CMD_DELETE) + actslot = ExecProcessReturning(action_pstate->ps.ps_ProjInfo, + slot, planSlot); + + switch (action_pstate->operation) + { + case CMD_INSERT: + res = ExecInsert(actslot, planSlot, estate); + return res; + break; + case CMD_UPDATE: + res = ExecUpdate(tupleid, + actslot, + planSlot, + &mt_pstate->mt_epqstate, + estate); + return res; + break; + case CMD_DELETE: + res = ExecDelete(tupleid, + planSlot, + &mt_pstate->mt_epqstate, + estate); + return res; + break; + default: + elog(ERROR, "unknown merge action type for excute"); + break; + } + + } + + /* + * Here, no action is taken. Let's do the default thing, + * which is Raise Error in crrent edition + */ + return MergeRaiseErr(); + +} /* * Process BEFORE EACH STATEMENT triggers @@ -603,6 +740,9 @@ fireBSTriggers(ModifyTableState *node) ExecBSDeleteTriggers(node->ps.state, node->ps.state->es_result_relations); break; + case CMD_MERGE: + ExecBSMergeTriggers(node); + break; default: elog(ERROR, "unknown operation"); break; @@ -629,6 +769,9 @@ fireASTriggers(ModifyTableState *node) ExecASDeleteTriggers(node->ps.state, node->ps.state->es_result_relations); break; + case CMD_MERGE: + ExecASMergeTriggers(node); + break; default: elog(ERROR, "unknown operation"); break; @@ -708,20 +851,34 @@ ExecModifyTable(ModifyTableState *node) /* * extract the 'ctid' junk attribute. */ - if (operation == CMD_UPDATE || operation == CMD_DELETE) + if (operation == CMD_UPDATE || operation == CMD_DELETE || operation == CMD_MERGE) { Datum datum; bool isNull; datum = ExecGetJunkAttribute(slot, junkfilter->jf_junkAttNo, &isNull); - /* shouldn't ever get a null result... */ + if (isNull) - elog(ERROR, "ctid is NULL"); - - tupleid = (ItemPointer) DatumGetPointer(datum); - tuple_ctid = *tupleid; /* be sure we don't free the ctid!! */ - tupleid = &tuple_ctid; + { + /* + * shouldn't ever get a null result for update and delete. + * Merge command will get a null ctid in "NOT MATCHED" case + */ + if(operation != CMD_MERGE) + elog(ERROR, "ctid is NULL"); + else + tupleid = NULL; + } + else + { + + tupleid = (ItemPointer) DatumGetPointer(datum); + + tuple_ctid = *tupleid; /* be sure we don't free the ctid!! */ + tupleid = &tuple_ctid; + + } } /* @@ -744,6 +901,10 @@ ExecModifyTable(ModifyTableState *node) slot = ExecDelete(tupleid, planSlot, &node->mt_epqstate, estate); break; + case CMD_MERGE: + slot = ExecMerge(tupleid, slot, planSlot, + node, estate); + break; default: elog(ERROR, "unknown operation"); break; @@ -771,6 +932,74 @@ ExecModifyTable(ModifyTableState *node) return NULL; } +/* +* When init a merge plan, we also need init its action plans. +* These action plans are "MergeAction" plans . +* +* This function mainly handles the tlist and qual in the plan. +* The returning result is a "MergeActionState". +*/ +MergeActionState * +ExecInitMergeAction(MergeAction *node, EState *estate, int eflags) +{ + MergeActionState *result; + + /* + * do nothing when we get to the end of a leaf on tree. + */ + if (node == NULL) + return NULL; + + /* + * create state structure + */ + result = makeNode(MergeActionState); + result->operation = node->operation; + result->ps.plan = (Plan *)node; + result->ps.state = estate; + + /* + * tuple table initialization + */ + ExecInitResultTupleSlot(estate, &result->ps); + + /* + * initialize tuple type + */ + ExecAssignResultTypeFromTL(&result->ps); + + + /* + * create expression context for node + */ + + ExecAssignExprContext(estate, &result->ps); + + + /* + * initialize child expressions + */ + result->ps.targetlist = (List *) + ExecInitExpr((Expr *) node->plan.targetlist, &result->ps); + + + result->ps.qual = (List *) + ExecInitExpr((Expr *) node->plan.qual, &result->ps); + + + /* + * init the projection information + */ + ExecAssignProjectionInfo(&result->ps, NULL); + + /* + do we need a check for the plan output here ? + (by calling the ExecCheckPlanOutput() function + */ + + return result; +} + /* ---------------------------------------------------------------- * ExecInitModifyTable * ---------------------------------------------------------------- @@ -786,6 +1015,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) Plan *subplan; ListCell *l; int i; + bool isMergeAction = false; /* check for unsupported flags */ Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); @@ -826,6 +1056,16 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) foreach(l, node->plans) { subplan = (Plan *) lfirst(l); + + /* + * test if this subplan node is a MergeAction. + * We need this information for setting the junckfilter. + * juckfiler is necessary for an ordinary UPDATE/DELETE plan, + * but not for an UPDATE/DELETE merge action + */ + if(IsA(subplan, MergeAction)) + isMergeAction = true; + mtstate->mt_plans[i] = ExecInitNode(subplan, estate, eflags); estate->es_result_relation_info++; i++; @@ -955,7 +1195,12 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) break; case CMD_UPDATE: case CMD_DELETE: - junk_filter_needed = true; + case CMD_MERGE: + if(!isMergeAction) + junk_filter_needed = true; + break; + case CMD_DONOTHING: + case CMD_RAISEERR: break; default: elog(ERROR, "unknown operation"); @@ -978,9 +1223,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) resultRelInfo->ri_RelationDesc->rd_att->tdhasoid, ExecInitExtraTupleSlot(estate)); - if (operation == CMD_UPDATE || operation == CMD_DELETE) + if (operation == CMD_UPDATE || operation == CMD_DELETE || operation == CMD_MERGE) { - /* For UPDATE/DELETE, find the ctid junk attr now */ + /* For UPDATE/DELETE/MERGE, find the ctid junk attr now */ j->jf_junkAttNo = ExecFindJunkAttribute(j, "ctid"); if (!AttributeNumberIsValid(j->jf_junkAttNo)) elog(ERROR, "could not find junk ctid column"); @@ -1006,6 +1251,19 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) if (estate->es_trig_tuple_slot == NULL) estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate); + /* + * for the merge actions, we need to do similar things as above + */ + foreach(l, node->mergeActPlan) + { + PlanState *actpstate = ExecInitNode((Plan *)lfirst(l), estate, 0); + /* + * put the pstates of each action into ModifyTableState + */ + mtstate->mergeActPstates = lappend(mtstate->mergeActPstates, actpstate); + + } + return mtstate; } diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 93dcef5..8036a27 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -176,6 +176,7 @@ _copyModifyTable(ModifyTable *from) COPY_NODE_FIELD(returningLists); COPY_NODE_FIELD(rowMarks); COPY_SCALAR_FIELD(epqParam); + COPY_NODE_FIELD(mergeActPlan); return newnode; } @@ -2273,6 +2274,11 @@ _copyQuery(Query *from) COPY_NODE_FIELD(rowMarks); COPY_NODE_FIELD(setOperations); + COPY_SCALAR_FIELD(isMergeAction); + COPY_SCALAR_FIELD(replaced); + /*merge action list*/ + COPY_NODE_FIELD(mergeActQry); + return newnode; } @@ -2343,6 +2349,59 @@ _copySelectStmt(SelectStmt *from) return newnode; } +static MergeStmt * +_copyMergeStmt(MergeStmt *from) +{ + MergeStmt *newnode = makeNode(MergeStmt); + + COPY_NODE_FIELD(relation); + COPY_NODE_FIELD(source); + COPY_NODE_FIELD(matchCondition); + COPY_NODE_FIELD(actions); + + return newnode; + +} + +static MergeConditionAction * +_copyMergeConditionAction(MergeConditionAction *from) +{ + MergeConditionAction *newnode = makeNode(MergeConditionAction); + + COPY_SCALAR_FIELD(match); + COPY_NODE_FIELD(condition); + COPY_NODE_FIELD(action); + + return newnode; +} + +static MergeUpdate * +_copyMergeUpdate(MergeUpdate *from) +{ + MergeUpdate *newNode = (MergeUpdate *)_copyUpdateStmt((UpdateStmt *) from); + newNode->type = T_MergeUpdate; + + return newNode; +} + +static MergeInsert * +_copyMergeInsert(MergeInsert *from) +{ + MergeInsert *newNode = (MergeInsert *)_copyInsertStmt((InsertStmt *) from); + newNode->type = T_MergeInsert; + + return newNode; +} + +static MergeDelete * +_copyMergeDelete(MergeDelete *from) +{ + MergeDelete *newNode = (MergeDelete *)_copyDeleteStmt((DeleteStmt *) from); + newNode->type = T_MergeDelete; + + return newNode; +} + static SetOperationStmt * _copySetOperationStmt(SetOperationStmt *from) { @@ -3903,6 +3962,21 @@ copyObject(void *from) case T_SelectStmt: retval = _copySelectStmt(from); break; + case T_MergeStmt: + retval = _copyMergeStmt(from); + break; + case T_MergeConditionAction: + retval = _copyMergeConditionAction(from); + break; + case T_MergeUpdate: + retval = _copyMergeUpdate(from); + break; + case T_MergeInsert: + retval = _copyMergeInsert(from); + break; + case T_MergeDelete: + retval = _copyMergeDelete(from); + break; case T_SetOperationStmt: retval = _copySetOperationStmt(from); break; diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 5d83727..3e3589a 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -878,6 +878,9 @@ _equalQuery(Query *a, Query *b) COMPARE_NODE_FIELD(rowMarks); COMPARE_NODE_FIELD(setOperations); + COMPARE_SCALAR_FIELD(isMergeAction); + COMPARE_SCALAR_FIELD(replaced); + COMPARE_NODE_FIELD(mergeActQry); return true; } diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 79baf4f..f61bd84 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -331,6 +331,7 @@ _outModifyTable(StringInfo str, ModifyTable *node) WRITE_NODE_FIELD(returningLists); WRITE_NODE_FIELD(rowMarks); WRITE_INT_FIELD(epqParam); + WRITE_NODE_FIELD(mergeActPlan); } static void @@ -2019,6 +2020,56 @@ _outQuery(StringInfo str, Query *node) WRITE_NODE_FIELD(limitCount); WRITE_NODE_FIELD(rowMarks); WRITE_NODE_FIELD(setOperations); + WRITE_BOOL_FIELD(isMergeAction); + WRITE_BOOL_FIELD(matched); + WRITE_BOOL_FIELD(replaced); + WRITE_NODE_FIELD(mergeActQry); +} + +static void +_outMergeConditionAction(StringInfo str, MergeConditionAction *node) +{ + WRITE_NODE_TYPE("MERGECONDITIONACTION"); + + WRITE_BOOL_FIELD(match); + + WRITE_NODE_FIELD(condition); + WRITE_NODE_FIELD(action); + + +} + +static void +_outMergeStmt(StringInfo str, MergeStmt *node) +{ + WRITE_NODE_TYPE("MERGESTMT"); + + WRITE_NODE_FIELD(relation); + WRITE_NODE_FIELD(source); + WRITE_NODE_FIELD(matchCondition); + WRITE_NODE_FIELD(actions); + +} + +static void +_outMergeAction(StringInfo str, MergeAction*node) +{ + _outPlanInfo(str, (Plan *)node); + WRITE_BOOL_FIELD(replaced); + WRITE_ENUM_FIELD(operation, CmdType); + WRITE_BOOL_FIELD(matched); + WRITE_NODE_FIELD(flattenedqual); +} + +static void +_outDeleteStmt(StringInfo str, DeleteStmt *node) +{ + WRITE_NODE_TYPE("DELETESTMT"); + + WRITE_NODE_FIELD(relation); + WRITE_NODE_FIELD(usingClause); + WRITE_NODE_FIELD(whereClause); + WRITE_NODE_FIELD(returningList); } static void @@ -2904,6 +2955,18 @@ _outNode(StringInfo str, void *obj) case T_XmlSerialize: _outXmlSerialize(str, obj); break; + case T_MergeAction: + _outMergeAction(str, obj); + break; + case T_MergeStmt: + _outMergeStmt(str, obj); + break; + case T_MergeConditionAction: + _outMergeConditionAction(str,obj); + break; + case T_DeleteStmt: + _outDeleteStmt(str,obj); + break; default: diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index bc6e2a6..9c137cc 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -218,7 +218,11 @@ _readQuery(void) READ_NODE_FIELD(limitCount); READ_NODE_FIELD(rowMarks); READ_NODE_FIELD(setOperations); - + READ_BOOL_FIELD(isMergeAction); + READ_BOOL_FIELD(matched); + READ_BOOL_FIELD(replaced); + READ_NODE_FIELD(mergeActQry); + READ_DONE(); } diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 3950ab4..2b7fd13 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -102,7 +102,12 @@ static void get_column_info_for_window(PlannerInfo *root, WindowClause *wc, int *ordNumCols, AttrNumber **ordColIdx, Oid **ordOperators); - +static ModifyTable *merge_action_planner(PlannerGlobal *glob, + Query *parse, + Plan *top_plan); +static void merge_action_list_planner(PlannerGlobal *glob, + Query *parse, + ModifyTable *mainplan); /***************************************************************************** * @@ -565,6 +570,11 @@ subquery_planner(PlannerGlobal *glob, Query *parse, returningLists, rowMarks, SS_assign_special_param(root)); + + /*do a simple plan for each actions in the merge command. + *put them in mergeActPlan list; + */ + merge_action_list_planner(glob, parse, (ModifyTable *)plan); } } @@ -584,6 +594,137 @@ subquery_planner(PlannerGlobal *glob, Query *parse, return plan; } +static void +merge_action_list_planner(PlannerGlobal *glob, Query *parse, ModifyTable *mainplan) +{ + ListCell *l; + + /*this is a function for MERGE command only*/ + if(parse->commandType != CMD_MERGE || + mainplan->operation != CMD_MERGE) + return; + + /*if the merge actions are already there, no need to do it again*/ + if(mainplan->mergeActPlan != NIL) + return; + + /*plan each action query*/ + foreach(l, parse->mergeActQry) + { + Plan *actplan = (Plan *)merge_action_planner(glob, + (Query *)lfirst(l), + (Plan *)linitial(mainplan->plans) + ); + + mainplan->mergeActPlan = lappend(mainplan->mergeActPlan, actplan); + } + + return; +} + +/*create plan for a single merge action*/ +static ModifyTable * +merge_action_planner(PlannerGlobal *glob, Query *parse, + Plan *top_plan) +{ + PlannerInfo *root; + MergeAction *actplan; + ModifyTable *result; + + List *returningLists; + List *rowMarks; + + /* + * no having clause in a merge action + */ + Assert(parse->havingQual == NULL); + + + /* Create a PlannerInfo data structure for this subquery */ + root = makeNode(PlannerInfo); + root->parse = parse; + root->glob = glob; + root->query_level = 1; + root->parent_root = NULL; + root->planner_cxt = CurrentMemoryContext; + root->init_plans = NIL; + root->cte_plan_ids = NIL; + root->eq_classes = NIL; + root->append_rel_list = NIL; + root->hasPseudoConstantQuals = false; + root->hasRecursion = false; + root->wt_param_id = -1; + root->non_recursive_plan = NULL; + + + /* + * Create the action plan node + */ + actplan = makeNode(MergeAction); + actplan->operation = parse->commandType; + actplan->replaced = parse->replaced; + actplan->matched = parse->matched; + + /* + * Do expression preprocessing on targetlist and quals. + */ + parse->targetList = (List *) + preprocess_expression(root, (Node *) parse->targetList, + EXPRKIND_TARGET); + + preprocess_qual_conditions(root, (Node *) parse->jointree); + + + /* + * we need a flat qual for explaining + */ + actplan->flattenedqual = flatten_join_alias_vars(root, parse->jointree->quals); + + /*copy the cost from the top_plan*/ + actplan->plan.startup_cost = top_plan->startup_cost; + actplan->plan.total_cost = top_plan->total_cost; + actplan->plan.plan_rows = top_plan->plan_rows; + actplan->plan.plan_width = top_plan->plan_width; + + /* + * prepare the result + */ + if(parse->targetList) + actplan->plan.targetlist = preprocess_targetlist(root,parse->targetList); + + actplan->plan.qual = (List *)parse->jointree->quals; + push_up_merge_action_vars(actplan, parse); + + if (parse->returningList) + { + List *rlist; + + Assert(parse->resultRelation); + rlist = set_returning_clause_references(root->glob, + parse->returningList, + &actplan->plan, + parse->resultRelation); + returningLists = list_make1(rlist); + } + else + returningLists = NIL; + + + if (parse->rowMarks) + rowMarks = NIL; + else + rowMarks = root->rowMarks; + + result = make_modifytable(parse->commandType, + copyObject(root->resultRelations), + list_make1(actplan), + returningLists, + rowMarks, + SS_assign_special_param(root)); + + return result; +} + /* * preprocess_expression * Do subquery_planner's preprocessing work for an expression, diff --git a/src/backend/optimizer/prep/preptlist.c b/src/backend/optimizer/prep/preptlist.c index 59d3518..b4514b8 100644 --- a/src/backend/optimizer/prep/preptlist.c +++ b/src/backend/optimizer/prep/preptlist.c @@ -78,13 +78,23 @@ preprocess_targetlist(PlannerInfo *root, List *tlist) result_relation, range_table); /* - * for "update" and "delete" queries, add ctid of the result relation into - * the target list so that the ctid will propagate through execution and - * ExecutePlan() will be able to identify the right tuple to replace or - * delete. This extra field is marked "junk" so that it is not stored + * for "update" , "delete" and "merge" queries, add ctid of the result + * relation into the target list so that the ctid will propagate through + * execution and ExecutePlan() will be able to identify the right tuple + * to replace or delete. + * This extra field is marked "junk" so that it is not stored * back into the tuple. + * + * BUT, if the query node is a merge action, + * we don't need to expend the ctid attribute in tlist. + * The tlist of the merge top level plan already contains + * a "ctid" junk attr of the target relation. */ - if (command_type == CMD_UPDATE || command_type == CMD_DELETE) + + if(!parse->isMergeAction && + (command_type == CMD_UPDATE || + command_type == CMD_DELETE || + command_type == CMD_MERGE)) { TargetEntry *tle; Var *var; diff --git a/src/backend/optimizer/util/var.c b/src/backend/optimizer/util/var.c index 92c2208..1775554 100644 --- a/src/backend/optimizer/util/var.c +++ b/src/backend/optimizer/util/var.c @@ -67,6 +67,16 @@ typedef struct bool inserted_sublink; /* have we inserted a SubLink? */ } flatten_join_alias_vars_context; +typedef struct +{ + int varno_source; + int varno_target; + int varno_join; + + int offset_source; + int offset_target; +} push_up_merge_action_vars_context; + static bool pull_varnos_walker(Node *node, pull_varnos_context *context); static bool pull_varattnos_walker(Node *node, Bitmapset **varattnos); @@ -83,6 +93,8 @@ static bool pull_var_clause_walker(Node *node, static Node *flatten_join_alias_vars_mutator(Node *node, flatten_join_alias_vars_context *context); static Relids alias_relid_set(PlannerInfo *root, Relids relids); +static bool push_up_merge_action_vars_walker(Node *node, + push_up_merge_action_vars_context *context); /* @@ -677,6 +689,91 @@ pull_var_clause_walker(Node *node, pull_var_clause_context *context) (void *) context); } +/* +* When prepare for the MERGE command, we have made a +* left join between the Source table and target table as the +* main plan. +* +* In this case, the range table contains ONLY THREE range table entries: +* 1. the source table, which may be a subquery or a plain table +* 2. the entry of the targe table, which is a plain table +* 3. join expression with the sourse table and target table as its parameters. +* +* Each merge action of the command has its own query and +* plan nodes as well. And, the vars in its target list and qual +* expressions may refers to the attribute in any one of the above 3 +* range table entries. +* +* However, since the result tuple slots of merge actions are +* projected from the returned tuple of the join, we need to +* mapping the vars of source table and target table to their +* corresponding attributes in the third range table entry. +* +* This function does the opposit of the flatten_join_alias_vars() +* function. It walks through the target list and qual of a +* MergeAction plan, changes the vars' varno and varattno to the +* corresponding position in the upper level join RTE. +*/ +void +push_up_merge_action_vars(MergeAction *actplan, Query *actqry) +{ + push_up_merge_action_vars_context context; + RangeTblEntry *source_rte = rt_fetch(1,actqry->rtable); + + + /* + * We are supposed to do a more careful assingment + * of the values in context + * But lets take a shortcut for simple. + */ + context.varno_source = 1; + context.varno_target = 2; + context.varno_join = 3; + + context.offset_source = 0; + + + context.offset_target = list_length(source_rte->eref->colnames); + + push_up_merge_action_vars_walker(actplan->plan.targetlist, &context); + + push_up_merge_action_vars_walker(actplan->plan.qual, &context); + +} + +static bool +push_up_merge_action_vars_walker(Node *node, + push_up_merge_action_vars_context *context) +{ + if (node == NULL) + return false; + if (IsA(node, Var)) + { + Var *var = (Var *)node; + + if(var->varno == context->varno_source) + { + var->varno = context->varno_join; + var->varattno += context->offset_source; + return false; + } + else if(var->varno == context->varno_target) + { + var->varno = context->varno_join; + var->varattno += context->offset_target; + return false; + } + else if(var->varno == context->varno_join) + return false; + else + elog(ERROR, "the vars in merge action tlist of qual should only belongs to the source table or targe table"); + + + } + + return expression_tree_walker(node, push_up_merge_action_vars_walker, + (void *) context); +} /* * flatten_join_alias_vars diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c index 6b99a10..b23cadd 100644 --- a/src/backend/parser/analyze.c +++ b/src/backend/parser/analyze.c @@ -47,6 +47,7 @@ static Query *transformDeleteStmt(ParseState *pstate, DeleteStmt *stmt); static Query *transformInsertStmt(ParseState *pstate, InsertStmt *stmt); static List *transformInsertRow(ParseState *pstate, List *exprlist, List *stmtcols, List *icolumns, List *attrnos); +static Query *transformMergeStmt(ParseState *pstate, MergeStmt *stmt); static Query *transformSelectStmt(ParseState *pstate, SelectStmt *stmt); static Query *transformValuesClause(ParseState *pstate, SelectStmt *stmt); static Query *transformSetOperationStmt(ParseState *pstate, SelectStmt *stmt); @@ -164,17 +165,24 @@ transformStmt(ParseState *pstate, Node *parseTree) * Optimizable statements */ case T_InsertStmt: + case T_MergeInsert: result = transformInsertStmt(pstate, (InsertStmt *) parseTree); break; case T_DeleteStmt: + case T_MergeDelete: result = transformDeleteStmt(pstate, (DeleteStmt *) parseTree); break; case T_UpdateStmt: + case T_MergeUpdate: result = transformUpdateStmt(pstate, (UpdateStmt *) parseTree); break; + case T_MergeStmt: + result = transformMergeStmt(pstate, (MergeStmt *)parseTree); + break; + case T_SelectStmt: { SelectStmt *n = (SelectStmt *) parseTree; @@ -282,22 +290,28 @@ transformDeleteStmt(ParseState *pstate, DeleteStmt *stmt) qry->commandType = CMD_DELETE; - /* set up range table with just the result rel */ - qry->resultRelation = setTargetTable(pstate, stmt->relation, - interpretInhOption(stmt->relation->inhOpt), - true, - ACL_DELETE); - qry->distinctClause = NIL; /* - * The USING clause is non-standard SQL syntax, and is equivalent in - * functionality to the FROM list that can be specified for UPDATE. The - * USING keyword is used rather than FROM because FROM is already a - * keyword in the DELETE syntax. - */ - transformFromClause(pstate, stmt->usingClause); - + * The input stmt could be a MergeDelete node. + * In this case, we don't need the process on range table. + */ + if(IsA(stmt, DeleteStmt)) + { + /* set up range table with just the result rel */ + qry->resultRelation = setTargetTable(pstate, stmt->relation, + interpretInhOption(stmt->relation->inhOpt), + true, + ACL_DELETE); + /* + * The USING clause is non-standard SQL syntax, and is equivalent in + * functionality to the FROM list that can be specified for UPDATE. The + * USING keyword is used rather than FROM because FROM is already a + * keyword in the DELETE syntax. + */ + transformFromClause(pstate, stmt->usingClause); + } + qual = transformWhereClause(pstate, stmt->whereClause, "WHERE"); qry->returningList = transformReturningList(pstate, stmt->returningList); @@ -347,6 +361,8 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt) * VALUES list, or general SELECT input. We special-case VALUES, both for * efficiency and so we can handle DEFAULT specifications. */ + + /*a MergeInsert statment is always a VALUE clause*/ isGeneralSelect = (selectStmt && selectStmt->valuesLists == NIL); /* @@ -382,7 +398,8 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt) * mentioned in the SELECT part. Note that the target table is not added * to the joinlist or namespace. */ - qry->resultRelation = setTargetTable(pstate, stmt->relation, + if(IsA(stmt,InsertStmt))/*for MergeInsert, no need to do this*/ + qry->resultRelation = setTargetTable(pstate, stmt->relation, false, false, ACL_INSERT); /* Validate stmt->cols list, or build default list if no list given */ @@ -1730,16 +1747,19 @@ transformUpdateStmt(ParseState *pstate, UpdateStmt *stmt) qry->commandType = CMD_UPDATE; pstate->p_is_update = true; - qry->resultRelation = setTargetTable(pstate, stmt->relation, - interpretInhOption(stmt->relation->inhOpt), - true, - ACL_UPDATE); + if(IsA(stmt, UpdateStmt))/*for MergeUpdate, no need to do this*/ + { + qry->resultRelation = setTargetTable(pstate, stmt->relation, + interpretInhOption(stmt->relation->inhOpt), + true, + ACL_UPDATE); - /* - * the FROM clause is non-standard SQL syntax. We used to be able to do - * this with REPLACE in POSTQUEL so we keep the feature. - */ - transformFromClause(pstate, stmt->fromClause); + /* + * the FROM clause is non-standard SQL syntax. We used to be able to do + * this with REPLACE in POSTQUEL so we keep the feature. + */ + transformFromClause(pstate, stmt->fromClause); + } qry->targetList = transformTargetList(pstate, stmt->targetList); @@ -2241,3 +2261,389 @@ applyLockingClause(Query *qry, Index rtindex, rc->pushedDown = pushedDown; qry->rowMarks = lappend(qry->rowMarks, rc); } + +/* +transform an action of merge command into a query. +No change of the pstate range table is allowed in this function. +*/ +static Query * +transformMergeActions(ParseState *pstate, MergeStmt *stmt, MergeConditionAction *condact) +{ + Query *actqry; + + /* + * firstly, we need to make sure that DELETE and UPDATE + * actions are only taken in MATCHED condition, + * and INSERTs are only takend when not MATCHED + */ + + switch(condact->action->type) + { + case T_MergeDelete:/*a delete action*/ + { + MergeDelete *deleteact = (MergeDelete *)(condact->action); + Assert(IsA(deleteact,MergeDelete)); + + if(!condact->match) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("The DELETE action in MERGE command is not allowed when NOT MATCHED"))); + + /*put new right code to the result relaion. + This line chages the RTE in range table directly*/ + pstate->p_target_rangetblentry->requiredPerms |= ACL_DELETE; + + deleteact->relation = stmt->relation; + deleteact->usingClause = stmt->source; + deleteact->whereClause = condact->condition;; + + /*parse the action query*/ + actqry = transformStmt(pstate, (Node *)deleteact); + + if(!IsA(actqry, Query) || + actqry->commandType != CMD_DELETE || + actqry->utilityStmt != NULL) + elog(ERROR, "improper DELETE action in merge stmt"); + + actqry->isMergeAction = true; + actqry->matched = condact->match; + + return actqry; + } + break; + case T_MergeUpdate:/*an update action*/ + { + MergeUpdate *updateact = (MergeUpdate *)(condact->action); + Assert(IsA(updateact,MergeUpdate)); + + + if(!condact->match) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("The UPDATE action in MERGE command is not allowed when NOT MATCHED"))); + + pstate->p_target_rangetblentry->requiredPerms |= ACL_UPDATE; + + + /*the "targetlist" of the updateact is filled in the parser */ + updateact->relation = stmt->relation; + updateact->fromClause = stmt->source; + updateact->whereClause = condact->condition; + + /*parse the action query*/ + actqry = transformStmt(pstate, (Node *)updateact); + + if(!IsA(actqry, Query) || + actqry->commandType != CMD_UPDATE|| + actqry->utilityStmt != NULL) + elog(ERROR, "improper UPDATE action in merge stmt"); + + actqry->isMergeAction = true; + actqry->matched = condact->match; + + return actqry; + } + break; + case T_MergeInsert:/*an insert action*/ + { + MergeInsert *insertact = (MergeInsert *)(condact->action); + Assert(IsA(insertact,MergeInsert)); + + if(condact->match) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("The INSERT action in MERGE command is not allowed when MATCHED"))); + + + pstate->p_target_rangetblentry->requiredPerms |= ACL_INSERT; + + /*the "cols" and "selectStmt" of the insertact is filled in the parser */ + insertact->relation = stmt->relation; + + /* + the merge insert action has a strange feature. + In an ordinary INSERT, the VALUES list can only + contains constants and DEFAULT. (am I right??) + But in the INSERT action of MERGE command, + the VALUES list can have expressions with + variables(attributes of the targe and source tables). + Besides, in the ordinary INSERT, a VALUES list can + never be followed by a WHERE clause. + But in MERGE INSERT action, there are matching conditions. + + Thus, the output qry of this function is an INSERT + query in the style of "INSERT...VALUES...", + except that we have other range tables and a WHERE clause. + Note that it is also different from the "INSERT ... SELECT..." + query, in which the whole SELECT is a subquery. + (We don't have subquery here). + + We construct this novel query structure in order + to keep consitency with other merge action types + (DELETE, UPDATE). In this way, all the merge action + queries are in fact share the very same Range Table, + They only differs in their target lists and join trees + */ + + + /*parse the action query, this will call + transformInsertStmt() which analyzes the VALUES list.*/ + actqry = transformStmt(pstate, (Node *)insertact); + + /*do the WHERE clause here, Since the + transformInsertStmt() function only analyzes + the VALUES list but not the WHERE clause*/ + actqry->jointree = makeFromExpr(pstate->p_joinlist, + transformWhereClause(pstate, + condact->condition, + "WHERE")); + + if(!IsA(actqry, Query) || + actqry->commandType != CMD_INSERT|| + actqry->utilityStmt != NULL) + elog(ERROR, "improper INSERT action in merge stmt"); + + actqry->isMergeAction = true; + actqry->matched = condact->match; + + return actqry; + } + break; + case T_MergeDoNothing: + { + MergeDoNothing *nothingact = (MergeDoNothing *)(condact->action); + + Assert(IsA(nothingact,MergeDoNothing)); + + actqry = makeNode(Query); + + actqry->jointree = makeFromExpr(pstate->p_joinlist, + transformWhereClause(pstate, + condact->condition, + "WHERE")); + + actqry->rtable = pstate->p_rtable; + + actqry->commandType = CMD_DONOTHING; + actqry->isMergeAction = true; + actqry->matched = condact->match; + + return actqry; + } + break; + case T_MergeError: + { + MergeError *erract = (MergeError *)(condact->action); + Assert(IsA(erract,MergeError)); + + actqry = makeNode(Query); + + actqry->jointree = makeFromExpr(pstate->p_joinlist, + transformWhereClause(pstate, + condact->condition, + "WHERE")); + + actqry->rtable = pstate->p_rtable; + + actqry->commandType = CMD_RAISEERR; + actqry->isMergeAction = true; + actqry->matched = condact->match; + + return actqry; + } + break; + default: + elog(ERROR, "unknown MERGE action type %d", condact->action->type); + break; + + } + + /*never comes here*/ + return NULL; +} + +static Query * +transformMergeStmt(ParseState *pstate, MergeStmt *stmt) +{ + Query *qry; + + ColumnRef *starRef; + ResTarget *starResTarget; + ListCell *act; + ListCell *l; + JoinExpr *joinexp; + int rtindex; + MergeConditionAction *lastaction; + + /*The source list has only one element*/ + if(list_length(stmt->source) != 1) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("now we only accept merge command with only ONE source table"))); + + /*now, do the real tranformation of the merge command. */ + qry = makeNode(Query); + qry->commandType = CMD_MERGE; + + /* + What we are doing here is to create a query like + "SELECT * FROM LEFT JOIN ON ;" + + Note: + 1. we set the "match condition" as the join qualification. + The left join will scan both the matched and non-matched tuples. + + 2. a normal SELECT query has no "target relation". + But here we need to set the targe relation in query, + like the UPDATE/DELETE/INSERT queries. + So this is a left join SELECT with a "target table" in its range table. + + 3. We don't have a specific ACL level for Merge, here we just use + ACL_SELECT. + But we will add other ACL levels when handle each merge actions. + */ + + + /*before analyze the FROM clause, we need to set the target table. + We cannot call setTargetTable() function directly. + We only need the lock target relation, without adding it to Range table. + */ + setTargetTableLock(pstate, stmt->relation); + + /*create the FROM clause. Make the join expression first*/ + joinexp = makeNode(JoinExpr); + joinexp->jointype = JOIN_LEFT; + joinexp->isNatural = FALSE; + /*source list has only one element*/ + joinexp->larg = linitial(stmt->source); + joinexp->rarg = (Node *)stmt->relation; + /*match condtion*/ + joinexp->quals = stmt->matchCondition; + + /*transform the FROM clause. The target relation and + source relation will be add to Rtable here. */ + transformFromClause(pstate, list_make1(joinexp)); + + /*the targetList of the main query is "*" */ + starRef = makeNode(ColumnRef); + starRef->fields = list_make1(makeNode(A_Star)); + starRef->location = 1; + + starResTarget = makeNode(ResTarget); + starResTarget->name = NULL; + starResTarget->indirection = NIL; + starResTarget->val = (Node *)starRef; + starResTarget->location = 1; + + qry->targetList = transformTargetList(pstate, list_make1(starResTarget)); + + /*we don't need the WHERE clause here. Set it null. */ + qry->jointree = makeFromExpr(pstate->p_joinlist, NULL); + + /*now , we find out the RTE for the target relation, + and do some unfinished jobs*/ + rtindex = 1; + foreach(l, pstate->p_rtable) + { + RangeTblEntry *rte = (RangeTblEntry *)lfirst(l); + if(rte->relid == pstate->p_target_relation->rd_id) + { + /*find the RTE*/ + pstate->p_target_rangetblentry = rte; + rte->requiredPerms = ACL_SELECT; + qry->resultRelation = rtindex; + break; + } + rtindex++; + } + + if(pstate->p_target_rangetblentry == NULL) + elog(ERROR, "cannot find the RTE for target table"); + + + qry->rtable = pstate->p_rtable; + + qry->hasSubLinks = pstate->p_hasSubLinks; + + /* + * Top-level aggregates are simply disallowed in MERGE + */ + if (pstate->p_hasAggs) + ereport(ERROR, + (errcode(ERRCODE_GROUPING_ERROR), + errmsg("cannot use aggregate function in top level of MERGE"), + parser_errposition(pstate, + locate_agg_of_level((Node *) qry, 0)))); + if (pstate->p_hasWindowFuncs) + ereport(ERROR, + (errcode(ERRCODE_WINDOWING_ERROR), + errmsg("cannot use window function in MERGE"), + parser_errposition(pstate, + locate_windowfunc((Node *) qry)))); + +#if 0 + /* + the main query is done. Ready for tansform actions. + + Firstly, we check the last action of the action list. + If it is not a DO NOTING action, we need to generate + an INSERT DEFAULT VALUES action and append it to action list. + */ + lastaction = (MergeConditionAction *)llast(stmt->actions); + + if(lastaction->action == NULL) + { + /* + we have a do nothing action here, + What we need to do is just delete it from action list + */ + stmt->actions = list_truncate(stmt->actions, + list_length(stmt->actions) - 1); + } + else + { + /* + The last action is no the DO NOTHING action, + we need to generate an INSERT action. + */ + lastaction = makeNode(MergeConditionAction); + + lastaction->condition = NULL; + lastaction->match = NOT; + lastaction->action = makeNode(MergeInsert); + + /*nothing need to be filled into the node*/ + + stmt->actions = lappend(stmt->actions, lastaction); + } +#endif + + /* + For each actions ,we transform it to a seperate query. + the action queries shares the exactly same + range table with the main query. + + In other words, in the extra condtions of the sub actions, + we don't allow involvement of new tables + */ + qry->mergeActQry = NIL; + + foreach(act,stmt->actions) + { + MergeConditionAction *mca = (MergeConditionAction *)lfirst(act); + Query *actqry; + + /*transform the act (and its condition) as a single query. */ + actqry = transformMergeActions(pstate, stmt, mca); + + /*since we don't invoke setTargetTable() in transformMergeActions(), + we need to set actqry->resultRelation here + */ + actqry->resultRelation = qry->resultRelation; + + /*put it into the list*/ + qry->mergeActQry = lappend(qry->mergeActQry, actqry); + } + + return qry; +} diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 3f6eeeb..06b9c81 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -212,6 +212,10 @@ static TypeName *TableFuncTypeName(List *columns); DropOwnedStmt ReassignOwnedStmt AlterTSConfigurationStmt AlterTSDictionaryStmt +%type MergeStmt opt_and_condition merge_condition_action merge_action +%type opt_not +%type merge_condition_action_list + %type select_no_parens select_with_parens select_clause simple_select values_clause @@ -504,6 +508,8 @@ static TypeName *TableFuncTypeName(List *columns); MAPPING MATCH MAXVALUE MINUTE_P MINVALUE MODE MONTH_P MOVE + MATCHED MERGE RAISE ERROR_P + NAME_P NAMES NATIONAL NATURAL NCHAR NEXT NO NOCREATEDB NOCREATEROLE NOCREATEUSER NOINHERIT NOLOGIN_P NONE NOSUPERUSER NOT NOTHING NOTIFY NOTNULL NOWAIT NULL_P NULLIF NULLS_P NUMERIC @@ -725,6 +731,7 @@ stmt : | ListenStmt | LoadStmt | LockStmt + | MergeStmt | NotifyStmt | PrepareStmt | ReassignOwnedStmt @@ -6952,6 +6959,7 @@ ExplainableStmt: | InsertStmt | UpdateStmt | DeleteStmt + | MergeStmt | DeclareCursorStmt | CreateAsStmt | ExecuteStmt /* by default all are $$=$1 */ @@ -7297,6 +7305,114 @@ set_target_list: /***************************************************************************** * * QUERY: + * MERGE STATEMENT + * + *****************************************************************************/ + + +MergeStmt: + MERGE INTO relation_expr_opt_alias + USING table_ref + ON a_expr + merge_condition_action_list + { + MergeStmt *m = makeNode(MergeStmt); + + m->relation = $3; + + /*although we have only one USING table, + we still make it a list, maybe in future + we will allow mutliple USING tables.*/ + m->matchCondition = $7; + m->source = list_make1($5); + m->actions = $8; + + $$ = (Node *)m; + } + ; + +merge_condition_action_list: + merge_condition_action + { $$ = list_make1($1); } + | merge_condition_action_list merge_condition_action + { $$ = lappend($1,$2); } + ; + +merge_condition_action: + WHEN opt_not MATCHED opt_and_condition THEN merge_action + { + MergeConditionAction *m = makeNode(MergeConditionAction); + + m->match = $2; + m->condition = $4; + m->action = $6; + + $$ = (Node *)m; + } + ; + + +opt_and_condition: + AND a_expr {$$ = $2;} + | /*EMPTY*/ {$$ = NULL;} + ; + +opt_not: + NOT {$$ = false;} + | /*EMPTY*/ {$$ = true;} + ; + +merge_action: + DELETE_P + { + $$ = (Node *)makeNode(MergeDelete); + } + | UPDATE SET set_clause_list + { + UpdateStmt *n = makeNode(MergeUpdate); + n->targetList = $3; + $$ = (Node *)n; + } + | INSERT values_clause + { + InsertStmt *n = makeNode(MergeInsert); + n->cols = NIL; + n->selectStmt = $2; + + $$ = (Node *)n; + } + + | INSERT '(' insert_column_list ')' values_clause + { + InsertStmt *n = makeNode(MergeInsert); + n->cols = $3; + n->selectStmt = $5; + + $$ = (Node *)n; + } + | INSERT DEFAULT VALUES + { + InsertStmt *n = makeNode(MergeInsert); + n->cols = NIL; + n->selectStmt = NULL; + + $$ = (Node *)n; + } + | DO NOTHING + { + $$ = makeNode(MergeDoNothing); + } + | RAISE ERROR_P + { + $$ = makeNode(MergeError); + } + ; + + + +/***************************************************************************** + * + * QUERY: * CURSOR STATEMENTS * *****************************************************************************/ @@ -10882,6 +10998,7 @@ unreserved_keyword: | ENCODING | ENCRYPTED | ENUM_P + | ERROR_P | ESCAPE | EXCLUDE | EXCLUDING @@ -10935,7 +11052,9 @@ unreserved_keyword: | LOGIN_P | MAPPING | MATCH + | MATCHED | MAXVALUE + | MERGE | MINUTE_P | MINVALUE | MODE @@ -10977,6 +11096,7 @@ unreserved_keyword: | PROCEDURAL | PROCEDURE | QUOTE + | RAISE | RANGE | READ | REASSIGN diff --git a/src/backend/parser/parse_clause.c b/src/backend/parser/parse_clause.c index f30132a..8876f73 100644 --- a/src/backend/parser/parse_clause.c +++ b/src/backend/parser/parse_clause.c @@ -213,6 +213,25 @@ setTargetTable(ParseState *pstate, RangeVar *relation, return rtindex; } +void +setTargetTableLock(ParseState *pstate, RangeVar *relation) +{ + + /* Close old target; this could only happen for multi-action rules */ + if (pstate->p_target_relation != NULL) + heap_close(pstate->p_target_relation, NoLock); + + /* + * Open target rel and grab suitable lock (which we will hold till end of + * transaction). + * + * free_parsestate() will eventually do the corresponding heap_close(), + * but *not* release the lock. + */ + pstate->p_target_relation = parserOpenTable(pstate, relation, + RowExclusiveLock); +} + /* * Simplify InhOption (yes/no/default) into boolean yes/no. * diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c index 25b44dd..3ee0428 100644 --- a/src/backend/rewrite/rewriteHandler.c +++ b/src/backend/rewrite/rewriteHandler.c @@ -1836,6 +1836,41 @@ RewriteQuery(Query *parsetree, List *rewrite_events) return rewritten; } +/*if the merge action type has already been processed by rewriter*/ +#define insert_rewrite (1<<0) +#define delete_rewrite (1<<1) +#define update_rewrite (1<<2) + +/*if the merge action type is fully replace by rules.*/ +#define insert_instead (1<<3) +#define delete_instead (1<<4) +#define update_instead (1<<5) + +#define merge_action_already_rewrite(acttype, flag) \ + ((acttype == CMD_INSERT && (flag & insert_rewrite)) || \ + (acttype == CMD_UPDATE && (flag & update_rewrite)) || \ + (acttype == CMD_DELETE && (flag & delete_rewrite))) + +#define set_action_rewrite(acttype, flag) \ + if(acttype == CMD_INSERT) \ + {flag |= insert_rewrite;}\ + else if(acttype == CMD_UPDATE) \ + {flag |= update_rewrite;}\ + else if(acttype == CMD_DELETE) \ + {flag |= delete_rewrite;} + +#define merge_action_instead(acttype, flag) \ + ((acttype == CMD_INSERT && (flag & insert_instead)) || \ + (acttype == CMD_UPDATE && (flag & update_instead)) || \ + (acttype == CMD_DELETE && (flag & delete_instead))) + +#define set_action_instead(acttype, flag)\ + if(acttype == CMD_INSERT) \ + {flag |= insert_instead;}\ + else if(acttype == CMD_UPDATE) \ + {flag |= update_instead;}\ + else if(acttype == CMD_DELETE) \ + {flag |= delete_instead;} /* * QueryRewrite - @@ -1861,7 +1896,151 @@ QueryRewrite(Query *parsetree) * * Apply all non-SELECT rules possibly getting 0 or many queries */ - querylist = RewriteQuery(parsetree, NIL); + if(parsetree->commandType == CMD_MERGE) + { + /* + *for merge query, we have a set of action queries (not subquery). + *each of these action queries should be applied to RewriteQuery(). + */ + ListCell *l; + + int flag = 0; + + List *pre_qry = NIL; + List *post_qry = NIL; + + + querylist = NIL; + + + /*rewrite the merge action queries one by one.*/ + foreach(l, parsetree->mergeActQry) + { + List *queryList4action = NIL; + Query *actionqry; + Query *q; + + + actionqry = lfirst(l); + + /* + * no rewriting for DO NOTHING or ERROR + */ + if(actionqry->commandType == CMD_DONOTHING || + actionqry->commandType == CMD_RAISEERR) + continue; + + + /* + *if this kind of actions are fully replaced by rules, + *we mark it as "replaced" + */ + if(merge_action_instead(actionqry->commandType, flag)) + { + /* + *Still need to call RewriteQuery(), + *since we need the process on target list and so on. + *BUT, the returned list is discarded + */ + RewriteQuery(actionqry, NIL); + actionqry->replaced = true; + continue; + } + + + /*if this kind of actions are already processed by rewriter, skip it.*/ + if(merge_action_already_rewrite(actionqry->commandType, flag)) + { + RewriteQuery(actionqry, NIL); + continue; + } + + /*ok this action has not been processed before, let's do it now.*/ + queryList4action = RewriteQuery(actionqry, NIL); + + /*this kind of actions has been processed, set the flag*/ + set_action_rewrite(actionqry->commandType,flag); + + /*if the returning list is nil, this merge action + is replaced by a do-nothing rule*/ + if(queryList4action == NIL) + { + /*set the flag for other merge actions of the same type*/ + set_action_instead(actionqry->commandType, flag); + actionqry->replaced = true; + continue; + } + + /* + * if the rewriter return a non-NIL list, the merge action query + *could be one element in it. + *if so, it must be the head (for INSERT acton) + *or tail (for UPDATE/DELETE action). + */ + + /*test the list head*/ + q = (Query *)linitial(queryList4action); + if(q->querySource == QSRC_ORIGINAL) + { + /* + *the merge action is the head, the remaining part + *of the list are the queries generated by rules + *we put them in the post_qry list. + */ + if(querylist == NIL) + querylist = list_make1(parsetree); + + + queryList4action = list_delete_first(queryList4action); + post_qry = list_concat(post_qry,queryList4action); + + continue; + + } + + /*test the list tail*/ + q = (Query *)llast(queryList4action); + if(q->querySource == QSRC_ORIGINAL) + { + /*the merge action is the tail. + Put the rule queries in pre_qry list*/ + if(querylist == NIL) + querylist = list_make1(parsetree); + + queryList4action = list_truncate(queryList4action,list_length(queryList4action)-1); + + pre_qry = list_concat(pre_qry,queryList4action); + continue; + + } + + /*here, the merge action query is not in the rewriten query list, + *which means the action is replaced by INSTEAD rule(s). + *We need to mark it as "replaced". + + For a INSERT action, we put the rule queries in the post list + otherwise, in the pre list + */ + if(actionqry->commandType == CMD_INSERT) + post_qry = list_concat(post_qry,queryList4action); + else + pre_qry = list_concat(pre_qry,queryList4action); + + set_action_instead(actionqry->commandType, flag); + actionqry->replaced = true; + } + + /*finally, put the 3 lists into one. + *If all the merge actions are replaced by rules, + *the original merge query + *will not be involved in the querylist. + */ + querylist = list_concat(pre_qry,querylist); + querylist = list_concat(querylist, post_qry); + + } + else + querylist = RewriteQuery(parsetree, NIL); /* * Step 2 diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index 8ad4915..0dc3117 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -225,6 +225,10 @@ ProcessQuery(PlannedStmt *plan, snprintf(completionTag, COMPLETION_TAG_BUFSIZE, "DELETE %u", queryDesc->estate->es_processed); break; + case CMD_MERGE: + snprintf(completionTag, COMPLETION_TAG_BUFSIZE, + "MERGE %u", queryDesc->estate->es_processed); + break; default: strcpy(completionTag, "???"); break; diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 8960246..2733e5d 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -125,6 +125,7 @@ CommandIsReadOnly(Node *parsetree) case CMD_UPDATE: case CMD_INSERT: case CMD_DELETE: + case CMD_MERGE: return false; default: elog(WARNING, "unrecognized commandType: %d", @@ -1398,6 +1399,10 @@ CreateCommandTag(Node *parsetree) tag = "SELECT"; break; + case T_MergeStmt: + tag = "MERGE"; + break; + /* utility statements --- same whether raw or cooked */ case T_TransactionStmt: { @@ -2235,6 +2240,7 @@ GetCommandLogLevel(Node *parsetree) case T_InsertStmt: case T_DeleteStmt: case T_UpdateStmt: + case T_MergeStmt: lev = LOGSTMT_MOD; break; diff --git a/src/include/commands/trigger.h b/src/include/commands/trigger.h index 98bc0c6..be4ad6e 100644 --- a/src/include/commands/trigger.h +++ b/src/include/commands/trigger.h @@ -163,6 +163,8 @@ extern void ExecBSTruncateTriggers(EState *estate, ResultRelInfo *relinfo); extern void ExecASTruncateTriggers(EState *estate, ResultRelInfo *relinfo); +extern void ExecBSMergeTriggers(ModifyTableState *mt_state); +extern void ExecASMergeTriggers(ModifyTableState *mt_state); extern void AfterTriggerBeginXact(void); extern void AfterTriggerBeginQuery(void); diff --git a/src/include/executor/nodeModifyTable.h b/src/include/executor/nodeModifyTable.h index 67ba3e8..422e3ce 100644 --- a/src/include/executor/nodeModifyTable.h +++ b/src/include/executor/nodeModifyTable.h @@ -16,6 +16,7 @@ #include "nodes/execnodes.h" extern ModifyTableState *ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags); +extern MergeActionState *ExecInitMergeAction(MergeAction *node, EState *estate, int eflags); extern TupleTableSlot *ExecModifyTable(ModifyTableState *node); extern void ExecEndModifyTable(ModifyTableState *node); extern void ExecReScanModifyTable(ModifyTableState *node); diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 09fdb5d..3013f13 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1031,9 +1031,22 @@ typedef struct ModifyTableState int mt_whichplan; /* which one is being executed (0..n-1) */ EPQState mt_epqstate; /* for evaluating EvalPlanQual rechecks */ bool fireBSTriggers; /* do we need to fire stmt triggers? */ + List *mergeActPstates; /*the list of the planstate of meger command actions. + NIL if this is not a merge command. + The elements if it are still ModifyTableState nodes*/ } ModifyTableState; /* ---------------- + * MergeActionState information + * ---------------- + */ +typedef struct MergeActionState +{ + PlanState ps; /* its first field is NodeTag */ + CmdType operation; +} MergeActionState; + +/* ---------------- * AppendState information * * nplans how many plans are in the array diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index a5f5df5..a840349 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -44,6 +44,7 @@ typedef enum NodeTag T_Plan = 100, T_Result, T_ModifyTable, + T_MergeAction, T_Append, T_RecursiveUnion, T_BitmapAnd, @@ -86,6 +87,7 @@ typedef enum NodeTag T_PlanState = 200, T_ResultState, T_ModifyTableState, + T_MergeActionState, T_AppendState, T_RecursiveUnionState, T_BitmapAndState, @@ -347,6 +349,13 @@ typedef enum NodeTag T_AlterUserMappingStmt, T_DropUserMappingStmt, T_AlterTableSpaceOptionsStmt, + T_MergeStmt, + T_MergeConditionAction, + T_MergeUpdate, + T_MergeDelete, + T_MergeInsert, + T_MergeDoNothing, + T_MergeError, /* * TAGS FOR PARSE TREE NODES (parsenodes.h) @@ -511,6 +520,9 @@ typedef enum CmdType CMD_UPDATE, /* update stmt */ CMD_INSERT, /* insert stmt */ CMD_DELETE, + CMD_MERGE, /*merge stmt*/ + CMD_DONOTHING, + CMD_RAISEERR, CMD_UTILITY, /* cmds like create, destroy, copy, vacuum, * etc. */ CMD_NOTHING /* dummy command for instead nothing rules diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index b591073..5f329ef 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -146,6 +146,12 @@ typedef struct Query Node *setOperations; /* set-operation tree if this is top level of * a UNION/INTERSECT/EXCEPT query */ + /*the fileds for merge actions*/ + bool isMergeAction; /*if this query is a merge action. */ + bool matched; /*this is a MATCHED action or NOT*/ + bool replaced; /*is this merge action replaced by rules*/ + List *mergeActQry; /* the list of all the merge actions. + * used only for merge query statment*/ } Query; @@ -990,6 +996,58 @@ typedef struct SelectStmt /* Eventually add fields for CORRESPONDING spec here */ } SelectStmt; +/*The structure for MERGE command statement*/ +typedef struct MergeStmt +{ + NodeTag type; + RangeVar *relation; /*targe relation for merge */ + + /* source relations for the merge. + *Currently, we only allwo single-source merge, + *so the length of this list should always be 1 + */ + List *source; + Node *matchCondition; /* qualifications of the merge*/ + + /*list of MergeConditionAction structure. + *It stores all the matched / not-matched + *conditions and the corresponding actions + *The elments of this list are MergeConditionAction + *nodes + */ + List *actions; + +}MergeStmt; + +/* the structure for the actions of MERGE command. +* Holds info of the clauses like +* "WHEN MATCHED AND ... THEN UPDATE/DELETE/INSERT" +*/ +typedef struct MergeConditionAction +{ + NodeTag type; + bool match; /*match or not match*/ + Node *condition;/*the AND condition for this action*/ + Node *action; /*the actions: delete , insert or update*/ +}MergeConditionAction; + +/* +* The merge action nodes are in fact the +* ordinary nodes of UPDATE,DELETE and INSERT +*/ +typedef UpdateStmt MergeUpdate; +typedef DeleteStmt MergeDelete; +typedef InsertStmt MergeInsert; + +typedef struct MergeDoNothing +{ + NodeTag type; +}MergeDoNothing; + +typedef struct MergeError +{ + NodeTag type; +}MergeError; /* ---------------------- * Set Operation node for post-analysis query trees diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 037bc0b..a020051 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -169,9 +169,25 @@ typedef struct ModifyTable List *returningLists; /* per-target-table RETURNING tlists */ List *rowMarks; /* PlanRowMarks (non-locking only) */ int epqParam; /* ID of Param for EvalPlanQual re-eval */ + List *mergeActPlan; /*the plans for merge actions, + which are also ModifyTable nodes*/ } ModifyTable; /* ---------------- + * MergeAction node - + * The plan node for the actions of MERGE command + * ---------------- + */ +typedef struct MergeAction +{ + Plan plan; + bool replaced; /*if this action is replaced by INSTEAD rules*/ + CmdType operation;/* INSERT, UPDATE, or DELETE */ + bool matched; + List *flattenedqual; /*the flattened qual expression of action*/ +}MergeAction; + +/* ---------------- * Append node - * Generate the concatenation of the results of sub-plans. * ---------------- diff --git a/src/include/optimizer/var.h b/src/include/optimizer/var.h index b0e04a0..4d6c9e8 100644 --- a/src/include/optimizer/var.h +++ b/src/include/optimizer/var.h @@ -15,6 +15,7 @@ #define VAR_H #include "nodes/relation.h" +#include "nodes/plannodes.h" typedef enum { @@ -32,5 +33,5 @@ extern int locate_var_of_relation(Node *node, int relid, int levelsup); extern int find_minimum_var_level(Node *node); extern List *pull_var_clause(Node *node, PVCPlaceHolderBehavior behavior); extern Node *flatten_join_alias_vars(PlannerInfo *root, Node *node); - +extern void push_up_merge_action_vars(MergeAction * actplan,Query * actqry); #endif /* VAR_H */ diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index 49d4b6c..91ddcb6 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -141,6 +141,7 @@ PG_KEYWORD("encoding", ENCODING, UNRESERVED_KEYWORD) PG_KEYWORD("encrypted", ENCRYPTED, UNRESERVED_KEYWORD) PG_KEYWORD("end", END_P, RESERVED_KEYWORD) PG_KEYWORD("enum", ENUM_P, UNRESERVED_KEYWORD) +PG_KEYWORD("error", ERROR_P, UNRESERVED_KEYWORD) PG_KEYWORD("escape", ESCAPE, UNRESERVED_KEYWORD) PG_KEYWORD("except", EXCEPT, RESERVED_KEYWORD) PG_KEYWORD("exclude", EXCLUDE, UNRESERVED_KEYWORD) @@ -229,7 +230,9 @@ PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD) PG_KEYWORD("login", LOGIN_P, UNRESERVED_KEYWORD) PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD) PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD) +PG_KEYWORD("matched", MATCHED, UNRESERVED_KEYWORD) PG_KEYWORD("maxvalue", MAXVALUE, UNRESERVED_KEYWORD) +PG_KEYWORD("merge", MERGE, UNRESERVED_KEYWORD) PG_KEYWORD("minute", MINUTE_P, UNRESERVED_KEYWORD) PG_KEYWORD("minvalue", MINVALUE, UNRESERVED_KEYWORD) PG_KEYWORD("mode", MODE, UNRESERVED_KEYWORD) @@ -295,6 +298,7 @@ PG_KEYWORD("privileges", PRIVILEGES, UNRESERVED_KEYWORD) PG_KEYWORD("procedural", PROCEDURAL, UNRESERVED_KEYWORD) PG_KEYWORD("procedure", PROCEDURE, UNRESERVED_KEYWORD) PG_KEYWORD("quote", QUOTE, UNRESERVED_KEYWORD) +PG_KEYWORD("raise", RAISE, UNRESERVED_KEYWORD) PG_KEYWORD("range", RANGE, UNRESERVED_KEYWORD) PG_KEYWORD("read", READ, UNRESERVED_KEYWORD) PG_KEYWORD("real", REAL, COL_NAME_KEYWORD) diff --git a/src/include/parser/parse_clause.h b/src/include/parser/parse_clause.h index f3d3ee9..b54f530 100644 --- a/src/include/parser/parse_clause.h +++ b/src/include/parser/parse_clause.h @@ -19,6 +19,7 @@ extern void transformFromClause(ParseState *pstate, List *frmList); extern int setTargetTable(ParseState *pstate, RangeVar *relation, bool inh, bool alsoSource, AclMode requiredPerms); +extern void setTargetTableLock(ParseState *pstate, RangeVar *relation); extern bool interpretInhOption(InhOption inhOpt); extern bool interpretOidsOption(List *defList);