From c8c7d2460af8e3b2d580d1c44101319eb2f90646 Mon Sep 17 00:00:00 2001
From: Julien Tachoires <julien@tachoires.me>
Date: Mon, 25 Aug 2025 20:16:47 +0200
Subject: [PATCH 5/6] Push down IN/NOT IN <array> quals to table AMs

In order to allow table AMs to apply key filtering against scalar array
values, when a such qualifier is found then the executor is in charge of
collecting the required informations to later build a hash table. The
table AM is then able to create a simple hash table and use it to quickly
check existence or absence of the key in the given array in a O(1)
fashion.

The new structure ScanKeyHashInfoData is used to store the hash
informations that are passed to the table AM via the new ScanKey field:
sk_hashinfo
In case of Index scan, this field is set to NULL and unused.
---
 src/backend/access/common/scankey.c         |   9 +
 src/backend/access/heap/Makefile            |   1 +
 src/backend/access/heap/heapam.c            |   1 -
 src/backend/access/heap/heapam_valid.c      | 290 ++++++++++++++++++++
 src/backend/access/heap/meson.build         |   1 +
 src/backend/executor/nodeSeqscan.c          | 171 +++++++++++-
 src/backend/optimizer/plan/createplan.c     |  60 ++++
 src/include/access/heapam.h                 |   2 +
 src/include/access/skey.h                   |  34 +++
 src/include/access/valid.h                  |  58 ----
 src/test/regress/expected/qual_pushdown.out | 126 +++++++++
 src/test/regress/sql/qual_pushdown.sql      |  12 +
 12 files changed, 694 insertions(+), 71 deletions(-)
 create mode 100644 src/backend/access/heap/heapam_valid.c
 delete mode 100644 src/include/access/valid.h

diff --git a/src/backend/access/common/scankey.c b/src/backend/access/common/scankey.c
index 2d65ab02dd3..0d34bab755c 100644
--- a/src/backend/access/common/scankey.c
+++ b/src/backend/access/common/scankey.c
@@ -44,6 +44,7 @@ ScanKeyEntryInitialize(ScanKey entry,
 	entry->sk_subtype = subtype;
 	entry->sk_collation = collation;
 	entry->sk_argument = argument;
+	entry->sk_hashinfo = NULL;
 	if (RegProcedureIsValid(procedure))
 	{
 		fmgr_info(procedure, &entry->sk_func);
@@ -85,6 +86,7 @@ ScanKeyInit(ScanKey entry,
 	entry->sk_subtype = InvalidOid;
 	entry->sk_collation = C_COLLATION_OID;
 	entry->sk_argument = argument;
+	entry->sk_hashinfo = NULL;
 	fmgr_info(procedure, &entry->sk_func);
 }
 
@@ -113,5 +115,12 @@ ScanKeyEntryInitializeWithInfo(ScanKey entry,
 	entry->sk_subtype = subtype;
 	entry->sk_collation = collation;
 	entry->sk_argument = argument;
+	entry->sk_hashinfo = NULL;
 	fmgr_info_copy(&entry->sk_func, finfo, CurrentMemoryContext);
 }
+
+void
+ScanKeyEntrySetHashInfo(ScanKey entry, ScanKeyHashInfo hashinfo)
+{
+	entry->sk_hashinfo = hashinfo;
+}
diff --git a/src/backend/access/heap/Makefile b/src/backend/access/heap/Makefile
index 394534172fa..b796a4ccdff 100644
--- a/src/backend/access/heap/Makefile
+++ b/src/backend/access/heap/Makefile
@@ -15,6 +15,7 @@ include $(top_builddir)/src/Makefile.global
 OBJS = \
 	heapam.o \
 	heapam_handler.o \
+	heapam_valid.o \
 	heapam_visibility.o \
 	heapam_xlog.o \
 	heaptoast.o \
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 71d8e06d8dd..ad19804b5e1 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -37,7 +37,6 @@
 #include "access/multixact.h"
 #include "access/subtrans.h"
 #include "access/syncscan.h"
-#include "access/valid.h"
 #include "access/visibilitymap.h"
 #include "access/xloginsert.h"
 #include "catalog/pg_database.h"
diff --git a/src/backend/access/heap/heapam_valid.c b/src/backend/access/heap/heapam_valid.c
new file mode 100644
index 00000000000..7261723e378
--- /dev/null
+++ b/src/backend/access/heap/heapam_valid.c
@@ -0,0 +1,290 @@
+/*-------------------------------------------------------------------------
+ *
+ * heapam_valid.c
+ *	  Heap tuple qualification validity definitions
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/heap/heapam_valid.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "utils/array.h"
+#include "utils/lsyscache.h"
+#include "access/heapam.h"
+#include "access/htup.h"
+#include "access/htup_details.h"
+#include "access/skey.h"
+#include "access/tupdesc.h"
+
+/*
+ * SearchArrayHashEntry - Hash table entry type used by SK_SEARCHARRAY
+ */
+typedef struct SearchArrayHashEntry
+{
+	Datum		key;
+	uint32		status;			/* hash status */
+	uint32		hash;			/* hash value (cached) */
+}			SearchArrayHashEntry;
+
+#define SH_PREFIX searcharray
+#define SH_ELEMENT_TYPE SearchArrayHashEntry
+#define SH_KEY_TYPE Datum
+#define SH_SCOPE static inline
+#define SH_DECLARE
+#include "lib/simplehash.h"
+
+static bool searcharray_hash_element_match(struct searcharray_hash *tb, Datum key1,
+										   Datum key2);
+static uint32 searcharray_element_hash(struct searcharray_hash *tb, Datum key);
+
+/*
+ * SearchArrayHashTable - Hash table for SK_SEARCHARRAY
+ */
+typedef struct SearchArrayHashTable
+{
+	searcharray_hash *tab;		/* underlying hash table */
+	FmgrInfo	hash_finfo;		/* hash function */
+	FunctionCallInfo hash_fcinfo;	/* arguments etc */
+	FmgrInfo	match_finfo;	/* comparison function */
+	FunctionCallInfo match_fcinfo;	/* arguments etc */
+	bool		has_nulls;
+}			SearchArrayHashTable;
+
+/* Define parameters for SearchArray hash table code generation. */
+#define SH_PREFIX searcharray
+#define SH_ELEMENT_TYPE SearchArrayHashEntry
+#define SH_KEY_TYPE Datum
+#define SH_KEY key
+#define SH_HASH_KEY(tb, key) searcharray_element_hash(tb, key)
+#define SH_EQUAL(tb, a, b) searcharray_hash_element_match(tb, a, b)
+#define SH_SCOPE static inline
+#define SH_STORE_HASH
+#define SH_GET_HASH(tb, a) a->hash
+#define SH_DEFINE
+#include "lib/simplehash.h"
+
+/*
+ * Hash function for scalar array hash op elements.
+ *
+ * We use the element type's default hash opclass, and the column collation
+ * if the type is collation-sensitive.
+ */
+static uint32
+searcharray_element_hash(struct searcharray_hash *tb, Datum key)
+{
+	SearchArrayHashTable *elements_tab = (SearchArrayHashTable *) tb->private_data;
+	FunctionCallInfo fcinfo = elements_tab->hash_fcinfo;
+	Datum		hash;
+
+	fcinfo->args[0].value = key;
+	fcinfo->args[0].isnull = false;
+
+	hash = elements_tab->hash_finfo.fn_addr(fcinfo);
+
+	return DatumGetUInt32(hash);
+}
+
+/*
+ * Matching function for scalar array hash op elements, to be used in hashtable
+ * lookups.
+ */
+static bool
+searcharray_hash_element_match(struct searcharray_hash *tb, Datum key1, Datum key2)
+{
+	Datum		result;
+
+	SearchArrayHashTable *elements_tab = (SearchArrayHashTable *) tb->private_data;
+	FunctionCallInfo fcinfo = elements_tab->match_fcinfo;
+
+	fcinfo->args[0].value = key1;
+	fcinfo->args[0].isnull = false;
+	fcinfo->args[1].value = key2;
+	fcinfo->args[1].isnull = false;
+
+	result = elements_tab->match_finfo.fn_addr(fcinfo);
+
+	return DatumGetBool(result);
+}
+
+/*
+ *		HeapKeyTest
+ *
+ *		Test a heap tuple to see if it satisfies a scan key.
+ */
+bool
+HeapKeyTest(HeapTuple tuple, TupleDesc tupdesc, int nkeys, ScanKey keys)
+{
+	int			cur_nkeys = nkeys;
+	ScanKey		cur_key = keys;
+
+	for (; cur_nkeys--; cur_key++)
+	{
+		Datum		atp;
+		bool		isnull;
+		Datum		test;
+
+		if (cur_key->sk_flags & SK_ISNULL)
+			return false;
+
+		atp = heap_getattr(tuple, cur_key->sk_attno, tupdesc, &isnull);
+
+		/* Case when the rightop was a scalar array */
+		if (cur_key->sk_flags & SK_SEARCHARRAY)
+		{
+			bool		hashfound;
+			ScanKeyHashInfo hashinfo = cur_key->sk_hashinfo;
+			SearchArrayHashTable *hashtab;
+
+			/*
+			 * Build the hash table on the first call if needed
+			 */
+			if (hashinfo->hashtab == NULL)
+			{
+				ArrayType  *arr;
+				int16		typlen;
+				bool		typbyval;
+				char		typalign;
+				int			nitems;
+				bool		has_nulls = false;
+				char	   *s;
+				bits8	   *bitmap;
+				int			bitmask;
+
+				arr = DatumGetArrayTypeP(cur_key->sk_argument);
+				nitems = ArrayGetNItems(ARR_NDIM(arr), ARR_DIMS(arr));
+
+				get_typlenbyvalalign(ARR_ELEMTYPE(arr),
+									 &typlen,
+									 &typbyval,
+									 &typalign);
+
+				hashtab = (SearchArrayHashTable *)
+					palloc0(sizeof(SearchArrayHashTable));
+
+				hashtab->hash_finfo = hashinfo->hash_finfo;
+				hashtab->match_finfo = hashinfo->match_finfo;
+				hashtab->hash_fcinfo = hashinfo->hash_fcinfo;
+				hashtab->match_fcinfo = hashinfo->match_fcinfo;
+
+				/*
+				 * Create the hash table sizing it according to the number of
+				 * elements in the array.  This does assume that the array has
+				 * no duplicates. If the array happens to contain many
+				 * duplicate values then it'll just mean that we sized the
+				 * table a bit on the large side.
+				 */
+				hashtab->tab = searcharray_create(CurrentMemoryContext,
+												  nitems,
+												  hashtab);
+
+
+				s = (char *) ARR_DATA_PTR(arr);
+				bitmap = ARR_NULLBITMAP(arr);
+				bitmask = 1;
+				for (int i = 0; i < nitems; i++)
+				{
+					/* Get array element, checking for NULL. */
+					if (bitmap && (*bitmap & bitmask) == 0)
+					{
+						has_nulls = true;
+					}
+					else
+					{
+						Datum		element;
+
+						element = fetch_att(s, typbyval, typlen);
+						s = att_addlength_pointer(s, typlen, s);
+						s = (char *) att_align_nominal(s, typalign);
+
+						searcharray_insert(hashtab->tab, element,
+										   &hashfound);
+					}
+
+					/* Advance bitmap pointer if any. */
+					if (bitmap)
+					{
+						bitmask <<= 1;
+						if (bitmask == 0x100)
+						{
+							bitmap++;
+							bitmask = 1;
+						}
+					}
+				}
+
+				/*
+				 * Remember if we had any nulls so that we know if we need to
+				 * execute non-strict functions with a null lhs value if no
+				 * match is found.
+				 */
+				hashtab->has_nulls = has_nulls;
+
+				/* Link the hash table to the current ScanKey */
+				hashinfo->hashtab = hashtab;
+			}
+			else
+				hashtab = (SearchArrayHashTable *) hashinfo->hashtab;
+
+			/* Check the hash to see if we have a match. */
+			hashfound = NULL != searcharray_lookup(hashtab->tab, atp);
+
+			/* IN case */
+			if (hashinfo->inclause && hashfound)
+				return true;
+			/* NOT IN case */
+			if (!hashinfo->inclause && !hashfound)
+				return true;
+
+			if (!hashfound && hashtab->has_nulls)
+			{
+				if (!hashtab->match_finfo.fn_strict)
+				{
+					Datum		result;
+
+					/*
+					 * Execute function will null rhs just once.
+					 */
+					hashtab->match_fcinfo->args[0].value = atp;
+					hashtab->match_fcinfo->args[0].isnull = isnull;
+					hashtab->match_fcinfo->args[1].value = (Datum) 0;
+					hashtab->match_fcinfo->args[1].isnull = true;
+
+					result = hashtab->match_finfo.fn_addr(hashtab->match_fcinfo);
+
+					/*
+					 * Reverse the result for NOT IN clauses since the above
+					 * function is the equality function and we need
+					 * not-equals.
+					 */
+					if (!hashinfo->inclause)
+						result = !result;
+
+					if (result)
+						return true;
+				}
+			}
+
+			return false;
+		}
+		else
+		{
+			if (isnull)
+				return false;
+
+			test = FunctionCall2Coll(&cur_key->sk_func,
+									 cur_key->sk_collation,
+									 atp, cur_key->sk_argument);
+
+			if (!DatumGetBool(test))
+				return false;
+		}
+	}
+
+	return true;
+}
diff --git a/src/backend/access/heap/meson.build b/src/backend/access/heap/meson.build
index 2637b24112f..2e23ca9a586 100644
--- a/src/backend/access/heap/meson.build
+++ b/src/backend/access/heap/meson.build
@@ -3,6 +3,7 @@
 backend_sources += files(
   'heapam.c',
   'heapam_handler.c',
+  'heapam_valid.c',
   'heapam_visibility.c',
   'heapam_xlog.c',
   'heaptoast.c',
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index f134ff591c3..1b181c8f254 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -96,16 +96,18 @@ ExecSeqBuildScanKeys(PlanState *planstate, List *quals, int *numScanKeys,
 		Expr	   *leftop;		/* expr on lhs of operator */
 		Expr	   *rightop;	/* expr on rhs ... */
 		AttrNumber	varattno;	/* att number used in scan */
+		int			flags = 0;
+		Datum		scanvalue;
+		Oid			collationid = InvalidOid;
+		ScanKeyHashInfo skeyhashinfo = NULL;
 
 		/*
 		 * Simple qual case: <leftop> <op> <rightop>
 		 */
 		if (IsA(clause, OpExpr))
 		{
-			int			flags = 0;
-			Datum		scanvalue;
-
 			opfuncid = ((OpExpr *) clause)->opfuncid;
+			collationid = ((OpExpr *) clause)->inputcollid;
 
 			/*
 			 * leftop and rightop are not relabeled and can be used as they
@@ -154,17 +156,149 @@ ExecSeqBuildScanKeys(PlanState *planstate, List *quals, int *numScanKeys,
 				n_runtime_keys++;
 				scanvalue = (Datum) 0;
 			}
+		}
+		/* <leftop> <op> ANY/ALL (array-expression) */
+		else if (IsA(clause, ScalarArrayOpExpr))
+		{
+			ScalarArrayOpExpr *saop = (ScalarArrayOpExpr *) clause;
+			Oid			cmpfuncid;
+			Oid			hashfuncid;
+			Oid			negfuncid;
+
+			opfuncid = saop->opfuncid;
+			collationid = saop->inputcollid;
+
+			leftop = (Expr *) linitial(saop->args);
+			rightop = (Expr *) lsecond(saop->args);
+
+			varattno = ((Var *) leftop)->varattno;
+
+			flags |= SK_SEARCHARRAY;
+
+			if (IsA(rightop, Const))
+			{
+				/*
+				 * OK, simple constant comparison value
+				 */
+				scanvalue = ((Const *) rightop)->constvalue;
+				if (((Const *) rightop)->constisnull)
+					flags |= SK_ISNULL;
+			}
+			else
+			{
+				/* Need to treat this one as a run-time key */
+				if (n_runtime_keys >= max_runtime_keys)
+				{
+					if (max_runtime_keys == 0)
+					{
+						max_runtime_keys = 8;
+						runtime_keys = (SeqScanRuntimeKeyInfo *)
+							palloc(max_runtime_keys * sizeof(SeqScanRuntimeKeyInfo));
+					}
+					else
+					{
+						max_runtime_keys *= 2;
+						runtime_keys = (SeqScanRuntimeKeyInfo *)
+							repalloc(runtime_keys,
+									 max_runtime_keys * sizeof(SeqScanRuntimeKeyInfo));
+					}
+				}
+				runtime_keys[n_runtime_keys].scan_key = this_scan_key;
+				runtime_keys[n_runtime_keys].key_expr =
+					ExecInitExpr(rightop, planstate);
+				runtime_keys[n_runtime_keys].key_toastable =
+					TypeIsToastable(((Var *) leftop)->vartype);
+				n_runtime_keys++;
+				scanvalue = (Datum) 0;
+			}
+
+			hashfuncid = saop->hashfuncid;
+			negfuncid = saop->negfuncid;
+
+			/*
+			 * If there is no hash function attached to the expr., then we
+			 * need to force one.
+			 *
+			 * One reason why there is no hash function attached is that the
+			 * scalar array is too small. In this case, the executor assumes
+			 * that for small array, using hash table/functions does not worth
+			 * it. But in our case, we want to handle all arrays in the same
+			 * way, whatever the array size.
+			 *
+			 * Another reason is that the right op. is not a constant and
+			 * needs runtime evaluation.
+			 */
+			if (!OidIsValid(hashfuncid))
+			{
+				Oid			lefthashfunc;
+				Oid			righthashfunc;
+
+				if (saop->useOr)
+				{
+					if (get_op_hash_functions(saop->opno, &lefthashfunc, &righthashfunc) &&
+						lefthashfunc == righthashfunc)
+						hashfuncid = lefthashfunc;
+				}
+				else
+				{
+					Oid			negator = get_negator(saop->opno);
+
+					if (OidIsValid(negator) &&
+						get_op_hash_functions(negator, &lefthashfunc, &righthashfunc) &&
+						lefthashfunc == righthashfunc)
+					{
+						hashfuncid = lefthashfunc;
+						negfuncid = get_opcode(negator);
+					}
+				}
+			}
 
-			n_scan_keys++;
+			/*
+			 * If no hash function can be found, it means that we cannot use a
+			 * hash table to handle array search because the operator does not
+			 * support hashing.
+			 *
+			 * TODO: use an alternative to hash table in this case. For now,
+			 * we just ignore this qual and don't push it, so we let the
+			 * executor handle it for us.
+			 */
+			if (!OidIsValid(hashfuncid))
+				continue;
 
-			ScanKeyEntryInitialize(this_scan_key,
-								   flags,
-								   varattno,
-								   InvalidStrategy, /* no strategy */
-								   InvalidOid,	/* no subtype */
-								   ((OpExpr *) clause)->inputcollid,
-								   opfuncid,
-								   scanvalue);
+			/*
+			 * If we have a negative function set, let's use it as the
+			 * comparison function because we are in a NOT IN case.
+			 */
+			if (OidIsValid(negfuncid))
+				cmpfuncid = negfuncid;
+			else
+				cmpfuncid = saop->opfuncid;
+
+			skeyhashinfo = (ScanKeyHashInfo) palloc0(sizeof(ScanKeyHashInfoData));
+
+			/* IN or NOT IN */
+			skeyhashinfo->inclause = saop->useOr;
+			skeyhashinfo->hash_fcinfo = palloc0(SizeForFunctionCallInfo(1));
+			skeyhashinfo->match_fcinfo = palloc0(SizeForFunctionCallInfo(2));
+
+			fmgr_info(hashfuncid, &skeyhashinfo->hash_finfo);
+			fmgr_info_set_expr((Node *) saop, &skeyhashinfo->hash_finfo);
+			fmgr_info(cmpfuncid, &skeyhashinfo->match_finfo);
+			fmgr_info_set_expr((Node *) saop, &skeyhashinfo->match_finfo);
+
+			InitFunctionCallInfoData(*skeyhashinfo->hash_fcinfo,
+									 &skeyhashinfo->hash_finfo,
+									 1,
+									 saop->inputcollid,
+									 NULL,
+									 NULL);
+
+			InitFunctionCallInfoData(*skeyhashinfo->match_fcinfo,
+									 &skeyhashinfo->match_finfo,
+									 2,
+									 saop->inputcollid,
+									 NULL,
+									 NULL);
 		}
 		else
 		{
@@ -173,6 +307,19 @@ ExecSeqBuildScanKeys(PlanState *planstate, List *quals, int *numScanKeys,
 			 */
 			continue;
 		}
+
+		n_scan_keys++;
+
+		ScanKeyEntryInitialize(this_scan_key,
+							   flags,
+							   varattno,
+							   InvalidStrategy, /* no strategy */
+							   InvalidOid,	/* no subtype */
+							   collationid,
+							   opfuncid,
+							   scanvalue);
+
+		ScanKeyEntrySetHashInfo(this_scan_key, skeyhashinfo);
 	}
 
 	/*
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index bb0856ac0bc..d301dc22661 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -5369,6 +5369,66 @@ fix_tablequal_references(PlannerInfo *root, Path *best_path,
 					fixed_tablequals = lappend(fixed_tablequals, clause);
 					break;
 				}
+
+				/*
+				 * ScalarArrayOpExpr case: <leftop> <op> ANY(ARRAY(..))
+				 */
+			case T_ScalarArrayOpExpr:
+				{
+					ScalarArrayOpExpr *saopexpr = (ScalarArrayOpExpr *) clause;
+					Expr	   *leftop;
+					Expr	   *rightop;
+
+					leftop = (Expr *) get_leftop(clause);
+					rightop = (Expr *) get_rightop(clause);
+
+					if (leftop && IsA(leftop, RelabelType))
+						leftop = ((RelabelType *) leftop)->arg;
+
+					if (rightop && IsA(rightop, RelabelType))
+						rightop = ((RelabelType *) rightop)->arg;
+
+					if (leftop == NULL || rightop == NULL)
+						continue;
+
+					if (saopexpr->opno >= FirstNormalObjectId)
+						continue;
+
+					if (!get_func_leakproof(saopexpr->opfuncid))
+						continue;
+
+					if (IsA(rightop, Var) && !IsA(leftop, Var)
+						&& ((Var *) rightop)->varattno > 0)
+					{
+						Expr	   *tmpop = leftop;
+						Oid			commutator;
+
+						leftop = rightop;
+						rightop = tmpop;
+
+						commutator = get_commutator(saopexpr->opno);
+
+						if (OidIsValid(commutator))
+						{
+							saopexpr->opno = commutator;
+							saopexpr->opfuncid = get_opcode(saopexpr->opno);
+						}
+						else
+							continue;
+					}
+
+					if (!(IsA(leftop, Var) && ((Var *) leftop)->varattno > 0))
+						continue;
+
+					if (!check_tablequal_rightop(rightop))
+						continue;
+
+					list_free(saopexpr->args);
+					saopexpr->args = list_make2(leftop, rightop);
+
+					fixed_tablequals = lappend(fixed_tablequals, clause);
+					break;
+				}
 			default:
 				continue;
 		}
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 252f5e661c1..091aa1ff11a 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -429,6 +429,8 @@ extern bool ResolveCminCmaxDuringDecoding(struct HTAB *tuplecid_data,
 extern void HeapCheckForSerializableConflictOut(bool visible, Relation relation, HeapTuple tuple,
 												Buffer buffer, Snapshot snapshot);
 
+extern bool HeapKeyTest(HeapTuple tuple, TupleDesc tupdesc, int nkeys, ScanKey keys);
+
 /*
  * heap_execute_freeze_tuple
  *		Execute the prepared freezing of a tuple with caller's freeze plan.
diff --git a/src/include/access/skey.h b/src/include/access/skey.h
index e650c2e7baf..0f5a556df4c 100644
--- a/src/include/access/skey.h
+++ b/src/include/access/skey.h
@@ -18,6 +18,38 @@
 #include "access/stratnum.h"
 #include "fmgr.h"
 
+/*
+ * A ScanKeyHashInfoData contains the necessary informations required to apply
+ * tuple filtering in table/heap scan when the condition is "column op
+ * ANY(ARRAY[...])".
+ *
+ * This structure is only used when pushing down quals to the Table Access
+ * Method layer in a table/heap scan context. In this case, the Table AM can
+ * use it to filter out tuples, based on a hash table.
+ *
+ * hashtab is a void pointer that will be used to store the actual reference
+ * to the hash table that will be created later during table scan.
+ *
+ * inclause indicates if the IN clause is involved. If not, then, the NOT IN
+ * clause is.
+ *
+ * hash_finfo and hash_fcinfo define the function and function call in charge
+ * of hashing a value.
+ *
+ * match_finfo and match_fcinfo define the function and function call in charge
+ * of making the comparison between two hashed values.
+ */
+typedef struct ScanKeyHashInfoData
+{
+	void	   *hashtab;
+	bool		inclause;
+	FmgrInfo	hash_finfo;
+	FmgrInfo	match_finfo;
+	FunctionCallInfo hash_fcinfo;
+	FunctionCallInfo match_fcinfo;
+}			ScanKeyHashInfoData;
+
+typedef ScanKeyHashInfoData * ScanKeyHashInfo;
 
 /*
  * A ScanKey represents the application of a comparison operator between
@@ -70,6 +102,7 @@ typedef struct ScanKeyData
 	Oid			sk_collation;	/* collation to use, if needed */
 	FmgrInfo	sk_func;		/* lookup info for function to call */
 	Datum		sk_argument;	/* data to compare */
+	ScanKeyHashInfo sk_hashinfo;	/* hash table informations */
 } ScanKeyData;
 
 typedef ScanKeyData *ScanKey;
@@ -147,5 +180,6 @@ extern void ScanKeyEntryInitializeWithInfo(ScanKey entry,
 										   Oid collation,
 										   FmgrInfo *finfo,
 										   Datum argument);
+extern void ScanKeyEntrySetHashInfo(ScanKey entry, ScanKeyHashInfo hashinfo);
 
 #endif							/* SKEY_H */
diff --git a/src/include/access/valid.h b/src/include/access/valid.h
deleted file mode 100644
index 8b33089dac4..00000000000
--- a/src/include/access/valid.h
+++ /dev/null
@@ -1,58 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * valid.h
- *	  POSTGRES tuple qualification validity definitions.
- *
- *
- * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
- * Portions Copyright (c) 1994, Regents of the University of California
- *
- * src/include/access/valid.h
- *
- *-------------------------------------------------------------------------
- */
-#ifndef VALID_H
-#define VALID_H
-
-#include "access/htup.h"
-#include "access/htup_details.h"
-#include "access/skey.h"
-#include "access/tupdesc.h"
-
-/*
- *		HeapKeyTest
- *
- *		Test a heap tuple to see if it satisfies a scan key.
- */
-static inline bool
-HeapKeyTest(HeapTuple tuple, TupleDesc tupdesc, int nkeys, ScanKey keys)
-{
-	int			cur_nkeys = nkeys;
-	ScanKey		cur_key = keys;
-
-	for (; cur_nkeys--; cur_key++)
-	{
-		Datum		atp;
-		bool		isnull;
-		Datum		test;
-
-		if (cur_key->sk_flags & SK_ISNULL)
-			return false;
-
-		atp = heap_getattr(tuple, cur_key->sk_attno, tupdesc, &isnull);
-
-		if (isnull)
-			return false;
-
-		test = FunctionCall2Coll(&cur_key->sk_func,
-								 cur_key->sk_collation,
-								 atp, cur_key->sk_argument);
-
-		if (!DatumGetBool(test))
-			return false;
-	}
-
-	return true;
-}
-
-#endif							/* VALID_H */
diff --git a/src/test/regress/expected/qual_pushdown.out b/src/test/regress/expected/qual_pushdown.out
index 5b43553c945..965102b146c 100644
--- a/src/test/regress/expected/qual_pushdown.out
+++ b/src/test/regress/expected/qual_pushdown.out
@@ -92,6 +92,43 @@ EXPLAIN (ANALYZE, COSTS off, TIMING off, SUMMARY off, BUFFERS off) SELECT ii FRO
    Rows Removed In Executor by Filter: 999
 (3 rows)
 
+EXPLAIN (ANALYZE, COSTS off, TIMING off, SUMMARY off, BUFFERS off) SELECT ii FROM qa WHERE i = ANY('{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}'::INT[]);
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ Seq Scan on qa (actual rows=10.00 loops=1)
+   Filter: (i = ANY ('{1,2,3,4,5,6,7,8,9,10}'::integer[]))
+   Rows Removed In Executor by Filter: 990
+(3 rows)
+
+EXPLAIN (ANALYZE, COSTS off, TIMING off, SUMMARY off, BUFFERS off) SELECT ii FROM qa WHERE i = ANY('{1, 2}'::INT[]);
+                QUERY PLAN                 
+-------------------------------------------
+ Seq Scan on qa (actual rows=2.00 loops=1)
+   Filter: (i = ANY ('{1,2}'::integer[]))
+   Rows Removed In Executor by Filter: 998
+(3 rows)
+
+EXPLAIN (ANALYZE, COSTS off, TIMING off, SUMMARY off, BUFFERS off) SELECT ii FROM qa WHERE NOT (i <> ALL('{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}'::INT[]));
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ Seq Scan on qa (actual rows=10.00 loops=1)
+   Filter: (i = ANY ('{1,2,3,4,5,6,7,8,9,10}'::integer[]))
+   Rows Removed In Executor by Filter: 990
+(3 rows)
+
+EXPLAIN (ANALYZE, COSTS off, TIMING off, SUMMARY off, BUFFERS off) SELECT ii FROM qa WHERE i = ANY((SELECT array_agg(j) FROM qb WHERE j > 50 AND j <= 60)::int[]);
+                        QUERY PLAN                        
+----------------------------------------------------------
+ Seq Scan on qa (actual rows=10.00 loops=1)
+   Filter: (i = ANY ((InitPlan 1).col1))
+   Rows Removed In Executor by Filter: 990
+   InitPlan 1
+     ->  Aggregate (actual rows=1.00 loops=1)
+           ->  Seq Scan on qb (actual rows=10.00 loops=1)
+                 Filter: ((j > 50) AND (j <= 60))
+                 Rows Removed In Executor by Filter: 990
+(8 rows)
+
 -- Enable quals push down
 ALTER TABLE qa SET (quals_push_down=on);
 ALTER TABLE qb SET (quals_push_down=on);
@@ -249,5 +286,94 @@ SELECT ii FROM qa WHERE i = ii AND ii < 10;
   1
 (1 row)
 
+EXPLAIN (ANALYZE, COSTS off, TIMING off, SUMMARY off, BUFFERS off) SELECT ii FROM qa WHERE i = ANY('{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}'::INT[]);
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ Seq Scan on qa (actual rows=10.00 loops=1)
+   Filter: (i = ANY ('{1,2,3,4,5,6,7,8,9,10}'::integer[]))
+   Rows Removed In Table AM by Filter: 990
+(3 rows)
+
+SELECT ii FROM qa WHERE i = ANY('{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}'::INT[]);
+ ii  
+-----
+   1
+   4
+   9
+  16
+  25
+  36
+  49
+  64
+  81
+ 100
+(10 rows)
+
+EXPLAIN (ANALYZE, COSTS off, TIMING off, SUMMARY off, BUFFERS off) SELECT ii FROM qa WHERE i = ANY('{1, 2}'::INT[]);
+                QUERY PLAN                 
+-------------------------------------------
+ Seq Scan on qa (actual rows=2.00 loops=1)
+   Filter: (i = ANY ('{1,2}'::integer[]))
+   Rows Removed In Table AM by Filter: 998
+(3 rows)
+
+SELECT ii FROM qa WHERE i = ANY('{1, 2}'::INT[]);
+ ii 
+----
+  1
+  4
+(2 rows)
+
+EXPLAIN (ANALYZE, COSTS off, TIMING off, SUMMARY off, BUFFERS off) SELECT ii FROM qa WHERE NOT (i <> ALL('{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}'::INT[]));
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ Seq Scan on qa (actual rows=10.00 loops=1)
+   Filter: (i = ANY ('{1,2,3,4,5,6,7,8,9,10}'::integer[]))
+   Rows Removed In Table AM by Filter: 990
+(3 rows)
+
+SELECT ii FROM qa WHERE NOT (i <> ALL('{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}'::INT[]));
+ ii  
+-----
+   1
+   4
+   9
+  16
+  25
+  36
+  49
+  64
+  81
+ 100
+(10 rows)
+
+EXPLAIN (ANALYZE, COSTS off, TIMING off, SUMMARY off, BUFFERS off) SELECT ii FROM qa WHERE i = ANY((SELECT array_agg(j) FROM qb WHERE j > 50 AND j <= 60)::INT[]);
+                        QUERY PLAN                        
+----------------------------------------------------------
+ Seq Scan on qa (actual rows=10.00 loops=1)
+   Filter: (i = ANY ((InitPlan 1).col1))
+   Rows Removed In Table AM by Filter: 990
+   InitPlan 1
+     ->  Aggregate (actual rows=1.00 loops=1)
+           ->  Seq Scan on qb (actual rows=10.00 loops=1)
+                 Filter: ((j > 50) AND (j <= 60))
+                 Rows Removed In Table AM by Filter: 990
+(8 rows)
+
+SELECT ii FROM qa WHERE i = ANY((SELECT array_agg(j) FROM qb WHERE j > 50 AND j <= 60)::INT[]);
+  ii  
+------
+ 2601
+ 2704
+ 2809
+ 2916
+ 3025
+ 3136
+ 3249
+ 3364
+ 3481
+ 3600
+(10 rows)
+
 DROP TABLE IF EXISTS qa;
 DROP TABLE IF EXISTS qb;
diff --git a/src/test/regress/sql/qual_pushdown.sql b/src/test/regress/sql/qual_pushdown.sql
index 0f0410cd1d5..38e88a50c33 100644
--- a/src/test/regress/sql/qual_pushdown.sql
+++ b/src/test/regress/sql/qual_pushdown.sql
@@ -19,6 +19,10 @@ EXPLAIN (ANALYZE, COSTS off, TIMING off, SUMMARY off, BUFFERS off) SELECT ii FRO
 EXPLAIN (ANALYZE, COSTS off, TIMING off, SUMMARY off, BUFFERS off) SELECT ii FROM qa WHERE i = (SELECT SQRT(j)::INT FROM qb WHERE j = 100);
 EXPLAIN (ANALYZE, COSTS off, TIMING off, SUMMARY off, BUFFERS off) SELECT ii FROM qa JOIN qb ON (qa.i = qb.j) WHERE j = 100;
 EXPLAIN (ANALYZE, COSTS off, TIMING off, SUMMARY off, BUFFERS off) SELECT ii FROM qa WHERE i = ii AND ii < 10;
+EXPLAIN (ANALYZE, COSTS off, TIMING off, SUMMARY off, BUFFERS off) SELECT ii FROM qa WHERE i = ANY('{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}'::INT[]);
+EXPLAIN (ANALYZE, COSTS off, TIMING off, SUMMARY off, BUFFERS off) SELECT ii FROM qa WHERE i = ANY('{1, 2}'::INT[]);
+EXPLAIN (ANALYZE, COSTS off, TIMING off, SUMMARY off, BUFFERS off) SELECT ii FROM qa WHERE NOT (i <> ALL('{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}'::INT[]));
+EXPLAIN (ANALYZE, COSTS off, TIMING off, SUMMARY off, BUFFERS off) SELECT ii FROM qa WHERE i = ANY((SELECT array_agg(j) FROM qb WHERE j > 50 AND j <= 60)::int[]);
 
 -- Enable quals push down
 ALTER TABLE qa SET (quals_push_down=on);
@@ -43,6 +47,14 @@ EXPLAIN (ANALYZE, COSTS off, TIMING off, SUMMARY off, BUFFERS off) SELECT ii FRO
 SELECT ii FROM qa JOIN qb ON (qa.i = qb.j) WHERE j = 100;
 EXPLAIN (ANALYZE, COSTS off, TIMING off, SUMMARY off, BUFFERS off) SELECT ii FROM qa WHERE i = ii AND ii < 10;
 SELECT ii FROM qa WHERE i = ii AND ii < 10;
+EXPLAIN (ANALYZE, COSTS off, TIMING off, SUMMARY off, BUFFERS off) SELECT ii FROM qa WHERE i = ANY('{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}'::INT[]);
+SELECT ii FROM qa WHERE i = ANY('{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}'::INT[]);
+EXPLAIN (ANALYZE, COSTS off, TIMING off, SUMMARY off, BUFFERS off) SELECT ii FROM qa WHERE i = ANY('{1, 2}'::INT[]);
+SELECT ii FROM qa WHERE i = ANY('{1, 2}'::INT[]);
+EXPLAIN (ANALYZE, COSTS off, TIMING off, SUMMARY off, BUFFERS off) SELECT ii FROM qa WHERE NOT (i <> ALL('{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}'::INT[]));
+SELECT ii FROM qa WHERE NOT (i <> ALL('{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}'::INT[]));
+EXPLAIN (ANALYZE, COSTS off, TIMING off, SUMMARY off, BUFFERS off) SELECT ii FROM qa WHERE i = ANY((SELECT array_agg(j) FROM qb WHERE j > 50 AND j <= 60)::INT[]);
+SELECT ii FROM qa WHERE i = ANY((SELECT array_agg(j) FROM qb WHERE j > 50 AND j <= 60)::INT[]);
 
 DROP TABLE IF EXISTS qa;
 DROP TABLE IF EXISTS qb;
-- 
2.39.5

