diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6f8a379e3b9..6cd0a3328bb 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -79,7 +79,8 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
 										  size_t size);
 static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
 static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
-
+static void ExecHashDumpBatchToFile(HashJoinTable hashtable);
+static void ExecHashHandleTooManyBatches(HashJoinTable hashtable, TupleTableSlot *slot);
 
 /* ----------------------------------------------------------------
  *		ExecHash
@@ -195,6 +196,13 @@ MultiExecPrivateHash(HashState *node)
 		}
 	}
 
+	/*
+	 * If we had to disable adding more batches while building the hash, now
+	 * is a good time to do the next phase - release all file buffers and split
+	 * each batch into new batches.
+	 */
+	ExecHashHandleTooManyBatches(hashtable, node->ps.ps_ResultTupleSlot);
+
 	/* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
 	if (hashtable->nbuckets != hashtable->nbuckets_optimal)
 		ExecHashIncreaseNumBuckets(hashtable);
@@ -502,10 +510,17 @@ ExecHashTableCreate(HashState *state)
 	hashtable->skewBucketLen = 0;
 	hashtable->nSkewBuckets = 0;
 	hashtable->skewBucketNums = NULL;
-	hashtable->nbatch = nbatch;
 	hashtable->curbatch = 0;
-	hashtable->nbatch_original = nbatch;
-	hashtable->nbatch_outstart = nbatch;
+
+	/* limit the number of batches we can populate at once */
+	hashtable->nbatch_maximum = HASHJOIN_BATCHES_PER_PHASE;
+	hashtable->nbatch = Min(nbatch, hashtable->nbatch_maximum);
+	hashtable->curbatch = 0;
+	hashtable->nbatch_original = Min(nbatch, hashtable->nbatch_maximum);
+	hashtable->nbatch_outstart = Min(nbatch, hashtable->nbatch_maximum);
+
+	/* start in "too many batches" mode if the estimate exceeds the limit */
+	hashtable->tooManyBatches = (nbatch > HASHJOIN_BATCHES_PER_PHASE);
 	hashtable->growEnabled = true;
 	hashtable->totalTuples = 0;
 	hashtable->partialTuples = 0;
@@ -874,7 +889,8 @@ ExecHashTableDestroy(HashJoinTable hashtable)
 	 */
 	if (hashtable->innerBatchFile != NULL)
 	{
-		for (i = 1; i < hashtable->nbatch; i++)
+		/* XXX We might have allocated a file for batch 0, so release it. */
+		for (i = 0; i < hashtable->nbatch; i++)
 		{
 			if (hashtable->innerBatchFile[i])
 				BufFileClose(hashtable->innerBatchFile[i]);
@@ -905,6 +921,12 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	long		nfreed;
 	HashMemoryChunk oldchunks;
 
+	/*
+	 * We should never get here after hitting too many batches, and switching
+	 * to spilling data for all batches.
+	 */
+	Assert(!hashtable->tooManyBatches);
+
 	/* do nothing if we've decided to shut off growth */
 	if (!hashtable->growEnabled)
 		return;
@@ -1054,6 +1076,166 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	}
 }
 
+/*
+ * ExecHashDumpBatchToFile
+ *		Dump the current hash table into a file for the current batch.
+ *
+ * Once we hit the maximum number of batches for this phase, we stop doubling
+ * the number of batches. But we also can't keep the current batch in memory,
+ * so instead we dump the data to file and will read it later after we
+ * increase the number of batches (at which point we won't need to keep the
+ * current batches).
+ *
+ * XXX This is virtually the same thing we do in ExecHashIncreaseNumBatches.
+ */
+static void
+ExecHashDumpBatchToFile(HashJoinTable hashtable)
+{
+	HashMemoryChunk oldchunks;
+	int			curbatch PG_USED_FOR_ASSERTS_ONLY = hashtable->curbatch;
+
+	memset(hashtable->buckets.unshared, 0,
+		   sizeof(HashJoinTuple) * hashtable->nbuckets);
+	oldchunks = hashtable->chunks;
+	hashtable->chunks = NULL;
+
+	/* so, let's scan through the old chunks, and all tuples in each chunk */
+	while (oldchunks != NULL)
+	{
+		HashMemoryChunk nextchunk = oldchunks->next.unshared;
+
+		/* position within the buffer (up to oldchunks->used) */
+		size_t		idx = 0;
+
+		/* process all tuples stored in this chunk (and then free it) */
+		while (idx < oldchunks->used)
+		{
+			HashJoinTuple hashTuple = (HashJoinTuple) (HASH_CHUNK_DATA(oldchunks) + idx);
+			MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+			int			hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
+			int			bucketno;
+			int			batchno;
+
+			ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
+									  &bucketno, &batchno);
+
+			/* everything belongs to current batch, we're not adding any */
+			Assert(batchno == curbatch);
+
+			/* dump it out */
+			ExecHashJoinSaveTuple(tuple,
+								  hashTuple->hashvalue,
+								  &hashtable->innerBatchFile[batchno],
+								  hashtable);
+
+			hashtable->spaceUsed -= hashTupleSize;
+
+			/* next tuple in this chunk */
+			idx += MAXALIGN(hashTupleSize);
+
+			/* allow this loop to be cancellable */
+			CHECK_FOR_INTERRUPTS();
+		}
+
+		/* we're done with this chunk - free it and proceed to the next one */
+		pfree(oldchunks);
+		oldchunks = nextchunk;
+	}
+}
+
+/*
+ * ExecHashHandleTooManyBatches
+ *		Handle the case when we hit the number of batches while building the
+ *		hash table.
+ *
+ * Each round doubles nbatch and re-inserts the tuples spilled for the current
+ * batch. The flag is re-checked after every round, as we may hit it again.
+ */
+static void
+ExecHashHandleTooManyBatches(HashJoinTable hashtable, TupleTableSlot *slot)
+{
+	/* didn't run into the issue, nothing to do */
+	if (!hashtable->tooManyBatches)
+		return;
+
+	while (hashtable->tooManyBatches)
+	{
+		int			nbatch;
+		int			oldnbatch;
+		BufFile    *file = hashtable->innerBatchFile[hashtable->curbatch];
+
+		/* XXX stupid way to increase number of batches */
+		oldnbatch = hashtable->nbatch;
+		nbatch = Max(hashtable->nbatch, hashtable->nbatch * 2);
+
+		hashtable->nbatch = nbatch;
+		hashtable->tooManyBatches = false;
+
+		/* enlarge arrays and zero out added entries */
+		hashtable->innerBatchFile = repalloc0_array(hashtable->innerBatchFile, BufFile *, oldnbatch, nbatch);
+		hashtable->outerBatchFile = repalloc0_array(hashtable->outerBatchFile, BufFile *, oldnbatch, nbatch);
+
+		/* set to NULL, so that we create a new / empty file for this batch */
+		hashtable->innerBatchFile[hashtable->curbatch] = NULL;
+
+		/* FIXME close/flush all current files (release the buffer) */
+		for (int i = 0; i < oldnbatch; i++)
+		{
+			if (hashtable->innerBatchFile[i] != NULL)
+				BufFileFreeBuffer(hashtable->innerBatchFile[i]);
+		}
+
+		/* nothing was spilled for the current batch, so nothing to re-read */
+		if (file == NULL)
+			continue;
+
+		BufFileSeek(file, 0, 0, SEEK_SET);
+
+		/* read the current tuples from the batch file, and re-insert them */
+		while (true)
+		{
+			uint32		hashvalue;
+			uint32		header[2];
+			size_t		nread;
+			MinimalTuple tuple;
+
+			/*
+			 * We check for interrupts here because this is typically taken as an
+			 * alternative code path to an ExecProcNode() call, which would include
+			 * such a check.
+			 */
+			CHECK_FOR_INTERRUPTS();
+
+			/*
+			 * Since both the hash value and the MinimalTuple length word are uint32,
+			 * we can read them both in one BufFileRead() call without any type
+			 * cheating.
+			 */
+			nread = BufFileReadMaybeEOF(file, header, sizeof(header), true);
+			if (nread == 0)				/* end of file */
+			{
+				ExecClearTuple(slot);
+				break;
+			}
+
+			hashvalue = header[0];
+			tuple = (MinimalTuple) palloc(header[1]);
+			tuple->t_len = header[1];
+
+			BufFileReadExact(file,
+							 (char *) tuple + sizeof(uint32),
+							 header[1] - sizeof(uint32));
+
+			ExecForceStoreMinimalTuple(tuple, slot, true);
+
+			ExecHashTableInsert(hashtable, slot, hashvalue);
+		}
+
+		/* close the old batch file */
+		BufFileClose(file);
+	}
+}
+
 /*
  * ExecParallelHashIncreaseNumBatches
  *		Every participant attached to grow_batches_barrier must run this
@@ -1625,8 +1807,11 @@ ExecHashTableInsert(HashJoinTable hashtable,
 
 	/*
 	 * decide whether to put the tuple in the hash table or a temp file
+	 *
+	 * XXX If we are already in "too many batches" situation, just write the
+	 * data directly to the temp file, even for current batch.
 	 */
-	if (batchno == hashtable->curbatch)
+	if (batchno == hashtable->curbatch && !hashtable->tooManyBatches)
 	{
 		/*
 		 * put the tuple in hash table
@@ -1678,14 +1863,29 @@ ExecHashTableInsert(HashJoinTable hashtable,
 		if (hashtable->spaceUsed +
 			hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
 			> hashtable->spaceAllowed)
-			ExecHashIncreaseNumBatches(hashtable);
+		{
+			/*
+			 * If we already hit the maximum for this phase, we can't increase
+			 * the number of batches further, so we stop and flush everything
+			 * into the temp file.
+			 */
+			if (hashtable->nbatch == hashtable->nbatch_maximum)
+			{
+				/* write everything from this batch to the temp file */
+				ExecHashDumpBatchToFile(hashtable);
+				hashtable->tooManyBatches = true;
+			}
+			else
+				ExecHashIncreaseNumBatches(hashtable);
+		}
 	}
 	else
 	{
 		/*
 		 * put the tuple into a temp file for later batches
 		 */
-		Assert(batchno > hashtable->curbatch);
+		Assert(batchno > hashtable->curbatch ||
+			   (hashtable->tooManyBatches && batchno == hashtable->curbatch));
 		ExecHashJoinSaveTuple(tuple,
 							  hashvalue,
 							  &hashtable->innerBatchFile[batchno],
@@ -2494,6 +2694,7 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
 	while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
 		ExecHashRemoveNextSkewBucket(hashtable);
 
+	/* XXX maybe should check nbatch_maximum? */
 	/* Check we are not over the total spaceAllowed, either */
 	if (hashtable->spaceUsed > hashtable->spaceAllowed)
 		ExecHashIncreaseNumBatches(hashtable);
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 5661ad76830..a7514ca5137 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -963,6 +963,107 @@ ExecEndHashJoin(HashJoinState *node)
 	ExecEndNode(innerPlanState(node));
 }
 
+/*
+ * ExecHashJoinRepartitionBatches
+ *		Build the hashes for the outer side, recursively to not need more than
+ *		HASHJOIN_BATCHES_PER_PHASE batches.
+ *
+ * Drains the outer plan (starting with hj_FirstOuterTupleSlot, if set) and
+ * saves every tuple with a non-NULL hash value to the outer batch file chosen
+ * by ExecHashGetBucketAndBatch(). hashtable->nbatch is temporarily replaced
+ * by HASHJOIN_BATCHES_PER_PHASE while doing that, and restored at the end.
+ * Also sets hj_OuterNotEmpty once such a tuple has been seen.
+ *
+ * XXX There's no recursion right now, we just do the first phase and leave
+ * the rest to the regular re-batching. This won't be enough if we need more
+ * than a single repartition cycle.
+ *
+ * XXX Does this need to worry about the skew buckets? Probably yes.
+ */
+static void
+ExecHashJoinRepartitionBatches(PlanState *outerNode,
+							   HashJoinState *hjstate)
+{
+	TupleTableSlot *slot;
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
+	int			nbatch = hashtable->nbatch;
+
+	/* fake the value so that we calculate the first phase batch */
+	hashtable->nbatch = HASHJOIN_BATCHES_PER_PHASE;
+
+	/*
+	 * Check to see if first outer tuple was already fetched by
+	 * ExecHashJoin() and not used yet.
+	 */
+	slot = hjstate->hj_FirstOuterTupleSlot;
+	if (!TupIsNull(slot))
+		hjstate->hj_FirstOuterTupleSlot = NULL;
+	else
+		slot = ExecProcNode(outerNode);
+
+	/* consume the whole outer input, spilling the tuples to batch files */
+	while (!TupIsNull(slot))
+	{
+		uint32		hashvalue;
+		bool		isnull;
+
+		/*
+		 * We have to compute the tuple's hash value.
+		 */
+		econtext->ecxt_outertuple = slot;
+
+		ResetExprContext(econtext);
+
+		hashvalue = DatumGetUInt32(ExecEvalExprSwitchContext(hjstate->hj_OuterHash,
+															  econtext,
+															  &isnull));
+
+		/*
+		 * A tuple with a NULL hash couldn't match anything, so it is simply
+		 * discarded; every other tuple is saved.
+		 */
+		if (!isnull)
+		{
+			int			bucketno;
+			int			batchno;
+			bool		shouldFree;
+			MinimalTuple mintuple;
+
+			/* remember outer relation is not empty for possible rescan */
+			hjstate->hj_OuterNotEmpty = true;
+
+			ExecHashGetBucketAndBatch(hashtable, hashvalue,
+									  &bucketno, &batchno);
+
+			/*
+			 * Fetch the tuple in minimal form; shouldFree says whether we
+			 * are responsible for freeing the copy.
+			 */
+			mintuple = ExecFetchSlotMinimalTuple(slot, &shouldFree);
+
+			/*
+			 * Save it in the corresponding outer-batch file. This includes
+			 * tuples of the current batch (batchno may equal curbatch), which
+			 * therefore have to be read back from the file later.
+			 */
+			Assert(batchno >= hashtable->curbatch);
+			ExecHashJoinSaveTuple(mintuple, hashvalue,
+								  &hashtable->outerBatchFile[batchno],
+								  hashtable);
+
+			if (shouldFree)
+				heap_free_minimal_tuple(mintuple);
+		}
+
+		/* on to the next outer tuple */
+		slot = ExecProcNode(outerNode);
+	}
+
+	/* restore the correct value */
+	hashtable->nbatch = nbatch;
+}
+
 /*
  * ExecHashJoinOuterGetTuple
  *
@@ -983,8 +1084,36 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
 	HashJoinTable hashtable = hjstate->hj_HashTable;
 	int			curbatch = hashtable->curbatch;
 	TupleTableSlot *slot;
+	bool		repartitioned = false;
 
-	if (curbatch == 0)			/* if it is the first pass */
+	/*
+	 * If this is the first batch, consider performing the repartitioning. This
+	 * will build HASHJOIN_BATCHES_PER_PHASE batches, which then need to be
+	 * rebatched using the regular logic.
+	 *
+	 * This includes the first batch, which means we then need to read the
+	 * tuples from the file, just like for every other batch.
+	 *
+	 * But do that only once, which we detect by (file == NULL).
+	 */
+	if ((curbatch == 0) &&
+		(hashtable->nbatch > HASHJOIN_BATCHES_PER_PHASE) &&
+		(hashtable->outerBatchFile[0] == NULL))
+	{
+		ExecHashJoinRepartitionBatches(outerNode, hjstate);
+
+		/* make sure to reset the first batch to start reading */
+		if (hashtable->outerBatchFile[0])
+			BufFileSeek(hashtable->outerBatchFile[0], 0, 0, SEEK_SET);
+	}
+
+	/*
+	 * We need to know if this is repartitioned even for later tuples in
+	 * the first batch. We detect that by having the batch file.
+	 */
+	repartitioned = (curbatch == 0) && (hashtable->outerBatchFile[0] != NULL);
+
+	if (curbatch == 0 && !repartitioned)			/* if it is the first pass */
 	{
 		/*
 		 * Check to see if first outer tuple was already fetched by
@@ -1120,6 +1249,23 @@ ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
 	return NULL;
 }
 
+/* Free buffers of batch files from curbatch on. XXX expensive array walk */
+static void
+ExecHashJoinFreeBuffers(HashJoinTable hashtable)
+{
+	if ((!hashtable->innerBatchFile) || (!hashtable->outerBatchFile))
+		return;					/* the loop below indexes both arrays */
+
+	for (int i = hashtable->curbatch; i < hashtable->nbatch; i++)
+	{
+		if (hashtable->innerBatchFile[i])
+			BufFileFreeBuffer(hashtable->innerBatchFile[i]);
+
+		if (hashtable->outerBatchFile[i])
+			BufFileFreeBuffer(hashtable->outerBatchFile[i]);
+	}
+}
+
 /*
  * ExecHashJoinNewBatch
  *		switch to a new hashjoin batch
@@ -1139,6 +1285,8 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 	nbatch = hashtable->nbatch;
 	curbatch = hashtable->curbatch;
 
+	ExecHashJoinFreeBuffers(hashtable);
+
 	if (curbatch > 0)
 	{
 		/*
@@ -1398,6 +1546,20 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
 	return false;
 }
 
+/* Count the entries of files[0..nfiles) that currently hold an I/O buffer. */
+static int
+ExecHashCountBuffers(BufFile **files, int nfiles)
+{
+	int			nbuffered = 0;
+
+	for (int fileno = 0; fileno < nfiles; fileno++)
+	{
+		if (files[fileno] != NULL && BufFileHasBuffer(files[fileno]))
+			nbuffered += 1;
+	}
+	return nbuffered;
+}
+
 /*
  * ExecHashJoinSaveTuple
  *		save a tuple to a batch file.
@@ -1438,6 +1600,13 @@ ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
 		*fileptr = file;
 
 		MemoryContextSwitchTo(oldctx);
+
+		if (false)
+		{
+			int	ninner = ExecHashCountBuffers(hashtable->innerBatchFile, hashtable->nbatch);
+			int	nouter = ExecHashCountBuffers(hashtable->outerBatchFile, hashtable->nbatch);
+			elog(WARNING, "curbatch %d  inner %d  outer %d", hashtable->curbatch, ninner, nouter);
+		}
 	}
 
 	BufFileWrite(file, &hashvalue, sizeof(uint32));
diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c
index 6449f82a72b..e155976cc23 100644
--- a/src/backend/storage/file/buffile.c
+++ b/src/backend/storage/file/buffile.c
@@ -100,7 +100,8 @@ struct BufFile
 	 * XXX Should ideally us PGIOAlignedBlock, but might need a way to avoid
 	 * wasting per-file alignment padding when some users create many files.
 	 */
-	PGAlignedBlock buffer;
+	MemoryContext	bufctx;
+	PGAlignedBlock *buffer;
 };
 
 static BufFile *makeBufFileCommon(int nfiles);
@@ -128,6 +129,9 @@ makeBufFileCommon(int nfiles)
 	file->pos = 0;
 	file->nbytes = 0;
 
+	file->bufctx = CurrentMemoryContext;
+	file->buffer = NULL;
+
 	return file;
 }
 
@@ -423,6 +427,33 @@ BufFileClose(BufFile *file)
 	pfree(file);
 }
 
+/* Allocate the file's I/O buffer in bufctx, unless it already has one. */
+static void
+BufFileAllocBuffer(BufFile *file)
+{
+	if (file->buffer == NULL)
+		file->buffer = MemoryContextAlloc(file->bufctx,
+										  sizeof(PGAlignedBlock));
+}
+
+void
+BufFileFreeBuffer(BufFile *file)
+{
+	if (file->buffer == NULL)
+		return;
+	BufFileFlush(file);			/* write out dirty data before dropping it */
+	file->curOffset += file->pos;	/* keep the logical file position ... */
+	file->pos = file->nbytes = 0;	/* ... but forget any buffered bytes */
+	pfree(file->buffer);
+	file->buffer = NULL;
+}
+
+bool
+BufFileHasBuffer(BufFile *file)
+{
+	return file->buffer != NULL;	/* true while an I/O buffer is allocated */
+}
+
 /*
  * BufFileLoadBuffer
  *
@@ -454,12 +485,14 @@ BufFileLoadBuffer(BufFile *file)
 	else
 		INSTR_TIME_SET_ZERO(io_start);
 
+	BufFileAllocBuffer(file);
+
 	/*
 	 * Read whatever we can get, up to a full bufferload.
 	 */
 	file->nbytes = FileRead(thisfile,
-							file->buffer.data,
-							sizeof(file->buffer),
+							file->buffer->data,
+							sizeof(file->buffer->data),
 							file->curOffset,
 							WAIT_EVENT_BUFFILE_READ);
 	if (file->nbytes < 0)
@@ -535,7 +568,7 @@ BufFileDumpBuffer(BufFile *file)
 			INSTR_TIME_SET_ZERO(io_start);
 
 		bytestowrite = FileWrite(thisfile,
-								 file->buffer.data + wpos,
+								 file->buffer->data + wpos,
 								 bytestowrite,
 								 file->curOffset,
 								 WAIT_EVENT_BUFFILE_WRITE);
@@ -616,7 +649,7 @@ BufFileReadCommon(BufFile *file, void *ptr, size_t size, bool exact, bool eofOK)
 			nthistime = size;
 		Assert(nthistime > 0);
 
-		memcpy(ptr, file->buffer.data + file->pos, nthistime);
+		memcpy(ptr, file->buffer->data + file->pos, nthistime);
 
 		file->pos += nthistime;
 		ptr = (char *) ptr + nthistime;
@@ -679,6 +712,8 @@ BufFileWrite(BufFile *file, const void *ptr, size_t size)
 
 	Assert(!file->readOnly);
 
+	BufFileAllocBuffer(file);
+
 	while (size > 0)
 	{
 		if (file->pos >= BLCKSZ)
@@ -700,7 +735,7 @@ BufFileWrite(BufFile *file, const void *ptr, size_t size)
 			nthistime = size;
 		Assert(nthistime > 0);
 
-		memcpy(file->buffer.data + file->pos, ptr, nthistime);
+		memcpy(file->buffer->data + file->pos, ptr, nthistime);
 
 		file->dirty = true;
 		file->pos += nthistime;
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 67ae89c8257..1ba7a6d1b88 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -322,9 +322,11 @@ typedef struct HashJoinTableData
 	int			nbatch;			/* number of batches */
 	int			curbatch;		/* current batch #; 0 during 1st pass */
 
+	int			nbatch_maximum;		/* maximum number of batches to create */
 	int			nbatch_original;	/* nbatch when we started inner scan */
 	int			nbatch_outstart;	/* nbatch when we started outer scan */
 
+	bool		tooManyBatches;	/* did we generate too many batches? */
 	bool		growEnabled;	/* flag to shut off nbatch increases */
 
 	double		totalTuples;	/* # tuples obtained from inner plan */
@@ -362,4 +364,7 @@ typedef struct HashJoinTableData
 	dsa_pointer current_chunk_shared;
 } HashJoinTableData;
 
+
+#define HASHJOIN_BATCHES_PER_PHASE	128
+
 #endif							/* HASHJOIN_H */
diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h
index 6b92668d90f..7dd2f394c0d 100644
--- a/src/include/storage/buffile.h
+++ b/src/include/storage/buffile.h
@@ -55,5 +55,7 @@ extern BufFile *BufFileOpenFileSet(FileSet *fileset, const char *name,
 extern void BufFileDeleteFileSet(FileSet *fileset, const char *name,
 								 bool missing_ok);
 extern void BufFileTruncateFileSet(BufFile *file, int fileno, off_t offset);
+extern void BufFileFreeBuffer(BufFile *file);
+extern bool BufFileHasBuffer(BufFile *file);
 
 #endif							/* BUFFILE_H */
