From e593c119c97ea31edac4c9f08a39eee451964a16 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryzbyj@telsasoft.com>
Date: Thu, 19 Mar 2020 23:03:25 -0500
Subject: [PATCH v8 1/8] nodeAgg: separate context for each hashtable

---
 src/backend/executor/execExpr.c |  2 +-
 src/backend/executor/nodeAgg.c  | 82 +++++++++++++++++++--------------
 src/include/executor/nodeAgg.h  |  2 +
 src/include/nodes/execnodes.h   |  2 -
 4 files changed, 51 insertions(+), 37 deletions(-)

diff --git a/src/backend/executor/execExpr.c b/src/backend/executor/execExpr.c
index 1370ffec50..039c5a8b5f 100644
--- a/src/backend/executor/execExpr.c
+++ b/src/backend/executor/execExpr.c
@@ -3241,7 +3241,7 @@ ExecBuildAggTransCall(ExprState *state, AggState *aggstate,
 	int adjust_jumpnull = -1;
 
 	if (ishash)
-		aggcontext = aggstate->hashcontext;
+		aggcontext = aggstate->perhash[setno].hashcontext;
 	else
 		aggcontext = aggstate->aggcontexts[setno];
 
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 44c159ab2a..1d319f49d0 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -191,8 +191,7 @@
  *	  So we create an array, aggcontexts, with an ExprContext for each grouping
  *	  set in the largest rollup that we're going to process, and use the
  *	  per-tuple memory context of those ExprContexts to store the aggregate
- *	  transition values.  hashcontext is the single context created to support
- *	  all hash tables.
+ *	  transition values.
  *
  *	  Spilling To Disk
  *
@@ -457,7 +456,7 @@ select_current_set(AggState *aggstate, int setno, bool is_hash)
 	 * ExecAggPlainTransByRef().
 	 */
 	if (is_hash)
-		aggstate->curaggcontext = aggstate->hashcontext;
+		aggstate->curaggcontext = aggstate->perhash[setno].hashcontext;
 	else
 		aggstate->curaggcontext = aggstate->aggcontexts[setno];
 
@@ -1424,8 +1423,7 @@ find_unaggregated_cols_walker(Node *node, Bitmapset **colnos)
  * grouping set for which we're doing hashing.
  *
  * The contents of the hash tables always live in the hashcontext's per-tuple
- * memory context (there is only one of these for all tables together, since
- * they are all reset at the same time).
+ * memory context.
  */
 static void
 build_hash_tables(AggState *aggstate)
@@ -1465,8 +1463,8 @@ static void
 build_hash_table(AggState *aggstate, int setno, long nbuckets)
 {
 	AggStatePerHash perhash = &aggstate->perhash[setno];
-	MemoryContext	metacxt = aggstate->hash_metacxt;
-	MemoryContext	hashcxt = aggstate->hashcontext->ecxt_per_tuple_memory;
+	MemoryContext	metacxt = perhash->hash_metacxt;
+	MemoryContext	hashcxt = perhash->hashcontext->ecxt_per_tuple_memory;
 	MemoryContext	tmpcxt	= aggstate->tmpcontext->ecxt_per_tuple_memory;
 	Size            additionalsize;
 
@@ -1776,10 +1774,15 @@ static void
 hash_agg_check_limits(AggState *aggstate)
 {
 	uint64 ngroups = aggstate->hash_ngroups_current;
-	Size meta_mem = MemoryContextMemAllocated(
-		aggstate->hash_metacxt, true);
-	Size hash_mem = MemoryContextMemAllocated(
-		aggstate->hashcontext->ecxt_per_tuple_memory, true);
+	Size meta_mem = 0;
+	Size hash_mem = 0;
+
+	for (int i = 0; i < aggstate->num_hashes; ++i)
+		meta_mem = MemoryContextMemAllocated(
+		aggstate->perhash[i].hash_metacxt, true);
+	for (int i = 0; i < aggstate->num_hashes; ++i)
+		hash_mem += MemoryContextMemAllocated(
+			aggstate->perhash[i].hashcontext->ecxt_per_tuple_memory, true);
 
 	/*
 	 * Don't spill unless there's at least one group in the hash table so we
@@ -1837,8 +1840,8 @@ hash_agg_enter_spill_mode(AggState *aggstate)
 static void
 hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
 {
-	Size	meta_mem;
-	Size	hash_mem;
+	Size	meta_mem = 0;
+	Size	hash_mem = 0;
 	Size	buffer_mem;
 	Size	total_mem;
 
@@ -1846,12 +1849,16 @@ hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions)
 		aggstate->aggstrategy != AGG_HASHED)
 		return;
 
-	/* memory for the hash table itself */
-	meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt, true);
 
-	/* memory for the group keys and transition states */
-	hash_mem = MemoryContextMemAllocated(
-		aggstate->hashcontext->ecxt_per_tuple_memory, true);
+	for (int i = 0; i < aggstate->num_hashes; ++i)
+	{
+		/* memory for the hash table itself */
+		meta_mem += MemoryContextMemAllocated(
+			aggstate->perhash[i].hash_metacxt, true);
+		/* memory for the group keys and transition states */
+		hash_mem += MemoryContextMemAllocated(
+			aggstate->perhash[i].hashcontext->ecxt_per_tuple_memory, true);
+	}
 
 	/* memory for read/write tape buffers, if spilled */
 	buffer_mem = npartitions * HASHAGG_WRITE_BUFFER_SIZE;
@@ -2557,9 +2564,11 @@ agg_refill_hash_table(AggState *aggstate)
 		aggstate->all_pergroups[setoff] = NULL;
 
 	/* free memory and reset hash tables */
-	ReScanExprContext(aggstate->hashcontext);
 	for (int setno = 0; setno < aggstate->num_hashes; setno++)
+	{
+		ReScanExprContext(aggstate->perhash[setno].hashcontext);
 		ResetTupleHashTable(aggstate->perhash[setno].hashtable);
+	}
 
 	aggstate->hash_ngroups_current = 0;
 
@@ -3217,6 +3226,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 	aggstate->aggcontexts = (ExprContext **)
 		palloc0(sizeof(ExprContext *) * numGroupingSets);
 
+	aggstate->num_hashes = numHashes;
+	if (numHashes)
+		aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes);
+
 	/*
 	 * Create expression contexts.  We need three or more, one for
 	 * per-input-tuple processing, one for per-output-tuple processing, one
@@ -3240,10 +3253,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 		aggstate->aggcontexts[i] = aggstate->ss.ps.ps_ExprContext;
 	}
 
-	if (use_hashing)
+	for (i = 0 ; i < numHashes; ++i)
 	{
 		ExecAssignExprContext(estate, &aggstate->ss.ps);
-		aggstate->hashcontext = aggstate->ss.ps.ps_ExprContext;
+		aggstate->perhash[i].hashcontext = aggstate->ss.ps.ps_ExprContext;
 	}
 
 	ExecAssignExprContext(estate, &aggstate->ss.ps);
@@ -3332,11 +3345,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 	 * compare functions.  Accumulate all_grouped_cols in passing.
 	 */
 	aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData));
-
-	aggstate->num_hashes = numHashes;
 	if (numHashes)
 	{
-		aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes);
 		aggstate->phases[0].numsets = 0;
 		aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int));
 		aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *));
@@ -3528,10 +3538,12 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 		uint64	totalGroups = 0;
 		int 	i;
 
-		aggstate->hash_metacxt = AllocSetContextCreate(
-			aggstate->ss.ps.state->es_query_cxt,
-			"HashAgg meta context",
-			ALLOCSET_DEFAULT_SIZES);
+		for (i = 0; i < aggstate->num_hashes; i++)
+			aggstate->perhash[i].hash_metacxt = AllocSetContextCreate(
+				aggstate->ss.ps.state->es_query_cxt,
+				"HashAgg meta context",
+				ALLOCSET_DEFAULT_SIZES);
+
 		aggstate->hash_spill_slot = ExecInitExtraTupleSlot(
 			estate, scanDesc, &TTSOpsMinimalTuple);
 
@@ -4461,10 +4473,11 @@ ExecEndAgg(AggState *node)
 
 	hashagg_reset_spill_state(node);
 
-	if (node->hash_metacxt != NULL)
+	for (setno = 0; setno < node->num_hashes; setno++)
 	{
-		MemoryContextDelete(node->hash_metacxt);
-		node->hash_metacxt = NULL;
+		MemoryContext *metacxt = &node->perhash[setno].hash_metacxt;
+		MemoryContextDelete(*metacxt);
+		*metacxt = NULL;
 	}
 
 	for (transno = 0; transno < node->numtrans; transno++)
@@ -4481,8 +4494,8 @@ ExecEndAgg(AggState *node)
 	/* And ensure any agg shutdown callbacks have been called */
 	for (setno = 0; setno < numGroupingSets; setno++)
 		ReScanExprContext(node->aggcontexts[setno]);
-	if (node->hashcontext)
-		ReScanExprContext(node->hashcontext);
+	for (setno = 0; setno < node->num_hashes; setno++)
+		ReScanExprContext(node->perhash[setno].hashcontext);
 
 	/*
 	 * We don't actually free any ExprContexts here (see comment in
@@ -4591,7 +4604,8 @@ ExecReScanAgg(AggState *node)
 		node->hash_spill_mode = false;
 		node->hash_ngroups_current = 0;
 
-		ReScanExprContext(node->hashcontext);
+		for (setno = 0; setno < node->num_hashes; setno++)
+			ReScanExprContext(node->perhash[setno].hashcontext);
 		/* Rebuild an empty hash table */
 		build_hash_tables(node);
 		node->table_filled = false;
diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h
index a5b8a004d1..247bb65bd6 100644
--- a/src/include/executor/nodeAgg.h
+++ b/src/include/executor/nodeAgg.h
@@ -307,6 +307,8 @@ typedef struct AggStatePerHashData
 	AttrNumber *hashGrpColIdxInput; /* hash col indices in input slot */
 	AttrNumber *hashGrpColIdxHash;	/* indices in hash table tuples */
 	Agg		   *aggnode;		/* original Agg node, for numGroups etc. */
+	MemoryContext	hash_metacxt;	/* memory for hash table itself */
+	ExprContext	*hashcontext;	/* context for hash table data */
 }			AggStatePerHashData;
 
 
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 3d27d50f09..ddf0b43916 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2051,7 +2051,6 @@ typedef struct AggState
 	int			current_phase;	/* current phase number */
 	AggStatePerAgg peragg;		/* per-Aggref information */
 	AggStatePerTrans pertrans;	/* per-Trans state information */
-	ExprContext *hashcontext;	/* econtexts for long-lived data (hashtable) */
 	ExprContext **aggcontexts;	/* econtexts for long-lived data (per GS) */
 	ExprContext *tmpcontext;	/* econtext for input expressions */
 #define FIELDNO_AGGSTATE_CURAGGCONTEXT 14
@@ -2079,7 +2078,6 @@ typedef struct AggState
 	/* these fields are used in AGG_HASHED and AGG_MIXED modes: */
 	bool		table_filled;	/* hash table filled yet? */
 	int			num_hashes;
-	MemoryContext	hash_metacxt;	/* memory for hash table itself */
 	struct HashTapeInfo *hash_tapeinfo; /* metadata for spill tapes */
 	struct HashAggSpill *hash_spills; /* HashAggSpill for each grouping set,
 										 exists only during first pass */
-- 
2.17.0

