From 9ee3f931469fbe9f074098f59dac7431bdb760df Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C3=81lvaro=20Herrera?= <alvherre@alvh.no-ip.org>
Date: Wed, 12 Feb 2025 11:55:06 +0100
Subject: [PATCH v1] BRIN: be stricter about required support procs

---
 src/backend/access/brin/brin_bloom.c        | 20 ++++++++++++---
 src/backend/access/brin/brin_inclusion.c    | 27 +++++++++++++--------
 src/backend/access/brin/brin_minmax_multi.c | 23 ++++++++++++++----
 3 files changed, 51 insertions(+), 19 deletions(-)

diff --git a/src/backend/access/brin/brin_bloom.c b/src/backend/access/brin/brin_bloom.c
index 40e1f303a0a..011562853cf 100644
--- a/src/backend/access/brin/brin_bloom.c
+++ b/src/backend/access/brin/brin_bloom.c
@@ -442,7 +442,7 @@ typedef struct BloomOpaque
 } BloomOpaque;
 
 static FmgrInfo *bloom_get_procinfo(BrinDesc *bdesc, uint16 attno,
-									uint16 procnum);
+									uint16 procnum, bool missing_ok);
 
 
 Datum
@@ -574,7 +574,7 @@ brin_bloom_add_value(PG_FUNCTION_ARGS)
 	 * Compute the hash of the new value, using the supplied hash function,
 	 * and then add the hash value to the bloom filter.
 	 */
-	hashFn = bloom_get_procinfo(bdesc, attno, PROCNUM_HASH);
+	hashFn = bloom_get_procinfo(bdesc, attno, PROCNUM_HASH, false);
 
 	hashValue = DatumGetUInt32(FunctionCall1Coll(hashFn, colloid, newval));
 
@@ -634,7 +634,7 @@ brin_bloom_consistent(PG_FUNCTION_ARGS)
 				 * We want to return the current page range if the bloom
 				 * filter seems to contain the value.
 				 */
-				finfo = bloom_get_procinfo(bdesc, attno, PROCNUM_HASH);
+				finfo = bloom_get_procinfo(bdesc, attno, PROCNUM_HASH, false);
 
 				hashValue = DatumGetUInt32(FunctionCall1Coll(finfo, colloid, value));
 				matches &= bloom_contains_value(filter, hashValue);
@@ -703,7 +703,7 @@ brin_bloom_union(PG_FUNCTION_ARGS)
  * or null if it does not exist.
  */
 static FmgrInfo *
-bloom_get_procinfo(BrinDesc *bdesc, uint16 attno, uint16 procnum)
+bloom_get_procinfo(BrinDesc *bdesc, uint16 attno, uint16 procnum, bool missing_ok)
 {
 	BloomOpaque *opaque;
 	uint16		basenum = procnum - PROCNUM_BASE;
@@ -732,6 +732,18 @@ bloom_get_procinfo(BrinDesc *bdesc, uint16 attno, uint16 procnum)
 		}
 		else
 		{
+			/*
+			 * XXX like minmax_multi_get_procinfo, it seems there are no cases
+			 * of optional support procs in brin_bloom.c
+			 */
+			if (!missing_ok)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+						 errmsg_internal("invalid opclass definition"),
+						 errdetail_internal("opclass %u is missing support function %d",
+											InvalidOid,		/* FIXME */
+											procnum)));
+
 			opaque->extra_proc_missing[basenum] = true;
 			return NULL;
 		}
diff --git a/src/backend/access/brin/brin_inclusion.c b/src/backend/access/brin/brin_inclusion.c
index ef2247347f9..5a6565a709a 100644
--- a/src/backend/access/brin/brin_inclusion.c
+++ b/src/backend/access/brin/brin_inclusion.c
@@ -82,7 +82,7 @@ typedef struct InclusionOpaque
 } InclusionOpaque;
 
 static FmgrInfo *inclusion_get_procinfo(BrinDesc *bdesc, uint16 attno,
-										uint16 procnum);
+										uint16 procnum, bool missing_ok);
 static FmgrInfo *inclusion_get_strategy_procinfo(BrinDesc *bdesc, uint16 attno,
 												 Oid subtype, uint16 strategynum);
 
@@ -179,7 +179,7 @@ brin_inclusion_add_value(PG_FUNCTION_ARGS)
 	 * new value for emptiness; if it returns true, we need to set the
 	 * "contains empty" flag in the element (unless already set).
 	 */
-	finfo = inclusion_get_procinfo(bdesc, attno, PROCNUM_EMPTY);
+	finfo = inclusion_get_procinfo(bdesc, attno, PROCNUM_EMPTY, true);
 	if (finfo != NULL && DatumGetBool(FunctionCall1Coll(finfo, colloid, newval)))
 	{
 		if (!DatumGetBool(column->bv_values[INCLUSION_CONTAINS_EMPTY]))
@@ -195,7 +195,7 @@ brin_inclusion_add_value(PG_FUNCTION_ARGS)
 		PG_RETURN_BOOL(true);
 
 	/* Check if the new value is already contained. */
-	finfo = inclusion_get_procinfo(bdesc, attno, PROCNUM_CONTAINS);
+	finfo = inclusion_get_procinfo(bdesc, attno, PROCNUM_CONTAINS, true);
 	if (finfo != NULL &&
 		DatumGetBool(FunctionCall2Coll(finfo, colloid,
 									   column->bv_values[INCLUSION_UNION],
@@ -210,7 +210,7 @@ brin_inclusion_add_value(PG_FUNCTION_ARGS)
 	 * it's not going to be used any longer.  However, the BRIN framework
 	 * doesn't allow for the value not being present.  Improve someday.
 	 */
-	finfo = inclusion_get_procinfo(bdesc, attno, PROCNUM_MERGEABLE);
+	finfo = inclusion_get_procinfo(bdesc, attno, PROCNUM_MERGEABLE, true);
 	if (finfo != NULL &&
 		!DatumGetBool(FunctionCall2Coll(finfo, colloid,
 										column->bv_values[INCLUSION_UNION],
@@ -221,8 +221,7 @@ brin_inclusion_add_value(PG_FUNCTION_ARGS)
 	}
 
 	/* Finally, merge the new value to the existing union. */
-	finfo = inclusion_get_procinfo(bdesc, attno, PROCNUM_MERGE);
-	Assert(finfo != NULL);
+	finfo = inclusion_get_procinfo(bdesc, attno, PROCNUM_MERGE, false);
 	result = FunctionCall2Coll(finfo, colloid,
 							   column->bv_values[INCLUSION_UNION], newval);
 	if (!attr->attbyval &&
@@ -506,7 +505,7 @@ brin_inclusion_union(PG_FUNCTION_ARGS)
 	}
 
 	/* Check if A and B are mergeable; if not, mark A unmergeable. */
-	finfo = inclusion_get_procinfo(bdesc, attno, PROCNUM_MERGEABLE);
+	finfo = inclusion_get_procinfo(bdesc, attno, PROCNUM_MERGEABLE, true);
 	if (finfo != NULL &&
 		!DatumGetBool(FunctionCall2Coll(finfo, colloid,
 										col_a->bv_values[INCLUSION_UNION],
@@ -517,8 +516,7 @@ brin_inclusion_union(PG_FUNCTION_ARGS)
 	}
 
 	/* Finally, merge B to A. */
-	finfo = inclusion_get_procinfo(bdesc, attno, PROCNUM_MERGE);
-	Assert(finfo != NULL);
+	finfo = inclusion_get_procinfo(bdesc, attno, PROCNUM_MERGE, false);
 	result = FunctionCall2Coll(finfo, colloid,
 							   col_a->bv_values[INCLUSION_UNION],
 							   col_b->bv_values[INCLUSION_UNION]);
@@ -542,7 +540,8 @@ brin_inclusion_union(PG_FUNCTION_ARGS)
  * or null if it is not exists.
  */
 static FmgrInfo *
-inclusion_get_procinfo(BrinDesc *bdesc, uint16 attno, uint16 procnum)
+inclusion_get_procinfo(BrinDesc *bdesc, uint16 attno, uint16 procnum,
+					   bool missing_ok)
 {
 	InclusionOpaque *opaque;
 	uint16		basenum = procnum - PROCNUM_BASE;
@@ -571,6 +570,14 @@ inclusion_get_procinfo(BrinDesc *bdesc, uint16 attno, uint16 procnum)
 		}
 		else
 		{
+			if (!missing_ok)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+						 errmsg_internal("invalid opclass definition"),
+						 errdetail_internal("opclass %u is missing support function %d",
+											InvalidOid,		/* FIXME */
+											procnum)));
+
 			opaque->extra_proc_missing[basenum] = true;
 			return NULL;
 		}
diff --git a/src/backend/access/brin/brin_minmax_multi.c b/src/backend/access/brin/brin_minmax_multi.c
index 88214720bff..245b72b58a0 100644
--- a/src/backend/access/brin/brin_minmax_multi.c
+++ b/src/backend/access/brin/brin_minmax_multi.c
@@ -256,7 +256,7 @@ typedef struct DistanceValue
 /* Cache for support and strategy procedures. */
 
 static FmgrInfo *minmax_multi_get_procinfo(BrinDesc *bdesc, uint16 attno,
-										   uint16 procnum);
+										   uint16 procnum, bool missing_ok);
 
 static FmgrInfo *minmax_multi_get_strategy_procinfo(BrinDesc *bdesc,
 													uint16 attno, Oid subtype,
@@ -1663,7 +1663,7 @@ ensure_free_space_in_buffer(BrinDesc *bdesc, Oid colloid,
 	AssertCheckExpandedRanges(bdesc, colloid, attno, attr, eranges, neranges);
 
 	/* and we'll also need the 'distance' procedure */
-	distanceFn = minmax_multi_get_procinfo(bdesc, attno, PROCNUM_DISTANCE);
+	distanceFn = minmax_multi_get_procinfo(bdesc, attno, PROCNUM_DISTANCE, false);
 
 	/* build array of gap distances and sort them in ascending order */
 	distances = build_distances(distanceFn, colloid, eranges, neranges);
@@ -1814,7 +1814,8 @@ compactify_ranges(BrinDesc *bdesc, Ranges *ranges, int max_values)
 											   BTLessStrategyNumber);
 
 	/* and we'll also need the 'distance' procedure */
-	distanceFn = minmax_multi_get_procinfo(bdesc, ranges->attno, PROCNUM_DISTANCE);
+	distanceFn = minmax_multi_get_procinfo(bdesc, ranges->attno, PROCNUM_DISTANCE,
+										   false);
 
 	/*
 	 * The distanceFn calls (which may internally call e.g. numeric_le) may
@@ -2820,7 +2821,7 @@ brin_minmax_multi_union(PG_FUNCTION_ARGS)
 	 */
 
 	/* build array of gap distances and sort them in ascending order */
-	distanceFn = minmax_multi_get_procinfo(bdesc, attno, PROCNUM_DISTANCE);
+	distanceFn = minmax_multi_get_procinfo(bdesc, attno, PROCNUM_DISTANCE, false);
 	distances = build_distances(distanceFn, colloid, eranges, neranges);
 
 	/*
@@ -2861,7 +2862,8 @@ brin_minmax_multi_union(PG_FUNCTION_ARGS)
  * or null if it does not exist.
  */
 static FmgrInfo *
-minmax_multi_get_procinfo(BrinDesc *bdesc, uint16 attno, uint16 procnum)
+minmax_multi_get_procinfo(BrinDesc *bdesc, uint16 attno, uint16 procnum,
+						  bool missing_ok)
 {
 	MinmaxMultiOpaque *opaque;
 	uint16		basenum = procnum - PROCNUM_BASE;
@@ -2890,6 +2892,17 @@ minmax_multi_get_procinfo(BrinDesc *bdesc, uint16 attno, uint16 procnum)
 		}
 		else
 		{
+			/*
+			 * FIXME it seems that in minmax_multi we don't have any cases
+			 * of optional support procs, so perhaps we should remove this.
+			 */
+			if (!missing_ok)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+						 errmsg_internal("invalid opclass definition"),
+						 errdetail_internal("opclass %u is missing support function %d",
+											InvalidOid,		/* FIXME */
+											procnum)));
 			opaque->extra_proc_missing[basenum] = true;
 			return NULL;
 		}
-- 
2.39.5

