From 24ffd790ec0e1bd05f426aaf7a27a5380c132432 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas@vondra.me>
Date: Mon, 1 Jun 2026 14:22:36 +0200
Subject: [PATCH v2 2/2] Using Bloom filters for parallel hash joins

Extends the usage of Bloom filters to parallel hash joins too. Overall
it works very similarly (when the filter is built/used, and the adaptive
behavior managing that) to the serial case.

The main difference is that with parallel hash join, the filter is
placed in allocated in DSA, so that it can be shared by all workers
participating in the join.

While building the filter, workers are building a filter in their
private memory. Once the hash table build is complete, the private
filters are merged into the shared filter. After that, the workers
probe the shared filter.
---
 src/backend/commands/explain.c                |  61 +++-
 src/backend/executor/nodeHash.c               | 297 +++++++++++++++++-
 src/backend/executor/nodeHashjoin.c           |  12 +-
 src/backend/lib/bloomfilter.c                 | 127 ++++++++
 src/include/executor/hashjoin.h               |  16 +
 src/include/lib/bloomfilter.h                 |   9 +
 src/test/regress/expected/join_hash_bloom.out | 145 +++++++++
 src/test/regress/sql/join_hash_bloom.sql      |  46 +++
 8 files changed, 695 insertions(+), 18 deletions(-)

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 1b3a3579df9..7c24be797e2 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -3433,6 +3433,61 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 											  worker_hi->nbatch_original);
 			hinstrument.space_peak = Max(hinstrument.space_peak,
 										 worker_hi->space_peak);
+
+			/*
+			 * In a parallel-aware hash join each worker probes its own outer
+			 * tuples, so the probe and match counts are summed.
+			 */
+			hinstrument.bloom_nprobes += worker_hi->bloom_nprobes;
+			hinstrument.bloom_nmatches += worker_hi->bloom_nmatches;
+			hinstrument.hash_nlookups += worker_hi->hash_nlookups;
+			hinstrument.hash_nmatches += worker_hi->hash_nmatches;
+
+			ExplainOpenWorker(i, es);
+
+			if (es->verbose)
+			{
+				if (worker_hi->hash_nlookups > 0)
+				{
+					if (es->format == EXPLAIN_FORMAT_TEXT)
+					{
+						ExplainIndentText(es);
+						appendStringInfo(es->str,
+										 "Hash Lookups: " INT64_FORMAT "  Matches: " INT64_FORMAT "  Match Rate: %.3f%%\n",
+										 worker_hi->hash_nlookups,
+										 worker_hi->hash_nmatches,
+										 (100.0 * worker_hi->hash_nmatches / worker_hi->hash_nlookups));
+					}
+				}
+
+				if (worker_hi->bloom_nprobes > 0)
+				{
+					if (es->format == EXPLAIN_FORMAT_TEXT)
+					{
+						ExplainIndentText(es);
+						appendStringInfo(es->str,
+										 "Bloom Filter Probes: " INT64_FORMAT "  Matches: " INT64_FORMAT "  Match Rate: %.3f%%\n",
+										 worker_hi->bloom_nprobes,
+										 worker_hi->bloom_nmatches,
+										 (100.0 * worker_hi->bloom_nmatches / worker_hi->bloom_nprobes));
+					}
+				}
+			}
+
+			ExplainCloseWorker(i, es);
+
+
+			/*
+			 * The Bloom filter dimensions and false positive rate describe the
+			 * (shared) filter itself rather than per-worker counters, so they
+			 * are identical across participants; just keep any non-zero value.
+			 */
+			hinstrument.bloom_nbytes = Max(hinstrument.bloom_nbytes,
+										   worker_hi->bloom_nbytes);
+			hinstrument.bloom_nhashfuncs = Max(hinstrument.bloom_nhashfuncs,
+											   worker_hi->bloom_nhashfuncs);
+			hinstrument.bloom_false_positive_rate = Max(hinstrument.bloom_false_positive_rate,
+														worker_hi->bloom_false_positive_rate);
 		}
 	}
 
@@ -3497,7 +3552,7 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 			ExplainPropertyFloat("Hash Match Rate", NULL,
 								 (100.0 * match_rate), 3, es);
 		}
-		else
+		else if (hinstrument.hash_nlookups > 0)
 		{
 			ExplainIndentText(es);
 			appendStringInfo(es->str,
@@ -3539,7 +3594,7 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 			ExplainPropertyFloat("False Positive Rate", NULL,
 								 100.0 * hinstrument.bloom_false_positive_rate, 3, es);
 
-			if (es->analyze)
+			if (es->analyze && es->verbose)
 			{
 				ExplainPropertyInteger("Probes", NULL,
 									   hinstrument.bloom_nprobes, es);
@@ -3560,7 +3615,7 @@ show_hash_info(HashState *hashstate, ExplainState *es)
 							 hinstrument.bloom_nhashfuncs,
 							 100.0 * hinstrument.bloom_false_positive_rate);
 
-			if (es->analyze)
+			if (es->analyze && es->verbose)
 			{
 				ExplainIndentText(es);
 				appendStringInfo(es->str,
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 442beee7b70..1cb141703a2 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -83,6 +83,9 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
 static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
 static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
 
+static void ExecParallelHashInitBloomFilter(HashJoinTable hashtable, int64 nelems);
+static void ExecParallelHashBuildBloomFilter(HashJoinTable hashtable);
+
 /*
  * Bloom filters
  *
@@ -391,6 +394,15 @@ MultiExecParallelHash(HashState *node)
 				ExecParallelHashIncreaseNumBuckets(hashtable);
 			ExecParallelHashEnsureBatchAccessors(hashtable);
 			ExecParallelHashTableSetCurrentBatch(hashtable, 0);
+
+			/*
+			 * When in a multi-batch case, we want to build a shared filter from
+			 * the very beginning. So create per-worker filters with matching
+			 * parameters, and we'll merge them at the end of the build.
+			 */
+			if (pstate->bloom_nelems > 0)
+				ExecParallelHashInitBloomFilter(hashtable, pstate->bloom_nelems);
+
 			for (;;)
 			{
 				bool		isnull;
@@ -412,6 +424,11 @@ MultiExecParallelHash(HashState *node)
 					/* normal case with a non-null join key */
 					ExecParallelHashTableInsert(hashtable, slot, hashvalue);
 					hashtable->reportTuples++;
+
+					/* add the hash to the private Bloom filter */
+					if (hashtable->bloomFilter != NULL)
+						bloom_add_element(hashtable->bloomFilter,
+										  (unsigned char *) &hashvalue, sizeof(uint32));
 				}
 				else if (node->keep_null_tuples)
 				{
@@ -437,6 +454,35 @@ MultiExecParallelHash(HashState *node)
 			 */
 			ExecParallelHashMergeCounters(hashtable);
 
+			/*
+			 * If we built a private Bloom filter, merge it into the shared
+			 * filter now, while still holding the build barrier, so that the
+			 * shared filter is complete by the time anyone starts probing.
+			 *
+			 * XXX This covers both the case when we know about batching from
+			 * the beginning of the build (with the shared filter allocated in
+			 * ExecHashTableCreate), and when we start batching only during
+			 * the build (in ExecParallelHashIncreaseNumBatches).
+			 */
+			if (hashtable->bloomFilter != NULL &&
+				DsaPointerIsValid(pstate->bloom_filter))
+			{
+				bloom_filter *shared;
+
+				LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
+				shared = (bloom_filter *)
+					dsa_get_address(hashtable->area, pstate->bloom_filter);
+				bloom_merge(shared, hashtable->bloomFilter);
+				LWLockRelease(&pstate->lock);
+
+				/*
+				 * Free the private filter, we won't need it anymore, and we
+				 * will set the pointer to the shared one in a bit.
+				 */
+				bloom_free(hashtable->bloomFilter);
+				hashtable->bloomFilter = NULL;
+			}
+
 			BarrierDetach(&pstate->grow_buckets_barrier);
 			BarrierDetach(&pstate->grow_batches_barrier);
 
@@ -469,6 +515,19 @@ MultiExecParallelHash(HashState *node)
 	hashtable->log2_nbuckets = pg_ceil_log2_32(hashtable->nbuckets);
 	hashtable->totalTuples = pstate->total_tuples;
 
+	/*
+	 * In the multi-batch case, set the pointer at the filter in shared memory,
+	 * so that the workers can probe it directly. In the single-batch case the
+	 * filter may get built adaptively later, during probing (and we'll have to
+	 * do this then).
+	 */
+	if (pstate->bloom_state == PHJ_BLOOM_BUILT &&
+		DsaPointerIsValid(pstate->bloom_filter))
+	{
+		hashtable->bloomFilter = (bloom_filter *)
+			dsa_get_address(hashtable->area, pstate->bloom_filter);
+	}
+
 	/*
 	 * Unless we're completely done and the batch state has been freed, make
 	 * sure we have accessors.
@@ -742,6 +801,38 @@ ExecHashTableCreate(HashState *state)
 			 */
 			pstate->nbuckets = nbuckets;
 			ExecParallelHashTableAlloc(hashtable, 0);
+
+			/*
+			 * If we already know we'll need more than one batch, set up a
+			 * shared Bloom filter right away so that it includes all inner
+			 * inner tuples. Each worker builds a private filter while hashing
+			 * and merges it into this one at the end of the build (see
+			 * MultiExecParallelHash).
+			 *
+			 * We size it from the planner's estimate so that all the filters
+			 * have matching dimensions (which is required for merging).
+			 *
+			 * XXX Minimum set to 1000 tuples. Maybe we should use a multiple
+			 * of rows, to handle misestimates better? But Bloom filters degrade
+			 * smoothly, so that's probably fine. We don't want the filters to
+			 * get too large - once it exceeds CPU caches, it gets much slower.
+			 * In the worst case, we'll adaptively disable the filter once the
+			 * false positive rate gets too high (too many probes matching).
+			 */
+			if (nbatch > 1 && enable_hashjoin_bloom)
+			{
+				pstate->bloom_nelems = Max((int64) rows, 1000);
+				pstate->bloom_filter =
+					dsa_allocate(hashtable->area,
+								 bloom_estimate_custom(pstate->bloom_nelems,
+													   work_mem,
+													   BLOOM_MIN_FILTER_SIZE));
+				bloom_init_custom(dsa_get_address(hashtable->area,
+												  pstate->bloom_filter),
+								  pstate->bloom_nelems, work_mem,
+								  BLOOM_MIN_FILTER_SIZE, 0);
+				pstate->bloom_state = PHJ_BLOOM_BUILT;
+			}
 		}
 
 		/*
@@ -910,8 +1001,13 @@ ExecHashBloomReject(HashJoinTable hashtable, uint32 hashvalue)
 	/*
 	 * Ignore the filter after processing the first batch (all tuples spilled
 	 * to temporary files already went through the check).
+	 *
+	 * XXX With parallel joins we need to allow (curbatch > 0), because it works
+	 * differently with batches (compared to serial builds). For more details ses
+	 * ExecParallelHashJoinOuterGetTuple. Without this we'd fail to probe the
+	 * filter from some workers (for many outer tuples).
 	 */
-	if (hashtable->curbatch != 0)
+	if (hashtable->curbatch != 0 && !hashtable->parallel_state)
 		return false;
 
 	/* If there's no filter, all tuples should pass. */
@@ -993,6 +1089,8 @@ ExecHashBloomSamplingUpdate(HashJoinTable hashtable, bool match)
 void
 ExecHashBloomAccountLookup(HashJoinTable hashtable)
 {
+	ParallelHashJoinState *pstate = hashtable->parallel_state;
+
 	hashtable->hashMatches++;
 
 	/* Bail out if Bloom filters are disabled. */
@@ -1003,12 +1101,16 @@ ExecHashBloomAccountLookup(HashJoinTable hashtable)
 	if (hashtable->bloomFilter != NULL)
 		return;
 
-	/* We can't build filters for parallel hash joins. */
-	if (hashtable->parallel_state != NULL)
+	/* All batched runs should have a filter created automatically. */
+	Assert(hashtable->nbatch == 1);
+
+	/* haven't collected enough probe samples yet */
+	if ((hashtable->hashLookups % BLOOM_BUILD_WINDOW) != 0)
 		return;
 
-	/* All serial batched runs should have a filter created automatically. */
-	Assert(hashtable->nbatch == 1);
+	/* have enough samples, but there are too many matches */
+	if (hashtable->hashMatches > hashtable->hashLookups * BLOOM_BUILD_THRESHOLD)
+		return;
 
 	/*
 	 * Build a filter if the hash table lookups found sufficiently few matches
@@ -1018,11 +1120,40 @@ ExecHashBloomAccountLookup(HashJoinTable hashtable)
 	 * would mean we look at individual windows, while now we look at the whole
 	 * history of lookups. Not sure if one of these is a "more right".
 	 */
-	if (((hashtable->hashLookups % BLOOM_BUILD_WINDOW) == 0) &&
-		(hashtable->hashMatches < hashtable->hashLookups * BLOOM_BUILD_THRESHOLD))
+	if (!pstate)
 	{
+		/* serial join */
 		ExecHashBuildBloomFilter(hashtable);
 	}
+	else
+	{
+		/*
+		 * Parallel join: coordinate so that only one backend builds the shared
+		 * filter (by scanning the current hash table). The first backend to finish
+		 * its sample makes the decision for everyone; later backends simply observe
+		 * the result.
+		 *
+		 * The workers may hit the adaptive build threshold at different times (or
+		 * maybe some workers may not hit it at all), in which case the worker will
+		 * not know about the filter and won't probe it. But that's fine - it's up
+		 * to the worker to decide whether to probe or not (based on it's local
+		 * stats). We're not adding items to the hash table, so this can't cause
+		 * missing data or anything like that.
+		 */
+		LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
+
+		if (pstate->bloom_state == PHJ_BLOOM_NONE)
+		{
+			ExecParallelHashBuildBloomFilter(hashtable);
+		}
+		else if (pstate->bloom_state == PHJ_BLOOM_BUILT)
+		{
+			hashtable->bloomFilter = (bloom_filter *)
+					dsa_get_address(hashtable->area, pstate->bloom_filter);
+		}
+
+		LWLockRelease(&pstate->lock);
+	}
 }
 
 /*
@@ -1361,6 +1492,73 @@ ExecHashTableDestroy(HashJoinTable hashtable)
 	pfree(hashtable);
 }
 
+/*
+ * ExecParallelHashInitBloomFilter
+ *		Allocate an empty Bloom filter for this hash table.
+ *
+ * The filter is allocated in the long-lived hashCxt so that it survives
+ * per-batch resets.  "nelems" is an estimate of the number of inner tuples,
+ * used to size the filter; it should be computed identically by every
+ * participant of a Parallel Hash join so that local filters can be merged.
+ */
+static void
+ExecParallelHashInitBloomFilter(HashJoinTable hashtable, int64 nelems)
+{
+	MemoryContext oldcxt;
+
+	Assert(hashtable->bloomFilter == NULL);
+
+	oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
+	hashtable->bloomFilter = bloom_create_custom(nelems, work_mem,
+												 BLOOM_MIN_FILTER_SIZE, 0);
+	MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * ExecParallelHashBuildBloomFilter
+ *		Build the shared Bloom filter for a parallel single-batch hash table.
+ *
+ * Called with pstate->lock held by a single backend that has decided the join
+ * is selective enough to benefit.  The completed shared hash table is scanned
+ * to populate the filter, which is then published for all backends to probe.
+ */
+static void
+ExecParallelHashBuildBloomFilter(HashJoinTable hashtable)
+{
+	ParallelHashJoinState *pstate = hashtable->parallel_state;
+	bloom_filter *shared;
+	int			i;
+
+	Assert(hashtable->nbatch == 1);
+	Assert(pstate->bloom_state == PHJ_BLOOM_NONE);
+	Assert(!DsaPointerIsValid(pstate->bloom_filter));
+
+	pstate->bloom_nelems = Max((int64) pstate->total_tuples, 1000);
+	pstate->bloom_filter =
+		dsa_allocate(hashtable->area,
+					 bloom_estimate_custom(pstate->bloom_nelems, work_mem,
+										   BLOOM_MIN_FILTER_SIZE));
+	shared = bloom_init_custom(dsa_get_address(hashtable->area,
+											   pstate->bloom_filter),
+							   pstate->bloom_nelems, work_mem,
+							   BLOOM_MIN_FILTER_SIZE, 0);
+
+	for (i = 0; i < hashtable->nbuckets; i++)
+	{
+		HashJoinTuple tuple = ExecParallelHashFirstTuple(hashtable, i);
+
+		while (tuple != NULL)
+		{
+			bloom_add_element(shared,
+							  (unsigned char *) &tuple->hashvalue,
+							  sizeof(uint32));
+			tuple = ExecParallelHashNextTuple(hashtable, tuple);
+		}
+	}
+
+	pstate->bloom_state = PHJ_BLOOM_BUILT;
+}
+
 /*
  * Consider adjusting the allowed hash table size, depending on the number
  * of batches, to minimize the overall memory usage (for both the hashtable
@@ -1699,6 +1897,49 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
 					for (i = 0; i < new_nbuckets; ++i)
 						dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
 					pstate->nbuckets = new_nbuckets;
+
+					/*
+					 * Create the new shared filter (we'll create the new private
+					 * per-worker filters in ExecParallelHashRepartitionFirst).
+					 *
+					 * sizing: We assume we've only seen 1/2 the tuples so far.
+					 * We clearly expected fewer tuples (otherwise we'd choose
+					 * batching right away), but maybe we should use a bigger
+					 * factor? We can easily be off by an order of magniture.
+					 * OTOH we don't want to overdo it, oversized filters get
+					 * somewhat useless, especially once larger than CPU cache.
+					 *
+					 * Also, we have a per-worker count. Let's assume workers
+					 * saw the same number.
+					 *
+					 * XXX Is there a bettew way to estimate the number of tuples
+					 * we'll see for the inner relation?
+					 *
+					 * XXX This might be a good fit for scalable Bloom filters
+					 * (or some other type of filter?)
+					 */
+					if (enable_hashjoin_bloom)
+					{
+						/*
+						 * Double the number of tuples we saw so far (in the
+						 * only batch we have). Calculate total for all workers
+						 * participating in the join.
+						 */
+						pstate->bloom_nelems = (old_batch0->ntuples * 2) *
+									hashtable->parallel_state->nparticipants;
+
+						pstate->bloom_filter =
+							dsa_allocate(hashtable->area,
+										 bloom_estimate_custom(pstate->bloom_nelems,
+															   work_mem,
+															   BLOOM_MIN_FILTER_SIZE));
+						bloom_init_custom(dsa_get_address(hashtable->area,
+														  pstate->bloom_filter),
+										  pstate->bloom_nelems, work_mem,
+										  BLOOM_MIN_FILTER_SIZE, 0);
+						pstate->bloom_state = PHJ_BLOOM_BUILT;
+					}
+
 				}
 				else
 				{
@@ -1822,8 +2063,25 @@ ExecParallelHashRepartitionFirst(HashJoinTable hashtable)
 	dsa_pointer chunk_shared;
 	HashMemoryChunk chunk;
 
+	/*
+	 * If starting to batch (from nbatch=1), we need to create a local filter
+	 * and populate it with entries in each worker.
+	 */
+	bool build_filter = enable_hashjoin_bloom &&
+			(hashtable->parallel_state->old_nbatch == 1);
+
 	Assert(hashtable->nbatch == hashtable->parallel_state->nbatch);
 
+	/*
+	 * Build filter with the same parameters as the shared filter created in
+	 * ExecParallelHashIncreaseNumBatches (so that we can merge them later).
+	 */
+	if (build_filter)
+	{
+		ExecParallelHashInitBloomFilter(hashtable,
+										hashtable->parallel_state->bloom_nelems);
+	}
+
 	while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_shared)))
 	{
 		size_t		idx = 0;
@@ -1841,6 +2099,12 @@ ExecParallelHashRepartitionFirst(HashJoinTable hashtable)
 			ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
 									  &bucketno, &batchno);
 
+			/* insert everything into the filter */
+			if (build_filter)
+				bloom_add_element(hashtable->bloomFilter,
+								  (unsigned char *) &hashTuple->hashvalue,
+								  sizeof(uint32));
+
 			Assert(batchno < hashtable->nbatch);
 			if (batchno == 0)
 			{
@@ -3324,17 +3588,22 @@ ExecHashAccumInstrumentation(HashInstrumentation *instrument,
 		instrument->bloom_nbytes = bloom_total_bits(hashtable->bloomFilter) / BITS_PER_BYTE;
 		instrument->bloom_false_positive_rate =
 			bloom_false_positive_rate(hashtable->bloomFilter);
-		instrument->bloom_nprobes = hashtable->bloomProbes;
-		instrument->bloom_nmatches = hashtable->bloomMatches;
 	}
 
 	/*
-	 * Record hash-table probe statistics.
-	 *
-	 * XXX Shouldn't this use Max(), just like the earlier block?
+	 * Bloom filter probe and match counts are cumulative, so sum them across
+	 * successive hash table instances (e.g. rescans) rather than taking the
+	 * maximum.
+	 */
+	instrument->bloom_nprobes += hashtable->bloomProbes;
+	instrument->bloom_nmatches += hashtable->bloomMatches;
+
+	/*
+	 * Hash table lookup and match counts are cumulative as well, so sum them
+	 * across successive hash table instances (e.g. rescans).
 	 */
-	instrument->hash_nlookups = hashtable->hashLookups;
-	instrument->hash_nmatches = hashtable->hashMatches;
+	instrument->hash_nlookups += hashtable->hashLookups;
+	instrument->hash_nmatches += hashtable->hashMatches;
 }
 
 /*
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index db14cf98f9b..5f87e948dce 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -509,7 +509,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
 				 * HJ_FILL_OUTER_TUPLE emits a null-extended row for outer joins
 				 * and simply discards the tuple otherwise.
 				 */
-				if (!parallel && ExecHashBloomReject(hashtable, hashvalue))
+				if (ExecHashBloomReject(hashtable, hashvalue))
 				{
 					node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
 					continue;
@@ -1914,6 +1914,9 @@ ExecHashJoinInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
 	pg_atomic_init_u32(&pstate->distributor, 0);
 	pstate->nparticipants = pcxt->nworkers + 1;
 	pstate->total_tuples = 0;
+	pstate->bloom_filter = InvalidDsaPointer;
+	pstate->bloom_state = PHJ_BLOOM_NONE;
+	pstate->bloom_nelems = 0;
 	LWLockInitialize(&pstate->lock,
 					 LWTRANCHE_PARALLEL_HASH_JOIN);
 	BarrierInit(&pstate->build_barrier, 0);
@@ -1985,6 +1988,13 @@ ExecHashJoinReInitializeDSM(HashJoinState *state, ParallelContext *pcxt)
 
 	/* Reset build_barrier to PHJ_BUILD_ELECT so we can go around again. */
 	BarrierInit(&pstate->build_barrier, 0);
+
+	/* Free any shared Bloom filter from the previous scan and reset state. */
+	if (DsaPointerIsValid(pstate->bloom_filter))
+		dsa_free(state->js.ps.state->es_query_dsa, pstate->bloom_filter);
+	pstate->bloom_filter = InvalidDsaPointer;
+	pstate->bloom_state = PHJ_BLOOM_NONE;
+	pstate->bloom_nelems = 0;
 }
 
 void
diff --git a/src/backend/lib/bloomfilter.c b/src/backend/lib/bloomfilter.c
index bb04aa600e8..b4c2b7b8315 100644
--- a/src/backend/lib/bloomfilter.c
+++ b/src/backend/lib/bloomfilter.c
@@ -60,10 +60,137 @@ struct bloom_filter
 
 static int	my_bloom_power(uint64 target_bitset_bits);
 static int	optimal_k(uint64 bitset_bits, int64 total_elems);
+static uint64 bloom_bitset_bytes(int64 total_elems, int bloom_work_mem,
+								 size_t min_filter_size);
 static void k_hashes(bloom_filter *filter, uint32 *hashes, unsigned char *elem,
 					 size_t len);
 static inline uint32 mod_m(uint32 val, uint64 m);
 
+/*
+ * Determine the size of the bitset (in bytes) that bloom_create()/bloom_init()
+ * will use for the given parameters.  The bitset is always a power-of-two
+ * number of bits; see bloom_create() for the rationale behind the sizing.
+ *
+ * min_filter_size is the minimum size of the bitset, in bytes.  The bitset
+ * will never be sized below this, even when the total_elems estimate would
+ * suggest a smaller one.
+ */
+static uint64
+bloom_bitset_bytes(int64 total_elems, int bloom_work_mem, size_t min_filter_size)
+{
+	uint64		bitset_bytes;
+	uint64		bitset_bits;
+	int			bloom_power;
+
+	/*
+	 * Aim for two bytes per element; this is sufficient to get a false
+	 * positive rate below 1%, independent of the size of the bitset or total
+	 * number of elements.  Also, if rounding down the size of the bitset to
+	 * the next lowest power of two turns out to be a significant drop, the
+	 * false positive rate still won't exceed 2% in almost all cases.
+	 */
+	bitset_bytes = Min(bloom_work_mem * UINT64CONST(1024), total_elems * 2);
+	bitset_bytes = Max(min_filter_size, bitset_bytes);
+
+	/*
+	 * Size in bits should be the highest power of two <= target.  bitset_bits
+	 * is uint64 because PG_UINT32_MAX is 2^32 - 1, not 2^32
+	 */
+	bloom_power = my_bloom_power(bitset_bytes * BITS_PER_BYTE);
+	bitset_bits = UINT64CONST(1) << bloom_power;
+	bitset_bytes = bitset_bits / BITS_PER_BYTE;
+
+	return bitset_bytes;
+}
+
+/*
+ * Amount of memory (in bytes) that a Bloom filter sized for the given
+ * parameters occupies, including the fixed-size header.  This lets callers
+ * place a Bloom filter in caller-managed storage (for example shared memory)
+ * with bloom_init().
+ */
+size_t
+bloom_estimate(int64 total_elems, int bloom_work_mem)
+{
+	return bloom_estimate_custom(total_elems, bloom_work_mem,
+								 DEFAULT_MIN_BITSET_BYTES);
+}
+
+/*
+ * Like bloom_estimate(), but the minimum size of the bitset (in bytes) is
+ * provided by the caller instead of the default.  See bloom_create_custom().
+ */
+size_t
+bloom_estimate_custom(int64 total_elems, int bloom_work_mem,
+					  size_t min_filter_size)
+{
+	return offsetof(bloom_filter, bitset) +
+		sizeof(unsigned char) * bloom_bitset_bytes(total_elems, bloom_work_mem,
+												   min_filter_size);
+}
+
+/*
+ * Initialize a Bloom filter in caller-provided memory.
+ *
+ * "ptr" must point to at least bloom_estimate(total_elems, bloom_work_mem)
+ * bytes.  This is useful when the filter must live in memory that the caller
+ * manages itself, such as a DSA allocation shared between parallel workers.
+ *
+ * Two filters initialized with identical total_elems, bloom_work_mem and seed
+ * values share the same dimensions and may be combined with bloom_merge().
+ */
+bloom_filter *
+bloom_init(void *ptr, int64 total_elems, int bloom_work_mem, uint64 seed)
+{
+	return bloom_init_custom(ptr, total_elems, bloom_work_mem,
+							 DEFAULT_MIN_BITSET_BYTES, seed);
+}
+
+/*
+ * Like bloom_init(), but the minimum size of the bitset (in bytes) is provided
+ * by the caller instead of the default.  See bloom_create_custom().
+ */
+bloom_filter *
+bloom_init_custom(void *ptr, int64 total_elems, int bloom_work_mem,
+				  size_t min_bitset_bytes, uint64 seed)
+{
+	bloom_filter *filter = (bloom_filter *) ptr;
+	uint64		bitset_bytes = bloom_bitset_bytes(total_elems, bloom_work_mem,
+												  min_bitset_bytes);
+	uint64		bitset_bits = bitset_bytes * BITS_PER_BYTE;
+
+	filter->k_hash_funcs = optimal_k(bitset_bits, total_elems);
+	filter->seed = seed;
+	filter->m = bitset_bits;
+	memset(filter->bitset, 0, bitset_bytes);
+
+	return filter;
+}
+
+/*
+ * Merge the bits set in "src" into "dst".
+ *
+ * Both filters must have been created with identical dimensions (that is, the
+ * same total_elems, bloom_work_mem and seed values).  After this call "dst"
+ * reports an element as possibly-present if it was possibly-present in either
+ * of the input filters, which is exactly the filter that would have resulted
+ * from adding every element of both filters to a single Bloom filter.
+ */
+void
+bloom_merge(bloom_filter *dst, const bloom_filter *src)
+{
+	uint64		bitset_bytes;
+	uint64		i;
+
+	Assert(dst->m == src->m);
+	Assert(dst->k_hash_funcs == src->k_hash_funcs);
+	Assert(dst->seed == src->seed);
+
+	bitset_bytes = dst->m / BITS_PER_BYTE;
+	for (i = 0; i < bitset_bytes; i++)
+		dst->bitset[i] |= src->bitset[i];
+}
+
 /*
  * Create Bloom filter in caller's memory context.  We aim for a false positive
  * rate of between 1% and 2% when bitset size is not constrained by memory
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 62d43c7dab4..3e932dbb985 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -275,8 +275,24 @@ typedef struct ParallelHashJoinState
 	pg_atomic_uint32 distributor;	/* counter for load balancing */
 
 	SharedFileSet fileset;		/* space for shared temporary files */
+
+	/*
+	 * Shared Bloom filter state.  When a Parallel Hash join uses a Bloom
+	 * filter, the filter lives in the DSA area pointed to by "bloom_filter".
+	 * "bloom_state" coordinates building it (see PHJ_BLOOM_* constants), and
+	 * "bloom_nelems" is the element count estimate used to size it so that
+	 * every worker builds a mergeable local filter of identical dimensions.
+	 */
+	dsa_pointer bloom_filter;	/* shared bloom_filter, or InvalidDsaPointer */
+	int			bloom_state;	/* PHJ_BLOOM_* */
+	int64		bloom_nelems;	/* element estimate used to size the filter */
 } ParallelHashJoinState;
 
+/* Values for ParallelHashJoinState.bloom_state. */
+#define PHJ_BLOOM_NONE			0	/* no decision made yet */
+#define PHJ_BLOOM_BUILT			1	/* shared filter is built and usable */
+#define PHJ_BLOOM_DISABLED		2	/* decided not to use a bloom filter */
+
 /* The phases for building batches, used by build_barrier. */
 #define PHJ_BUILD_ELECT					0
 #define PHJ_BUILD_ALLOCATE				1
diff --git a/src/include/lib/bloomfilter.h b/src/include/lib/bloomfilter.h
index 8b705319f82..dc19bec93da 100644
--- a/src/include/lib/bloomfilter.h
+++ b/src/include/lib/bloomfilter.h
@@ -19,6 +19,15 @@ extern bloom_filter *bloom_create(int64 total_elems, int bloom_work_mem,
 								  uint64 seed);
 extern bloom_filter *bloom_create_custom(int64 total_elems, int bloom_work_mem,
 										 uint64 min_bitset_bytes, uint64 seed);
+extern size_t bloom_estimate(int64 total_elems, int bloom_work_mem);
+extern size_t bloom_estimate_custom(int64 total_elems, int bloom_work_mem,
+									size_t min_filter_size);
+extern bloom_filter *bloom_init(void *ptr, int64 total_elems,
+								int bloom_work_mem, uint64 seed);
+extern bloom_filter *bloom_init_custom(void *ptr, int64 total_elems,
+									   int bloom_work_mem, size_t min_filter_size,
+									   uint64 seed);
+extern void bloom_merge(bloom_filter *dst, const bloom_filter *src);
 extern void bloom_free(bloom_filter *filter);
 extern void bloom_add_element(bloom_filter *filter, unsigned char *elem,
 							  size_t len);
diff --git a/src/test/regress/expected/join_hash_bloom.out b/src/test/regress/expected/join_hash_bloom.out
index c9b5bdc66c9..7396e84a9ab 100644
--- a/src/test/regress/expected/join_hash_bloom.out
+++ b/src/test/regress/expected/join_hash_bloom.out
@@ -174,5 +174,150 @@ EXPLAIN (ANALYZE, VERBOSE, TIMING OFF, COSTS OFF, BUFFERS OFF, SUMMARY OFF) SELE
                Rows Removed by Filter: 4982
 (15 rows)
 
+-- test parallel hash joins
+SET work_mem = '512kB';
+SET max_parallel_workers_per_gather = 2;
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0;
+ALTER TABLE hash_bloom_fact SET (parallel_workers = 2);
+-- non-selective in-memory hash join does not use Bloom filters
+SET enable_hashjoin_bloom = off;
+EXPLAIN (ANALYZE, TIMING OFF, COSTS OFF, BUFFERS OFF, SUMMARY OFF) SELECT * FROM hash_bloom_fact f JOIN hash_bloom_dimension d ON (f.did = d.id);
+                                     QUERY PLAN                                      
+-------------------------------------------------------------------------------------
+ Gather (actual rows=100000.00 loops=1)
+   Workers Planned: 2
+   Workers Launched: 2
+   ->  Hash Join (actual rows=33333.33 loops=3)
+         Hash Cond: (f.did = d.id)
+         ->  Parallel Seq Scan on hash_bloom_fact f (actual rows=33333.33 loops=3)
+         ->  Hash (actual rows=10000.00 loops=3)
+               Buckets: 16384  Batches: 1  Memory Usage: 920kB
+               ->  Seq Scan on hash_bloom_dimension d (actual rows=10000.00 loops=3)
+(9 rows)
+
+SET enable_hashjoin_bloom = on;
+EXPLAIN (ANALYZE, TIMING OFF, COSTS OFF, BUFFERS OFF, SUMMARY OFF) SELECT * FROM hash_bloom_fact f JOIN hash_bloom_dimension d ON (f.did = d.id);
+                                     QUERY PLAN                                      
+-------------------------------------------------------------------------------------
+ Gather (actual rows=100000.00 loops=1)
+   Workers Planned: 2
+   Workers Launched: 2
+   ->  Hash Join (actual rows=33333.33 loops=3)
+         Hash Cond: (f.did = d.id)
+         ->  Parallel Seq Scan on hash_bloom_fact f (actual rows=33333.33 loops=3)
+         ->  Hash (actual rows=10000.00 loops=3)
+               Buckets: 16384  Batches: 1  Memory Usage: 920kB
+               ->  Seq Scan on hash_bloom_dimension d (actual rows=10000.00 loops=3)
+(9 rows)
+
+-- a selective in-memory join uses a filter (after 1000 lookups)
+SET enable_hashjoin_bloom = off;
+EXPLAIN (ANALYZE, TIMING OFF, COSTS OFF, BUFFERS OFF, SUMMARY OFF) SELECT * FROM hash_bloom_fact f JOIN hash_bloom_dimension d ON (f.did = d.id) WHERE d.r < 0.5;
+                                     QUERY PLAN                                     
+------------------------------------------------------------------------------------
+ Gather (actual rows=50180.00 loops=1)
+   Workers Planned: 2
+   Workers Launched: 2
+   ->  Hash Join (actual rows=16726.67 loops=3)
+         Hash Cond: (f.did = d.id)
+         ->  Parallel Seq Scan on hash_bloom_fact f (actual rows=33333.33 loops=3)
+         ->  Hash (actual rows=5018.00 loops=3)
+               Buckets: 8192  Batches: 1  Memory Usage: 461kB
+               ->  Seq Scan on hash_bloom_dimension d (actual rows=5018.00 loops=3)
+                     Filter: (r < '0.5'::double precision)
+                     Rows Removed by Filter: 4982
+(11 rows)
+
+SET enable_hashjoin_bloom = on;
+EXPLAIN (ANALYZE, TIMING OFF, COSTS OFF, BUFFERS OFF, SUMMARY OFF) SELECT * FROM hash_bloom_fact f JOIN hash_bloom_dimension d ON (f.did = d.id) WHERE d.r < 0.5;
+                                      QUERY PLAN                                       
+---------------------------------------------------------------------------------------
+ Gather (actual rows=50180.00 loops=1)
+   Workers Planned: 2
+   Workers Launched: 2
+   ->  Hash Join (actual rows=16726.67 loops=3)
+         Hash Cond: (f.did = d.id)
+         ->  Parallel Seq Scan on hash_bloom_fact f (actual rows=33333.33 loops=3)
+         ->  Hash (actual rows=5018.00 loops=3)
+               Buckets: 8192  Batches: 1  Memory Usage: 461kB
+               Bloom Filter: Size: 8kB  Hash Functions: 9  False Positive Rate: 0.191%
+               ->  Seq Scan on hash_bloom_dimension d (actual rows=5018.00 loops=3)
+                     Filter: (r < '0.5'::double precision)
+                     Rows Removed by Filter: 4982
+(12 rows)
+
+-- force batching
+SET work_mem = '128kB';
+-- batched join always creates a Bloom filter, but then disables it if
+-- not selective enough
+SET enable_hashjoin_bloom = off;
+EXPLAIN (ANALYZE, TIMING OFF, COSTS OFF, BUFFERS OFF, SUMMARY OFF) SELECT * FROM hash_bloom_fact f JOIN hash_bloom_dimension d ON (f.did = d.id);
+                                     QUERY PLAN                                      
+-------------------------------------------------------------------------------------
+ Gather (actual rows=100000.00 loops=1)
+   Workers Planned: 2
+   Workers Launched: 2
+   ->  Hash Join (actual rows=33333.33 loops=3)
+         Hash Cond: (f.did = d.id)
+         ->  Parallel Seq Scan on hash_bloom_fact f (actual rows=33333.33 loops=3)
+         ->  Hash (actual rows=10000.00 loops=3)
+               Buckets: 4096  Batches: 4  Memory Usage: 229kB
+               ->  Seq Scan on hash_bloom_dimension d (actual rows=10000.00 loops=3)
+(9 rows)
+
+SET enable_hashjoin_bloom = on;
+EXPLAIN (ANALYZE, TIMING OFF, COSTS OFF, BUFFERS OFF, SUMMARY OFF) SELECT * FROM hash_bloom_fact f JOIN hash_bloom_dimension d ON (f.did = d.id);
+                                       QUERY PLAN                                       
+----------------------------------------------------------------------------------------
+ Gather (actual rows=100000.00 loops=1)
+   Workers Planned: 2
+   Workers Launched: 2
+   ->  Hash Join (actual rows=33333.33 loops=3)
+         Hash Cond: (f.did = d.id)
+         ->  Parallel Seq Scan on hash_bloom_fact f (actual rows=33333.33 loops=3)
+         ->  Hash (actual rows=10000.00 loops=3)
+               Buckets: 4096  Batches: 4  Memory Usage: 229kB
+               Bloom Filter: Size: 16kB  Hash Functions: 9  False Positive Rate: 0.187%
+               ->  Seq Scan on hash_bloom_dimension d (actual rows=10000.00 loops=3)
+(10 rows)
+
+-- batched join always creates a Bloom filter, and keeps using it if
+-- selective enough
+SET enable_hashjoin_bloom = off;
+EXPLAIN (ANALYZE, TIMING OFF, COSTS OFF, BUFFERS OFF, SUMMARY OFF) SELECT * FROM hash_bloom_fact f JOIN hash_bloom_dimension d ON (f.did = d.id) WHERE d.r < 0.5;
+                                     QUERY PLAN                                     
+------------------------------------------------------------------------------------
+ Gather (actual rows=50180.00 loops=1)
+   Workers Planned: 2
+   Workers Launched: 2
+   ->  Hash Join (actual rows=16726.67 loops=3)
+         Hash Cond: (f.did = d.id)
+         ->  Parallel Seq Scan on hash_bloom_fact f (actual rows=33333.33 loops=3)
+         ->  Hash (actual rows=5018.00 loops=3)
+               Buckets: 4096  Batches: 2  Memory Usage: 228kB
+               ->  Seq Scan on hash_bloom_dimension d (actual rows=5018.00 loops=3)
+                     Filter: (r < '0.5'::double precision)
+                     Rows Removed by Filter: 4982
+(11 rows)
+
+SET enable_hashjoin_bloom = on;
+EXPLAIN (ANALYZE, TIMING OFF, COSTS OFF, BUFFERS OFF, SUMMARY OFF) SELECT * FROM hash_bloom_fact f JOIN hash_bloom_dimension d ON (f.did = d.id) WHERE d.r < 0.5;
+                                      QUERY PLAN                                       
+---------------------------------------------------------------------------------------
+ Gather (actual rows=50180.00 loops=1)
+   Workers Planned: 2
+   Workers Launched: 2
+   ->  Hash Join (actual rows=16726.67 loops=3)
+         Hash Cond: (f.did = d.id)
+         ->  Parallel Seq Scan on hash_bloom_fact f (actual rows=33333.33 loops=3)
+         ->  Hash (actual rows=5018.00 loops=3)
+               Buckets: 4096  Batches: 2  Memory Usage: 228kB
+               Bloom Filter: Size: 8kB  Hash Functions: 9  False Positive Rate: 0.191%
+               ->  Seq Scan on hash_bloom_dimension d (actual rows=5018.00 loops=3)
+                     Filter: (r < '0.5'::double precision)
+                     Rows Removed by Filter: 4982
+(12 rows)
+
 DROP TABLE hash_bloom_fact;
 DROP TABLE hash_bloom_dimension;
diff --git a/src/test/regress/sql/join_hash_bloom.sql b/src/test/regress/sql/join_hash_bloom.sql
index b62e0b2ed90..139266390a0 100644
--- a/src/test/regress/sql/join_hash_bloom.sql
+++ b/src/test/regress/sql/join_hash_bloom.sql
@@ -52,5 +52,51 @@ EXPLAIN (ANALYZE, VERBOSE, TIMING OFF, COSTS OFF, BUFFERS OFF, SUMMARY OFF) SELE
 SET enable_hashjoin_bloom = on;
 EXPLAIN (ANALYZE, VERBOSE, TIMING OFF, COSTS OFF, BUFFERS OFF, SUMMARY OFF) SELECT * FROM hash_bloom_fact f JOIN hash_bloom_dimension d ON (f.did = d.id) WHERE d.r < 0.5;
 
+-- test parallel hash joins
+SET work_mem = '512kB';
+SET max_parallel_workers_per_gather = 2;
+SET parallel_setup_cost = 0;
+SET parallel_tuple_cost = 0;
+
+ALTER TABLE hash_bloom_fact SET (parallel_workers = 2);
+
+-- non-selective in-memory hash join does not use Bloom filters
+
+SET enable_hashjoin_bloom = off;
+EXPLAIN (ANALYZE, TIMING OFF, COSTS OFF, BUFFERS OFF, SUMMARY OFF) SELECT * FROM hash_bloom_fact f JOIN hash_bloom_dimension d ON (f.did = d.id);
+
+SET enable_hashjoin_bloom = on;
+EXPLAIN (ANALYZE, TIMING OFF, COSTS OFF, BUFFERS OFF, SUMMARY OFF) SELECT * FROM hash_bloom_fact f JOIN hash_bloom_dimension d ON (f.did = d.id);
+
+-- a selective in-memory join uses a filter (after 1000 lookups)
+
+SET enable_hashjoin_bloom = off;
+EXPLAIN (ANALYZE, TIMING OFF, COSTS OFF, BUFFERS OFF, SUMMARY OFF) SELECT * FROM hash_bloom_fact f JOIN hash_bloom_dimension d ON (f.did = d.id) WHERE d.r < 0.5;
+
+SET enable_hashjoin_bloom = on;
+EXPLAIN (ANALYZE, TIMING OFF, COSTS OFF, BUFFERS OFF, SUMMARY OFF) SELECT * FROM hash_bloom_fact f JOIN hash_bloom_dimension d ON (f.did = d.id) WHERE d.r < 0.5;
+
+-- force batching
+SET work_mem = '128kB';
+
+-- batched join always creates a Bloom filter, but then disables it if
+-- not selective enough
+
+SET enable_hashjoin_bloom = off;
+EXPLAIN (ANALYZE, TIMING OFF, COSTS OFF, BUFFERS OFF, SUMMARY OFF) SELECT * FROM hash_bloom_fact f JOIN hash_bloom_dimension d ON (f.did = d.id);
+
+SET enable_hashjoin_bloom = on;
+EXPLAIN (ANALYZE, TIMING OFF, COSTS OFF, BUFFERS OFF, SUMMARY OFF) SELECT * FROM hash_bloom_fact f JOIN hash_bloom_dimension d ON (f.did = d.id);
+
+-- batched join always creates a Bloom filter, and keeps using it if
+-- selective enough
+
+SET enable_hashjoin_bloom = off;
+EXPLAIN (ANALYZE, TIMING OFF, COSTS OFF, BUFFERS OFF, SUMMARY OFF) SELECT * FROM hash_bloom_fact f JOIN hash_bloom_dimension d ON (f.did = d.id) WHERE d.r < 0.5;
+
+SET enable_hashjoin_bloom = on;
+EXPLAIN (ANALYZE, TIMING OFF, COSTS OFF, BUFFERS OFF, SUMMARY OFF) SELECT * FROM hash_bloom_fact f JOIN hash_bloom_dimension d ON (f.did = d.id) WHERE d.r < 0.5;
+
+
 DROP TABLE hash_bloom_fact;
 DROP TABLE hash_bloom_dimension;
-- 
2.54.0

