diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 2f33fdb..9f17eca 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -86,7 +86,9 @@ static void intorel_startup(DestReceiver *self, int operation, TupleDesc typeinf static void intorel_receive(TupleTableSlot *slot, DestReceiver *self); static void intorel_shutdown(DestReceiver *self); static void intorel_destroy(DestReceiver *self); - +static void ExecMerge(TupleTableSlot * slot,ItemPointer tupleid, + TupleTableSlot * planSlot,DestReceiver * dest, + EState * estate); /* end of local decls */ @@ -171,6 +173,7 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) case CMD_INSERT: case CMD_DELETE: case CMD_UPDATE: + case CMD_MERGE: estate->es_output_cid = GetCurrentCommandId(true); break; @@ -795,6 +798,32 @@ InitPlan(QueryDesc *queryDesc, int eflags) i++; } + + + + /* + * for the merge actions, we need to similar things as above + */ + + if(operation == CMD_MERGE) + { + ListCell *eachact; + foreach(eachact, queryDesc->plannedstmt->mergeActPlan) + { + Plan *actplan = (Plan *)lfirst(eachact); + PlanState *actpstate; + + Assert(IsA(actplan,Plan)); + + //here, I call ExecInitNode directly. (correct or not?) + actpstate = ExecInitMergeActNode(actplan,estate,0); + //replace the plan with this planstate. Still put them in the plannedstmt->mergeActPlan list, + + lfirst(eachact) = actpstate; + } + } + + /* * Initialize the private state information for all the nodes in the query * tree. This opens files, allocates storage and leaves us ready to start diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index f4cc7d9..cc549ea 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -327,6 +327,54 @@ ExecInitNode(Plan *node, EState *estate, int eflags) } +/* +* When init a merge plan, we also need init its action plans. +* However, these action plans are "pure" plans. We only want to handle the tlist and quals +*/ +PlanState * +ExecInitMergeActNode(Plan *node, EState *estate, int eflags) +{ + PlanState *result; + + /* + * do nothing when we get to the end of a leaf on tree. + */ + if (node == NULL) + return NULL; + + + /* + * create state structure + */ + result = makeNode(PlanState); + result->plan = node; + result->state = estate; + + /* + * Miscellaneous initialization + * + * create expression context for node + */ + + ExecAssignExprContext(estate, result); + + /* + * initialize child expressions + */ + result->targetlist = (List *) + ExecInitExpr((Expr *) node->targetlist, result); + result->qual = (List *) + ExecInitExpr((Expr *) node->qual, result); + + + /* Set up instrumentation for this node if requested */ + if (estate->es_instrument) + result->instrument = InstrAlloc(1, estate->es_instrument); + + return result; +} + + /* ---------------------------------------------------------------- * ExecProcNode * diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 8619ce3..ea0ce87 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -583,6 +583,18 @@ lreplace:; } +static TupleTableSlot * +ExecMerge(ItemPointer tupleid, + TupleTableSlot *slot, + TupleTableSlot *planSlot, + EPQState *epqstate, + EState *estate) +{ + printf("get a tuple for merge \n"); + return NULL; + +} + /* * Process BEFORE EACH STATEMENT triggers */ @@ -603,6 +615,9 @@ fireBSTriggers(ModifyTableState *node) ExecBSDeleteTriggers(node->ps.state, node->ps.state->es_result_relations); break; + case CMD_MERGE: + printf("not sure about the BS triggers of merge\n"); + break; default: elog(ERROR, "unknown operation"); break; @@ -629,6 +644,9 @@ fireASTriggers(ModifyTableState *node) ExecASDeleteTriggers(node->ps.state, node->ps.state->es_result_relations); break; + case CMD_MERGE: + printf("not sure about the AS triggers of merge\n"); + break; default: elog(ERROR, "unknown operation"); break; @@ -744,6 +762,11 @@ ExecModifyTable(ModifyTableState *node) slot = ExecDelete(tupleid, planSlot, &node->mt_epqstate, estate); break; + case CMD_MERGE: + slot = ExecMerge(tupleid, slot, planSlot, + &node->mt_epqstate, estate); + break; + default: elog(ERROR, "unknown operation"); break; @@ -955,6 +978,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) break; case CMD_UPDATE: case CMD_DELETE: + case CMD_MERGE: junk_filter_needed = true; break; default: diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 93dcef5..ff22fa2 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -90,6 +90,7 @@ _copyPlannedStmt(PlannedStmt *from) COPY_NODE_FIELD(relationOids); COPY_NODE_FIELD(invalItems); COPY_SCALAR_FIELD(nParamExec); + COPY_NODE_FIELD(mergeActPlan); return newnode; } @@ -2273,6 +2274,9 @@ _copyQuery(Query *from) COPY_NODE_FIELD(rowMarks); COPY_NODE_FIELD(setOperations); + /*merge action list*/ + COPY_NODE_FIELD(mergeActQry); + return newnode; } @@ -2343,6 +2347,64 @@ _copySelectStmt(SelectStmt *from) return newnode; } + +static MergeStmt * +_copyMergeStmt(MergeStmt *from) +{ + MergeStmt *newnode = makeNode(MergeStmt); + + COPY_NODE_FIELD(relation); + COPY_NODE_FIELD(source); + COPY_NODE_FIELD(matchCondition); + COPY_NODE_FIELD(actions); + + return newnode; + +} + + +static MergeConditionAction * +_copyMergeConditionAction(MergeConditionAction *from) +{ + MergeConditionAction *newnode = makeNode(MergeConditionAction); + + COPY_SCALAR_FIELD(match); + COPY_NODE_FIELD(condition); + COPY_NODE_FIELD(action); + + return newnode; +} + +static MergeUpdate * +_copyMergeUpdate(MergeUpdate *from) +{ + MergeUpdate *newNode = (MergeUpdate *)_copyUpdateStmt((UpdateStmt *) from); + newNode->type = T_MergeUpdate; + + return newNode; +} + +static MergeInsert * +_copyMergeInsert(MergeInsert *from) +{ + MergeInsert *newNode = (MergeInsert *)_copyInsertStmt((InsertStmt *) from); + newNode->type = T_MergeInsert; + + return newNode; +} + + +static MergeDelete * +_copyMergeDelete(MergeDelete *from) +{ + MergeDelete *newNode = (MergeDelete *)_copyDeleteStmt((DeleteStmt *) from); + newNode->type = T_MergeDelete; + + return newNode; +} + + + static SetOperationStmt * _copySetOperationStmt(SetOperationStmt *from) { @@ -3902,7 +3964,22 @@ copyObject(void *from) break; case T_SelectStmt: retval = _copySelectStmt(from); - break; + break; + case T_MergeStmt: + retval = _copyMergeStmt(from); + break; + case T_MergeConditionAction: + retval = _copyMergeConditionAction(from); + break; + case T_MergeUpdate: + retval = _copyMergeUpdate(from); + break; + case T_MergeInsert: + retval = _copyMergeInsert(from); + break; + case T_MergeDelete: + retval = _copyMergeDelete(from); + break; case T_SetOperationStmt: retval = _copySetOperationStmt(from); break; diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 5d83727..f797f64 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -877,7 +877,7 @@ _equalQuery(Query *a, Query *b) COMPARE_NODE_FIELD(limitCount); COMPARE_NODE_FIELD(rowMarks); COMPARE_NODE_FIELD(setOperations); - + return true; } diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 79baf4f..dae9f31 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -255,6 +255,7 @@ _outPlannedStmt(StringInfo str, PlannedStmt *node) WRITE_NODE_FIELD(relationOids); WRITE_NODE_FIELD(invalItems); WRITE_INT_FIELD(nParamExec); + WRITE_NODE_FIELD(mergeActPlan); } /* @@ -2019,9 +2020,51 @@ _outQuery(StringInfo str, Query *node) WRITE_NODE_FIELD(limitCount); WRITE_NODE_FIELD(rowMarks); WRITE_NODE_FIELD(setOperations); + WRITE_NODE_FIELD(mergeActQry); +} + + +static void +_outMergeConditionAction(StringInfo str, MergeConditionAction *node) +{ + WRITE_NODE_TYPE("MERGECONDITIONACTION"); + + WRITE_BOOL_FIELD(match); + + WRITE_NODE_FIELD(condition); + WRITE_NODE_FIELD(action); + + } static void +_outMergeStmt(StringInfo str, MergeStmt *node) +{ + WRITE_NODE_TYPE("MERGESTMT"); + + WRITE_NODE_FIELD(relation); + WRITE_NODE_FIELD(source); + WRITE_NODE_FIELD(matchCondition); + WRITE_NODE_FIELD(actions); + +} + +static void +_outDeleteStmt(StringInfo str, DeleteStmt *node) +{ + WRITE_NODE_TYPE("DELETESTMT"); + + WRITE_NODE_FIELD(relation); + WRITE_NODE_FIELD(usingClause); + WRITE_NODE_FIELD(whereClause); + WRITE_NODE_FIELD(returningList); + + +} + + + +static void _outSortGroupClause(StringInfo str, SortGroupClause *node) { WRITE_NODE_TYPE("SORTGROUPCLAUSE"); @@ -2905,6 +2948,17 @@ _outNode(StringInfo str, void *obj) _outXmlSerialize(str, obj); break; + case T_MergeStmt: + _outMergeStmt(str, obj); + break; + case T_MergeConditionAction: + _outMergeConditionAction(str,obj); + break; + case T_DeleteStmt: + _outDeleteStmt(str,obj); + break; + + default: /* diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 3950ab4..04bb60d 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -251,6 +251,25 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) result->invalItems = glob->invalItems; result->nParamExec = list_length(glob->paramlist); + /*do a simple plan for each actions in the merge command. + //put them in mergeActPlan list; + */ + if(parse->commandType == CMD_MERGE) + { + ListCell *l; + + + foreach(l, parse->mergeActQry) + { + + Plan *actplan = merge_action_planner(glob, (Query *)lfirst(l), NULL, + false, tuple_fraction, &root); + + result->mergeActPlan = lappend(result->mergeActPlan, actplan); + } + + } + return result; } @@ -584,6 +603,74 @@ subquery_planner(PlannerGlobal *glob, Query *parse, return plan; } + + +Plan * +merge_action_planner(PlannerGlobal *glob, Query *parse, + PlannerInfo *parent_root, + bool hasRecursion, double tuple_fraction, + PlannerInfo **subroot) +{ + PlannerInfo *root; + Plan *plan; + + + + /* Create a PlannerInfo data structure for this subquery */ + root = makeNode(PlannerInfo); + root->parse = parse; + root->glob = glob; + root->query_level = 1; + root->parent_root = NULL; + root->planner_cxt = CurrentMemoryContext; + root->init_plans = NIL; + root->cte_plan_ids = NIL; + root->eq_classes = NIL; + root->append_rel_list = NIL; + + root->hasRecursion = false; + root->wt_param_id = -1; + root->non_recursive_plan = NULL; + + + + + /* + * no having clause in a merge action + */ + Assert(parse->havingQual == NULL); + + /* Clear this flag; might get set in distribute_qual_to_rels */ + root->hasPseudoConstantQuals = false; + + /* + * Do expression preprocessing on targetlist and quals. + */ + parse->targetList = (List *) + preprocess_expression(root, (Node *) parse->targetList, + EXPRKIND_TARGET); + + + preprocess_qual_conditions(root, (Node *) parse->jointree); + + + + + /* + * Do the main planning. If we have an inherited target relation, that + * needs special processing, else go straight to grouping_planner. + */ + if (parse->resultRelation && + rt_fetch(parse->resultRelation, parse->rtable)->inh) + plan = inheritance_planner(root); + else + plan = grouping_planner(root, tuple_fraction); + + + return plan; +} + + /* * preprocess_expression * Do subquery_planner's preprocessing work for an expression, diff --git a/src/backend/optimizer/prep/preptlist.c b/src/backend/optimizer/prep/preptlist.c index 59d3518..adf8795 100644 --- a/src/backend/optimizer/prep/preptlist.c +++ b/src/backend/optimizer/prep/preptlist.c @@ -84,7 +84,7 @@ preprocess_targetlist(PlannerInfo *root, List *tlist) * delete. This extra field is marked "junk" so that it is not stored * back into the tuple. */ - if (command_type == CMD_UPDATE || command_type == CMD_DELETE) + if (command_type == CMD_UPDATE || command_type == CMD_DELETE || command_type == CMD_MERGE) { TargetEntry *tle; Var *var; diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c index 6b99a10..8c6af99 100644 --- a/src/backend/parser/analyze.c +++ b/src/backend/parser/analyze.c @@ -47,6 +47,8 @@ static Query *transformDeleteStmt(ParseState *pstate, DeleteStmt *stmt); static Query *transformInsertStmt(ParseState *pstate, InsertStmt *stmt); static List *transformInsertRow(ParseState *pstate, List *exprlist, List *stmtcols, List *icolumns, List *attrnos); + +static Query *transformMergeStmt(ParseState *pstate, MergeStmt *stmt); static Query *transformSelectStmt(ParseState *pstate, SelectStmt *stmt); static Query *transformValuesClause(ParseState *pstate, SelectStmt *stmt); static Query *transformSetOperationStmt(ParseState *pstate, SelectStmt *stmt); @@ -164,17 +166,24 @@ transformStmt(ParseState *pstate, Node *parseTree) * Optimizable statements */ case T_InsertStmt: + case T_MergeInsert: result = transformInsertStmt(pstate, (InsertStmt *) parseTree); break; case T_DeleteStmt: + case T_MergeDelete: result = transformDeleteStmt(pstate, (DeleteStmt *) parseTree); break; case T_UpdateStmt: + case T_MergeUpdate: result = transformUpdateStmt(pstate, (UpdateStmt *) parseTree); break; + case T_MergeStmt: + result = transformMergeStmt(pstate, (MergeStmt *)parseTree); + break; + case T_SelectStmt: { SelectStmt *n = (SelectStmt *) parseTree; @@ -282,22 +291,32 @@ transformDeleteStmt(ParseState *pstate, DeleteStmt *stmt) qry->commandType = CMD_DELETE; - /* set up range table with just the result rel */ - qry->resultRelation = setTargetTable(pstate, stmt->relation, - interpretInhOption(stmt->relation->inhOpt), - true, - ACL_DELETE); - qry->distinctClause = NIL; + /* - * The USING clause is non-standard SQL syntax, and is equivalent in - * functionality to the FROM list that can be specified for UPDATE. The - * USING keyword is used rather than FROM because FROM is already a - * keyword in the DELETE syntax. - */ - transformFromClause(pstate, stmt->usingClause); + * The input stmt could be a MergeDelete node. + * In this case, we don't need the process on range table. + */ + if(IsA(stmt, DeleteStmt)) + { + /* set up range table with just the result rel */ + qry->resultRelation = setTargetTable(pstate, stmt->relation, + interpretInhOption(stmt->relation->inhOpt), + true, + ACL_DELETE); + + + /* + * The USING clause is non-standard SQL syntax, and is equivalent in + * functionality to the FROM list that can be specified for UPDATE. The + * USING keyword is used rather than FROM because FROM is already a + * keyword in the DELETE syntax. + */ + transformFromClause(pstate, stmt->usingClause); + } + qual = transformWhereClause(pstate, stmt->whereClause, "WHERE"); qry->returningList = transformReturningList(pstate, stmt->returningList); @@ -347,6 +366,8 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt) * VALUES list, or general SELECT input. We special-case VALUES, both for * efficiency and so we can handle DEFAULT specifications. */ + + /*a MergeInsert statment is always a VALUE clause*/ isGeneralSelect = (selectStmt && selectStmt->valuesLists == NIL); /* @@ -382,7 +403,8 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt) * mentioned in the SELECT part. Note that the target table is not added * to the joinlist or namespace. */ - qry->resultRelation = setTargetTable(pstate, stmt->relation, + if(IsA(stmt,InsertStmt))/*for MergeInsert, no need to do this*/ + qry->resultRelation = setTargetTable(pstate, stmt->relation, false, false, ACL_INSERT); /* Validate stmt->cols list, or build default list if no list given */ @@ -1730,17 +1752,19 @@ transformUpdateStmt(ParseState *pstate, UpdateStmt *stmt) qry->commandType = CMD_UPDATE; pstate->p_is_update = true; - qry->resultRelation = setTargetTable(pstate, stmt->relation, - interpretInhOption(stmt->relation->inhOpt), - true, - ACL_UPDATE); - - /* - * the FROM clause is non-standard SQL syntax. We used to be able to do - * this with REPLACE in POSTQUEL so we keep the feature. - */ - transformFromClause(pstate, stmt->fromClause); + if(IsA(stmt, UpdateStmt))/*for MergeUpdate, no need to do this*/ + { + qry->resultRelation = setTargetTable(pstate, stmt->relation, + interpretInhOption(stmt->relation->inhOpt), + true, + ACL_UPDATE); + /* + * the FROM clause is non-standard SQL syntax. We used to be able to do + * this with REPLACE in POSTQUEL so we keep the feature. + */ + transformFromClause(pstate, stmt->fromClause); + } qry->targetList = transformTargetList(pstate, stmt->targetList); qual = transformWhereClause(pstate, stmt->whereClause, "WHERE"); @@ -2241,3 +2265,336 @@ applyLockingClause(Query *qry, Index rtindex, rc->pushedDown = pushedDown; qry->rowMarks = lappend(qry->rowMarks, rc); } + + + +/*transform a action of merge command into a query. +No change of the pstate range table is allowed in this function. +*/ +static Query * +transformMergeActions(ParseState *pstate, MergeStmt *stmt, MergeConditionAction *condact) +{ + Query *actqry = makeNode(Query); + A_Expr *match_expr; //the expr of matched/not matched + A_Expr *act_qual_expr; + + /*firstly, we need to make sure that DELETE and UPDATE actions are only taken in MATCHED condition + and INSERTs are only takend when not MATCHED + */ + if(IsA(condact->action, MergeDelete)) + { + if(!condact->match) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("The DELETE action in MERGE command is not allowed when NOT MATCHED"))); + } + else if(IsA(condact->action, MergeUpdate)) + { + if(!condact->match) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("The UPDATE action in MERGE command is not allowed when NOT MATCHED"))); + } + else if(IsA(condact->action, MergeInsert)) + { + if(condact->match) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("The INSERT action in MERGE command is not allowed when MATCHED"))); + } + else + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("UNKONWN action type in MERGE"))); + + + + /*combine the condtion of this act with the ON qual of the merge command + do a copy of the merge condtion for safety. + */ + + /* + if(condact->match) + match_expr = copyObject(stmt->matchCondition); + else + match_expr = makeA_Expr(AEXPR_NOT, NIL, NULL, + copyObject(stmt->matchCondition), 1); + + + if(condact->condition) + act_qual_expr = makeA_Expr(AEXPR_AND, NIL, condact->condition, (Node *)match_expr, 2); + else + act_qual_expr = match_expr; + + */ + + act_qual_expr = condact->condition; + + /*use the transfomStmt() to parse all types of actions*/ + if(IsA(condact->action, MergeDelete)) + { + /*a delete action*/ + MergeDelete *deleteact = (MergeDelete *)(condact->action); + Assert(IsA(deleteact,DeleteStmt)); + + deleteact->relation = stmt->relation; + deleteact->usingClause = stmt->source; + deleteact->whereClause = (Node *)act_qual_expr; + + /*parse the action query*/ + actqry = transformStmt(pstate, (Node *)deleteact); + + if(!IsA(actqry, Query) || actqry->commandType != CMD_DELETE || actqry->utilityStmt != NULL) + elog(ERROR, "improper DELETE action in merge stmt"); + + return actqry; + } + else if(IsA(condact->action, MergeUpdate)) + { + /*an update action*/ + MergeUpdate *updateact = (MergeUpdate *)(condact->action); + + /*the "targetlist" of the updateact is filled in the parser */ + updateact->relation = stmt->relation; + updateact->fromClause = stmt->source; + updateact->whereClause = (Node *)act_qual_expr; + + /*parse the action query*/ + actqry = transformStmt(pstate, (Node *)updateact); + + if(!IsA(actqry, Query) || actqry->commandType != CMD_UPDATE|| actqry->utilityStmt != NULL) + elog(ERROR, "improper UPDATE action in merge stmt"); + + return actqry; + } + else if(IsA(condact->action, MergeInsert)) + { + /*an insert action*/ + Node *qual; + MergeInsert *insertact; + + insertact = (MergeInsert *)(condact->action); + + + /*the "cols" and "selectStmt" of the insertact is filled in the parser */ + insertact->relation = stmt->relation; + + /* + the merge insert action has a strange feature. + In an ordinary INSERT, the VALUES list can only contains constants and DEFAULT. (am I right??) + But in the INSERT action of MERGE command, the VALUES list can have expressions with variables(attributes of the targe and source tables). + Besides, in the ordinary INSERT, a VALUES list can never be followed by a WHERE clause. But in MERGE INSERT action, there are matching conditions. + + Thus, the output qry of this function is an INSERT query in the style of "INSERT...VALUES...", except that we have other range tables and a WHERE clause. + Note that it is also different from the "INSERT ... SELECT..." query, in which the whole SELECT is a subquery. (We don't have subquery here). + We construct this novel query structure in order to keep consitency with other merge action types (DELETE, UPDATE). + In this way, all the merge action queries are in fact share the very same Range Table, They only differs in their target lists and join trees + + */ + + + /*parse the action query, this will call transformInsertStmt() which analyzes the VALUES list.*/ + actqry = transformStmt(pstate, (Node *)insertact); + + /*do the WHERE clause here, Since the transformInsertStmt() function only analyzes the VALUES list but not the WHERE clause*/ + qual = transformWhereClause(pstate,(Node *)act_qual_expr, "WHERE"); + + actqry->jointree = makeFromExpr(pstate->p_joinlist, qual); + + + if(!IsA(actqry, Query) || actqry->commandType != CMD_INSERT|| actqry->utilityStmt != NULL) + elog(ERROR, "improper INSERT action in merge stmt"); + + + return actqry; + } + else + elog(ERROR, "unknown action type in MERGE"); + + /*never comes here*/ + return NULL; +} + + + +static Query * +transformMergeStmt(ParseState *pstate, MergeStmt *stmt) +{ + Query *qry; + + ColumnRef *starRef; + ResTarget *starResTarget; + ListCell *act; + ListCell *l; + JoinExpr *joinexp; + int rtindex; + + /*this will never happen, since the garm.y is restricted that only one rel name is allowed to appear in the source table position. + However, if we extent the command in future, we may need to note this check here. + */ + if(list_length(stmt->source) != 1) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("now we only accept merge command with only ONE source table"))); + + /*now, do the real tranformation of the merge command. */ + qry = makeNode(Query); + qry->commandType = CMD_MERGE; + + /* + What we are doing here is to create a query like + "SELECT * FROM LEFT JOIN ON ;" + Note: + 1. we set the "match condition" as the join qualification. + The left join will scan both the matched and non-matched tuples. + + 2. a normal SELECT query has no "target relation". + But here we need to set the targe relation in query, like the UPDATE/DELETE/INSERT queries. + So this is a left join SELECT with a "target table" in its range table. + + 3. We don't have a specific ACL level for Merge, here we just use ACL_SELECT. + But we will add other ACL levels when handle each merge actions. + */ + + + /*before analyze the FROM clause, we need to set the target table. + We cannot call setTargetTable() function directly. + We only need the lock target relation, without adding it to Range table. + */ + + + setTargetTableLock(pstate, stmt->relation); + + + + /*create the FROM clause. Make the join expression first*/ + joinexp = makeNode(JoinExpr); + joinexp->jointype = JOIN_LEFT; + joinexp->isNatural = FALSE; + joinexp->larg = linitial(stmt->source);//source list has only one element + joinexp->rarg = (Node *)stmt->relation; + joinexp->quals = stmt->matchCondition; //match condtion + + /*transform the FROM clause. The target relation and source relation will be add to Rtable here. */ + transformFromClause(pstate, list_make1(joinexp)); + + /*the targetList of the main query is "*" + */ + starRef = makeNode(ColumnRef); + starRef->fields = list_make1(makeNode(A_Star)); + starRef->location = 1; + + starResTarget = makeNode(ResTarget); + starResTarget->name = NULL; + starResTarget->indirection = NIL; + starResTarget->val = (Node *)starRef; + starResTarget->location = 1; + + qry->targetList = transformTargetList(pstate, list_make1(starResTarget)); + + /*we don't need the WHERE clause here. Set it null. */ + qry->jointree = makeFromExpr(pstate->p_joinlist, NULL); + + /*now , we find out the RTE for the target relation, and do some unfinished jobs*/ + rtindex = 1; + foreach(l, pstate->p_rtable) + { + RangeTblEntry *rte = (RangeTblEntry *)lfirst(l); + if(rte->relid == pstate->p_target_relation->rd_id) /*find the RTE*/ + { + pstate->p_target_rangetblentry = rte; + rte->requiredPerms = ACL_SELECT; + qry->resultRelation = rtindex; + break; + } + rtindex++; + } + + if(pstate->p_target_rangetblentry == NULL) + elog(ERROR, "cannot find the RTE for target table"); + + + qry->rtable = pstate->p_rtable; + + qry->hasSubLinks = pstate->p_hasSubLinks; + + /* + * Top-level aggregates are simply disallowed in MERGE + */ + if (pstate->p_hasAggs) + ereport(ERROR, + (errcode(ERRCODE_GROUPING_ERROR), + errmsg("cannot use aggregate function in top level of MERGE"), + parser_errposition(pstate, + locate_agg_of_level((Node *) qry, 0)))); + if (pstate->p_hasWindowFuncs) + ereport(ERROR, + (errcode(ERRCODE_WINDOWING_ERROR), + errmsg("cannot use window function in MERGE"), + parser_errposition(pstate, + locate_windowfunc((Node *) qry)))); + + + + + /* + the main query is done. + then for each actions ,we transform it to a seperate query. + the action queries shares the exactly same range table with the main query. + in other words, in the extra condtions of the sub actions, we don't allow involvement of new tables + */ + + qry->mergeActQry = NIL; + + foreach(act,stmt->actions) + { + MergeConditionAction *mca = lfirst(act); + Query *actqry; + + switch(mca->action->type) + { + case T_MergeDelete: + pstate->p_target_rangetblentry->requiredPerms |= ACL_DELETE; + break; + case T_MergeUpdate: + pstate->p_target_rangetblentry->requiredPerms |= ACL_UPDATE; + break; + case T_MergeInsert: + pstate->p_target_rangetblentry->requiredPerms |= ACL_INSERT; + break; + default: + elog(ERROR, "unknown MERGE action type %d", mca->type); + break; + + } + + + /*transform the act (and its condition) as a single query. Link it to the top-level query*/ + actqry = transformMergeActions(pstate, stmt, mca); + + /*since we don't invoke setTargetTable() in transformMergeActions(), we need to set actqry->resultRelation here + */ + actqry->resultRelation = qry->resultRelation; + + + qry->mergeActQry = lappend(qry->mergeActQry, actqry); + } + + + /*for a single-action merge, we just stransform it into a orignial update/delete command. + but the insert action cannot take this shortcut. + */ + /* + if(list_length(stmt->actions) == 1) + { + Query *q = linitial(qry->mergeActQry); + if(q->commandType == CMD_DELETE || q->commandType == CMD_UPDATE) + return q; + } + */ + return qry; + + +} + + diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 3f6eeeb..c139bd7 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -212,6 +212,12 @@ static TypeName *TableFuncTypeName(List *columns); DropOwnedStmt ReassignOwnedStmt AlterTSConfigurationStmt AlterTSDictionaryStmt +%type MergeStmt opt_and_condition merge_condition_action merge_action +%type opt_not +%type merge_condition_action_list + + + %type select_no_parens select_with_parens select_clause simple_select values_clause @@ -233,7 +239,8 @@ static TypeName *TableFuncTypeName(List *columns); %type opt_force opt_or_replace opt_grant_grant_option opt_grant_admin_option opt_nowait opt_if_exists opt_with_data - + + %type OptRoleList AlterOptRoleList %type CreateOptRoleElem AlterOptRoleElem @@ -504,6 +511,8 @@ static TypeName *TableFuncTypeName(List *columns); MAPPING MATCH MAXVALUE MINUTE_P MINVALUE MODE MONTH_P MOVE + MATCHED MERGE + NAME_P NAMES NATIONAL NATURAL NCHAR NEXT NO NOCREATEDB NOCREATEROLE NOCREATEUSER NOINHERIT NOLOGIN_P NONE NOSUPERUSER NOT NOTHING NOTIFY NOTNULL NOWAIT NULL_P NULLIF NULLS_P NUMERIC @@ -725,6 +734,7 @@ stmt : | ListenStmt | LoadStmt | LockStmt + | MergeStmt | NotifyStmt | PrepareStmt | ReassignOwnedStmt @@ -7297,6 +7307,101 @@ set_target_list: /***************************************************************************** * * QUERY: + * MERGE STATEMENT + * + *****************************************************************************/ + + +MergeStmt: + MERGE INTO relation_expr_opt_alias + USING table_ref + ON a_expr + merge_condition_action_list + { + MergeStmt *m = makeNode(MergeStmt); + + m->relation = $3; + m->source = list_make1($5); /*although we have only one USING table, but we still make it a list, maybe in future we will allow mutliple USING tables.*/ + m->matchCondition = $7; + m->actions = $8; + + $$ = (Node *)m; + } + ; + +merge_condition_action_list: + merge_condition_action + { $$ = list_make1($1); } + | merge_condition_action_list merge_condition_action + { $$ = lappend($1,$2); } + ; + +merge_condition_action: + WHEN opt_not MATCHED opt_and_condition THEN merge_action + { + MergeConditionAction *m = makeNode(MergeConditionAction); + + m->match = $2; + m->condition = $4; + m->action = $6; + + $$ = (Node *)m; + } + ; + + +opt_and_condition: + AND a_expr {$$ = $2;} + | /*EMPTY*/ {$$ = NULL;} + ; + +opt_not: + NOT {$$ = false;} + | /*EMPTY*/ {$$ = true;} + ; + +merge_action: + DELETE_P + {$$ = (Node *)makeNode(MergeDelete);} + | UPDATE SET set_clause_list + { + UpdateStmt *n = makeNode(MergeUpdate); + n->targetList = $3; + $$ = (Node *)n; + } + | INSERT values_clause + { + InsertStmt *n = makeNode(MergeInsert); + n->cols = NIL; + n->selectStmt = $2; + + $$ = (Node *)n; + } + + | INSERT '(' insert_column_list ')' values_clause + { + InsertStmt *n = makeNode(MergeInsert); + n->cols = $3; + n->selectStmt = $5; + + $$ = (Node *)n; + } + | INSERT DEFAULT VALUES + { + InsertStmt *n = makeNode(MergeInsert); + n->cols = NIL; + n->selectStmt = NULL; + + $$ = (Node *)n; + } + + ; + + + +/***************************************************************************** + * + * QUERY: * CURSOR STATEMENTS * *****************************************************************************/ @@ -10935,7 +11040,9 @@ unreserved_keyword: | LOGIN_P | MAPPING | MATCH + | MATCHED | MAXVALUE + | MERGE | MINUTE_P | MINVALUE | MODE diff --git a/src/backend/parser/parse_clause.c b/src/backend/parser/parse_clause.c index f30132a..657dd9c 100644 --- a/src/backend/parser/parse_clause.c +++ b/src/backend/parser/parse_clause.c @@ -213,6 +213,29 @@ setTargetTable(ParseState *pstate, RangeVar *relation, return rtindex; } + +void +setTargetTableLock(ParseState *pstate, RangeVar *relation) +{ + + /* Close old target; this could only happen for multi-action rules */ + if (pstate->p_target_relation != NULL) + heap_close(pstate->p_target_relation, NoLock); + + /* + * Open target rel and grab suitable lock (which we will hold till end of + * transaction). + * + * free_parsestate() will eventually do the corresponding heap_close(), + * but *not* release the lock. + */ + pstate->p_target_relation = parserOpenTable(pstate, relation, + RowExclusiveLock); + + +} + + /* * Simplify InhOption (yes/no/default) into boolean yes/no. * diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c index 25b44dd..5567188 100644 --- a/src/backend/rewrite/rewriteHandler.c +++ b/src/backend/rewrite/rewriteHandler.c @@ -1837,6 +1837,45 @@ RewriteQuery(Query *parsetree, List *rewrite_events) } +#define insert_rewrite (1<<0) /*insert merge action has already been processed by rewriter*/ +#define delete_rewrite (1<<1) +#define update_rewrite (1<<2) + +#define insert_instead (1<<3) /*insert merge action is fully replace by rules.*/ +#define delete_instead (1<<4) +#define update_instead (1<<5) + + +#define merge_action_already_rewrite(acttype, flag) \ + ((acttype == CMD_INSERT && (flag & insert_rewrite)) || \ + (acttype == CMD_UPDATE && (flag & update_rewrite)) || \ + (acttype == CMD_DELETE && (flag & delete_rewrite))) + + +#define set_action_rewrite(acttype, flag) \ + if(acttype == CMD_INSERT) \ + {flag |= insert_rewrite;}\ + else if(acttype == CMD_UPDATE) \ + {flag |= update_rewrite;}\ + else if(acttype == CMD_DELETE) \ + {flag |= delete_rewrite;} + + + +#define merge_action_instead(acttype, flag) \ + ((acttype == CMD_INSERT && (flag & insert_instead)) || \ + (acttype == CMD_UPDATE && (flag & update_instead)) || \ + (acttype == CMD_DELETE && (flag & delete_instead))) + +#define set_action_instead(acttype, flag)\ + if(acttype == CMD_INSERT) \ + {flag |= insert_instead;}\ + else if(acttype == CMD_UPDATE) \ + {flag |= update_instead;}\ + else if(acttype == CMD_DELETE) \ + {flag |= delete_instead;} + + /* * QueryRewrite - * Primary entry point to the query rewriter. @@ -1861,7 +1900,131 @@ QueryRewrite(Query *parsetree) * * Apply all non-SELECT rules possibly getting 0 or many queries */ - querylist = RewriteQuery(parsetree, NIL); + + + if(parsetree->commandType == CMD_MERGE) + { + /*for merge query, we have a set of lower-level action queries (not subquery). + //each of these action queries should be applied to RewriteQuery(). + */ + ListCell *cell, + *prev, + *next; + + int flag = 0; + + + List *pre_qry = NIL; + List *post_qry = NIL; + + + querylist = NIL; + + + /*rewrite the merge action queries one by one.*/ + prev = NULL; + + for (cell = list_head(parsetree->mergeActQry); cell; cell = next) + { + List *queryList4action = NIL; + Query *actionqry; + Query *q; + + + actionqry = lfirst(cell); + + next = lnext(cell); + + /*if this kind of actions are fully replaced by rules, we delete it from the action list*/ + if(merge_action_instead(actionqry->commandType, flag)) + { + parsetree->mergeActQry = list_delete_cell(parsetree->mergeActQry, cell, prev); + continue; + } + + + /*if this kind of actions are already processed by rewriter, skip it.*/ + if(merge_action_already_rewrite(actionqry->commandType, flag)) + { + prev = cell; + continue; + } + + /*ok this action has not been processed before, let's do it now.*/ + + queryList4action = RewriteQuery(actionqry, NIL); + set_action_rewrite(actionqry->commandType,flag); /*this kind of actions has been processed.*/ + + /*if the returning list is nil, this merge action is replaced by a do-nothing rule*/ + if(queryList4action == NIL) + { + /*set the flag for other merge actions of the same type*/ + set_action_instead(actionqry->commandType, flag); + + /*delete the action.*/ + parsetree->mergeActQry = list_delete_cell(parsetree->mergeActQry, cell, prev); + continue; + } + + + /*the merge action query could be one of the elements in the rewriten list. + //if it is in the list, it must be the head or tail. + */ + q = (Query *)linitial(queryList4action); + if(q->querySource == QSRC_ORIGINAL) + { + /*the merge action is the head, the remaining part of the list are the queries generated by rules + //we put them in the post_qry list. + */ + if(querylist == NIL) + querylist = list_make1(parsetree); + + queryList4action = list_delete_first(queryList4action); + post_qry = list_concat(post_qry,queryList4action); + prev = cell; + continue; + + } + + + q = (Query *)llast(queryList4action); + if(q->querySource == QSRC_ORIGINAL) + { + /*the merge action is the tail. Put the rule queries in pre_qry list*/ + + if(querylist == NIL) + querylist = list_make1(parsetree); + + queryList4action = list_truncate(queryList4action,list_length(queryList4action)-1); + + pre_qry = list_concat(pre_qry,queryList4action); + + prev = cell; + continue; + + } + + + /*here, the merge action query is not in the rewriten query list, which means the action should be deleted + //It is replaced by INSTEAD rule(s). We need to delete the action + */ + post_qry = list_concat(post_qry,queryList4action); + set_action_instead(actionqry->commandType, flag); + parsetree->mergeActQry = list_delete_cell(parsetree->mergeActQry, cell, prev); + } + + + + /*finally, put the 3 lists into one. + If all the merge actions are replaced by rules, the original merge query + will not be involved in the querylist. + */ + querylist = list_concat(pre_qry,querylist); + querylist = list_concat(querylist, post_qry); + + } + else + querylist = RewriteQuery(parsetree, NIL); /* * Step 2 diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 8960246..2733e5d 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -125,6 +125,7 @@ CommandIsReadOnly(Node *parsetree) case CMD_UPDATE: case CMD_INSERT: case CMD_DELETE: + case CMD_MERGE: return false; default: elog(WARNING, "unrecognized commandType: %d", @@ -1398,6 +1399,10 @@ CreateCommandTag(Node *parsetree) tag = "SELECT"; break; + case T_MergeStmt: + tag = "MERGE"; + break; + /* utility statements --- same whether raw or cooked */ case T_TransactionStmt: { @@ -2235,6 +2240,7 @@ GetCommandLogLevel(Node *parsetree) case T_InsertStmt: case T_DeleteStmt: case T_UpdateStmt: + case T_MergeStmt: lev = LOGSTMT_MOD; break; diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 9fe6e4c..8b75855 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -194,6 +194,7 @@ extern DestReceiver *CreateIntoRelDestReceiver(void); * prototypes from functions in execProcnode.c */ extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags); +extern PlanState *ExecInitMergeActNode(Plan *node, EState *estate, int eflags); extern TupleTableSlot *ExecProcNode(PlanState *node); extern Node *MultiExecProcNode(PlanState *node); extern void ExecEndNode(PlanState *node); diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index a5f5df5..38bea7c 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -261,6 +261,15 @@ typedef enum NodeTag T_DeleteStmt, T_UpdateStmt, T_SelectStmt, + + + T_MergeStmt, + T_MergeConditionAction, + T_MergeUpdate, + T_MergeDelete, + T_MergeInsert, + + T_AlterTableStmt, T_AlterTableCmd, T_AlterDomainStmt, @@ -511,6 +520,7 @@ typedef enum CmdType CMD_UPDATE, /* update stmt */ CMD_INSERT, /* insert stmt */ CMD_DELETE, + CMD_MERGE, /*merge stmt*/ CMD_UTILITY, /* cmds like create, destroy, copy, vacuum, * etc. */ CMD_NOTHING /* dummy command for instead nothing rules diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index b591073..392d7f7 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -146,6 +146,9 @@ typedef struct Query Node *setOperations; /* set-operation tree if this is top level of * a UNION/INTERSECT/EXCEPT query */ + + List *mergeActQry; /* the list of all the merge actions. + * used only for merge query statment*/ } Query; @@ -990,6 +993,30 @@ typedef struct SelectStmt /* Eventually add fields for CORRESPONDING spec here */ } SelectStmt; +/*ZBX: the structure for MERGE command statement*/ +typedef struct MergeStmt +{ + NodeTag type; + RangeVar *relation; /*targe relation for merge */ + List *source; /* source relations for the merge. Currently, we only allwo single-source merge, so the length of this list should always be 1 */ + Node *matchCondition; /* qualifications of the merge*/ + List *actions; /*list of MergeConditionAction structure. It stores all the match / non-matching conditions and the corresponding actions*/ + +}MergeStmt; + +/*the structure for the actions of MERGE command. Holds info of the clauses like "... WHEN MATCHED AND ... THEN UPDATE/DELETE/INSERT" +*/ +typedef struct MergeConditionAction +{ + NodeTag type; + bool match; /*match or not match*/ + Node *condition;/*the AND condition for this action*/ + Node *action; /*the actions: delete , insert or update*/ +}MergeConditionAction; + +typedef UpdateStmt MergeUpdate; +typedef DeleteStmt MergeDelete; +typedef InsertStmt MergeInsert; /* ---------------------- * Set Operation node for post-analysis query trees diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 037bc0b..236b0c7 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -66,6 +66,9 @@ typedef struct PlannedStmt List *invalItems; /* other dependencies, as PlanInvalItems */ int nParamExec; /* number of PARAM_EXEC Params used */ + + List *mergeActPlan; /*the plans for merge actions*/ + } PlannedStmt; /* macro for fetching the Plan associated with a SubPlan node */ diff --git a/src/include/optimizer/planner.h b/src/include/optimizer/planner.h index 805dee7..ae705ec 100644 --- a/src/include/optimizer/planner.h +++ b/src/include/optimizer/planner.h @@ -34,6 +34,10 @@ extern Plan *subquery_planner(PlannerGlobal *glob, Query *parse, PlannerInfo *parent_root, bool hasRecursion, double tuple_fraction, PlannerInfo **subroot); +extern Plan *merge_action_planner(PlannerGlobal *glob, Query *parse, + PlannerInfo *parent_root, + bool hasRecursion, double tuple_fraction, + PlannerInfo **subroot); extern Expr *expression_planner(Expr *expr); diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index 49d4b6c..436d459 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -229,7 +229,9 @@ PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD) PG_KEYWORD("login", LOGIN_P, UNRESERVED_KEYWORD) PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD) PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD) +PG_KEYWORD("matched", MATCHED, UNRESERVED_KEYWORD) PG_KEYWORD("maxvalue", MAXVALUE, UNRESERVED_KEYWORD) +PG_KEYWORD("merge", MERGE, UNRESERVED_KEYWORD) PG_KEYWORD("minute", MINUTE_P, UNRESERVED_KEYWORD) PG_KEYWORD("minvalue", MINVALUE, UNRESERVED_KEYWORD) PG_KEYWORD("mode", MODE, UNRESERVED_KEYWORD) diff --git a/src/include/parser/parse_clause.h b/src/include/parser/parse_clause.h index f3d3ee9..e4312d8 100644 --- a/src/include/parser/parse_clause.h +++ b/src/include/parser/parse_clause.h @@ -19,6 +19,8 @@ extern void transformFromClause(ParseState *pstate, List *frmList); extern int setTargetTable(ParseState *pstate, RangeVar *relation, bool inh, bool alsoSource, AclMode requiredPerms); +extern void setTargetTableLock(ParseState *pstate, RangeVar *relation); + extern bool interpretInhOption(InhOption inhOpt); extern bool interpretOidsOption(List *defList);