From 903311f0c1a494f530dfa7dba92edad518eb9329 Mon Sep 17 00:00:00 2001 From: Matheus Alcantara Date: Thu, 2 Jul 2026 16:29:26 -0300 Subject: [PATCH v4 4/4] Allow a CustomScan to consume a pushed-down hashjoin Bloom filter Extend the hashjoin Bloom-filter pushdown so that a base-relation CustomScan can build filter-aware paths, be chosen as a filter recipient, and consume the filter inside its own scan loop The whole mechanism is gated on a new opt-in path flag, CUSTOMPATH_SUPPORT_BLOOM_FILTERS; providers that do not set it, and heap, are unaffected. The core of this change is teaching path generation about the custom scan: * generate_expected_filter_paths()/create_filtered_scan_path() now clone a base-relation CustomPath into filter-bearing variants when the path advertised CUSTOMPATH_SUPPORT_BLOOM_FILTERS, exactly as they already do for the stock scan path types. * find_bloom_filter_recipient() treats a base-rel CustomScan (scanrelid > 0) that carries the flag the same as a SeqScan. The probe is not wired into ExecScanExtended() -- a CustomScan dispatches to the provider's ExecCustomScan -- so the provider calls ExecBloomFilters() itself at whatever granularity it supports. ExecInitCustomScan() compiles the probe state up front via ExecInitBloomFilters(), and set_customscan_references() fixes the pushed key expressions for a base-relation scan just like the scan qual. In addition, an opted-in recipient gets two things a stock scan does not: * Per-key filters. Alongside the combined-hash filter, the hash join optionally builds one Bloom filter per join key. The combined filter, keyed on the hash of all keys together, stays the default and remains the more selective one for a per-row probe: per-key filters only test whether each column's value appears somewhere in the build side, so on a multi-column join they are strictly weaker. What they enable is testing a single key column on its own (a column store can check one column against its per-column dictionary or zone map and skip whole row groups before decompression, which the combined filter cannot support). The build reuses per-key inner hash ExprStates; the extra CPU and memory are paid only by a consumer that opted in, and a recipient correlates HashState.perkey_filters[i] with BloomFilter.filter_exprs[i] by position. Heap and single-key joins are unaffected. * Eager filter build. When the outer relation's startup cost is below the hash-table build cost, ExecHashJoinImpl fetches the first outer tuple before building the hash table to take the empty-outer shortcut. For a CustomScan that applies the filter in its own scan loop that is too late -- its first tuple request may decompress a whole row group before the filter exists. A HashJoin.bloom_eager flag, set when the filter is pushed to such a recipient, tells the executor to skip the prefetch and build the hash table (and filter) first. Only such a recipient pays the cost of a possibly-needless hash build on an empty outer. Based on previous patches from Andrew Dunstan, adapted to the path-based pushdown design. --- src/backend/executor/nodeCustom.c | 11 +++++ src/backend/executor/nodeHash.c | 35 +++++++++++++++ src/backend/executor/nodeHashjoin.c | 36 +++++++++++++++ src/backend/optimizer/path/allpaths.c | 14 ++++++ src/backend/optimizer/plan/createplan.c | 45 +++++++++++++++++++ src/backend/optimizer/plan/setrefs.c | 10 +++++ src/backend/optimizer/util/pathnode.c | 23 +++++++--- src/include/nodes/execnodes.h | 14 ++++++ src/include/nodes/extensible.h | 2 + src/include/nodes/plannodes.h | 21 +++++++++ .../expected/test_bloom_customscan.out | 9 ++-- .../test_bloom_customscan.c | 5 +++ 12 files changed, 215 insertions(+), 10 deletions(-) diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c index b7cc890cd20..c18dcb1035d 100644 --- a/src/backend/executor/nodeCustom.c +++ b/src/backend/executor/nodeCustom.c @@ -101,6 +101,17 @@ ExecInitCustomScan(CustomScan *cscan, EState *estate, int eflags) css->ss.ps.qual = ExecInitQual(cscan->scan.plan.qual, (PlanState *) css); + /* + * Set up any bloom filters a hash join pushed down to this scan (see + * nodeHashjoin.c). This compiles the probe expressions against the scan + * tuple slot; the provider is responsible for actually probing them with + * ExecBloomFilters() from its ExecCustomScan callback, at whatever + * granularity it supports. A no-op unless the provider advertised + * CUSTOMPATH_SUPPORT_BLOOM_FILTERS and the planner found a filter to + * push. + */ + ExecInitBloomFilters((PlanState *) css, css->ss.ss_ScanTupleSlot); + /* * The callback of custom-scan provider applies the final initialization * of the custom-scan-state node according to its logic. diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 37224324bce..2b045eae186 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -197,6 +197,25 @@ MultiExecPrivateHash(HashState *node) (unsigned char *) &hashvalue, sizeof(hashvalue)); + /* + * Likewise for the optional per-key filters, using the per-key + * (single-key) hash ExprStates. Same econtext as the combined + * hash above (ecxt_outertuple is the just-fetched inner tuple). + */ + for (int k = 0; k < node->perkey_nfilters; k++) + { + bool keyisnull; + uint32 keyhash; + + keyhash = DatumGetUInt32(ExecEvalExprSwitchContext(node->perkey_hash[k], + econtext, + &keyisnull)); + if (!keyisnull) + bloom_add_element(node->perkey_filters[k], + (unsigned char *) &keyhash, + sizeof(keyhash)); + } + bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue); if (bucketNumber != INVALID_SKEW_BUCKET_NO) { @@ -722,6 +741,22 @@ ExecHashTableCreate(HashState *state) oldctx = MemoryContextSwitchTo(hashtable->hashCxt); state->bloom_filter = bloom_create((int64) Max(rows, 1.0), bloom_work_mem, 0); + + /* + * If a recipient opted in, also build one filter per join key (in + * addition to the combined one above). These let a recipient test an + * individual key column on its own; they are less selective than the + * combined filter, so they are built only on demand. + */ + if (state->want_perkey_bloom) + { + state->perkey_filters = palloc_array(struct bloom_filter *, + state->perkey_nfilters); + for (int i = 0; i < state->perkey_nfilters; i++) + state->perkey_filters[i] = bloom_create((int64) Max(rows, 1.0), + bloom_work_mem, 0); + } + MemoryContextSwitchTo(oldctx); } diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index e3467f14739..c501b98067d 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -317,6 +317,17 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) */ node->hj_FirstOuterTupleSlot = NULL; } + else if (((HashJoin *) node->js.ps.plan)->bloom_eager) + { + /* + * We pushed a bloom filter to a CustomScan on the outer + * side that wants it at scan start (e.g. to skip row groups + * before decompression). Skip the empty-outer prefetch and + * build the hash table -- and the filter -- first, so it is + * ready before the outer scan produces its first tuple. + */ + node->hj_FirstOuterTupleSlot = NULL; + } else if (HJ_FILL_OUTER(node) || (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost && !node->hj_OuterNotEmpty)) @@ -908,6 +919,7 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags) hashState = castNode(HashState, innerPlanState(hjstate)); hashState->want_bloom_filter = (node->bloom_consumer_count > 0); hashState->bloom_filter_id = node->bloom_filter_id; + hashState->want_perkey_bloom = node->bloom_perkey; /* * Initialize result slot, type and projection. @@ -1031,6 +1043,28 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags) &hashstate->ps, 0); + /* + * If a recipient opted in to per-key bloom filters, build one inner + * (single-key) hash ExprState per join key, used by the Hash node to + * populate the per-key filters. The combined hash above cannot be + * decomposed, so this is the extra cost a per-key consumer pays. + */ + if (hashstate->want_perkey_bloom) + { + hashstate->perkey_nfilters = nkeys; + hashstate->perkey_hash = palloc_array(ExprState *, nkeys); + for (int i = 0; i < nkeys; i++) + hashstate->perkey_hash[i] = + ExecBuildHash32Expr(hashstate->ps.ps_ResultTupleDesc, + hashstate->ps.resultops, + &inner_hashfuncid[i], + list_make1_oid(list_nth_oid(node->hashcollations, i)), + list_make1(list_nth(hash->hashkeys, i)), + &hash_strict[i], + &hashstate->ps, + 0); + } + /* Remember whether we need to save tuples with null join keys */ hjstate->hj_KeepNullTuples = HJ_FILL_OUTER(hjstate); hashstate->keep_null_tuples = HJ_FILL_INNER(hjstate); @@ -1118,6 +1152,7 @@ ExecEndHashJoin(HashJoinState *node) ExecHashTableDestroy(node->hj_HashTable); node->hj_HashTable = NULL; hashNode->bloom_filter = NULL; + hashNode->perkey_filters = NULL; } /* @@ -1775,6 +1810,7 @@ ExecReScanHashJoin(HashJoinState *node) * freed by the ExecHashTableDestroy call. */ hashNode->bloom_filter = NULL; + hashNode->perkey_filters = NULL; /* * if chgParam of subnode is not null then plan will be re-scanned diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 8829c7c3108..72c51422394 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -25,6 +25,7 @@ #include "catalog/pg_proc.h" #include "foreign/fdwapi.h" #include "miscadmin.h" +#include "nodes/extensible.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" #include "nodes/supportnodes.h" @@ -1294,6 +1295,19 @@ generate_expected_filter_paths(PlannerInfo *root, RelOptInfo *rel) case T_TidRangePath: basepaths = lappend(basepaths, path); break; + case T_CustomPath: + + /* + * A base-relation CustomScan can receive a pushed-down filter, + * but only if the provider advertised that it knows how to + * consume one (it probes the filter itself inside its scan + * loop; see find_bloom_filter_recipient in createplan.c and + * ExecInitCustomScan). Providers that don't opt in, like heap, + * are unaffected. + */ + if (((CustomPath *) path)->flags & CUSTOMPATH_SUPPORT_BLOOM_FILTERS) + basepaths = lappend(basepaths, path); + break; default: break; } diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 51990b98419..b9e481ba298 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -4781,6 +4781,25 @@ find_bloom_filter_recipient(Plan *plan, Index target_relid) return plan; return NULL; } + case T_CustomScan: + { + /* + * A CustomScan on a base relation can act as a recipient, but + * only if the provider advertised that it knows how to consume + * a pushed-down bloom filter. Unlike the stock scans, the + * probe is not performed by ExecScanExtended() (a CustomScan + * dispatches to the provider's own ExecCustomScan); the + * provider is responsible for calling ExecBloomFilters() at + * whatever granularity it likes. Non-leaf custom nodes have + * scanrelid == 0 and so are rejected by the relid test. + */ + CustomScan *cscan = (CustomScan *) plan; + + if ((cscan->flags & CUSTOMPATH_SUPPORT_BLOOM_FILTERS) && + cscan->scan.scanrelid == target_relid) + return plan; + return NULL; + } case T_Sort: case T_IncrementalSort: case T_Material: @@ -4910,6 +4929,32 @@ try_push_bloom_filter(PlannerInfo *root, HashJoin *hj, Plan *outer_plan, recipient->bloom_filters = lappend(recipient->bloom_filters, bf); + /* + * A CustomScan recipient that opted in consumes the filter in its own scan + * loop, possibly at the storage level, so it wants two things a stock scan + * does not. + */ + if (IsA(recipient, CustomScan) && + (((CustomScan *) recipient)->flags & CUSTOMPATH_SUPPORT_BLOOM_FILTERS)) + { + /* + * Build the hash table (and filter) before the outer scan starts, so + * the filter is available on the first tuple request rather than after + * a batch has already been scanned unfiltered. + */ + hj->bloom_eager = true; + + /* + * Also build a separate filter per join key, so the recipient can test + * a single column on its own (e.g. against a per-column dictionary or + * zone map). The combined filter is always built and is the more + * selective one for a per-row probe; there is nothing to gain for a + * single-key join, where the two coincide. + */ + if (list_length(hj->hashkeys) > 1) + hj->bloom_perkey = true; + } + hj->bloom_consumer_count++; } diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index 0059acfccbe..74c7a5bf3a5 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -1826,6 +1826,16 @@ set_customscan_references(PlannerInfo *root, cscan->custom_exprs = fix_scan_list(root, cscan->custom_exprs, rtoffset, NUM_EXEC_QUAL((Plan *) cscan)); + + /* + * Bloom filters pushed down to a base-relation CustomScan: the key + * expressions are plain Vars of the scanned relation, so they are + * fixed up the same way as the scan qual. (A CustomScan emitting a + * custom_scan_tlist takes the branch above and would instead need + * fix_upper_expr against the tlist index, like IndexOnlyScan; no + * in-tree provider needs that yet.) + */ + fix_scan_bloom_filters(root, (Plan *) cscan, rtoffset); } /* Adjust child plan-nodes recursively, if needed */ diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 9cd9188a1cf..65690553cf5 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -501,6 +501,17 @@ create_filtered_scan_path(PlannerInfo *root, Path *subpath, List *filters) case T_TidRangePath: sz = sizeof(TidRangePath); break; + case T_CustomPath: + + /* + * A base-relation CustomScan provider that advertised + * CUSTOMPATH_SUPPORT_BLOOM_FILTERS can receive a pushed-down + * filter and apply it in its own scan loop + * .generate_expected_filter_paths() only offers such paths here, + * so we need not re-check the flag. + */ + sz = sizeof(CustomPath); + break; default: /* unsupported scan path type */ return NULL; @@ -620,10 +631,10 @@ add_path(RelOptInfo *parent_rel, Path *new_path) /* * Paths carrying different sets of expected Bloom filters serve - * different purposes (each may be consumed by a different parent join, - * or none at all), and their cost/row estimates aren't directly - * comparable. So if the two paths don't expect the same filters, keep - * both and don't let either dominate the other. + * different purposes (each may be consumed by a different parent + * join, or none at all), and their cost/row estimates aren't directly + * comparable. So if the two paths don't expect the same filters, + * keep both and don't let either dominate the other. */ if (!expected_filters_equal(new_path->expected_filters, old_path->expected_filters)) @@ -854,8 +865,8 @@ add_path_precheck(RelOptInfo *parent_rel, int disabled_nodes, * filters of their own, so any filter-bearing old path is a * non-comparable speculative path and must not be allowed to dominate * (and thereby suppress) the new path. Skipping them here also - * guarantees that a join relation always retains at least one ordinary, - * filter-free path to serve as cheapest_total_path. + * guarantees that a join relation always retains at least one + * ordinary, filter-free path to serve as cheapest_total_path. */ if (old_path->expected_filters != NIL) continue; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index aad721f3421..873c02fb080 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -2786,6 +2786,20 @@ typedef struct HashState */ struct bloom_filter *bloom_filter; + /* + * Optional per-key bloom filters, built in addition to the combined + * bloom_filter above when a recipient opted in (HashJoin.bloom_perkey). + * perkey_filters has perkey_nfilters entries, one per join key, in hashkey + * order; a recipient correlates them with BloomFilter.filter_exprs by + * position. perkey_hash holds the matching per-key (single-key) hash + * ExprStates used to populate them during the build. All live in hashCxt + * and follow the same lifecycle as bloom_filter. + */ + bool want_perkey_bloom; + int perkey_nfilters; + struct bloom_filter **perkey_filters; + ExprState **perkey_hash; + /* * Counters with total per-filter instrumentation. Separate from the * per-recipient counters in BloomFilterState. Redundant, but will be diff --git a/src/include/nodes/extensible.h b/src/include/nodes/extensible.h index 517db95c4a3..ea2cef4fe3b 100644 --- a/src/include/nodes/extensible.h +++ b/src/include/nodes/extensible.h @@ -84,6 +84,8 @@ extern const ExtensibleNodeMethods *GetExtensibleNodeMethods(const char *extnode #define CUSTOMPATH_SUPPORT_BACKWARD_SCAN 0x0001 #define CUSTOMPATH_SUPPORT_MARK_RESTORE 0x0002 #define CUSTOMPATH_SUPPORT_PROJECTION 0x0004 +/* provider can accept a hashjoin bloom filter pushed down to its scan */ +#define CUSTOMPATH_SUPPORT_BLOOM_FILTERS 0x0008 /* * Custom path methods. Mostly, we just need to know how to convert a diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 4e35d77cc49..0e011f3d4e2 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -1124,6 +1124,27 @@ typedef struct HashJoin * Zero when this HashJoin has no consumers. */ int bloom_filter_id; + + /* + * Whether to also build one bloom filter per join key (in addition to the + * combined-hash filter), so that a recipient can test an individual key + * column on its own -- e.g. a column store probing a per-column dictionary + * or zone map. Set at plan time only when the recipient is a CustomScan + * that advertised CUSTOMPATH_SUPPORT_BLOOM_FILTERS. The combined filter is + * always built and remains the more selective one; per-key filters are an + * opt-in extra that nobody else pays for. + */ + bool bloom_perkey; + + /* + * Whether to build the hash table (and bloom filter) before fetching the + * first outer tuple, skipping the empty-outer prefetch optimization. Set + * at plan time when the filter is pushed to a CustomScan recipient, which + * may want to apply the filter the moment its scan starts (e.g. a column + * store skipping row groups before decompression) rather than after having + * already produced a batch unfiltered. See ExecHashJoinImpl. + */ + bool bloom_eager; } HashJoin; /* ---------------- diff --git a/src/test/modules/test_bloom_customscan/expected/test_bloom_customscan.out b/src/test/modules/test_bloom_customscan/expected/test_bloom_customscan.out index e1ce5903a02..8e784209901 100644 --- a/src/test/modules/test_bloom_customscan/expected/test_bloom_customscan.out +++ b/src/test/modules/test_bloom_customscan/expected/test_bloom_customscan.out @@ -33,9 +33,10 @@ SELECT count(*) FROM cs_fact f JOIN cs_dim d ON f.a = d.id; Output: f.a -> Hash Output: d.id + Bloom Filter 1 -> Custom Scan (TestBloomCustomScan) on public.cs_dim d Output: d.id -(10 rows) +(11 rows) -- Single-key join: the filter must actually reject fact rows. SELECT test_bloom_cs_reset(); @@ -53,7 +54,7 @@ SELECT count(*) FROM cs_fact f JOIN cs_dim d ON f.a = d.id; SELECT test_bloom_cs_rejected_rows() > 0 AS filter_rejected_rows; filter_rejected_rows ---------------------- - f + t (1 row) -- Correctness: the result must be identical with and without the filter. @@ -97,13 +98,13 @@ SELECT count(*) FROM cs_fact f JOIN cs_dim d ON f.a = d.id AND f.b = d.id2; SELECT test_bloom_cs_perkey_built() AS perkey_filters_built; perkey_filters_built ---------------------- - f + t (1 row) SELECT test_bloom_cs_rejected_rows() > 0 AS filter_rejected_rows; filter_rejected_rows ---------------------- - f + t (1 row) -- cleanup diff --git a/src/test/modules/test_bloom_customscan/test_bloom_customscan.c b/src/test/modules/test_bloom_customscan/test_bloom_customscan.c index 124cbef9ded..99338c7d361 100644 --- a/src/test/modules/test_bloom_customscan/test_bloom_customscan.c +++ b/src/test/modules/test_bloom_customscan/test_bloom_customscan.c @@ -145,6 +145,7 @@ bloom_cs_set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, cpath->path.total_cost = total_cost; cpath->path.pathkeys = NIL; + cpath->flags = CUSTOMPATH_SUPPORT_BLOOM_FILTERS; cpath->custom_paths = NIL; cpath->custom_private = NIL; cpath->methods = &bloom_cs_path_methods; @@ -234,6 +235,10 @@ bloom_cs_maybe_inspect_perkey(BloomCSScanState * bcss) if (hashNode->bloom_filter == NULL) return; + if (hashNode->want_perkey_bloom && + hashNode->perkey_nfilters > 0 && + hashNode->perkey_filters != NULL) + test_bloom_cs_perkey_seen = true; } bcss->inspected = true; -- 2.50.1 (Apple Git-155)