From e631c43c5a5d7d6b58fcf655d8f69a61a976647f Mon Sep 17 00:00:00 2001
From: Yugo Nagata <nagata@sraoss.co.jp>
Date: Wed, 31 May 2023 20:46:32 +0900
Subject: [PATCH v29 08/11] Add aggregates support in IVM
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

count, sum, adn avg are supported.

As a restriction, expressions specified in GROUP BY must appear in
the target list because tuples to be updated in IMMV are identified
by using this group key. However, in the case of aggregates without
GROUP BY, there is only one tuple in the view, so keys are not uses
to identify tuples.

When creating a IMMV, in addition to __ivm_count column, some hidden
columns for each aggregate are added to the target list. For example,
names of these hidden columns are ivm_count_avg and ivm_sum_avg for
the average function, and so on.

When a base table is modified, the aggregated values and related
hidden columns are also updated as well as __ivm_count__. The
way of update depends the kind of aggregate function.　Specifically,
sum and count are updated by simply adding or subtracting delta value
calculated from delta tables. avg is updated by using values of sum
and count stored in views as hidden columns and deltas calculated
from delta tables.

About aggregate functions except "count()" (sum and avg), NULLs in input
values are ignored, and the result of aggegate should be NULL when no
rows are selected.  To support this specification, the numbers of non-NULL
input values are counted and stored in hidden columns. In the case of
count(), count(x) returns zero when no rows are selected, but count(*)
doesn't ignore NULL input.
---
 src/backend/commands/createas.c | 264 +++++++++++++++++--
 src/backend/commands/matview.c  | 433 ++++++++++++++++++++++++++++++--
 src/include/commands/createas.h |   1 +
 3 files changed, 661 insertions(+), 37 deletions(-)

diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 076f35ee6b..c8aa558f2e 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -54,14 +54,19 @@
 #include "parser/parsetree.h"
 #include "parser/parse_clause.h"
 #include "parser/parse_func.h"
+#include "parser/parse_type.h"
 #include "rewrite/rewriteHandler.h"
+#include "rewrite/rewriteManip.h"
 #include "storage/smgr.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
+#include "utils/regproc.h"
+#include "utils/fmgroids.h"
 #include "utils/rel.h"
 #include "utils/rls.h"
 #include "utils/snapmgr.h"
+#include "utils/syscache.h"
 
 typedef struct
 {
@@ -75,6 +80,11 @@ typedef struct
 	BulkInsertState bistate;	/* bulk insert state */
 } DR_intorel;
 
+typedef struct
+{
+	bool	has_agg;
+} check_ivm_restriction_context;
+
 /* utility functions for CTAS definition creation */
 static ObjectAddress create_ctas_internal(List *attrList, IntoClause *into);
 static ObjectAddress create_ctas_nodata(List *tlist, IntoClause *into);
@@ -89,8 +99,9 @@ static void CreateIvmTriggersOnBaseTablesRecurse(Query *qry, Node *node, Oid mat
 									 Relids *relids, bool ex_lock);
 static void CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, bool ex_lock);
 static void check_ivm_restriction(Node *node);
-static bool check_ivm_restriction_walker(Node *node, void *context);
+static bool check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context);
 static Bitmapset *get_primary_key_attnos_from_query(Query *query, List **constraintList);
+static bool check_aggregate_supports_ivm(Oid aggfnoid);
 
 /*
  * create_ctas_internal
@@ -421,6 +432,7 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt,
  * rewriteQueryForIMMV -- rewrite view definition query for IMMV
  *
  * count(*) is added for counting distinct tuples in views.
+ * Also, additional hidden columns are added for aggregate values.
  */
 Query *
 rewriteQueryForIMMV(Query *query, List *colNames)
@@ -434,16 +446,49 @@ rewriteQueryForIMMV(Query *query, List *colNames)
 	rewritten = copyObject(query);
 	pstate->p_expr_kind = EXPR_KIND_SELECT_TARGET;
 
-	/*
-	 * Convert DISTINCT to GROUP BY and add count(*) for counting distinct
-	 * tuples in views.
-	 */
-	if (rewritten->distinctClause)
+	/* group keys must be in targetlist */
+	if (rewritten->groupClause)
 	{
-		TargetEntry *tle;
+		ListCell *lc;
+		foreach(lc, rewritten->groupClause)
+		{
+			SortGroupClause *scl = (SortGroupClause *) lfirst(lc);
+			TargetEntry *tle = get_sortgroupclause_tle(scl, rewritten->targetList);
 
+			if (tle->resjunk)
+				ereport(ERROR,
+						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						 errmsg("GROUP BY expression not appearing in select list is not supported on incrementally maintainable materialized view")));
+		}
+	}
+	/* Convert DISTINCT to GROUP BY.  count(*) will be added afterward. */
+	else if (!rewritten->hasAggs && rewritten->distinctClause)
 		rewritten->groupClause = transformDistinctClause(NULL, &rewritten->targetList, rewritten->sortClause, false);
 
+	/* Add additional columns for aggregate values */
+	if (rewritten->hasAggs)
+	{
+		ListCell *lc;
+		List *aggs = NIL;
+		AttrNumber next_resno = list_length(rewritten->targetList) + 1;
+
+		foreach(lc, rewritten->targetList)
+		{
+			TargetEntry *tle = (TargetEntry *) lfirst(lc);
+			char *resname = (colNames == NIL || foreach_current_index(lc) >= list_length(colNames) ?
+								tle->resname : strVal(list_nth(colNames, tle->resno - 1)));
+
+			if (IsA(tle->expr, Aggref))
+				makeIvmAggColumn(pstate, (Aggref *) tle->expr, resname, &next_resno, &aggs);
+		}
+		rewritten->targetList = list_concat(rewritten->targetList, aggs);
+	}
+
+	/* Add count(*) for counting distinct tuples in views */
+	if (rewritten->distinctClause || rewritten->hasAggs)
+	{
+		TargetEntry *tle;
+
 		fn = makeFuncCall(SystemFuncName("count"), NIL, COERCE_EXPLICIT_CALL, -1);
 		fn->agg_star = true;
 
@@ -460,6 +505,91 @@ rewriteQueryForIMMV(Query *query, List *colNames)
 	return rewritten;
 }
 
+/*
+ * makeIvmAggColumn -- make additional aggregate columns for IVM
+ *
+ * For an aggregate column specified by aggref, additional aggregate columns
+ * are added, which are used to calculate the new aggregate value in IMMV.
+ * An additional aggregate columns has a name based on resname
+ * (ex. ivm_count_resname), and resno specified by next_resno. The created
+ * columns are returned to aggs, and the resno for the next column is also
+ * returned to next_resno.
+ *
+ * Currently, an additional count() is created for aggref other than count.
+ * In addition, sum() is created for avg aggregate column.
+ */
+void
+makeIvmAggColumn(ParseState *pstate, Aggref *aggref, char *resname, AttrNumber *next_resno, List **aggs)
+{
+	TargetEntry *tle_count;
+	Node *node;
+	FuncCall *fn;
+	Const	*dmy_arg = makeConst(INT4OID,
+								 -1,
+								 InvalidOid,
+								 sizeof(int32),
+								 Int32GetDatum(1),
+								 false,
+								 true); /* pass by value */
+	const char *aggname = get_func_name(aggref->aggfnoid);
+
+	/*
+	 * For aggregate functions except count, add count() func with the same arg parameters.
+	 * This count result is used for determining if the aggregate value should be NULL or not.
+	 * Also, add sum() func for avg because we need to calculate an average value as sum/count.
+	 *
+	 * XXX: If there are same expressions explicitly in the target list, we can use this instead
+	 * of adding new duplicated one.
+	 */
+	if (strcmp(aggname, "count") != 0)
+	{
+		fn = makeFuncCall(SystemFuncName("count"), NIL, COERCE_EXPLICIT_CALL, -1);
+
+		/* Make a Func with a dummy arg, and then override this by the original agg's args. */
+		node = ParseFuncOrColumn(pstate, fn->funcname, list_make1(dmy_arg), NULL, fn, false, -1);
+		((Aggref *)node)->args = aggref->args;
+
+		tle_count = makeTargetEntry((Expr *) node,
+									*next_resno,
+									pstrdup(makeObjectName("__ivm_count",resname, "_")),
+									false);
+		*aggs = lappend(*aggs, tle_count);
+		(*next_resno)++;
+	}
+	if (strcmp(aggname, "avg") == 0)
+	{
+		List *dmy_args = NIL;
+		ListCell *lc;
+		foreach(lc, aggref->aggargtypes)
+		{
+			Oid		typeid = lfirst_oid(lc);
+			Type	type = typeidType(typeid);
+
+			Const *con = makeConst(typeid,
+								   -1,
+								   typeTypeCollation(type),
+								   typeLen(type),
+								   (Datum) 0,
+								   true,
+								   typeByVal(type));
+			dmy_args = lappend(dmy_args, con);
+			ReleaseSysCache(type);
+		}
+		fn = makeFuncCall(SystemFuncName("sum"), NIL, COERCE_EXPLICIT_CALL, -1);
+
+		/* Make a Func with dummy args, and then override this by the original agg's args. */
+		node = ParseFuncOrColumn(pstate, fn->funcname, dmy_args, NULL, fn, false, -1);
+		((Aggref *)node)->args = aggref->args;
+
+		tle_count = makeTargetEntry((Expr *) node,
+									*next_resno,
+									pstrdup(makeObjectName("__ivm_sum",resname, "_")),
+									false);
+		*aggs = lappend(*aggs, tle_count);
+		(*next_resno)++;
+	}
+}
+
 /*
  * GetIntoRelEFlags --- compute executor flags needed for CREATE TABLE AS
  *
@@ -943,11 +1073,13 @@ CreateIvmTrigger(Oid relOid, Oid viewOid, int16 type, int16 timing, bool ex_lock
 static void
 check_ivm_restriction(Node *node)
 {
-	check_ivm_restriction_walker(node, NULL);
+	check_ivm_restriction_context context = {false};
+
+	check_ivm_restriction_walker(node, &context);
 }
 
 static bool
-check_ivm_restriction_walker(Node *node, void *context)
+check_ivm_restriction_walker(Node *node, check_ivm_restriction_context *context)
 {
 	if (node == NULL)
 		return false;
@@ -976,6 +1108,10 @@ check_ivm_restriction_walker(Node *node, void *context)
 					ereport(ERROR,
 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 							 errmsg("CTE is not supported on incrementally maintainable materialized view")));
+				if (qry->groupClause != NIL && !qry->hasAggs)
+					ereport(ERROR,
+							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							 errmsg("GROUP BY clause without aggregate is not supported on incrementally maintainable materialized view")));
 				if (qry->havingQual != NULL)
 					ereport(ERROR,
 							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@@ -1028,6 +1164,8 @@ check_ivm_restriction_walker(Node *node, void *context)
 					}
 				}
 
+				context->has_agg |= qry->hasAggs;
+
 				/* restrictions for rtable */
 				foreach(lc, qry->rtable)
 				{
@@ -1076,7 +1214,7 @@ check_ivm_restriction_walker(Node *node, void *context)
 
 				}
 
-				query_tree_walker(qry, check_ivm_restriction_walker, NULL, QTW_IGNORE_RANGE_TABLE);
+				query_tree_walker(qry, check_ivm_restriction_walker, (void *) context, QTW_IGNORE_RANGE_TABLE);
 
 				break;
 			}
@@ -1087,8 +1225,12 @@ check_ivm_restriction_walker(Node *node, void *context)
 						ereport(ERROR,
 								(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 								 errmsg("column name %s is not supported on incrementally maintainable materialized view", tle->resname)));
+				if (context->has_agg && !IsA(tle->expr, Aggref) && contain_aggs_of_level((Node *) tle->expr, 0))
+					ereport(ERROR,
+							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							 errmsg("expression containing an aggregate in it is not supported on incrementally maintainable materialized view")));
 
-				expression_tree_walker(node, check_ivm_restriction_walker, NULL);
+				expression_tree_walker(node, check_ivm_restriction_walker, (void *) context);
 				break;
 			}
 		case T_JoinExpr:
@@ -1100,14 +1242,36 @@ check_ivm_restriction_walker(Node *node, void *context)
 								(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 								 errmsg("OUTER JOIN is not supported on incrementally maintainable materialized view")));
 
-				expression_tree_walker(node, check_ivm_restriction_walker, NULL);
+				expression_tree_walker(node, check_ivm_restriction_walker, (void *) context);
+				break;
 			}
-			break;
 		case T_Aggref:
-			ereport(ERROR,
-					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-					 errmsg("aggregate function is not supported on incrementally maintainable materialized view")));
-			break;
+			{
+				/* Check if this supports IVM */
+				Aggref *aggref = (Aggref *) node;
+				const char *aggname = format_procedure(aggref->aggfnoid);
+
+				if (aggref->aggfilter != NULL)
+					ereport(ERROR,
+							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							 errmsg("aggregate function with FILTER clause is not supported on incrementally maintainable materialized view")));
+
+				if (aggref->aggdistinct != NULL)
+					ereport(ERROR,
+							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							 errmsg("aggregate function with DISTINCT arguments is not supported on incrementally maintainable materialized view")));
+
+				if (aggref->aggorder != NULL)
+					ereport(ERROR,
+							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							 errmsg("aggregate function with ORDER clause is not supported on incrementally maintainable materialized view")));
+
+				if (!check_aggregate_supports_ivm(aggref->aggfnoid))
+					ereport(ERROR,
+							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							 errmsg("aggregate function %s is not supported on incrementally maintainable materialized view", aggname)));
+				break;
+			}
 		default:
 			expression_tree_walker(node, check_ivm_restriction_walker, (void *) context);
 			break;
@@ -1115,6 +1279,46 @@ check_ivm_restriction_walker(Node *node, void *context)
 	return false;
 }
 
+/*
+ * check_aggregate_supports_ivm
+ *
+ * Check if the given aggregate function is supporting IVM
+ */
+static bool
+check_aggregate_supports_ivm(Oid aggfnoid)
+{
+	switch (aggfnoid)
+	{
+		/* count */
+		case F_COUNT_ANY:
+		case F_COUNT_:
+
+		/* sum */
+		case F_SUM_INT8:
+		case F_SUM_INT4:
+		case F_SUM_INT2:
+		case F_SUM_FLOAT4:
+		case F_SUM_FLOAT8:
+		case F_SUM_MONEY:
+		case F_SUM_INTERVAL:
+		case F_SUM_NUMERIC:
+
+		/* avg */
+		case F_AVG_INT8:
+		case F_AVG_INT4:
+		case F_AVG_INT2:
+		case F_AVG_NUMERIC:
+		case F_AVG_FLOAT4:
+		case F_AVG_FLOAT8:
+		case F_AVG_INTERVAL:
+
+			return true;
+
+		default:
+			return false;
+	}
+}
+
 /*
  * CreateIndexOnIMMV
  *
@@ -1172,7 +1376,29 @@ CreateIndexOnIMMV(Query *query, Relation matviewRel)
 	index->concurrent = false;
 	index->if_not_exists = false;
 
-	if (query->distinctClause)
+	if (query->groupClause)
+	{
+		/* create unique constraint on GROUP BY expression columns */
+		foreach(lc, query->groupClause)
+		{
+			SortGroupClause *scl = (SortGroupClause *) lfirst(lc);
+			TargetEntry *tle = get_sortgroupclause_tle(scl, query->targetList);
+			Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, tle->resno - 1);
+			IndexElem  *iparam;
+
+			iparam = makeNode(IndexElem);
+			iparam->name = pstrdup(NameStr(attr->attname));
+			iparam->expr = NULL;
+			iparam->indexcolname = NULL;
+			iparam->collation = NIL;
+			iparam->opclass = NIL;
+			iparam->opclassopts = NIL;
+			iparam->ordering = SORTBY_DEFAULT;
+			iparam->nulls_ordering = SORTBY_NULLS_DEFAULT;
+			index->indexParams = lappend(index->indexParams, iparam);
+		}
+	}
+	else if (query->distinctClause)
 	{
 		/* create unique constraint on all columns */
 		foreach(lc, query->targetList)
@@ -1230,7 +1456,7 @@ CreateIndexOnIMMV(Query *query, Relation matviewRel)
 					(errmsg("could not create an index on materialized view \"%s\" automatically",
 							RelationGetRelationName(matviewRel)),
 					 errdetail("This target list does not have all the primary key columns, "
-							   "or this view does not contain DISTINCT clause."),
+							   "or this view does not contain GROUP BY or DISTINCT clause."),
 					 errhint("Create an index on the materialized view for efficient incremental maintenance.")));
 			return;
 		}
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index aa518f20ef..ee41f0007d 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -30,6 +30,7 @@
 #include "catalog/pg_opclass.h"
 #include "catalog/pg_operator.h"
 #include "commands/cluster.h"
+#include "commands/defrem.h"
 #include "commands/matview.h"
 #include "commands/tablecmds.h"
 #include "commands/tablespace.h"
@@ -39,6 +40,7 @@
 #include "executor/tstoreReceiver.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
+#include "optimizer/optimizer.h"
 #include "parser/analyze.h"
 #include "parser/parse_clause.h"
 #include "parser/parse_func.h"
@@ -111,6 +113,13 @@ static HTAB *mv_trigger_info = NULL;
 
 static bool in_delta_calculation = false;
 
+/* kind of IVM operation for the view */
+typedef enum
+{
+	IVM_ADD,
+	IVM_SUB
+} IvmOp;
+
 /* ENR name for materialized view delta */
 #define NEW_DELTA_ENRNAME "new_delta"
 #define OLD_DELTA_ENRNAME "old_delta"
@@ -142,7 +151,7 @@ static RangeTblEntry *get_prestate_rte(RangeTblEntry *rte, MV_TriggerTable *tabl
 				 QueryEnvironment *queryEnv, Oid matviewid);
 static RangeTblEntry *replace_rte_with_delta(RangeTblEntry *rte, MV_TriggerTable *table, bool is_new,
 		   QueryEnvironment *queryEnv);
-static Query *rewrite_query_for_counting(Query *query, ParseState *pstate);
+static Query *rewrite_query_for_counting_and_aggregates(Query *query, ParseState *pstate);
 
 static void calc_delta(MV_TriggerTable *table, int rte_index, Query *query,
 			DestReceiver *dest_old, DestReceiver *dest_new,
@@ -153,14 +162,27 @@ static Query *rewrite_query_for_postupdate_state(Query *query, MV_TriggerTable *
 static void apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *new_tuplestores,
 			TupleDesc tupdesc_old, TupleDesc tupdesc_new,
 			Query *query, bool use_count, char *count_colname);
+static void append_set_clause_for_count(const char *resname, StringInfo buf_old,
+							StringInfo buf_new,StringInfo aggs_list);
+static void append_set_clause_for_sum(const char *resname, StringInfo buf_old,
+						  StringInfo buf_new, StringInfo aggs_list);
+static void append_set_clause_for_avg(const char *resname, StringInfo buf_old,
+						  StringInfo buf_new, StringInfo aggs_list,
+						  const char *aggtype);
+static char *get_operation_string(IvmOp op, const char *col, const char *arg1, const char *arg2,
+					 const char* count_col, const char *castType);
+static char *get_null_condition_string(IvmOp op, const char *arg1, const char *arg2,
+						  const char* count_col);
 static void apply_old_delta(const char *matviewname, const char *deltaname_old,
 				List *keys);
 static void apply_old_delta_with_count(const char *matviewname, const char *deltaname_old,
-				List *keys, const char *count_colname);
+				List *keys, StringInfo aggs_list, StringInfo aggs_set,
+				const char *count_colname);
 static void apply_new_delta(const char *matviewname, const char *deltaname_new,
 				StringInfo target_list);
 static void apply_new_delta_with_count(const char *matviewname, const char* deltaname_new,
-				List *keys, StringInfo target_list, const char* count_colname);
+				List *keys, StringInfo target_list, StringInfo aggs_set,
+				const char* count_colname);
 static char *get_matching_condition_string(List *keys);
 static void generate_equal(StringInfo querybuf, Oid opttype,
 			   const char *leftop, const char *rightop);
@@ -1431,11 +1453,44 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS)
 	 * When a base table is truncated, the view content will be empty if the
 	 * view definition query does not contain an aggregate without a GROUP clause.
 	 * Therefore, such views can be truncated.
+	 *
+	 * Aggregate views without a GROUP clause always have one row. Therefore,
+	 * if a base table is truncated, the view will not be empty and will contain
+	 * a row with NULL value (or 0 for count()). So, in this case, we refresh the
+	 * view instead of truncating it.
 	 */
 	if (TRIGGER_FIRED_BY_TRUNCATE(trigdata->tg_event))
 	{
-		ExecuteTruncateGuts(list_make1(matviewRel), list_make1_oid(matviewOid),
-							NIL, DROP_RESTRICT, false, false);
+		if (!(query->hasAggs && query->groupClause == NIL))
+			ExecuteTruncateGuts(list_make1(matviewRel), list_make1_oid(matviewOid),
+								NIL, DROP_RESTRICT, false, false);
+		else
+		{
+			Oid			OIDNewHeap;
+			DestReceiver *dest;
+			uint64		processed = 0;
+			Query	   *dataQuery = rewriteQueryForIMMV(query, NIL);
+			char		relpersistence = matviewRel->rd_rel->relpersistence;
+
+			/*
+			 * Create the transient table that will receive the regenerated data. Lock
+			 * it against access by any other process until commit (by which time it
+			 * will be gone).
+			 */
+			OIDNewHeap = make_new_heap(matviewOid, matviewRel->rd_rel->reltablespace,
+									   matviewRel->rd_rel->relam,
+									   relpersistence,  ExclusiveLock);
+			LockRelationOid(OIDNewHeap, AccessExclusiveLock);
+			dest = CreateTransientRelDestReceiver(OIDNewHeap);
+
+			/* Generate the data */
+			processed = refresh_matview_datafill(dest, dataQuery, NULL, NULL, "");
+			refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
+
+			/* Inform cumulative stats system about our activity */
+			pgstat_count_truncate(matviewRel);
+			pgstat_count_heap_insert(matviewRel, processed);
+		}
 
 		/* Clean up hash entry and delete tuplestores */
 		clean_up_IVM_hash_entry(entry, false);
@@ -1475,8 +1530,8 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS)
 	/* Set all tables in the query to pre-update state */
 	rewritten = rewrite_query_for_preupdate_state(rewritten, entry->tables,
 												  pstate, matviewOid);
-	/* Rewrite for counting duplicated tuples */
-	rewritten = rewrite_query_for_counting(rewritten, pstate);
+	/* Rewrite for counting duplicated tuples and aggregates functions*/
+	rewritten = rewrite_query_for_counting_and_aggregates(rewritten, pstate);
 
 	/* Create tuplestores to store view deltas */
 	if (entry->has_old)
@@ -1527,7 +1582,7 @@ IVM_immediate_maintenance(PG_FUNCTION_ARGS)
 
 			count_colname = pstrdup("__ivm_count__");
 
-			if (query->distinctClause)
+			if (query->hasAggs || query->distinctClause)
 				use_count = true;
 
 			/* calculate delta tables */
@@ -1923,17 +1978,34 @@ replace_rte_with_delta(RangeTblEntry *rte, MV_TriggerTable *table, bool is_new,
 }
 
 /*
- * rewrite_query_for_counting
+ * rewrite_query_for_counting_and_aggregates
  *
- * Rewrite query for counting duplicated tuples.
+ * Rewrite query for counting duplicated tuples and aggregate functions.
  */
 static Query *
-rewrite_query_for_counting(Query *query, ParseState *pstate)
+rewrite_query_for_counting_and_aggregates(Query *query, ParseState *pstate)
 {
 	TargetEntry *tle_count;
 	FuncCall *fn;
 	Node *node;
 
+	/* For aggregate views */
+	if (query->hasAggs)
+	{
+		ListCell *lc;
+		List *aggs = NIL;
+		AttrNumber next_resno = list_length(query->targetList) + 1;
+
+		foreach(lc, query->targetList)
+		{
+			TargetEntry *tle = (TargetEntry *) lfirst(lc);
+
+			if (IsA(tle->expr, Aggref))
+				makeIvmAggColumn(pstate, (Aggref *)tle->expr, tle->resname, &next_resno, &aggs);
+		}
+		query->targetList = list_concat(query->targetList, aggs);
+	}
+
 	/* Add count(*) for counting distinct tuples in views */
 	fn = makeFuncCall(SystemFuncName("count"), NIL, COERCE_EXPLICIT_CALL, -1);
 	fn->agg_star = true;
@@ -2006,6 +2078,8 @@ rewrite_query_for_postupdate_state(Query *query, MV_TriggerTable *table, int rte
 	return query;
 }
 
+#define IVM_colname(type, col) makeObjectName("__ivm_" type, col, "_")
+
 /*
  * apply_delta
  *
@@ -2019,6 +2093,9 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
 {
 	StringInfoData querybuf;
 	StringInfoData target_list_buf;
+	StringInfo	aggs_list_buf = NULL;
+	StringInfo	aggs_set_old = NULL;
+	StringInfo	aggs_set_new = NULL;
 	Relation	matviewRel;
 	char	   *matviewname;
 	ListCell	*lc;
@@ -2041,6 +2118,15 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
 	initStringInfo(&querybuf);
 	initStringInfo(&target_list_buf);
 
+	if (query->hasAggs)
+	{
+		if (old_tuplestores && tuplestore_tuple_count(old_tuplestores) > 0)
+			aggs_set_old = makeStringInfo();
+		if (new_tuplestores && tuplestore_tuple_count(new_tuplestores) > 0)
+			aggs_set_new = makeStringInfo();
+		aggs_list_buf = makeStringInfo();
+	}
+
 	/* build string of target list */
 	for (i = 0; i < matviewRel->rd_att->natts; i++)
 	{
@@ -2057,13 +2143,61 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
 	{
 		TargetEntry *tle = (TargetEntry *) lfirst(lc);
 		Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, i);
+		char *resname = NameStr(attr->attname);
 
 		i++;
 
 		if (tle->resjunk)
 			continue;
 
-		keys = lappend(keys, attr);
+		/*
+		 * For views without aggregates, all attributes are used as keys to identify a
+		 * tuple in a view.
+		 */
+		if (!query->hasAggs)
+			keys = lappend(keys, attr);
+
+		/* For views with aggregates, we need to build SET clause for updating aggregate
+		 * values. */
+		if (query->hasAggs && IsA(tle->expr, Aggref))
+		{
+			Aggref *aggref = (Aggref *) tle->expr;
+			const char *aggname = get_func_name(aggref->aggfnoid);
+
+			/*
+			 * We can use function names here because it is already checked if these
+			 * can be used in IMMV by its OID at the definition time.
+			 */
+
+			/* count */
+			if (!strcmp(aggname, "count"))
+				append_set_clause_for_count(resname, aggs_set_old, aggs_set_new, aggs_list_buf);
+
+			/* sum */
+			else if (!strcmp(aggname, "sum"))
+				append_set_clause_for_sum(resname, aggs_set_old, aggs_set_new, aggs_list_buf);
+
+			/* avg */
+			else if (!strcmp(aggname, "avg"))
+				append_set_clause_for_avg(resname, aggs_set_old, aggs_set_new, aggs_list_buf,
+										  format_type_be(aggref->aggtype));
+
+			else
+				elog(ERROR, "unsupported aggregate function: %s", aggname);
+		}
+	}
+
+	/* If we have GROUP BY clause, we use its entries as keys. */
+	if (query->hasAggs && query->groupClause)
+	{
+		foreach (lc, query->groupClause)
+		{
+			SortGroupClause *sgcl = (SortGroupClause *) lfirst(lc);
+			TargetEntry		*tle = get_sortgroupclause_tle(sgcl, query->targetList);
+			Form_pg_attribute attr = TupleDescAttr(matviewRel->rd_att, tle->resno - 1);
+
+			keys = lappend(keys, attr);
+		}
 	}
 
 	/* Start maintaining the materialized view. */
@@ -2094,7 +2228,8 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
 		if (use_count)
 			/* apply old delta and get rows to be recalculated */
 			apply_old_delta_with_count(matviewname, OLD_DELTA_ENRNAME,
-									   keys, count_colname);
+									   keys, aggs_list_buf, aggs_set_old,
+									   count_colname);
 		else
 			apply_old_delta(matviewname, OLD_DELTA_ENRNAME, keys);
 
@@ -2120,7 +2255,7 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
 		/* apply new delta */
 		if (use_count)
 			apply_new_delta_with_count(matviewname, NEW_DELTA_ENRNAME,
-								keys, &target_list_buf, count_colname);
+								keys, aggs_set_new, &target_list_buf, count_colname);
 		else
 			apply_new_delta(matviewname, NEW_DELTA_ENRNAME, &target_list_buf);
 	}
@@ -2135,6 +2270,250 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
 		elog(ERROR, "SPI_finish failed");
 }
 
+/*
+ * append_set_clause_for_count
+ *
+ * Append SET clause string for count aggregation to given buffers.
+ * Also, append resnames required for calculating the aggregate value.
+ */
+static void
+append_set_clause_for_count(const char *resname, StringInfo buf_old,
+							StringInfo buf_new,StringInfo aggs_list)
+{
+	/* For tuple deletion */
+	if (buf_old)
+	{
+		/* resname = mv.resname - t.resname */
+		appendStringInfo(buf_old,
+			", %s = %s",
+			quote_qualified_identifier(NULL, resname),
+			get_operation_string(IVM_SUB, resname, "mv", "t", NULL, NULL));
+	}
+	/* For tuple insertion */
+	if (buf_new)
+	{
+		/* resname = mv.resname + diff.resname */
+		appendStringInfo(buf_new,
+			", %s = %s",
+			quote_qualified_identifier(NULL, resname),
+			get_operation_string(IVM_ADD, resname, "mv", "diff", NULL, NULL));
+	}
+
+	appendStringInfo(aggs_list, ", %s",
+		quote_qualified_identifier("diff", resname)
+	);
+}
+
+/*
+ * append_set_clause_for_sum
+ *
+ * Append SET clause string for sum aggregation to given buffers.
+ * Also, append resnames required for calculating the aggregate value.
+ */
+static void
+append_set_clause_for_sum(const char *resname, StringInfo buf_old,
+						  StringInfo buf_new, StringInfo aggs_list)
+{
+	char *count_col = IVM_colname("count", resname);
+
+	/* For tuple deletion */
+	if (buf_old)
+	{
+		/* sum = mv.sum - t.sum */
+		appendStringInfo(buf_old,
+			", %s = %s",
+			quote_qualified_identifier(NULL, resname),
+			get_operation_string(IVM_SUB, resname, "mv", "t", count_col, NULL)
+		);
+		/* count = mv.count - t.count */
+		appendStringInfo(buf_old,
+			", %s = %s",
+			quote_qualified_identifier(NULL, count_col),
+			get_operation_string(IVM_SUB, count_col, "mv", "t", NULL, NULL)
+		);
+	}
+	/* For tuple insertion */
+	if (buf_new)
+	{
+		/* sum = mv.sum + diff.sum */
+		appendStringInfo(buf_new,
+			", %s = %s",
+			quote_qualified_identifier(NULL, resname),
+			get_operation_string(IVM_ADD, resname, "mv", "diff", count_col, NULL)
+		);
+		/* count = mv.count + diff.count */
+		appendStringInfo(buf_new,
+			", %s = %s",
+			quote_qualified_identifier(NULL, count_col),
+			get_operation_string(IVM_ADD, count_col, "mv", "diff", NULL, NULL)
+		);
+	}
+
+	appendStringInfo(aggs_list, ", %s, %s",
+		quote_qualified_identifier("diff", resname),
+		quote_qualified_identifier("diff", IVM_colname("count", resname))
+	);
+}
+
+/*
+ * append_set_clause_for_avg
+ *
+ * Append SET clause string for avg aggregation to given buffers.
+ * Also, append resnames required for calculating the aggregate value.
+ */
+static void
+append_set_clause_for_avg(const char *resname, StringInfo buf_old,
+						  StringInfo buf_new, StringInfo aggs_list,
+						  const char *aggtype)
+{
+	char *sum_col = IVM_colname("sum", resname);
+	char *count_col = IVM_colname("count", resname);
+
+	/* For tuple deletion */
+	if (buf_old)
+	{
+		/* avg = (mv.sum - t.sum)::aggtype / (mv.count - t.count) */
+		appendStringInfo(buf_old,
+			", %s = %s OPERATOR(pg_catalog./) %s",
+			quote_qualified_identifier(NULL, resname),
+			get_operation_string(IVM_SUB, sum_col, "mv", "t", count_col, aggtype),
+			get_operation_string(IVM_SUB, count_col, "mv", "t", NULL, NULL)
+		);
+		/* sum = mv.sum - t.sum */
+		appendStringInfo(buf_old,
+			", %s = %s",
+			quote_qualified_identifier(NULL, sum_col),
+			get_operation_string(IVM_SUB, sum_col, "mv", "t", count_col, NULL)
+		);
+		/* count = mv.count - t.count */
+		appendStringInfo(buf_old,
+			", %s = %s",
+			quote_qualified_identifier(NULL, count_col),
+			get_operation_string(IVM_SUB, count_col, "mv", "t", NULL, NULL)
+		);
+
+	}
+	/* For tuple insertion */
+	if (buf_new)
+	{
+		/* avg = (mv.sum + diff.sum)::aggtype / (mv.count + diff.count) */
+		appendStringInfo(buf_new,
+			", %s = %s OPERATOR(pg_catalog./) %s",
+			quote_qualified_identifier(NULL, resname),
+			get_operation_string(IVM_ADD, sum_col, "mv", "diff", count_col, aggtype),
+			get_operation_string(IVM_ADD, count_col, "mv", "diff", NULL, NULL)
+		);
+		/* sum = mv.sum + diff.sum */
+		appendStringInfo(buf_new,
+			", %s = %s",
+			quote_qualified_identifier(NULL, sum_col),
+			get_operation_string(IVM_ADD, sum_col, "mv", "diff", count_col, NULL)
+		);
+		/* count = mv.count + diff.count */
+		appendStringInfo(buf_new,
+			", %s = %s",
+			quote_qualified_identifier(NULL, count_col),
+			get_operation_string(IVM_ADD, count_col, "mv", "diff", NULL, NULL)
+		);
+	}
+
+	appendStringInfo(aggs_list, ", %s, %s, %s",
+		quote_qualified_identifier("diff", resname),
+		quote_qualified_identifier("diff", IVM_colname("sum", resname)),
+		quote_qualified_identifier("diff", IVM_colname("count", resname))
+	);
+}
+
+/*
+ * get_operation_string
+ *
+ * Build a string to calculate the new aggregate values.
+ */
+static char *
+get_operation_string(IvmOp op, const char *col, const char *arg1, const char *arg2,
+					 const char* count_col, const char *castType)
+{
+	StringInfoData buf;
+	StringInfoData castString;
+	char   *col1 = quote_qualified_identifier(arg1, col);
+	char   *col2 = quote_qualified_identifier(arg2, col);
+	char	op_char = (op == IVM_SUB ? '-' : '+');
+
+	initStringInfo(&buf);
+	initStringInfo(&castString);
+
+	if (castType)
+		appendStringInfo(&castString, "::%s", castType);
+
+	if (!count_col)
+	{
+		/*
+		 * If the attributes don't have count columns then calc the result
+		 * by using the operator simply.
+		 */
+		appendStringInfo(&buf, "(%s OPERATOR(pg_catalog.%c) %s)%s",
+			col1, op_char, col2, castString.data);
+	}
+	else
+	{
+		/*
+		 * If the attributes have count columns then consider the condition
+		 * where the result becomes NULL.
+		 */
+		char *null_cond = get_null_condition_string(op, arg1, arg2, count_col);
+
+		appendStringInfo(&buf,
+			"(CASE WHEN %s THEN NULL "
+				"WHEN %s IS NULL THEN %s "
+				"WHEN %s IS NULL THEN %s "
+				"ELSE (%s OPERATOR(pg_catalog.%c) %s)%s END)",
+			null_cond,
+			col1, col2,
+			col2, col1,
+			col1, op_char, col2, castString.data
+		);
+	}
+
+	return buf.data;
+}
+
+/*
+ * get_null_condition_string
+ *
+ * Build a predicate string for CASE clause to check if an aggregate value
+ * will became NULL after the given operation is applied.
+ */
+static char *
+get_null_condition_string(IvmOp op, const char *arg1, const char *arg2,
+						  const char* count_col)
+{
+	StringInfoData null_cond;
+	initStringInfo(&null_cond);
+
+	switch (op)
+	{
+		case IVM_ADD:
+			appendStringInfo(&null_cond,
+				"%s OPERATOR(pg_catalog.=) 0 AND %s OPERATOR(pg_catalog.=) 0",
+				quote_qualified_identifier(arg1, count_col),
+				quote_qualified_identifier(arg2, count_col)
+			);
+			break;
+		case IVM_SUB:
+			appendStringInfo(&null_cond,
+				"%s OPERATOR(pg_catalog.=) %s",
+				quote_qualified_identifier(arg1, count_col),
+				quote_qualified_identifier(arg2, count_col)
+			);
+			break;
+		default:
+			elog(ERROR,"unknown operation");
+	}
+
+	return null_cond.data;
+}
+
+
 /*
  * apply_old_delta_with_count
  *
@@ -2142,13 +2521,20 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
  * which contains tuples to be deleted from to a materialized view given by
  * matviewname.  This is used when counting is required, that is, the view
  * has aggregate or distinct.
+ *
+ * If the view desn't have aggregates or has GROUP BY, this requires a keys
+ * list to identify a tuple in the view. If the view has aggregates, this
+ * requires strings representing resnames of aggregates and SET clause for
+ * updating aggregate values.
  */
 static void
 apply_old_delta_with_count(const char *matviewname, const char *deltaname_old,
-				List *keys, const char *count_colname)
+				List *keys, StringInfo aggs_list, StringInfo aggs_set,
+				const char *count_colname)
 {
 	StringInfoData	querybuf;
 	char   *match_cond;
+	bool	agg_without_groupby = (list_length(keys) == 0);
 
 	/* build WHERE condition for searching tuples to be deleted */
 	match_cond = get_matching_condition_string(keys);
@@ -2158,22 +2544,26 @@ apply_old_delta_with_count(const char *matviewname, const char *deltaname_old,
 	appendStringInfo(&querybuf,
 					"WITH t AS ("			/* collecting tid of target tuples in the view */
 						"SELECT diff.%s, "			/* count column */
-								"(diff.%s OPERATOR(pg_catalog.=) mv.%s) AS for_dlt, "
+								"(diff.%s OPERATOR(pg_catalog.=) mv.%s AND %s) AS for_dlt, "
 								"mv.ctid "
+								"%s "				/* aggregate columns */
 						"FROM %s AS mv, %s AS diff "
 						"WHERE %s"					/* tuple matching condition */
 					"), updt AS ("			/* update a tuple if this is not to be deleted */
 						"UPDATE %s AS mv SET %s = mv.%s OPERATOR(pg_catalog.-) t.%s "
+											"%s"	/* SET clauses for aggregates */
 						"FROM t WHERE mv.ctid OPERATOR(pg_catalog.=) t.ctid AND NOT for_dlt "
 					")"
 					/* delete a tuple if this is to be deleted */
 					"DELETE FROM %s AS mv USING t "
 					"WHERE mv.ctid OPERATOR(pg_catalog.=) t.ctid AND for_dlt",
 					count_colname,
-					count_colname, count_colname,
+					count_colname, count_colname, (agg_without_groupby ? "false" : "true"),
+					(aggs_list != NULL ? aggs_list->data : ""),
 					matviewname, deltaname_old,
 					match_cond,
 					matviewname, count_colname, count_colname, count_colname,
+					(aggs_set != NULL ? aggs_set->data : ""),
 					matviewname);
 
 	if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
@@ -2237,10 +2627,15 @@ apply_old_delta(const char *matviewname, const char *deltaname_old,
  * matviewname.  This is used when counting is required, that is, the view
  * has aggregate or distinct. Also, when a table in EXISTS sub queries
  * is modified.
+ *
+ * If the view desn't have aggregates or has GROUP BY, this requires a keys
+ * list to identify a tuple in the view. If the view has aggregates, this
+ * requires strings representing SET clause for updating aggregate values.
  */
 static void
 apply_new_delta_with_count(const char *matviewname, const char* deltaname_new,
-				List *keys, StringInfo target_list, const char* count_colname)
+				List *keys, StringInfo aggs_set, StringInfo target_list,
+				const char* count_colname)
 {
 	StringInfoData	querybuf;
 	StringInfoData	returning_keys;
@@ -2271,6 +2666,7 @@ apply_new_delta_with_count(const char *matviewname, const char* deltaname_new,
 	appendStringInfo(&querybuf,
 					"WITH updt AS ("		/* update a tuple if this exists in the view */
 						"UPDATE %s AS mv SET %s = mv.%s OPERATOR(pg_catalog.+) diff.%s "
+											"%s "	/* SET clauses for aggregates */
 						"FROM %s AS diff "
 						"WHERE %s "					/* tuple matching condition */
 						"RETURNING %s"				/* returning keys of updated tuples */
@@ -2278,6 +2674,7 @@ apply_new_delta_with_count(const char *matviewname, const char* deltaname_new,
 						"SELECT %s FROM %s AS diff "
 						"WHERE NOT EXISTS (SELECT 1 FROM updt AS mv WHERE %s);",
 					matviewname, count_colname, count_colname, count_colname,
+					(aggs_set != NULL ? aggs_set->data : ""),
 					deltaname_new,
 					match_cond,
 					returning_keys.data,
diff --git a/src/include/commands/createas.h b/src/include/commands/createas.h
index 76a7873ebf..599bae3b5a 100644
--- a/src/include/commands/createas.h
+++ b/src/include/commands/createas.h
@@ -30,6 +30,7 @@ extern void CreateIvmTriggersOnBaseTables(Query *qry, Oid matviewOid);
 extern void CreateIndexOnIMMV(Query *query, Relation matviewRel);
 
 extern Query *rewriteQueryForIMMV(Query *query, List *colNames);
+extern void makeIvmAggColumn(ParseState *pstate, Aggref *aggref, char *resname, AttrNumber *next_resno, List **aggs);
 
 extern int	GetIntoRelEFlags(IntoClause *intoClause);
 
-- 
2.25.1

