From a0dd541ad4e47735fc388d0c553e935f0956f254 Mon Sep 17 00:00:00 2001
From: Jehan-Guillaume de Rorthais <jgdr@dalibo.com>
Date: Mon, 27 Mar 2023 15:54:39 +0200
Subject: [PATCH] Allocate hash batches related BufFile in a dedicated context

---
 src/backend/executor/nodeHash.c     | 39 +++++++++++++++++++++++++++--
 src/backend/executor/nodeHashjoin.c | 14 ++++++++---
 src/include/executor/hashjoin.h     |  1 +
 3 files changed, 49 insertions(+), 5 deletions(-)

diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 748c9b0024..3f1ae9755e 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -484,7 +484,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 	 *
 	 * The hashtable control block is just palloc'd from the executor's
 	 * per-query memory context.  Everything else should be kept inside the
-	 * subsidiary hashCxt or batchCxt.
+	 * subsidiary hashCxt, batchCxt or fileCxt.
 	 */
 	hashtable = palloc_object(HashJoinTableData);
 	hashtable->nbuckets = nbuckets;
@@ -538,6 +538,10 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 												"HashBatchContext",
 												ALLOCSET_DEFAULT_SIZES);
 
+	hashtable->fileCxt = AllocSetContextCreate(CurrentMemoryContext,
+								 "HashBatchFiles",
+								 ALLOCSET_DEFAULT_SIZES);
+
 	/* Allocate data that will live for the life of the hashjoin */
 
 	oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
@@ -570,15 +574,21 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 
 	if (nbatch > 1 && hashtable->parallel_state == NULL)
 	{
+		MemoryContext oldctx;
+
 		/*
 		 * allocate and initialize the file arrays in hashCxt (not needed for
 		 * parallel case which uses shared tuplestores instead of raw files)
 		 */
+		oldctx = MemoryContextSwitchTo(hashtable->fileCxt);
+
 		hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch);
 		hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch);
 		/* The files will not be opened until needed... */
 		/* ... but make sure we have temp tablespaces established for them */
 		PrepareTempTablespaces();
+
+		MemoryContextSwitchTo(oldctx);
 	}
 
 	MemoryContextSwitchTo(oldcxt);
@@ -934,7 +944,7 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 		   hashtable, nbatch, hashtable->spaceUsed);
 #endif
 
-	oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
+	oldcxt = MemoryContextSwitchTo(hashtable->fileCxt);
 
 	if (hashtable->innerBatchFile == NULL)
 	{
@@ -1020,12 +1030,19 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 			}
 			else
 			{
+				MemoryContext oldctx;
+
 				/* dump it out */
 				Assert(batchno > curbatch);
+
+				oldctx = MemoryContextSwitchTo(hashtable->fileCxt);
+
 				ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
 									  hashTuple->hashvalue,
 									  &hashtable->innerBatchFile[batchno]);
 
+				MemoryContextSwitchTo(oldctx);
+
 				hashtable->spaceUsed -= hashTupleSize;
 				nfreed++;
 			}
@@ -1042,6 +1059,9 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 		oldchunks = nextchunk;
 	}
 
+	if (hashtable->fileCxt->mem_allocated > hashtable->spaceAllowed)
+		elog(WARNING, "Growing number of hash batch is exhausting memory");
+
 #ifdef HJDEBUG
 	printf("Hashjoin %p: freed %ld of %ld tuples, space now %zu\n",
 		   hashtable, nfreed, ninmemory, hashtable->spaceUsed);
@@ -1677,13 +1697,20 @@ ExecHashTableInsert(HashJoinTable hashtable,
 	}
 	else
 	{
+		MemoryContext oldctx;
+
 		/*
 		 * put the tuple into a temp file for later batches
 		 */
 		Assert(batchno > hashtable->curbatch);
+
+		oldctx = MemoryContextSwitchTo(hashtable->fileCxt);
+
 		ExecHashJoinSaveTuple(tuple,
 							  hashvalue,
 							  &hashtable->innerBatchFile[batchno]);
+
+		MemoryContextSwitchTo(oldctx);
 	}
 
 	if (shouldFree)
@@ -2532,10 +2559,18 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
 		}
 		else
 		{
+			MemoryContext oldctx;
+
 			/* Put the tuple into a temp file for later batches */
 			Assert(batchno > hashtable->curbatch);
+
+			oldctx = MemoryContextSwitchTo(hashtable->fileCxt);
+
 			ExecHashJoinSaveTuple(tuple, hashvalue,
 								  &hashtable->innerBatchFile[batchno]);
+
+			MemoryContextSwitchTo(oldctx);
+
 			pfree(hashTuple);
 			hashtable->spaceUsed -= tupleSize;
 			hashtable->spaceUsedSkew -= tupleSize;
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index f189fb4d28..ba1f27c2c4 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -422,6 +422,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				if (batchno != hashtable->curbatch &&
 					node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO)
 				{
+					MemoryContext oldctx;
 					bool		shouldFree;
 					MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot,
 																	  &shouldFree);
@@ -432,9 +433,14 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 					 */
 					Assert(parallel_state == NULL);
 					Assert(batchno > hashtable->curbatch);
+
+					oldctx = MemoryContextSwitchTo(hashtable->fileCxt);
+
 					ExecHashJoinSaveTuple(mintuple, hashvalue,
 										  &hashtable->outerBatchFile[batchno]);
 
+					MemoryContextSwitchTo(oldctx);
+
 					if (shouldFree)
 						heap_free_minimal_tuple(mintuple);
 
@@ -1234,9 +1240,9 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
  * The data recorded in the file for each tuple is its hash value,
  * then the tuple in MinimalTuple format.
  *
- * Note: it is important always to call this in the regular executor
- * context, not in a shorter-lived context; else the temp file buffers
- * will get messed up.
+ * Note: it is important always to call this in the HashBatchFiles context,
+ * not in a shorter-lived context; else the temp file buffers will get messed
+ * up.
  */
 void
 ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
@@ -1246,6 +1252,8 @@ ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
 
 	if (file == NULL)
 	{
+		Assert(strcmp(CurrentMemoryContext->name, "HashBatchFiles") == 0);
+
 		/* First write to this batch file, so open it. */
 		file = BufFileCreateTemp(false);
 		*fileptr = file;
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index acb7592ca0..d36beb7229 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -348,6 +348,7 @@ typedef struct HashJoinTableData
 
 	MemoryContext hashCxt;		/* context for whole-hash-join storage */
 	MemoryContext batchCxt;		/* context for this-batch-only storage */
+	MemoryContext fileCxt;		/* context for the BufFile related storage */
 
 	/* used for dense allocation of tuples (into linked chunks) */
 	HashMemoryChunk chunks;		/* one list for the whole batch */
-- 
2.39.2

