From 03df3a9feb6710cf34845fecfad3086389a45635 Mon Sep 17 00:00:00 2001
From: James Hunter <james.hunter.pg@gmail.com>
Date: Tue, 25 Feb 2025 22:44:01 +0000
Subject: [PATCH 1/4] Store working memory limit per Plan/SubPlan, rather than
 in GUC

This commit moves the working-memory limit that an executor node checks, at
runtime, from the "work_mem" and "hash_mem_multiplier" GUCs to a new list,
"workMemLimits", added to the PlannedStmt node. At runtime, an exec node
checks its limit by looking up the list element corresponding to its
plan->workmem_id field.

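For illustration, an exec node now reads its limit through the new
workMemLimit() macro rather than consulting the GUC directly. A sketch,
abbreviated from this commit's nodeSort.c change:

    /* ExecSort(), sketch: the limit arrives in KB, via the PlannedStmt */
    tuplesortstate = tuplesort_begin_heap(tupDesc,
                                          plannode->numCols,
                                          plannode->sortColIdx,
                                          plannode->sortOperators,
                                          plannode->collations,
                                          plannode->nullsFirst,
                                          workMemLimit(pstate), /* was: work_mem */
                                          NULL,
                                          tuplesortopts);
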
Indirecting the workMemLimit via a List index allows us to handle SubPlans
as well as Plans. It also allows a future extension to set limits on
individual Plans/SubPlans, without needing to re-traverse the Plan +
Expr tree.

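Concretely, the PlannedStmt carries two parallel IntLists, and each
memory-hungry Plan or SubPlan holds a 1-based index into both. A sketch
(the entries and indexes shown are illustrative):

    /* PlannedStmt, sketch; limits are in KB: */
    workMemCategories = (WORKMEM_NORMAL, WORKMEM_HASH, ...)
    workMemLimits     = (work_mem,       work_mem * hash_mem_multiplier, ...)

    sort_plan->plan.workmem_id = 1;     /* Sort reads the first entry */
    hash_plan->plan.workmem_id = 2;     /* Hash reads the second entry */
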
To preserve backward compatibility, this commit also copies the "work_mem",
etc., values from the existing GUCs into the new list. This commit is
therefore just a refactoring, and doesn't change any behavior.

This "workmem_id" field is on the Plan node, instead of the corresponding
PlanState, because the workMemLimit needs to be set before we can call
ExecInitNode().
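
At executor startup, the resulting order of operations looks like this
(sketch of this commit's InitPlan() change in execMain.c):

    /* InitPlan(), abbreviated: */
    ExecAssignWorkMem(plannedstmt);     /* fill workMemLimits from the GUCs */
    ...
    ExecInitNode(plan, estate, eflags); /* nodes read limits via workmem_id */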
---
 src/backend/executor/Makefile              |   1 +
 src/backend/executor/execGrouping.c        |  10 +-
 src/backend/executor/execMain.c            |   6 +
 src/backend/executor/execParallel.c        |   2 +
 src/backend/executor/execSRF.c             |   5 +-
 src/backend/executor/execWorkmem.c         |  87 ++++++++++++
 src/backend/executor/meson.build           |   1 +
 src/backend/executor/nodeAgg.c             |  64 ++++++---
 src/backend/executor/nodeBitmapIndexscan.c |   2 +-
 src/backend/executor/nodeBitmapOr.c        |   2 +-
 src/backend/executor/nodeCtescan.c         |   3 +-
 src/backend/executor/nodeFunctionscan.c    |   2 +
 src/backend/executor/nodeHash.c            |  22 +++-
 src/backend/executor/nodeIncrementalSort.c |   4 +-
 src/backend/executor/nodeMaterial.c        |   3 +-
 src/backend/executor/nodeMemoize.c         |   2 +-
 src/backend/executor/nodeRecursiveunion.c  |  14 +-
 src/backend/executor/nodeSetOp.c           |   1 +
 src/backend/executor/nodeSort.c            |   4 +-
 src/backend/executor/nodeSubplan.c         |  16 +++
 src/backend/executor/nodeTableFuncscan.c   |   3 +-
 src/backend/executor/nodeWindowAgg.c       |   3 +-
 src/backend/optimizer/path/costsize.c      |  16 ++-
 src/backend/optimizer/plan/createplan.c    | 144 ++++++++++++++++++---
 src/backend/optimizer/plan/planner.c       |   5 +-
 src/backend/optimizer/plan/subselect.c     |   2 +-
 src/include/executor/executor.h            |   7 +
 src/include/executor/hashjoin.h            |   3 +-
 src/include/executor/nodeAgg.h             |   3 +-
 src/include/executor/nodeHash.h            |   3 +-
 src/include/nodes/execnodes.h              |  13 ++
 src/include/nodes/pathnodes.h              |  11 ++
 src/include/nodes/plannodes.h              |  27 +++-
 src/include/nodes/primnodes.h              |   3 +
 src/include/optimizer/planmain.h           |   4 +-
 35 files changed, 433 insertions(+), 65 deletions(-)
 create mode 100644 src/backend/executor/execWorkmem.c

diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index 11118d0ce02..8aa9580558f 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -30,6 +30,7 @@ OBJS = \
 	execScan.o \
 	execTuples.o \
 	execUtils.o \
+	execWorkmem.o \
 	functions.o \
 	instrument.o \
 	nodeAgg.o \
diff --git a/src/backend/executor/execGrouping.c b/src/backend/executor/execGrouping.c
index b5400749353..24e8034e4ee 100644
--- a/src/backend/executor/execGrouping.c
+++ b/src/backend/executor/execGrouping.c
@@ -168,6 +168,7 @@ BuildTupleHashTable(PlanState *parent,
 					Oid *collations,
 					long nbuckets,
 					Size additionalsize,
+					Size hash_mem_limit,
 					MemoryContext metacxt,
 					MemoryContext tablecxt,
 					MemoryContext tempcxt,
@@ -175,7 +176,6 @@ BuildTupleHashTable(PlanState *parent,
 {
 	TupleHashTable hashtable;
 	Size		entrysize;
-	Size		hash_mem_limit;
 	MemoryContext oldcontext;
 	bool		allow_jit;
 	uint32		hash_iv = 0;
@@ -184,8 +184,12 @@ BuildTupleHashTable(PlanState *parent,
 	additionalsize = MAXALIGN(additionalsize);
 	entrysize = sizeof(TupleHashEntryData) + additionalsize;
 
-	/* Limit initial table size request to not more than hash_mem */
-	hash_mem_limit = get_hash_memory_limit() / entrysize;
+	/*
+	 * Limit initial table size request to not more than hash_mem
+	 *
+	 * XXX - we should also limit the *maximum* table size to hash_mem.
+	 */
+	hash_mem_limit = hash_mem_limit / entrysize;
 	if (nbuckets > hash_mem_limit)
 		nbuckets = hash_mem_limit;
 
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 0391798dd2c..6aa9dde5a80 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -949,6 +949,12 @@ InitPlan(QueryDesc *queryDesc, int eflags)
 	/* signal that this EState is not used for EPQ */
 	estate->es_epq_active = NULL;
 
+	/*
+	 * Assign working memory to SubPlan and Plan nodes, before initializing
+	 * their states.
+	 */
+	ExecAssignWorkMem(plannedstmt);
+
 	/*
 	 * Initialize private state information for each SubPlan.  We must do this
 	 * before running ExecInitNode on the main query tree, since
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index f098a5557cf..a8cb631963e 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -216,6 +216,8 @@ ExecSerializePlan(Plan *plan, EState *estate)
 	pstmt->utilityStmt = NULL;
 	pstmt->stmt_location = -1;
 	pstmt->stmt_len = -1;
+	pstmt->workMemCategories = estate->es_plannedstmt->workMemCategories;
+	pstmt->workMemLimits = estate->es_plannedstmt->workMemLimits;
 
 	/* Return serialized copy of our dummy PlannedStmt. */
 	return nodeToString(pstmt);
diff --git a/src/backend/executor/execSRF.c b/src/backend/executor/execSRF.c
index a03fe780a02..4b1e7e0ad1e 100644
--- a/src/backend/executor/execSRF.c
+++ b/src/backend/executor/execSRF.c
@@ -102,6 +102,7 @@ ExecMakeTableFunctionResult(SetExprState *setexpr,
 							ExprContext *econtext,
 							MemoryContext argContext,
 							TupleDesc expectedDesc,
+							int workMem,
 							bool randomAccess)
 {
 	Tuplestorestate *tupstore = NULL;
@@ -261,7 +262,7 @@ ExecMakeTableFunctionResult(SetExprState *setexpr,
 				MemoryContext oldcontext =
 					MemoryContextSwitchTo(econtext->ecxt_per_query_memory);
 
-				tupstore = tuplestore_begin_heap(randomAccess, false, work_mem);
+				tupstore = tuplestore_begin_heap(randomAccess, false, workMem);
 				rsinfo.setResult = tupstore;
 				if (!returnsTuple)
 				{
@@ -396,7 +397,7 @@ no_function_result:
 		MemoryContext oldcontext =
 			MemoryContextSwitchTo(econtext->ecxt_per_query_memory);
 
-		tupstore = tuplestore_begin_heap(randomAccess, false, work_mem);
+		tupstore = tuplestore_begin_heap(randomAccess, false, workMem);
 		rsinfo.setResult = tupstore;
 		MemoryContextSwitchTo(oldcontext);
 
diff --git a/src/backend/executor/execWorkmem.c b/src/backend/executor/execWorkmem.c
new file mode 100644
index 00000000000..d8a19a58ebe
--- /dev/null
+++ b/src/backend/executor/execWorkmem.c
@@ -0,0 +1,87 @@
+/*-------------------------------------------------------------------------
+ *
+ * execWorkmem.c
+ *	 routine to set the "workmem_limit" field(s) on Plan nodes that need
+ *   working memory.
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/executor/execWorkmem.c
+ *
+ * INTERFACE ROUTINES
+ *		ExecAssignWorkMem	- assign working memory to Plan nodes
+ *
+ *	 NOTES
+ *		Historically, every PlanState node, during initialization, looked at
+ *		the "work_mem" (plus maybe "hash_mem_multiplier") GUC, to determine
+ *		its working-memory limit.
+ *
+ *		Now, to allow different PlanState nodes to be restricted to different
+ *		amounts of memory, each PlanState node reads this limit off the
+ *		PlannedStmt's workMemLimits List, at the (1-based) position indicated
+ *		by the PlanState's Plan node's "workmem_id" field.
+ *
+ *		We assign the workmem_id and expand the workMemLimits List when
+ *		creating the Plan node; we then set the limit by calling
+ *		ExecAssignWorkMem(), from InitPlan(), before we initialize the
+ *		PlanState nodes.
+ *
+ * 		The workMemLimit has always applied "per data structure," rather than
+ *		"per PlanState". So a single SQL operator (e.g., RecursiveUnion) can
+ *		use more than the workMemLimit, even though each of its data
+ *		structures is restricted to it.
+ *
+ *		We store the "workmem_id" field(s) on the Plan, instead of the
+ *		PlanState, even though it conceptually belongs to execution rather than
+ *		to planning, because we need it to be set before initializing the
+ *		corresponding PlanState. This is a chicken-and-egg problem. We could,
+ *		of course, make ExecInitNode() a two-phase operation, but that seems
+ *		like overkill. Instead, we store these "workmem_id" fields on the Plan,
+ *		but set the workMemLimit when we start execution, as part of
+ *		InitPlan().
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/parallel.h"
+#include "executor/executor.h"
+#include "miscadmin.h"
+#include "nodes/plannodes.h"
+
+
+/* ------------------------------------------------------------------------
+ *		ExecAssignWorkMem
+ *
+ *		Assigns working memory to any Plans or SubPlans that need it.
+ *
+ *		Inputs:
+ *		  'plannedstmt' is the statement to which we assign working memory
+ *
+ * ------------------------------------------------------------------------
+ */
+void
+ExecAssignWorkMem(PlannedStmt *plannedstmt)
+{
+	ListCell   *lc_category;
+	ListCell   *lc_limit;
+
+	/*
+	 * No need to re-assign working memory on parallel workers, since workers
+	 * have the same work_mem and hash_mem_multiplier GUCs as the leader.
+	 *
+	 * We already assigned working-memory limits on the leader, and those
+	 * limits were sent to the workers inside the serialized Plan.
+	 */
+	if (IsParallelWorker())
+		return;
+
+	forboth(lc_category, plannedstmt->workMemCategories,
+			lc_limit, plannedstmt->workMemLimits)
+	{
+		lfirst_int(lc_limit) = lfirst_int(lc_category) == WORKMEM_HASH ?
+			get_hash_memory_limit() / 1024 : work_mem;
+	}
+}
diff --git a/src/backend/executor/meson.build b/src/backend/executor/meson.build
index 2cea41f8771..4e65974f5f3 100644
--- a/src/backend/executor/meson.build
+++ b/src/backend/executor/meson.build
@@ -18,6 +18,7 @@ backend_sources += files(
   'execScan.c',
   'execTuples.c',
   'execUtils.c',
+  'execWorkmem.c',
   'functions.c',
   'instrument.c',
   'nodeAgg.c',
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 377e016d732..bff143a8a8e 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -258,6 +258,7 @@
 #include "executor/execExpr.h"
 #include "executor/executor.h"
 #include "executor/nodeAgg.h"
+#include "executor/nodeHash.h"
 #include "lib/hyperloglog.h"
 #include "miscadmin.h"
 #include "nodes/nodeFuncs.h"
@@ -403,7 +404,8 @@ static void find_cols(AggState *aggstate, Bitmapset **aggregated,
 					  Bitmapset **unaggregated);
 static bool find_cols_walker(Node *node, FindColsContext *context);
 static void build_hash_tables(AggState *aggstate);
-static void build_hash_table(AggState *aggstate, int setno, long nbuckets);
+static void build_hash_table(AggState *aggstate, int setno, long nbuckets,
+							 Size hash_mem_limit);
 static void hashagg_recompile_expressions(AggState *aggstate, bool minslot,
 										  bool nullcheck);
 static void hash_create_memory(AggState *aggstate);
@@ -412,6 +414,7 @@ static long hash_choose_num_buckets(double hashentrysize,
 static int	hash_choose_num_partitions(double input_groups,
 									   double hashentrysize,
 									   int used_bits,
+									   Size hash_mem_limit,
 									   int *log2_npartitions);
 static void initialize_hash_entry(AggState *aggstate,
 								  TupleHashTable hashtable,
@@ -434,7 +437,7 @@ static HashAggBatch *hashagg_batch_new(LogicalTape *input_tape, int setno,
 static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp);
 static void hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset,
 							   int used_bits, double input_groups,
-							   double hashentrysize);
+							   double hashentrysize, Size hash_mem_limit);
 static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill,
 								TupleTableSlot *inputslot, uint32 hash);
 static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill,
@@ -522,6 +525,15 @@ initialize_phase(AggState *aggstate, int newphase)
 		Sort	   *sortnode = aggstate->phases[newphase + 1].sortnode;
 		PlanState  *outerNode = outerPlanState(aggstate);
 		TupleDesc	tupDesc = ExecGetResultType(outerNode);
+		int			workmem_limit;
+
+		/*
+		 * Read the sort-output workmem limit off the first AGG_SORTED node.
+		 * Since phase 0 is always AGG_HASHED, this will always be phase 1.
+		 */
+		workmem_limit =
+			workMemLimitFromId(aggstate,
+							   aggstate->phases[1].aggnode->plan.workmem_id);
 
 		aggstate->sort_out = tuplesort_begin_heap(tupDesc,
 												  sortnode->numCols,
@@ -529,7 +541,7 @@ initialize_phase(AggState *aggstate, int newphase)
 												  sortnode->sortOperators,
 												  sortnode->collations,
 												  sortnode->nullsFirst,
-												  work_mem,
+												  workmem_limit,
 												  NULL, TUPLESORT_NONE);
 	}
 
@@ -585,6 +597,8 @@ initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
 	 */
 	if (pertrans->aggsortrequired)
 	{
+		int			workmem_limit;
+
 		/*
 		 * In case of rescan, maybe there could be an uncompleted sort
 		 * operation?  Clean it up if so.
@@ -592,6 +606,12 @@ initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
 		if (pertrans->sortstates[aggstate->current_set])
 			tuplesort_end(pertrans->sortstates[aggstate->current_set]);
 
+		/*
+		 * Read the sort-input workmem limit off the first Agg node.
+		 */
+		workmem_limit =
+			workMemLimitFromId(aggstate,
+							   ((Agg *) aggstate->ss.ps.plan)->sortWorkMemId);
 
 		/*
 		 * We use a plain Datum sorter when there's a single input column;
@@ -607,7 +627,7 @@ initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
 									  pertrans->sortOperators[0],
 									  pertrans->sortCollations[0],
 									  pertrans->sortNullsFirst[0],
-									  work_mem, NULL, TUPLESORT_NONE);
+									  workmem_limit, NULL, TUPLESORT_NONE);
 		}
 		else
 			pertrans->sortstates[aggstate->current_set] =
@@ -617,7 +637,7 @@ initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
 									 pertrans->sortOperators,
 									 pertrans->sortCollations,
 									 pertrans->sortNullsFirst,
-									 work_mem, NULL, TUPLESORT_NONE);
+									 workmem_limit, NULL, TUPLESORT_NONE);
 	}
 
 	/*
@@ -1496,7 +1516,7 @@ build_hash_tables(AggState *aggstate)
 		}
 #endif
 
-		build_hash_table(aggstate, setno, nbuckets);
+		build_hash_table(aggstate, setno, nbuckets, memory);
 	}
 
 	aggstate->hash_ngroups_current = 0;
@@ -1506,7 +1526,8 @@ build_hash_tables(AggState *aggstate)
  * Build a single hashtable for this grouping set.
  */
 static void
-build_hash_table(AggState *aggstate, int setno, long nbuckets)
+build_hash_table(AggState *aggstate, int setno, long nbuckets,
+				 Size hash_mem_limit)
 {
 	AggStatePerHash perhash = &aggstate->perhash[setno];
 	MemoryContext metacxt = aggstate->hash_metacxt;
@@ -1535,6 +1556,7 @@ build_hash_table(AggState *aggstate, int setno, long nbuckets)
 											 perhash->aggnode->grpCollations,
 											 nbuckets,
 											 additionalsize,
+											 hash_mem_limit,
 											 metacxt,
 											 tablecxt,
 											 tmpcxt,
@@ -1807,12 +1829,11 @@ hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
  */
 void
 hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
-					Size *mem_limit, uint64 *ngroups_limit,
+					Size hash_mem_limit, Size *mem_limit, uint64 *ngroups_limit,
 					int *num_partitions)
 {
 	int			npartitions;
 	Size		partition_mem;
-	Size		hash_mem_limit = get_hash_memory_limit();
 
 	/* if not expected to spill, use all of hash_mem */
 	if (input_groups * hashentrysize <= hash_mem_limit)
@@ -1832,6 +1853,7 @@ hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
 	npartitions = hash_choose_num_partitions(input_groups,
 											 hashentrysize,
 											 used_bits,
+											 hash_mem_limit,
 											 NULL);
 	if (num_partitions != NULL)
 		*num_partitions = npartitions;
@@ -1932,7 +1954,8 @@ hash_agg_enter_spill_mode(AggState *aggstate)
 
 			hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
 							   perhash->aggnode->numGroups,
-							   aggstate->hashentrysize);
+							   aggstate->hashentrysize,
+							   (Size) workMemLimit(aggstate) * 1024);
 		}
 	}
 }
@@ -2081,9 +2104,9 @@ hash_choose_num_buckets(double hashentrysize, long ngroups, Size memory)
  */
 static int
 hash_choose_num_partitions(double input_groups, double hashentrysize,
-						   int used_bits, int *log2_npartitions)
+						   int used_bits, Size hash_mem_limit,
+						   int *log2_npartitions)
 {
-	Size		hash_mem_limit = get_hash_memory_limit();
 	double		partition_limit;
 	double		mem_wanted;
 	double		dpartitions;
@@ -2219,7 +2242,8 @@ lookup_hash_entries(AggState *aggstate)
 			if (spill->partitions == NULL)
 				hashagg_spill_init(spill, aggstate->hash_tapeset, 0,
 								   perhash->aggnode->numGroups,
-								   aggstate->hashentrysize);
+								   aggstate->hashentrysize,
+								   (Size) workMemLimit(aggstate) * 1024);
 
 			hashagg_spill_tuple(aggstate, spill, slot, hash);
 			pergroup[setno] = NULL;
@@ -2693,7 +2717,9 @@ agg_refill_hash_table(AggState *aggstate)
 	aggstate->hash_batches = list_delete_last(aggstate->hash_batches);
 
 	hash_agg_set_limits(aggstate->hashentrysize, batch->input_card,
-						batch->used_bits, &aggstate->hash_mem_limit,
+						batch->used_bits,
+						(Size) workMemLimit(aggstate) * 1024,
+						&aggstate->hash_mem_limit,
 						&aggstate->hash_ngroups_limit, NULL);
 
 	/*
@@ -2783,7 +2809,8 @@ agg_refill_hash_table(AggState *aggstate)
 				 */
 				spill_initialized = true;
 				hashagg_spill_init(&spill, tapeset, batch->used_bits,
-								   batch->input_card, aggstate->hashentrysize);
+								   batch->input_card, aggstate->hashentrysize,
+								   (Size) workMemLimit(aggstate) * 1024);
 			}
 			/* no memory for a new group, spill */
 			hashagg_spill_tuple(aggstate, &spill, spillslot, hash);
@@ -2982,13 +3009,15 @@ agg_retrieve_hash_table_in_memory(AggState *aggstate)
  */
 static void
 hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits,
-				   double input_groups, double hashentrysize)
+				   double input_groups, double hashentrysize,
+				   Size hash_mem_limit)
 {
 	int			npartitions;
 	int			partition_bits;
 
 	npartitions = hash_choose_num_partitions(input_groups, hashentrysize,
-											 used_bits, &partition_bits);
+											 used_bits, hash_mem_limit,
+											 &partition_bits);
 
 #ifdef USE_INJECTION_POINTS
 	if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-single-partition"))
@@ -3712,6 +3741,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 			totalGroups += aggstate->perhash[k].aggnode->numGroups;
 
 		hash_agg_set_limits(aggstate->hashentrysize, totalGroups, 0,
+							(Size) workMemLimit(aggstate) * 1024,
 							&aggstate->hash_mem_limit,
 							&aggstate->hash_ngroups_limit,
 							&aggstate->hash_planned_partitions);
diff --git a/src/backend/executor/nodeBitmapIndexscan.c b/src/backend/executor/nodeBitmapIndexscan.c
index abbb033881a..8bbf1d047c4 100644
--- a/src/backend/executor/nodeBitmapIndexscan.c
+++ b/src/backend/executor/nodeBitmapIndexscan.c
@@ -91,7 +91,7 @@ MultiExecBitmapIndexScan(BitmapIndexScanState *node)
 	else
 	{
 		/* XXX should we use less than work_mem for this? */
-		tbm = tbm_create(work_mem * (Size) 1024,
+		tbm = tbm_create(workMemLimit(node) * (Size) 1024,
 						 ((BitmapIndexScan *) node->ss.ps.plan)->isshared ?
 						 node->ss.ps.state->es_query_dsa : NULL);
 	}
diff --git a/src/backend/executor/nodeBitmapOr.c b/src/backend/executor/nodeBitmapOr.c
index 231760ec93d..16d0a164292 100644
--- a/src/backend/executor/nodeBitmapOr.c
+++ b/src/backend/executor/nodeBitmapOr.c
@@ -143,7 +143,7 @@ MultiExecBitmapOr(BitmapOrState *node)
 			if (result == NULL) /* first subplan */
 			{
 				/* XXX should we use less than work_mem for this? */
-				result = tbm_create(work_mem * (Size) 1024,
+				result = tbm_create(workMemLimit(subnode) * (Size) 1024,
 									((BitmapOr *) node->ps.plan)->isshared ?
 									node->ps.state->es_query_dsa : NULL);
 			}
diff --git a/src/backend/executor/nodeCtescan.c b/src/backend/executor/nodeCtescan.c
index e1675f66b43..08f48f88e65 100644
--- a/src/backend/executor/nodeCtescan.c
+++ b/src/backend/executor/nodeCtescan.c
@@ -232,7 +232,8 @@ ExecInitCteScan(CteScan *node, EState *estate, int eflags)
 		/* I am the leader */
 		prmdata->value = PointerGetDatum(scanstate);
 		scanstate->leader = scanstate;
-		scanstate->cte_table = tuplestore_begin_heap(true, false, work_mem);
+		scanstate->cte_table =
+			tuplestore_begin_heap(true, false, workMemLimit(scanstate));
 		tuplestore_set_eflags(scanstate->cte_table, scanstate->eflags);
 		scanstate->readptr = 0;
 	}
diff --git a/src/backend/executor/nodeFunctionscan.c b/src/backend/executor/nodeFunctionscan.c
index 644363582d9..fda42a278b8 100644
--- a/src/backend/executor/nodeFunctionscan.c
+++ b/src/backend/executor/nodeFunctionscan.c
@@ -95,6 +95,7 @@ FunctionNext(FunctionScanState *node)
 											node->ss.ps.ps_ExprContext,
 											node->argcontext,
 											node->funcstates[0].tupdesc,
+											workMemLimit(node),
 											node->eflags & EXEC_FLAG_BACKWARD);
 
 			/*
@@ -154,6 +155,7 @@ FunctionNext(FunctionScanState *node)
 											node->ss.ps.ps_ExprContext,
 											node->argcontext,
 											fs->tupdesc,
+											workMemLimit(node),
 											node->eflags & EXEC_FLAG_BACKWARD);
 
 			/*
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 8d2201ab67f..bb9af08dc5d 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -448,6 +448,7 @@ ExecHashTableCreate(HashState *state)
 	Hash	   *node;
 	HashJoinTable hashtable;
 	Plan	   *outerNode;
+	size_t		worker_space_allowed;
 	size_t		space_allowed;
 	int			nbuckets;
 	int			nbatch;
@@ -471,11 +472,15 @@ ExecHashTableCreate(HashState *state)
 	 */
 	rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows;
 
+	worker_space_allowed = (size_t) workMemLimit(state) * 1024;
+	Assert(worker_space_allowed > 0);
+
 	ExecChooseHashTableSize(rows, outerNode->plan_width,
 							OidIsValid(node->skewTable),
 							state->parallel_state != NULL,
 							state->parallel_state != NULL ?
 							state->parallel_state->nparticipants - 1 : 0,
+							worker_space_allowed,
 							&space_allowed,
 							&nbuckets, &nbatch, &num_skew_mcvs);
 
@@ -599,6 +604,7 @@ ExecHashTableCreate(HashState *state)
 		{
 			pstate->nbatch = nbatch;
 			pstate->space_allowed = space_allowed;
+			pstate->worker_space_allowed = worker_space_allowed;
 			pstate->growth = PHJ_GROWTH_OK;
 
 			/* Set up the shared state for coordinating batches. */
@@ -658,7 +664,8 @@ void
 ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 						bool try_combined_hash_mem,
 						int parallel_workers,
-						size_t *space_allowed,
+						size_t worker_space_allowed,
+						size_t *total_space_allowed,
 						int *numbuckets,
 						int *numbatches,
 						int *num_skew_mcvs)
@@ -687,9 +694,9 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 	inner_rel_bytes = ntuples * tupsize;
 
 	/*
-	 * Compute in-memory hashtable size limit from GUCs.
+	 * Caller tells us our (per-worker) in-memory hashtable size limit.
 	 */
-	hash_table_bytes = get_hash_memory_limit();
+	hash_table_bytes = worker_space_allowed;
 
 	/*
 	 * Parallel Hash tries to use the combined hash_mem of all workers to
@@ -706,7 +713,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 		hash_table_bytes = (size_t) newlimit;
 	}
 
-	*space_allowed = hash_table_bytes;
+	*total_space_allowed = hash_table_bytes;
 
 	/*
 	 * If skew optimization is possible, estimate the number of skew buckets
@@ -808,7 +815,8 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 		{
 			ExecChooseHashTableSize(ntuples, tupwidth, useskew,
 									false, parallel_workers,
-									space_allowed,
+									worker_space_allowed,
+									total_space_allowed,
 									numbuckets,
 									numbatches,
 									num_skew_mcvs);
@@ -929,7 +937,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 		nbatch /= 2;
 		nbuckets *= 2;
 
-		*space_allowed = (*space_allowed) * 2;
+		*total_space_allowed = (*total_space_allowed) * 2;
 	}
 
 	Assert(nbuckets > 0);
@@ -1235,7 +1243,7 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
 					 * to switch from one large combined memory budget to the
 					 * regular hash_mem budget.
 					 */
-					pstate->space_allowed = get_hash_memory_limit();
+					pstate->space_allowed = pstate->worker_space_allowed;
 
 					/*
 					 * The combined hash_mem of all participants wasn't
diff --git a/src/backend/executor/nodeIncrementalSort.c b/src/backend/executor/nodeIncrementalSort.c
index 975b0397e7a..7a92c1eb2c0 100644
--- a/src/backend/executor/nodeIncrementalSort.c
+++ b/src/backend/executor/nodeIncrementalSort.c
@@ -312,7 +312,7 @@ switchToPresortedPrefixMode(PlanState *pstate)
 												&(plannode->sort.sortOperators[nPresortedCols]),
 												&(plannode->sort.collations[nPresortedCols]),
 												&(plannode->sort.nullsFirst[nPresortedCols]),
-												work_mem,
+												workMemLimit(pstate),
 												NULL,
 												node->bounded ? TUPLESORT_ALLOWBOUNDED : TUPLESORT_NONE);
 		node->prefixsort_state = prefixsort_state;
@@ -613,7 +613,7 @@ ExecIncrementalSort(PlanState *pstate)
 												  plannode->sort.sortOperators,
 												  plannode->sort.collations,
 												  plannode->sort.nullsFirst,
-												  work_mem,
+												  workMemLimit(pstate),
 												  NULL,
 												  node->bounded ?
 												  TUPLESORT_ALLOWBOUNDED :
diff --git a/src/backend/executor/nodeMaterial.c b/src/backend/executor/nodeMaterial.c
index 9798bb75365..bf5e921a205 100644
--- a/src/backend/executor/nodeMaterial.c
+++ b/src/backend/executor/nodeMaterial.c
@@ -61,7 +61,8 @@ ExecMaterial(PlanState *pstate)
 	 */
 	if (tuplestorestate == NULL && node->eflags != 0)
 	{
-		tuplestorestate = tuplestore_begin_heap(true, false, work_mem);
+		tuplestorestate =
+			tuplestore_begin_heap(true, false, workMemLimit(node));
 		tuplestore_set_eflags(tuplestorestate, node->eflags);
 		if (node->eflags & EXEC_FLAG_MARK)
 		{
diff --git a/src/backend/executor/nodeMemoize.c b/src/backend/executor/nodeMemoize.c
index 609deb12afb..4e3da4aab6b 100644
--- a/src/backend/executor/nodeMemoize.c
+++ b/src/backend/executor/nodeMemoize.c
@@ -1036,7 +1036,7 @@ ExecInitMemoize(Memoize *node, EState *estate, int eflags)
 	mstate->mem_used = 0;
 
 	/* Limit the total memory consumed by the cache to this */
-	mstate->mem_limit = get_hash_memory_limit();
+	mstate->mem_limit = (Size) workMemLimit(mstate) * 1024;
 
 	/* A memory context dedicated for the cache */
 	mstate->tableContext = AllocSetContextCreate(CurrentMemoryContext,
diff --git a/src/backend/executor/nodeRecursiveunion.c b/src/backend/executor/nodeRecursiveunion.c
index 40f66fd0680..5ffffd327d2 100644
--- a/src/backend/executor/nodeRecursiveunion.c
+++ b/src/backend/executor/nodeRecursiveunion.c
@@ -33,6 +33,8 @@ build_hash_table(RecursiveUnionState *rustate)
 {
 	RecursiveUnion *node = (RecursiveUnion *) rustate->ps.plan;
 	TupleDesc	desc = ExecGetResultType(outerPlanState(rustate));
+	int			workmem_limit = workMemLimitFromId(rustate,
+												   node->hashWorkMemId);
 
 	Assert(node->numCols > 0);
 	Assert(node->numGroups > 0);
@@ -52,6 +54,7 @@ build_hash_table(RecursiveUnionState *rustate)
 											 node->dupCollations,
 											 node->numGroups,
 											 0,
+											 (Size) workmem_limit * 1024,
 											 rustate->ps.state->es_query_cxt,
 											 rustate->tableContext,
 											 rustate->tempContext,
@@ -202,8 +205,15 @@ ExecInitRecursiveUnion(RecursiveUnion *node, EState *estate, int eflags)
 	/* initialize processing state */
 	rustate->recursing = false;
 	rustate->intermediate_empty = true;
-	rustate->working_table = tuplestore_begin_heap(false, false, work_mem);
-	rustate->intermediate_table = tuplestore_begin_heap(false, false, work_mem);
+
+	/*
+	 * NOTE: each of our working tables gets the same workmem_limit, since
+	 * we're going to swap them repeatedly.
+	 */
+	rustate->working_table = tuplestore_begin_heap(false, false,
+												   workMemLimit(rustate));
+	rustate->intermediate_table = tuplestore_begin_heap(false, false,
+														workMemLimit(rustate));
 
 	/*
 	 * If hashing, we need a per-tuple memory context for comparisons, and a
diff --git a/src/backend/executor/nodeSetOp.c b/src/backend/executor/nodeSetOp.c
index 4068481a523..0e2d02aa243 100644
--- a/src/backend/executor/nodeSetOp.c
+++ b/src/backend/executor/nodeSetOp.c
@@ -105,6 +105,7 @@ build_hash_table(SetOpState *setopstate)
 												node->cmpCollations,
 												node->numGroups,
 												sizeof(SetOpStatePerGroupData),
+												(Size) workMemLimit(setopstate) * 1024,
 												setopstate->ps.state->es_query_cxt,
 												setopstate->tableContext,
 												econtext->ecxt_per_tuple_memory,
diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
index f603337ecd3..8ec939e25d7 100644
--- a/src/backend/executor/nodeSort.c
+++ b/src/backend/executor/nodeSort.c
@@ -107,7 +107,7 @@ ExecSort(PlanState *pstate)
 												   plannode->sortOperators[0],
 												   plannode->collations[0],
 												   plannode->nullsFirst[0],
-												   work_mem,
+												   workMemLimit(pstate),
 												   NULL,
 												   tuplesortopts);
 		else
@@ -117,7 +117,7 @@ ExecSort(PlanState *pstate)
 												  plannode->sortOperators,
 												  plannode->collations,
 												  plannode->nullsFirst,
-												  work_mem,
+												  workMemLimit(pstate),
 												  NULL,
 												  tuplesortopts);
 		if (node->bounded)
diff --git a/src/backend/executor/nodeSubplan.c b/src/backend/executor/nodeSubplan.c
index f7f6fc2da0b..56036e79933 100644
--- a/src/backend/executor/nodeSubplan.c
+++ b/src/backend/executor/nodeSubplan.c
@@ -536,6 +536,12 @@ buildSubPlanHash(SubPlanState *node, ExprContext *econtext)
 	if (node->hashtable)
 		ResetTupleHashTable(node->hashtable);
 	else
+	{
+		int			workmem_limit;
+
+		workmem_limit = workMemLimitFromId(planstate,
+										   subplan->hashtab_workmem_id);
+
 		node->hashtable = BuildTupleHashTable(node->parent,
 											  node->descRight,
 											  &TTSOpsVirtual,
@@ -546,10 +552,12 @@ buildSubPlanHash(SubPlanState *node, ExprContext *econtext)
 											  node->tab_collations,
 											  nbuckets,
 											  0,
+											  (Size) workmem_limit * 1024,
 											  node->planstate->state->es_query_cxt,
 											  node->hashtablecxt,
 											  node->hashtempcxt,
 											  false);
+	}
 
 	if (!subplan->unknownEqFalse)
 	{
@@ -565,6 +573,12 @@ buildSubPlanHash(SubPlanState *node, ExprContext *econtext)
 		if (node->hashnulls)
 			ResetTupleHashTable(node->hashnulls);
 		else
+		{
+			int			workmem_limit;
+
+			workmem_limit = workMemLimitFromId(planstate,
+											   subplan->hashnul_workmem_id);
+
 			node->hashnulls = BuildTupleHashTable(node->parent,
 												  node->descRight,
 												  &TTSOpsVirtual,
@@ -575,10 +589,12 @@ buildSubPlanHash(SubPlanState *node, ExprContext *econtext)
 												  node->tab_collations,
 												  nbuckets,
 												  0,
+												  (Size) workmem_limit * 1024,
 												  node->planstate->state->es_query_cxt,
 												  node->hashtablecxt,
 												  node->hashtempcxt,
 												  false);
+		}
 	}
 	else
 		node->hashnulls = NULL;
diff --git a/src/backend/executor/nodeTableFuncscan.c b/src/backend/executor/nodeTableFuncscan.c
index 83ade3f9437..f679bd67bee 100644
--- a/src/backend/executor/nodeTableFuncscan.c
+++ b/src/backend/executor/nodeTableFuncscan.c
@@ -276,7 +276,8 @@ tfuncFetchRows(TableFuncScanState *tstate, ExprContext *econtext)
 
 	/* build tuplestore for the result */
 	oldcxt = MemoryContextSwitchTo(econtext->ecxt_per_query_memory);
-	tstate->tupstore = tuplestore_begin_heap(false, false, work_mem);
+	tstate->tupstore = tuplestore_begin_heap(false, false,
+											 workMemLimit(tstate));
 
 	/*
 	 * Each call to fetch a new set of rows - of which there may be very many
diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index 9a1acce2b5d..7660aa626b6 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -1092,7 +1092,8 @@ prepare_tuplestore(WindowAggState *winstate)
 	Assert(winstate->buffer == NULL);
 
 	/* Create new tuplestore */
-	winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
+	winstate->buffer = tuplestore_begin_heap(false, false,
+											 workMemLimit(winstate));
 
 	/*
 	 * Set up read pointers for the tuplestore.  The current pointer doesn't
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 344a3188317..353f51fdff2 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -102,6 +102,7 @@
 #include "optimizer/paths.h"
 #include "optimizer/placeholder.h"
 #include "optimizer/plancat.h"
+#include "optimizer/planmain.h"
 #include "optimizer/restrictinfo.h"
 #include "parser/parsetree.h"
 #include "utils/lsyscache.h"
@@ -2833,7 +2834,8 @@ cost_agg(Path *path, PlannerInfo *root,
 		hashentrysize = hash_agg_entry_size(list_length(root->aggtransinfos),
 											input_width,
 											aggcosts->transitionSpace);
-		hash_agg_set_limits(hashentrysize, numGroups, 0, &mem_limit,
+		hash_agg_set_limits(hashentrysize, numGroups, 0,
+							get_hash_memory_limit(), &mem_limit,
 							&ngroups_limit, &num_partitions);
 
 		nbatches = Max((numGroups * hashentrysize) / mem_limit,
@@ -4256,6 +4258,7 @@ initial_cost_hashjoin(PlannerInfo *root, JoinCostWorkspace *workspace,
 							true,	/* useskew */
 							parallel_hash,	/* try_combined_hash_mem */
 							outer_path->parallel_workers,
+							get_hash_memory_limit(),
 							&space_allowed,
 							&numbuckets,
 							&numbatches,
@@ -4583,6 +4586,17 @@ cost_subplan(PlannerInfo *root, SubPlan *subplan, Plan *plan)
 		sp_cost.startup += plan->total_cost +
 			cpu_operator_cost * plan->plan_rows;
 
+		/*
+		 * Working memory needed for the hashtable (and hashnulls, if needed).
+		 */
+		subplan->hashtab_workmem_id = add_hash_workmem(root->glob);
+
+		if (!subplan->unknownEqFalse)
+		{
+			/* Also needs a hashnulls table.  */
+			subplan->hashnul_workmem_id = add_hash_workmem(root->glob);
+		}
+
 		/*
 		 * The per-tuple costs include the cost of evaluating the lefthand
 		 * expressions, plus the cost of probing the hashtable.  We already
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index bfefc7dbea1..22834fe37f4 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -1706,6 +1706,8 @@ create_material_plan(PlannerInfo *root, MaterialPath *best_path, int flags)
 
 	copy_generic_path_info(&plan->plan, (Path *) best_path);
 
+	plan->plan.workmem_id = add_workmem(root->glob);
+
 	return plan;
 }
 
@@ -1761,6 +1763,8 @@ create_memoize_plan(PlannerInfo *root, MemoizePath *best_path, int flags)
 
 	copy_generic_path_info(&plan->plan, (Path *) best_path);
 
+	plan->plan.workmem_id = add_hash_workmem(root->glob);
+
 	return plan;
 }
 
@@ -1907,6 +1911,8 @@ create_unique_plan(PlannerInfo *root, UniquePath *best_path, int flags)
 								 best_path->path.rows,
 								 0,
 								 subplan);
+
+		plan->workmem_id = add_hash_workmem(root->glob);
 	}
 	else
 	{
@@ -2253,6 +2259,8 @@ create_sort_plan(PlannerInfo *root, SortPath *best_path, int flags)
 
 	copy_generic_path_info(&plan->plan, (Path *) best_path);
 
+	plan->plan.workmem_id = add_workmem(root->glob);
+
 	return plan;
 }
 
@@ -2279,6 +2287,8 @@ create_incrementalsort_plan(PlannerInfo *root, IncrementalSortPath *best_path,
 
 	copy_generic_path_info(&plan->sort.plan, (Path *) best_path);
 
+	plan->sort.plan.workmem_id = add_workmem(root->glob);
+
 	return plan;
 }
 
@@ -2390,6 +2400,12 @@ create_agg_plan(PlannerInfo *root, AggPath *best_path)
 
 	copy_generic_path_info(&plan->plan, (Path *) best_path);
 
+	if (plan->aggstrategy == AGG_HASHED)
+		plan->plan.workmem_id = add_hash_workmem(root->glob);
+
+	/* Also include working memory needed to sort the input: */
+	plan->sortWorkMemId = add_workmem(root->glob);
+
 	return plan;
 }
 
@@ -2443,6 +2459,7 @@ static Plan *
 create_groupingsets_plan(PlannerInfo *root, GroupingSetsPath *best_path)
 {
 	Agg		   *plan;
+	Agg		   *first_sort_agg = NULL;
 	Plan	   *subplan;
 	List	   *rollups = best_path->rollups;
 	AttrNumber *grouping_map;
@@ -2508,7 +2525,7 @@ create_groupingsets_plan(PlannerInfo *root, GroupingSetsPath *best_path)
 			RollupData *rollup = lfirst(lc);
 			AttrNumber *new_grpColIdx;
 			Plan	   *sort_plan = NULL;
-			Plan	   *agg_plan;
+			Agg		   *agg_plan;
 			AggStrategy strat;
 
 			new_grpColIdx = remap_groupColIdx(root, rollup->groupClause);
@@ -2531,19 +2548,19 @@ create_groupingsets_plan(PlannerInfo *root, GroupingSetsPath *best_path)
 			else
 				strat = AGG_SORTED;
 
-			agg_plan = (Plan *) make_agg(NIL,
-										 NIL,
-										 strat,
-										 AGGSPLIT_SIMPLE,
-										 list_length((List *) linitial(rollup->gsets)),
-										 new_grpColIdx,
-										 extract_grouping_ops(rollup->groupClause),
-										 extract_grouping_collations(rollup->groupClause, subplan->targetlist),
-										 rollup->gsets,
-										 NIL,
-										 rollup->numGroups,
-										 best_path->transitionSpace,
-										 sort_plan);
+			agg_plan = make_agg(NIL,
+								NIL,
+								strat,
+								AGGSPLIT_SIMPLE,
+								list_length((List *) linitial(rollup->gsets)),
+								new_grpColIdx,
+								extract_grouping_ops(rollup->groupClause),
+								extract_grouping_collations(rollup->groupClause, subplan->targetlist),
+								rollup->gsets,
+								NIL,
+								rollup->numGroups,
+								best_path->transitionSpace,
+								sort_plan);
 
 			/*
 			 * Remove stuff we don't need to avoid bloating debug output.
@@ -2554,6 +2571,12 @@ create_groupingsets_plan(PlannerInfo *root, GroupingSetsPath *best_path)
 				sort_plan->lefttree = NULL;
 			}
 
+			if (agg_plan->aggstrategy == AGG_SORTED && !first_sort_agg)
+			{
+				/* This might be the first Sort agg. */
+				first_sort_agg = agg_plan;
+			}
+
 			chain = lappend(chain, agg_plan);
 		}
 	}
@@ -2586,6 +2609,29 @@ create_groupingsets_plan(PlannerInfo *root, GroupingSetsPath *best_path)
 
 		/* Copy cost data from Path to Plan */
 		copy_generic_path_info(&plan->plan, &best_path->path);
+
+		/*
+		 * NOTE: We will place the workmem needed to sort the input (if any)
+		 * on the first agg, the Hash workmem on the first Hash agg, and the
+		 * Sort workmem (if any) on the first Sort agg.
+		 */
+		if (plan->aggstrategy == AGG_HASHED || plan->aggstrategy == AGG_MIXED)
+		{
+			/* All Hash Grouping Sets share the same workmem limit. */
+			plan->plan.workmem_id = add_hash_workmem(root->glob);
+		}
+		else if (plan->aggstrategy == AGG_SORTED)
+		{
+			/* The top node, if sorted, takes precedence as the first Sort agg. */
+			first_sort_agg = plan;
+		}
+
+		/* Store the workmem limit, for all Sorts, on the first Sort. */
+		if (first_sort_agg)
+			first_sort_agg->plan.workmem_id = add_workmem(root->glob);
+
+		/* Also include working memory needed to sort the input: */
+		plan->sortWorkMemId = add_workmem(root->glob);
 	}
 
 	return (Plan *) plan;
@@ -2750,6 +2796,8 @@ create_windowagg_plan(PlannerInfo *root, WindowAggPath *best_path)
 
 	copy_generic_path_info(&plan->plan, (Path *) best_path);
 
+	plan->plan.workmem_id = add_workmem(root->glob);
+
 	return plan;
 }
 
@@ -2790,6 +2838,8 @@ create_setop_plan(PlannerInfo *root, SetOpPath *best_path, int flags)
 
 	copy_generic_path_info(&plan->plan, (Path *) best_path);
 
+	plan->plan.workmem_id = add_hash_workmem(root->glob);
+
 	return plan;
 }
 
@@ -2826,6 +2876,12 @@ create_recursiveunion_plan(PlannerInfo *root, RecursiveUnionPath *best_path)
 
 	copy_generic_path_info(&plan->plan, (Path *) best_path);
 
+	plan->plan.workmem_id = add_workmem(root->glob);
+
+	/* Also include working memory for hash table. */
+	if (plan->numCols > 0)
+		plan->hashWorkMemId = add_hash_workmem(root->glob);
+
 	return plan;
 }
 
@@ -3532,6 +3588,9 @@ create_bitmap_subplan(PlannerInfo *root, Path *bitmapqual,
 		plan->plan_width = 0;	/* meaningless */
 		plan->parallel_aware = false;
 		plan->parallel_safe = ipath->path.parallel_safe;
+
+		plan->workmem_id = add_workmem(root->glob);
+
 		/* Extract original index clauses, actual index quals, relevant ECs */
 		subquals = NIL;
 		subindexquals = NIL;
@@ -3839,6 +3898,8 @@ create_functionscan_plan(PlannerInfo *root, Path *best_path,
 
 	copy_generic_path_info(&scan_plan->scan.plan, best_path);
 
+	scan_plan->scan.plan.workmem_id = add_workmem(root->glob);
+
 	return scan_plan;
 }
 
@@ -3882,6 +3943,8 @@ create_tablefuncscan_plan(PlannerInfo *root, Path *best_path,
 
 	copy_generic_path_info(&scan_plan->scan.plan, best_path);
 
+	scan_plan->scan.plan.workmem_id = add_workmem(root->glob);
+
 	return scan_plan;
 }
 
@@ -4020,6 +4083,8 @@ create_ctescan_plan(PlannerInfo *root, Path *best_path,
 
 	copy_generic_path_info(&scan_plan->scan.plan, best_path);
 
+	scan_plan->scan.plan.workmem_id = add_workmem(root->glob);
+
 	return scan_plan;
 }
 
@@ -4722,6 +4787,8 @@ create_mergejoin_plan(PlannerInfo *root,
 		copy_plan_costsize(matplan, inner_plan);
 		matplan->total_cost += cpu_operator_cost * matplan->plan_rows;
 
+		matplan->workmem_id = add_workmem(root->glob);
+
 		inner_plan = matplan;
 	}
 
@@ -5067,6 +5134,9 @@ create_hashjoin_plan(PlannerInfo *root,
 
 	copy_generic_path_info(&join_plan->join.plan, &best_path->jpath.path);
 
+	/* Assign workmem to the Hash subnode, not its parent HashJoin node. */
+	hash_plan->plan.workmem_id = add_hash_workmem(root->glob);
+
 	return join_plan;
 }
 
@@ -5619,6 +5689,8 @@ label_sort_with_costsize(PlannerInfo *root, Sort *plan, double limit_tuples)
 	plan->plan.plan_width = lefttree->plan_width;
 	plan->plan.parallel_aware = false;
 	plan->plan.parallel_safe = lefttree->parallel_safe;
+
+	plan->plan.workmem_id = add_workmem(root->glob);
 }
 
 /*
@@ -5650,6 +5722,8 @@ label_incrementalsort_with_costsize(PlannerInfo *root, IncrementalSort *plan,
 	plan->sort.plan.plan_width = lefttree->plan_width;
 	plan->sort.plan.parallel_aware = false;
 	plan->sort.plan.parallel_safe = lefttree->parallel_safe;
+
+	plan->sort.plan.workmem_id = add_workmem(root->glob);
 }
 
 /*
@@ -6701,14 +6775,14 @@ make_material(Plan *lefttree)
 
 /*
  * materialize_finished_plan: stick a Material node atop a completed plan
  *
  * There are a couple of places where we want to attach a Material node
  * after completion of create_plan(), without any MaterialPath path.
  * Those places should probably be refactored someday to do this on the
  * Path representation, but it's not worth the trouble yet.
  */
 Plan *
-materialize_finished_plan(Plan *subplan)
+materialize_finished_plan(PlannerGlobal *glob, Plan *subplan)
 {
 	Plan	   *matplan;
 	Path		matpath;		/* dummy for result of cost_material */
@@ -6747,6 +6821,8 @@ materialize_finished_plan(Plan *subplan)
 	matplan->parallel_aware = false;
 	matplan->parallel_safe = subplan->parallel_safe;
 
+	matplan->workmem_id = add_workmem(glob);
+
 	return matplan;
 }
 
@@ -7512,3 +7588,41 @@ is_projection_capable_plan(Plan *plan)
 	}
 	return true;
 }
+
+static int
+add_workmem_internal(PlannerGlobal *glob, WorkMemCategory category)
+{
+	glob->workMemCategories = lappend_int(glob->workMemCategories, category);
+	/* the executor will fill this in later: */
+	glob->workMemLimits = lappend_int(glob->workMemLimits, 0);
+
+	Assert(list_length(glob->workMemCategories) ==
+		   list_length(glob->workMemLimits));
+
+	return list_length(glob->workMemCategories);
+}
+
+/*
+ * add_workmem
+ *	  Add (non-hash) workmem info to the glob's lists
+ *
+ * This data structure will have its working-memory limit set to work_mem.
+ */
+int
+add_workmem(PlannerGlobal *glob)
+{
+	return add_workmem_internal(glob, WORKMEM_NORMAL);
+}
+
+/*
+ * add_hash_workmem
+ *	  Add hash workmem info to the glob's lists
+ *
+ * This data structure will have its working-memory limit set to work_mem *
+ * hash_mem_multiplier.
+ */
+int
+add_hash_workmem(PlannerGlobal *glob)
+{
+	return add_workmem_internal(glob, WORKMEM_HASH);
+}
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index d59d6e4c6a0..a431808be96 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -447,7 +447,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
 	if (cursorOptions & CURSOR_OPT_SCROLL)
 	{
 		if (!ExecSupportsBackwardScan(top_plan))
-			top_plan = materialize_finished_plan(top_plan);
+			top_plan = materialize_finished_plan(glob, top_plan);
 	}
 
 	/*
@@ -584,6 +584,9 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
 	result->stmt_location = parse->stmt_location;
 	result->stmt_len = parse->stmt_len;
 
+	result->workMemCategories = glob->workMemCategories;
+	result->workMemLimits = glob->workMemLimits;
+
 	result->jitFlags = PGJIT_NONE;
 	if (jit_enabled && jit_above_cost >= 0 &&
 		top_plan->total_cost > jit_above_cost)
diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c
index d71ed958e31..8bc99aa8bc1 100644
--- a/src/backend/optimizer/plan/subselect.c
+++ b/src/backend/optimizer/plan/subselect.c
@@ -533,7 +533,7 @@ build_subplan(PlannerInfo *root, Plan *plan, Path *path,
 		 */
 		else if (splan->parParam == NIL && enable_material &&
 				 !ExecMaterializesOutput(nodeTag(plan)))
-			plan = materialize_finished_plan(plan);
+			plan = materialize_finished_plan(root->glob, plan);
 
 		result = (Node *) splan;
 		isInitPlan = false;
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index a71502efeed..6008e3bc63c 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -140,6 +140,7 @@ extern TupleHashTable BuildTupleHashTable(PlanState *parent,
 										  Oid *collations,
 										  long nbuckets,
 										  Size additionalsize,
+										  Size hash_mem_limit,
 										  MemoryContext metacxt,
 										  MemoryContext tablecxt,
 										  MemoryContext tempcxt,
@@ -559,6 +560,7 @@ extern Tuplestorestate *ExecMakeTableFunctionResult(SetExprState *setexpr,
 													ExprContext *econtext,
 													MemoryContext argContext,
 													TupleDesc expectedDesc,
+													int workMem,
 													bool randomAccess);
 extern SetExprState *ExecInitFunctionResultSet(Expr *expr,
 											   ExprContext *econtext, PlanState *parent);
@@ -796,4 +798,9 @@ extern ResultRelInfo *ExecLookupResultRelByOid(ModifyTableState *node,
 											   bool missing_ok,
 											   bool update_cache);
 
+/*
+ * prototypes from functions in execWorkmem.c
+ */
+extern void ExecAssignWorkMem(PlannedStmt *plannedstmt);
+
 #endif							/* EXECUTOR_H  */
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index ecff4842fd3..9b184c47322 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -253,7 +253,8 @@ typedef struct ParallelHashJoinState
 	ParallelHashGrowth growth;	/* control batch/bucket growth */
 	dsa_pointer chunk_work_queue;	/* chunk work queue */
 	int			nparticipants;
-	size_t		space_allowed;
+	size_t		space_allowed;	/* might be shared with other workers */
+	size_t		worker_space_allowed;	/* exclusive to this worker */
 	size_t		total_tuples;	/* total number of inner tuples */
 	LWLock		lock;			/* lock protecting the above */
 
diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h
index 6c4891bbaeb..fd8ed34178f 100644
--- a/src/include/executor/nodeAgg.h
+++ b/src/include/executor/nodeAgg.h
@@ -329,7 +329,8 @@ extern void ExecReScanAgg(AggState *node);
 extern Size hash_agg_entry_size(int numTrans, Size tupleWidth,
 								Size transitionSpace);
 extern void hash_agg_set_limits(double hashentrysize, double input_groups,
-								int used_bits, Size *mem_limit,
+								int used_bits,
+								Size hash_mem_limit, Size *mem_limit,
 								uint64 *ngroups_limit, int *num_partitions);
 
 /* parallel instrumentation support */
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 3c1a09415aa..e4e9e0d1de1 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -59,7 +59,8 @@ extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable);
 extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
 									bool try_combined_hash_mem,
 									int parallel_workers,
-									size_t *space_allowed,
+									size_t worker_space_allowed,
+									size_t *total_space_allowed,
 									int *numbuckets,
 									int *numbatches,
 									int *num_skew_mcvs);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index e107d6e5f81..d543011d92a 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1276,6 +1276,19 @@ typedef struct PlanState
 			((PlanState *)(node))->instrument->nfiltered2 += (delta); \
 	} while(0)
 
+/* macros for fetching the workmem info associated with a PlanState */
+#define workMemFieldFromId(node, field, id)								\
+	(list_nth_int(((PlanState *)(node))->state->es_plannedstmt->field, \
+				  (id) - 1))
+#define workMemField(node, field)   \
+	(workMemFieldFromId((node), field, ((PlanState *)(node))->plan->workmem_id))
+
+/* workmem limit: */
+#define workMemLimitFromId(node, id) \
+	(workMemFieldFromId(node, workMemLimits, id))
+#define workMemLimit(node) \
+	(workMemField(node, workMemLimits))
+
 /*
  * EPQState is state for executing an EvalPlanQual recheck on a candidate
  * tuples e.g. in ModifyTable or LockRows.
diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h
index ad2726f026f..181437ac933 100644
--- a/src/include/nodes/pathnodes.h
+++ b/src/include/nodes/pathnodes.h
@@ -182,6 +182,17 @@ typedef struct PlannerGlobal
 
 	/* hash table for NOT NULL attnums of relations */
 	struct HTAB *rel_notnullatts_hash pg_node_attr(read_write_ignore);
+
+	/*
+	 * Working-memory info, for Plan and SubPlans. Any Plan or SubPlan that
+	 * needs working memory for a data structure maintains a "workmem_id"
+	 * index into the following lists (all kept in sync).
+	 */
+
+	/* - IntList (of WorkMemCategory): is this a Hash or "normal" limit? */
+	List	   *workMemCategories;
+	/* - IntList: limit (in KB), after which data structure must spill */
+	List	   *workMemLimits;
 } PlannerGlobal;
 
 /* macro for fetching the Plan associated with a SubPlan node */
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 29d7732d6a0..ba8fdc2e6db 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -154,13 +154,23 @@ typedef struct PlannedStmt
 	ParseLoc	stmt_location;
 	/* length in bytes; 0 means "rest of string" */
 	ParseLoc	stmt_len;
+
+	/*
+	 * Working-memory info, for Plan and SubPlans. Any Plan or SubPlan that
+	 * needs working memory for a data structure maintains a "workmem_id"
+	 * index into the following lists (all kept in sync).
+	 */
+
+	/* - IntList (of WorkMemCategory): is this a Hash or "normal" limit? */
+	List	   *workMemCategories;
+	/* - IntList: limit (in KB), after which data structure must spill */
+	List	   *workMemLimits;
 } PlannedStmt;
 
 /* macro for fetching the Plan associated with a SubPlan node */
 #define exec_subplan_get_plan(plannedstmt, subplan) \
 	((Plan *) list_nth((plannedstmt)->subplans, (subplan)->plan_id - 1))
 
-
 /* ----------------
  *		Plan node
  *
@@ -216,6 +226,8 @@ typedef struct Plan
 	 */
 	/* unique across entire final plan tree */
 	int			plan_node_id;
+	/* 1-based id of workMem to use, or else zero */
+	int			workmem_id;
 	/* target list to be computed at this node */
 	List	   *targetlist;
 	/* implicitly-ANDed qual conditions */
@@ -447,6 +459,9 @@ typedef struct RecursiveUnion
 
 	/* estimated number of groups in input */
 	long		numGroups;
+
+	/* 1-based id of workMem to use for hash table, or else zero */
+	int			hashWorkMemId;
 } RecursiveUnion;
 
 /* ----------------
@@ -1176,6 +1191,9 @@ typedef struct Agg
 	Oid		   *grpOperators pg_node_attr(array_size(numCols));
 	Oid		   *grpCollations pg_node_attr(array_size(numCols));
 
+	/* 1-based id of workMem to use to sort inputs, or else zero */
+	int			sortWorkMemId;
+
 	/* estimated number of groups in input */
 	long		numGroups;
 
@@ -1792,4 +1810,11 @@ typedef enum MonotonicFunction
 	MONOTONICFUNC_BOTH = MONOTONICFUNC_INCREASING | MONOTONICFUNC_DECREASING,
 } MonotonicFunction;
 
+/* different data structures get different working-memory limits */
+typedef enum WorkMemCategory
+{
+	WORKMEM_NORMAL,				/* gets work_mem */
+	WORKMEM_HASH,				/* gets hash_mem_multiplier * work_mem */
+}			WorkMemCategory;
+
 #endif							/* PLANNODES_H */
diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h
index 6dfca3cb35b..c55b3cb356e 100644
--- a/src/include/nodes/primnodes.h
+++ b/src/include/nodes/primnodes.h
@@ -1111,6 +1111,9 @@ typedef struct SubPlan
 	/* Estimated execution costs: */
 	Cost		startup_cost;	/* one-time setup cost */
 	Cost		per_call_cost;	/* cost for each subplan evaluation */
+	/* 1-based id of workMem to use, or else zero: */
+	int			hashtab_workmem_id; /* for hash table */
+	int			hashnul_workmem_id; /* for NULLs hash table */
 } SubPlan;
 
 /*
diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h
index 9d3debcab28..8436136026b 100644
--- a/src/include/optimizer/planmain.h
+++ b/src/include/optimizer/planmain.h
@@ -46,9 +46,11 @@ extern ForeignScan *make_foreignscan(List *qptlist, List *qpqual,
 									 Plan *outer_plan);
 extern Plan *change_plan_targetlist(Plan *subplan, List *tlist,
 									bool tlist_parallel_safe);
-extern Plan *materialize_finished_plan(Plan *subplan);
+extern Plan *materialize_finished_plan(PlannerGlobal *glob, Plan *subplan);
 extern bool is_projection_capable_path(Path *path);
 extern bool is_projection_capable_plan(Plan *plan);
+extern int	add_workmem(PlannerGlobal *glob);
+extern int	add_hash_workmem(PlannerGlobal *glob);
 
 /* External use of these functions is deprecated: */
 extern Sort *make_sort_from_sortclauses(List *sortcls, Plan *lefttree);
-- 
2.39.5

