From a040da89f37965891587f029c107c355168e43ec Mon Sep 17 00:00:00 2001
From: Tomas Vondra <tomas.vondra@postgresql.org>
Date: Sat, 12 Sep 2020 15:07:04 +0200
Subject: [PATCH 1/6] Pass all scan keys to BRIN consistent function at once

Passing all scan keys to the BRIN consistent function at once may allow
elimination of additional ranges, which would be impossible when only
passing individual scan keys.

The code continues to support both the original (one scan key at a time)
and new (all scan keys at once) approaches, depending on whether the
consistent function accepts three or four arguments.

This modifies the existing BRIN opclasses (minmax, inclusion) although
those don't really benefit from this change. The primary purpose of this
is to allow more advanced opclases in the future.

Author: Tomas Vondra <tomas.vondra@2ndquadrant.com>
Reviewed-by: Alvaro Herrera <alvherre@2ndquadrant.com>
Reviewed-by: Mark Dilger <hornschnorter@gmail.com>
Reviewed-by: Alexander Korotkov <aekorotkov@gmail.com>
Discussion: https://postgr.es/m/c1138ead-7668-f0e1-0638-c3be3237e812@2ndquadrant.com
---
 src/backend/access/brin/brin.c           | 158 ++++++++++++++++++-----
 src/backend/access/brin/brin_inclusion.c | 140 ++++++++++++++------
 src/backend/access/brin/brin_minmax.c    |  92 ++++++++++---
 src/backend/access/brin/brin_validate.c  |   4 +-
 src/include/catalog/pg_proc.dat          |   4 +-
 5 files changed, 303 insertions(+), 95 deletions(-)

diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 27ba596c6e..963b7079cf 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -390,6 +390,9 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
 	BrinMemTuple *dtup;
 	BrinTuple  *btup = NULL;
 	Size		btupsz = 0;
+	ScanKey   **keys;
+	int		   *nkeys;
+	int			keyno;
 
 	opaque = (BrinOpaque *) scan->opaque;
 	bdesc = opaque->bo_bdesc;
@@ -411,6 +414,66 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
 	 */
 	consistentFn = palloc0(sizeof(FmgrInfo) * bdesc->bd_tupdesc->natts);
 
+	/*
+	 * Make room for per-attribute lists of scan keys that we'll pass to the
+	 * consistent support procedure. We allocate space for all attributes, so
+	 * that we don't have to bother determining which attributes are used.
+	 *
+	 * XXX The widest table can have ~1600 attributes, so this may allocate a
+	 * couple kilobytes of memory). We could invent a more compact approach
+	 * (with just space for used attributes) but that would make the matching
+	 * more complicated, so it may not be a win.
+	 */
+	keys = palloc0(sizeof(ScanKey *) * bdesc->bd_tupdesc->natts);
+	nkeys = palloc0(sizeof(int) * bdesc->bd_tupdesc->natts);
+
+	/*
+	 * Preprocess the scan keys - split them into per-attribute arrays.
+	 */
+	for (keyno = 0; keyno < scan->numberOfKeys; keyno++)
+	{
+		ScanKey		key = &scan->keyData[keyno];
+		AttrNumber	keyattno = key->sk_attno;
+
+		/*
+		 * The collation of the scan key must match the collation used in the
+		 * index column (but only if the search is not IS NULL/ IS NOT NULL).
+		 * Otherwise we shouldn't be using this index ...
+		 */
+		Assert((key->sk_flags & SK_ISNULL) ||
+			   (key->sk_collation ==
+				TupleDescAttr(bdesc->bd_tupdesc,
+							  keyattno - 1)->attcollation));
+
+		/* First time we see this index attribute, so init as needed. */
+		if (!keys[keyattno - 1])
+		{
+			FmgrInfo   *tmp;
+
+			/*
+			 * This is a bit of an overkill - we don't know how many scan keys
+			 * are there for this attribute, so we simply allocate the largest
+			 * number possible. This may waste a bit of memory, but we only
+			 * expect small number of scan keys in general, so this should be
+			 * negligible, and it's cheaper than having to repalloc
+			 * repeatedly.
+			 */
+			keys[keyattno - 1] = palloc0(sizeof(ScanKey) * scan->numberOfKeys);
+
+			/* First time this column, so look up consistent function */
+			Assert(consistentFn[keyattno - 1].fn_oid == InvalidOid);
+
+			tmp = index_getprocinfo(idxRel, keyattno,
+									BRIN_PROCNUM_CONSISTENT);
+			fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
+						   CurrentMemoryContext);
+		}
+
+		/* Add key to the per-attribute array. */
+		keys[keyattno - 1][nkeys[keyattno - 1]] = key;
+		nkeys[keyattno - 1]++;
+	}
+
 	/* allocate an initial in-memory tuple, out of the per-range memcxt */
 	dtup = brin_new_memtuple(bdesc);
 
@@ -471,7 +534,7 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
 			}
 			else
 			{
-				int			keyno;
+				int			attno;
 
 				/*
 				 * Compare scan keys with summary values stored for the range.
@@ -481,51 +544,78 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
 				 * no keys.
 				 */
 				addrange = true;
-				for (keyno = 0; keyno < scan->numberOfKeys; keyno++)
+				for (attno = 1; attno <= bdesc->bd_tupdesc->natts; attno++)
 				{
-					ScanKey		key = &scan->keyData[keyno];
-					AttrNumber	keyattno = key->sk_attno;
-					BrinValues *bval = &dtup->bt_columns[keyattno - 1];
+					BrinValues *bval;
 					Datum		add;
 
-					/*
-					 * The collation of the scan key must match the collation
-					 * used in the index column (but only if the search is not
-					 * IS NULL/ IS NOT NULL).  Otherwise we shouldn't be using
-					 * this index ...
-					 */
-					Assert((key->sk_flags & SK_ISNULL) ||
-						   (key->sk_collation ==
-							TupleDescAttr(bdesc->bd_tupdesc,
-										  keyattno - 1)->attcollation));
+					/* skip attributes without any san keys */
+					if (nkeys[attno - 1] == 0)
+						continue;
 
-					/* First time this column? look up consistent function */
-					if (consistentFn[keyattno - 1].fn_oid == InvalidOid)
-					{
-						FmgrInfo   *tmp;
+					bval = &dtup->bt_columns[attno - 1];
 
-						tmp = index_getprocinfo(idxRel, keyattno,
-												BRIN_PROCNUM_CONSISTENT);
-						fmgr_info_copy(&consistentFn[keyattno - 1], tmp,
-									   CurrentMemoryContext);
-					}
+					Assert((nkeys[attno - 1] > 0) &&
+						   (nkeys[attno - 1] <= scan->numberOfKeys));
 
 					/*
 					 * Check whether the scan key is consistent with the page
 					 * range values; if so, have the pages in the range added
 					 * to the output bitmap.
 					 *
-					 * When there are multiple scan keys, failure to meet the
-					 * criteria for a single one of them is enough to discard
-					 * the range as a whole, so break out of the loop as soon
-					 * as a false return value is obtained.
+					 * The opclass may or may not support processing of
+					 * multiple scan keys. We can determine that based on the
+					 * number of arguments - functions with extra parameter
+					 * (number of scan keys) do support this, otherwise we
+					 * have to simply pass the scan keys one by one, as
+					 * before.
 					 */
-					add = FunctionCall3Coll(&consistentFn[keyattno - 1],
-											key->sk_collation,
-											PointerGetDatum(bdesc),
-											PointerGetDatum(bval),
-											PointerGetDatum(key));
-					addrange = DatumGetBool(add);
+					if (consistentFn[attno - 1].fn_nargs >= 4)
+					{
+						Oid			collation;
+
+						/*
+						 * Collation from the first key (has to be the same
+						 * for all keys for the same attribue).
+						 */
+						collation = keys[attno - 1][0]->sk_collation;
+
+						/* Check all keys at once */
+						add = FunctionCall4Coll(&consistentFn[attno - 1],
+												collation,
+												PointerGetDatum(bdesc),
+												PointerGetDatum(bval),
+												PointerGetDatum(keys[attno - 1]),
+												Int32GetDatum(nkeys[attno - 1]));
+						addrange = DatumGetBool(add);
+					}
+					else
+					{
+						/*
+						 * Check keys one by one
+						 *
+						 * When there are multiple scan keys, failure to meet
+						 * the criteria for a single one of them is enough to
+						 * discard the range as a whole, so break out of the
+						 * loop as soon as a false return value is obtained.
+						 */
+						int			keyno;
+
+						for (keyno = 0; keyno < nkeys[attno - 1]; keyno++)
+						{
+							add = FunctionCall3Coll(&consistentFn[attno - 1],
+													keys[attno - 1][keyno]->sk_collation,
+													PointerGetDatum(bdesc),
+													PointerGetDatum(bval),
+													PointerGetDatum(keys[attno - 1][keyno]));
+							addrange = DatumGetBool(add);
+
+							/* mismatching key, no need to look further  */
+							if (!addrange)
+								break;
+						}
+					}
+
 					if (!addrange)
 						break;
 				}
diff --git a/src/backend/access/brin/brin_inclusion.c b/src/backend/access/brin/brin_inclusion.c
index 12e5bddd1f..a260074c91 100644
--- a/src/backend/access/brin/brin_inclusion.c
+++ b/src/backend/access/brin/brin_inclusion.c
@@ -85,6 +85,8 @@ static FmgrInfo *inclusion_get_procinfo(BrinDesc *bdesc, uint16 attno,
 										uint16 procnum);
 static FmgrInfo *inclusion_get_strategy_procinfo(BrinDesc *bdesc, uint16 attno,
 												 Oid subtype, uint16 strategynum);
+static bool inclusion_consistent_key(BrinDesc *bdesc, BrinValues *column,
+									 ScanKey key, Oid colloid);
 
 
 /*
@@ -251,6 +253,10 @@ brin_inclusion_add_value(PG_FUNCTION_ARGS)
 /*
  * BRIN inclusion consistent function
  *
+ * We inspect the IS NULL scan keys first, which allows us to make a decision
+ * without looking at the contents of the page range. Only when the page range
+ * matches all those keys, we check the regular scan keys.
+ *
  * All of the strategies are optional.
  */
 Datum
@@ -258,24 +264,31 @@ brin_inclusion_consistent(PG_FUNCTION_ARGS)
 {
 	BrinDesc   *bdesc = (BrinDesc *) PG_GETARG_POINTER(0);
 	BrinValues *column = (BrinValues *) PG_GETARG_POINTER(1);
-	ScanKey		key = (ScanKey) PG_GETARG_POINTER(2);
-	Oid			colloid = PG_GET_COLLATION(),
-				subtype;
-	Datum		unionval;
-	AttrNumber	attno;
-	Datum		query;
-	FmgrInfo   *finfo;
-	Datum		result;
-
-	Assert(key->sk_attno == column->bv_attno);
+	ScanKey    *keys = (ScanKey *) PG_GETARG_POINTER(2);
+	int			nkeys = PG_GETARG_INT32(3);
+	Oid			colloid = PG_GET_COLLATION();
+	int			keyno;
+	bool		has_regular_keys = false;
 
-	/* Handle IS NULL/IS NOT NULL tests. */
-	if (key->sk_flags & SK_ISNULL)
+	/* Handle IS NULL/IS NOT NULL tests */
+	for (keyno = 0; keyno < nkeys; keyno++)
 	{
+		ScanKey		key = keys[keyno];
+
+		Assert(key->sk_attno == column->bv_attno);
+
+		/* Skip regular scan keys (and remember that we have some). */
+		if ((!key->sk_flags & SK_ISNULL))
+		{
+			has_regular_keys = true;
+			continue;
+		}
+
 		if (key->sk_flags & SK_SEARCHNULL)
 		{
 			if (column->bv_allnulls || column->bv_hasnulls)
-				PG_RETURN_BOOL(true);
+				continue;		/* this key is fine, continue */
+
 			PG_RETURN_BOOL(false);
 		}
 
@@ -284,7 +297,12 @@ brin_inclusion_consistent(PG_FUNCTION_ARGS)
 		 * only nulls.
 		 */
 		if (key->sk_flags & SK_SEARCHNOTNULL)
-			PG_RETURN_BOOL(!column->bv_allnulls);
+		{
+			if (column->bv_allnulls)
+				PG_RETURN_BOOL(false);
+
+			continue;
+		}
 
 		/*
 		 * Neither IS NULL nor IS NOT NULL was used; assume all indexable
@@ -293,7 +311,14 @@ brin_inclusion_consistent(PG_FUNCTION_ARGS)
 		PG_RETURN_BOOL(false);
 	}
 
-	/* If it is all nulls, it cannot possibly be consistent. */
+	/* If there are no regular keys, the page range is considered consistent. */
+	if (!has_regular_keys)
+		PG_RETURN_BOOL(true);
+
+	/*
+	 * If is all nulls, it cannot possibly be consistent (at this point we
+	 * know there are at least some regular scan keys).
+	 */
 	if (column->bv_allnulls)
 		PG_RETURN_BOOL(false);
 
@@ -301,10 +326,45 @@ brin_inclusion_consistent(PG_FUNCTION_ARGS)
 	if (DatumGetBool(column->bv_values[INCLUSION_UNMERGEABLE]))
 		PG_RETURN_BOOL(true);
 
-	attno = key->sk_attno;
-	subtype = key->sk_subtype;
-	query = key->sk_argument;
-	unionval = column->bv_values[INCLUSION_UNION];
+	/* Check that the range is consistent with all regular scan keys. */
+	for (keyno = 0; keyno < nkeys; keyno++)
+	{
+		ScanKey		key = keys[keyno];
+
+		/* Skip IS NULL/IS NOT NULL keys (already handled above). */
+		if (key->sk_flags & SK_ISNULL)
+			continue;
+
+		/*
+		 * When there are multiple scan keys, failure to meet the criteria for
+		 * a single one of them is enough to discard the range as a whole, so
+		 * break out of the loop as soon as a false return value is obtained.
+		 */
+		if (!inclusion_consistent_key(bdesc, column, key, colloid))
+			PG_RETURN_BOOL(false);
+	}
+
+	PG_RETURN_BOOL(true);
+}
+
+/*
+ * inclusion_consistent_key
+ *		Determine if the range is consistent with a single scan key.
+ */
+static bool
+inclusion_consistent_key(BrinDesc *bdesc, BrinValues *column, ScanKey key,
+						 Oid colloid)
+{
+	FmgrInfo   *finfo;
+	AttrNumber	attno = key->sk_attno;
+	Oid			subtype = key->sk_subtype;
+	Datum		query = key->sk_argument;
+	Datum		unionval = column->bv_values[INCLUSION_UNION];
+	Datum		result;
+
+	/* This should be called only for regular keys, not for IS [NOT] NULL. */
+	Assert(!(key->sk_flags & SK_ISNULL));
+
 	switch (key->sk_strategy)
 	{
 			/*
@@ -324,49 +384,49 @@ brin_inclusion_consistent(PG_FUNCTION_ARGS)
 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
 													RTOverRightStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
-			PG_RETURN_BOOL(!DatumGetBool(result));
+			return !DatumGetBool(result);
 
 		case RTOverLeftStrategyNumber:
 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
 													RTRightStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
-			PG_RETURN_BOOL(!DatumGetBool(result));
+			return !DatumGetBool(result);
 
 		case RTOverRightStrategyNumber:
 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
 													RTLeftStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
-			PG_RETURN_BOOL(!DatumGetBool(result));
+			return !DatumGetBool(result);
 
 		case RTRightStrategyNumber:
 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
 													RTOverLeftStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
-			PG_RETURN_BOOL(!DatumGetBool(result));
+			return !DatumGetBool(result);
 
 		case RTBelowStrategyNumber:
 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
 													RTOverAboveStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
-			PG_RETURN_BOOL(!DatumGetBool(result));
+			return !DatumGetBool(result);
 
 		case RTOverBelowStrategyNumber:
 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
 													RTAboveStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
-			PG_RETURN_BOOL(!DatumGetBool(result));
+			return !DatumGetBool(result);
 
 		case RTOverAboveStrategyNumber:
 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
 													RTBelowStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
-			PG_RETURN_BOOL(!DatumGetBool(result));
+			return !DatumGetBool(result);
 
 		case RTAboveStrategyNumber:
 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
 													RTOverBelowStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
-			PG_RETURN_BOOL(!DatumGetBool(result));
+			return !DatumGetBool(result);
 
 			/*
 			 * Overlap and contains strategies
@@ -384,7 +444,7 @@ brin_inclusion_consistent(PG_FUNCTION_ARGS)
 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
 													key->sk_strategy);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
-			PG_RETURN_DATUM(result);
+			return DatumGetBool(result);
 
 			/*
 			 * Contained by strategies
@@ -404,9 +464,9 @@ brin_inclusion_consistent(PG_FUNCTION_ARGS)
 													RTOverlapStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
 			if (DatumGetBool(result))
-				PG_RETURN_BOOL(true);
+				return true;
 
-			PG_RETURN_DATUM(column->bv_values[INCLUSION_CONTAINS_EMPTY]);
+			return DatumGetBool(column->bv_values[INCLUSION_CONTAINS_EMPTY]);
 
 			/*
 			 * Adjacent strategy
@@ -423,12 +483,12 @@ brin_inclusion_consistent(PG_FUNCTION_ARGS)
 													RTOverlapStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
 			if (DatumGetBool(result))
-				PG_RETURN_BOOL(true);
+				return true;
 
 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
 													RTAdjacentStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
-			PG_RETURN_DATUM(result);
+			return DatumGetBool(result);
 
 			/*
 			 * Basic comparison strategies
@@ -458,9 +518,9 @@ brin_inclusion_consistent(PG_FUNCTION_ARGS)
 													RTRightStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
 			if (!DatumGetBool(result))
-				PG_RETURN_BOOL(true);
+				return true;
 
-			PG_RETURN_DATUM(column->bv_values[INCLUSION_CONTAINS_EMPTY]);
+			return DatumGetBool(column->bv_values[INCLUSION_CONTAINS_EMPTY]);
 
 		case RTSameStrategyNumber:
 		case RTEqualStrategyNumber:
@@ -468,30 +528,30 @@ brin_inclusion_consistent(PG_FUNCTION_ARGS)
 													RTContainsStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
 			if (DatumGetBool(result))
-				PG_RETURN_BOOL(true);
+				return true;
 
-			PG_RETURN_DATUM(column->bv_values[INCLUSION_CONTAINS_EMPTY]);
+			return DatumGetBool(column->bv_values[INCLUSION_CONTAINS_EMPTY]);
 
 		case RTGreaterEqualStrategyNumber:
 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
 													RTLeftStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
 			if (!DatumGetBool(result))
-				PG_RETURN_BOOL(true);
+				return true;
 
-			PG_RETURN_DATUM(column->bv_values[INCLUSION_CONTAINS_EMPTY]);
+			return DatumGetBool(column->bv_values[INCLUSION_CONTAINS_EMPTY]);
 
 		case RTGreaterStrategyNumber:
 			/* no need to check for empty elements */
 			finfo = inclusion_get_strategy_procinfo(bdesc, attno, subtype,
 													RTLeftStrategyNumber);
 			result = FunctionCall2Coll(finfo, colloid, unionval, query);
-			PG_RETURN_BOOL(!DatumGetBool(result));
+			return !DatumGetBool(result);
 
 		default:
 			/* shouldn't happen */
 			elog(ERROR, "invalid strategy number %d", key->sk_strategy);
-			PG_RETURN_BOOL(false);
+			return false;
 	}
 }
 
diff --git a/src/backend/access/brin/brin_minmax.c b/src/backend/access/brin/brin_minmax.c
index 2ffbd9bf0d..e116084a02 100644
--- a/src/backend/access/brin/brin_minmax.c
+++ b/src/backend/access/brin/brin_minmax.c
@@ -30,6 +30,8 @@ typedef struct MinmaxOpaque
 
 static FmgrInfo *minmax_get_strategy_procinfo(BrinDesc *bdesc, uint16 attno,
 											  Oid subtype, uint16 strategynum);
+static bool minmax_consistent_key(BrinDesc *bdesc, BrinValues *column,
+								  ScanKey key, Oid colloid);
 
 
 Datum
@@ -140,29 +142,41 @@ brin_minmax_add_value(PG_FUNCTION_ARGS)
  * Given an index tuple corresponding to a certain page range and a scan key,
  * return whether the scan key is consistent with the index tuple's min/max
  * values.  Return true if so, false otherwise.
+ *
+ * We inspect the IS NULL scan keys first, which allows us to make a decision
+ * without looking at the contents of the page range. Only when the page range
+ * matches all those keys, we check the regular scan keys.
  */
 Datum
 brin_minmax_consistent(PG_FUNCTION_ARGS)
 {
 	BrinDesc   *bdesc = (BrinDesc *) PG_GETARG_POINTER(0);
 	BrinValues *column = (BrinValues *) PG_GETARG_POINTER(1);
-	ScanKey		key = (ScanKey) PG_GETARG_POINTER(2);
-	Oid			colloid = PG_GET_COLLATION(),
-				subtype;
-	AttrNumber	attno;
-	Datum		value;
-	Datum		matches;
-	FmgrInfo   *finfo;
-
-	Assert(key->sk_attno == column->bv_attno);
+	ScanKey    *keys = (ScanKey *) PG_GETARG_POINTER(2);
+	int			nkeys = PG_GETARG_INT32(3);
+	Oid			colloid = PG_GET_COLLATION();
+	int			keyno;
+	bool		has_regular_keys = false;
 
 	/* handle IS NULL/IS NOT NULL tests */
-	if (key->sk_flags & SK_ISNULL)
+	for (keyno = 0; keyno < nkeys; keyno++)
 	{
+		ScanKey		key = keys[keyno];
+
+		Assert(key->sk_attno == column->bv_attno);
+
+		/* Skip regular scan keys (and remember that we have some). */
+		if ((!key->sk_flags & SK_ISNULL))
+		{
+			has_regular_keys = true;
+			continue;
+		}
+
 		if (key->sk_flags & SK_SEARCHNULL)
 		{
 			if (column->bv_allnulls || column->bv_hasnulls)
-				PG_RETURN_BOOL(true);
+				continue;		/* this key is fine, continue */
+
 			PG_RETURN_BOOL(false);
 		}
 
@@ -171,7 +185,12 @@ brin_minmax_consistent(PG_FUNCTION_ARGS)
 		 * only nulls.
 		 */
 		if (key->sk_flags & SK_SEARCHNOTNULL)
-			PG_RETURN_BOOL(!column->bv_allnulls);
+		{
+			if (column->bv_allnulls)
+				PG_RETURN_BOOL(false);
+
+			continue;
+		}
 
 		/*
 		 * Neither IS NULL nor IS NOT NULL was used; assume all indexable
@@ -180,13 +199,52 @@ brin_minmax_consistent(PG_FUNCTION_ARGS)
 		PG_RETURN_BOOL(false);
 	}
 
-	/* if the range is all empty, it cannot possibly be consistent */
+	/* If there are no regular keys, the page range is considered consistent. */
+	if (!has_regular_keys)
+		PG_RETURN_BOOL(true);
+
+	/*
+	 * If is all nulls, it cannot possibly be consistent (at this point we
+	 * know there are at least some regular scan keys).
+	 */
 	if (column->bv_allnulls)
 		PG_RETURN_BOOL(false);
 
-	attno = key->sk_attno;
-	subtype = key->sk_subtype;
-	value = key->sk_argument;
+	/* Check that the range is consistent with all scan keys. */
+	for (keyno = 0; keyno < nkeys; keyno++)
+	{
+		ScanKey		key = keys[keyno];
+
+		/* ignore IS NULL/IS NOT NULL tests handled above */
+		if (key->sk_flags & SK_ISNULL)
+			continue;
+
+		/*
+		 * When there are multiple scan keys, failure to meet the criteria for
+		 * a single one of them is enough to discard the range as a whole, so
+		 * break out of the loop as soon as a false return value is obtained.
+		 */
+		if (!minmax_consistent_key(bdesc, column, key, colloid))
+			PG_RETURN_DATUM(false);;
+	}
+
+	PG_RETURN_DATUM(true);
+}
+
+/*
+ * minmax_consistent_key
+ *		Determine if the range is consistent with a single scan key.
+ */
+static bool
+minmax_consistent_key(BrinDesc *bdesc, BrinValues *column, ScanKey key,
+					  Oid colloid)
+{
+	FmgrInfo   *finfo;
+	AttrNumber	attno = key->sk_attno;
+	Oid			subtype = key->sk_subtype;
+	Datum		value = key->sk_argument;
+	Datum		matches;
+
 	switch (key->sk_strategy)
 	{
 		case BTLessStrategyNumber:
@@ -229,7 +287,7 @@ brin_minmax_consistent(PG_FUNCTION_ARGS)
 			break;
 	}
 
-	PG_RETURN_DATUM(matches);
+	return DatumGetBool(matches);
 }
 
 /*
diff --git a/src/backend/access/brin/brin_validate.c b/src/backend/access/brin/brin_validate.c
index 6d4253c05e..11835d85cd 100644
--- a/src/backend/access/brin/brin_validate.c
+++ b/src/backend/access/brin/brin_validate.c
@@ -97,8 +97,8 @@ brinvalidate(Oid opclassoid)
 				break;
 			case BRIN_PROCNUM_CONSISTENT:
 				ok = check_amproc_signature(procform->amproc, BOOLOID, true,
-											3, 3, INTERNALOID, INTERNALOID,
-											INTERNALOID);
+											3, 4, INTERNALOID, INTERNALOID,
+											INTERNALOID, INT4OID);
 				break;
 			case BRIN_PROCNUM_UNION:
 				ok = check_amproc_signature(procform->amproc, BOOLOID, true,
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 1487710d59..33841e14f2 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -8191,7 +8191,7 @@
   prosrc => 'brin_minmax_add_value' },
 { oid => '3385', descr => 'BRIN minmax support',
   proname => 'brin_minmax_consistent', prorettype => 'bool',
-  proargtypes => 'internal internal internal',
+  proargtypes => 'internal internal internal int4',
   prosrc => 'brin_minmax_consistent' },
 { oid => '3386', descr => 'BRIN minmax support',
   proname => 'brin_minmax_union', prorettype => 'bool',
@@ -8207,7 +8207,7 @@
   prosrc => 'brin_inclusion_add_value' },
 { oid => '4107', descr => 'BRIN inclusion support',
   proname => 'brin_inclusion_consistent', prorettype => 'bool',
-  proargtypes => 'internal internal internal',
+  proargtypes => 'internal internal internal int4',
   prosrc => 'brin_inclusion_consistent' },
 { oid => '4108', descr => 'BRIN inclusion support',
   proname => 'brin_inclusion_union', prorettype => 'bool',
-- 
2.26.2

