diff --git a/src/backend/access/brin/brin_minmax_multi.c b/src/backend/access/brin/brin_minmax_multi.c index a581659fe2..99ef97206a 100644 --- a/src/backend/access/brin/brin_minmax_multi.c +++ b/src/backend/access/brin/brin_minmax_multi.c @@ -149,13 +149,16 @@ typedef struct MinMaxMultiOptions * single-point ranges. That is, we have (2*nranges + nvalues) boundary * values in the array. * - * +---------------------------------+-------------------------------+ - * | ranges (sorted pairs of values) | sorted values (single points) | - * +---------------------------------+-------------------------------+ + * +-------------------------+----------------------------------+ + * | ranges (2 * nranges of) | single point values (nvalues of) | + * +-------------------------+----------------------------------+ * * This allows us to quickly add new values, and store outliers without * making the other ranges very wide. * + * 'nsorted' denotes how many of 'nvalues' in the values[] array are sorted. + * When nsorted == nvalues, all single point values are sorted. + * * We never store more than maxvalues values (as set by values_per_range * reloption). If needed we merge some of the ranges. * @@ -173,10 +176,10 @@ typedef struct Ranges FmgrInfo *cmp; /* (2*nranges + nvalues) <= maxvalues */ - int nranges; /* number of ranges in the array (stored) */ - int nsorted; /* number of sorted values (ranges + points) */ - int nvalues; /* number of values in the data array (all) */ - int maxvalues; /* maximum number of values (reloption) */ + int nranges; /* number of ranges in the values[] array */ + int nsorted; /* number of nvalues which are sorted */ + int nvalues; /* number of point values in values[] */ + int maxvalues; /* number of elements in the values[] array */ /* * We simply add the values into a large buffer, without any expensive @@ -318,102 +321,99 @@ AssertCheckRanges(Ranges *ranges, FmgrInfo *cmpFn, Oid colloid) * Check that none of the values are not covered by ranges (both sorted * and unsorted) */ - for (i = 0; i < ranges->nvalues; i++) + if (ranges->nranges > 0) { - Datum compar; - int start, - end; - Datum minvalue, - maxvalue; - - Datum value = ranges->values[2 * ranges->nranges + i]; - - if (ranges->nranges == 0) - break; - - minvalue = ranges->values[0]; - maxvalue = ranges->values[2 * ranges->nranges - 1]; - - /* - * Is the value smaller than the minval? If yes, we'll recurse to the - * left side of range array. - */ - compar = FunctionCall2Coll(cmpFn, colloid, value, minvalue); - - /* smaller than the smallest value in the first range */ - if (DatumGetBool(compar)) - continue; - - /* - * Is the value greater than the maxval? If yes, we'll recurse to the - * right side of range array. - */ - compar = FunctionCall2Coll(cmpFn, colloid, maxvalue, value); - - /* larger than the largest value in the last range */ - if (DatumGetBool(compar)) - continue; - - start = 0; /* first range */ - end = ranges->nranges - 1; /* last range */ - while (true) + for (i = 0; i < ranges->nvalues; i++) { - int midpoint = (start + end) / 2; - - /* this means we ran out of ranges in the last step */ - if (start > end) - break; + Datum compar; + int start, + end; + Datum minvalue = ranges->values[0]; + Datum maxvalue = ranges->values[2 * ranges->nranges - 1]; + Datum value = ranges->values[2 * ranges->nranges + i]; - /* copy the min/max values from the ranges */ - minvalue = ranges->values[2 * midpoint]; - maxvalue = ranges->values[2 * midpoint + 1]; + compar = FunctionCall2Coll(cmpFn, colloid, value, minvalue); /* - * Is the value smaller than the minval? If yes, we'll recurse to - * the left side of range array. + * If the value is smaller than the lower bound in the first range + * then it cannot possibly be in any of the ranges. */ - compar = FunctionCall2Coll(cmpFn, colloid, value, minvalue); - - /* smaller than the smallest value in this range */ if (DatumGetBool(compar)) - { - end = (midpoint - 1); continue; - } - /* - * Is the value greater than the minval? If yes, we'll recurse to - * the right side of range array. - */ compar = FunctionCall2Coll(cmpFn, colloid, maxvalue, value); - /* larger than the largest value in this range */ + /* + * Likewise, if the value is larger than the upper bound of the + * final range, then it cannot possibly be inside any of the + * ranges. + */ if (DatumGetBool(compar)) - { - start = (midpoint + 1); continue; - } - /* hey, we found a matching range */ - Assert(false); + /* bsearch the ranges to see if 'value' fits within any of them */ + start = 0; /* first range */ + end = ranges->nranges - 1; /* last range */ + while (true) + { + int midpoint = (start + end) / 2; + + /* this means we ran out of ranges in the last step */ + if (start > end) + break; + + /* copy the min/max values from the ranges */ + minvalue = ranges->values[2 * midpoint]; + maxvalue = ranges->values[2 * midpoint + 1]; + + /* + * Is the value smaller than the minval? If yes, we'll recurse + * to the left side of range array. + */ + compar = FunctionCall2Coll(cmpFn, colloid, value, minvalue); + + /* smaller than the smallest value in this range */ + if (DatumGetBool(compar)) + { + end = (midpoint - 1); + continue; + } + + /* + * Is the value greater than the minval? If yes, we'll recurse + * to the right side of range array. + */ + compar = FunctionCall2Coll(cmpFn, colloid, maxvalue, value); + + /* larger than the largest value in this range */ + if (DatumGetBool(compar)) + { + start = (midpoint + 1); + continue; + } + + /* hey, we found a matching range */ + Assert(false); + } } } - /* and values in the unsorted part must not be in sorted part */ - for (i = ranges->nsorted; i < ranges->nvalues; i++) + /* and values in the unsorted part must not be in the sorted part */ + if (ranges->nsorted > 0) { compare_context cxt; - Datum value = ranges->values[2 * ranges->nranges + i]; - - if (ranges->nsorted == 0) - break; cxt.colloid = ranges->colloid; cxt.cmpFn = ranges->cmp; - Assert(bsearch_arg(&value, &ranges->values[2 * ranges->nranges], - ranges->nsorted, sizeof(Datum), - compare_values, (void *) &cxt) == NULL); + for (i = ranges->nsorted; i < ranges->nvalues; i++) + { + Datum value = ranges->values[2 * ranges->nranges + i]; + + Assert(bsearch_arg(&value, &ranges->values[2 * ranges->nranges], + ranges->nsorted, sizeof(Datum), + compare_values, (void *) &cxt) == NULL); + } } #endif } @@ -923,8 +923,8 @@ has_matching_range(BrinDesc *bdesc, Oid colloid, Ranges *ranges, { Datum compar; - Datum minvalue = ranges->values[0]; - Datum maxvalue = ranges->values[2 * ranges->nranges - 1]; + Datum minvalue; + Datum maxvalue; FmgrInfo *cmpLessFn; FmgrInfo *cmpGreaterFn; @@ -936,6 +936,9 @@ has_matching_range(BrinDesc *bdesc, Oid colloid, Ranges *ranges, if (ranges->nranges == 0) return false; + minvalue = ranges->values[0]; + maxvalue = ranges->values[2 * ranges->nranges - 1]; + /* * Otherwise, need to compare the new value with boundaries of all the * ranges. First check if it's less than the absolute minimum, which is