From f43b5204477c73073989e3fb60fe6167e9f6551e Mon Sep 17 00:00:00 2001
From: Yugo Nagata <nagata@sraoss.co.jp>
Date: Wed, 31 May 2023 20:58:25 +0900
Subject: [PATCH v29 09/11] Add support for min/max aggregates for IVM

Supporting min and max is more complicated than count, sum, or avg.

For an example of min, when tuples are inserted, the current min value
in the view and the min value in the inseteted tuples are compared,
then the smaller one is used as the latest min value. On the other
hand, when tuples are deleted, if the current min value in the view
equals to the min in the deleted tuples, we need re-computation the
latest min value from base tables. Otherwise, the current value in
the view remains.
---
 src/backend/commands/createas.c |  45 +++
 src/backend/commands/matview.c  | 644 +++++++++++++++++++++++++++++++-
 2 files changed, 680 insertions(+), 9 deletions(-)

diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index c8aa558f2e..c40ea6b2bc 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -1312,6 +1312,51 @@ check_aggregate_supports_ivm(Oid aggfnoid)
 		case F_AVG_FLOAT8:
 		case F_AVG_INTERVAL:
 
+		/* min */
+		case F_MIN_ANYARRAY:
+		case F_MIN_INT8:
+		case F_MIN_INT4:
+		case F_MIN_INT2:
+		case F_MIN_OID:
+		case F_MIN_FLOAT4:
+		case F_MIN_FLOAT8:
+		case F_MIN_DATE:
+		case F_MIN_TIME:
+		case F_MIN_TIMETZ:
+		case F_MIN_MONEY:
+		case F_MIN_TIMESTAMP:
+		case F_MIN_TIMESTAMPTZ:
+		case F_MIN_INTERVAL:
+		case F_MIN_TEXT:
+		case F_MIN_NUMERIC:
+		case F_MIN_BPCHAR:
+		case F_MIN_TID:
+		case F_MIN_ANYENUM:
+		case F_MIN_INET:
+		case F_MIN_PG_LSN:
+
+		/* max */
+		case F_MAX_ANYARRAY:
+		case F_MAX_INT8:
+		case F_MAX_INT4:
+		case F_MAX_INT2:
+		case F_MAX_OID:
+		case F_MAX_FLOAT4:
+		case F_MAX_FLOAT8:
+		case F_MAX_DATE:
+		case F_MAX_TIME:
+		case F_MAX_TIMETZ:
+		case F_MAX_MONEY:
+		case F_MAX_TIMESTAMP:
+		case F_MAX_TIMESTAMPTZ:
+		case F_MAX_INTERVAL:
+		case F_MAX_TEXT:
+		case F_MAX_NUMERIC:
+		case F_MAX_BPCHAR:
+		case F_MAX_TID:
+		case F_MAX_ANYENUM:
+		case F_MAX_INET:
+		case F_MAX_PG_LSN:
 			return true;
 
 		default:
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index ee41f0007d..eff512d40c 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -73,6 +73,34 @@ typedef struct
 
 #define MV_INIT_QUERYHASHSIZE	16
 
+/* MV query type codes */
+#define MV_PLAN_RECALC			1
+#define MV_PLAN_SET_VALUE		2
+
+/*
+ * MI_QueryKey
+ *
+ * The key identifying a prepared SPI plan in our query hashtable
+ */
+typedef struct MV_QueryKey
+{
+	Oid			matview_id;	/* OID of materialized view */
+	int32		query_type;	/* query type ID, see MV_PLAN_XXX above */
+} MV_QueryKey;
+
+/*
+ * MV_QueryHashEntry
+ *
+ * Hash entry for cached plans used to maintain materialized views.
+ */
+typedef struct MV_QueryHashEntry
+{
+	MV_QueryKey key;
+	SPIPlanPtr	plan;
+	SearchPathMatcher *search_path;        /* search_path used for parsing
+											 * and planning */
+} MV_QueryHashEntry;
+
 /*
  * MV_TriggerHashEntry
  *
@@ -109,6 +137,7 @@ typedef struct MV_TriggerTable
 	TupleTableSlot *slot;		/* for checking visibility in the pre-state table */
 } MV_TriggerTable;
 
+static HTAB *mv_query_cache = NULL;
 static HTAB *mv_trigger_info = NULL;
 
 static bool in_delta_calculation = false;
@@ -169,6 +198,9 @@ static void append_set_clause_for_sum(const char *resname, StringInfo buf_old,
 static void append_set_clause_for_avg(const char *resname, StringInfo buf_old,
 						  StringInfo buf_new, StringInfo aggs_list,
 						  const char *aggtype);
+static void append_set_clause_for_minmax(const char *resname, StringInfo buf_old,
+							 StringInfo buf_new, StringInfo aggs_list,
+							 bool is_min);
 static char *get_operation_string(IvmOp op, const char *col, const char *arg1, const char *arg2,
 					 const char* count_col, const char *castType);
 static char *get_null_condition_string(IvmOp op, const char *arg1, const char *arg2,
@@ -177,17 +209,30 @@ static void apply_old_delta(const char *matviewname, const char *deltaname_old,
 				List *keys);
 static void apply_old_delta_with_count(const char *matviewname, const char *deltaname_old,
 				List *keys, StringInfo aggs_list, StringInfo aggs_set,
-				const char *count_colname);
+				List *minmax_list, List *is_min_list,
+				const char *count_colname,
+				SPITupleTable **tuptable_recalc, uint64 *num_recalc);
 static void apply_new_delta(const char *matviewname, const char *deltaname_new,
 				StringInfo target_list);
 static void apply_new_delta_with_count(const char *matviewname, const char* deltaname_new,
 				List *keys, StringInfo target_list, StringInfo aggs_set,
 				const char* count_colname);
 static char *get_matching_condition_string(List *keys);
+static char *get_returning_string(List *minmax_list, List *is_min_list, List *keys);
+static char *get_minmax_recalc_condition_string(List *minmax_list, List *is_min_list);
+static char *get_select_for_recalc_string(List *keys);
+static void recalc_and_set_values(SPITupleTable *tuptable_recalc, int64 num_tuples,
+					  List *namelist, List *keys, Relation matviewRel);
+static SPIPlanPtr get_plan_for_recalc(Oid matviewOid, List *namelist, List *keys, Oid *keyTypes);
+static SPIPlanPtr get_plan_for_set_values(Oid matviewOid, char *matviewname, List *namelist,
+						Oid *valTypes);
 static void generate_equal(StringInfo querybuf, Oid opttype,
 			   const char *leftop, const char *rightop);
 
 static void mv_InitHashTables(void);
+static SPIPlanPtr mv_FetchPreparedPlan(MV_QueryKey *key);
+static void mv_HashPreparedPlan(MV_QueryKey *key, SPIPlanPtr plan);
+static void mv_BuildQueryKey(MV_QueryKey *key, Oid matview_id, int32 query_type);
 static void clean_up_IVM_hash_entry(MV_TriggerHashEntry *entry, bool is_abort);
 
 /*
@@ -2101,6 +2146,8 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
 	ListCell	*lc;
 	int			i;
 	List	   *keys = NIL;
+	List	   *minmax_list = NIL;
+	List	   *is_min_list = NIL;
 
 
 	/*
@@ -2182,6 +2229,17 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
 				append_set_clause_for_avg(resname, aggs_set_old, aggs_set_new, aggs_list_buf,
 										  format_type_be(aggref->aggtype));
 
+			/* min/max */
+			else if (!strcmp(aggname, "min") || !strcmp(aggname, "max"))
+			{
+				bool	is_min = (!strcmp(aggname, "min"));
+
+				append_set_clause_for_minmax(resname, aggs_set_old, aggs_set_new, aggs_list_buf, is_min);
+
+				/* make a resname list of min and max aggregates */
+				minmax_list = lappend(minmax_list, resname);
+				is_min_list = lappend_int(is_min_list, is_min);
+			}
 			else
 				elog(ERROR, "unsupported aggregate function: %s", aggname);
 		}
@@ -2211,6 +2269,8 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
 	if (old_tuplestores && tuplestore_tuple_count(old_tuplestores) > 0)
 	{
 		EphemeralNamedRelation enr = palloc(sizeof(EphemeralNamedRelationData));
+		SPITupleTable  *tuptable_recalc = NULL;
+		uint64			num_recalc;
 		int				rc;
 
 		/* convert tuplestores to ENR, and register for SPI */
@@ -2229,10 +2289,18 @@ apply_delta(Oid matviewOid, Tuplestorestate *old_tuplestores, Tuplestorestate *n
 			/* apply old delta and get rows to be recalculated */
 			apply_old_delta_with_count(matviewname, OLD_DELTA_ENRNAME,
 									   keys, aggs_list_buf, aggs_set_old,
-									   count_colname);
+									   minmax_list, is_min_list,
+									   count_colname, &tuptable_recalc, &num_recalc);
 		else
 			apply_old_delta(matviewname, OLD_DELTA_ENRNAME, keys);
 
+		/*
+		 * If we have min or max, we might have to recalculate aggregate values from base tables
+		 * on some tuples. TIDs and keys such tuples are returned as a result of the above query.
+		 */
+		if (minmax_list && tuptable_recalc)
+			recalc_and_set_values(tuptable_recalc, num_recalc, minmax_list, keys, matviewRel);
+
 	}
 	/* For tuple insertion */
 	if (new_tuplestores && tuplestore_tuple_count(new_tuplestores) > 0)
@@ -2424,6 +2492,70 @@ append_set_clause_for_avg(const char *resname, StringInfo buf_old,
 	);
 }
 
+/*
+ * append_set_clause_for_minmax
+ *
+ * Append SET clause string for min or max aggregation to given buffers.
+ * Also, append resnames required for calculating the aggregate value.
+ * is_min is true if this is min, false if not.
+ */
+static void
+append_set_clause_for_minmax(const char *resname, StringInfo buf_old,
+							 StringInfo buf_new, StringInfo aggs_list,
+							 bool is_min)
+{
+	char *count_col = IVM_colname("count", resname);
+
+	/* For tuple deletion */
+	if (buf_old)
+	{
+		/*
+		 * If the new value doesn't became NULL then use the value remaining
+		 * in the view although this will be recomputated afterwords.
+		 */
+		appendStringInfo(buf_old,
+			", %s = CASE WHEN %s THEN NULL ELSE %s END",
+			quote_qualified_identifier(NULL, resname),
+			get_null_condition_string(IVM_SUB, "mv", "t", count_col),
+			quote_qualified_identifier("mv", resname)
+		);
+		/* count = mv.count - t.count */
+		appendStringInfo(buf_old,
+			", %s = %s",
+			quote_qualified_identifier(NULL, count_col),
+			get_operation_string(IVM_SUB, count_col, "mv", "t", NULL, NULL)
+		);
+	}
+	/* For tuple insertion */
+	if (buf_new)
+	{
+		/*
+		 * min = LEAST(mv.min, diff.min)
+		 * max = GREATEST(mv.max, diff.max)
+		 */
+		appendStringInfo(buf_new,
+			", %s = CASE WHEN %s THEN NULL ELSE %s(%s,%s) END",
+			quote_qualified_identifier(NULL, resname),
+			get_null_condition_string(IVM_ADD, "mv", "diff", count_col),
+
+			is_min ? "LEAST" : "GREATEST",
+			quote_qualified_identifier("mv", resname),
+			quote_qualified_identifier("diff", resname)
+		);
+		/* count = mv.count + diff.count */
+		appendStringInfo(buf_new,
+			", %s = %s",
+			quote_qualified_identifier(NULL, count_col),
+			get_operation_string(IVM_ADD, count_col, "mv", "diff", NULL, NULL)
+		);
+	}
+
+	appendStringInfo(aggs_list, ", %s, %s",
+		quote_qualified_identifier("diff", resname),
+		quote_qualified_identifier("diff", IVM_colname("count", resname))
+	);
+}
+
 /*
  * get_operation_string
  *
@@ -2526,19 +2658,44 @@ get_null_condition_string(IvmOp op, const char *arg1, const char *arg2,
  * list to identify a tuple in the view. If the view has aggregates, this
  * requires strings representing resnames of aggregates and SET clause for
  * updating aggregate values.
+ *
+ * If the view has min or max aggregate, this requires a list of resnames of
+ * min/max aggregates and a list of boolean which represents which entries in
+ * minmax_list is min. These are necessary to check if we need to recalculate
+ * min or max aggregate values. In this case, this query returns TID and keys
+ * of tuples which need to be recalculated.  This result and the number of rows
+ * are stored in tuptables and num_recalc repectedly.
+ *
  */
 static void
 apply_old_delta_with_count(const char *matviewname, const char *deltaname_old,
 				List *keys, StringInfo aggs_list, StringInfo aggs_set,
-				const char *count_colname)
+				List *minmax_list, List *is_min_list,
+				const char *count_colname,
+				SPITupleTable **tuptable_recalc, uint64 *num_recalc)
 {
 	StringInfoData	querybuf;
 	char   *match_cond;
+	char   *updt_returning = "";
+	char   *select_for_recalc = "SELECT";
 	bool	agg_without_groupby = (list_length(keys) == 0);
 
+	Assert(tuptable_recalc != NULL);
+	Assert(num_recalc != NULL);
+
 	/* build WHERE condition for searching tuples to be deleted */
 	match_cond = get_matching_condition_string(keys);
 
+	/*
+	 * We need a special RETURNING clause and SELECT statement for min/max to
+	 * check which tuple needs re-calculation from base tables.
+	 */
+	if (minmax_list)
+	{
+		updt_returning = get_returning_string(minmax_list, is_min_list, keys);
+		select_for_recalc = get_select_for_recalc_string(keys);
+	}
+
 	/* Search for matching tuples from the view and update or delete if found. */
 	initStringInfo(&querybuf);
 	appendStringInfo(&querybuf,
@@ -2553,10 +2710,11 @@ apply_old_delta_with_count(const char *matviewname, const char *deltaname_old,
 						"UPDATE %s AS mv SET %s = mv.%s OPERATOR(pg_catalog.-) t.%s "
 											"%s"	/* SET clauses for aggregates */
 						"FROM t WHERE mv.ctid OPERATOR(pg_catalog.=) t.ctid AND NOT for_dlt "
-					")"
-					/* delete a tuple if this is to be deleted */
-					"DELETE FROM %s AS mv USING t "
-					"WHERE mv.ctid OPERATOR(pg_catalog.=) t.ctid AND for_dlt",
+						"%s"						/* RETURNING clause for recalc infomation */
+					"), dlt AS ("			/* delete a tuple if this is to be deleted */
+						"DELETE FROM %s AS mv USING t "
+						"WHERE mv.ctid OPERATOR(pg_catalog.=) t.ctid AND for_dlt"
+					") %s",							/* SELECT returning which tuples need to be recalculated */
 					count_colname,
 					count_colname, count_colname, (agg_without_groupby ? "false" : "true"),
 					(aggs_list != NULL ? aggs_list->data : ""),
@@ -2564,10 +2722,25 @@ apply_old_delta_with_count(const char *matviewname, const char *deltaname_old,
 					match_cond,
 					matviewname, count_colname, count_colname, count_colname,
 					(aggs_set != NULL ? aggs_set->data : ""),
-					matviewname);
+					updt_returning,
+					matviewname,
+					select_for_recalc);
 
-	if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
+	if (SPI_exec(querybuf.data, 0) != SPI_OK_SELECT)
 		elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+
+
+	/* Return tuples to be recalculated. */
+	if (minmax_list)
+	{
+		*tuptable_recalc = SPI_tuptable;
+		*num_recalc = SPI_processed;
+	}
+	else
+	{
+		*tuptable_recalc = NULL;
+		*num_recalc = 0;
+	}
 }
 
 /*
@@ -2750,6 +2923,349 @@ get_matching_condition_string(List *keys)
 	return match_cond.data;
 }
 
+/*
+ * get_returning_string
+ *
+ * Build a string for RETURNING clause of UPDATE used in apply_old_delta_with_count.
+ * This clause returns ctid and a boolean value that indicates if we need to
+ * recalculate min or max value, for each updated row.
+ */
+static char *
+get_returning_string(List *minmax_list, List *is_min_list, List *keys)
+{
+	StringInfoData returning;
+	char		*recalc_cond;
+	ListCell	*lc;
+
+	Assert(minmax_list != NIL && is_min_list != NIL);
+	recalc_cond = get_minmax_recalc_condition_string(minmax_list, is_min_list);
+
+	initStringInfo(&returning);
+
+	appendStringInfo(&returning, "RETURNING mv.ctid AS tid, (%s) AS recalc", recalc_cond);
+	foreach (lc, keys)
+	{
+		Form_pg_attribute attr = (Form_pg_attribute) lfirst(lc);
+		char *resname = NameStr(attr->attname);
+		appendStringInfo(&returning, ", %s", quote_qualified_identifier("mv", resname));
+	}
+
+	return returning.data;
+}
+
+/*
+ * get_minmax_recalc_condition_string
+ *
+ * Build a predicate string for checking if any min/max aggregate
+ * value needs to be recalculated.
+ */
+static char *
+get_minmax_recalc_condition_string(List *minmax_list, List *is_min_list)
+{
+	StringInfoData recalc_cond;
+	ListCell	*lc1, *lc2;
+
+	initStringInfo(&recalc_cond);
+
+	Assert (list_length(minmax_list) == list_length(is_min_list));
+
+	forboth (lc1, minmax_list, lc2, is_min_list)
+	{
+		char   *resname = (char *) lfirst(lc1);
+		bool	is_min = (bool) lfirst_int(lc2);
+		char   *op_str = (is_min ? ">=" : "<=");
+
+		appendStringInfo(&recalc_cond, "%s OPERATOR(pg_catalog.%s) %s",
+			quote_qualified_identifier("mv", resname),
+			op_str,
+			quote_qualified_identifier("t", resname)
+		);
+
+		if (lnext(minmax_list, lc1))
+			appendStringInfo(&recalc_cond, " OR ");
+	}
+
+	return recalc_cond.data;
+}
+
+/*
+ * get_select_for_recalc_string
+ *
+ * Build a query to return tid and keys of tuples which need
+ * recalculation. This is used as the result of the query
+ * built by apply_old_delta.
+ */
+static char *
+get_select_for_recalc_string(List *keys)
+{
+	StringInfoData qry;
+	ListCell	*lc;
+
+	initStringInfo(&qry);
+
+	appendStringInfo(&qry, "SELECT tid");
+	foreach (lc, keys)
+	{
+		Form_pg_attribute attr = (Form_pg_attribute) lfirst(lc);
+		appendStringInfo(&qry, ", %s", NameStr(attr->attname));
+	}
+
+	appendStringInfo(&qry, " FROM updt WHERE recalc");
+
+	return qry.data;
+}
+
+/*
+ * recalc_and_set_values
+ *
+ * Recalculate tuples in a materialized from base tables and update these.
+ * The tuples which needs recalculation are specified by keys, and resnames
+ * of columns to be updated are specified by namelist. TIDs and key values
+ * are given by tuples in tuptable_recalc. Its first attribute must be TID
+ * and key values must be following this.
+ */
+static void
+recalc_and_set_values(SPITupleTable *tuptable_recalc, int64 num_tuples,
+					  List *namelist, List *keys, Relation matviewRel)
+{
+	TupleDesc   tupdesc_recalc = tuptable_recalc->tupdesc;
+	Oid		   *keyTypes = NULL, *types = NULL;
+	char	   *keyNulls = NULL, *nulls = NULL;
+	Datum	   *keyVals = NULL, *vals = NULL;
+	int			num_vals = list_length(namelist);
+	int			num_keys = list_length(keys);
+	uint64      i;
+	Oid			matviewOid;
+	char	   *matviewname;
+
+	matviewOid = RelationGetRelid(matviewRel);
+	matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
+											 RelationGetRelationName(matviewRel));
+
+	/* If we have keys, initialize arrays for them. */
+	if (keys)
+	{
+		keyTypes = palloc(sizeof(Oid) * num_keys);
+		keyNulls = palloc(sizeof(char) * num_keys);
+		keyVals = palloc(sizeof(Datum) * num_keys);
+		/* a tuple contains keys to be recalculated and ctid to be updated*/
+		Assert(tupdesc_recalc->natts == num_keys + 1);
+
+		/* Types of key attributes  */
+		for (i = 0; i < num_keys; i++)
+			keyTypes[i] = TupleDescAttr(tupdesc_recalc, i + 1)->atttypid;
+	}
+
+	/* allocate memory for all attribute names and tid */
+	types = palloc(sizeof(Oid) * (num_vals + 1));
+	nulls = palloc(sizeof(char) * (num_vals + 1));
+	vals = palloc(sizeof(Datum) * (num_vals + 1));
+
+	/* For each tuple which needs recalculation */
+	for (i = 0; i < num_tuples; i++)
+	{
+		int j;
+		bool isnull;
+		SPIPlanPtr plan;
+		SPITupleTable *tuptable_newvals;
+		TupleDesc   tupdesc_newvals;
+
+		/* Set group key values as parameters if needed. */
+		if (keys)
+		{
+			for (j = 0; j < num_keys; j++)
+			{
+				keyVals[j] = SPI_getbinval(tuptable_recalc->vals[i], tupdesc_recalc, j + 2, &isnull);
+				if (isnull)
+					keyNulls[j] = 'n';
+				else
+					keyNulls[j] = ' ';
+			}
+		}
+
+		/*
+		 * Get recalculated values from base tables. The result must be
+		 * only one tuple thich contains the new values for specified keys.
+		 */
+		plan = get_plan_for_recalc(matviewOid, namelist, keys, keyTypes);
+		if (SPI_execute_plan(plan, keyVals, keyNulls, false, 0) != SPI_OK_SELECT)
+			elog(ERROR, "SPI_execute_plan");
+		if (SPI_processed != 1)
+			elog(ERROR, "SPI_execute_plan returned zero or more than one rows");
+
+		tuptable_newvals = SPI_tuptable;
+		tupdesc_newvals = tuptable_newvals->tupdesc;
+
+		Assert(tupdesc_newvals->natts == num_vals);
+
+		/* Set the new values as parameters */
+		for (j = 0; j < tupdesc_newvals->natts; j++)
+		{
+			if (i == 0)
+				types[j] = TupleDescAttr(tupdesc_newvals, j)->atttypid;
+
+			vals[j] = SPI_getbinval(tuptable_newvals->vals[0], tupdesc_newvals, j + 1, &isnull);
+			if (isnull)
+				nulls[j] = 'n';
+			else
+				nulls[j] = ' ';
+		}
+		/* Set TID of the view tuple to be updated as a parameter */
+		types[j] = TIDOID;
+		vals[j] = SPI_getbinval(tuptable_recalc->vals[i], tupdesc_recalc, 1, &isnull);
+		nulls[j] = ' ';
+
+		/* Update the view tuple to the new values */
+		plan = get_plan_for_set_values(matviewOid, matviewname, namelist, types);
+		if (SPI_execute_plan(plan, vals, nulls, false, 0) != SPI_OK_UPDATE)
+			elog(ERROR, "SPI_execute_plan");
+	}
+}
+
+
+/*
+ * get_plan_for_recalc
+ *
+ * Create or fetch a plan for recalculating value in the view's target list
+ * from base tables using the definition query of materialized view specified
+ * by matviewOid. namelist is a list of resnames of values to be recalculated.
+ *
+ * keys is a list of keys to identify tuples to be recalculated if this is not
+ * empty. KeyTypes is an array of types of keys.
+ */
+static SPIPlanPtr
+get_plan_for_recalc(Oid matviewOid, List *namelist, List *keys, Oid *keyTypes)
+{
+	MV_QueryKey hash_key;
+	SPIPlanPtr	plan;
+
+	/* Fetch or prepare a saved plan for the recalculation */
+	mv_BuildQueryKey(&hash_key, matviewOid, MV_PLAN_RECALC);
+	if ((plan = mv_FetchPreparedPlan(&hash_key)) == NULL)
+	{
+		ListCell	   *lc;
+		StringInfoData	str;
+		char   *viewdef;
+
+		/* get view definition of matview */
+		viewdef = text_to_cstring((text *) DatumGetPointer(
+					DirectFunctionCall1(pg_get_viewdef, ObjectIdGetDatum(matviewOid))));
+		/* get rid of trailing semi-colon */
+		viewdef[strlen(viewdef)-1] = '\0';
+
+		/*
+		 * Build a query string for recalculating values. This is like
+		 *
+		 *  SELECT x1, x2, x3, ... FROM ( ... view definition query ...) mv
+		 *   WHERE (key1, key2, ...) = ($1, $2, ...);
+		 */
+
+		initStringInfo(&str);
+		appendStringInfo(&str, "SELECT ");
+		foreach (lc, namelist)
+		{
+			appendStringInfo(&str, "%s", (char *) lfirst(lc));
+			if (lnext(namelist, lc))
+				appendStringInfoString(&str, ", ");
+		}
+		appendStringInfo(&str, " FROM (%s) mv", viewdef);
+
+		if (keys)
+		{
+			int		i = 1;
+			char	paramname[16];
+
+			appendStringInfo(&str, " WHERE (");
+			foreach (lc, keys)
+			{
+				Form_pg_attribute attr = (Form_pg_attribute) lfirst(lc);
+				char   *resname = NameStr(attr->attname);
+				Oid		typid = attr->atttypid;
+
+				sprintf(paramname, "$%d", i);
+				appendStringInfo(&str, "(");
+				generate_equal(&str, typid, resname, paramname);
+				appendStringInfo(&str, " OR (%s IS NULL AND %s IS NULL))",
+								 resname, paramname);
+
+				if (lnext(keys, lc))
+					appendStringInfoString(&str, " AND ");
+				i++;
+			}
+			appendStringInfo(&str, ")");
+		}
+		else
+			keyTypes = NULL;
+
+		plan = SPI_prepare(str.data, list_length(keys), keyTypes);
+		if (plan == NULL)
+			elog(ERROR, "SPI_prepare returned %s for %s", SPI_result_code_string(SPI_result), str.data);
+
+		SPI_keepplan(plan);
+		mv_HashPreparedPlan(&hash_key, plan);
+	}
+
+	return plan;
+}
+
+/*
+ * get_plan_for_set_values
+ *
+ * Create or fetch a plan for applying new values calculated by
+ * get_plan_for_recalc to a materialized view specified by matviewOid.
+ * matviewname is the name of the view.  namelist is a list of resnames
+ * of attributes to be updated, and valTypes is an array of types of the
+ * values.
+ */
+static SPIPlanPtr
+get_plan_for_set_values(Oid matviewOid, char *matviewname, List *namelist,
+						Oid *valTypes)
+{
+	MV_QueryKey	key;
+	SPIPlanPtr	plan;
+
+	/* Fetch or prepare a saved plan for the real check */
+	mv_BuildQueryKey(&key, matviewOid, MV_PLAN_SET_VALUE);
+	if ((plan = mv_FetchPreparedPlan(&key)) == NULL)
+	{
+		ListCell	  *lc;
+		StringInfoData str;
+		int		i;
+
+		/*
+		 * Build a query string for applying min/max values. This is like
+		 *
+		 *  UPDATE matviewname AS mv
+		 *   SET (x1, x2, x3, x4) = ($1, $2, $3, $4)
+		 *   WHERE ctid = $5;
+		 */
+
+		initStringInfo(&str);
+		appendStringInfo(&str, "UPDATE %s AS mv SET (", matviewname);
+		foreach (lc, namelist)
+		{
+			appendStringInfo(&str, "%s", (char *) lfirst(lc));
+			if (lnext(namelist, lc))
+				appendStringInfoString(&str, ", ");
+		}
+		appendStringInfo(&str, ") = ROW(");
+
+		for (i = 1; i <= list_length(namelist); i++)
+			appendStringInfo(&str, "%s$%d", (i==1 ? "" : ", "), i);
+
+		appendStringInfo(&str, ") WHERE ctid OPERATOR(pg_catalog.=) $%d", i);
+
+		plan = SPI_prepare(str.data, list_length(namelist) + 1, valTypes);
+		if (plan == NULL)
+			elog(ERROR, "SPI_prepare returned %s for %s", SPI_result_code_string(SPI_result), str.data);
+
+		SPI_keepplan(plan);
+		mv_HashPreparedPlan(&key, plan);
+	}
+
+	return plan;
+}
+
 /*
  * generate_equals
  *
@@ -2783,6 +3299,13 @@ mv_InitHashTables(void)
 {
 	HASHCTL		ctl;
 
+	memset(&ctl, 0, sizeof(ctl));
+	ctl.keysize = sizeof(MV_QueryKey);
+	ctl.entrysize = sizeof(MV_QueryHashEntry);
+	mv_query_cache = hash_create("MV query cache",
+								 MV_INIT_QUERYHASHSIZE,
+								 &ctl, HASH_ELEM | HASH_BLOBS);
+
 	memset(&ctl, 0, sizeof(ctl));
 	ctl.keysize = sizeof(Oid);
 	ctl.entrysize = sizeof(MV_TriggerHashEntry);
@@ -2791,6 +3314,109 @@ mv_InitHashTables(void)
 								 &ctl, HASH_ELEM | HASH_BLOBS);
 }
 
+/*
+ * mv_FetchPreparedPlan
+ */
+static SPIPlanPtr
+mv_FetchPreparedPlan(MV_QueryKey *key)
+{
+	MV_QueryHashEntry *entry;
+	SPIPlanPtr	plan;
+
+	/*
+	 * On the first call initialize the hashtable
+	 */
+	if (!mv_query_cache)
+		mv_InitHashTables();
+
+	/*
+	 * Lookup for the key
+	 */
+	entry = (MV_QueryHashEntry *) hash_search(mv_query_cache,
+											  (void *) key,
+											  HASH_FIND, NULL);
+	if (entry == NULL)
+		return NULL;
+
+	/*
+	 * Check whether the plan is still valid.  If it isn't, we don't want to
+	 * simply rely on plancache.c to regenerate it; rather we should start
+	 * from scratch and rebuild the query text too.  This is to cover cases
+	 * such as table/column renames.  We depend on the plancache machinery to
+	 * detect possible invalidations, though.
+	 *
+	 * CAUTION: this check is only trustworthy if the caller has already
+	 * locked both materialized views and base tables.
+	 *
+	 * Also, check whether the search_path is still the same as when we made it.
+	 * If it isn't, we need to rebuild the query text because the result of
+	 * pg_ivm_get_viewdef() will change.
+	 */
+	plan = entry->plan;
+	if (plan && SPI_plan_is_valid(plan) &&
+		SearchPathMatchesCurrentEnvironment(entry->search_path))
+		return plan;
+
+	/*
+	 * Otherwise we might as well flush the cached plan now, to free a little
+	 * memory space before we make a new one.
+	 */
+	if (plan)
+		SPI_freeplan(plan);
+	if (entry->search_path)
+		pfree(entry->search_path);
+
+	entry->plan = NULL;
+	entry->search_path = NULL;
+
+	return NULL;
+}
+
+/*
+ * mv_HashPreparedPlan
+ *
+ * Add another plan to our private SPI query plan hashtable.
+ */
+static void
+mv_HashPreparedPlan(MV_QueryKey *key, SPIPlanPtr plan)
+{
+	MV_QueryHashEntry *entry;
+	bool		found;
+
+	/*
+	 * On the first call initialize the hashtable
+	 */
+	if (!mv_query_cache)
+		mv_InitHashTables();
+
+	/*
+	 * Add the new plan.  We might be overwriting an entry previously found
+	 * invalid by mv_FetchPreparedPlan.
+	 */
+	entry = (MV_QueryHashEntry *) hash_search(mv_query_cache,
+											  (void *) key,
+											  HASH_ENTER, &found);
+	Assert(!found || entry->plan == NULL);
+	entry->plan = plan;
+	entry->search_path = GetSearchPathMatcher(TopMemoryContext);
+}
+
+/*
+ * mv_BuildQueryKey
+ *
+ * Construct a hashtable key for a prepared SPI plan for IVM.
+ */
+static void
+mv_BuildQueryKey(MV_QueryKey *key, Oid matview_id, int32 query_type)
+{
+	/*
+	 * We assume struct MV_QueryKey contains no padding bytes, else we'd need
+	 * to use memset to clear them.
+	 */
+	key->matview_id = matview_id;
+	key->query_type = query_type;
+}
+
 /*
  * AtAbort_IVM
  *
-- 
2.25.1

