diff --git a/src/backend/access/common/heaptuple.c b/src/backend/access/common/heaptuple.c index 6ec73f0..16b0613 100644 --- a/src/backend/access/common/heaptuple.c +++ b/src/backend/access/common/heaptuple.c @@ -1142,7 +1142,7 @@ slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull) if (attnum <= 0) { if (tuple == NULL) /* internal error */ - elog(ERROR, "cannot extract system attribute from virtual tuple"); + elog(ERROR, "cannot extract system attribute from virtual tuple for attr %d", attnum); if (tuple == &(slot->tts_minhdr)) /* internal error */ elog(ERROR, "cannot extract system attribute from minimal tuple"); return heap_getsysattr(tuple, attnum, tupleDesc, isnull); diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 2f33fdb..0326555 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -171,6 +171,7 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) case CMD_INSERT: case CMD_DELETE: case CMD_UPDATE: + case CMD_MERGE: estate->es_output_cid = GetCurrentCommandId(true); break; @@ -795,6 +796,8 @@ InitPlan(QueryDesc *queryDesc, int eflags) i++; } + + /* * Initialize the private state information for all the nodes in the query * tree. This opens files, allocates storage and leaves us ready to start diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index f4cc7d9..2d1a4e7 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -153,6 +153,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags) estate, eflags); break; + case T_MergeAction: + result = (PlanState *) ExecInitMergeAction((MergeAction*) node, + estate, eflags); + break; + case T_Append: result = (PlanState *) ExecInitAppend((Append *) node, estate, eflags); diff --git a/src/backend/executor/execQual.c b/src/backend/executor/execQual.c index 9808274..6cdab90 100644 --- a/src/backend/executor/execQual.c +++ b/src/backend/executor/execQual.c @@ -5237,7 +5237,6 @@ ExecProject(ProjectionInfo *projInfo, ExprDoneCond *isDone) /* Assume single result row until proven otherwise */ if (isDone) *isDone = ExprSingleResult; - /* * Clear any former contents of the result slot. This makes it safe for * us to use the slot's Datum/isnull arrays as workspace. (Also, we can @@ -5245,6 +5244,7 @@ ExecProject(ProjectionInfo *projInfo, ExprDoneCond *isDone) */ ExecClearTuple(slot); + /* * Force extraction of all input values that we'll need. The * Var-extraction loops below depend on this, and we are also prefetching @@ -5260,6 +5260,7 @@ ExecProject(ProjectionInfo *projInfo, ExprDoneCond *isDone) slot_getsomeattrs(econtext->ecxt_scantuple, projInfo->pi_lastScanVar); + /* * Assign simple Vars to result by direct extraction of fields from source * slots ... a mite ugly, but fast ... @@ -5304,6 +5305,7 @@ ExecProject(ProjectionInfo *projInfo, ExprDoneCond *isDone) } } + /* * If there are any generic expressions, evaluate them. It's possible * that there are set-returning functions in such expressions; if so and @@ -5321,6 +5323,7 @@ ExecProject(ProjectionInfo *projInfo, ExprDoneCond *isDone) return slot; /* no more result rows, return empty slot */ } + /* * Successfully formed a result row. Mark the result slot as containing a * valid virtual tuple. diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c index 877abf4..0322616 100644 --- a/src/backend/executor/execUtils.c +++ b/src/backend/executor/execUtils.c @@ -445,7 +445,6 @@ ExecAssignResultTypeFromTL(PlanState *planstate) { bool hasoid; TupleDesc tupDesc; - if (ExecContextForcesOids(planstate, &hasoid)) { /* context forces OID choice; hasoid is now set correctly */ diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 8619ce3..fe477b6 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -583,6 +583,103 @@ lreplace:; } +static TupleTableSlot * +ExecMerge(ItemPointer tupleid, + TupleTableSlot *slot, + TupleTableSlot *planSlot, + ModifyTableState *node, + EState *estate) +{ + + TupleTableSlot *actslot = NULL; + TupleTableSlot *res = NULL; + ListCell *each; + + foreach(each, node->mergeActPstates) + { + ModifyTableState *actpstate; + + PlanState *pstate; + + actpstate= (ModifyTableState *)lfirst(each); + + Assert(actpstate->mt_nplans == 1); + + pstate = actpstate->mt_plans[0]; + + + + + /* + * If tupleid == NULL, then it is a NOT MATCHED case, in which we can only do INSERT action + * If tupleid != NULL, then it is a MATCHED case, in which we can only do UPDATE or DELETE action + */ + + if((tupleid == NULL && actpstate->operation != CMD_INSERT) || + (tupleid != NULL && actpstate->operation == CMD_INSERT)) + { + continue; + } + + + ResetExprContext(pstate->ps_ExprContext); + + /* + If the action has an additional qual, which is not satisfied, skip it + */ + + if(pstate->qual && !ExecQual(pstate->qual, pstate->ps_ExprContext,false)) + { +zbxprint("the qual is not satisfied "); + + continue; + } + + /* + * Now we start to exec this action. Firstly , project the result tuple slot. + * The merge action result slot is projected in the same way as a returning slot. + * so we call ExecProcessReturning() directly here. + */ + actslot = ExecProcessReturning(pstate->ps_ProjInfo, slot, planSlot); + + + switch (actpstate->operation) + { + case CMD_INSERT: + res = ExecInsert(actslot, planSlot, estate); + break; + case CMD_UPDATE: + res = ExecUpdate(tupleid, + actslot, + planSlot, + &actpstate->mt_epqstate, + estate); + break; + case CMD_DELETE: + res = ExecDelete(tupleid, + planSlot, + &actpstate->mt_epqstate, + estate); + break; + default: + elog(ERROR, "unknown merge action type"); + break; + } + + + /* + * The choosen action is executed, no need to do the remain actions + */ + return res; + } + + /* + * Here, no action is taken. Let's pass this slot + */ + return NULL; + +} + /* * Process BEFORE EACH STATEMENT triggers */ @@ -603,6 +700,9 @@ fireBSTriggers(ModifyTableState *node) ExecBSDeleteTriggers(node->ps.state, node->ps.state->es_result_relations); break; + case CMD_MERGE: + printf("not sure about the BS triggers of merge\n"); + break; default: elog(ERROR, "unknown operation"); break; @@ -629,6 +729,9 @@ fireASTriggers(ModifyTableState *node) ExecASDeleteTriggers(node->ps.state, node->ps.state->es_result_relations); break; + case CMD_MERGE: + printf("not sure about the AS triggers of merge\n"); + break; default: elog(ERROR, "unknown operation"); break; @@ -654,7 +757,7 @@ ExecModifyTable(ModifyTableState *node) TupleTableSlot *planSlot; ItemPointer tupleid = NULL; ItemPointerData tuple_ctid; - +zbxprint("enter the ExecModifyTable\n"); /* * On first call, fire BEFORE STATEMENT triggers before proceeding. */ @@ -683,6 +786,7 @@ ExecModifyTable(ModifyTableState *node) for (;;) { planSlot = ExecProcNode(subplanstate); + zbxprint("ExecModifyTable get one tuple\n"); if (TupIsNull(planSlot)) { @@ -699,30 +803,53 @@ ExecModifyTable(ModifyTableState *node) else break; } +zbxprint("ExecModifyTable get one tuple 1\n"); EvalPlanQualSetSlot(&node->mt_epqstate, planSlot); slot = planSlot; - +print_slot(slot); if (junkfilter != NULL) { /* * extract the 'ctid' junk attribute. */ - if (operation == CMD_UPDATE || operation == CMD_DELETE) + if (operation == CMD_UPDATE || operation == CMD_DELETE || operation == CMD_MERGE) { Datum datum; bool isNull; + + + zbxprint("ExecModifyTable the junkattrno is %d\n", junkfilter->jf_junkAttNo); + + datum = ExecGetJunkAttribute(slot, junkfilter->jf_junkAttNo, &isNull); - /* shouldn't ever get a null result... */ + zbxprint("ExecModifyTable get one tuple 2\n"); + if (isNull) - elog(ERROR, "ctid is NULL"); - - tupleid = (ItemPointer) DatumGetPointer(datum); - tuple_ctid = *tupleid; /* be sure we don't free the ctid!! */ - tupleid = &tuple_ctid; + { + /* + * shouldn't ever get a null result for update and delete. + * Merge command will get a null ctid if in "NOT MATCHED" case + */ + if(operation != CMD_MERGE) + elog(ERROR, "ctid is NULL"); + } + else + { + zbxprint("ExecModifyTable get ctid the\n"); + + tupleid = (ItemPointer) DatumGetPointer(datum); + + zbxprint("ExecModifyTable get ctid is %d\n", tupleid); + tuple_ctid = *tupleid; /* be sure we don't free the ctid!! */ + tupleid = &tuple_ctid; + + zbxprint("ExecModifyTable get one tuple 2.4\n"); + } } + zbxprint("ExecModifyTable get one tuple 2.5\n"); /* * apply the junkfilter if needed. @@ -730,6 +857,7 @@ ExecModifyTable(ModifyTableState *node) if (operation != CMD_DELETE) slot = ExecFilterJunk(junkfilter, slot); } + zbxprint("ExecModifyTable get one tuple 3\n"); switch (operation) { @@ -744,6 +872,11 @@ ExecModifyTable(ModifyTableState *node) slot = ExecDelete(tupleid, planSlot, &node->mt_epqstate, estate); break; + case CMD_MERGE: + slot = ExecMerge(tupleid, slot, planSlot, + node, estate); + break; + default: elog(ERROR, "unknown operation"); break; @@ -771,6 +904,102 @@ ExecModifyTable(ModifyTableState *node) return NULL; } + + + + + + + +/* +* When init a merge plan, we also need init its action plans. +* However, these action plans are "pure" plans. We only want to handle the tlist and qual +* Thus, the return result is a plain "PlanState". We dont have a MeregeActionState kind of thing. +*/ +PlanState * +ExecInitMergeAction(MergeAction *node, EState *estate, int eflags) +{ + PlanState *result; + + /* + * do nothing when we get to the end of a leaf on tree. + */ + if (node == NULL) + return NULL; + + + /* + * create state structure + */ + result = makeNode(PlanState); + result->plan = (Plan *)node; + result->state = estate; + zbxprint("ExecInitMergeAction 3\n"); + + /* + * tuple table initialization + */ + ExecInitResultTupleSlot(estate, result); + + + + zbxprint("ExecInitMergeAction 1\n"); + /* + * initialize tuple type and projection info + */ + ExecAssignResultTypeFromTL(result); + zbxprint("ExecInitMergeAction 2\n"); + + + + + + /* + * Miscellaneous initialization + * + * create expression context for node + */ + + ExecAssignExprContext(estate, result); + zbxprint("ExecInitMergeAction 4\n"); + + + /* + * initialize child expressions + */ + result->targetlist = (List *) + ExecInitExpr((Expr *) node->plan.targetlist, result); + + + zbxprint("ExecInitMergeAction 5\n"); + result->qual = (List *) + ExecInitExpr((Expr *) node->plan.qual, result); + + + zbxprint("ExecInitMergeAction 6\n"); + /* + * init the projection information + */ + ExecAssignProjectionInfo(result, NULL); + +/* +printf("before check plan output for merge actio \n"); +if(estate->es_result_relation_info == NULL) + printf("re infro is null\n"); + +if(estate->es_result_relation_info->ri_RelationDesc == NULL) + printf("r relation desc is nUll\n"); + ExecCheckPlanOutput(estate->es_result_relation_info->ri_RelationDesc, + node->targetlist); +printf("AFTER check plan output for merge actio \n"); +*/ + return result; +} + + + + + /* ---------------------------------------------------------------- * ExecInitModifyTable * ---------------------------------------------------------------- @@ -786,7 +1015,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) Plan *subplan; ListCell *l; int i; + bool isMergeAction = false; + /* check for unsupported flags */ Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); @@ -797,7 +1028,6 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) */ if (estate->es_epqTuple != NULL) elog(ERROR, "ModifyTable should not be called during EvalPlanQual"); - /* * create state structure */ @@ -826,12 +1056,20 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) foreach(l, node->plans) { subplan = (Plan *) lfirst(l); + if(IsA(subplan, MergeAction)) + isMergeAction = true; + zbxprint("ExecInitModifyTable 2.4\n"); + mtstate->mt_plans[i] = ExecInitNode(subplan, estate, eflags); + zbxprint("ExecInitModifyTable2.5\n"); + estate->es_result_relation_info++; i++; } + estate->es_result_relation_info = NULL; + /* select first subplan */ mtstate->mt_whichplan = 0; subplan = (Plan *) linitial(node->plans); @@ -955,7 +1193,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) break; case CMD_UPDATE: case CMD_DELETE: - junk_filter_needed = true; + case CMD_MERGE: + if(!isMergeAction) + junk_filter_needed = true; break; default: elog(ERROR, "unknown operation"); @@ -973,15 +1213,16 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) if (operation == CMD_INSERT || operation == CMD_UPDATE) ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc, subplan->targetlist); - + j = ExecInitJunkFilter(subplan->targetlist, resultRelInfo->ri_RelationDesc->rd_att->tdhasoid, ExecInitExtraTupleSlot(estate)); - if (operation == CMD_UPDATE || operation == CMD_DELETE) + if (operation == CMD_UPDATE || operation == CMD_DELETE || operation == CMD_MERGE) { /* For UPDATE/DELETE, find the ctid junk attr now */ j->jf_junkAttNo = ExecFindJunkAttribute(j, "ctid"); + if (!AttributeNumberIsValid(j->jf_junkAttNo)) elog(ERROR, "could not find junk ctid column"); } @@ -1006,6 +1247,22 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) if (estate->es_trig_tuple_slot == NULL) estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate); + + /* + * for the merge actions, we need to similar things as above + */ + zbxprint("ExecInitModifyTable last step , init the action nodes\n"); + + foreach(l, node->mergeActPlan) + { + PlanState *actpstate = ExecInitNode((Plan *)lfirst(l), estate, 0); + /* + * put the pstates of each action into ModifyTableState + */ + mtstate->mergeActPstates = lappend(mtstate->mergeActPstates, actpstate); + + } + return mtstate; } diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 93dcef5..f0be5e8 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -176,6 +176,7 @@ _copyModifyTable(ModifyTable *from) COPY_NODE_FIELD(returningLists); COPY_NODE_FIELD(rowMarks); COPY_SCALAR_FIELD(epqParam); + COPY_NODE_FIELD(mergeActPlan); return newnode; } @@ -2273,6 +2274,10 @@ _copyQuery(Query *from) COPY_NODE_FIELD(rowMarks); COPY_NODE_FIELD(setOperations); + COPY_SCALAR_FIELD(isMergeAction); + /*merge action list*/ + COPY_NODE_FIELD(mergeActQry); + return newnode; } @@ -2343,6 +2348,64 @@ _copySelectStmt(SelectStmt *from) return newnode; } + +static MergeStmt * +_copyMergeStmt(MergeStmt *from) +{ + MergeStmt *newnode = makeNode(MergeStmt); + + COPY_NODE_FIELD(relation); + COPY_NODE_FIELD(source); + COPY_NODE_FIELD(matchCondition); + COPY_NODE_FIELD(actions); + + return newnode; + +} + + +static MergeConditionAction * +_copyMergeConditionAction(MergeConditionAction *from) +{ + MergeConditionAction *newnode = makeNode(MergeConditionAction); + + COPY_SCALAR_FIELD(match); + COPY_NODE_FIELD(condition); + COPY_NODE_FIELD(action); + + return newnode; +} + +static MergeUpdate * +_copyMergeUpdate(MergeUpdate *from) +{ + MergeUpdate *newNode = (MergeUpdate *)_copyUpdateStmt((UpdateStmt *) from); + newNode->type = T_MergeUpdate; + + return newNode; +} + +static MergeInsert * +_copyMergeInsert(MergeInsert *from) +{ + MergeInsert *newNode = (MergeInsert *)_copyInsertStmt((InsertStmt *) from); + newNode->type = T_MergeInsert; + + return newNode; +} + + +static MergeDelete * +_copyMergeDelete(MergeDelete *from) +{ + MergeDelete *newNode = (MergeDelete *)_copyDeleteStmt((DeleteStmt *) from); + newNode->type = T_MergeDelete; + + return newNode; +} + + + static SetOperationStmt * _copySetOperationStmt(SetOperationStmt *from) { @@ -3902,7 +3965,22 @@ copyObject(void *from) break; case T_SelectStmt: retval = _copySelectStmt(from); - break; + break; + case T_MergeStmt: + retval = _copyMergeStmt(from); + break; + case T_MergeConditionAction: + retval = _copyMergeConditionAction(from); + break; + case T_MergeUpdate: + retval = _copyMergeUpdate(from); + break; + case T_MergeInsert: + retval = _copyMergeInsert(from); + break; + case T_MergeDelete: + retval = _copyMergeDelete(from); + break; case T_SetOperationStmt: retval = _copySetOperationStmt(from); break; diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 5d83727..572f6df 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -878,6 +878,7 @@ _equalQuery(Query *a, Query *b) COMPARE_NODE_FIELD(rowMarks); COMPARE_NODE_FIELD(setOperations); + COMPARE_SCALAR_FIELD(isMergeAction); return true; } diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 79baf4f..4b3dd60 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -331,6 +331,7 @@ _outModifyTable(StringInfo str, ModifyTable *node) WRITE_NODE_FIELD(returningLists); WRITE_NODE_FIELD(rowMarks); WRITE_INT_FIELD(epqParam); + WRITE_NODE_FIELD(mergeActPlan); } static void @@ -2019,9 +2020,52 @@ _outQuery(StringInfo str, Query *node) WRITE_NODE_FIELD(limitCount); WRITE_NODE_FIELD(rowMarks); WRITE_NODE_FIELD(setOperations); + WRITE_BOOL_FIELD(isMergeAction); + WRITE_NODE_FIELD(mergeActQry); +} + + +static void +_outMergeConditionAction(StringInfo str, MergeConditionAction *node) +{ + WRITE_NODE_TYPE("MERGECONDITIONACTION"); + + WRITE_BOOL_FIELD(match); + + WRITE_NODE_FIELD(condition); + WRITE_NODE_FIELD(action); + + } static void +_outMergeStmt(StringInfo str, MergeStmt *node) +{ + WRITE_NODE_TYPE("MERGESTMT"); + + WRITE_NODE_FIELD(relation); + WRITE_NODE_FIELD(source); + WRITE_NODE_FIELD(matchCondition); + WRITE_NODE_FIELD(actions); + +} + +static void +_outDeleteStmt(StringInfo str, DeleteStmt *node) +{ + WRITE_NODE_TYPE("DELETESTMT"); + + WRITE_NODE_FIELD(relation); + WRITE_NODE_FIELD(usingClause); + WRITE_NODE_FIELD(whereClause); + WRITE_NODE_FIELD(returningList); + + +} + + + +static void _outSortGroupClause(StringInfo str, SortGroupClause *node) { WRITE_NODE_TYPE("SORTGROUPCLAUSE"); @@ -2905,6 +2949,17 @@ _outNode(StringInfo str, void *obj) _outXmlSerialize(str, obj); break; + case T_MergeStmt: + _outMergeStmt(str, obj); + break; + case T_MergeConditionAction: + _outMergeConditionAction(str,obj); + break; + case T_DeleteStmt: + _outDeleteStmt(str,obj); + break; + + default: /* diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 2115cc0..9f6e6fe 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -3973,6 +3973,7 @@ make_modifytable(CmdType operation, List *resultRelations, node->rowMarks = rowMarks; node->epqParam = epqParam; + return node; } diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 3950ab4..f07afba 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -194,7 +194,7 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) /* Default assumption is we need all the tuples */ tuple_fraction = 0.0; } - +printf("going to do the top planner\n"); /* primary planning entry point (may recurse for subqueries) */ top_plan = subquery_planner(glob, parse, NULL, false, tuple_fraction, &root); @@ -220,6 +220,8 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) Assert(list_length(glob->subplans) == list_length(glob->subrowmarks)); lrt = list_head(glob->subrtables); lrm = list_head(glob->subrowmarks); + +printf("set supble refers\n"); foreach(lp, glob->subplans) { Plan *subplan = (Plan *) lfirst(lp); @@ -565,6 +567,28 @@ subquery_planner(PlannerGlobal *glob, Query *parse, returningLists, rowMarks, SS_assign_special_param(root)); + + + + /*do a simple plan for each actions in the merge command. + *put them in mergeActPlan list; + */ + if(parse->commandType == CMD_MERGE) + { + ListCell *l; + ModifyTable *top_plan = (ModifyTable *)plan; + foreach(l, parse->mergeActQry) + { + + Plan *actplan = (Plan *)merge_action_planner(glob, + (Query *)lfirst(l), + (Plan *)linitial(top_plan->plans) + ); + + top_plan->mergeActPlan = lappend(top_plan->mergeActPlan, actplan); + } + + } } } @@ -584,6 +608,126 @@ subquery_planner(PlannerGlobal *glob, Query *parse, return plan; } + + +ModifyTable * +merge_action_planner(PlannerGlobal *glob, Query *parse, + Plan *top_plan) +{ + PlannerInfo *root; + MergeAction *actplan; + ModifyTable *result; + + List *returningLists; + List *rowMarks; + + /* Create a PlannerInfo data structure for this subquery */ + root = makeNode(PlannerInfo); + root->parse = parse; + root->glob = glob; + root->query_level = 1; + root->parent_root = NULL; + root->planner_cxt = CurrentMemoryContext; + root->init_plans = NIL; + root->cte_plan_ids = NIL; + root->eq_classes = NIL; + root->append_rel_list = NIL; + + root->hasRecursion = false; + root->wt_param_id = -1; + root->non_recursive_plan = NULL; + + +zbxprint("the query node for palnner is \n%s\n", nodeToString(parse)); + + /* + * no having clause in a merge action + */ + Assert(parse->havingQual == NULL); + + /* Clear this flag; might get set in distribute_qual_to_rels */ + root->hasPseudoConstantQuals = false; + + /* + * Do expression preprocessing on targetlist and quals. + */ + +zbxprint("before targelist \n"); + + parse->targetList = (List *) + preprocess_expression(root, (Node *) parse->targetList, + EXPRKIND_TARGET); + +zbxprint("before qual condition \n"); + preprocess_qual_conditions(root, (Node *) parse->jointree); + + +zbxprint("after qual condition \n"); + + + /* + * Do the main planning. If we have an inherited target relation, that + * needs special processing, else go straight to grouping_planner. + */ + + + actplan = makeNode(MergeAction); + actplan->operation = parse->commandType; + + /*copy the cost from the top_plan*/ + actplan->plan.startup_cost = top_plan->startup_cost; + actplan->plan.total_cost = top_plan->total_cost; + actplan->plan.plan_rows = top_plan->plan_rows; + actplan->plan.plan_width = top_plan->plan_width; + + +zbxprint("before tlist preprocess \n"); + + actplan->plan.targetlist = preprocess_targetlist(root,parse->targetList); + +if(parse->jointree == NULL) + zbxprint("join tree is null \n"); + + if(parse->jointree->quals != NULL && IsA(parse->jointree->quals , List)) + actplan->plan.qual = (List *)parse->jointree->quals; + else + zbxprint("the qual in from expr is not a list\n"); + + if (parse->returningList) + { + List *rlist; + + Assert(parse->resultRelation); + rlist = set_returning_clause_references(root->glob, + parse->returningList, + &actplan->plan, + parse->resultRelation); + returningLists = list_make1(rlist); + } + else + returningLists = NIL; + + + if (parse->rowMarks) + rowMarks = NIL; + else + rowMarks = root->rowMarks; +zbxprint("make modifty table \n"); + + + result = make_modifytable(parse->commandType, + copyObject(root->resultRelations), + list_make1(actplan), + returningLists, + rowMarks, + SS_assign_special_param(root)); + + zbxprint("after make modifty table \n"); + + return result; +} + + /* * preprocess_expression * Do subquery_planner's preprocessing work for an expression, @@ -608,6 +752,8 @@ preprocess_expression(PlannerInfo *root, Node *expr, int kind) * can skip it in VALUES lists, however, since they can't contain any Vars * at all. */ + +printf("before flattern join alias vars\n"); if (root->hasJoinRTEs && kind != EXPRKIND_VALUES) expr = flatten_join_alias_vars(root, expr); @@ -626,6 +772,9 @@ preprocess_expression(PlannerInfo *root, Node *expr, int kind) * careful to maintain AND/OR flatness --- that is, do not generate a tree * with AND directly under AND, nor OR directly under OR. */ + + printf("before evalu const vars\n"); + expr = eval_const_expressions(root, expr); /* @@ -982,7 +1131,6 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) if (parse->groupClause) preprocess_groupclause(root); numGroupCols = list_length(parse->groupClause); - /* Preprocess targetlist */ tlist = preprocess_targetlist(root, tlist); diff --git a/src/backend/optimizer/prep/preptlist.c b/src/backend/optimizer/prep/preptlist.c index 59d3518..6eb3d5a 100644 --- a/src/backend/optimizer/prep/preptlist.c +++ b/src/backend/optimizer/prep/preptlist.c @@ -55,7 +55,6 @@ preprocess_targetlist(PlannerInfo *root, List *tlist) List *range_table = parse->rtable; CmdType command_type = parse->commandType; ListCell *lc; - /* * Sanity check: if there is a result relation, it'd better be a real * relation not a subquery. Else parser or rewriter messed up. @@ -78,17 +77,25 @@ preprocess_targetlist(PlannerInfo *root, List *tlist) result_relation, range_table); /* - * for "update" and "delete" queries, add ctid of the result relation into + * for "update" , "delete" and "merge" queries, add ctid of the result relation into * the target list so that the ctid will propagate through execution and * ExecutePlan() will be able to identify the right tuple to replace or * delete. This extra field is marked "junk" so that it is not stored * back into the tuple. + * + * BUT, if the query node is a merge action, we don't need to expend the ctid attribute in tlist. + * The tlist of the merge top level plan already contains a "ctid" junk attr of the target relation. */ - if (command_type == CMD_UPDATE || command_type == CMD_DELETE) + + if(!parse->isMergeAction && + (command_type == CMD_UPDATE || + command_type == CMD_DELETE || + command_type == CMD_MERGE)) { TargetEntry *tle; Var *var; + var = makeVar(result_relation, SelfItemPointerAttributeNumber, TIDOID, -1, 0); diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c index 6b99a10..66287ab 100644 --- a/src/backend/parser/analyze.c +++ b/src/backend/parser/analyze.c @@ -47,6 +47,8 @@ static Query *transformDeleteStmt(ParseState *pstate, DeleteStmt *stmt); static Query *transformInsertStmt(ParseState *pstate, InsertStmt *stmt); static List *transformInsertRow(ParseState *pstate, List *exprlist, List *stmtcols, List *icolumns, List *attrnos); + +static Query *transformMergeStmt(ParseState *pstate, MergeStmt *stmt); static Query *transformSelectStmt(ParseState *pstate, SelectStmt *stmt); static Query *transformValuesClause(ParseState *pstate, SelectStmt *stmt); static Query *transformSetOperationStmt(ParseState *pstate, SelectStmt *stmt); @@ -164,17 +166,24 @@ transformStmt(ParseState *pstate, Node *parseTree) * Optimizable statements */ case T_InsertStmt: + case T_MergeInsert: result = transformInsertStmt(pstate, (InsertStmt *) parseTree); break; case T_DeleteStmt: + case T_MergeDelete: result = transformDeleteStmt(pstate, (DeleteStmt *) parseTree); break; case T_UpdateStmt: + case T_MergeUpdate: result = transformUpdateStmt(pstate, (UpdateStmt *) parseTree); break; + case T_MergeStmt: + result = transformMergeStmt(pstate, (MergeStmt *)parseTree); + break; + case T_SelectStmt: { SelectStmt *n = (SelectStmt *) parseTree; @@ -282,22 +291,33 @@ transformDeleteStmt(ParseState *pstate, DeleteStmt *stmt) qry->commandType = CMD_DELETE; - /* set up range table with just the result rel */ - qry->resultRelation = setTargetTable(pstate, stmt->relation, - interpretInhOption(stmt->relation->inhOpt), - true, - ACL_DELETE); - qry->distinctClause = NIL; + if(IsA(stmt,MergeDelete)) + qry->isMergeAction = true; /* - * The USING clause is non-standard SQL syntax, and is equivalent in - * functionality to the FROM list that can be specified for UPDATE. The - * USING keyword is used rather than FROM because FROM is already a - * keyword in the DELETE syntax. - */ - transformFromClause(pstate, stmt->usingClause); + * The input stmt could be a MergeDelete node. + * In this case, we don't need the process on range table. + */ + if(IsA(stmt, DeleteStmt)) + { + /* set up range table with just the result rel */ + qry->resultRelation = setTargetTable(pstate, stmt->relation, + interpretInhOption(stmt->relation->inhOpt), + true, + ACL_DELETE); + + + /* + * The USING clause is non-standard SQL syntax, and is equivalent in + * functionality to the FROM list that can be specified for UPDATE. The + * USING keyword is used rather than FROM because FROM is already a + * keyword in the DELETE syntax. + */ + transformFromClause(pstate, stmt->usingClause); + } + qual = transformWhereClause(pstate, stmt->whereClause, "WHERE"); qry->returningList = transformReturningList(pstate, stmt->returningList); @@ -342,11 +362,16 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt) qry->commandType = CMD_INSERT; pstate->p_is_insert = true; + if(IsA(stmt,MergeInsert)) + qry->isMergeAction = true; + /* * We have three cases to deal with: DEFAULT VALUES (selectStmt == NULL), * VALUES list, or general SELECT input. We special-case VALUES, both for * efficiency and so we can handle DEFAULT specifications. */ + + /*a MergeInsert statment is always a VALUE clause*/ isGeneralSelect = (selectStmt && selectStmt->valuesLists == NIL); /* @@ -382,7 +407,8 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt) * mentioned in the SELECT part. Note that the target table is not added * to the joinlist or namespace. */ - qry->resultRelation = setTargetTable(pstate, stmt->relation, + if(IsA(stmt,InsertStmt))/*for MergeInsert, no need to do this*/ + qry->resultRelation = setTargetTable(pstate, stmt->relation, false, false, ACL_INSERT); /* Validate stmt->cols list, or build default list if no list given */ @@ -799,6 +825,7 @@ transformSelectStmt(ParseState *pstate, SelectStmt *stmt) /* process the FROM clause */ transformFromClause(pstate, stmt->fromClause); + /* transform targetlist */ qry->targetList = transformTargetList(pstate, stmt->targetList); @@ -1730,16 +1757,27 @@ transformUpdateStmt(ParseState *pstate, UpdateStmt *stmt) qry->commandType = CMD_UPDATE; pstate->p_is_update = true; - qry->resultRelation = setTargetTable(pstate, stmt->relation, - interpretInhOption(stmt->relation->inhOpt), - true, - ACL_UPDATE); + if(IsA(stmt,MergeUpdate)) + qry->isMergeAction = true; + + + if(IsA(stmt, UpdateStmt))/*for MergeUpdate, no need to do this*/ + { + qry->resultRelation = setTargetTable(pstate, stmt->relation, + interpretInhOption(stmt->relation->inhOpt), + true, + ACL_UPDATE); + + /* + * the FROM clause is non-standard SQL syntax. We used to be able to do + * this with REPLACE in POSTQUEL so we keep the feature. + */ + transformFromClause(pstate, stmt->fromClause); + } + +zbxprint("the rel name space is \n%s\n", nodeToString(pstate->p_relnamespace)); +zbxprint("the var name space is \n%s\n", nodeToString(pstate->p_varnamespace)); - /* - * the FROM clause is non-standard SQL syntax. We used to be able to do - * this with REPLACE in POSTQUEL so we keep the feature. - */ - transformFromClause(pstate, stmt->fromClause); qry->targetList = transformTargetList(pstate, stmt->targetList); @@ -2241,3 +2279,336 @@ applyLockingClause(Query *qry, Index rtindex, rc->pushedDown = pushedDown; qry->rowMarks = lappend(qry->rowMarks, rc); } + + + +/*transform a action of merge command into a query. +No change of the pstate range table is allowed in this function. +*/ +static Query * +transformMergeActions(ParseState *pstate, MergeStmt *stmt, MergeConditionAction *condact) +{ + Query *actqry = makeNode(Query); + A_Expr *match_expr; //the expr of matched/not matched + A_Expr *act_qual_expr; + + /*firstly, we need to make sure that DELETE and UPDATE actions are only taken in MATCHED condition + and INSERTs are only takend when not MATCHED + */ + if(IsA(condact->action, MergeDelete)) + { + if(!condact->match) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("The DELETE action in MERGE command is not allowed when NOT MATCHED"))); + } + else if(IsA(condact->action, MergeUpdate)) + { + if(!condact->match) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("The UPDATE action in MERGE command is not allowed when NOT MATCHED"))); + } + else if(IsA(condact->action, MergeInsert)) + { + if(condact->match) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("The INSERT action in MERGE command is not allowed when MATCHED"))); + } + else + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("UNKONWN action type in MERGE"))); + + + + /*combine the condtion of this act with the ON qual of the merge command + do a copy of the merge condtion for safety. + */ + + /* + if(condact->match) + match_expr = copyObject(stmt->matchCondition); + else + match_expr = makeA_Expr(AEXPR_NOT, NIL, NULL, + copyObject(stmt->matchCondition), 1); + + + if(condact->condition) + act_qual_expr = makeA_Expr(AEXPR_AND, NIL, condact->condition, (Node *)match_expr, 2); + else + act_qual_expr = match_expr; + + */ + + act_qual_expr = condact->condition; + + /*use the transfomStmt() to parse all types of actions*/ + if(IsA(condact->action, MergeDelete)) + { + /*a delete action*/ + MergeDelete *deleteact = (MergeDelete *)(condact->action); + Assert(IsA(deleteact,DeleteStmt)); + + deleteact->relation = stmt->relation; + deleteact->usingClause = stmt->source; + deleteact->whereClause = (Node *)act_qual_expr; + + /*parse the action query*/ + actqry = transformStmt(pstate, (Node *)deleteact); + + if(!IsA(actqry, Query) || actqry->commandType != CMD_DELETE || actqry->utilityStmt != NULL) + elog(ERROR, "improper DELETE action in merge stmt"); + + return actqry; + } + else if(IsA(condact->action, MergeUpdate)) + { + /*an update action*/ + MergeUpdate *updateact = (MergeUpdate *)(condact->action); + + /*the "targetlist" of the updateact is filled in the parser */ + updateact->relation = stmt->relation; + updateact->fromClause = stmt->source; + updateact->whereClause = (Node *)act_qual_expr; + + /*parse the action query*/ + actqry = transformStmt(pstate, (Node *)updateact); + + if(!IsA(actqry, Query) || actqry->commandType != CMD_UPDATE|| actqry->utilityStmt != NULL) + elog(ERROR, "improper UPDATE action in merge stmt"); + + return actqry; + } + else if(IsA(condact->action, MergeInsert)) + { + /*an insert action*/ + Node *qual; + MergeInsert *insertact; + + insertact = (MergeInsert *)(condact->action); + + + /*the "cols" and "selectStmt" of the insertact is filled in the parser */ + insertact->relation = stmt->relation; + + /* + the merge insert action has a strange feature. + In an ordinary INSERT, the VALUES list can only contains constants and DEFAULT. (am I right??) + But in the INSERT action of MERGE command, the VALUES list can have expressions with variables(attributes of the targe and source tables). + Besides, in the ordinary INSERT, a VALUES list can never be followed by a WHERE clause. But in MERGE INSERT action, there are matching conditions. + + Thus, the output qry of this function is an INSERT query in the style of "INSERT...VALUES...", except that we have other range tables and a WHERE clause. + Note that it is also different from the "INSERT ... SELECT..." query, in which the whole SELECT is a subquery. (We don't have subquery here). + We construct this novel query structure in order to keep consitency with other merge action types (DELETE, UPDATE). + In this way, all the merge action queries are in fact share the very same Range Table, They only differs in their target lists and join trees + + */ + + + /*parse the action query, this will call transformInsertStmt() which analyzes the VALUES list.*/ + actqry = transformStmt(pstate, (Node *)insertact); + + /*do the WHERE clause here, Since the transformInsertStmt() function only analyzes the VALUES list but not the WHERE clause*/ + qual = transformWhereClause(pstate,(Node *)act_qual_expr, "WHERE"); + + actqry->jointree = makeFromExpr(pstate->p_joinlist, qual); + + + if(!IsA(actqry, Query) || actqry->commandType != CMD_INSERT|| actqry->utilityStmt != NULL) + elog(ERROR, "improper INSERT action in merge stmt"); + + + return actqry; + } + else + elog(ERROR, "unknown action type in MERGE"); + + /*never comes here*/ + return NULL; +} + + + +static Query * +transformMergeStmt(ParseState *pstate, MergeStmt *stmt) +{ + Query *qry; + + ColumnRef *starRef; + ResTarget *starResTarget; + ListCell *act; + ListCell *l; + JoinExpr *joinexp; + int rtindex; + + /*this will never happen, since the garm.y is restricted that only one rel name is allowed to appear in the source table position. + However, if we extent the command in future, we may need to note this check here. + */ + if(list_length(stmt->source) != 1) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("now we only accept merge command with only ONE source table"))); + + /*now, do the real tranformation of the merge command. */ + qry = makeNode(Query); + qry->commandType = CMD_MERGE; + + /* + What we are doing here is to create a query like + "SELECT * FROM LEFT JOIN ON ;" + Note: + 1. we set the "match condition" as the join qualification. + The left join will scan both the matched and non-matched tuples. + + 2. a normal SELECT query has no "target relation". + But here we need to set the targe relation in query, like the UPDATE/DELETE/INSERT queries. + So this is a left join SELECT with a "target table" in its range table. + + 3. We don't have a specific ACL level for Merge, here we just use ACL_SELECT. + But we will add other ACL levels when handle each merge actions. + */ + + + /*before analyze the FROM clause, we need to set the target table. + We cannot call setTargetTable() function directly. + We only need the lock target relation, without adding it to Range table. + */ + + + setTargetTableLock(pstate, stmt->relation); + + + + /*create the FROM clause. Make the join expression first*/ + joinexp = makeNode(JoinExpr); + joinexp->jointype = JOIN_LEFT; + joinexp->isNatural = FALSE; + joinexp->larg = linitial(stmt->source);/*source list has only one element*/ + joinexp->rarg = (Node *)stmt->relation; + joinexp->quals = stmt->matchCondition; /*match condtion*/ + + /*transform the FROM clause. The target relation and source relation will be add to Rtable here. */ + transformFromClause(pstate, list_make1(joinexp)); + + /*the targetList of the main query is "*" + */ + starRef = makeNode(ColumnRef); + starRef->fields = list_make1(makeNode(A_Star)); + starRef->location = 1; + + starResTarget = makeNode(ResTarget); + starResTarget->name = NULL; + starResTarget->indirection = NIL; + starResTarget->val = (Node *)starRef; + starResTarget->location = 1; + + qry->targetList = transformTargetList(pstate, list_make1(starResTarget)); + + /*we don't need the WHERE clause here. Set it null. */ + qry->jointree = makeFromExpr(pstate->p_joinlist, NULL); + + /*now , we find out the RTE for the target relation, and do some unfinished jobs*/ + rtindex = 1; + foreach(l, pstate->p_rtable) + { + RangeTblEntry *rte = (RangeTblEntry *)lfirst(l); + if(rte->relid == pstate->p_target_relation->rd_id) /*find the RTE*/ + { + pstate->p_target_rangetblentry = rte; + rte->requiredPerms = ACL_SELECT; + qry->resultRelation = rtindex; + break; + } + rtindex++; + } + + if(pstate->p_target_rangetblentry == NULL) + elog(ERROR, "cannot find the RTE for target table"); + + + qry->rtable = pstate->p_rtable; + + qry->hasSubLinks = pstate->p_hasSubLinks; + + /* + * Top-level aggregates are simply disallowed in MERGE + */ + if (pstate->p_hasAggs) + ereport(ERROR, + (errcode(ERRCODE_GROUPING_ERROR), + errmsg("cannot use aggregate function in top level of MERGE"), + parser_errposition(pstate, + locate_agg_of_level((Node *) qry, 0)))); + if (pstate->p_hasWindowFuncs) + ereport(ERROR, + (errcode(ERRCODE_WINDOWING_ERROR), + errmsg("cannot use window function in MERGE"), + parser_errposition(pstate, + locate_windowfunc((Node *) qry)))); + + + + + /* + the main query is done. + then for each actions ,we transform it to a seperate query. + the action queries shares the exactly same range table with the main query. + in other words, in the extra condtions of the sub actions, we don't allow involvement of new tables + */ + + qry->mergeActQry = NIL; + + foreach(act,stmt->actions) + { + MergeConditionAction *mca = lfirst(act); + Query *actqry; + + switch(mca->action->type) + { + case T_MergeDelete: + pstate->p_target_rangetblentry->requiredPerms |= ACL_DELETE; + break; + case T_MergeUpdate: + pstate->p_target_rangetblentry->requiredPerms |= ACL_UPDATE; + break; + case T_MergeInsert: + pstate->p_target_rangetblentry->requiredPerms |= ACL_INSERT; + break; + default: + elog(ERROR, "unknown MERGE action type %d", mca->type); + break; + + } + + + /*transform the act (and its condition) as a single query. Link it to the top-level query*/ + actqry = transformMergeActions(pstate, stmt, mca); + + /*since we don't invoke setTargetTable() in transformMergeActions(), we need to set actqry->resultRelation here + */ + actqry->resultRelation = qry->resultRelation; + + + qry->mergeActQry = lappend(qry->mergeActQry, actqry); + } + + + /*for a single-action merge, we just stransform it into a orignial update/delete command. + but the insert action cannot take this shortcut. + */ + /* + if(list_length(stmt->actions) == 1) + { + Query *q = linitial(qry->mergeActQry); + if(q->commandType == CMD_DELETE || q->commandType == CMD_UPDATE) + return q; + } + */ + return qry; + + +} + + diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 3f6eeeb..c139bd7 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -212,6 +212,12 @@ static TypeName *TableFuncTypeName(List *columns); DropOwnedStmt ReassignOwnedStmt AlterTSConfigurationStmt AlterTSDictionaryStmt +%type MergeStmt opt_and_condition merge_condition_action merge_action +%type opt_not +%type merge_condition_action_list + + + %type select_no_parens select_with_parens select_clause simple_select values_clause @@ -233,7 +239,8 @@ static TypeName *TableFuncTypeName(List *columns); %type opt_force opt_or_replace opt_grant_grant_option opt_grant_admin_option opt_nowait opt_if_exists opt_with_data - + + %type OptRoleList AlterOptRoleList %type CreateOptRoleElem AlterOptRoleElem @@ -504,6 +511,8 @@ static TypeName *TableFuncTypeName(List *columns); MAPPING MATCH MAXVALUE MINUTE_P MINVALUE MODE MONTH_P MOVE + MATCHED MERGE + NAME_P NAMES NATIONAL NATURAL NCHAR NEXT NO NOCREATEDB NOCREATEROLE NOCREATEUSER NOINHERIT NOLOGIN_P NONE NOSUPERUSER NOT NOTHING NOTIFY NOTNULL NOWAIT NULL_P NULLIF NULLS_P NUMERIC @@ -725,6 +734,7 @@ stmt : | ListenStmt | LoadStmt | LockStmt + | MergeStmt | NotifyStmt | PrepareStmt | ReassignOwnedStmt @@ -7297,6 +7307,101 @@ set_target_list: /***************************************************************************** * * QUERY: + * MERGE STATEMENT + * + *****************************************************************************/ + + +MergeStmt: + MERGE INTO relation_expr_opt_alias + USING table_ref + ON a_expr + merge_condition_action_list + { + MergeStmt *m = makeNode(MergeStmt); + + m->relation = $3; + m->source = list_make1($5); /*although we have only one USING table, but we still make it a list, maybe in future we will allow mutliple USING tables.*/ + m->matchCondition = $7; + m->actions = $8; + + $$ = (Node *)m; + } + ; + +merge_condition_action_list: + merge_condition_action + { $$ = list_make1($1); } + | merge_condition_action_list merge_condition_action + { $$ = lappend($1,$2); } + ; + +merge_condition_action: + WHEN opt_not MATCHED opt_and_condition THEN merge_action + { + MergeConditionAction *m = makeNode(MergeConditionAction); + + m->match = $2; + m->condition = $4; + m->action = $6; + + $$ = (Node *)m; + } + ; + + +opt_and_condition: + AND a_expr {$$ = $2;} + | /*EMPTY*/ {$$ = NULL;} + ; + +opt_not: + NOT {$$ = false;} + | /*EMPTY*/ {$$ = true;} + ; + +merge_action: + DELETE_P + {$$ = (Node *)makeNode(MergeDelete);} + | UPDATE SET set_clause_list + { + UpdateStmt *n = makeNode(MergeUpdate); + n->targetList = $3; + $$ = (Node *)n; + } + | INSERT values_clause + { + InsertStmt *n = makeNode(MergeInsert); + n->cols = NIL; + n->selectStmt = $2; + + $$ = (Node *)n; + } + + | INSERT '(' insert_column_list ')' values_clause + { + InsertStmt *n = makeNode(MergeInsert); + n->cols = $3; + n->selectStmt = $5; + + $$ = (Node *)n; + } + | INSERT DEFAULT VALUES + { + InsertStmt *n = makeNode(MergeInsert); + n->cols = NIL; + n->selectStmt = NULL; + + $$ = (Node *)n; + } + + ; + + + +/***************************************************************************** + * + * QUERY: * CURSOR STATEMENTS * *****************************************************************************/ @@ -10935,7 +11040,9 @@ unreserved_keyword: | LOGIN_P | MAPPING | MATCH + | MATCHED | MAXVALUE + | MERGE | MINUTE_P | MINVALUE | MODE diff --git a/src/backend/parser/parse_clause.c b/src/backend/parser/parse_clause.c index f30132a..657dd9c 100644 --- a/src/backend/parser/parse_clause.c +++ b/src/backend/parser/parse_clause.c @@ -213,6 +213,29 @@ setTargetTable(ParseState *pstate, RangeVar *relation, return rtindex; } + +void +setTargetTableLock(ParseState *pstate, RangeVar *relation) +{ + + /* Close old target; this could only happen for multi-action rules */ + if (pstate->p_target_relation != NULL) + heap_close(pstate->p_target_relation, NoLock); + + /* + * Open target rel and grab suitable lock (which we will hold till end of + * transaction). + * + * free_parsestate() will eventually do the corresponding heap_close(), + * but *not* release the lock. + */ + pstate->p_target_relation = parserOpenTable(pstate, relation, + RowExclusiveLock); + + +} + + /* * Simplify InhOption (yes/no/default) into boolean yes/no. * diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c index 25b44dd..287fa1f 100644 --- a/src/backend/rewrite/rewriteHandler.c +++ b/src/backend/rewrite/rewriteHandler.c @@ -1837,6 +1837,45 @@ RewriteQuery(Query *parsetree, List *rewrite_events) } +#define insert_rewrite (1<<0) /*insert merge action has already been processed by rewriter*/ +#define delete_rewrite (1<<1) +#define update_rewrite (1<<2) + +#define insert_instead (1<<3) /*insert merge action is fully replace by rules.*/ +#define delete_instead (1<<4) +#define update_instead (1<<5) + + +#define merge_action_already_rewrite(acttype, flag) \ + ((acttype == CMD_INSERT && (flag & insert_rewrite)) || \ + (acttype == CMD_UPDATE && (flag & update_rewrite)) || \ + (acttype == CMD_DELETE && (flag & delete_rewrite))) + + +#define set_action_rewrite(acttype, flag) \ + if(acttype == CMD_INSERT) \ + {flag |= insert_rewrite;}\ + else if(acttype == CMD_UPDATE) \ + {flag |= update_rewrite;}\ + else if(acttype == CMD_DELETE) \ + {flag |= delete_rewrite;} + + + +#define merge_action_instead(acttype, flag) \ + ((acttype == CMD_INSERT && (flag & insert_instead)) || \ + (acttype == CMD_UPDATE && (flag & update_instead)) || \ + (acttype == CMD_DELETE && (flag & delete_instead))) + +#define set_action_instead(acttype, flag)\ + if(acttype == CMD_INSERT) \ + {flag |= insert_instead;}\ + else if(acttype == CMD_UPDATE) \ + {flag |= update_instead;}\ + else if(acttype == CMD_DELETE) \ + {flag |= delete_instead;} + + /* * QueryRewrite - * Primary entry point to the query rewriter. @@ -1861,7 +1900,133 @@ QueryRewrite(Query *parsetree) * * Apply all non-SELECT rules possibly getting 0 or many queries */ - querylist = RewriteQuery(parsetree, NIL); + + + if(parsetree->commandType == CMD_MERGE) + { + /*for merge query, we have a set of lower-level action queries (not subquery). + //each of these action queries should be applied to RewriteQuery(). + */ + ListCell *cell, + *prev, + *next; + + int flag = 0; + + + List *pre_qry = NIL; + List *post_qry = NIL; + + + querylist = NIL; + + + /*rewrite the merge action queries one by one.*/ + prev = NULL; + + for (cell = list_head(parsetree->mergeActQry); cell; cell = next) + { + List *queryList4action = NIL; + Query *actionqry; + Query *q; + + + actionqry = lfirst(cell); + + next = lnext(cell); + + /*if this kind of actions are fully replaced by rules, we delete it from the action list*/ + if(merge_action_instead(actionqry->commandType, flag)) + { + parsetree->mergeActQry = list_delete_cell(parsetree->mergeActQry, cell, prev); + continue; + } + + + /*if this kind of actions are already processed by rewriter, skip it.*/ + if(merge_action_already_rewrite(actionqry->commandType, flag)) + { + + + prev = cell; + continue; + } + + /*ok this action has not been processed before, let's do it now.*/ + + queryList4action = RewriteQuery(actionqry, NIL); + set_action_rewrite(actionqry->commandType,flag); /*this kind of actions has been processed.*/ + + /*if the returning list is nil, this merge action is replaced by a do-nothing rule*/ + if(queryList4action == NIL) + { + /*set the flag for other merge actions of the same type*/ + set_action_instead(actionqry->commandType, flag); + /*delete the action.*/ + parsetree->mergeActQry = list_delete_cell(parsetree->mergeActQry, cell, prev); + continue; + } + + + /*the merge action query could be one of the elements in the rewriten list. + //if it is in the list, it must be the head or tail. + */ + q = (Query *)linitial(queryList4action); + if(q->querySource == QSRC_ORIGINAL) + { + /*the merge action is the head, the remaining part of the list are the queries generated by rules + //we put them in the post_qry list. + */ + if(querylist == NIL) + querylist = list_make1(parsetree); + + + queryList4action = list_delete_first(queryList4action); + post_qry = list_concat(post_qry,queryList4action); + prev = cell; + continue; + + } + + + q = (Query *)llast(queryList4action); + if(q->querySource == QSRC_ORIGINAL) + { + /*the merge action is the tail. Put the rule queries in pre_qry list*/ + + if(querylist == NIL) + querylist = list_make1(parsetree); + + queryList4action = list_truncate(queryList4action,list_length(queryList4action)-1); + + pre_qry = list_concat(pre_qry,queryList4action); + + prev = cell; + continue; + + } + + + /*here, the merge action query is not in the rewriten query list, which means the action should be deleted + //It is replaced by INSTEAD rule(s). We need to delete the action + */ + post_qry = list_concat(post_qry,queryList4action); + set_action_instead(actionqry->commandType, flag); + parsetree->mergeActQry = list_delete_cell(parsetree->mergeActQry, cell, prev); + } + + + + /*finally, put the 3 lists into one. + If all the merge actions are replaced by rules, the original merge query + will not be involved in the querylist. + */ + querylist = list_concat(pre_qry,querylist); + querylist = list_concat(querylist, post_qry); + + } + else + querylist = RewriteQuery(parsetree, NIL); /* * Step 2 diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 5efad23..9a9bc7c 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -1012,6 +1012,7 @@ exec_simple_query(const char *query_string) */ PortalStart(portal, NULL, InvalidSnapshot); + /* * Select the appropriate output format: text unless we are doing a * FETCH from a binary cursor. (Pretty grotty to have to do this here @@ -1046,6 +1047,7 @@ exec_simple_query(const char *query_string) */ MemoryContextSwitchTo(oldcontext); + /* * Run the portal to completion, and then drop it (and the receiver). */ @@ -1056,6 +1058,7 @@ exec_simple_query(const char *query_string) receiver, completionTag); + (*receiver->rDestroy) (receiver); PortalDrop(portal, false); diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index 8ad4915..4453b47 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -191,6 +191,7 @@ ProcessQuery(PlannedStmt *plan, */ ExecutorStart(queryDesc, 0); + /* * Run the plan to completion. */ @@ -225,6 +226,10 @@ ProcessQuery(PlannedStmt *plan, snprintf(completionTag, COMPLETION_TAG_BUFSIZE, "DELETE %u", queryDesc->estate->es_processed); break; + case CMD_MERGE: + snprintf(completionTag, COMPLETION_TAG_BUFSIZE, + "MERGE %u", queryDesc->estate->es_processed); + break; default: strcpy(completionTag, "???"); break; diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 8960246..2733e5d 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -125,6 +125,7 @@ CommandIsReadOnly(Node *parsetree) case CMD_UPDATE: case CMD_INSERT: case CMD_DELETE: + case CMD_MERGE: return false; default: elog(WARNING, "unrecognized commandType: %d", @@ -1398,6 +1399,10 @@ CreateCommandTag(Node *parsetree) tag = "SELECT"; break; + case T_MergeStmt: + tag = "MERGE"; + break; + /* utility statements --- same whether raw or cooked */ case T_TransactionStmt: { @@ -2235,6 +2240,7 @@ GetCommandLogLevel(Node *parsetree) case T_InsertStmt: case T_DeleteStmt: case T_UpdateStmt: + case T_MergeStmt: lev = LOGSTMT_MOD; break; diff --git a/src/include/executor/nodeModifyTable.h b/src/include/executor/nodeModifyTable.h index 67ba3e8..7ae2f0a 100644 --- a/src/include/executor/nodeModifyTable.h +++ b/src/include/executor/nodeModifyTable.h @@ -16,6 +16,9 @@ #include "nodes/execnodes.h" extern ModifyTableState *ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags); + +extern PlanState *ExecInitMergeAction(MergeAction *node, EState *estate, int eflags); + extern TupleTableSlot *ExecModifyTable(ModifyTableState *node); extern void ExecEndModifyTable(ModifyTableState *node); extern void ExecReScanModifyTable(ModifyTableState *node); diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 09fdb5d..8147c44 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1031,6 +1031,7 @@ typedef struct ModifyTableState int mt_whichplan; /* which one is being executed (0..n-1) */ EPQState mt_epqstate; /* for evaluating EvalPlanQual rechecks */ bool fireBSTriggers; /* do we need to fire stmt triggers? */ + List *mergeActPstates; /*the list of the panstate of each meger command action. NIL if this is not a merge command, all actions' */ } ModifyTableState; /* ---------------- diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index a5f5df5..15d11a0 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -44,6 +44,7 @@ typedef enum NodeTag T_Plan = 100, T_Result, T_ModifyTable, + T_MergeAction, T_Append, T_RecursiveUnion, T_BitmapAnd, @@ -261,6 +262,15 @@ typedef enum NodeTag T_DeleteStmt, T_UpdateStmt, T_SelectStmt, + + + T_MergeStmt, + T_MergeConditionAction, + T_MergeUpdate, + T_MergeDelete, + T_MergeInsert, + + T_AlterTableStmt, T_AlterTableCmd, T_AlterDomainStmt, @@ -511,6 +521,7 @@ typedef enum CmdType CMD_UPDATE, /* update stmt */ CMD_INSERT, /* insert stmt */ CMD_DELETE, + CMD_MERGE, /*merge stmt*/ CMD_UTILITY, /* cmds like create, destroy, copy, vacuum, * etc. */ CMD_NOTHING /* dummy command for instead nothing rules diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index b591073..d6de83c 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -146,6 +146,11 @@ typedef struct Query Node *setOperations; /* set-operation tree if this is top level of * a UNION/INTERSECT/EXCEPT query */ + + bool isMergeAction; /*if this query is a merge action. */ + + List *mergeActQry; /* the list of all the merge actions. + * used only for merge query statment*/ } Query; @@ -990,6 +995,30 @@ typedef struct SelectStmt /* Eventually add fields for CORRESPONDING spec here */ } SelectStmt; +/*ZBX: the structure for MERGE command statement*/ +typedef struct MergeStmt +{ + NodeTag type; + RangeVar *relation; /*targe relation for merge */ + List *source; /* source relations for the merge. Currently, we only allwo single-source merge, so the length of this list should always be 1 */ + Node *matchCondition; /* qualifications of the merge*/ + List *actions; /*list of MergeConditionAction structure. It stores all the match / non-matching conditions and the corresponding actions*/ + +}MergeStmt; + +/*the structure for the actions of MERGE command. Holds info of the clauses like "... WHEN MATCHED AND ... THEN UPDATE/DELETE/INSERT" +*/ +typedef struct MergeConditionAction +{ + NodeTag type; + bool match; /*match or not match*/ + Node *condition;/*the AND condition for this action*/ + Node *action; /*the actions: delete , insert or update*/ +}MergeConditionAction; + +typedef UpdateStmt MergeUpdate; +typedef DeleteStmt MergeDelete; +typedef InsertStmt MergeInsert; /* ---------------------- * Set Operation node for post-analysis query trees diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 037bc0b..4e067b2 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -66,6 +66,7 @@ typedef struct PlannedStmt List *invalItems; /* other dependencies, as PlanInvalItems */ int nParamExec; /* number of PARAM_EXEC Params used */ + } PlannedStmt; /* macro for fetching the Plan associated with a SubPlan node */ @@ -169,8 +170,16 @@ typedef struct ModifyTable List *returningLists; /* per-target-table RETURNING tlists */ List *rowMarks; /* PlanRowMarks (non-locking only) */ int epqParam; /* ID of Param for EvalPlanQual re-eval */ + List *mergeActPlan; /*the plans for merge actions, which are also ModifyTable nodes*/ } ModifyTable; + +typedef struct MergeAction +{ + Plan plan; + CmdType operation;/* INSERT, UPDATE, or DELETE */ +}MergeAction; + /* ---------------- * Append node - * Generate the concatenation of the results of sub-plans. diff --git a/src/include/optimizer/planner.h b/src/include/optimizer/planner.h index 805dee7..6d7ff4f 100644 --- a/src/include/optimizer/planner.h +++ b/src/include/optimizer/planner.h @@ -34,6 +34,7 @@ extern Plan *subquery_planner(PlannerGlobal *glob, Query *parse, PlannerInfo *parent_root, bool hasRecursion, double tuple_fraction, PlannerInfo **subroot); +extern ModifyTable *merge_action_planner(PlannerGlobal *glob, Query *parse, Plan *top_plan); extern Expr *expression_planner(Expr *expr); diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index 49d4b6c..436d459 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -229,7 +229,9 @@ PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD) PG_KEYWORD("login", LOGIN_P, UNRESERVED_KEYWORD) PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD) PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD) +PG_KEYWORD("matched", MATCHED, UNRESERVED_KEYWORD) PG_KEYWORD("maxvalue", MAXVALUE, UNRESERVED_KEYWORD) +PG_KEYWORD("merge", MERGE, UNRESERVED_KEYWORD) PG_KEYWORD("minute", MINUTE_P, UNRESERVED_KEYWORD) PG_KEYWORD("minvalue", MINVALUE, UNRESERVED_KEYWORD) PG_KEYWORD("mode", MODE, UNRESERVED_KEYWORD) diff --git a/src/include/parser/parse_clause.h b/src/include/parser/parse_clause.h index f3d3ee9..e4312d8 100644 --- a/src/include/parser/parse_clause.h +++ b/src/include/parser/parse_clause.h @@ -19,6 +19,8 @@ extern void transformFromClause(ParseState *pstate, List *frmList); extern int setTargetTable(ParseState *pstate, RangeVar *relation, bool inh, bool alsoSource, AclMode requiredPerms); +extern void setTargetTableLock(ParseState *pstate, RangeVar *relation); + extern bool interpretInhOption(InhOption inhOpt); extern bool interpretOidsOption(List *defList); diff --git a/src/include/port.h b/src/include/port.h index 291a3e7..93fb00a 100644 --- a/src/include/port.h +++ b/src/include/port.h @@ -152,7 +152,7 @@ extern unsigned char pg_toupper(unsigned char ch); extern unsigned char pg_tolower(unsigned char ch); #ifdef USE_REPL_SNPRINTF - +#define zbxprint printf /* * Versions of libintl >= 0.13 try to replace printf() and friends with * macros to their own versions that understand the %$ format. We do the