From 86c176d94eb621dca8e19d75a4a081cc21b227fb Mon Sep 17 00:00:00 2001
From: Yugo Nagata <nagata@sraoss.co.jp>
Date: Mon, 2 Aug 2021 14:59:27 +0900
Subject: [PATCH v25 08/15] Add aggregates support in IVM
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Currently, count, sum, avg, min and max are supported.

As a restriction, expressions specified in GROUP BY must appear in
the target list because tuples to be updated in IMMV are identified
by using this group key. However, in the case of aggregates without
GROUP BY, there is only one tuple in the view, so keys are not uses
to identify tuples.

When creating a IMMV, in addition to __ivm_count column, some hidden
columns for each aggregate are added to the target list. For example,
names of these hidden columns are ivm_count_avg and ivm_sum_avg for
the average function, and so on.

In the case of views without aggregate functions, only the number of
tuple multiplicities in __ivm_count__ column are updated at incremental
maintenance. On the other hand, in the case of view with aggregates,
the aggregated values and related hidden columns are also updated. The
way of update depends the kind of aggregate function.　Specifically,
sum and count are updated by simply adding or subtracting delta value
calculated from delta tables. avg is updated by using values of sum
and count stored in views as hidden columns and deltas calculated
from delta tables.

In min or max cases, it becomes more complicated. For an example of min,
when tuples are inserted, the smaller value between the current min value
in the view and the value calculated from the new delta table is used.
When tuples are deleted, if the current min value in the view equals to
the min in the old delta table, we need re-computation the latest min
value from base tables. Otherwise, the current value in the view remains.

As to sum, avg, min, and max (any aggregate functions except count),
NULL in input values is ignored, and this returns a null value when no
rows are selected. To support this specification, the number of not-NULL
input values is counted and stored in views as a hidden column. In the
case of count(), count(x) returns zero when no rows are selected, and
count(*) doesn't ignore NULL input. These specification are also supported.
---
 src/backend/commands/createas.c |  299 ++++++++-
 src/backend/commands/matview.c  | 1016 ++++++++++++++++++++++++++++++-
 src/include/commands/createas.h |    1 +
 3 files changed, 1281 insertions(+), 35 deletions(-)

diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 4b73965e56..c58cacec05 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -63,6 +63,7 @@
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
+#include "utils/regproc.h"
 #include "utils/rel.h"
 #include "utils/rls.h"
 #include "utils/snapmgr.h"
@@ -80,6 +81,11 @@ typedef struct
 	BulkInsertState bistate;	/* bulk insert state */
 } DR_intorel;
 
+typedef struct
+{
+	bool	has_agg;
+} check_ivm_restriction_context;
+
 /* utility functions for CTAS definition creation */
 static ObjectAddress create_ctas_internal(List *attrList, IntoClause *into);
 static ObjectAddress create_ctas_nodata(List *tlist, IntoClause *into);
@@ -94,8 +100,9 @@ static void CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid mat
 									 Relids *relids, bool ex_lock);
 static void CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, bool ex_lock);
 static void check_ivm_restriction(Node *node);
-static bool check_ivm_restriction_walker(Node *node, void *context);
+static bool check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context);
 static Bitmapset *get_primary_key_attnos_from_query(Query *qry, List **constraintList);
+static bool check_aggregate_supports_ivm(Oid aggfnoid);
 
 /*
  * create_ctas_internal
@@ -431,6 +438,7 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
  * rewriteQueryForIMMV -- rewrite view definition query for IMMV
  *
  * count(*) is added for counting distinct tuples in views.
+ * Also, additional hidden columns are added for aggregate values.
  */
 Query *
 rewriteQueryForIMMV(Query *query, List *colNames)
@@ -445,14 +453,46 @@ rewriteQueryForIMMV(Query *query, List *colNames)
 	rewritten = copyObject(query);
 	pstate->p_expr_kind = EXPR_KIND_SELECT_TARGET;
 
-	/*
-	 * Convert DISTINCT to GROUP BY and add count(*) for counting distinct
-	 * tuples in views.
-	 */
-	if (rewritten->distinctClause)
+	/* group keys must be in targetlist */
+	if (rewritten->groupClause)
 	{
+		ListCell *lc;
+		foreach(lc, rewritten->groupClause)
+		{
+			SortGroupClause *scl = (SortGroupClause *) lfirst(lc);
+			TargetEntry *tle = get_sortgroupclause_tle(scl, rewritten->targetList);
+
+			if (tle->resjunk)
+				ereport(ERROR,
+						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						 errmsg("GROUP BY expression not appearing in select list is not supported on incrementally maintainable materialized view")));
+		}
+	}
+	/* Convert DISTINCT to GROUP BY.  count(*) will be added afterward. */
+	else if (!rewritten->hasAggs && rewritten->distinctClause)
 		rewritten->groupClause = transformDistinctClause(NULL, &rewritten->targetList, rewritten->sortClause, false);
 
+	/* Add additional columns for aggregate values */
+	if (rewritten->hasAggs)
+	{
+		ListCell *lc;
+		List *aggs = NIL;
+		AttrNumber next_resno = list_length(rewritten->targetList) + 1;
+
+		foreach(lc, rewritten->targetList)
+		{
+			TargetEntry *tle = (TargetEntry *) lfirst(lc);
+			char *resname = (colNames == NIL ? tle->resname : strVal(list_nth(colNames, tle->resno - 1)));
+
+			if (IsA(tle->expr, Aggref))
+				makeIvmAggColumn(pstate, (Aggref *)tle->expr, resname, &next_resno, &aggs);
+		}
+		rewritten->targetList = list_concat(rewritten->targetList, aggs);
+	}
+
+	/* Add count(*) for counting distinct tuples in views */
+	if (rewritten->distinctClause || rewritten->hasAggs)
+	{
 		fn = makeFuncCall(list_make1(makeString("count")), NIL, COERCE_EXPLICIT_CALL, -1);
 		fn->agg_star = true;
 
@@ -469,6 +509,91 @@ rewriteQueryForIMMV(Query *query, List *colNames)
 	return rewritten;
 }
 
+/*
+ * makeIvmAggColumn -- make additional aggregate columns for IVM
+ *
+ * For an aggregate column specified by aggref, additional aggregate columns
+ * are added, which are used to calculate the new aggregate value in IMMV.
+ * An additional aggregate columns has a name based on resname
+ * (ex. ivm_count_resname), and resno specified by next_resno. The created
+ * columns are returned to aggs, and the resno for the next column is also
+ * returned to next_resno.
+ *
+ * Currently, an additional count() is created for aggref other than count.
+ * In addition, sum() is created for avg aggregate column.
+ */
+void
+makeIvmAggColumn(ParseState *pstate, Aggref *aggref, char *resname, AttrNumber *next_resno, List **aggs)
+{
+	TargetEntry *tle_count;
+	Node *node;
+	FuncCall *fn;
+	Const	*dmy_arg = makeConst(INT4OID,
+								 -1,
+								 InvalidOid,
+								 sizeof(int32),
+								 Int32GetDatum(1),
+								 false,
+								 true); /* pass by value */
+	const char *aggname = get_func_name(aggref->aggfnoid);
+
+	/*
+	 * For aggregate functions except count, add count() func with the same arg parameters.
+	 * This count result is used for determining if the aggregate value should be NULL or not.
+	 * Also, add sum() func for avg because we need to calculate an average value as sum/count.
+	 *
+	 * XXX: If there are same expressions explicitly in the target list, we can use this instead
+	 * of adding new duplicated one.
+	 */
+	if (strcmp(aggname, "count") != 0)
+	{
+		fn = makeFuncCall(list_make1(makeString("count")), NIL, COERCE_EXPLICIT_CALL, -1);
+
+		/* Make a Func with a dummy arg, and then override this by the original agg's args. */
+		node = ParseFuncOrColumn(pstate, fn->funcname, list_make1(dmy_arg), NULL, fn, false, -1);
+		((Aggref *)node)->args = aggref->args;
+
+		tle_count = makeTargetEntry((Expr *) node,
+									*next_resno,
+									pstrdup(makeObjectName("__ivm_count",resname, "_")),
+									false);
+		*aggs = lappend(*aggs, tle_count);
+		(*next_resno)++;
+	}
+	if (strcmp(aggname, "avg") == 0)
+	{
+		List *dmy_args = NIL;
+		ListCell *lc;
+		foreach(lc, aggref->aggargtypes)
+		{
+			Oid		typeid = lfirst_oid(lc);
+			Type	type = typeidType(typeid);
+
+			Const *con = makeConst(typeid,
+								   -1,
+								   typeTypeCollation(type),
+								   typeLen(type),
+								   (Datum) 0,
+								   true,
+								   typeByVal(type));
+			dmy_args = lappend(dmy_args, con);
+			ReleaseSysCache(type);
+		}
+		fn = makeFuncCall(list_make1(makeString("sum")), NIL, COERCE_EXPLICIT_CALL, -1);
+
+		/* Make a Func with dummy args, and then override this by the original agg's args. */
+		node = ParseFuncOrColumn(pstate, fn->funcname, dmy_args, NULL, fn, false, -1);
+		((Aggref *)node)->args = aggref->args;
+
+		tle_count = makeTargetEntry((Expr *) node,
+									*next_resno,
+									pstrdup(makeObjectName("__ivm_sum",resname, "_")),
+									false);
+		*aggs = lappend(*aggs, tle_count);
+		(*next_resno)++;
+	}
+}
+
 /*
  * GetIntoRelEFlags --- compute executor flags needed for CREATE TABLE AS
  *
@@ -922,11 +1047,13 @@ CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, bool ex_lock
 static void
 check_ivm_restriction(Node *node)
 {
-	check_ivm_restriction_walker(node, NULL);
+	check_ivm_restriction_context context = {false};
+
+	check_ivm_restriction_walker(node, &context);
 }
 
 static bool
-check_ivm_restriction_walker(Node *node, void *context)
+check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context)
 {
 	if (node == NULL)
 		return false;
@@ -1007,6 +1134,8 @@ check_ivm_restriction_walker(Node *node, void *context)
 					}
 				}
 
+				context->has_agg |= qry->hasAggs;
+
 				/* restrictions for rtable */
 				foreach(lc, qry->rtable)
 				{
@@ -1055,7 +1184,7 @@ check_ivm_restriction_walker(Node *node, void *context)
 
 				}
 
-				query_tree_walker(qry, check_ivm_restriction_walker, NULL, QTW_IGNORE_RANGE_TABLE);
+				query_tree_walker(qry, check_ivm_restriction_walker, (void *) context, QTW_IGNORE_RANGE_TABLE);
 
 				break;
 			}
@@ -1066,8 +1195,12 @@ check_ivm_restriction_walker(Node *node, void *context)
 						ereport(ERROR,
 								(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 								 errmsg("column name %s is not supported on incrementally maintainable materialized view", tle->resname)));
+				if (context->has_agg && !IsA(tle->expr, Aggref) && contain_aggs_of_level((Node *) tle->expr, 0))
+					ereport(ERROR,
+							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							 errmsg("expression containing an aggregate in it is not supported on incrementally maintainable materialized view")));
 
-				expression_tree_walker(node, check_ivm_restriction_walker, NULL);
+				expression_tree_walker(node, check_ivm_restriction_walker, (void *) context);
 				break;
 			}
 		case T_JoinExpr:
@@ -1079,15 +1212,128 @@ check_ivm_restriction_walker(Node *node, void *context)
 								(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 								 errmsg("OUTER JOIN is not supported on incrementally maintainable materialized view")));
 
-				expression_tree_walker(node, check_ivm_restriction_walker, NULL);
+				expression_tree_walker(node, check_ivm_restriction_walker, (void *) context);
+				break;
 			}
-			break;
-			expression_tree_walker(node, check_ivm_restriction_walker, NULL);
+		case T_Aggref:
+			{
+				/* Check if this supports IVM */
+				Aggref *aggref = (Aggref *) node;
+				const char *aggname = format_procedure(aggref->aggfnoid);
+
+				if (aggref->aggfilter != NULL)
+					ereport(ERROR,
+							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							 errmsg("aggregate function with FILTER clause is not supported on incrementally maintainable materialized view")));
+
+				if (aggref->aggdistinct != NULL)
+					ereport(ERROR,
+							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							 errmsg("aggregate function with DISTINCT arguments is not supported on incrementally maintainable materialized view")));
+
+				if (aggref->aggorder != NULL)
+					ereport(ERROR,
+							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							 errmsg("aggregate function with ORDER clause is not supported on incrementally maintainable materialized view")));
+
+				if (!check_aggregate_supports_ivm(aggref->aggfnoid))
+					ereport(ERROR,
+							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							 errmsg("aggregate function %s is not supported on incrementally maintainable materialized view", aggname)));
+				break;
+			}
+		default:
+			expression_tree_walker(node, check_ivm_restriction_walker, (void *) context);
 			break;
 	}
 	return false;
 }
 
+/*
+ * check_aggregate_supports_ivm
+ *
+ * Check if the given aggregate function is supporting IVM
+ */
+static bool
+check_aggregate_supports_ivm(Oid aggfnoid)
+{
+	switch (aggfnoid)
+	{
+		/* count */
+		case F_COUNT_ANY:
+		case F_COUNT_:
+
+		/* sum */
+		case F_SUM_INT8:
+		case F_SUM_INT4:
+		case F_SUM_INT2:
+		case F_SUM_FLOAT4:
+		case F_SUM_FLOAT8:
+		case F_SUM_MONEY:
+		case F_SUM_INTERVAL:
+		case F_SUM_NUMERIC:
+
+		/* avg */
+		case F_AVG_INT8:
+		case F_AVG_INT4:
+		case F_AVG_INT2:
+		case F_AVG_NUMERIC:
+		case F_AVG_FLOAT4:
+		case F_AVG_FLOAT8:
+		case F_AVG_INTERVAL:
+
+		/* min */
+		case F_MIN_ANYARRAY:
+		case F_MIN_INT8:
+		case F_MIN_INT4:
+		case F_MIN_INT2:
+		case F_MIN_OID:
+		case F_MIN_FLOAT4:
+		case F_MIN_FLOAT8:
+		case F_MIN_DATE:
+		case F_MIN_TIME:
+		case F_MIN_TIMETZ:
+		case F_MIN_MONEY:
+		case F_MIN_TIMESTAMP:
+		case F_MIN_TIMESTAMPTZ:
+		case F_MIN_INTERVAL:
+		case F_MIN_TEXT:
+		case F_MIN_NUMERIC:
+		case F_MIN_BPCHAR:
+		case F_MIN_TID:
+		case F_MIN_ANYENUM:
+		case F_MIN_INET:
+		case F_MIN_PG_LSN:
+
+		/* max */
+		case F_MAX_ANYARRAY:
+		case F_MAX_INT8:
+		case F_MAX_INT4:
+		case F_MAX_INT2:
+		case F_MAX_OID:
+		case F_MAX_FLOAT4:
+		case F_MAX_FLOAT8:
+		case F_MAX_DATE:
+		case F_MAX_TIME:
+		case F_MAX_TIMETZ:
+		case F_MAX_MONEY:
+		case F_MAX_TIMESTAMP:
+		case F_MAX_TIMESTAMPTZ:
+		case F_MAX_INTERVAL:
+		case F_MAX_TEXT:
+		case F_MAX_NUMERIC:
+		case F_MAX_BPCHAR:
+		case F_MAX_TID:
+		case F_MAX_ANYENUM:
+		case F_MAX_INET:
+		case F_MAX_PG_LSN:
+			return true;
+
+		default:
+			return false;
+	}
+}
+
 /*
  * CreateIndexOnIMMV
  *
@@ -1139,7 +1385,30 @@ CreateIndexOnIMMV(Query *query, Relation matviewRel)
 	index->concurrent = false;
 	index->if_not_exists = false;
 
-	if (qry->distinctClause)
+
+	if (qry->groupClause)
+	{
+		/* create unique constraint on GROUP BY expression columns */
+		foreach(lc, qry->groupClause)
+		{
+			SortGroupClause *scl = (SortGroupClause *) lfirst(lc);
+			TargetEntry *tle = get_sortgroupclause_tle(scl, qry->targetList);
+			Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, tle->resno - 1);
+			IndexElem  *iparam;
+
+			iparam = makeNode(IndexElem);
+			iparam->name = pstrdup(NameStr(attr->attname));
+			iparam->expr = NULL;
+			iparam->indexcolname = NULL;
+			iparam->collation = NIL;
+			iparam->opclass = NIL;
+			iparam->opclassopts = NIL;
+			iparam->ordering = SORTBY_DEFAULT;
+			iparam->nulls_ordering = SORTBY_NULLS_DEFAULT;
+			index->indexParams = lappend(index->indexParams, iparam);
+		}
+	}
+	else if (qry->distinctClause)
 	{
 		/* create unique constraint on all columns */
 		foreach(lc, qry->targetList)
@@ -1198,7 +1467,7 @@ CreateIndexOnIMMV(Query *query, Relation matviewRel)
 					(errmsg("could not create an index on materialized view \"%s\" automatically",
 							RelationGetRelationName(matviewRel)),
 					 errdetail("This target list does not have all the primary key columns, "
-							   "or this view does not contain DISTINCT clause."),
+							   "or this view does not contain GROUP BY or DISTINCT clause."),
 					 errhint("Create an index on the materialized view for efficient incremental maintenance.")));
 			return;
 		}
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 943de5dfba..1f50aaa1b8 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -81,6 +81,32 @@ typedef struct
 
 #define MV_INIT_QUERYHASHSIZE	16
 
+/* MV query type codes */
+#define MV_PLAN_RECALC			1
+#define MV_PLAN_SET_VALUE		2
+
+/*
+ * MI_QueryKey
+ *
+ * The key identifying a prepared SPI plan in our query hashtable
+ */
+typedef struct MV_QueryKey
+{
+	Oid			matview_id;	/* OID of materialized view */
+	int32		query_type;	/* query type ID, see MV_PLAN_XXX above */
+} MV_QueryKey;
+
+/*
+ * MV_QueryHashEntry
+ *
+ * Hash entry for cached plans used to maintain materialized views.
+ */
+typedef struct MV_QueryHashEntry
+{
+	MV_QueryKey key;
+	SPIPlanPtr	plan;
+} MV_QueryHashEntry;
+
 /*
  * MV_TriggerHashEntry
  *
@@ -117,8 +143,16 @@ typedef struct MV_TriggerTable
 	RangeTblEntry *original_rte;	/* the original RTE saved before rewriting query */
 } MV_TriggerTable;
 
+static HTAB *mv_query_cache = NULL;
 static HTAB *mv_trigger_info = NULL;
 
+/* kind of IVM operation for the view */
+typedef enum
+{
+	IVM_ADD,
+	IVM_SUB
+} IvmOp;
+
 /* ENR name for materialized view delta */
 #define NEW_DELTA_ENRNAME "new_delta"
 #define OLD_DELTA_ENRNAME "old_delta"
@@ -152,7 +186,7 @@ static RangeTblEntry *get_prestate_rte(RangeTblEntry *rte, MV_TriggerTable *tabl
 				 QueryEnvironment *queryEnv);
 static RangeTblEntry *union_ENRs(RangeTblEntry *rte, Oid relid, List *enr_rtes, const char *prefix,
 		   QueryEnvironment *queryEnv);
-static Query *rewrite_query_for_distinct(Query *query, ParseState *pstate);
+static Query *rewrite_query_for_distinct_and_aggregates(Query *query, ParseState *pstate);
 
 static void calc_delta(MV_TriggerTable *table, int rte_index, Query *query,
 			DestReceiver *dest_old, DestReceiver *dest_new,
@@ -163,19 +197,48 @@ static Query *rewrite_query_for_postupdate_state(Query *query, MV_TriggerTable *
 static void apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *new_tuplestores,
 			TupleDesc tupdesc_old, TupleDesc tupdesc_new,
 			Query *query, bool use_count, char *count_colname);
+static void append_set_clause_for_count(const char *resname, StringInfo buf_old,
+							StringInfo buf_new,StringInfo aggs_list);
+static void append_set_clause_for_sum(const char *resname, StringInfo buf_old,
+						  StringInfo buf_new, StringInfo aggs_list);
+static void append_set_clause_for_avg(const char *resname, StringInfo buf_old,
+						  StringInfo buf_new, StringInfo aggs_list,
+						  const char *aggtype);
+static void append_set_clause_for_minmax(const char *resname, StringInfo buf_old,
+							 StringInfo buf_new, StringInfo aggs_list,
+							 bool is_min);
+static char *get_operation_string(IvmOp op, const char *col, const char *arg1, const char *arg2,
+					 const char* count_col, const char *castType);
+static char *get_null_condition_string(IvmOp op, const char *arg1, const char *arg2,
+						  const char* count_col);
 static void apply_old_delta(const char *matviewname, const char *deltaname_old,
 				List *keys);
 static void apply_old_delta_with_count(const char *matviewname, const char *deltaname_old,
-				List *keys, const char *count_colname);
+				List *keys, StringInfo aggs_list, StringInfo aggs_set,
+				List *minmax_list, List *is_min_list,
+				const char *count_colname,
+				SPITupleTable **tuptable_recalc, uint64 *num_recalc);
 static void apply_new_delta(const char *matviewname, const char *deltaname_new,
 				StringInfo target_list);
 static void apply_new_delta_with_count(const char *matviewname, const char* deltaname_new,
-				List *keys, StringInfo target_list, const char* count_colname);
+				List *keys, StringInfo target_list, StringInfo aggs_set,
+				const char* count_colname);
 static char *get_matching_condition_string(List *keys);
+static char *get_returning_string(List *minmax_list, List *is_min_list, List *keys);
+static char *get_minmax_recalc_condition_string(List *minmax_list, List *is_min_list);
+static char *get_select_for_recalc_string(List *keys);
+static void recalc_and_set_values(SPITupleTable *tuptable_recalc, int64 num_tuples,
+					  List *namelist, List *keys, Relation matviewRel);
+static SPIPlanPtr get_plan_for_recalc(Oid matviewOid, List *namelist, List *keys, Oid *keyTypes);
+static SPIPlanPtr get_plan_for_set_values(Oid matviewOid, char *matviewname, List *namelist,
+						Oid *valTypes);
 static void generate_equal(StringInfo querybuf, Oid opttype,
 			   const char *leftop, const char *rightop);
 
 static void mv_InitHashTables(void);
+static SPIPlanPtr mv_FetchPreparedPlan(MV_QueryKey *key);
+static void mv_HashPreparedPlan(MV_QueryKey *key, SPIPlanPtr plan);
+static void mv_BuildQueryKey(MV_QueryKey *key, Oid matview_id, int32 query_type);
 static void clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry);
 
 static List *get_securityQuals(Oid relId, int rt_index, Query *query);
@@ -1447,8 +1510,8 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS)
 	rewritten = rewrite_query_for_preupdate_state(rewritten, entry->tables,
 												  entry->xid, entry->cid,
 												  pstate);
-	/* Rewrite for DISTINCT clause */
-	rewritten = rewrite_query_for_distinct(rewritten, pstate);
+	/* Rewrite for DISTINCT clause and aggregates functions */
+	rewritten = rewrite_query_for_distinct_and_aggregates(rewritten, pstate);
 
 	/* Create tuplestores to store view deltas */
 	if (entry->has_old)
@@ -1499,7 +1562,7 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS)
 
 			count_colname = pstrdup("__ivm_count__");
 
-			if (query->distinctClause)
+			if (query->hasAggs || query->distinctClause)
 				use_count = true;
 
 			/* calculate delta tables */
@@ -1861,17 +1924,34 @@ union_ENRs(RangeTblEntry *rte, Oid relid, List *enr_rtes, const char *prefix,
 }
 
 /*
- * rewrite_query_for_distinct
+ * rewrite_query_for_distinct_and_aggregates
  *
- * Rewrite query for counting DISTINCT clause.
+ * Rewrite query for counting DISTINCT clause and aggregate functions.
  */
 static Query *
-rewrite_query_for_distinct(Query *query, ParseState *pstate)
+rewrite_query_for_distinct_and_aggregates(Query *query, ParseState *pstate)
 {
 	TargetEntry *tle_count;
 	FuncCall *fn;
 	Node *node;
 
+	/* For aggregate views */
+	if (query->hasAggs)
+	{
+		ListCell *lc;
+		List *aggs = NIL;
+		AttrNumber next_resno = list_length(query->targetList) + 1;
+
+		foreach(lc, query->targetList)
+		{
+			TargetEntry *tle = (TargetEntry *) lfirst(lc);
+
+			if (IsA(tle->expr, Aggref))
+				makeIvmAggColumn(pstate, (Aggref *)tle->expr, tle->resname, &next_resno, &aggs);
+		}
+		query->targetList = list_concat(query->targetList, aggs);
+	}
+
 	/* Add count(*) for counting distinct tuples in views */
 	fn = makeFuncCall(list_make1(makeString("count")), NIL, COERCE_EXPLICIT_CALL, -1);
 	fn->agg_star = true;
@@ -1940,6 +2020,8 @@ rewrite_query_for_postupdate_state(Query *query, MV_TriggerTable *table, int rte
 	return query;
 }
 
+#define IVM_colname(type, col) makeObjectName("__ivm_" type, col, "_")
+
 /*
  * apply_delta
  *
@@ -1953,11 +2035,16 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
 {
 	StringInfoData querybuf;
 	StringInfoData target_list_buf;
+	StringInfo	aggs_list_buf = NULL;
+	StringInfo	aggs_set_old = NULL;
+	StringInfo	aggs_set_new = NULL;
 	Relation	matviewRel;
 	char	   *matviewname;
 	ListCell	*lc;
 	int			i;
 	List	   *keys = NIL;
+	List	   *minmax_list = NIL;
+	List	   *is_min_list = NIL;
 
 
 	/*
@@ -1975,6 +2062,15 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
 	initStringInfo(&querybuf);
 	initStringInfo(&target_list_buf);
 
+	if (query->hasAggs)
+	{
+		if (old_tuplestores && tuplestore_tuple_count(old_tuplestores) > 0)
+			aggs_set_old = makeStringInfo();
+		if (new_tuplestores && tuplestore_tuple_count(new_tuplestores) > 0)
+			aggs_set_new = makeStringInfo();
+		aggs_list_buf = makeStringInfo();
+	}
+
 	/* build string of target list */
 	for (i = 0; i < matviewRel->rd_att->natts; i++)
 	{
@@ -1998,7 +2094,65 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
 		if (tle->resjunk)
 			continue;
 
-		keys = lappend(keys, resname);
+		/*
+		 * For views without aggregates, all attributes are used as keys to identify a
+		 * tuple in a view.
+		 */
+		if (!query->hasAggs)
+			keys = lappend(keys, attr);
+
+		/* For views with aggregates, we need to build SET clause for updating aggregate
+		 * values. */
+		if (query->hasAggs && IsA(tle->expr, Aggref))
+		{
+			Aggref *aggref = (Aggref *) tle->expr;
+			const char *aggname = get_func_name(aggref->aggfnoid);
+
+			/*
+			 * We can use function names here because it is already checked if these
+			 * can be used in IMMV by its OID at the definition time.
+			 */
+
+			/* count */
+			if (!strcmp(aggname, "count"))
+				append_set_clause_for_count(resname, aggs_set_old, aggs_set_new, aggs_list_buf);
+
+			/* sum */
+			else if (!strcmp(aggname, "sum"))
+				append_set_clause_for_sum(resname, aggs_set_old, aggs_set_new, aggs_list_buf);
+
+			/* avg */
+			else if (!strcmp(aggname, "avg"))
+				append_set_clause_for_avg(resname, aggs_set_old, aggs_set_new, aggs_list_buf,
+										  format_type_be(aggref->aggtype));
+
+			/* min/max */
+			else if (!strcmp(aggname, "min") || !strcmp(aggname, "max"))
+			{
+				bool	is_min = (!strcmp(aggname, "min"));
+
+				append_set_clause_for_minmax(resname, aggs_set_old, aggs_set_new, aggs_list_buf, is_min);
+
+				/* make a resname list of min and max aggregates */
+				minmax_list = lappend(minmax_list, resname);
+				is_min_list = lappend_int(is_min_list, is_min);
+			}
+			else
+				elog(ERROR, "unsupported aggregate function: %s", aggname);
+		}
+	}
+
+	/* If we have GROUP BY clause, we use its entries as keys. */
+	if (query->hasAggs && query->groupClause)
+	{
+		foreach (lc, query->groupClause)
+		{
+			SortGroupClause *sgcl = (SortGroupClause *) lfirst(lc);
+			TargetEntry		*tle = get_sortgroupclause_tle(sgcl, query->targetList);
+			Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, tle->resno - 1);
+
+			keys = lappend(keys, attr);
+		}
 	}
 
 	/* Start maintaining the materialized view. */
@@ -2012,6 +2166,8 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
 	if (old_tuplestores && tuplestore_tuple_count(old_tuplestores) > 0)
 	{
 		EphemeralNamedRelation enr = palloc(sizeof(EphemeralNamedRelationData));
+		SPITupleTable  *tuptable_recalc = NULL;
+		uint64			num_recalc;
 		int				rc;
 
 		/* convert tuplestores to ENR, and register for SPI */
@@ -2029,10 +2185,19 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
 		if (use_count)
 			/* apply old delta and get rows to be recalculated */
 			apply_old_delta_with_count(matviewname, OLD_DELTA_ENRNAME,
-									   keys, count_colname);
+									   keys, aggs_list_buf, aggs_set_old,
+									   minmax_list, is_min_list,
+									   count_colname, &tuptable_recalc, &num_recalc);
 		else
 			apply_old_delta(matviewname, OLD_DELTA_ENRNAME, keys);
 
+		/*
+		 * If we have min or max, we might have to recalculate aggregate values from base tables
+		 * on some tuples. TIDs and keys such tuples are returned as a result of the above query.
+		 */
+		if (minmax_list && tuptable_recalc)
+			recalc_and_set_values(tuptable_recalc, num_recalc, minmax_list, keys, matviewRel);
+
 	}
 	/* For tuple insertion */
 	if (new_tuplestores && tuplestore_tuple_count(new_tuplestores) > 0)
@@ -2055,7 +2220,7 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
 		/* apply new delta */
 		if (use_count)
 			apply_new_delta_with_count(matviewname, NEW_DELTA_ENRNAME,
-								keys, &target_list_buf, count_colname);
+								keys, aggs_set_new, &target_list_buf, count_colname);
 		else
 			apply_new_delta(matviewname, NEW_DELTA_ENRNAME, &target_list_buf);
 	}
@@ -2070,49 +2235,410 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
 		elog(ERROR, "SPI_finish failed");
 }
 
+/*
+ * append_set_clause_for_count
+ *
+ * Append SET clause string for count aggregation to given buffers.
+ * Also, append resnames required for calculating the aggregate value.
+ */
+static void
+append_set_clause_for_count(const char *resname, StringInfo buf_old,
+							StringInfo buf_new,StringInfo aggs_list)
+{
+	/* For tuple deletion */
+	if (buf_old)
+	{
+		/* resname = mv.resname - t.resname */
+		appendStringInfo(buf_old,
+			", %s = %s",
+			quote_qualified_identifier(NULL, resname),
+			get_operation_string(IVM_SUB, resname, "mv", "t", NULL, NULL));
+	}
+	/* For tuple insertion */
+	if (buf_new)
+	{
+		/* resname = mv.resname + diff.resname */
+		appendStringInfo(buf_new,
+			", %s = %s",
+			quote_qualified_identifier(NULL, resname),
+			get_operation_string(IVM_ADD, resname, "mv", "diff", NULL, NULL));
+	}
+
+	appendStringInfo(aggs_list, ", %s",
+		quote_qualified_identifier("diff", resname)
+	);
+}
+
+/*
+ * append_set_clause_for_sum
+ *
+ * Append SET clause string for sum aggregation to given buffers.
+ * Also, append resnames required for calculating the aggregate value.
+ */
+static void
+append_set_clause_for_sum(const char *resname, StringInfo buf_old,
+						  StringInfo buf_new, StringInfo aggs_list)
+{
+	char *count_col = IVM_colname("count", resname);
+
+	/* For tuple deletion */
+	if (buf_old)
+	{
+		/* sum = mv.sum - t.sum */
+		appendStringInfo(buf_old,
+			", %s = %s",
+			quote_qualified_identifier(NULL, resname),
+			get_operation_string(IVM_SUB, resname, "mv", "t", count_col, NULL)
+		);
+		/* count = mv.count - t.count */
+		appendStringInfo(buf_old,
+			", %s = %s",
+			quote_qualified_identifier(NULL, count_col),
+			get_operation_string(IVM_SUB, count_col, "mv", "t", NULL, NULL)
+		);
+	}
+	/* For tuple insertion */
+	if (buf_new)
+	{
+		/* sum = mv.sum + diff.sum */
+		appendStringInfo(buf_new,
+			", %s = %s",
+			quote_qualified_identifier(NULL, resname),
+			get_operation_string(IVM_ADD, resname, "mv", "diff", count_col, NULL)
+		);
+		/* count = mv.count + diff.count */
+		appendStringInfo(buf_new,
+			", %s = %s",
+			quote_qualified_identifier(NULL, count_col),
+			get_operation_string(IVM_ADD, count_col, "mv", "diff", NULL, NULL)
+		);
+	}
+
+	appendStringInfo(aggs_list, ", %s, %s",
+		quote_qualified_identifier("diff", resname),
+		quote_qualified_identifier("diff", IVM_colname("count", resname))
+	);
+}
+
+/*
+ * append_set_clause_for_avg
+ *
+ * Append SET clause string for avg aggregation to given buffers.
+ * Also, append resnames required for calculating the aggregate value.
+ */
+static void
+append_set_clause_for_avg(const char *resname, StringInfo buf_old,
+						  StringInfo buf_new, StringInfo aggs_list,
+						  const char *aggtype)
+{
+	char *sum_col = IVM_colname("sum", resname);
+	char *count_col = IVM_colname("count", resname);
+
+	/* For tuple deletion */
+	if (buf_old)
+	{
+		/* avg = (mv.sum - t.sum)::aggtype / (mv.count - t.count) */
+		appendStringInfo(buf_old,
+			", %s = %s OPERATOR(pg_catalog./) %s",
+			quote_qualified_identifier(NULL, resname),
+			get_operation_string(IVM_SUB, sum_col, "mv", "t", count_col, aggtype),
+			get_operation_string(IVM_SUB, count_col, "mv", "t", NULL, NULL)
+		);
+		/* sum = mv.sum - t.sum */
+		appendStringInfo(buf_old,
+			", %s = %s",
+			quote_qualified_identifier(NULL, sum_col),
+			get_operation_string(IVM_SUB, sum_col, "mv", "t", count_col, NULL)
+		);
+		/* count = mv.count - t.count */
+		appendStringInfo(buf_old,
+			", %s = %s",
+			quote_qualified_identifier(NULL, count_col),
+			get_operation_string(IVM_SUB, count_col, "mv", "t", NULL, NULL)
+		);
+
+	}
+	/* For tuple insertion */
+	if (buf_new)
+	{
+		/* avg = (mv.sum + diff.sum)::aggtype / (mv.count + diff.count) */
+		appendStringInfo(buf_new,
+			", %s = %s OPERATOR(pg_catalog./) %s",
+			quote_qualified_identifier(NULL, resname),
+			get_operation_string(IVM_ADD, sum_col, "mv", "diff", count_col, aggtype),
+			get_operation_string(IVM_ADD, count_col, "mv", "diff", NULL, NULL)
+		);
+		/* sum = mv.sum + diff.sum */
+		appendStringInfo(buf_new,
+			", %s = %s",
+			quote_qualified_identifier(NULL, sum_col),
+			get_operation_string(IVM_ADD, sum_col, "mv", "diff", count_col, NULL)
+		);
+		/* count = mv.count + diff.count */
+		appendStringInfo(buf_new,
+			", %s = %s",
+			quote_qualified_identifier(NULL, count_col),
+			get_operation_string(IVM_ADD, count_col, "mv", "diff", NULL, NULL)
+		);
+	}
+
+	appendStringInfo(aggs_list, ", %s, %s, %s",
+		quote_qualified_identifier("diff", resname),
+		quote_qualified_identifier("diff", IVM_colname("sum", resname)),
+		quote_qualified_identifier("diff", IVM_colname("count", resname))
+	);
+}
+
+/*
+ * append_set_clause_for_minmax
+ *
+ * Append SET clause string for min or max aggregation to given buffers.
+ * Also, append resnames required for calculating the aggregate value.
+ * is_min is true if this is min, false if not.
+ */
+static void
+append_set_clause_for_minmax(const char *resname, StringInfo buf_old,
+							 StringInfo buf_new, StringInfo aggs_list,
+							 bool is_min)
+{
+	char *count_col = IVM_colname("count", resname);
+
+	/* For tuple deletion */
+	if (buf_old)
+	{
+		/*
+		 * If the new value doesn't became NULL then use the value remaining
+		 * in the view although this will be recomputated afterwords.
+		 */
+		appendStringInfo(buf_old,
+			", %s = CASE WHEN %s THEN NULL ELSE %s END",
+			quote_qualified_identifier(NULL, resname),
+			get_null_condition_string(IVM_SUB, "mv", "t", count_col),
+			quote_qualified_identifier("mv", resname)
+		);
+		/* count = mv.count - t.count */
+		appendStringInfo(buf_old,
+			", %s = %s",
+			quote_qualified_identifier(NULL, count_col),
+			get_operation_string(IVM_SUB, count_col, "mv", "t", NULL, NULL)
+		);
+	}
+	/* For tuple insertion */
+	if (buf_new)
+	{
+		/*
+		 * min = LEAST(mv.min, diff.min)
+		 * max = GREATEST(mv.max, diff.max)
+		 */
+		appendStringInfo(buf_new,
+			", %s = CASE WHEN %s THEN NULL ELSE %s(%s,%s) END",
+			quote_qualified_identifier(NULL, resname),
+			get_null_condition_string(IVM_ADD, "mv", "diff", count_col),
+
+			is_min ? "LEAST" : "GREATEST",
+			quote_qualified_identifier("mv", resname),
+			quote_qualified_identifier("diff", resname)
+		);
+		/* count = mv.count + diff.count */
+		appendStringInfo(buf_new,
+			", %s = %s",
+			quote_qualified_identifier(NULL, count_col),
+			get_operation_string(IVM_ADD, count_col, "mv", "diff", NULL, NULL)
+		);
+	}
+
+	appendStringInfo(aggs_list, ", %s, %s",
+		quote_qualified_identifier("diff", resname),
+		quote_qualified_identifier("diff", IVM_colname("count", resname))
+	);
+}
+
+/*
+ * get_operation_string
+ *
+ * Build a string to calculate the new aggregate values.
+ */
+static char *
+get_operation_string(IvmOp op, const char *col, const char *arg1, const char *arg2,
+					 const char* count_col, const char *castType)
+{
+	StringInfoData buf;
+	StringInfoData castString;
+	char   *col1 = quote_qualified_identifier(arg1, col);
+	char   *col2 = quote_qualified_identifier(arg2, col);
+	char	op_char = (op == IVM_SUB ? '-' : '+');
+
+	initStringInfo(&buf);
+	initStringInfo(&castString);
+
+	if (castType)
+		appendStringInfo(&castString, "::%s", castType);
+
+	if (!count_col)
+	{
+		/*
+		 * If the attributes don't have count columns then calc the result
+		 * by using the operator simply.
+		 */
+		appendStringInfo(&buf, "(%s OPERATOR(pg_catalog.%c) %s)%s",
+			col1, op_char, col2, castString.data);
+	}
+	else
+	{
+		/*
+		 * If the attributes have count columns then consider the condition
+		 * where the result becomes NULL.
+		 */
+		char *null_cond = get_null_condition_string(op, arg1, arg2, count_col);
+
+		appendStringInfo(&buf,
+			"(CASE WHEN %s THEN NULL "
+				"WHEN %s IS NULL THEN %s "
+				"WHEN %s IS NULL THEN %s "
+				"ELSE (%s OPERATOR(pg_catalog.%c) %s)%s END)",
+			null_cond,
+			col1, col2,
+			col2, col1,
+			col1, op_char, col2, castString.data
+		);
+	}
+
+	return buf.data;
+}
+
+/*
+ * get_null_condition_string
+ *
+ * Build a predicate string for CASE clause to check if an aggregate value
+ * will became NULL after the given operation is applied.
+ */
+static char *
+get_null_condition_string(IvmOp op, const char *arg1, const char *arg2,
+						  const char* count_col)
+{
+	StringInfoData null_cond;
+	initStringInfo(&null_cond);
+
+	switch (op)
+	{
+		case IVM_ADD:
+			appendStringInfo(&null_cond,
+				"%s OPERATOR(pg_catalog.=) 0 AND %s OPERATOR(pg_catalog.=) 0",
+				quote_qualified_identifier(arg1, count_col),
+				quote_qualified_identifier(arg2, count_col)
+			);
+			break;
+		case IVM_SUB:
+			appendStringInfo(&null_cond,
+				"%s OPERATOR(pg_catalog.=) %s",
+				quote_qualified_identifier(arg1, count_col),
+				quote_qualified_identifier(arg2, count_col)
+			);
+			break;
+		default:
+			elog(ERROR,"unknown operation");
+	}
+
+	return null_cond.data;
+}
+
+
 /*
  * apply_old_delta_with_count
  *
  * Execute a query for applying a delta table given by deltname_old
  * which contains tuples to be deleted from to a materialized view given by
  * matviewname.  This is used when counting is required, that is, the view
- * has aggregate or distinct.
+ * has aggregate or distinct. Also, when a table in EXISTS sub queries
+ * is modified.
+ *
+ * If the view desn't have aggregates or has GROUP BY, this requires a keys
+ * list to identify a tuple in the view. If the view has aggregates, this
+ * requires strings representing resnames of aggregates and SET clause for
+ * updating aggregate values.
+ *
+ * If the view has min or max aggregate, this requires a list of resnames of
+ * min/max aggregates and a list of boolean which represents which entries in
+ * minmax_list is min. These are necessary to check if we need to recalculate
+ * min or max aggregate values. In this case, this query returns TID and keys
+ * of tuples which need to be recalculated.  This result and the number of rows
+ * are stored in tuptables and num_recalc repectedly.
+ *
  */
 static void
 apply_old_delta_with_count(const char *matviewname, const char *deltaname_old,
-				List *keys, const char *count_colname)
+				List *keys, StringInfo aggs_list, StringInfo aggs_set,
+				List *minmax_list, List *is_min_list,
+				const char *count_colname,
+				SPITupleTable **tuptable_recalc, uint64 *num_recalc)
 {
 	StringInfoData	querybuf;
 	char   *match_cond;
+	char   *updt_returning = "";
+	char   *select_for_recalc = "SELECT";
+	bool	agg_without_groupby = (list_length(keys) == 0);
+
+	Assert(tuptable_recalc != NULL);
+	Assert(num_recalc != NULL);
 
 	/* build WHERE condition for searching tuples to be deleted */
 	match_cond = get_matching_condition_string(keys);
 
+	/*
+	 * We need a special RETURNING clause and SELECT statement for min/max to
+	 * check which tuple needs re-calculation from base tables.
+	 */
+	if (minmax_list)
+	{
+		updt_returning = get_returning_string(minmax_list, is_min_list, keys);
+		select_for_recalc = get_select_for_recalc_string(keys);
+	}
+
 	/* Search for matching tuples from the view and update or delete if found. */
 	initStringInfo(&querybuf);
 	appendStringInfo(&querybuf,
 					"WITH t AS ("			/* collecting tid of target tuples in the view */
 						"SELECT diff.%s, "			/* count column */
-								"(diff.%s OPERATOR(pg_catalog.=) mv.%s) AS for_dlt, "
+								"(diff.%s OPERATOR(pg_catalog.=) mv.%s AND %s) AS for_dlt, "
 								"mv.ctid "
+								"%s "				/* aggregate columns */
 						"FROM %s AS mv, %s AS diff "
 						"WHERE %s"					/* tuple matching condition */
 					"), updt AS ("			/* update a tuple if this is not to be deleted */
 						"UPDATE %s AS mv SET %s = mv.%s OPERATOR(pg_catalog.-) t.%s "
+											"%s"	/* SET clauses for aggregates */
 						"FROM t WHERE mv.ctid OPERATOR(pg_catalog.=) t.ctid AND NOT for_dlt "
+						"%s"						/* RETURNING clause for recalc infomation */
 					"), dlt AS ("			/* delete a tuple if this is to be deleted */
 						"DELETE FROM %s AS mv USING t "
 						"WHERE mv.ctid OPERATOR(pg_catalog.=) t.ctid AND for_dlt"
-					")",
+					") %s",							/* SELECT returning which tuples need to be recalculated */
 					count_colname,
-					count_colname, count_colname,
+					count_colname, count_colname, (agg_without_groupby ? "false" : "true"),
+					(aggs_list != NULL ? aggs_list->data : ""),
 					matviewname, deltaname_old,
 					match_cond,
 					matviewname, count_colname, count_colname, count_colname,
-					matviewname);
+					(aggs_set != NULL ? aggs_set->data : ""),
+					updt_returning,
+					matviewname,
+					select_for_recalc);
 
-	if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
+	if (SPI_exec(querybuf.data, 0) != SPI_OK_SELECT)
 		elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+
+	/* Return tuples to be recalculated. */
+	if (minmax_list)
+	{
+		*tuptable_recalc = SPI_tuptable;
+		*num_recalc = SPI_processed;
+	}
+	else
+	{
+		*tuptable_recalc = NULL;
+		*num_recalc = 0;
+	}
 }
 
 /*
@@ -2172,10 +2698,15 @@ apply_old_delta(const char *matviewname, const char *deltaname_old,
  * matviewname.  This is used when counting is required, that is, the view
  * has aggregate or distinct. Also, when a table in EXISTS sub queries
  * is modified.
+ *
+ * If the view desn't have aggregates or has GROUP BY, this requires a keys
+ * list to identify a tuple in the view. If the view has aggregates, this
+ * requires strings representing SET clause for updating aggregate values.
  */
 static void
 apply_new_delta_with_count(const char *matviewname, const char* deltaname_new,
-				List *keys, StringInfo target_list, const char* count_colname)
+				List *keys, StringInfo aggs_set, StringInfo target_list,
+				const char* count_colname)
 {
 	StringInfoData	querybuf;
 	StringInfoData	returning_keys;
@@ -2206,6 +2737,7 @@ apply_new_delta_with_count(const char *matviewname, const char* deltaname_new,
 	appendStringInfo(&querybuf,
 					"WITH updt AS ("		/* update a tuple if this exists in the view */
 						"UPDATE %s AS mv SET %s = mv.%s OPERATOR(pg_catalog.+) diff.%s "
+											"%s "	/* SET clauses for aggregates */
 						"FROM %s AS diff "
 						"WHERE %s "					/* tuple matching condition */
 						"RETURNING %s"				/* returning keys of updated tuples */
@@ -2213,6 +2745,7 @@ apply_new_delta_with_count(const char *matviewname, const char* deltaname_new,
 						"SELECT %s FROM %s AS diff "
 						"WHERE NOT EXISTS (SELECT 1 FROM updt AS mv WHERE %s);",
 					matviewname, count_colname, count_colname, count_colname,
+					(aggs_set != NULL ? aggs_set->data : ""),
 					deltaname_new,
 					match_cond,
 					returning_keys.data,
@@ -2287,6 +2820,349 @@ get_matching_condition_string(List *keys)
 	return match_cond.data;
 }
 
+/*
+ * get_returning_string
+ *
+ * Build a string for RETURNING clause of UPDATE used in apply_old_delta_with_count.
+ * This clause returns ctid and a boolean value that indicates if we need to
+ * recalculate min or max value, for each updated row.
+ */
+static char *
+get_returning_string(List *minmax_list, List *is_min_list, List *keys)
+{
+	StringInfoData returning;
+	char		*recalc_cond;
+	ListCell	*lc;
+
+	Assert(minmax_list != NIL && is_min_list != NIL);
+	recalc_cond = get_minmax_recalc_condition_string(minmax_list, is_min_list);
+
+	initStringInfo(&returning);
+
+	appendStringInfo(&returning, "RETURNING mv.ctid AS tid, (%s) AS recalc", recalc_cond);
+	foreach (lc, keys)
+	{
+		Form_pg_attribute attr = (Form_pg_attribute) lfirst(lc);
+		char *resname = NameStr(attr->attname);
+		appendStringInfo(&returning, ", %s", quote_qualified_identifier("mv", resname));
+	}
+
+	return returning.data;
+}
+
+/*
+ * get_minmax_recalc_condition_string
+ *
+ * Build a predicate string for checking if any min/max aggregate
+ * value needs to be recalculated.
+ */
+static char *
+get_minmax_recalc_condition_string(List *minmax_list, List *is_min_list)
+{
+	StringInfoData recalc_cond;
+	ListCell	*lc1, *lc2;
+
+	initStringInfo(&recalc_cond);
+
+	Assert (list_length(minmax_list) == list_length(is_min_list));
+
+	forboth (lc1, minmax_list, lc2, is_min_list)
+	{
+		char   *resname = (char *) lfirst(lc1);
+		bool	is_min = (bool) lfirst_int(lc2);
+		char   *op_str = (is_min ? ">=" : "<=");
+
+		appendStringInfo(&recalc_cond, "%s OPERATOR(pg_catalog.%s) %s",
+			quote_qualified_identifier("mv", resname),
+			op_str,
+			quote_qualified_identifier("t", resname)
+		);
+
+		if (lnext(minmax_list, lc1))
+			appendStringInfo(&recalc_cond, " OR ");
+	}
+
+	return recalc_cond.data;
+}
+
+/*
+ * get_select_for_recalc_string
+ *
+ * Build a query to return tid and keys of tuples which need
+ * recalculation. This is used as the result of the query
+ * built by apply_old_delta.
+ */
+static char *
+get_select_for_recalc_string(List *keys)
+{
+	StringInfoData qry;
+	ListCell	*lc;
+
+	initStringInfo(&qry);
+
+	appendStringInfo(&qry, "SELECT tid");
+	foreach (lc, keys)
+	{
+		Form_pg_attribute attr = (Form_pg_attribute) lfirst(lc);
+		appendStringInfo(&qry, ", %s", NameStr(attr->attname));
+	}
+
+	appendStringInfo(&qry, " FROM updt WHERE recalc");
+
+	return qry.data;
+}
+
+/*
+ * recalc_and_set_values
+ *
+ * Recalculate tuples in a materialized from base tables and update these.
+ * The tuples which needs recalculation are specified by keys, and resnames
+ * of columns to be updated are specified by namelist. TIDs and key values
+ * are given by tuples in tuptable_recalc. Its first attribute must be TID
+ * and key values must be following this.
+ */
+static void
+recalc_and_set_values(SPITupleTable *tuptable_recalc, int64 num_tuples,
+					  List *namelist, List *keys, Relation matviewRel)
+{
+	TupleDesc   tupdesc_recalc = tuptable_recalc->tupdesc;
+	Oid		   *keyTypes = NULL, *types = NULL;
+	char	   *keyNulls = NULL, *nulls = NULL;
+	Datum	   *keyVals = NULL, *vals = NULL;
+	int			num_vals = list_length(namelist);
+	int			num_keys = list_length(keys);
+	uint64      i;
+	Oid			matviewOid;
+	char	   *matviewname;
+
+	matviewOid = RelationGetRelid(matviewRel);
+	matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
+											 RelationGetRelationName(matviewRel));
+
+	/* If we have keys, initialize arrays for them. */
+	if (keys)
+	{
+		keyTypes = palloc(sizeof(Oid) * num_keys);
+		keyNulls = palloc(sizeof(char) * num_keys);
+		keyVals = palloc(sizeof(Datum) * num_keys);
+		/* a tuple contains keys to be recalculated and ctid to be updated*/
+		Assert(tupdesc_recalc->natts == num_keys + 1);
+
+		/* Types of key attributes  */
+		for (i = 0; i < num_keys; i++)
+			keyTypes[i] = TupleDescAttr(tupdesc_recalc, i + 1)->atttypid;
+	}
+
+	/* allocate memory for all attribute names and tid */
+	types = palloc(sizeof(Oid) * (num_vals + 1));
+	nulls = palloc(sizeof(char) * (num_vals + 1));
+	vals = palloc(sizeof(Datum) * (num_vals + 1));
+
+	/* For each tuple which needs recalculation */
+	for (i = 0; i < num_tuples; i++)
+	{
+		int j;
+		bool isnull;
+		SPIPlanPtr plan;
+		SPITupleTable *tuptable_newvals;
+		TupleDesc   tupdesc_newvals;
+
+		/* Set group key values as parameters if needed. */
+		if (keys)
+		{
+			for (j = 0; j < num_keys; j++)
+			{
+				keyVals[j] = SPI_getbinval(tuptable_recalc->vals[i], tupdesc_recalc, j + 2, &isnull);
+				if (isnull)
+					keyNulls[j] = 'n';
+				else
+					keyNulls[j] = ' ';
+			}
+		}
+
+		/*
+		 * Get recalculated values from base tables. The result must be
+		 * only one tuple thich contains the new values for specified keys.
+		 */
+		plan = get_plan_for_recalc(matviewOid, namelist, keys, keyTypes);
+		if (SPI_execute_plan(plan, keyVals, keyNulls, false, 0) != SPI_OK_SELECT)
+			elog(ERROR, "SPI_execute_plan");
+		if (SPI_processed != 1)
+			elog(ERROR, "SPI_execute_plan returned zero or more than one rows");
+
+		tuptable_newvals = SPI_tuptable;
+		tupdesc_newvals = tuptable_newvals->tupdesc;
+
+		Assert(tupdesc_newvals->natts == num_vals);
+
+		/* Set the new values as parameters */
+		for (j = 0; j < tupdesc_newvals->natts; j++)
+		{
+			if (i == 0)
+				types[j] = TupleDescAttr(tupdesc_newvals, j)->atttypid;
+
+			vals[j] = SPI_getbinval(tuptable_newvals->vals[0], tupdesc_newvals, j + 1, &isnull);
+			if (isnull)
+				nulls[j] = 'n';
+			else
+				nulls[j] = ' ';
+		}
+		/* Set TID of the view tuple to be updated as a parameter */
+		types[j] = TIDOID;
+		vals[j] = SPI_getbinval(tuptable_recalc->vals[i], tupdesc_recalc, 1, &isnull);
+		nulls[j] = ' ';
+
+		/* Update the view tuple to the new values */
+		plan = get_plan_for_set_values(matviewOid, matviewname, namelist, types);
+		if (SPI_execute_plan(plan, vals, nulls, false, 0) != SPI_OK_UPDATE)
+			elog(ERROR, "SPI_execute_plan");
+	}
+}
+
+
+/*
+ * get_plan_for_recalc
+ *
+ * Create or fetch a plan for recalculating value in the view's target list
+ * from base tables using the definition query of materialized view specified
+ * by matviewOid. namelist is a list of resnames of values to be recalculated.
+ *
+ * keys is a list of keys to identify tuples to be recalculated if this is not
+ * empty. KeyTypes is an array of types of keys.
+ */
+static SPIPlanPtr
+get_plan_for_recalc(Oid matviewOid, List *namelist, List *keys, Oid *keyTypes)
+{
+	MV_QueryKey hash_key;
+	SPIPlanPtr	plan;
+
+	/* Fetch or prepare a saved plan for the recalculation */
+	mv_BuildQueryKey(&hash_key, matviewOid, MV_PLAN_RECALC);
+	if ((plan = mv_FetchPreparedPlan(&hash_key)) == NULL)
+	{
+		ListCell	   *lc;
+		StringInfoData	str;
+		char   *viewdef;
+
+		/* get view definition of matview */
+		viewdef = text_to_cstring((text *) DatumGetPointer(
+					DirectFunctionCall1(pg_get_viewdef, ObjectIdGetDatum(matviewOid))));
+		/* get rid of trailing semi-colon */
+		viewdef[strlen(viewdef)-1] = '\0';
+
+		/*
+		 * Build a query string for recalculating values. This is like
+		 *
+		 *  SELECT x1, x2, x3, ... FROM ( ... view definition query ...) mv
+		 *   WHERE (key1, key2, ...) = ($1, $2, ...);
+		 */
+
+		initStringInfo(&str);
+		appendStringInfo(&str, "SELECT ");
+		foreach (lc, namelist)
+		{
+			appendStringInfo(&str, "%s", (char *) lfirst(lc));
+			if (lnext(namelist, lc))
+				appendStringInfoString(&str, ", ");
+		}
+		appendStringInfo(&str, " FROM (%s) mv", viewdef);
+
+		if (keys)
+		{
+			int		i = 1;
+			char	paramname[16];
+
+			appendStringInfo(&str, " WHERE (");
+			foreach (lc, keys)
+			{
+				Form_pg_attribute attr = (Form_pg_attribute) lfirst(lc);
+				char   *resname = NameStr(attr->attname);
+				Oid		typid = attr->atttypid;
+
+				sprintf(paramname, "$%d", i);
+				appendStringInfo(&str, "(");
+				generate_equal(&str, typid, resname, paramname);
+				appendStringInfo(&str, " OR (%s IS NULL AND %s IS NULL))",
+								 resname, paramname);
+
+				if (lnext(keys, lc))
+					appendStringInfoString(&str, " AND ");
+				i++;
+			}
+			appendStringInfo(&str, ")");
+		}
+		else
+			keyTypes = NULL;
+
+		plan = SPI_prepare(str.data, list_length(keys), keyTypes);
+		if (plan == NULL)
+			elog(ERROR, "SPI_prepare returned %s for %s", SPI_result_code_string(SPI_result), str.data);
+
+		SPI_keepplan(plan);
+		mv_HashPreparedPlan(&hash_key, plan);
+	}
+
+	return plan;
+}
+
+/*
+ * get_plan_for_set_values
+ *
+ * Create or fetch a plan for applying new values calculated by
+ * get_plan_for_recalc to a materialized view specified by matviewOid.
+ * matviewname is the name of the view.  namelist is a list of resnames
+ * of attributes to be updated, and valTypes is an array of types of the
+ * values.
+ */
+static SPIPlanPtr
+get_plan_for_set_values(Oid matviewOid, char *matviewname, List *namelist,
+						Oid *valTypes)
+{
+	MV_QueryKey	key;
+	SPIPlanPtr	plan;
+
+	/* Fetch or prepare a saved plan for the real check */
+	mv_BuildQueryKey(&key, matviewOid, MV_PLAN_SET_VALUE);
+	if ((plan = mv_FetchPreparedPlan(&key)) == NULL)
+	{
+		ListCell	  *lc;
+		StringInfoData str;
+		int		i;
+
+		/*
+		 * Build a query string for applying min/max values. This is like
+		 *
+		 *  UPDATE matviewname AS mv
+		 *   SET (x1, x2, x3, x4) = ($1, $2, $3, $4)
+		 *   WHERE ctid = $5;
+		 */
+
+		initStringInfo(&str);
+		appendStringInfo(&str, "UPDATE %s AS mv SET (", matviewname);
+		foreach (lc, namelist)
+		{
+			appendStringInfo(&str, "%s", (char *) lfirst(lc));
+			if (lnext(namelist, lc))
+				appendStringInfoString(&str, ", ");
+		}
+		appendStringInfo(&str, ") = ROW(");
+
+		for (i = 1; i <= list_length(namelist); i++)
+			appendStringInfo(&str, "%s$%d", (i==1 ? "" : ", "), i);
+
+		appendStringInfo(&str, ") WHERE ctid OPERATOR(pg_catalog.=) $%d", i);
+
+		plan = SPI_prepare(str.data, list_length(namelist) + 1, valTypes);
+		if (plan == NULL)
+			elog(ERROR, "SPI_prepare returned %s for %s", SPI_result_code_string(SPI_result), str.data);
+
+		SPI_keepplan(plan);
+		mv_HashPreparedPlan(&key, plan);
+	}
+
+	return plan;
+}
+
 /*
  * generate_equals
  *
@@ -2320,6 +3196,13 @@ mv_InitHashTables(void)
 {
 	HASHCTL		ctl;
 
+	memset(&ctl, 0, sizeof(ctl));
+	ctl.keysize = sizeof(MV_QueryKey);
+	ctl.entrysize = sizeof(MV_QueryHashEntry);
+	mv_query_cache = hash_create("MV query cache",
+								 MV_INIT_QUERYHASHSIZE,
+								 &ctl, HASH_ELEM | HASH_BLOBS);
+
 	memset(&ctl, 0, sizeof(ctl));
 	ctl.keysize = sizeof(Oid);
 	ctl.entrysize = sizeof(MV_TriggerHashEntry);
@@ -2328,6 +3211,99 @@ mv_InitHashTables(void)
 								 &ctl, HASH_ELEM | HASH_BLOBS);
 }
 
+/*
+ * mv_FetchPreparedPlan
+ */
+static SPIPlanPtr
+mv_FetchPreparedPlan(MV_QueryKey *key)
+{
+	MV_QueryHashEntry *entry;
+	SPIPlanPtr	plan;
+
+	/*
+	 * On the first call initialize the hashtable
+	 */
+	if (!mv_query_cache)
+		mv_InitHashTables();
+
+	/*
+	 * Lookup for the key
+	 */
+	entry = (MV_QueryHashEntry *) hash_search(mv_query_cache,
+											  (void *) key,
+											  HASH_FIND, NULL);
+	if (entry == NULL)
+		return NULL;
+
+	/*
+	 * Check whether the plan is still valid.  If it isn't, we don't want to
+	 * simply rely on plancache.c to regenerate it; rather we should start
+	 * from scratch and rebuild the query text too.  This is to cover cases
+	 * such as table/column renames.  We depend on the plancache machinery to
+	 * detect possible invalidations, though.
+	 *
+	 * CAUTION: this check is only trustworthy if the caller has already
+	 * locked both materialized views and base tables.
+	 */
+	plan = entry->plan;
+	if (plan && SPI_plan_is_valid(plan))
+		return plan;
+
+	/*
+	 * Otherwise we might as well flush the cached plan now, to free a little
+	 * memory space before we make a new one.
+	 */
+	entry->plan = NULL;
+	if (plan)
+		SPI_freeplan(plan);
+
+	return NULL;
+}
+
+/*
+ * mv_HashPreparedPlan
+ *
+ * Add another plan to our private SPI query plan hashtable.
+ */
+static void
+mv_HashPreparedPlan(MV_QueryKey *key, SPIPlanPtr plan)
+{
+	MV_QueryHashEntry *entry;
+	bool		found;
+
+	/*
+	 * On the first call initialize the hashtable
+	 */
+	if (!mv_query_cache)
+		mv_InitHashTables();
+
+	/*
+	 * Add the new plan.  We might be overwriting an entry previously found
+	 * invalid by mv_FetchPreparedPlan.
+	 */
+	entry = (MV_QueryHashEntry *) hash_search(mv_query_cache,
+											  (void *) key,
+											  HASH_ENTER, &found);
+	Assert(!found || entry->plan == NULL);
+	entry->plan = plan;
+}
+
+/*
+ * mv_BuildQueryKey
+ *
+ * Construct a hashtable key for a prepared SPI plan for IVM.
+ */
+static void
+mv_BuildQueryKey(MV_QueryKey *key, Oid matview_id, int32 query_type)
+{
+	/*
+	 * We assume struct MV_QueryKey contains no padding bytes, else we'd need
+	 * to use memset to clear them.
+	 */
+	key->matview_id = matview_id;
+	key->query_type = query_type;
+}
+
 /*
  * AtAbort_IVM
  *
diff --git a/src/include/commands/createas.h b/src/include/commands/createas.h
index bcea9782d3..e36302845f 100644
--- a/src/include/commands/createas.h
+++ b/src/include/commands/createas.h
@@ -30,6 +30,7 @@ extern void CreateIvmTriggersOnBaseTables(Query *qry, Oid matviewOid, bool is_cr
 extern void CreateIndexOnIMMV(Query *query, Relation matviewRel);
 
 extern Query *rewriteQueryForIMMV(Query *query, List *colNames);
+extern void makeIvmAggColumn(ParseState *pstate, Aggref *aggref, char *resname, AttrNumber *next_resno, List **aggs);
 
 extern int	GetIntoRelEFlags(IntoClause *intoClause);
 
-- 
2.17.1

